Версия:

Модуль fiber

Модуль fiber

Общие сведения

С помощью модуля fiber можно:

  • создавать, запускать и управлять файберами,
  • отправлять и получать сообщения для различных процессов (например, разные соединения, сессии или файберы) по каналам, а также
  • использовать механизм синхронизации для файберов, аналогично работе «условных переменных» и функций операционных систем, таких как pthread_cond_wait() плюс pthread_cond_signal().

Индекс

Ниже приведен перечень всех функций и элементов модуля fiber.

Имя Использование
fiber.create() Создание и запуск файбера
fiber.new() Create but do not start a fiber
fiber.self() Получение объекта файбера
fiber.find() Получение объекта файбера по ID
fiber.sleep() Перевод файбера в режим ожидания
fiber.yield() Передача управления
fiber.status() Получение статуса активного файбера
fiber.info() Получение информации о всех файберах
fiber.kill() Отмена файбера
fiber.testcancel() Проверка отмены действующего файбера
fiber_object:id() Получение ID файбера
fiber_object:name() Получение имени файбера
fiber_object:name(name) Назначение имени файбера
fiber_object:status() Получение статуса файбера
fiber_object:cancel() Отмена файбера
fiber_object.storage Локальное хранилище в пределах файбера
fiber_object:set_joinable() Make it possible for a new fiber to join
fiber_object:join() Wait for a fiber’s state to become „dead“
fiber.time() Получение системного времени в секундах
fiber.time64() Получение системного времени в микросекундах
fiber.channel() Создание канала связи
channel_object:put() Отправка сообщения по каналу связи
channel_object:close() Закрытие канала
channel_object:get() Перехват сообщения из канала
channel_object:is_empty() Проверка пустоты канала
channel_object:count() Подсчет сообщений в канале
channel_object:is_full() Проверка заполненности канала
channel_object:has_readers() Проверка пустого канала на наличие читателей в состоянии ожидания
channel_object:has_writers() Проверка полного канала на наличие писателей в состоянии ожидания
channel_object:is_closed() Проверка закрытия канала
fiber.cond() Создание условной переменной
cond_object:wait() Перевод файбера в режим ожидания до пробуждения другим файбером
cond_object:signal() Пробуждение отдельного файбера
cond_object:broadcast() Пробуждение всех файберов

Файберы

Файбер – это набор инструкций, которые выполняются по принципу кооперативной многозадачности. Файберы, управление которых происходит с помощью модуля fiber, связаны с функцией под названием функция для файбера, которую задает пользователь.

A fiber has three possible states: running, suspended or dead. When a fiber is created with fiber.create(), it is running. When a fiber is created with fiber.new() or yields control with fiber.sleep(), it is suspended. When a fiber ends (because the fiber function ends), it is dead.

Все файберы составляют часть реестра файберов. Можно производить поиск по реестру с помощью fiber.find() по ID файбера (fid), который представляет собой числовой идентификатор.

Неконтролируемый файбер можно остановить с помощью fiber_object.cancel. Однако, функция fiber_object.cancel консультативна, то есть сработает только в том случае, если неконтролируемый файбер случайно вызовет fiber.testcancel(). Большинство функций типа box.*, например box.space…delete() или box.space…update(), действительно вызывают fiber.testcancel(), а box.space…select{} не вызовет. В действительности неконтролируемый файбер может перестать отвечать, если он производит большое количество вычислений и не проверяет вероятность отмены.

Другой потенциальной проблемой могут стать файберы, которые не включаются в расписание, поскольку они не подписаны ни на какие события, или потому что соответствующие события не происходят. Такие файберы можно в любое принудительно остановить с помощью fiber.kill(), потому что функция fiber.kill() отправляет асинхронное событие пробуждения на файбер, а fiber.testcancel() проверяет наступление такого события пробуждения.

Like all Lua objects, dead fibers are garbage collected. The Lua garbage collector frees pool allocator memory owned by the fiber, resets all fiber data, and returns the fiber (now called a fiber carcass) to the fiber pool. The carcass can be reused when another fiber is created.

У файбера есть все возможности сопрограммы (coroutine) на языке Lua, и все принципы программирования, которые применяются к сопрограммам на Lua, применимы и к файберам. Однако Tarantool расширил возможности файберов для внутреннего использования. Поэтому, несмотря на возможность и поддержку использования сопрограмм, рекомендуется использовать файберы.

fiber.create(function[, function-arguments])

