Справочники | Mq_Ee
Справочники

Справочники

Документ рассматривает параметры конфигурации, метрики и функции API для Tarantool Queue Enterprise, функциональность MQ (далее по тексту – TQE(MQ).

Конфигурация

Конфигурация модуля API

Пример YAML-файла конфигурации модуля API:

app_name: MESSAGE_QUEUE_EE_API
app_version: 1.0
core_host: 0.0.0.0
core_port: 18184
grpc_listen:
  - uri: 'tcp://0.0.0.0:18182'

publisher:
  enabled: true
  tarantool:
    user: user
    pass: pass
    connections:
      routers:
        - "localhost:3301"
    queues:
      queue:
        connections:
          routers:
            - "localhost:3302"
consumer:
  enabled: true
  polling_timeout: 500ms
  tarantool:
    user: user
    pass: pass
    connections:
      storage-1:
        - "localhost:3301"
    queues:
      queue:
        connections:
          storage-1:
            - "localhost:3302"

log:
  file: log.jsonl
  format: json
  level: info

Основные параметры

Основные параметры обеспечивают настройку адресов и портов подключения модуля API.

app_name: MESSAGE_QUEUE_EE_API
app_version: 1.0
core_host: 0.0.0.0
core_port: 18184
grpc_listen:
  - uri: 'tcp://0.0.0.0:18182'
grpc_host: 0.0.0.0
grpc_port: 18182
grpc_options:
  initial_conn_window_size: 65535
  initial_window_size: 65535
  header_table_size: 16777216
  max_header_list_size: 128
  max_concurrent_streams: 128
  num_stream_workers: 0
  max_recv_msg_size: 4294967296
  max_send_msg_size: 2147483647
  read_buffer_size: 262144
  write_buffer_size: 262144
  shared_write_buffer: false

где:

  • app_name - название приложения;

  • app_version - версия приложения;

  • core_host - адрес сбора метрик и проверки состояния модуля API;

  • core_port - порт сбора метрик и проверки состояния модуля API;

  • grpc_options - набор опций для конфигурации grpc-сервера (подробнее здесь). Опции не изменяются после запуска grpc-сервераы:

    • initial_conn_window_size - начальное окно в байтах http2-соединения, по умолчанию 64kb;

    • initial_window_size - начальное окно http2-стрима в байта, по умолчанию 64kb;

    • header_table_size - размер динамической таблицы с заголовками http2, по умолчанию не задано (динамическая таблица не используется);

    • max_header_list_size - максимальный размер таблицы с заголовками http2, по умолчанию не задано;

    • max_concurrent_streams - количество одновременных http-стримов, по умолчанию 128;

    • num_stream_workers - количество воркеров для обработки входящих запросов, по умолчанию 0 (под каждый http2-стрим создается отдельная горутина);

    • max_recv_msg_size - максимальный размер входящего сообщения в байтах, по умолчанию 4mb;

    • max_send_msg_size - максимальный размер исходящего сообщения в байтах, по умолчанию 2mb;

    • read_buffer_size - размер буфер на чтение, по умолчанию 256kb;

    • write_buffer_size - размер буфер на запись, по умолчанию 256kb;

    • shared_write_buffer - позволяет переиспользовать буфер на запись, а не создавать под каждое подключение, по умолчанию false;

  • grpc_listen - набор интрефейсов, на которых обслуживает gRPC-сервер;

    • uri - адрес прослушивания, например: unix:///tmp/tqe-mq.sock или tcp://localhost:18182

  • grpc_host - (устарело после введения grpc_listen) адрес модуля API;

  • grpc_port - (устарело после введения grpc_listen) порт модуля API.

publisher

Параметр publisher модуля API обеспечивает настройку параметров публикации.

publisher:
  enabled: true
  tarantool:
    user: user
    pass: pass
    queues:
      queue:
        connections:
          routers:
            - "localhost:3301"

Доступные опции параметра publisher:

  • enabled - параметр доступности сервиса:

    • true доступен,

    • false - нет;

  • tarantool - секция конфигурации доступа к ядру TQE(MQ):

    • user - логин доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ);

    • pass - пароль доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ);

    • connections - секция адресов роутеров (маршрутизаторов) для публикации сообщений.

    • queues - секция доступных очередей ядра TQE(MQ);

      • connections - секция адресов роутеров (маршрутизаторов) для публикации сообщений.

