Справочники¶
Документ рассматривает параметры конфигурации, метрики и функции API для Tarantool Queue Enterprise, функциональность MQ (далее по тексту – TQE(MQ).
Конфигурация¶
Конфигурация модуля API¶
Пример YAML-файла конфигурации модуля API:
app_name: MESSAGE_QUEUE_EE_API
app_version: 1.0
core_host: 0.0.0.0
core_port: 18184
grpc_listen:
- uri: 'tcp://0.0.0.0:18182'
publisher:
enabled: true
tarantool:
user: user
pass: pass
connections:
routers:
- "localhost:3301"
queues:
queue:
connections:
routers:
- "localhost:3302"
consumer:
enabled: true
polling_timeout: 500ms
tarantool:
user: user
pass: pass
connections:
storage-1:
- "localhost:3301"
queues:
queue:
connections:
storage-1:
- "localhost:3302"
log:
file: log.jsonl
format: json
level: info
Основные параметры¶
Основные параметры обеспечивают настройку адресов и портов подключения модуля API.
app_name: MESSAGE_QUEUE_EE_API
app_version: 1.0
core_host: 0.0.0.0
core_port: 18184
grpc_listen:
- uri: 'tcp://0.0.0.0:18182'
grpc_host: 0.0.0.0
grpc_port: 18182
grpc_options:
initial_conn_window_size: 65535
initial_window_size: 65535
header_table_size: 16777216
max_header_list_size: 128
max_concurrent_streams: 128
num_stream_workers: 0
max_recv_msg_size: 4294967296
max_send_msg_size: 2147483647
read_buffer_size: 262144
write_buffer_size: 262144
shared_write_buffer: false
где:
app_name
- название приложения;app_version
- версия приложения;core_host
- адрес сбора метрик и проверки состояния модуля API;core_port
- порт сбора метрик и проверки состояния модуля API;grpc_options
- набор опций для конфигурации grpc-сервера (подробнее здесь). Опции не изменяются после запуска grpc-сервераы:initial_conn_window_size
- начальное окно в байтах http2-соединения, по умолчанию 64kb;initial_window_size
- начальное окно http2-стрима в байта, по умолчанию 64kb;header_table_size
- размер динамической таблицы с заголовками http2, по умолчанию не задано (динамическая таблица не используется);max_header_list_size
- максимальный размер таблицы с заголовками http2, по умолчанию не задано;max_concurrent_streams
- количество одновременных http-стримов, по умолчанию 128;num_stream_workers
- количество воркеров для обработки входящих запросов, по умолчанию 0 (под каждый http2-стрим создается отдельная горутина);max_recv_msg_size
- максимальный размер входящего сообщения в байтах, по умолчанию 4mb;max_send_msg_size
- максимальный размер исходящего сообщения в байтах, по умолчанию 2mb;read_buffer_size
- размер буфер на чтение, по умолчанию 256kb;write_buffer_size
- размер буфер на запись, по умолчанию 256kb;shared_write_buffer
- позволяет переиспользовать буфер на запись, а не создавать под каждое подключение, по умолчаниюfalse
;
grpc_listen
- набор интрефейсов, на которых обслуживает gRPC-сервер;uri
- адрес прослушивания, например:unix:///tmp/tqe-mq.sock
илиtcp://localhost:18182
grpc_host
- (устарело после введенияgrpc_listen
) адрес модуля API;grpc_port
- (устарело после введенияgrpc_listen
) порт модуля API.
publisher¶
Параметр publisher
модуля API обеспечивает настройку параметров публикации.
publisher:
enabled: true
tarantool:
user: user
pass: pass
queues:
queue:
connections:
routers:
- "localhost:3301"
Доступные опции параметра publisher
:
enabled
- параметр доступности сервиса:true
доступен,false
- нет;
tarantool
- секция конфигурации доступа к ядру TQE(MQ):user
- логин доступа к очереди (указывается в секцииcreds
конфигурации ядра TQE(MQ);pass
- пароль доступа к очереди (указывается в секцииcreds
конфигурации ядра TQE(MQ);connections
- секция адресов роутеров (маршрутизаторов) для публикации сообщений.queues
- секция доступных очередей ядра TQE(MQ);connections
- секция адресов роутеров (маршрутизаторов) для публикации сообщений.
consumer¶
Параметр consumer
модуля API обеспечивает настройку параметров подписки на очередь сообщений.
consumer:
enabled: true
polling_timeout: 500ms
tarantool:
user: user
pass: pass
queues:
queue:
connections:
storage-1:
- "localhost:3301"
Доступные опции параметра consumer
:
enabled
- параметр доступности сервиса:true
- доступен,false
- нет;
polling_timeout
- время задержки между запросами новых сообщений, пример: 500ms;tarantool
- секция конфигурации доступа к очереди TQE MQ:user
- логин доступа к очереди (указывается в секцииcreds
конфигурации ядра TQE(MQ);pass
- пароль доступа к очереди (указывается в секцииcreds
конфигурации ядра TQE(MQ);connections
- секция адресов хранилищ для подписки на сообщения.queues
- секция доступных очередей TQE(MQ);connections
- секция адресов хранилищ для подписки на сообщения.
log¶
Параметр log
модуля API обеспечивает настройку параметров журналирования.
log:
file: log.jsonl
format: json
level: info
Доступные опции параметра log
:
file
- имя файла записи журнала (может содержать абсолютный или относительный путь, по умолчанию /dev/stderr либо в файл server.jsonl, при запуске модуля API в фоновом режиме с опцией-d
);format
- формат вывода сообщений, принимаемые значения: text|json (по умолчанию text);level
- уровень журналирования, принимаемые значения: debug|info|warn|error|dpanic(паника в режиме разработки)|panic(паника)|fatal (по умолчанию info).
Конфигурация ядра¶
Конфигурация ядра Tarantool 3 происходит с помощью обновления файла конфигурации или конфигурации в etcd/Tarantool Config Storage. Подробнее про конфигурацию в Tarantool 3 можно прочитать здесь
Пример YAML-файла конфигурации модуля ядра:
credentials:
users:
user:
roles: [super]
password: pass
roles_cfg:
app.roles.queue:
queues:
- name: queue
- name: another_queue
deduplication_mode: basic
- name: archive_queue
storage: disk
- name: queue_disabled_indexe
disabled_filters_by: [routing_key, sharding_key]
app.roles.api:
sharding:
routing:
core-1:
buckets:
- 1
- [2,10]
core-2:
buckets:
- [11,20]
core-3:
buckets:
- [21,1000]
Секция queues¶
Секция queues
отвечает за описание используемых очередей сообщений и настройку их параметров. Настраивается на уровне роли app.roles.queue
. Имеет следующую структуру:
roles_cfg:
app.roles.queue:
queues:
- name: some-queue
latency: 1
disabled_filters_by: [sharding_key]
deduplication_mode: basic
- name: other-queue
где:
name
- название очереди сообщений.latency
- задержка в миллисекундах между оповещениями подписчика о новых сообщениях. По умолчанию –1
.deduplication_mode
- режим дедупликации (т.н. режим устранения избыточности, дублирования данных). Может принимать значенияbasic
,extended
,keep_latest
. По умолчанию –basic
.poll_max_batch
– максимальное количество сообщений, которое одно ядро вернет за один запрос подписки. По умолчанию –100
.storage
– движок хранения очереди. Допустимые значения:memory
для использования memtx,disk
для использования vinyl. По умолчанию -memory
.poll_yield_every
– период (в сообщениях), с которым обработчик запроса будет передавать управление другим обработчикам. По умолчанию –512
.disabled_filters_by
- список отключенных фильтров. Отключение фильтра делает невозможным подписку с фильтрацией по указанному полю. Изменение этой опции после создания очереди позволяет включать фильтраю по полю при его удаление из списка, но не позволяет отключать ее. По умолчанию - [] (фильтрация включена по всем полям). Возможные значения -routing_key
,sharding_key
.
Также существует другой вариант настройки параметров секции queues
, который имеет следующую структуру:
roles_cfg:
app.roles.queue:
queues: ["some-queue", "other-queue"]
Секция sharding¶
Секция sharding
отвечает за описание статического шардинга. Настраивается на уровне роли app.roles.api
:
roles_cfg:
app.roles.api:
sharding:
routing:
core-1:
buckets:
- 1
- [2,300]
core-2:
buckets:
- [301,500]
core-3:
buckets:
- [501,1000]
где:
core-1
- псевдоним набора реплик (replica set) из топологии;buckets
- диапазон обслуживаемых бакетов. Может принимать массив значений, где каждое значение - либоid
одного бакета, либо диапазон бакетов «от и до».
Метрики¶
Контроль работоспособности модулей выполняется на основании проверки метрик производительности.
Метрики модулей предоставляются в формате prometheus.
Метрики модуля API¶
Метрики модуля API доступны на HTTP-эндпоинте /metrics
.
Пример частичного вывода метрик модуля API:
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 5.3002e-05
go_gc_duration_seconds{quantile="0.25"} 7.574e-05
go_gc_duration_seconds{quantile="0.5"} 9.047e-05
go_gc_duration_seconds{quantile="0.75"} 0.000111856
go_gc_duration_seconds{quantile="1"} 0.000338263
go_gc_duration_seconds_sum 1.305067544
go_gc_duration_seconds_count 11608
...
Основный метрики модуля API предоставляют следующую диагностическую информацию:
mqee_grpc_published_messages_total - счетчик опубликованных сообщений в очередь:
grpc_method - название удаленной процедуры;
queue - название очереди.
mqee_grpc_received_messages_total - счетчик полученных gRPC-сервером сообщений на публикацию:
grpc_method - название удаленной процедуры;
queue - название очереди.
mqee_grpc_sent_messages_total - счетчик отправленных сообщений потребителям gRPC-сервером:
queue - название очереди.
mqee_grpc_requests_size_bytes_total - счетчик общего размера запросов в байтах:
grpc_method - название удаленной процедуры;
queue - название очереди.
mqee_grpc_subscribers_count - индикатор текущего количества подписчиков:
queue - название очереди;
routing_key - значение
routing_key
, по которому была произведена подписка.
mqee_grpc_app_info - информация об очереди (
mqee_grpc_app_info{app_name="MESSAGE_QUEUE_EE_API",app_version="test"} 1
);mqee_grpc_gomaxprocs - индикатор текущего значения
GOMAXPROCS
.grpc_server_started_total - счетчик полученных запросов на удаленную процедуру:
grpc_type - тип подключения (
unary
,client_stream
,server_stream
,bidi_stream
);grpc_service - название gRPC-сервиса;
grpc_method - название процедуры:
Publish;
PublishBatch;
ServerReflectionInfo;
Subscribe.
grpc_server_handled_total - счетчик завершенных удаленных процедур:
grpc_type - тип подключения (
unary
,client_stream
,server_stream
,bidi_stream
):grpc_service - название gRPC-сервиса;
grpc_method - название процедуры (см. список в grpc_server_started_total).
grpc_code - код ответа gRPC-код.
grpc_server_msg_received_total - счетчик полученных сообщений:
grpc_type - тип подключения(
unary
,client_stream
,server_stream
,bidi_stream
);grpc_service - название gRPC-сервиса;
grpc_method - название процедуры (см. список в grpc_server_started_total).
grpc_server_msg_sent_total - счетчик отправленных сообщений:
grpc_type - тип подключения(
unary
,client_stream
,server_stream
,bidi_stream
);grpc_service - название gRPC-сервиса;
grpc_method - название процедуры (см. список в grpc_server_started_total).
grpc_server_handling_seconds - гистограмма секунд на обработку gRPC-вызова:
grpc_type - тип подключения(
unary
,client_stream
,server_stream
,bidi_stream
);grpc_service - название gRPC-сервиса;
grpc_method - название процедуры (см. список в grpc_server_started_total).
go_gc_duration_seconds - сводная информация по длительности паузы на цикл сборки мусора окружения Go. Используемый диапазон в секундах: 0, 0.25, 0.5, 0.75, 1;
go_goroutines - индикатор текущего количества goroutine;
go_info - информация о версии окружения Go;
go_memstats_* - набор метрик для отслеживания использования памяти средой выполнения Go;
go_threads - индикатор текущего количества потоков (threads) операционной системы, используемых средой выполнения Go.
Метрики ядра TQE(MQ)¶
Метрики ядра TQE(MQ) доступны на HTTP-эндпоинте /metrics
на каждом экземпляре Tarantool.
Пример частичного вывода метрик модуля ядра:
# HELP tnt_vinyl_disk_index_size Amount of index stored in files
# TYPE tnt_vinyl_disk_index_size gauge
tnt_vinyl_disk_index_size{alias="app"} 0
# HELP tnt_read_only Is instance read only
# TYPE tnt_read_only gauge
tnt_read_only{alias="app"} 0
# HELP tnt_vinyl_disk_data_size Amount of data stored in files
# TYPE tnt_vinyl_disk_data_size gauge
tnt_vinyl_disk_data_size{alias="app"} 0
...
Сейчас метрики ядра TQE содержат только метрики Tarantool. С полным списком можно ознакомиться в справочнике метрик Tarantool
API¶
В этом разделе описана спецификация API на основе протокола gRPC.
Примеры создания gRPC-клиента и выполнения запросов к очереди приведены в документах:
PublisherService¶
Сервер публикации сообщений брокера очередей
Method Name |
Request Type |
Response Type |
Description |
---|---|---|---|
Publish |
PublishRequest |
PublishResponse |
Публикация сообщения в очередь |
PublishBatch |
PublishBatchRequest |
PublishBatchResponse |
Публикация группы сообщений в очередь |
PublishRequest¶
Публикация сообщения в очередь.
Field |
Type |
Label |
Description |
---|---|---|---|
queue |
string |
Название очереди, в которой необходимо опубликовать сообщение |
|
routing_key |
string |
optional |
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах |
sharding_key |
string |
optional |
Ключ шардирования. Необходим для распределения данных в системе |
deduplication_key |
string |
optional |
Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится |
payload |
bytes |
Произвольные данные в бинарном формате (тело сообщения) |
|
metadata |
PublishRequest.MetadataEntry |
repeated |
Произвольные данные в бинарном формате. Содержит дополнительные данные для сообщения, которые необходимы для отладки и трассировки |
PublishRequest.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
PublishResponse¶
Ответ на публикацию сообщения
Field |
Type |
Label |
Description |
---|---|---|---|
id |
uint64 |
Идентификатор сообщения, добавленного в очередь |
|
metadata |
PublishResponse.MetadataEntry |
repeated |
Содержит дополнительные данные, которые необходимы для отладки и трассировки |
PublishResponse.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
PublishBatchRequest¶
Запрос на публикацию группы сообщений в очередь
Field |
Type |
Label |
Description |
---|---|---|---|
queue |
string |
Название очереди, в которой необходимо опубликовать сообщения |
|
sharding_key |
string |
optional |
Ключ шардирования. Необходим для распределения данных в системе |
messages |
BatchRequestMessage |
repeated |
Набор сообщений |
metadata |
PublishBatchRequest.MetadataEntry |
repeated |
Содержит дополнительные данные, которые необходимы для отладки и трассировки |
PublishBatchRequest.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
BatchRequestMessage¶
Группа сообщений.
Field |
Type |
Label |
Description |
---|---|---|---|
routing_key |
string |
optional |
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах |
deduplication_key |
string |
optional |
Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится |
payload |
bytes |
Произвольные данные в бинарном формате (тело сообщения) |
|
metadata |
BatchRequestMessage.MetadataEntry |
repeated |
Произвольные данные в бинарном формате. Содержит дополнительные для данные сообщения, которые необходимы для отладки и трассировки |
BatchRequestMessage.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
PublishBatchResponse¶
Ответ на публикацию группы сообщений.
Field |
Type |
Label |
Description |
---|---|---|---|
ids |
uint64 |
repeated |
Идентификаторы сообщений |
metadata |
PublishBatchResponse.MetadataEntry |
repeated |
Содержит дополнительные данные, которые необходимы для отладки и трассировки |
PublishBatchResponse.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
BroadcastRequest¶
Запрос на рассылку сообщения на указанные шарды
Field |
Type |
Label |
Description |
---|---|---|---|
queue |
string |
Название очереди, в которую необходимо опубликовать сообщение |
|
routing_key |
string |
optional |
Ключ маршрутизации сообщения (тип сообщения) необходим для фильтрации сообщений из очереди на консьюмерах |
deduplication_key |
string |
optional |
Ключ дедупликации необходим для проверки повторных сообщений, если не указан, то проверка не производится |
payload |
bytes |
Произвольные данные в бинарном формате, содержит тело сообщения |
|
metadata |
BroadcastRequest.MetadataEntry |
repeated |
Произвольные данные в бинарном формате, содержит дополнительные для сообщения данные, необходимые для отладки и трассировки |
replicasets |
string |
repeated |
Список с названиями репликасетов, на которые нужно опубликовать сообщение. По умолчанию рассылка происходит на все шарды. |
timeout |
uint64 |
optional |
Максимальное время на рассылку сообщения |
BroadcastRequest.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
BroadcastResponse¶
Ответ на рассылку сообщения
Field |
Type |
Label |
Description |
---|---|---|---|
code |
uint32 |
Код завершения рассылки: 0 - Успешная публикация 1 - Ошибка на роутере 2 - Ошибка на репликасете |
|
error |
string |
optional |
Сообщение об ошибке |
replicasets |
BroadcastResponse.ReplicasetsEntry |
repeated |
Набор ответов с шардов |
metadata |
BroadcastResponse.MetadataEntry |
repeated |
Содержит дополнительные данные необходимые для отладки и трассировки |
BroadcastResponse.ReplicasetsEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
ReplicasetResponse |
BroadcastResponse.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |
ReplicasetResponse¶
Ответ репликасета на публикацию сообщения
Field |
Type |
Label |
Description |
---|---|---|---|
success |
Success |
Сообщение об успешной публикации |
|
error |
Error |
Сообщение об ошибке публикации |
Success¶
Сообщение об успешной публикации
Field |
Type |
Label |
Description |
---|---|---|---|
id |
uint64 |
Идентификатор сообщения добавленного в очередь |
Error¶
Сообщение об ошибке публикации
Field |
Type |
Label |
Description |
---|---|---|---|
code |
uint32 |
Код ошибки |
|
message |
string |
Сообщение об ошибке |
ConsumerService¶
Сервер подписок на сообщения брокера очередей
Method Name |
Request Type |
Response Type |
Description |
---|---|---|---|
Subscribe |
SubscriptionRequest |
SubscriptionNotifications stream |
Подписка на сообщения с фильтром |
SubscriptionRequest¶
Запрос на подписку
Field |
Type |
Label |
Description |
---|---|---|---|
queue |
string |
Название очереди |
|
routing_key |
string |
optional |
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди. Если не указан, то подписка происходит на все типы сообщений в очереди |
cursor |
string |
optional |
Опциональная строка указатель на последнее полученное сообщение. Необходим для возможности получения истории сообщений или восстановления работы консьюмера после сбоя. Если значение не указано - подписка с текущего момента. Значение как пустая строка - подписка с начала очереди. Значение указано - подписка с указанного сообщения в очереди |
sharding_key |
string |
optional |
Ключ шардирования. Необходим для распределения данных в системе. Если не указан, то подписка происходит на все типы сообщений в очереди |
SubscriptionNotifications¶
Сообщение в стриме подписки
Field |
Type |
Label |
Description |
---|---|---|---|
notifications |
SubscriptionNotification |
repeated |
Новые сообщения в очереди с курсорами |
SubscriptionNotification¶
Уведомление клиента о новых сообщения в очереди
Field |
Type |
Label |
Description |
---|---|---|---|
cursor |
string |
Строка-указатель сообщения |
|
message |
QueueMessage |
Сообщение |
QueueMessage¶
Сообщение в очереди
Field |
Type |
Label |
Description |
---|---|---|---|
id |
uint64 |
Идентификатор сообщения. Заполняется автоматически при записи сообщения в очередь |
|
queue |
string |
Название очереди, в которую необходимо опубликовать сообщение |
|
routing_key |
string |
optional |
Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах |
sharding_key |
string |
optional |
Ключ шардирования. Необходим для распределения данных в системе |
deduplication_key |
string |
optional |
Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится |
payload |
bytes |
Произвольные данные в бинарном формате (тело сообщения) |
|
metadata |
QueueMessage.MetadataEntry |
repeated |
Произвольные данные в бинарном формате. Содержит дополнительные для сообщения данные, которые необходимы для отладки и трассировки |
timestamp |
int64 |
Время вставки сообщения в очередь в наносекундах |
QueueMessage.MetadataEntry¶
Field |
Type |
Label |
Description |
---|---|---|---|
key |
string |
||
value |
string |