Версия:

Модуль expirationd

Модуль expirationd

Рассмотрим expirationd – пример Lua-модуля для промышленной эксплуатации, который работает с Tarantool’ом – Tarantool предоставляет его с лицензией Artistic на GitHub. Программа expirationd.lua довольно объемная (около 500 строк), поэтому здесь мы остановимся на пунктах, знания о которых можно расширить, позднее изучив программу полностью.

task.worker_fiber = fiber.create(worker_loop, task)
 log.info("expiration: task %q restarted", task.name)
 ...
 fiber.sleep(expirationd.constants.check_interval)
 ...

Если в Tarantool’е упоминается «демон», то речь идет об использовании файбера. Программа создает файбер и передает управление так, что он периодически запускается, уходит в режим ожидания, а затем повторяет эти действия.

for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
 ...
         if task.is_tuple_expired(task.args, tuple) then
         task.expired_tuples_count = task.expired_tuples_count + 1
         task.process_expired_tuple(task.space_id, task.args, tuple)
 ...

Команду «for» можно перевести как «выполнить итерацию по индексу сканируемого спейса», а внутри – если кортеж «неактуален» (например, если в кортеже есть поле метки времени, которое меньше текущего времени), то обработать кортеж как неактуальный кортеж.

-- функция обработки неактуального кортежа
 local function default_tuple_drop(space_id, args, tuple)
     local key = fun.map(
         function(x) return tuple[x.fieldno] end,
         box.space[space_id].index[0].parts
     ):totable()
     box.space[space_id]:delete(key)
 end

В конечном итоге, обработка неактуального кортежа приводит к default_tuple_drop(), что приводит к удалению кортежа из первоначального спейса. Сначала используется модуль fun, в частности fun.map. Учитывая, что index[0] всегда является первичным ключом спейса, а index[0].parts[N].fieldno всегда является номером поля для компонента ключа N, функция fun.map() создает таблицу из первичных значений кортежа. Результат fun.map() передается в space_object:delete().

local function expirationd_run_task(name, space_id, is_tuple_expired, options)
 ...

На этом этапе ясно, что expirationd.lua запускает фоновый процесс (файбер), который выполняет итерацию по всем кортежам в спейсе, в рамках кооперативной многозадачности уходит в режим ожидания, чтобы другие файберы могли работать одновременно с ним, а когда находит неактуальный кортеж, удаляет его из спейса. Теперь функцию «expirationd_run_task()» можно использовать в тестировании, где создаются образцы данных, некоторое время работает демон, и выводятся результаты.

Если вы хотите увидеть, как все работает, обратите внимание на нижеприведенные шаги по включению expirationd в тестирование.

  1. Найдите expirationd.lua. Можно воспользоваться стандартным способом, поскольку модуль включен в общий список модулей, но для этой цели просто скопируйте содержимое expirationd.lua в директорию, используемую по умолчанию.
  2. Запустите Tarantool-сервер, как описано выше.
  3. Выполните следующие запросы:
fiber = require('fiber')
 expd = require('expirationd')
 box.cfg{}
 e = box.schema.space.create('expirationd_test')
 e:create_index('primary', {type = 'hash', parts = {1, 'unsigned'}})
 e:replace{1, fiber.time() + 3}
 e:replace{2, fiber.time() + 30}
 function is_tuple_expired(args, tuple)
   if (tuple[2] < fiber.time()) then return true end
   return false
   end
 expd.run_task('expirationd_test', e.id, is_tuple_expired)
 retval = {}
 fiber.sleep(2)
 expd.task_stats()
 fiber.sleep(2)
 expd.task_stats()
 expd.kill_task('expirationd_test')
 e:drop()
 os.exit()

Запросы в работе с базой данных (cfg, space.create, create_index) уже должны быть вам знакомы.

В expirationd передается функция is_tuple_expired, которая задает следующее условие: если второе поле кортежа меньше текущего времени , вернуть true (правда), в противном случае, вернуть false (ложь).

Ключ к запуску модуля – expd = require('expirationd'). Функция «require» – это именно то, что выполняет чтение в программе. Она появится и в дальнейших примерах в данном руководстве, когда будет необходимо запустить модуль, не входящий в ядро Tarantool’а. После того, как Lua-переменной expd присваивается значение модуля expirationd, можно вызвать функцию модуля run_task().

После ухода в режим ожидания на две секунды, когда проводится итерация по спейсам, expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 0». После ожидания в течение еще двух секунд expd.task_stats() выведет отчет о количестве неактуальных кортежей – «expired_count: 1». Это показывает, что функция is_tuple_expired() с течением времени вернула «true» для одного из кортежей, поскольку поле метки времени было дольше трех секунд.

Конечно, expirationd можно настроить на выполнение различных задач с помощью разных параметров, что будет очевидно после более детального изучения исходного кода.