consumer

Параметр consumer модуля API обеспечивает настройку параметров подписки на очередь сообщений.

consumer:
  enabled: true
  polling_timeout: 500ms
  tarantool:
    user: user
    pass: pass
    queues:
      queue:
        connections:
          storage-1:
            - "localhost:3301"

Доступные опции параметра consumer:

  • enabled - параметр доступности сервиса:

    • true - доступен,

    • false - нет;

  • polling_timeout - время задержки между запросами новых сообщений, пример: 500ms;

  • tarantool - секция конфигурации доступа к очереди TQE MQ:

    • user - логин доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ);

    • pass - пароль доступа к очереди (указывается в секции creds конфигурации ядра TQE(MQ);

    • connections - секция адресов хранилищ для подписки на сообщения.

    • queues - секция доступных очередей TQE(MQ);

      • connections - секция адресов хранилищ для подписки на сообщения.

log

Параметр log модуля API обеспечивает настройку параметров журналирования.

log:
  file: log.jsonl
  format: json
  level: info

Доступные опции параметра log:

  • file - имя файла записи журнала (может содержать абсолютный или относительный путь, по умолчанию /dev/stderr либо в файл server.jsonl, при запуске модуля API в фоновом режиме с опцией -d);

  • format - формат вывода сообщений, принимаемые значения: text|json (по умолчанию text);

  • level - уровень журналирования, принимаемые значения: debug|info|warn|error|dpanic(паника в режиме разработки)|panic(паника)|fatal (по умолчанию info).

Конфигурация ядра

Конфигурация ядра Tarantool 3 происходит с помощью обновления файла конфигурации или конфигурации в etcd/Tarantool Config Storage. Подробнее про конфигурацию в Tarantool 3 можно прочитать здесь

Пример YAML-файла конфигурации модуля ядра:

credentials:
  users:
    user:
      roles: [super]
      password: pass

roles_cfg:
  app.roles.queue:
    queues:
      - name: queue
      - name: another_queue
        deduplication_mode: basic
      - name: archive_queue
        storage: disk
      - name: queue_disabled_indexe
        disabled_filters_by: [routing_key, sharding_key]
  app.roles.api:
    sharding:
      routing:
        core-1:
            buckets:
            - 1
            - [2,10]
        core-2:
            buckets:
            - [11,20]
        core-3:
            buckets:
            - [21,1000]

Секция queues

Секция queues отвечает за описание используемых очередей сообщений и настройку их параметров. Настраивается на уровне роли app.roles.queue. Имеет следующую структуру:

roles_cfg:
  app.roles.queue:
      queues:
        - name: some-queue
          latency: 1
          disabled_filters_by: [sharding_key]
      deduplication_mode: basic
        - name: other-queue

где:

  • name - название очереди сообщений.

  • latency - задержка в миллисекундах между оповещениями подписчика о новых сообщениях. По умолчанию – 1.

  • deduplication_mode - режим дедупликации (т.н. режим устранения избыточности, дублирования данных). Может принимать значения basic, extended, keep_latest. По умолчанию – basic.

  • poll_max_batch – максимальное количество сообщений, которое одно ядро вернет за один запрос подписки. По умолчанию – 100.

  • storage – движок хранения очереди. Допустимые значения: memory для использования memtx, disk для использования vinyl. По умолчанию - memory.

  • poll_yield_every – период (в сообщениях), с которым обработчик запроса будет передавать управление другим обработчикам. По умолчанию – 512.

  • disabled_filters_by - список отключенных фильтров. Отключение фильтра делает невозможным подписку с фильтрацией по указанному полю. Изменение этой опции после создания очереди позволяет включать фильтраю по полю при его удаление из списка, но не позволяет отключать ее. По умолчанию - [] (фильтрация включена по всем полям). Возможные значения - routing_key, sharding_key.

Также существует другой вариант настройки параметров секции queues, который имеет следующую структуру:

roles_cfg:
  app.roles.queue:
    queues: ["some-queue", "other-queue"]

Секция sharding

Секция sharding отвечает за описание статического шардинга. Настраивается на уровне роли app.roles.api:

roles_cfg:
  app.roles.api:
    sharding:
      routing:
        core-1:
          buckets:
            - 1
            - [2,300]
        core-2:
          buckets:
            - [301,500]
        core-3:
          buckets:
            - [501,1000]

где:

  • core-1 - псевдоним набора реплик (replica set) из топологии;

  • buckets - диапазон обслуживаемых бакетов. Может принимать массив значений, где каждое значение - либо id одного бакета, либо диапазон бакетов «от и до».

Метрики

Контроль работоспособности модулей выполняется на основании проверки метрик производительности.

Метрики модулей предоставляются в формате prometheus.

Метрики модуля API

Метрики модуля API доступны на HTTP-эндпоинте /metrics.

Пример частичного вывода метрик модуля API:

# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 5.3002e-05
go_gc_duration_seconds{quantile="0.25"} 7.574e-05
go_gc_duration_seconds{quantile="0.5"} 9.047e-05
go_gc_duration_seconds{quantile="0.75"} 0.000111856
go_gc_duration_seconds{quantile="1"} 0.000338263
go_gc_duration_seconds_sum 1.305067544
go_gc_duration_seconds_count 11608
...

Основный метрики модуля API предоставляют следующую диагностическую информацию:

  • mqee_grpc_published_messages_total - счетчик опубликованных сообщений в очередь:

    • grpc_method - название удаленной процедуры;

    • queue - название очереди.

  • mqee_grpc_received_messages_total - счетчик полученных gRPC-сервером сообщений на публикацию:

    • grpc_method - название удаленной процедуры;

    • queue - название очереди.

  • mqee_grpc_sent_messages_total - счетчик отправленных сообщений потребителям gRPC-сервером:

    • queue - название очереди.

  • mqee_grpc_requests_size_bytes_total - счетчик общего размера запросов в байтах:

    • grpc_method - название удаленной процедуры;

    • queue - название очереди.

  • mqee_grpc_subscribers_count - индикатор текущего количества подписчиков:

    • queue - название очереди;

    • routing_key - значение routing_key, по которому была произведена подписка.

  • mqee_grpc_app_info - информация об очереди (mqee_grpc_app_info{app_name="MESSAGE_QUEUE_EE_API",app_version="test"} 1);

  • mqee_grpc_gomaxprocs - индикатор текущего значения GOMAXPROCS.

  • grpc_server_started_total - счетчик полученных запросов на удаленную процедуру:

    • grpc_type - тип подключения (unary, client_stream, server_stream, bidi_stream);

    • grpc_service - название gRPC-сервиса;

    • grpc_method - название процедуры:

      • Publish;

      • PublishBatch;

      • ServerReflectionInfo;

      • Subscribe.

  • grpc_server_handled_total - счетчик завершенных удаленных процедур:

    • grpc_type - тип подключения (unary, client_stream, server_stream, bidi_stream):

    • grpc_service - название gRPC-сервиса;

    • grpc_method - название процедуры (см. список в grpc_server_started_total).

    • grpc_code - код ответа gRPC-код.

  • grpc_server_msg_received_total - счетчик полученных сообщений:

    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);

    • grpc_service - название gRPC-сервиса;

    • grpc_method - название процедуры (см. список в grpc_server_started_total).

  • grpc_server_msg_sent_total - счетчик отправленных сообщений:

    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);

    • grpc_service - название gRPC-сервиса;

    • grpc_method - название процедуры (см. список в grpc_server_started_total).

  • grpc_server_handling_seconds - гистограмма секунд на обработку gRPC-вызова:

    • grpc_type - тип подключения(unary, client_stream, server_stream, bidi_stream);

    • grpc_service - название gRPC-сервиса;

    • grpc_method - название процедуры (см. список в grpc_server_started_total).

  • go_gc_duration_seconds - сводная информация по длительности паузы на цикл сборки мусора окружения Go. Используемый диапазон в секундах: 0, 0.25, 0.5, 0.75, 1;

  • go_goroutines - индикатор текущего количества goroutine;

  • go_info - информация о версии окружения Go;

  • go_memstats_* - набор метрик для отслеживания использования памяти средой выполнения Go;

  • go_threads - индикатор текущего количества потоков (threads) операционной системы, используемых средой выполнения Go.