Создание и запуск файбера. Происходит создание файбера, который незамедлительно начинает работу.

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – что передается в функцию
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber = require('fiber')
 ---
 ...
 tarantool> function function_name()
          >   fiber.sleep(1000)
          > end
 ---
 ...
 tarantool> fiber_object = fiber.create(function_name)
 ---
 ...
fiber.new(function[, function-arguments])

Create but do not start a fiber: the fiber is created but does not begin to run immediately – it waits until the fiber creator (that is, the job that is calling fiber.new()) yields. The initial fiber state is „suspended“. Thus fiber.new() differs slightly from fiber.create().

Ordinarily fiber.new() is used in conjunction with fiber_object:set_joinable() and fiber_object:join().

Параметры:
  • function – функция, которая будет связана с файбером
  • function-arguments – что передается в функцию
Возвращается:

созданный объект файбера

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber = require('fiber')
---
...
tarantool> function function_name()
         >   fiber.sleep(1000)
         > end
---
...
tarantool> fiber_object = fiber.new(function_name)
---
...
fiber.self()
Возвращается:объект файбера для запланированного на данный момент файбера.
Тип возвращаемого значения:
 пользовательские данные

Пример:

tarantool> fiber.self()
 ---
 - status: running
   name: interactive
   id: 101
 ...
fiber.find(id)
Параметры:
  • id – числовой идентификатор файбера.
Возвращается:

объект файбера для указанного файбера.

Тип возвращаемого значения:
 

пользовательские данные

Пример:

tarantool> fiber.find(101)
 ---
 - status: running
   name: interactive
   id: 101
 ...
fiber.sleep(time)

Передача управления планировщику и переход в режим ожидания на указанное количество секунд. Только текущий файбер можно перевести в режим ожидания.

Параметры:
  • time – количество секунд в режиме ожидания.

Пример:

tarantool> fiber.sleep(1.5)
 ---
 ...
fiber.yield()

Передача управления планировщику. Работает аналогично fiber.sleep(0), только fiber.sleep(0) зависит от таймера, fiber.yield() – нет.

Пример:

tarantool> fiber.yield()
 ---
 ...
fiber.status([fiber_object])

Return the status of the current fiber. Or, if optional fiber_object is passed, return the status of the specified fiber.

Возвращается:статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).
Тип возвращаемого значения:
 string (строка)

Пример:

tarantool> fiber.status()
 ---
 - running
 ...
fiber.info()

Возврат информации о всех файберах.

Возвращается:количество переключений контекста, обратная трассировка, ID, общий объем памяти, объем используемой памяти, имя каждого файбера.
Тип возвращаемого значения:
 таблица

Пример:

tarantool> fiber.info()
 ---
 - 101:
     csw: 7
     backtrace: []
     fid: 101
     memory:
       total: 65776
       used: 0
     name: interactive
 ...
fiber.kill(id)

Поиск файбера по числовому идентификатору и его отмена. Другими словами, fiber.kill() объединяет в себе fiber.find() и fiber_object:cancel().

Параметры:
  • id – ID файбера для отмены.
Исключение:

указанный файбер отсутствует, или отмена невозможна.

Пример:

tarantool> fiber.kill(fiber.id()) -- функция с self может вызвать окончание программы
 ---
 - error: fiber is cancelled
 ...
fiber.testcancel()

Проверка отмены действующего файбера и выдача исключения, если файбер отменен.

Пример:

tarantool> fiber.testcancel()
 ---
 - error: fiber is cancelled
 ...
object fiber_object
fiber_object:id()
Параметры:
Возвращается:

ID файбера.

Тип возвращаемого значения:
 

число

fiber.self():id() can also be expressed as fiber.id().

Пример:

tarantool> fiber_object = fiber.self()
 ---
 ...
 tarantool> fiber_object:id()
 ---
 - 101
 ...
fiber_object:name()
Параметры:
Возвращается:

имя файбера.

Тип возвращаемого значения:
 

string (строка)

fiber.self():name() can also be expressed as fiber.name().

Пример:

tarantool> fiber.self():name()
 ---
 - interactive
 ...
fiber_object:name(name)

Изменение имени файбера. По умолчанию, файбер в интерактивном режиме экземпляра Tarantool’а называется „interactive“, а новые файберы, созданные с помощью fiber.create, называются „lua“. Переименование файберов позволяет легче различать их при использовании fiber.info.

Параметры:
Возвращается:

nil

Пример:

tarantool> fiber.self():name('non-interactive')
 ---
 ...
fiber_object:status()

