TDG Documentation portal logo
Помощь
Обновлена 22 июня 2026 г. в 15:31

Запросы к данным через Kafka-коннектор

В руководстве пошагово демонстрируется пример работы TDG c Kafka - от настройки коннектора до выполнения обмена данными. Пример реализует следующую логику:

  • получение объекта из топика (topic) Kafka;
  • изменение обработчиком объекта, полученного в TDG;
  • сохранение измененного объекта в хранилище;
  • отправка измененного объекта в тот же топик.

Для выполнения примера требуются:

Руководство включает в себя следующие шаги:

Настройка коннектора

Коннектор в TDG можно настроить двумя способами:

  • указать параметры коннектора в файле конфигурации .yml;
  • добавить или изменить коннектор, используя веб-интерфейс (только для input-коннекторов). Чтобы узнать больше, обратитесь к разделу Вкладка Connectors.

Зададим настройки первым способом, в файле .yml.

Создайте файл конфигурации config.yml со следующими настройками:

types:  __file: model.avscconnector:  input:    - name: from_kafka      type: kafka      brokers:        - localhost:9092      topics:        - cities      group_id: kafka      routing_key: add_kafka  output:    - name: to_kafka      type: kafka      brokers:        - localhost:9092      topic: citiesinput_processor:  handlers:    - key: add_kafka      function: kafka_input_handler.call  storage:     - key: input_key       type: Cityoutput_processor:  input_key:    handlers:      - function: kafka_output_handler.call        outputs:          - to_kafka

В файле указываются:

  • используемая модель данных;
  • секция connector - настройки input- и output-коннекторов. Разделу input соответствует функция consumer, разделу output - функция producer. Настройки включают в себя имя коннектора, адрес сервера (брокера), а также название топика (cities), к которому будет обращаться TDG. В разделе input также определен ключ маршрутизации routing_key со значением add_kafka;
  • секция input_processor - обработка входящих данных. Здесь заданы ключ для хранилища (input_key), в котором будет сохранен объект City, а также определен обработчик (kafka_input_handler) для ключа маршрутизации add_kafka. Узнать больше про настройку input_processor можно в соответствующем разделе справочника;
  • секция output_processor - обработка исходящих данных. Здесь определены имя хранилища (input_key) и функция-обработчик (kafka_output_handler). Кроме того, задан параметр outputs (to_kafka) - это означает, что объект будет отправлен обратно в топик Kafka. Узнать больше про настройку output_processor можно в соответствующем разделе справочника.

Чтобы ознакомиться со всеми доступными параметрами конфигурации для коннектора Kafka, обратитесь к справочнику по настройке коннектора.

Реализация обработчиков

Обработка входящих данных

Данные в формате JSON, приходящие из Kafka, попадают в обработчики (handlers), заданные в файле конфигурации в секции input_processor.

В функции обработчика можно модифицировать поступившую информации, а также обернуть данные в JSON с ключом routing_key для дальнейшей обработки.

В файле kafka_input_handler.lua укажите функцию, которая будет запускаться в input-процессоре. Функция увеличит значение population и задаст ключу routing_key значение input_key:

#!/usr/bin/env tarantoolreturn {    call = function(params)        params.obj.population = params.obj.population + 1        params.routing_key = "input_key"        return params    end}

Обработка исходящих данных

В разделе output указывается, как объект будет изменен в обработчике перед отправкой во внешние системы. Обработка выполняется после успешного сохранения объектов на экземплярах с ролью storage.

Чтобы обработать объект в output_processor, в файле конфигурации укажите название хранилища (input_key), в котором был сохранен объект, а затем определите обработчик для него (секция handlers).

Создайте файл kafka_output_handler.lua. В нем будет записана функция, вызываемая output-обработчиком. Функция вернет объект City:

#!/usr/bin/env tarantoolreturn {    call = function(params)        return {obj = params.obj}    end}

Загрузка конфигурации

Чтобы выполнить пример, нужно загрузить архив с моделью данных, файлом конфигурации и функциями обработчиков (handlers) в TDG:

  1. Поместите файлы со скриптами обработчиков (kafka_input_handler.lua и kafka_output_handler.lua) в папку src.
  2. Упакуйте в zip-архив:
    • папку src, внутри которой лежат файлы со скриптами обработчиков;
    • модель данных model.avsc;
    • файл конфигурации config.yml.
  3. Загрузите архив в TDG согласно инструкции.

Запуск и настройка Kafka

На сервере (брокере) Kafka создайте новый топик с именем cities. Для демонстрации работы примера и просмотра переданных в топик сообщений напишем простой скрипт на языке Python. Скрипт сыграет роль consumer Kafka, который получает сообщения из топика cities на брокере localhost:9092.

Чтобы работать с Kafka средствами Python, установите модуль kafka-python:

pip install kafka-python

Для запуска чтения сообщений, приходящих из топика cities, подготовьте следующий скрипт на языке Python:

from kafka import KafkaConsumerconsumer = KafkaConsumer('cities')for message in consumer:    print (message)

Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл consumer.py, а затем запустите ее командой python consumer.py.

Оставьте работать запущенный consumer.py, а затем переключитесь на новую вкладку консоли.

Запуск обработки объектов

Чтобы запустить отправку сообщений, отправьте в Kafka JSON-объект типа City с полем population. Для этого подготовьте следующий скрипт на языке Python:

from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')headers = [("Header Key", b"Header Value")]producer.send('cities', value='{"title": "Moscow", "population": 12655050}'.encode('ascii'), headers=headers)producer.flush()

Скрипт содержит подключение к Kafka в качестве producer и отправку простого JSON-объекта: {"title": "Moscow", "population": 12655050}.

Чтобы выполнить скрипт, используйте интерактивный режим интерпретатора или сохраните функцию в файл producer.py, а затем запустите ее командой python producer.py.

В результате TDG получит объект City из топика cities, обработает его (увеличится значение поля population) и сохранит, а затем отправит объект в тот же топик Kafka. Это вызовет повторное получение, обработку и отправку. Пока пример запущен, во вкладке терминала consumer будут появляться все новые сообщения с постоянно возрастающим значением поля population.

Чтобы остановить обработку объектов, выполните одно из действий ниже:

  • Выключите Kafka или TDG.
  • Загрузите в TDG новую конфигурацию.