Метрики ядра TQE(MQ)

Метрики ядра TQE(MQ) доступны на HTTP-эндпоинте /metrics на каждом экземпляре Tarantool.

Пример частичного вывода метрик модуля ядра:

# HELP tnt_vinyl_disk_index_size Amount of index stored in files
# TYPE tnt_vinyl_disk_index_size gauge
tnt_vinyl_disk_index_size{alias="app"} 0
# HELP tnt_read_only Is instance read only
# TYPE tnt_read_only gauge
tnt_read_only{alias="app"} 0
# HELP tnt_vinyl_disk_data_size Amount of data stored in files
# TYPE tnt_vinyl_disk_data_size gauge
tnt_vinyl_disk_data_size{alias="app"} 0
...

Сейчас метрики ядра TQE содержат только метрики Tarantool. С полным списком можно ознакомиться в справочнике метрик Tarantool

API

В этом разделе описана спецификация API на основе протокола gRPC.

Примеры создания gRPC-клиента и выполнения запросов к очереди приведены в документах:

PublisherService

Сервер публикации сообщений брокера очередей

Method Name

Request Type

Response Type

Description

Publish

PublishRequest

PublishResponse

Публикация сообщения в очередь

PublishBatch

PublishBatchRequest

PublishBatchResponse

Публикация группы сообщений в очередь

