VK Docs logo
Помощь
Обновлена 9 июня 2026 г. в 12:36

Руководство пользователя

Настоящее Руководство содержит пояснения к продукту, описание процесса настройки быстрого запуска, набор примеров по созданию приложений-клиентов Tarantool Queue Enterprise (Message Queue), а также другую информацию, направленную на улучшение взаимодействия пользователя с TQE(MQ).

Введение

Tarantool Queue Enterprise (Message Queue) (TQE(MQ)) — это высокопроизводительная распределенная система управления очередями сообщений. Продукт предназначен для организации надежного обмена данными между сервисами в задачах, критичных для скорости и целостности информации.

TQE(MQ) выступает в роли брокера сообщений, обеспечивающего строгую маршрутизацию, фильтрацию и гарантированную доставку сообщений до потребителя при:

  • Обработке запросов к базам данных;
  • Синхронизации распределенных сервисов;
  • Проведении транзакций в финансовых системах.

Ключевые возможности:

  • Гарантия FIFO (First In, First Out) — на уровне набора реплик (шарда) сообщения обрабатываются в строгом порядке поступления.
  • Избирательная фильтрация — потребители получают только те сообщения, на которые они подписались.
  • Горизонтальное масштабирование — архитектура набора микросервисов с шардированием позволяет наращивать производительность линейно.
  • Отказоустойчивость — автоматическое переключение лидера гарантирует доступность очереди при сбоях отдельных узлов.
  • Дедупликация сообщений — настройки очереди позволяют управлять режимами обработки потенциально одинаковых сообщений.
  • Гибкий ключ шардирования — система позволяет либо указать целевой шард явно, либо автоматически распределять нагрузку равномерно.

Быстрый запуск очереди сообщений

Данный раздел предназначен для ознакомления с продуктом TQE(MQ).

В разделе приведены варианты быстрой установки и запуска приложения – локально и c помощью Docker. Информация об установке TQE(MQ) для промышленного использования приведена в ссылка.

Далее описываются шаги по настройке модуля API (gRPC-сервера) и параметров подключения к очереди. После этого проверяется корректность установки и работоспособность приложения.

Системные требования

TQE(MQ) поддерживает установку на следующие операционные системы (ОС):

  • RED OS 7.3;
  • Astra Linux.

Также возможна установка на другие ОС – ALT Linux, AlmaLinux, Ubuntu, CentOS – но полноценная работоспособность не гарантируется.

Для работы приложения необходима библиотека glibc версии 2.17-260.el7_6.6 и выше. Для проверки текущей установленной версии glibc и обновления необходимо выполнить:

$ rpm -q glibc
$ yum update glibc

Установка экземпляра

  1. Распакуйте архив с дистрибутивом TQE(MQ):
$ tar -xzf <имя_архива>.tar.gz
  1. Перейдите в каталог с распакованным дистрибутивом:
$ cd message-queue-ee

Запуск ядра

  1. Для запуска ядра TQE(MQ) выполните команду:
$ tt start

Ответ должен иметь вид:

• Starting an instance [message-queue-ee:app]...

Выполните проверку успешности запуска приложения:

$ tt status INSTANCE              STATUS   PID    MODE message-queue-ee:app  RUNNING  10841  RO
  1. Далее необходимо настроить конфигурацию хранилища данных для очереди сообщений.

    В данном случае используются преднастроенные файлы конфигурации instances.yml и config.yml. Они поставляются в дистрибутиве и после распаковки находятся в каталоге верхнего уровня message-queue-ee/. В них определяется топология кластера – какие узлы какую роль выполняют, а также задается группировка узлов по наборам реплик для резервирования данных. Также на этом шаге выполняется запуск базы данных в кластере.

$ tt replicaset vshard bootstrap .

Ответ должен иметь вид:

• Discovery application...Orchestrator:      centralized configReplicasets state: bootstrapped• app  Failover: off  Master:   single    • app 127.0.0.1:3301 rw   • Bootstrapping vshard   • Done.

Теперь узел с приложением должен перейти в режим работы RW:

$ tt status INSTANCE              STATUS   PID    MODE message-queue-ee:app  RUNNING  10841  RW

Запуск приложения в Docker

Помимо варианта локального запуска, приведенного выше, TQE(MQ) можно запустить в Docker. Этот способ позволяет запустить несколько экземпляров TQE(MQ) в кластерной конфигурации:

  1. Загрузите docker-образ командой (добавить в реестр): docker load < docker-message-queue-ee-vX.X.X.tar.gz

  2. Внесите изменения в файл настроек compose.yaml из основного пакета поставки, например укажите свои файлы с настройками TQE:

services:  core:    image: tarantool/message-queue-ee:latest    pull_policy: if_not_present    platform: linux/amd64    command: ["tarantool", "--name", "core", "--config", "config.yml"]    # Для указания пользовательских настроек создайте    # файлы настроек, укажите их и раскомментируйте раздел `volumes`.    # volumes:    #  - "./config.yml:/message-queue-ee/config.yml"    #  - "./instances.yml:/message-queue-ee/instances.yml"    environment:      AUTO_BOOTSTRAP: "true"    ports:      - "3301:3301"      - "8081:8081"    extra_hosts:      - "host.docker.internal:host-gateway"
  1. После внесения изменений добавьте вручную тег latest для того, чтобы изменения применились при установке:
$ docker tag tarantool/message-queue-ee:vX.X.X tarantool/message-queue-ee:latest
  1. Запустите TQE:
$ docker compose up -d

Настройка конфигурации после установки

  1. Определение параметров модуля API (gRPC-сервера):
$ cat > config.service.yml <<EOFapp_name: MESSAGE_QUEUE_EE_APIapp_version: testcore_host: 0.0.0.0core_port: 18184grpc_host: 0.0.0.0grpc_port: 18182producer:  enabled: true  tarantool:    user: user    pass: pass    queues:      queue:        connections:          routers:            - "localhost:3301"consumer:  enabled: true  polling_timeout: 500ms  tarantool:    user: user    pass: pass    queues:      queue:        connections:          storage-1:            - "localhost:3301"EOF

Отсутствие ошибок в терминале свидетельствует о правильной настройке модуля API (gRPC-сервера).

  1. Запуск модуля API (gRPC-сервера):
$ bin/message-queue-ee -d -config config.service.yml

Проверка установки приложения

Использование скриптов проверки работоспособности

В каталоге message-queue-ee/scripts расположены скрипты, с помощью которых можно проверить работоспособность приложения.

Для работы скриптов требуется установить утилиту grpcurl.

  1. Выполните команду:
$ ./scripts/test.sh localhost:18182Test passed.

Сообщение Test passed. означает о корректной установке и работе дистрибутива.

  1. Если существуют ошибки в установке или работе приложения, то при выполнении скрипта test.sh будет выдано подобное сообщение:
$ ./scripts/test.sh localhost:18182Test failed: message id expected "1", got "2".

Оно информирует о возникновении проблем и необходимости убедиться в корректности настроек на этапе конфигурации приложения.

Проверка публикации в очередь сообщений

  1. Для проверки публикации в очередь сообщений выполните команду:
$ grpcurl -plaintext -format text -d 'queue:"queue" messages:{ payload:"TestGeneratedMessage" metadata:{ key:"date" value:"121225" } }' \    localhost:18182 tarantool.queue_ee.Producer/Produce

Ожидаемый ответ:

ids: 7427358492534505472is_duplicates: false

Проверка подписки на очередь сообщений

Для проверки подписки нужно выполнить следующую последовательность действий:

  1. Откройте двунаправленный поток командой:
$ grpcurl -plaintext -d @ \    localhost:18182 tarantool.queue_ee.ConsumerService/Subscribe
  1. Отправьте запрос на подписку:
{"subscribe_request": {"queue": "queue", "cursor": ""}}

Ожидаемый ответ:

{  "notifications": [    {      "cursor": "AQAAAAAAAAAJAAAAAAAAAHN0b3JhZ2UtMQAAwICKOwxm",      "message": {        "id": "7358127929391316992",        "queue": "queue",        "payload": "SomeData",        "metadata": [          {            "key": "date",            "value": "121225"          }        ],        "timestamp": "1753167571459351000"      }    },  ]}

Совпадение идентификаторов сообщения (поля id) при выполнении публикации сообщения в очереди и подписки на нее говорит о корректном завершении установки приложения и настройки очереди сообщений.

Примеры сценариев использования очереди

Ниже приведены примеры по созданию приложений-клиентов TQE(MQ), которые позволят публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC с использованием персистентной подписки. Примеры реализованы на языке go.

Подготовка

Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:

  • go версии > 1.21.1;
  • protoc версии > 26.1;
  • protoc-gen-go версии > 1.36.1;
  • proto-gen-grpc версии > 1.2.0;
  • proto-файлы с описанием gRPC-протокола TQE(MQ).

