Руководство пользователя
Настоящее Руководство содержит пояснения к продукту, описание процесса настройки быстрого запуска, набор примеров по созданию приложений-клиентов Tarantool Queue Enterprise (Message Queue), а также другую информацию, направленную на улучшение взаимодействия пользователя с TQE(MQ).
Tarantool Queue Enterprise (Message Queue) (TQE(MQ)) — это высокопроизводительная распределенная система управления очередями сообщений. Продукт предназначен для организации надежного обмена данными между сервисами в задачах, критичных для скорости и целостности информации.
TQE(MQ) выступает в роли брокера сообщений, обеспечивающего строгую маршрутизацию, фильтрацию и гарантированную доставку сообщений до потребителя при:
- Обработке запросов к базам данных;
- Синхронизации распределенных сервисов;
- Проведении транзакций в финансовых системах.
Ключевые возможности:
- Гарантия FIFO (First In, First Out) — на уровне набора реплик (шарда) сообщения обрабатываются в строгом порядке поступления.
- Избирательная фильтрация — потребители получают только те сообщения, на которые они подписались.
- Горизонтальное масштабирование — архитектура набора микросервисов с шардированием позволяет наращивать производительность линейно.
- Отказоустойчивость — автоматическое переключение лидера гарантирует доступность очереди при сбоях отдельных узлов.
- Дедупликация сообщений — настройки очереди позволяют управлять режимами обработки потенциально одинаковых сообщений.
- Гибкий ключ шардирования — система позволяет либо указать целевой шард явно, либо автоматически распределять нагрузку равномерно.
Данный раздел предназначен для ознакомления с продуктом TQE(MQ).
В разделе приведены варианты быстрой установки и запуска приложения – локально и c помощью Docker. Информация об установке TQE(MQ) для промышленного использования приведена в ссылка.
Далее описываются шаги по настройке модуля API (gRPC-сервера) и параметров подключения к очереди. После этого проверяется корректность установки и работоспособность приложения.
- Системные требования;
- Установка экземпляра;
- Запуск приложения;
- Настройка приложения после установки;
- Проверка установки приложения.
TQE(MQ) поддерживает установку на следующие операционные системы (ОС):
- RED OS 7.3;
- Astra Linux.
Также возможна установка на другие ОС – ALT Linux, AlmaLinux, Ubuntu, CentOS – но полноценная работоспособность не гарантируется.
Для работы приложения необходима библиотека glibc версии 2.17-260.el7_6.6 и выше. Для проверки текущей установленной
версии glibc и обновления необходимо выполнить:
$ rpm -q glibc
$ yum update glibc
- Распакуйте архив с дистрибутивом TQE(MQ):
$ tar -xzf <имя_архива>.tar.gz
- Перейдите в каталог с распакованным дистрибутивом:
$ cd message-queue-ee
- Для запуска ядра TQE(MQ) выполните команду:
$ tt start
Ответ должен иметь вид:
• Starting an instance [message-queue-ee:app]...
Выполните проверку успешности запуска приложения:
$ tt statusINSTANCE STATUS PID MODEmessage-queue-ee:app RUNNING 10841 RO
-
Далее необходимо настроить конфигурацию хранилища данных для очереди сообщений.
В данном случае используются преднастроенные файлы конфигурации
instances.ymlиconfig.yml. Они поставляются в дистрибутиве и после распаковки находятся в каталоге верхнего уровняmessage-queue-ee/. В них определяется топология кластера – какие узлы какую роль выполняют, а также задается группировка узлов по наборам реплик для резервирования данных. Также на этом шаге выполняется запуск базы данных в кластере.
$ tt replicaset vshard bootstrap .
Ответ должен иметь вид:
• Discovery application...Orchestrator: centralized configReplicasets state: bootstrapped• appFailover: offMaster: single• app 127.0.0.1:3301 rw• Bootstrapping vshard• Done.
Теперь узел с приложением должен перейти в режим работы RW:
$ tt statusINSTANCE STATUS PID MODEmessage-queue-ee:app RUNNING 10841 RW
Помимо варианта локального запуска, приведенного выше, TQE(MQ) можно запустить в Docker. Этот способ позволяет запустить несколько экземпляров TQE(MQ) в кластерной конфигурации:
-
Загрузите docker-образ командой (добавить в реестр): docker load < docker-message-queue-ee-vX.X.X.tar.gz
-
Внесите изменения в файл настроек
compose.yamlиз основного пакета поставки, например укажите свои файлы с настройками TQE:
services:core:image: tarantool/message-queue-ee:latestpull_policy: if_not_presentplatform: linux/amd64command: ["tarantool", "--name", "core", "--config", "config.yml"]# Для указания пользовательских настроек создайте# файлы настроек, укажите их и раскомментируйте раздел `volumes`.# volumes:# - "./config.yml:/message-queue-ee/config.yml"# - "./instances.yml:/message-queue-ee/instances.yml"environment:AUTO_BOOTSTRAP: "true"ports:- "3301:3301"- "8081:8081"extra_hosts:- "host.docker.internal:host-gateway"
- После внесения изменений добавьте вручную тег
latestдля того, чтобы изменения применились при установке:
$ docker tag tarantool/message-queue-ee:vX.X.X tarantool/message-queue-ee:latest
- Запустите TQE:
$ docker compose up -d
- Определение параметров модуля API (gRPC-сервера):
$ cat > config.service.yml <<EOFapp_name: MESSAGE_QUEUE_EE_APIapp_version: testcore_host: 0.0.0.0core_port: 18184grpc_host: 0.0.0.0grpc_port: 18182producer:enabled: truetarantool:user: userpass: passqueues:queue:connections:routers:- "localhost:3301"consumer:enabled: truepolling_timeout: 500mstarantool:user: userpass: passqueues:queue:connections:storage-1:- "localhost:3301"EOF
Отсутствие ошибок в терминале свидетельствует о правильной настройке модуля API (gRPC-сервера).
- Запуск модуля API (gRPC-сервера):
$ bin/message-queue-ee -d -config config.service.yml
В каталоге message-queue-ee/scripts расположены скрипты, с помощью которых можно проверить работоспособность приложения.
Для работы скриптов требуется установить утилиту grpcurl.
- Выполните команду:
$ ./scripts/test.sh localhost:18182Test passed.
Сообщение Test passed. означает о корректной установке и работе дистрибутива.
- Если существуют ошибки в установке или работе приложения, то при выполнении скрипта
test.shбудет выдано подобное сообщение:
$ ./scripts/test.sh localhost:18182Test failed: message id expected "1", got "2".
Оно информирует о возникновении проблем и необходимости убедиться в корректности настроек на этапе конфигурации приложения.
- Для проверки публикации в очередь сообщений выполните команду:
$ grpcurl -plaintext -format text -d 'queue:"queue" messages:{ payload:"TestGeneratedMessage" metadata:{ key:"date" value:"121225" } }' \localhost:18182 tarantool.queue_ee.Producer/Produce
Ожидаемый ответ:
ids: 7427358492534505472is_duplicates: false
Для проверки подписки нужно выполнить следующую последовательность действий:
- Откройте двунаправленный поток командой:
$ grpcurl -plaintext -d @ \localhost:18182 tarantool.queue_ee.ConsumerService/Subscribe
- Отправьте запрос на подписку:
{"subscribe_request": {"queue": "queue", "cursor": ""}}
Ожидаемый ответ:
{"notifications": [{"cursor": "AQAAAAAAAAAJAAAAAAAAAHN0b3JhZ2UtMQAAwICKOwxm","message": {"id": "7358127929391316992","queue": "queue","payload": "SomeData","metadata": [{"key": "date","value": "121225"}],"timestamp": "1753167571459351000"}},]}
Совпадение идентификаторов сообщения (поля id) при выполнении публикации сообщения в очереди
и подписки на нее говорит о корректном завершении установки приложения и настройки очереди сообщений.
Ниже приведены примеры по созданию приложений-клиентов TQE(MQ), которые позволят
публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC с использованием
персистентной подписки. Примеры реализованы на языке go.
Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:
goверсии > 1.21.1;protocверсии > 26.1;protoc-gen-goверсии > 1.36.1;proto-gen-grpcверсии > 1.2.0;proto-файлы с описанием gRPC-протокола TQE(MQ).
- Создайте каталог с
go-проектом, в котором в дальнейшем будут размещаться примеры клиентов:
mkdir mqee-clientscd mqee-clientsgo mod init example.com/mqee-clients
- Установите зависимости для сборки примеров клиентов очереди TQE(MQ):
go get google.golang.org/grpcgo get google.golang.org/protobuf
Для генерации gRPC-клиента потребуются компилятор protoc и плагины для компиляции go-файлов.
- Создайте каталог для хранения необходимых для генерации gRPC-клиента исполняемых файлов:
mkdir -p bin
- Скачайте и установите дистрибутив
protoc, указав версию, тип операционной системы и архитектуру:
PB_REL="https://github.com/protocolbuffers/protobuf/releases"VERSION=26.1OS=linuxARCH=x86_64mkdir -p tmpcurl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zipunzip -j ./tmp/protoc.zip bin/protoc -d bin
- Установите плагины для
protoc:
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.0GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
- Для создания примеров клиентов очереди необходимо получить архив с
.proto-файлами, которые поставляются в составе основного пакета TQE(MQ). Полученный архив распакуйте во временный каталог:
tar xfzv tmp/protobuf-message-queue-ee-v3.3.0.src.tar.gz -C tmp
- Предварительно для генерации go-файлов с использованием
protocустановите каталогbinв переменную окруженияPATH:
export PATH=$PWD/bin:$PATH
- Сгенерируйте go-файлы проекта:
mkdir -p protocolprotoc \--proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \--go_out=protocol \--go_opt=default_api_level=API_OPAQUE \--go_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol \--go-grpc_out=protocol \--go-grpc_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol \services/consumer.proto services/producer.proto messages/message.proto
- Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:
mkdir -p cmd/consumertouch cmd/consumer/main.go
Содержимое файла:
package mainfunc main() {}
- Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию
main, указав при этом адрес подключения к gRPC-интерфейсу очереди:
ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()
- Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из
очереди
queue. Для использования персистентной подписки нужно указать идентификатор и время жизни подписки:
client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {panic(err)}subreq := &protocol.SubscriptionRequest{}subreq.SetQueue("queue")subreq.SetConsumeId("example-consume-id")subreq.SetTtl(float32(time.Hour.Seconds()))streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetSubscribeRequest(subreq)if err := subscribe.Send(streamreq); err != nil {panic(err)}fmt.Println("Start listening...")
- После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока, выводить краткое уведомление и обновлять состояние подписки:
go func() {for {recv, err := subscribe.Recv()if err != nil {panic(err)}switch recv.WhichResponse() {case protocol.SubscriptionStreamResponse_Notifications_case:nresp := recv.GetNotifications()for _, notification := range notifResp.GetNotifications() {cursor := notification.GetCursor()message := notification.GetMessage()fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",message.GetQueue(), cursor, message.GetId(), message.GetPayload())comreq := new(protocol.CommitRequest)comreq.SetCursor(cursor)streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetCommitRequest(comreq)if err := stream.Send(streamreq); err != nil {panic(err)}}fmt.Println("Messages received")case protocol.SubscriptionStreamResponse_CommitResponse_case:resp := recv.GetCommitResponse()cursor := resp.GetCursor()fmt.Printf("Consumer state has updated by %s cursor\n", cursor)}}}()
- Добавьте обработку системных сигналов:
notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("Exit")
Клиент сервиса подписки на сообщения создан.
- Выполните запуск клиента сервиса подписки на сообщения:
$ go run example.com/mqee-clients/cmd/consumerStart listening...
После этого клиент станет ожидать появления новых сообщений в очереди.
- Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:
mkdir -p cmd/producertouch cmd/producer/main.go
Первоначальное содержимое файла:
package mainfunc main() {}
- Создайте транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию
main, указав при этом адрес подключения к gRPC-интерфейсу очереди:
func main() {// ...ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()// ...}
- Создайте сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:
client := protocol.NewProducerClient(conn)
- Отправьте запрос на публикацию сообщения с помощью вызова метода
Produce:
responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{Queue: "queue",Messages: []*protocol.ProduceMessage{{Payload: []byte("First message from batch")},{Payload: []byte("Second message from batch")},},})if err != nil {panic(err)}fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)
Теперь при запуске клиента будет отправлено два сообщения с помощью вызова метода Produce:
$ go run example.com/mqee-clients/cmd/producerMessages published with ids: [115720 115721]
Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go), то вы получите уведомление
о новых сообщениях:
Notification:Queue: queueCursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQRId: 115729Payload: First message from batchConsumer state has updatedNotification:Queue: queueCursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQSId: 115730Payload: Second message from batchConsumer state has updated
Для того чтобы получить доступ к показателям состояния TQE(MQ) через интернет-браузер, вам необходимо установить Tarantool Cluster Manager (TCM).
- Добавьте вкладку TQE к файлу конфигурации
tcm:
# tcm.yamlfeature:tqe: True
- Перезапустите Tarantool Cluster Manager.
После включения функции вкладка TQE появляется на странице TCM и предоставляет доступ к страницам Метрики (Metrics) и Очереди (Queues).
Страницу Метрики (Metrics) можно просматривать в двух форматах:
- В виде графиков (Charts);
- В виде таблицы (Table).
Страница Очереди (Queues) отображает информацию о работе каждой очереди в реальном времени, включая:
- Задержка (Latency) — временная задержка (в мс) между добавлением сообщения в очередь и его обработкой.
- Максимальный размер выборки (Poll max batch) — количество сообщений, извлекаемых за один запрос на обработку.
- Режим дедупликации (Deduplication mode) — указывает текущий способ обработки дублирующихся сообщений для очереди:
basic- базовый, используется по умолчанию;extended- расширенный;keep_latest- оставлять последнее;keep_first- оставлять первое.
Q: Получаю ошибку о нехватке зависимостей no required module provides package, например:
protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:go get google.golang.org/grpcprotocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:go get google.golang.org/grpc/codesprotocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:go get google.golang.org/grpc/statusprotocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:go get google.golang.org/protobuf/reflect/protoreflectprotocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:go get google.golang.org/protobuf/runtime/protoimplcmd/producer/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:go get google.golang.org/grpc/credentials/insecure
A: Выполните команду go mod tidy для установки всех зависимостей.
Q: Получаю ошибку о некорректном bucket_id, например:
# String sharding key.$ grpcurl -plaintext -format text -d 'queue:"queue" sharding_key:"abc" messages:{ payload:"absd" }' localhost:18182 tarantool.queue_ee.Producer/ProduceERROR:Code: InternalMessage: failed to publish messages to tarantool: bucket id is out of range: 0 (total 1000)
A: Возможно в системе настроен режим статического шардирования,
при котором бакеты с данными распределяются по репликам вручную. В таком случае указываемый sharding_key должен
соответствовать одному из настроенных bucket_id:
# String sharding key.$ grpcurl -plaintext -format text -d 'queue:"queue" sharding_key:"1" messages:{ payload:"absd" }' localhost:18182 tarantool.queue_ee.Producer/Produce
Q: Получаю ошибку публикации сообщения
failed to publish messages to tarantool: storage.produce error: id (*): payload may be corrupted:
$ grpcurl -plaintext -format text -d 'queue:"dedup" messages:{ payload:"corruptd" deduplication_key:"dk1" }' localhost:18182 tarantool.queue_ee.Producer/ProduceERROR:Code: InternalMessage: failed to publish messages to tarantool: storage.produce error: id (7358133729337802752): payload may be corrupted (CustomError, code 0x0), see ..../code/tarantool/message-queue-ee/app/queue.lua line 805
A: Включен режим расширенной дедупликации.
Система обнаружила 2 сообщения с одинаковым deduplication_key, но разным значением в payload.
.├── bin│ ├── protoc│ ├── protoc-gen-go│ └── protoc-gen-go-grpc├── cmd│ ├── consumer│ │ └── main.go│ └── producer│ └── main.go├── go.mod├── go.sum├── protocol│ ├── consumer.pb.go│ ├── consumer_grpc.pb.go│ ├── message.pb.go│ ├── producer.pb.go│ └── producer_grpc.pb.go└── tmp├── message-queue-ee│ └── include│ └── tarantool│ ├── message_queue_ee│ │ ├── messages│ │ │ └── message.proto│ │ └── services│ │ ├── consumer.proto│ │ └── producer.proto│ └── queue_ee│ ├── messages│ │ └── message.proto│ └── services│ ├── consumer.proto│ └── producer.proto└── protobuf-message-queue-ee-1.5.2.src.tar.gz16 directories, 20 files
cmd/consumer/main.go
package mainimport ("context""fmt""os/signal""syscall""time""github.com/google/uuid""github.com/vmihailenco/msgpack/v5""gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/internal/protocol""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure")type ExamplePayload struct {Foo stringBar string}func main() {ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {panic(err)}req := &protocol.SubscriptionRequest{}req.SetQueue("queue")req.SetConsumeId("example-consume-id-" + uuid.NewString())req.SetTtl(float32(time.Minute.Seconds()))sreq := new(protocol.SubscriptionStreamRequest)sreq.SetSubscribeRequest(req)if err := subscribe.Send(sreq); err != nil {panic(err)}fmt.Println("Start listening...")go func() {for {recv, err := subscribe.Recv()if err != nil {panic(err)}switch recv.WhichResponse() {case protocol.SubscriptionStreamResponse_Notifications_case:notifResp := recv.GetNotifications()if notifResp == nil {panic("returned empty list of notifications")}fmt.Printf("Received %d messages\n", len(notifResp.GetNotifications()))for _, notification := range notifResp.GetNotifications() {example := ExamplePayload{}err = msgpack.Unmarshal(notification.GetMessage().GetPayload(), &example)if err != nil {panic(err)}fmt.Printf("foo: %s bar: %s\n", example.Foo, example.Bar)cursor := notification.GetCursor()comreq := new(protocol.CommitRequest)comreq.SetCursor(cursor)streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetCommitRequest(comreq)if err := subscribe.Send(streamreq); err != nil {panic(err)}}case protocol.SubscriptionStreamResponse_CommitResponse_case:resp := recv.GetCommitResponse()cursor := resp.GetCursor()fmt.Printf("Consumer state has updated by %s cursor\n", cursor)default:}}}()notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("exit")}
cmd/producer/main.go:
package mainimport ("context""fmt""time""github.com/vmihailenco/msgpack/v5""gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/internal/protocol""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure")type ExamplePayload struct {Foo stringBar string}func main() {ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()client := protocol.NewProducerClient(conn)payload, err := msgpack.Marshal(&ExamplePayload{Foo: "ooF",Bar: time.Now().Format(time.RFC3339Nano),})if err != nil {panic(err)}pair := new(protocol.Pair)pair.SetKey([]byte("Content-Type"))pair.SetValue([]byte("application/protobuf"))batchReq := &protocol.ProduceRequest{}batchReq.SetQueue("queue")msg1 := &protocol.ProduceMessage{}msg1.SetRoutingKey([]byte("rk2"))msg1.SetPayload(payload)msg1.SetMetadata([]*protocol.Pair{pair})msg2 := &protocol.ProduceMessage{}msg1.SetRoutingKey([]byte("rk1"))msg2.SetPayload(payload)msg2.SetMetadata([]*protocol.Pair{pair})batchReq.SetMessages([]*protocol.ProduceMessage{msg1, msg2})responseBatch, err := client.Produce(ctx, batchReq)if err != nil {panic(err)}fmt.Printf("Messages batch published with ids: %v\n", responseBatch.GetIds())}