PublishRequest

Публикация сообщения в очередь.

Field

Type

Label

Description

queue

string

Название очереди, в которой необходимо опубликовать сообщение

routing_key

string

optional

Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах

sharding_key

string

optional

Ключ шардирования. Необходим для распределения данных в системе

deduplication_key

string

optional

Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится

payload

bytes

Произвольные данные в бинарном формате (тело сообщения)

metadata

PublishRequest.MetadataEntry

repeated

Произвольные данные в бинарном формате. Содержит дополнительные данные для сообщения, которые необходимы для отладки и трассировки

PublishRequest.MetadataEntry

Field

Type

Label

Description

key

string

value

string

PublishResponse

Ответ на публикацию сообщения

Field

Type

Label

Description

id

uint64

Идентификатор сообщения, добавленного в очередь

metadata

PublishResponse.MetadataEntry

repeated

Содержит дополнительные данные, которые необходимы для отладки и трассировки

PublishResponse.MetadataEntry

Field

Type

Label

Description

key

string

value

string

PublishBatchRequest

Запрос на публикацию группы сообщений в очередь

Field

Type

Label

Description

queue

string

Название очереди, в которой необходимо опубликовать сообщения

sharding_key

string

optional

Ключ шардирования. Необходим для распределения данных в системе

messages

BatchRequestMessage

repeated

Набор сообщений

metadata

PublishBatchRequest.MetadataEntry

repeated

Содержит дополнительные данные, которые необходимы для отладки и трассировки

PublishBatchRequest.MetadataEntry

Field

Type

Label

Description

key

string

value

string

BatchRequestMessage

Группа сообщений.

Field

Type

Label

Description

routing_key

string

optional

Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах

deduplication_key

string

optional

Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится

payload

bytes

Произвольные данные в бинарном формате (тело сообщения)

metadata

BatchRequestMessage.MetadataEntry

repeated

Произвольные данные в бинарном формате. Содержит дополнительные для данные сообщения, которые необходимы для отладки и трассировки

BatchRequestMessage.MetadataEntry

Field

Type

Label

Description

key

string

value

string

PublishBatchResponse

Ответ на публикацию группы сообщений.

Field

Type

Label

Description

ids

uint64

repeated

Идентификаторы сообщений

metadata

PublishBatchResponse.MetadataEntry

repeated

Содержит дополнительные данные, которые необходимы для отладки и трассировки

PublishBatchResponse.MetadataEntry

Field

Type

Label

Description

key

string

value

string

BroadcastRequest

Запрос на рассылку сообщения на указанные шарды

Field

Type

Label

Description

queue

string

Название очереди, в которую необходимо опубликовать сообщение

routing_key

string

optional

Ключ маршрутизации сообщения (тип сообщения) необходим для фильтрации сообщений из очереди на консьюмерах