Создание каталога для сборки клиентов очереди

  1. Создайте каталог с go-проектом, в котором в дальнейшем будут размещаться примеры клиентов:
mkdir mqee-clientscd mqee-clientsgo mod init example.com/mqee-clients
  1. Установите зависимости для сборки примеров клиентов очереди TQE(MQ):
go get google.golang.org/grpcgo get google.golang.org/protobuf

Генерация gRPC-клиента на Go

Для генерации gRPC-клиента потребуются компилятор protoc и плагины для компиляции go-файлов.

  1. Создайте каталог для хранения необходимых для генерации gRPC-клиента исполняемых файлов:
mkdir -p bin
  1. Скачайте и установите дистрибутив protoc, указав версию, тип операционной системы и архитектуру:
PB_REL="https://github.com/protocolbuffers/protobuf/releases"VERSION=26.1OS=linuxARCH=x86_64mkdir -p tmpcurl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zipunzip -j ./tmp/protoc.zip bin/protoc -d bin
  1. Установите плагины для protoc:
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.0GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
  1. Для создания примеров клиентов очереди необходимо получить архив с .proto-файлами, которые поставляются в составе основного пакета TQE(MQ). Полученный архив распакуйте во временный каталог:
tar xfzv tmp/protobuf-message-queue-ee-v3.3.0.src.tar.gz -C tmp
  1. Предварительно для генерации go-файлов с использованием protoc установите каталог bin в переменную окружения PATH:
export PATH=$PWD/bin:$PATH
  1. Сгенерируйте go-файлы проекта:
mkdir -p protocolprotoc \ --proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \ --go_out=protocol \ --go_opt=default_api_level=API_OPAQUE \ --go_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol \ --go-grpc_out=protocol \ --go-grpc_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol \ services/consumer.proto services/producer.proto messages/message.proto

Создание клиента-сервиса подписки на сообщения

  1. Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:
mkdir -p cmd/consumertouch cmd/consumer/main.go

Содержимое файла:

package mainfunc main() {}
  1. Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:
ctx := context.Background()conn, err := grpc.DialContext(	ctx,	"localhost:18182",	grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {	panic(err)}defer conn.Close()
  1. Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из очереди queue. Для использования персистентной подписки нужно указать идентификатор и время жизни подписки:
client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {	panic(err)}subreq := &protocol.SubscriptionRequest{}subreq.SetQueue("queue")subreq.SetConsumeId("example-consume-id")subreq.SetTtl(float32(time.Hour.Seconds()))streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetSubscribeRequest(subreq)if err := subscribe.Send(streamreq); err != nil {	panic(err)}fmt.Println("Start listening...")
  1. После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока, выводить краткое уведомление и обновлять состояние подписки:
go func() {	for {		recv, err := subscribe.Recv()		if err != nil {			panic(err)		}		switch recv.WhichResponse() {		case protocol.SubscriptionStreamResponse_Notifications_case:			nresp := recv.GetNotifications()			for _, notification := range notifResp.GetNotifications() {				cursor := notification.GetCursor()				message := notification.GetMessage()				fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",					message.GetQueue(), cursor, message.GetId(), message.GetPayload())								comreq := new(protocol.CommitRequest)				comreq.SetCursor(cursor)				streamreq := new(protocol.SubscriptionStreamRequest)				streamreq.SetCommitRequest(comreq)				if err := stream.Send(streamreq); err != nil {					panic(err)				}			}			fmt.Println("Messages received")		case protocol.SubscriptionStreamResponse_CommitResponse_case:			resp := recv.GetCommitResponse()			cursor := resp.GetCursor()			fmt.Printf("Consumer state has updated by %s cursor\n", cursor)		}	}}()
  1. Добавьте обработку системных сигналов:
notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("Exit")

Клиент сервиса подписки на сообщения создан.

  1. Выполните запуск клиента сервиса подписки на сообщения:
$ go run example.com/mqee-clients/cmd/consumerStart listening...

После этого клиент станет ожидать появления новых сообщений в очереди.

Создание клиента-сервиса публикации сообщений

  1. Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:
mkdir -p cmd/producertouch cmd/producer/main.go

Первоначальное содержимое файла:

package mainfunc main() {}
  1. Создайте транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:
func main() {	// ...	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	// ...}
  1. Создайте сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:
client := protocol.NewProducerClient(conn)
  1. Отправьте запрос на публикацию сообщения с помощью вызова метода Produce:
responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{	Queue: "queue",	Messages: []*protocol.ProduceMessage{		{Payload: []byte("First message from batch")},		{Payload: []byte("Second message from batch")},	},})if err != nil {	panic(err)}fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)

Теперь при запуске клиента будет отправлено два сообщения с помощью вызова метода Produce:

$ go run example.com/mqee-clients/cmd/producerMessages published with ids: [115720 115721]

Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go), то вы получите уведомление о новых сообщениях:

Notification:	Queue: queue	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQR	Id: 115729	Payload: First message from batchConsumer state has updatedNotification:	Queue: queue	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQS	Id: 115730	Payload: Second message from batchConsumer state has updated

Наблюдение за состоянием TQE(MQ) в браузере

Для того чтобы получить доступ к показателям состояния TQE(MQ) через интернет-браузер, вам необходимо установить Tarantool Cluster Manager (TCM).

  1. Добавьте вкладку TQE к файлу конфигурации tcm:
# tcm.yaml    feature:        tqe: True
  1. Перезапустите Tarantool Cluster Manager.

После включения функции вкладка TQE появляется на странице TCM и предоставляет доступ к страницам Метрики (Metrics) и Очереди (Queues).

Страницу Метрики (Metrics) можно просматривать в двух форматах:

  • В виде графиков (Charts);
  • В виде таблицы (Table).

Страница Очереди (Queues) отображает информацию о работе каждой очереди в реальном времени, включая:

  • Задержка (Latency) — временная задержка (в мс) между добавлением сообщения в очередь и его обработкой.
  • Максимальный размер выборки (Poll max batch) — количество сообщений, извлекаемых за один запрос на обработку.
  • Режим дедупликации (Deduplication mode) — указывает текущий способ обработки дублирующихся сообщений для очереди:
    • basic - базовый, используется по умолчанию;
    • extended - расширенный;
    • keep_latest - оставлять последнее;
    • keep_first - оставлять первое.

Дополнительная информация

FAQ

Q: Получаю ошибку о нехватке зависимостей no required module provides package, например:

protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:	go get google.golang.org/grpcprotocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:	go get google.golang.org/grpc/codesprotocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:	go get google.golang.org/grpc/statusprotocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:	go get google.golang.org/protobuf/reflect/protoreflectprotocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:	go get google.golang.org/protobuf/runtime/protoimplcmd/producer/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:	go get google.golang.org/grpc/credentials/insecure

A: Выполните команду go mod tidy для установки всех зависимостей.

Q: Получаю ошибку о некорректном bucket_id, например:

# String sharding key.$ grpcurl -plaintext -format text -d 'queue:"queue" sharding_key:"abc" messages:{ payload:"absd" }' localhost:18182 tarantool.queue_ee.Producer/ProduceERROR:  Code: Internal  Message: failed to publish messages to tarantool: bucket id is out of range: 0 (total 1000)

A: Возможно в системе настроен режим статического шардирования, при котором бакеты с данными распределяются по репликам вручную. В таком случае указываемый sharding_key должен соответствовать одному из настроенных bucket_id:

# String sharding key.$ grpcurl -plaintext -format text -d 'queue:"queue" sharding_key:"1" messages:{ payload:"absd" }' localhost:18182 tarantool.queue_ee.Producer/Produce

Q: Получаю ошибку публикации сообщения failed to publish messages to tarantool: storage.produce error: id (*): payload may be corrupted:

$ grpcurl -plaintext -format text -d 'queue:"dedup" messages:{ payload:"corruptd" deduplication_key:"dk1" }' localhost:18182 tarantool.queue_ee.Producer/ProduceERROR:  Code: Internal  Message: failed to publish messages to tarantool: storage.produce error: id (7358133729337802752): payload may be corrupted (CustomError, code 0x0), see ..../code/tarantool/message-queue-ee/app/queue.lua line 805

A: Включен режим расширенной дедупликации. Система обнаружила 2 сообщения с одинаковым deduplication_key, но разным значением в payload.

Файловая структура

.├── bin│   ├── protoc│   ├── protoc-gen-go│   └── protoc-gen-go-grpc├── cmd│   ├── consumer│   │   └── main.go│   └── producer│       └── main.go├── go.mod├── go.sum├── protocol│   ├── consumer.pb.go│   ├── consumer_grpc.pb.go│   ├── message.pb.go│   ├── producer.pb.go│   └── producer_grpc.pb.go└── tmp    ├── message-queue-ee    │   └── include    │       └── tarantool    │           ├── message_queue_ee    │           │   ├── messages    │           │   │   └── message.proto    │           │   └── services    │           │       ├── consumer.proto    │           │       └── producer.proto    │           └── queue_ee    │               ├── messages    │               │   └── message.proto    │               └── services    │                   ├── consumer.proto    │                   └── producer.proto    └── protobuf-message-queue-ee-1.5.2.src.tar.gz16 directories, 20 files

Листинги файлов

cmd/consumer/main.go

package mainimport (	"context"	"fmt"	"os/signal"	"syscall"	"time"	"github.com/google/uuid"	"github.com/vmihailenco/msgpack/v5"	"gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/internal/protocol"	"google.golang.org/grpc"	"google.golang.org/grpc/credentials/insecure")type ExamplePayload struct {	Foo string	Bar string}func main() {	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	client := protocol.NewConsumerServiceClient(conn)	subscribe, err := client.Subscribe(ctx)	if err != nil {		panic(err)	}	req := &protocol.SubscriptionRequest{}	req.SetQueue("queue")	req.SetConsumeId("example-consume-id-" + uuid.NewString())	req.SetTtl(float32(time.Minute.Seconds()))	sreq := new(protocol.SubscriptionStreamRequest)	sreq.SetSubscribeRequest(req)	if err := subscribe.Send(sreq); err != nil {		panic(err)	}	fmt.Println("Start listening...")	go func() {		for {			recv, err := subscribe.Recv()			if err != nil {				panic(err)			}			switch recv.WhichResponse() {			case protocol.SubscriptionStreamResponse_Notifications_case:				notifResp := recv.GetNotifications()				if notifResp == nil {					panic("returned empty list of notifications")				}				fmt.Printf("Received %d messages\n", len(notifResp.GetNotifications()))				for _, notification := range notifResp.GetNotifications() {					example := ExamplePayload{}					err = msgpack.Unmarshal(notification.GetMessage().GetPayload(), &example)					if err != nil {						panic(err)					}					fmt.Printf("foo: %s bar: %s\n", example.Foo, example.Bar)					cursor := notification.GetCursor()					comreq := new(protocol.CommitRequest)					comreq.SetCursor(cursor)					streamreq := new(protocol.SubscriptionStreamRequest)					streamreq.SetCommitRequest(comreq)					if err := subscribe.Send(streamreq); err != nil {						panic(err)					}				}			case protocol.SubscriptionStreamResponse_CommitResponse_case:				resp := recv.GetCommitResponse()				cursor := resp.GetCursor()				fmt.Printf("Consumer state has updated by %s cursor\n", cursor)			default:			}		}	}()	notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)	<-notifyCtx.Done()	fmt.Println("exit")}

cmd/producer/main.go:

package mainimport (	"context"	"fmt"	"time"	"github.com/vmihailenco/msgpack/v5"	"gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/internal/protocol"	"google.golang.org/grpc"	"google.golang.org/grpc/credentials/insecure")type ExamplePayload struct {	Foo string	Bar string}func main() {	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	client := protocol.NewProducerClient(conn)	payload, err := msgpack.Marshal(&ExamplePayload{		Foo: "ooF",		Bar: time.Now().Format(time.RFC3339Nano),	})	if err != nil {		panic(err)	}	pair := new(protocol.Pair)	pair.SetKey([]byte("Content-Type"))	pair.SetValue([]byte("application/protobuf"))	batchReq := &protocol.ProduceRequest{}	batchReq.SetQueue("queue")	msg1 := &protocol.ProduceMessage{}	msg1.SetRoutingKey([]byte("rk2"))	msg1.SetPayload(payload)	msg1.SetMetadata([]*protocol.Pair{pair})	msg2 := &protocol.ProduceMessage{}	msg1.SetRoutingKey([]byte("rk1"))	msg2.SetPayload(payload)	msg2.SetMetadata([]*protocol.Pair{pair})	batchReq.SetMessages([]*protocol.ProduceMessage{msg1, msg2})	responseBatch, err := client.Produce(ctx, batchReq)	if err != nil {		panic(err)	}	fmt.Printf("Messages batch published with ids: %v\n", responseBatch.GetIds())}