Возврат статуса указанного файбера.

Параметры:
Возвращается:

статус файбера: “dead” (недоступен), “suspended” (приостановлен) или “running” (активен).

Тип возвращаемого значения:
 

string (строка)

fiber.self():status( can also be expressed as fiber.status().

Пример:

tarantool> fiber.self():status()
 ---
 - running
 ...
fiber_object:cancel()

Отмена файбера. Активные и приостановленные файберы можно отменить. После отмены файбера попытки работать с ним вызовут ошибку, например, вызов fiber_object:id() вызовет ошибку с указанием недоступности файбера error: the fiber is dead.

Параметры:
Возвращается:

nil

Возможные ошибки: нельзя отменить указанный объект файбера.

Пример:

tarantool> fiber.self():cancel() -- kill self, may make program end
---
- error: fiber is cancelled
...
fiber_object.storage

Local storage within the fiber. The storage can contain any number of named values, subject to memory limitations. Naming may be done with fiber_object.storage.name or fiber_object.storage['name']. or with a number fiber_object.storage[number]. Values may be either numbers or strings. The Lua garbage collector will mark or free the local storage when fiber_object:cancel() happens.

Пример:

tarantool> fiber = require('fiber')
---
...
tarantool> function f () fiber.sleep(1000); end
---
...
tarantool> fiber_function = fiber.create(f)
---
...
tarantool> fiber_function.storage.str1 = 'string'
---
...
tarantool> fiber_function.storage['str1']
---
- string
...
tarantool> fiber_function:cancel()
---
...
tarantool> fiber_function.storage['str1']
---
- error: '[string "return fiber_function.storage[''str1'']"]:1: the fiber is dead'
...

См. также box.session.storage.

fiber_object:set_joinable(true_or_false)

fiber_object:set_joinable(true) makes a fiber joinable; fiber_object:set_joinable(false) makes a fiber not joinable; the default is false.

A joinable fiber can be waited for, with fiber_object:join().

Best practice is to call fiber_object:set_joinable() before the fiber function begins to execute, because otherwise the fiber could become „dead“ before fiber_object:set_joinable() takes effect. The usual sequence could be:

  1. Call fiber.new() instead of fiber.create() to create a new fiber_object.

    Do not yield at this point, because that will cause the fiber function to begin.

  2. Call fiber_object:set_joinable(true) to make the new fiber_object joinable.

    Now it is safe to yield.

  3. Call fiber_object:join().

    Usually fiber_object:join() should be called, otherwise the fiber’s status may become „suspended“ when the fiber function ends, instead of „dead“.

Параметры:
  • true_or_false – the boolean value that changes the set_joinable flag
Возвращается:

nil

Пример:

The result of the following sequence of requests is:

  • the global variable d will be 6 (which proves that the function was not executed until after d was set to 1, when fiber.sleep(1) caused a yield);
  • fiber.status(fi2) will be „suspended“ (which proves that after the function was executed the fiber status did not change to „dead“).
fiber=require('fiber')
d=0
function fu2() d=d+5 end
fi2=fiber.new(fu2) fi2:set_joinable(true) d=1 fiber.sleep(1)
print(d)
fiber.status(fi2)
fiber_object:join()

«Join» a joinable fiber. That is, let the fiber’s function run and wait until the fiber’s status is „dead“ (normally a status becomes „dead“ when the function execution finishes). Joining will cause a yield, therefore, if the fiber is currently in a suspended state, execution of its fiber function will resume.

This kind of waiting is more convenient than going into a loop and periodically checking the status; however, it works only if the fiber was created with fiber.new() and was made joinable with fiber_object:set_joinable().

Возвращается:true if successful, false if not successful
Тип возвращаемого значения:
 boolean (логический)

Пример:

The result of the following sequence of requests is:

  • the first fiber.status() call returns „suspended“,
  • the join() call returns true,
  • the elapsed time is usually 5 seconds, and
  • the second fiber.status() call returns „dead“.

This proves that the join() does not return until the function – which sleeps 5 seconds – is „dead“.

fiber=require('fiber')
function fu2() fiber.sleep(5) end
fi2=fiber.new(fu2) fi2:set_joinable(true)
start_time = os.time()
fiber.status(fi2)
fi2:join()
print('elapsed = ' .. os.time() - start_time)
fiber.status(fi2)
fiber.time()
Возвращается:текущее системное время (в секундах с начала отсчета) в виде Lua-числа. Время берется из часов событийного цикла, поэтому вызов полезен лишь для создания искусственных ключей кортежа.
Тип возвращаемого значения:
 num

Пример:

tarantool> fiber.time(), fiber.time()
 ---
 - 1448466279.2415
 - 1448466279.2415
 ...
fiber.time64()
Возвращается:текущее системное время (в микросекундах с начала отсчета) в виде 64-битного целого числа. Время берется из часов событийного цикла.
Тип возвращаемого значения:
 num

Пример:

tarantool> fiber.time(), fiber.time64()
 ---
 - 1448466351.2708
 - 1448466351270762
 ...

Пример

Создание функции, которая будет связана с файбером. Такая функция содержит бесконечный цикл (while 0 == 0 всегда будет правдой). Каждая итерация цикла прибавляет 1 к глобальной переменной под названием gvar, затем уходит в режим ожидания на 2 секунды. Ожидание вызывает неявную передачу управления fiber.yield().

tarantool> fiber = require('fiber')
 tarantool> function function_x()
          >   gvar = 0
          >   while 0 == 0 do
          >     gvar = gvar + 1
          >     fiber.sleep(2)
          >   end
          > end
 ---
 ...

Создание файбера, ассоциация функции function_x с файбером и запуск function_x. Она сразу же «отсоединится», то есть будет работать отдельно от вызывающего метода.

tarantool> gvar = 0

 tarantool> fiber_of_x = fiber.create(function_x)
 ---
 ...

Получение ID файбера (fid) для последующего вывода.

tarantool> fid = fiber_of_x:id()
 ---
 ...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). Статус будет «suspended» (приостановлен), потому что файбер практически всё время проводит в режиме ожидания или передачи управления.

tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
 # 102 .  suspended . gvar= 399
 ---
 ...

Небольшая остановка, пока работает отсоединенная функция. Затем … Отмена файбера. Затем снова отображение идентификатора файбера, статуса файбера и переменной gvar (значение gvar немного увеличится в зависимости от длительности паузы). На этот раз статус будет «dead» (недоступен), потому что произошла отмена.

tarantool> fiber_of_x:cancel()
 ---
 ...
 tarantool> print('#', fid, '. ', fiber_of_x:status(), '. gvar=', gvar)
 # 102 .  dead . gvar= 421
 ---
 ...

Каналы

Вызов fiber.channel() для выделения спейса и получение объекта канала, который будет называться «channel» в примерах данного раздела.

Вызов других процедур по каналу для отправки сообщений, получения сообщений или проверки статуса канала.

Message exchange is synchronous. The Lua garbage collector will mark or free the channel when no one is using it, as with any other Lua object. Use object-oriented syntax, for example channel:put(message) rather than fiber.channel.put(message).

fiber.channel([capacity])

Создание нового канала связи.

Параметры:
  • capacity (int) – максимальное количество слотов (спейсы для сообщений channel:put), которые можно использовать одновременно. По умолчанию, 0.
возвращается:

новый канал.

тип возвращаемого значения:
 

пользовательские данные, возможно включая строку «channel …».

object channel_object
channel_object:put(message[, timeout])

Отправка сообщения по каналу связи. Если канал заполнен, channel:put() ожидает, пока не освободится слот в канале.

Параметры:
  • message (lua-value) – то, что отправляется, как правило, строка, число или таблица
  • timeout (number) – максимальное количество секунд ожидания, чтобы слот освободился
возвращается:

Если указан параметр времени ожидания timeout, и в канале нет свободного слота в течение указанного времени, возвращается значение false (ложь). Если канал закрыт, возвращается значение false. В остальных случаях возвращается значение true (правда), которое указывает на успешную отправку.

тип возвращаемого значения:
 

boolean (логический)

channel_object:close()

Закрытие канала. Все, кто находится в режиме ожидания в канале, отключаются. Все последующие операции channel:get() вернут нулевое значение nil, а все последующие операции channel:put() вернут false (ложь).

channel_object:get([timeout])

Перехват и удаление сообщения из канала. Если канал пуст, channel:get() будет ожидать сообщения.

Параметры:
  • timeout (number) – максимальное количество секунд ожидания сообщения
возвращается:

Если указан параметр времени ожидания timeout, и в канале нет сообщения в течение указанного времени, возвращается нулевое значение nil. Если канал закрыт, возвращается значение nil. В остальных случаях возвращается сообщение, отправленное на канал с помощью channel:put().

тип возвращаемого значения:
 

как правило, строка, число или таблица, как определяет channel:put()

channel_object:is_empty()