deduplication_key

string

optional

Ключ дедупликации необходим для проверки повторных сообщений, если не указан, то проверка не производится

payload

bytes

Произвольные данные в бинарном формате, содержит тело сообщения

metadata

BroadcastRequest.MetadataEntry

repeated

Произвольные данные в бинарном формате, содержит дополнительные для сообщения данные, необходимые для отладки и трассировки

replicasets

string

repeated

Список с названиями репликасетов, на которые нужно опубликовать сообщение. По умолчанию рассылка происходит на все шарды.

timeout

uint64

optional

Максимальное время на рассылку сообщения

BroadcastRequest.MetadataEntry

Field

Type

Label

Description

key

string

value

string

BroadcastResponse

Ответ на рассылку сообщения

Field

Type

Label

Description

code

uint32

Код завершения рассылки: 0 - Успешная публикация 1 - Ошибка на роутере 2 - Ошибка на репликасете

error

string

optional

Сообщение об ошибке

replicasets

BroadcastResponse.ReplicasetsEntry

repeated

Набор ответов с шардов

metadata

BroadcastResponse.MetadataEntry

repeated

Содержит дополнительные данные необходимые для отладки и трассировки

BroadcastResponse.ReplicasetsEntry

Field

Type

Label

Description

key

string

value

ReplicasetResponse

BroadcastResponse.MetadataEntry

Field

Type

Label

Description

key

string

value

string

ReplicasetResponse

Ответ репликасета на публикацию сообщения

Field

Type

Label

Description

success

Success

Сообщение об успешной публикации

error

Error

Сообщение об ошибке публикации

Success

Сообщение об успешной публикации

Field

Type

Label

Description

id

uint64

Идентификатор сообщения добавленного в очередь

Error

Сообщение об ошибке публикации

Field

Type

Label

Description

code

uint32

Код ошибки

message

string

Сообщение об ошибке

ConsumerService

Сервер подписок на сообщения брокера очередей

Method Name

Request Type

Response Type

Description

Subscribe

SubscriptionRequest

SubscriptionNotifications stream

Подписка на сообщения с фильтром

SubscriptionRequest

Запрос на подписку

Field

Type

Label

Description

queue

string

Название очереди

routing_key

string

optional

Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди. Если не указан, то подписка происходит на все типы сообщений в очереди

cursor

string

optional

Опциональная строка указатель на последнее полученное сообщение. Необходим для возможности получения истории сообщений или восстановления работы консьюмера после сбоя. Если значение не указано - подписка с текущего момента. Значение как пустая строка - подписка с начала очереди. Значение указано - подписка с указанного сообщения в очереди

sharding_key

string

optional

Ключ шардирования. Необходим для распределения данных в системе. Если не указан, то подписка происходит на все типы сообщений в очереди

SubscriptionNotifications

Сообщение в стриме подписки

Field

Type

Label

Description

notifications

SubscriptionNotification

repeated

Новые сообщения в очереди с курсорами

SubscriptionNotification

Уведомление клиента о новых сообщения в очереди

Field

Type

Label

Description

cursor

string

Строка-указатель сообщения

message

QueueMessage

Сообщение

QueueMessage

Сообщение в очереди

Field

Type

Label

Description

id

uint64

Идентификатор сообщения. Заполняется автоматически при записи сообщения в очередь

queue

string

Название очереди, в которую необходимо опубликовать сообщение

routing_key

string

optional

Ключ маршрутизации сообщения (тип сообщения). Необходим для фильтрации сообщений из очереди на консьюмерах

sharding_key

string

optional

Ключ шардирования. Необходим для распределения данных в системе

deduplication_key

string

optional

Ключ дедупликации. Необходим для проверки повторных сообщений. Если не указан, то проверка не производится

payload

bytes

Произвольные данные в бинарном формате (тело сообщения)

metadata

QueueMessage.MetadataEntry

repeated

Произвольные данные в бинарном формате. Содержит дополнительные для сообщения данные, которые необходимы для отладки и трассировки

timestamp

int64

Время вставки сообщения в очередь в наносекундах

QueueMessage.MetadataEntry

Field

Type

Label

Description

key

string

value

string

Found what you were looking for?
Feedback