Проверка пустоты канала (отсутствие сообщений).

возвращается:true (правда), если канал пуст. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:count()

Определение количества сообщений в канале.

возвращается:количество сообщений.
тип возвращаемого значения:
 число
channel_object:is_full()

Проверка заполненности канала.

возвращается:true (правда), если канал заполнен (количество сообщений в канале равно количеству слотов, то есть нет места для новых сообщений). В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:has_readers()

Проверка пустого канала на наличие читателей в состоянии ожидания сообщения после отправки запросов channel:get().

возвращается:true (правда), если на канале есть читатели в ожидании сообщения. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:has_writers()

Проверка полного канала на наличие писателей в состоянии ожидания после отправки запросов channel:put().

возвращается:true (правда), если на канале есть писатели в состоянии ожидании. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)
channel_object:is_closed()
возвращается:true (правда), если канал уже закрыт. В противном случае, false (ложь).
тип возвращаемого значения:
 boolean (логический)

Пример

В данном примере дается примерное представление о том, как должны выглядеть функции для файберов. Предполагается, что на функции ссылается fiber.create().

fiber = require('fiber')
 channel = fiber.channel(10)
 function consumer_fiber()
     while true do
         local task = channel:get()
         ...
     end
 end

 function consumer2_fiber()
     while true do
         -- 10 секунд
         local task = channel:get(10)
         if task ~= nil then
             ...
         else
             -- время ожидания
         end
     end
 end

 function producer_fiber()
     while true do
         task = box.space...:select{...}
         ...
         if channel:is_empty() then
             -- канал пуст
         end

         if channel:is_full() then
             -- канал полон
         end

         ...
         if channel:has_readers() then
             -- есть файберы
             -- которые ожидают данные
         end
         ...

         if channel:has_writers() then
             -- есть файберы
             -- которые ожидают читателей
         end
         channel:put(task)
     end
 end

 function producer2_fiber()
     while true do
         task = box.space...select{...}
         -- 10 секунд
         if channel:put(task, 10) then
             ...
         else
             -- время ожидания
         end
     end
 end

Условные переменные

Вызов fiber.cond() используется для создания именованной условной переменной, которая будет называться „cond“ для примеров данного раздела.

Вызов cond:wait() используется, чтобы заставить файбер ожидать сигнал, с помощью условной переменной.

Вызов cond:signal() используется, чтобы отправить сигнал для пробуждения отдельного файбера, который выполнил запрос cond:wait().

Вызов cond:broadcast() используется для отправки сигнала всем файберам, которые выполнили cond:wait().

fiber.cond()

Создание новой условной переменной.

возвращается:новая условная переменная.
тип возвращаемого значения:
 Lua-объект.
object cond_object
cond_object:wait([timeout])

Перевод файбера в режим ожидания до пробуждения другим файбером с помощью метода signal() или broadcast(). Переход в режим ожидания вызывает неявную передачу управления fiber.yield().

Параметры:
  • timeout – количество секунд ожидания, по умолчанию = всегда.
возвращается:

Если указан параметр времени ожидания timeout, и сигнал не передается в течение указанного времени, wait() вернет значение false (ложь). Если передается signal() или broadcast(), wait() вернет true (правда).

тип возвращаемого значения:
 

boolean (логический)

cond_object:signal()

Пробуждение отдельного файбера, который выполнил wait() для той же переменной.

тип возвращаемого значения:
 nil
cond_object:broadcast()

Пробуждение всех файберов, которые выполнили wait() для той же переменной.

тип возвращаемого значения:
 nil

Пример

Предположим, что запущен экземпляр Tarantool’а на прослушивание на localhost по порту 3301. Предположим, что у пользователя guest есть права на подключение. Используем утилиту tarantoolctl для запуска двух клиентов.

В первом терминале введите:

$ tarantoolctl connect '3301'
 tarantool> fiber = require('fiber')
 tarantool> cond = fiber.cond()
 tarantool> cond:wait()

Задача повиснет, поскольку cond:wait() – без дополнительного аргумента времени ожидания timeout – уйдет в режим ожидания до изменения условной переменной.

Во втором терминале введите:

$ tarantoolctl connect '3301'
 tarantool> cond:signal()

Теперь снова взгляните на терминал №1. Он покажет, что ожидание прекратилось, и функция cond:wait() вернула значение true.

В данном примере показана зависимость от использования глобальной условной переменной с произвольным именем cond. В реальной жизни разработчики следят за использованием различных имен для условных переменных в разных приложениях.