Руководство администратора
Настоящее Руководство содержит описание функционала, доступного для администратора Tarantool Change Data Capture (TCDC).
Универсальный Обработчик - это приложение для переноса информации от Источника к Приемнику данных. Настройка параметров переноса данных задается через файл конфигурации.
Универсальный Обработчик настраивается через конфигурационный файл application.yaml.
Вся конфигурация может быть собрана в одном файле или разделена на несколько файлов. В последнем случае
каждый файл должен называться application.yaml и располагаться на соответствующем логическом уровне.
Пример разбиения конфигурации на несколько файлов:
│ ...└─ cdc/├─ cdc-worker.jar # исполняемый файл Универсального Обработчика├─ application.yaml # хранит основные настройки, например, настройки сервера и другие, связанные со Spring│└─ config/├─ application.yaml # хранит общие настройки Обработчика, например, для работы с контрольными точками│├─ sink/│ └─ application.yaml # хранит настройки подключения к Приемнику данных└─ source/└─ application.yaml # хранит настройки подключения к Источнику данных
Параметры в файле настроек application.yaml представлены в виде иерархии объектов формата YAML.
Пример:
spring:application:name: CDCWorkerprofiles:active: default
Эту же иерархическую структуру можно представить в виде строки, где уровни разделяются точками:
spring.application.name: CDCWorkerspring.profiles.active: default
Или в комбинированном виде:
spring:application.name: CDCWorkerprofiles.active: default
Комбинированная форма удобна, если на каком-то уровне определяется всего один потомок в дереве настроек. В этом случае можно упростить само дерево, соединяя узлы-потомки через точку.
Файл настроек Универсального Обработчика состоит из нескольких разделов:
spring- конфигурация Spring Boot.logging- настройки логирования.server- настройки сервера.management- настройки Spring Actuator.cdc- общие настройки приложения Универсального Обработчика.source- настройки подключения к Источнику данных.sink- настройки подключения к Приемнику данных.offset- настройки работы с контрольными точками.throttle- настройки для ограничителя трафика.
Раздел spring содержит конфигурацию Spring Boot. Параметры раздела:
application.name- имя приложения. Значение по умолчанию:cdc-worker.profiles.active- имя профиля. Значение по умолчанию:default.
Пример:
spring:application.name: CDCWorkerprofiles.active: default
Раздел logging содержит настройки логирования. Параметры раздела:
logging.file.level- управление уровнем логирования модулей. Всего можно управлять двумя модулями, задавая для них различные уровни логирования при помощи значений:DEBUG,VERBOSE,INFO,WARNING,ERROR. ЗначениеOFFполностью отключает логирование событий модуля. По умолчанию логирование для модулей отключено:org.springframework.boot.SpringApplication- логирование самого процесса запуска приложения.org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger- детальный отчет об автоконфигурации Spring Boot.
Отключение логирования модулей позволяет сократить как общий объем логов, так и количество лишней информации в них. Это ускоряет и упрощает анализ логов.
Пример:
logging:file:name: logs/cdc-worker.loglevel:org.springframework.boot.SpringApplication: OFForg.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLogger: OFF
Раздел server содержит настройки сервера. Параметры раздела:
server.port- порт сервера для подключения приложения. Значение по умолчанию:8000
Пример:
server:port: 8000
Раздел настроек management содержит настройки Spring Boot Actuator. Параметры раздела:
management.endpoints.web.exposure.include- ID адресов мониторинга. По умолчанию сюда включеныhealthиprometheus.management.endpoint- группа параметров управления мониторингом:prometheus.enabled- предоставление данных мониторинга в Prometheus. По умолчанию:true.health.probes.enabled- настройка Spring Boot Actuator для интеграции с Kubernetes. По умолчанию:true.health.show-details- определяет уровень детализации. Значение по умолчанию:always, указывает на полную информацию.
management.health.livenessState.enabled- механизм внутреннего отслеживания и управления состоянием жизнеспособности приложения. По умолчанию:true.management.health.readinessState.enabled- механизм внутреннего отслеживания и управления состоянием готовности приложения к работе. По умолчанию:true.
Пример:
management:endpoints.web.exposure.include:- health- prometheusendpoint:prometheus.enabled: truehealth:probes.enabled: trueshow-details: alwayshealth:livenessState.enabled: truereadinessState.enabled: true
Раздел содержит общие настройки приложения Универсального Обработчика. Параметры раздела:
cdc.worker.id- идентификатор процесса в системе.cdc.shutdown.timeout.ms- максимальное время мягкого останова процесса. По истечении времени происходит жесткий останов процесса.
Пример:
cdc:worker:id: cdc-taskshutdown:timeout:ms: 30000
Раздел содержит настройки подключения к Источнику данных. Параметры раздела:
source.plugin.path- путь, где расположены динамически загружаемые модули для подключения к Источнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.source.connector.*- раздел параметров настройки выбранного коннектора к Источнику данных.source.connector.class- наименование класса подключения к Источнику данных. Определяет, какой из просканированных и загруженных модулей подключения к Источнику данных нужно настроить и запустить. Обязательный параметр.source.transforms- раздел для настройки параметров преобразователей.
Пример:
source:plugin.path: /libs/connectconnector:class: io.tarantool.connector.TarantoolConnectortopic.prefix: cdcmax.batch.size: 2048connect:username: adminpassword: secret-cluster-cookietargets: localhost:3605timeout: 2000replication:idle.timeout: 2000anonymous: trueid: source-worker-1fields.exclude: bucket_idspaces.include: User,all_types,intTest,arrays,all_types_tcsdatetime.zone.enabled: true
Настройка коннекторов для подключения к PostgreSQL и Oracle из экосистемы Debezium также описывается в
разделе source.connector.*. Документация по настройкам коннекторов из экосистемы Debezium:
Предположим следующую схему расположения модулей:
/│ ...└─ libs/└─ connect/└─ source/│ ...├─ postgresql-connector/│ │ ...│ └─ postgresql-connector.jar│ ...└─ tarantool-source-connector/│ ...└─ tarantool-source-connector.jar
Тогда путь должен быть указан как /libs/connect/source. При этом Универсальным Обработчиком будут загружены, но
не запущены два модуля для подключения к Источнику данных. Поскольку сканирование всех модулей подключения может
занимать время, рекомендуется следующая форма расположения модулей:
/│ ...└─ libs/└─ connect/└─ source/│ ...├─ postgresql-connector/│ └─ postgresql-connector/│ │ ...│ └─ tarantool-source-connector.jar│ ...└─ tarantool-source-connector/└─ tarantool-source-connector/│ ...└─ tarantool-source-connector.jar
А в параметре source.plugin.path нужно указать либо
/libs/connect/source/postgresql-connector, либо /libs/connect/source/tarantool-connector. То есть путь всегда
должен указывать на папку уровнем выше относительно папки с модулем подключения к Источнику данных.
Раздел содержит настройки подключения к Приемнику данных. Параметры раздела:
sink.plugin.path- путь, где расположены динамически загружаемые модули для подключения к Приемнику данных. Путь должен указывать на папку, внутри которой находятся модули, каждый из которых распакован в свою папку. Просканированным будет только один модуль, что ускорит запуск приложения. См. подробнее.sink.connector.*- раздел для указания параметров настройки выбранного коннектора к Приемнику данных.sink.connector.class- наименование класса подключения к Приемнику данных. Определяет, какой из просканированных и загруженных модулей подключения к Приемнику данных нужно настроить и запустить. Обязательный параметрsink.transforms- раздел для настройки параметров преобразователей.sink.retry.*- задает правила обработки ошибкиRetriableExceptionпри работе с Приемником данных.sink.retry.count- количество повторных попыток отправки сообщений в Приемник данных. По истечении этих попыток будет выброшена ошибка и Обработчик завершит свою работу.sink.retry.backoff- пауза между повторными попытками отправки сообщений в Приемник данных. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды -s, миллисекунды -ms.sink.retry.timeout- общее время на выполнение всех попыток отправки сообщений в Приемник данных. По истечении этого времени, если сообщения не будут записаны, выбросится ошибка и обработчик завершит свою работу. Может принимать разные единицы измерения при указании соответствующего суффикса: секунды -s, миллисекунды -ms.
Пример:
sink:plugin.path: /libs/connectconnector:class: io.debezium.connector.jdbc.JdbcSinkConnectorconnection:url: jdbc:postgresql://target-postgres:5432/iot?reWriteBatchedInserts=trueusername: iotpassword: iotinsert.mode: upsertprimary.key.mode: record_keytasks.max: 1delete.enabled: trueschema.evolution: basicdatabase.time.zone: UTCauto.create: truequote.identifiers: truetruncate.enabled: falsebatch.size: 500use.reduction.buffer: trueretry:count: 5backoff: 200mstimeout: 3stransforms:names:- topic-from-source-table- topic-from-source-space-nameconfig:topic-from-source-table:type: io.tarantool.cdc.transforms.smt.ExtractTopic$Valuefield.path: source.tableskip.missing.or.null: truetopic-from-source-space-name:type: io.tarantool.cdc.transforms.smt.ExtractTopic$Valuefield.path: source.space_nameskip.missing.or.null: true
Предположим следующую схему расположения модулей:
/│ ...└─ libs/└─ connect/└─ sink/│ ...├─ jdbc-sink-connector/│ │ ...│ └─ jdbc-sink-connector.jar│ ...└─ tarantool-sink-connector/│ ...└─ tarantool-sink-connector.jar
Тогда путь должен быть /libs/connect/sink. При этом универсальным обработчиком будут загружены, но не запущены
два модуля для подключения к Приемнику данных. Поскольку сканирование всех модулей подключения может занимать
время, рекомендуется следующая форма расположения модулей:
/│ ...└─ libs/└─ connect/└─ sink/│ ...├─ jdbc-sink-connector/│ └─ jdbc-sink-connector/│ │ ...│ └─ jdbc-sink-connector.jar│ ...└─ tarantool-sink-connector/└─ tarantool-sink-connector/│ ...└─ tarantool-sink-connector.jar
А в параметре sink.plugin.path указать
либо /libs/connect/sink/jdbc-sink-connector, либо /libs/connect/sink/tarantool-sink-connector. То есть, путь
всегда должен указывать на папку уровнем выше относительно папки с модулем подключения к Приемнику данных.
Раздел содержит настройки контрольных точек. Параметры раздела:
offset.flush.interval.ms- интервал сохранения контрольных точек Обработчика.offset.flush.timeout.ms- максимальное время выполнения операции записи контрольных точек. При превышении времени выбрасывается ошибка записи контрольных точек.offset.failures.max.count- допустимое количество ошибок записи контрольных точек. При превышении этого количества работа Универсального Обработчика завершается. По умолчанию:0. При этом значении работа универсального обработчика завершится после первой же ошибки. Для полного игнорирования ошибок необходимо выставить значение-1.offset.storage.*- пространство для настройки хранилища контрольных точек.offset.storage.type- наименование модуля хранения контрольных точек. Поддерживаемые модули:tqe- хранение контрольных точек в Tarantool Queue Enterprise 2.x/ 3.x. При указании этого модуля необходимо указать параметры:address- адрес и порт сервера, на котором установлен TQE.queue- имя очереди для записи контрольных точек.connection.id- ключ, по которому Обработчик будет сохранять свой снимок контрольных точек. Здесь необходимо указать значениеtask-${worker-id}для того, чтобы Helm-чарт автоматически присваивал уникальный ключ Обработчику. При подключении более чем одного Обработчика к очереди TQE уникальность ключа критически важна, потому что по этому ключу каждый Обработчик сохраняет свой снимок контрольных точек отдельно, не перезаписывая снимки других Обработчиков.
file- хранение контрольных точек в файле. При указании этого модуля необходимо указать параметры:file.name- путь к файлу с контрольными точками.
kafka- хранение контрольных точек в Kafka. При указании этого модуля необходимо указать параметры:servers- список парадрес:портсерверов Kafka.topic- название топика, в который будут сохраняться контрольные точки. Если топика до этого не существовало, он будет создан.partitions.count- число сегментов указанного топика при создании (значение учитывается только если топика до этого не существовало).replication.factor- сколько копий топика должно существовать.
Пример:
offset:flush:interval.ms: 5000timeout.ms: 2500storage:type: tqeaddress: offsets-grpc-server:18182queue: offsetsconnection.id: pg-task-1 # ключ, по которому Обработчик будет сохранять свой снимок контрольных точек.
Раздел содержит настройки ограничителя трафика. Параметры раздела:
throttler.max.eps- задает ограничение по количеству событий в секунду. По умолчанию:-1(ограничение не задано).
Пример:
throttle:throttler.max.eps: -1
Чтобы читать данные из Источников и записывать их в Приемники, Tarantool CDC использует коннекторы. Данные из Источника попадают в Tarantool CDC через коннектор к Источнику (source connector), проходят через ряд преобразований и записываются в Приемник через Коннектор к Приемнику (sink connector).
В поставку Tarantool CDC входят:
- Коннекторы к Источнику и Приемнику данных PostgreSQL.
- Коннекторы к Источнику и Приемнику данных Oracle.
- Коннекторы к Источнику и Приемнику данных Kafka.
- Коннекторы к Источнику и Приемнику данных Tarantool DB 1.x и 2.x.
- Коннекторы к Источнику и Приемнику данных Tarantool Data Grid 2.x.
- Коннекторы к Источнику и Приемнику данных Tarantool Queue Enterprise 2.x и 3.x.
- Коннектор к Приемнику данных Tarantool Column Storage 1.x.
- Коннектор к Приемнику данных Clickhouse.
- Коннектор к Приемнику данных Elasticsearch.
- Подключение сторонних коннекторов.
В файле настроек конкретный коннектор указывается при помощи обязательного параметра class. Параметр должен
быть прописан в разделах connector параметров Источника данных (source) и Приемника данных (sink):
connector:class: io.tarantool.connector.KafkaSourceConnector
Кроме параметра class, в разделе connector указываются и другие параметры коннекторов.
Больше информации о параметрах Kafka, перечисленных в статьях о коннекторах к Источнику и Приемнику, смотрите в официальной документации Kafka.
Коннектор к Источнику Kafka позволяет Tarantool CDC получать данные из брокеров Kafka.
Параметры коннектора указываются в разделе connector:
key.converter- название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
key.converter.schemas.enable- определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию:true.key.converter.schemas.cache.size- максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.key.converter.decimal.format- задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.key.converter.replace.null.with.default- определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.key.converter.apicurio.registry.url- обязателен приkey.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.value.converter- название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
value.converter.schemas.enable- определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию:true.value.converter.schemas.cache.size- максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.value.converter.decimal.format- задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.value.converter.replace.null.with.default- определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.value.converter.apicurio.registry.url- обязателен приvalue.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.bootstrap.servers- список адресов брокеров, к которым подключаетсяkafka-consumer, cм. подробнее. Параметр так же может быть указан в разделеconsumer. При одновременном указании параметра в 2х местах с отличными друг от друга значениями, приоритетным будет значение в разделеconsumer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.topics- динамическая карта, в которой ключ - название топика, из которогоkafka-consumerбудет забирать данные, а значение - номер партиций, из которыхkafka-consumerбудет забираться данные для текущего топика. Параметр обязателен. Пример карты:topic-1: 0--kafka-consumerбудет забирать данные из 0 партиции топикаtopic-1.second-topic: 1-4--kafka-consumerбудет забирать данные из (1;4) партиций топикаsecond-topic(не включая правую границу). Т.е. будет забирать данные из 1,2,3 партиции.topic-name: 0,1-4--kafka-consumerбудет забирать данные из 0 и [1;4) партиций топикаtopic-name(не включая правую границу). Т.е. будет забирать данные из 0,1,2,3 партиции.
consumer- раздел определяет параметрыkafka-consumer. Все параметры, описанные ниже, относятся к этому разделу и не являются обязательными.fetch.max.bytes- cм. подробнее.max.poll.records- cм. подробнее.enable.auto.commit- определяет, будет ли потребитель автоматически периодически фиксировать смещения потребленных сообщений. Может принимать только предустановленное значение:false.key.deserializer- класс десериализатора, используемый для преобразования байтового представления ключа сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение:org.apache.kafka.common.serialization.ByteArrayDeserializer.value.deserializer- класс десериализатора, используемый для преобразования байтового представления значения сообщения из формата хранения в Kafka в объект целевого языка программирования. Может принимать только предустановленное значение:org.apache.kafka.common.serialization.ByteArrayDeserializer.auto.offset.reset- Определяет поведение потребителя при отсутствии сохраненного смещения для партиции в группе потребителей. Может принимать только предустановленное значение:earliest.
Пример настройки:
connector:class: io.tarantool.connector.KafkaSourceConnectorkey.converter: org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable: truekey.converter.schemas.cache.size: 1000key.converter.decimal.format: BASE64key.converter.replace.null.with.default: truevalue.converter: org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable: truevalue.converter.schemas.cache.size: 1000value.converter.decimal.format: BASE64value.converter.replace.null.with.default: truebootstrap.servers: broker-1:9092,broker-2:9092topics:topic-1: 0second-topic: 1-4topic-N: 0,1-4consumer:fetch.max.bytes: 52428800max.poll.records: 500
Коннектор к Приемнику данных Kafka позволяет Tarantool CDC передавать данные в брокеры Kafka.
Параметры коннектора указываются в разделе connector.
key.converter- название класса конвертера для ключа. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
key.converter.schemas.enable- определяет, включать ли схему в каждое сериализованное значение ключа. Необязательный параметр, значение по умолчанию:true.key.converter.schemas.cache.size- максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.key.converter.decimal.format- задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.key.converter.replace.null.with.default- определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.key.converter.apicurio.registry.url- обязателен приkey.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для ключа. См. подробнее.value.converter- название класса конвертера для значения. Необязательный параметр, который может принимать 2 значения:org.apache.kafka.connect.json.JsonConverter(по умолчанию);io.apicurio.registry.utils.converter.AvroConverter.
value.converter.schemas.enable- определяет, включать ли схему в каждое сериализованное значение. Необязательный параметр, значение по умолчанию:true.value.converter.schemas.cache.size- максимальное количество схем, которые можно кэшировать в этом экземпляре конвертера. Необязательный параметр, значение по умолчанию:1000.value.converter.decimal.format- задает формат, в который будет сериализовываться десятичные числа. Значение не чувствительно к регистру. Необязательный параметр, значение по умолчанию:BASE64.value.converter.replace.null.with.default- определяет, заменять ли поля, имеющие значениеNULL, на значение по умолчанию. Приtrueприводит к заменеNULLна значение по умолчанию. ПриfalseзаменыNULLне происходит. Необязательный параметр, значение по умолчанию:true.value.converter.apicurio.registry.url- обязателен приvalue.converter: io.apicurio.registry.utils.converter.AvroConverter. Указывает адрес реестра схем, в котором хранятся схемы для значения. См. подробнее.bootstrap.servers- список адресов брокеров, к которым подключаютсяkafka-producerиadmin-producer. См. подробнее. Параметр может быть указан в разделахproducerиadmin. При одновременном указании параметра в нескольких местах с отличными друг от друга значениями, приоритетными будут значения параметра в разделахproducerдляkafka-producerи/илиadminдляadmin-producer. Параметр обязательно должен быть указан хотя бы в 1 из допустимых мест.producer- раздел определяет настройкуkafka-producer, cм. подробнее. Параметры этого раздела не являются обязательными. Ниже представлены некоторые из возможных параметров раздела:key.serializer- указывает класс сериализатора, используемый для преобразования объекта ключа сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию:org.apache.kafka.common.serialization.ByteArrayDeserializer.value.serializer- указывает класс сериализатора, используемый для преобразования объекта значения сообщения из формата приложения в байтовое представление для хранения в брокере Kafka. Значение по умолчанию:org.apache.kafka.common.serialization.ByteArrayDeserializer.max.block.ms- определяет максимальное время блокировки при вызове методовsend()иpartitionsFor(), после которого выбрасывается исключениеTimeoutException. Значение по умолчанию:Long.MAX_VALUE.enable.idempotence- регулирут идемпотентный режим работы. Этот режим гарантирует, что отправка сообщений будет выполнена ровно один раз в рамках сессии продюсера, даже при повторных попытках отправки. Значение по умолчанию:false.acks- определяет количество подтверждений, которые продюсер должен получить от лидеров реплик партиций перед тем, как запись будет считаться завершенной. Значение по умолчанию:all.max.in.flight.requests.per.connection- определяет максимальное количество не подтвержденных запросов отправки, которые могут быть отправлены на один брокер без получения ответа. Значение по умолчанию:1.delivery.timeout.ms- определяет максимальное время ожидания подтверждения доставки сообщения, после которого операция отправки считается завершенной (успешно или с ошибкой). Значение по умолчанию:Integer.MAX_VALUE.
admin- раздел параметров для настройкиadmin-producer. Раздел не является обязательным. Особенности настройки см. подробнее.topics- раздел параметров для настройки топиков. Раздел не является обязательным. Особенности настройки см. подробнее.
Пример настройки:
connector:class: io.tarantool.connector.KafkaSinkConnectorkey.converter: org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable: truekey.converter.schemas.cache.size: 1000key.converter.decimal.format: BASE64key.converter.replace.null.with.default: truevalue.converter: org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable: truevalue.converter.schemas.cache.size: 1000value.converter.decimal.format: BASE64value.converter.replace.null.with.default: truebootstrap.servers: broker-1:9092,broker-2:9092producer:batch.size: 512admin:bootstrap.servers: broker-1:9092,broker-2:9092topic.creation.default:partitions: 1replication.factor: 1
Коннектор к Tarantool Column Storage (TCS) позволяет TCDC передавать данные в TCS через JDBC-драйвер Arrow Flight SQL.
{note:important} DDL-команды нельзя передавать через коннектор. При получении такого события произойдет ошибка и Универсальный Обработчик остановится. Все команды по добавлению новых или изменению существующих таблиц, в которые планируется запись, необходимо выполнять вне коннектора. {/note}
Параметры коннектора указываются в разделе connector:
class- имя класса коннектора для записи в TCS по JDBC. Обязательный параметр. Может принимать только одно значение:io.debezium.connector.jdbc.JdbcSinkConnector.
connection.url- строка подключения Arrow Flight SQL. Обязательный параметр. Вид:jdbc:arrow-flight-sql://<хост>:<порт>?useEncryption=0. ПараметрuseEncryption=0отключает шифрование (например, для локальной сборки).
connection.username- имя пользователя TCS. Обязательный параметр.connection.password- пароль пользователя TCS. Обязательный параметр.insert.mode- режим записи строк. Должен быть согласован с фактической схемой таблиц в TCS. Необязательный параметр. Принимает значения:INSERT— вставка новой строки (нужен уникальный первичный ключ).UPDATE— обновление по ключу.
primary.key.mode- способ получения первичного ключа записи для ее сохранения в Приемнике. Необязательный параметр. Принимает значения:none— первичный ключ не требуется.kafka— в качестве первичного ключа используются контрольные точки Kafka (__connect_topic,__connect_partition,__connect_offset).record_key— первичный ключ извлекается из ключа события (record.key).record_value— первичный ключ извлекается из самой записи (record.value).record_header— ключ извлекается из заголовков события (ConnectRecordheaders()). См. подробнее. Набор и порядок полей первичных ключей событий должны соответствовать ключам тех таблиц в TCS, в которые коннектор записывает события.
primary.key.fields- определяет список полей, используемых в качестве первичного ключа. Его поведение зависит от типа ключа в сообщении:- Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор
выбросит исключение. В этом случае берется первое значение из свойства
primary.key.fields. - Когда ключ сообщения представляет собой структуру, параметр можно не задавать: используются все поля ключа. Если параметр задан, используются только перечисленные поля из структуры ключа.
- Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор
выбросит исключение. В этом случае берется первое значение из свойства
delete.enabled- определяет, обрабатывать ли события удаления. Необязательный параметр. При значенииtrueсобытия удаления обрабатываются, приfalseигнорируются. Также, при значенииtrue, перед обработкой событийDELETEтаблицы сбрасывается буфер накопленных событийUPDATEдля той же таблицы, чтобы не потерять порядок операций. Значение по умолчанию:false.schema.evolution- поддержка режима изменения схемы таблиц Приемника. Необязательный параметр. Принимает знаения:none- события, приводящие к изменениям схемы таблиц (DDL-команды) не передаются в Приемник коннектором. Это рекомендованное значение при работе с Приемником данных TCS.basic- события, приводящие к изменениям схемы таблиц (DDL-команды) передаются в Приемник коннектором. Это значение установлено по умолчанию.
database.time.zone- часовой пояс для типов даты/времени при записи в БД. Необязательный параметр. Значение по умолчанию:UTC.auto.create- автоматическое создание таблиц в Приемнике при необходимости. Необязательный параметр. Значение по умолчанию:true.quote.identifiers- экранирование идентификаторов в SQL (имен таблиц и колонок). Необязательный параметр. Значение по умолчанию:true.truncate.enabled- разрешение операций очистки таблиц, если они приходят в потоке событий. Необязательный параметр. Значение по умолчанию:false.batch.size- размер пакета накапливаемого пакета записей для передачи в БД. Необязательный параметр. Значение по умолчанию:500.use.reduction.buffer- использование буфера сжатия событий. При работе буфер накапливает несколько событий, относящихся к одной и той же записи (таблица + ключ), и оставляет только последнее из них. Это сокращает количество операций записи, заменяя серию промежуточных обновлений одним, самым новым. Необязательный параметр. Значение по умолчанию:true.hibernate.dialect- диалектHibernateдля генерации SQL. Значение зависит от версии TCS и установленного драйвера JDBC. Необязательный параметр.
Пример настройки:
sink:plugin.path: plugins/sinkconnector:class: io.debezium.connector.jdbc.JdbcSinkConnectorconnection:url: jdbc:arrow-flight-sql://localhost:50051username: tcspassword: tcsinsert.mode: updateprimary.key.mode: record_keydelete.enabled: trueschema.evolution: nonedatabase.time.zone: UTCauto.create: truequote.identifiers: truetruncate.enabled: falsebatch.size: 500use.reduction.buffer: truehibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
Коннектор к приемнику данных Elasticsearch позволяет TCDC передавать события изменения данных в индексы Elasticsearch. Коннектор преобразует каждое событие (вставку, обновление, замену или удаление записи в источнике) в соответствующее действие над документом в Elasticsearch: индексацию, обновление или удаление. Таким образом, документы в Elasticsearch отражают актуальное состояние данных, переданных через CDC.
Параметры коннектора:
-
elastic.connections— список URL для подключения к узлам Elasticsearch. Обязательный параметр. Каждый элемент списка - строка с URL вида<elasticsearch://>[<login>[:<password>]@]<host>:<port>[/][?]или<elasticsearchs://>[<login>[:<password>]@]<host>:<port>[/][?]для подключения по TLS(SSL). Элементы списка перечисляются через запятую.Внимание:
- На данный момент в строке подключения не поддерживаются параметры запроса (например, ?timeout=30s).
- При подключении к узлу с использованием TLS(SSL) доверенные сертификаты в форматах
PEMиPKCS12можно задать через параметрыtls.trust.pkcs12.certificatesиtls.trust.pem.certificates. Цепочка доверия для каждого узла Elasticsearch будет проверяться на основе этих сертификатов.
-
insert.mode— режим вставки записей. Необязательный параметр. Принимает значения:insert— вставка происходит, если в индексе нет документа с таким же_id. Если документ существует, коннектор выбрасывает исключение. Это значение используется по умолчанию.replace— при наличии документа с таким же_idстарый документ заменяется новым.
-
primary.key.mode— способ получения первичного ключа записи для ее сохранения в Elasticsearch. Значение первичного ключа сохраняется в поле документа_id. Необязательный параметр. Принимает значения:record_value— первичный ключ извлекается из самой записи (value). Это значение используется по умолчанию.record_key— первичный ключ извлекается из ключа события (key).
-
primary.key.fields— список имен полей, которые входят в состав значения первичного ключа события, в Elasticsearch. Коннектор проверяет, какие из указанных полей присутствуют в событии, и формирует из них массив в формате JSON, перечисляя их через запятую. Порядок элементов в массиве соответствует порядку перечисления полей в схеме события. Поля, отсутствующие в событии, игнорируются. Если поле присутствует, но его значение равноnull, то в составном ключе (несколько полей)nullдопускается, а в несоставном ключе (одно поле) коннектор выбрасывает исключение. Пустая строка записывается как "". Полученный массив в формате JSON записывается в поле_idсохраняемого документа. Необязательный параметр:- Если параметр не задан или пуст, первичным ключом считаются все поля записи.
- Если параметр задан, коннектор ищет в записи поля с указанными именами. Найденные поля объединяются в состав
_id. Если не найдено ни одного поля, коннектор выбрасывает исключение.
-
delete.enabled— включает или выключает обработку событийDELETE. Необязательный параметр. Возможные значения:true- события обрабатываются.false— события отбрасываются с записью в лог (уровеньDEBUG). Это значение используется по умолчанию.
-
decimal.handling.mode— режим преобразования полей типаDecimal(включаяio.debezium.data.VariableScaleDecimal). Необязательный параметр. Принимает значения:precise,engineer_string— значение представляется в виде экспоненциальной записи (например, 123.45 преобразуется в "1.2345e2"). Тип в Elasticsearch —text. Значениеpreciseиспользуется по умолчанию.plain_string— значение всегда представляется в виде строки десятичной дроби (тип —text).double— значение представляется в виде числа двойной точности (тип —double). Этот тип не подходит для данных, требующих абсолютной точности.
-
datetime.handling.mode— режим преобразования полей типа дата‑время. Необязательный параметр. Принимает значения:datetime— преобразуется в типdateElasticsearch. Это значение используется по умолчанию.string— преобразуется в текстовую строку ISO‑8601 (типtext).numeric— преобразуется в целое число, разрядность которого зависит от исходного типа Kafka Connect:- Для
Timestamp- 64-битное (long) - миллисекунды, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд. - Для
Date— 32-битное целое (int) с количеством дней с 01 января 1970 года. - Для
Time— 32-битное целое (int) с миллисекундами с полуночи.
- Для
-
datetime.infinity.handling.enabled— включает или выключает обработку значений-infinity/+infinityдля логического типаio.debezium.time.ZonedTimestamp. Необязательный параметр. Принимает значения:true:- При
datetime.handling.mode: string— выходной типtextсо значением-infinity/+infinity; - При
datetime.handling.mode: datetimeилиdatetime.handling.mode: numeric— выходной типdateс преобразованием в граничные даты (+5879610-12-31...и-5879609-12-31...).
- При
false:- При
datetime.handling.mode: datetimeилиdatetime.handling.mode: numeric— коннектор выбрасывает исключение. - При
datetime.handling.mode: string— коннектор работает в штатном режиме. Значение по умолчанию:false.
- При
-
io.threads.count— количество потоков обработки операций сетевого ввода‑вывода. Необязательный параметр. Значение0означает отсутствие ограничений количества потоков. По умолчанию используется значение0. -
futures.processing.threads.count— количество потоков для обработки ответов от Elasticsearch. Низкое количество потоков относительно количества событий снижает скорость обработки событий. Увеличение количества потоков повышает скорость обработки, но требует дополнительных ресурсов процессора. Значение0означает использование всех доступных ресурсов хоста. По умолчанию используется значение0. -
tls.trust.pem.certificates— список полных путей к доверенным сертификатам в форматеPEM(расширения.pem,.crt,.ca-bundle) или путей к папкам, где хранятся такие сертификаты (поиск выполняется рекурсивно). Необязательный параметр. -
tls.trust.pkcs12.certificates— список записей видапуть_к_файлу.p12:пароль, где указывается путь к контейнеруPKCS12(.p12,.pfx) и пароль от него. Контейнеры без пароля не допускаются. Необязательный параметр. -
auth.api-key— Строка в формате Base64 для аутентификации в API по ключу. Если параметр задан, он используется вместо аутентификации по логину и паролю. Необязательный параметр.
При получении события для индекса, который отсутствует в кластере Elasticsearch, коннектор автоматически создает этот индекс.
Параметры из разделе index применяются только в момент создания индекса. Если индекс уже существует, эти параметры не используются.
Если параметры не заполнены - используются значения по умолчанию.
Больше информации о настройках индексов Elasticsearch см. по ссылке.
Параметры настройки индексов указываются в виде *.index.<параметр>: индекс1: значение1, индекс2: значение2, __common__: значение_по_умолчанию.
Ключ __common__ задает значение для всех индексов, у которых не указано свое конкретное значение. Если __common__ не
указан, то для всех индексов для которых значения не указаны, будет использоваться значение по умолчанию.
index.number-of-shards— количество первичных шардов индекса. Значение по умолчанию устанавливается на стороне Elasticsearch. Необязательный параметр.index.number-of-routing-shards— количество виртуальных шардов, используемых в ключе маршрутизации записи события в Elasticsearch. Значение по умолчанию:1. Необязательный параметр.index.mapping.total-fields.limit— максимальное количество полей в индексе (включая сопоставления полей, объектов, псевдонимов и вычисляемых полей). Превышение лимита может привести к ухудшению производительности и перегрузке памяти узлов кластера. Необязательный параметр. Значение по умолчанию:1000.index.mapping.dynamic— управляет добавлением новых полей в сопоставление индекса. Принимает значения:true— новые поля автоматически добавляются в сопоставление. Это значение используется по умолчанию.runtime— новые поля добавляются в сопоставление как вычисляемые поля. Такие поля не индексируются и загружаются из исходного документа (_source) во время выполнения запроса.false— новые поля не добавляются в сопоставление. Они не индексируются и не участвуют в поиске, но остаются в исходном документе ответа.strict— при обнаружении нового поля выбрасывается исключение, и документ отклоняется.
index.translog.durability— режим сброса журнала транзакций на диск. Необязательный параметр. Принимает значения:request— синхронный сброс после каждого запроса. Это значение используется по умолчанию.async— асинхронный сброс в фоне с интерваломsync_interval.
index.translog.sync-interval— интервал принудительной записи журнала транзакций на диск. Необязательный параметр. Используется только приdurability: async. Значение по умолчанию:5s. Не может быть менее100ms.index.translog.flush-threshold-size— максимальный общий размер операций вtranslog, после которого выполняется принудительный сброс (flush). Необязательный параметр. Значение по умолчанию:10 GB.index.refresh-interval— интервал выполнения операцииrefresh(делает последние изменения видимыми для поиска). Необязательный параметр. Значение по умолчанию различается для разных типов установок Elasticsearch:- Для облачной установки EalsticSearch (Serverless) -
5s. - Для установки Elasticsearch в Elastic Stack -
1s. При значении-1операцияrefreshотключается.
- Для облачной установки EalsticSearch (Serverless) -
index.auto-expand-replicas— автоматическое расширение числа реплик в зависимости от количества узлов данных в кластере. Формат:нижняя_граница-верхняя_граница(например,0-5) или0-allдля всех узлов. Необязательный параметр. Значение по умолчанию:false(отключено).index.codec— метод сжатия хранимых данных. Необязательный параметр. Принимает значения:default— сжатиеLZ4.best_compression— сжатиеZSTD(более высокая степень сжатия за счет скорости чтения).
index.max-result-window— ограничение на глубину поиска в Elasticsearch. Этот параметр не влияет на передачу данных коннектором, но ограничивает поисковые запросы к уже проиндексированным данным со стороны других сервисов. Необязательный параметр. Значение по умолчанию:10000.index.routing.allocation.enable— разрешение на размещение шардов при их создании или восстановлении. Параметр определяет, какие типы шардов могут быть размещены на узлах кластера. Необязательный параметр. Принимает значения:all— можно размещать любые шарды (первичные и реплики). Это значение используется по умолчанию.primaries— можно размещать только первичные шарды.new_primaries— можно размещать только вновь создаваемые первичные шарды.none— нельзя размещать никакие шарды. При этом значении создание индекса невозможно.
index.routing.rebalance.enable— разрешение на перераспределение шардов. Параметр определяет, какие типы шардов могут быть автоматически перемещены между узлами кластера для выравнивания нагрузки. Необязательный параметр. Принимает значения:all— разрешено перераспределение любых шардов (первичных и реплик). Это значение используется по умолчанию.primaries— разрешено перераспределение только первичных шардов.replicas— разрешено перераспределение только реплик.none— перераспределение запрещено.
index.routing-partition-size— количество шардов, на которые направляются документы при использовании пользовательской маршрутизации. Значение должно быть меньше общего количества шардов индекса. Необязательный параметр. Значение по умолчанию:1(все документы с одинаковым значением пользовательской маршрутизации попадают в один шард).index.soft-deletes.enabled— включение механизма мягких удалений, при которм удаленные документы не удаляются физически, а помечаются как удаленные и временно хранятся для нужд репликации и восстановления. Необязательный параметр. Принимает значения:true- режим мягкого удаления включен. Это значение используется по умолчанию.false- режим мягкого удаления выключен, при удалении документы стираются сразу. Параметр устарел начиная с Elasticsearch 7.6.0, так как мягкие удаления включены по умолчанию и не отключаются.
index.soft-deletes.retention-lease.period— максимальный срок, на который ведущий шард гарантирует хранение истории мягких удалений для реплик. Если реплика не синхронизируется в течение этого периода, история считается утерянной. Необязательный параметр. Значение по умолчанию:12h.index.search.idle.after— время бездействия шарда без поисковых илиget-запросов, после которого шард считается простаивающим. Необязательный параметр. Значение по умолчанию:30s.
Пример настройки:
connector:elastic.connections: elasticsearch://user:pass@localhost:9200, elasticsearchs://10.0.0.1:9200insert.mode: replaceprimary.key:mode: record_valuefields: id, order_numberdelete.enabled: truedecimal.handling.mode: precisedatetime:handling.mode: datetimeinfinity.handling.enabled: trueio.threads.count: 4futures.processing.threads.count: 4tls.trust:pem.certificates: /etc/certs/elastic-ca.crt, /etc/certs/trusted/pkcs12.certificates: /etc/certs/client.p12:secret123, /etc/certs/node.p12:pass456auth.api-key: dXNlcm5hbWU6cGFzc3dvcmQ=index:number-of-shards: users:5, orders:10, __common__:3number-of-routing-shards: users:5, orders:10, __common__:3mapping:total-fields.limit: users:2000, __common__:1000dynamic: __common__:true, users:runtime, logs:falsetranslog:durability: users:async, __common__:requestsync-interval: users:10s, __common__:5sflush-threshold-size: users:5gb, __common__:10gbrefresh-interval: users:-1, orders:30s, __common__:1sauto-expand-replicas: users:0-3, orders:0-all, __common__:falsecodec: users:best_compression, __common__:defaultmax-result-window: users:50000, __common__:10000routing:allocation.enable: users:primaries, __common__:allrebalance.enable: users:replicas, __common__:allrouting-partition-size: users:3, __common__:1soft-deletes:retention-lease.period: users:24h, __common__:12henabled: users:true, __common__:truesearch.idle.after: users:60s, __common__:30s
Коннекторы Tarantool позволяют TCDC подключаться к Tarantool DataBase (TDB) и Tarantool Data Grid 2.x (TDG2) для:
- Получения событий изменения данных из TDB и TDG2 при помощи коннектора к Источнику данных Tarantool.
- Передачи событий изменения данных в TDB Коннекторы Tarantool позволяют TCDC подключаться к Tarantool DataBase (TDB) для:
- Получения событий изменения данных при помощи коннектора к Источнику данных Tarantool.
- Передачи событий изменения данных при помощи коннектора к Приемнику данных Tarantool.
Коннектор к Источнику данных Tarantool позволяет TCDC получать события изменения данных в Tarantool DataBase 1.x/ 2.x (TDB1/ TDB2) и Tarantool Data Grid 2.x (TDG2). Событие может быть вставкой, обновлением или удалением записи.
Параметры коннектора:
topic.prefix— приставка в теме сообщения. Необязательный параметр. Значение по умолчанию не установлено.poll.interval.ms- время, в течение которого коннектор накапливает события в пакете, который будет передан для дальнейшей обработки. Если размер пакета не достиг размера, указанного вmax.batch.size, то пакет передается по истечении указанного времени. Необязательный параметр. Значение по умолчанию:500.max.batch.size- максимальное количество событий, которое вмещает пакет передачи данных. По достижении указанного количества пакет передается для дальнейшей обработки. Необязательный параметр. Значение по умолчанию:2048.connect.username— имя пользователя для подключения к Источнику. Обязательный параметр.connect.password— пароль пользователя для подключения к Источнику. Необязательный параметр. Пустой пароль или отсутствие параметра допускается только с именем пользователяguest.connect.timeout— время ожидания подключения к Источнику в миллисекундах. Если подключение не установлено за это время, операция прерывается. Необязательный параметр. Допустимые значения: от1до2147483647(Integer.MAX_VALUE). Значение по умолчанию:2000.connect.adapter- режим обработки входящих событий. Определяет, как именно коннектор должен читать и преобразовывать кортежи, полученные из Tarantool. Принимает значения:tarantool- события обрабатываются как обычные кортежи Tarantool. Коннектор извлекает и отправляет их в том виде, в котором они хранятся в базе. Этот режим не рекомендуется для подключения к Приемнику TDG2, из-за особенностей хранения данных.tdg2- события обрабатываются как сложные документы TDG2. Этот режим рекомендуется для подключения к Приемнику TDG2.
connect.targets- список узлов, к которым необходимо подключить коннектор. Обязательный параметр. Значение по умолчанию не установлено.replication.anonymous- включает режим анонимной реплики. Необязательный параметр. Принимает значения:false- коннектор подключается как неанонимная реплика.true- коннектор подключается как анонимная реплика. Анонимная реплика не может изменять данные в кластере Tarantool и не учитывается кластером Tarantool в его внутренних расчетах. Это значение используется по умолчанию.
replication.id- произвольный идентификатор сеанса репликации, используемый для генерации уникального экземпляра коннектора в кластере Tarantool. Необязательный параметр.replication.idle.timeout— максимальное время ожидания в миллисекундах, после последнего сигнала (события или пульса) от Tarantool. По истечении этого времени подключение к Источнику прерывается. Необязательный параметр. Допустимые значения: от0до9223372036854775807(Long.MAX_VALUE). Значение по умолчанию:2000.spaces.include— список названий пространств, которые коннектор будет обрабатывать. Названия перечисляются через запятую. Необязательный параметр. Если параметр не указан или пуст, обрабатываются все пространства Источника.spaces.exclude— список названий пространств, которые коннектор не будет обрабатывать. Необязательный параметр. Названия перечисляются через запятую. Параметр имеет приоритет надspaces.include- при указании пространства в обоих параметрах, коннектор не будет обрабатывать это пространство.fields.include— список названий полей, которые коннектор будет обрабатывать. Названия перечисляются через запятую. Необязательный параметр. Под действие параметра попадают поля пространств Источника, указанных вspaces.includeили всех пространств, еслиspaces.includeне указан или пуст. Также можно указывать поля конкретных пространств в формате[space_name].[field_name], гдеspace_name— название пространства,field_name— имя поля. Названия полей, которые не принадлежат ни одному пространству, игнорируются.fields.exclude— список названий полей, которые коннектор не будет обрабатывать. Необязательный параметр. Названия перечисляются через запятую. Также можно указывать поля конкретных пространств в формате[space_name].[field_name], гдеspace_name— название пространства,field_name— имя поля. Параметр имеет приоритет надfields.include- при указании поля в обоих параметрах, коннектор не будет обрабатывать это поле.datetime.zone.enabled— включает или выключает обработку временных зон типов данныхdatetime(для TDB),DateTimeиTimestamp(для TDG2). Необязательный параметр. Принимает значения:false— указанные типы данных кодируются вio.debezium.time.MicroTimestampв виде количества микросекунд, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд. Это значение используется по умолчанию.true— указанные типы данных кодируются вio.debezium.time.ZonedTimestampв виде ISO‑8601 строки со смещением.
integer.handling.mode— режим преобразования полей типаintegerилиunsigned. Необязательный параметр. Принимает значения:int64— все поля указанных типов преобразуются в обычное целое число (со знаком, до 9 квинтиллионов). Это значение используется по умолчанию.decimal— все поля указанных типов преобразуются в десятичное число без дробной части.
Пример настройки:
connector:topic.prefix: cdcpoll.interval.ms: 100max.batch.size: 2048connect:username: adminpassword: secret-cluster-cookietimeout: 2000adapter: tarantooltargets: source-tdb-storage-1-master:3301replication:anonymous: trueid: cdc-source-worker-1idle.timeout: 2000spaces:include: Userexclude: intTestfields:include: all_typesexclude: bucket_iddatetime.zone.enabled: falseinteger.handling.mode: int64
Коннектор к Приемнику данных Tarantool позволяет TCDC записывать события в эти СУБД. Событие может быть вставкой, обновлением или удалением записи из Источника данных.
Параметры коннектора:
tarantool.connection.groups— список URL с настройками для подключения к TDB. Обязательный параметр. Каждый элемент списка - строка с URL видаtarantool://<host><:><port>/?<param1=value1¶m2=value2...>,tarantool://<host><:><port>/?<param1=value1¶m2=value2...>. Элементы списка перечисляются через запятую. Доступные параметры:host— имя узла Tarantool. Значение по умолчанию:localhostport— порт для подключения. Значение по умолчанию:3301.size— количество подключений в группе. Значение по умолчанию:1.user— имя пользователя для подключения. Значение по умолчанию:guest.password— пароль для подключения. Значение по умолчанию отсутствует.auth_type— тип аутентификации группы подключения. Доступные значения:CHAP_SHA1- результат преобразования пароля уникален для каждого подключения, что снижает риски перехвата пароля. Это значение используется по умолчанию.PAP_SHA256- результат преобразования пароля одинаков при каждом подключении.
tag— имя группы подключения, уникальное. Значение по умолчанию отсутствует.
insert.mode— режим вставки записей. Необязательный параметр. Принимает значения:insert— вставка происходит если в пространстве нет кортежа с идентичным первичным ключом. В ином случае коннектор завершает работу с исключением. Это значение используется по умолчанию.replace— при наличии кортежа с таким же первичным ключом перезаписывает старый кортеж новым.
primary.key.mode— способ получения первичного ключа. Необязательный параметр. Принимает значения:record_value(по умолчанию) — поиск первичного ключа производится в самой записиrecord_key— поиск первичного ключа производится в структуреrecord.key
primary.key.fields— список имен полей, потенциально входящих в состав первичного ключа. Параметр связан сprimary.key.modeи определяет части первичного ключа по именам полей. Необязательный параметр. Поведение зависит от значенияprimary.key.mode:- При
record_value:- Когда список полей пустой или отсутствует, первичным ключом считаются все поля в записи.
- Когда список полей не пустой, производится поиск полей записи по названиям из этого списка. При нахождении поля оно считается частью первичного ключа.
- Когда в записи не найдено ни одного поля с названием, указанным в списке полей, коннектор выбрасывает исключение.
- При
record_key:- Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор
выбросит исключение. В этом случае берется первое значение из свойства
primary.key.fields. - Когда ключ сообщения представляет собой структуру, параметр можно не задавать: используются все поля ключа. Если параметр задан, используются только перечисленные поля из структуры ключа.
- Когда ключ имеет простой тип — параметр становится обязательным, иначе коннектор
выбросит исключение. В этом случае берется первое значение из свойства
- При
delete.enabled— включает или выключает обработку событийDELETE. Необязательный параметр. Возможные значения:true- события обрабатываются.false— события отбрасываются с записью в журнал (уровеньDEBUG). Это значение используется по умолчанию.
truncate.enabled— включает или выключает обработку событийTRUNCATE. Необязательный параметр. Возможные значения:true- события обрабатываются.false— события отбрасываются с записью в журнал (уровеньDEBUG). Это значение используется по умолчанию. Внимание: Включать обработку событийTRUNCATEне рекомендуется. Порядок обработки сообщений такого рода в общем потоке данных не гарантирован, что может привести к непредвиденным потерям данных в Приемнике.
decimal.handling.mode— режим преобразования полей типаio.debezium.data.VariableScaleDecimalиorg.apache.kafka.connect.data.Decimal. Необязательный параметр. Принимает значения:precise,engineer_string— значение представляется в виде экспоненциальной записи (например, 123.45 преобразуется в "1.2345e2"). Значениеpreciseиспользуется по умолчанию.plain_string— значение всегда представляется в виде строки десятичной дроби.double— значение представляется в виде числа двойной точности. Этот тип не подходит для данных, требующих абсолютной точности.
datetime.handling.mode— режим преобразования полей типа дата‑время. Необязательный параметр. Принимает значения:datetime— преобразуется в типdateElasticsearch. Это значение используется по умолчанию.string— преобразуется в текстовую строку ISO‑8601 (типtext).numeric— преобразуется в целое число, разрядность которого зависит от исходного типа Kafka Connect:- Для
Timestamp- 64-битное (long) - миллисекунды, отсчитываемые с момента 01 января 1970 года 00 часов 00 минут 00 секунд. - Для
Date— 32-битное целое (int) с количеством дней с 01 января 1970 года. - Для
Time— 32-битное целое (int) с миллисекундами с полуночи.
- Для
datetime.infinity.handling.enabled— включает или выключает обработку значений-infinity/+infinityдля логического типаio.debezium.time.ZonedTimestamp. Необязательный параметр. Принимает значения:true:- При
datetime.handling.mode: string— выходной типtextсо значением-infinity/+infinity; - При
datetime.handling.mode: datetimeилиdatetime.handling.mode: numeric— выходной типdateс преобразованием в граничные даты (+5879610-12-31...и-5879609-12-31...).
- При
false:- При
datetime.handling.mode: datetimeилиdatetime.handling.mode: numeric— коннектор выбрасывает исключение. - При
datetime.handling.mode: string— коннектор работает в штатном режиме. Значение по умолчанию:false.
- При
Пример настройки:
connector:tarantool.connection.groups: tarantool://host1:3301/?user=admin&password=secret&size=2, tarantool://host2:3301/?user=admin&password=secretinsert.mode: replaceprimary.key:mode: record_valuefields: iddelete.enabled: truetruncate.enabled: falsedecimal.handling.mode: precisedatetime:handling.mode: datetimeinfinity.handling.enabled: false
К коннекторам Debezium относятся:
- Коннекторы к Источнику и Приемнику данных PostgreSQL.
- Коннекторы к Источнику и Приемнику данных Oracle.
Пульс (heartbeat) — механизм, обеспечивающий периодическое обновление позиции чтения в журнале упреждающей записи (WAL) источника данных независимо от наличия изменений в отслеживаемых коннектором таблицах. Благодаря этому механизму контрольные точки сохраняются с заданной периодичностью и сокращается объем WAL, который требуется повторно прочитать при восстановлении коннектора после сбоя.
Коннектор к Источнику сохраняет контрольные точки записи только при обработке событий из отслеживаемых таблиц. Если в этих таблицах нет изменений, позиция чтения журнала упреждающей записи не обновляется.
При перезапуске коннектор продолжает чтение журнала упреждающей записи с последней сохраненной контрольной точки. Если между этой точкой и текущей позицией в журнале находится большой объем записей из других таблиц, коннектор просматривает их все, прежде чем дойдет до конца. Время восстановления работы напрямую зависит от объема журнала, который необходимо прочитать.
Пульс создает искусственные изменения в отдельной созданной служебной таблице, которые попадают в журнал упреждающей записи, прочитываются и отдаются коннектором в обработчик. При обработке таких событий их контрольные точки сохраняются, что позволяет не перечитывать большие куски журнала репликации при возобновлении переноса данных.
В результате при перезапуске коннектору требуется прочитать часть журнала от последней контрольной точки до момента восстановления. Это сокращает время восстановления коннектора после сбоев и помогает периодически очищать журнал упреждающей записи, снижая потребление дисковой памяти на стороне базы данных Источника.
Для правильной работы Пульса в базе данных Источника необходимо создать служебную таблицу, в которую коннектор будет вносить изменения.
Пример создания служебной таблицы для PostgreSQL:
CREATE TABLE public.heartbeat (id SERIAL PRIMARY KEY,ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
Пример создания служебной таблицы для Oracle:
CREATE TABLE heartbeat_table (id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
Работа Пульса настраивается параметрами:
heartbeat.action.query- SQL-запрос, выполняемый на Источнике данных. Используется для внесения изменений в служебную таблицу. При получении этого запроса контрольная точка в WAL Источника обновляется. Таблица, указываемая в запросеheartbeat.action.queryдолжна быть включена в список наблюдаемых таблицtable.include.listконнектора, иначе коннектор не увидит изменения и контрольная точка не обновится.heartbeat.interval.ms- интервал отправки Пульс-сообщений. Значение0отключает Пульс. Рекомендуемое значение:10000(10 секунд).
Пример для PostgreSQL:
connector:heartbeat.interval.ms: 30000heartbeat.action.query: "INSERT INTO heartbeat (ts) VALUES (NOW())"
Пример для Oracle с LogMiner:
connector:heartbeat.interval.ms: 30000heartbeat.action.query: "INSERT INTO heartbeat_table VALUES (SYSDATE)"
При использовании коннектором Oracle OLR (OpenLogReplicator) адаптера с флагом SHOW_CHECKPOINT (flags = 4096 (0x1000))
запрос heartbeat.action.query не нужен при включенном Пульсе, т.к.:
- OLR отправляет события
CHECKPOINTна границах LWN (Log Writer Number) в redo log - независимо от наличия изменений в наблюдаемых таблицах. - При получении событий
CHECKPOINTиCOMMITот OLR, коннектор сразу отправляет Пульс, не дожидаясь окончания интервала времени. - Это гарантирует, что контрольные точки обновляются регулярно даже при отсутствии изменений в наблюдаемых таблицах.
Важно: heartbeat.interval.ms должен быть включен (значение > 0), иначе Пульс не будет работать
даже с контрольными точками OLR.
Пример конфигурации OLR:
{"source": [{"flags": 4096}]}
Пример конфигурации Debezium с OLR:
connector:heartbeat.interval.ms: 10000# heartbeat.action.query не требуется
СУБД/коннектор | Поток изменений в наблюдаемые таблицы | Поток изменений в ненаблюдаемые таблицы | heartbeat.interval.ms | heartbeat.action.query | Примечания |
|---|---|---|---|---|---|
PostgreSQL | Большой | Большой | Не нужен | Не нужен | Нормальная работа |
PostgreSQL | Большой | Малый | Не нужен | Не нужен | Нормальная работа |
PostgreSQL | Малый | Большой | 10000-30000 | Обязателен | WAL растет |
PostgreSQL | Малый | Малый | 10000-30000 | Не обязателен | Мониторинг |
SQL Server | Малый | Большой | 10000-30000 | Обязателен | Записи журнала растут |
Oracle + OLR (флаг 4096) | Малый | Большой | 10000-30000 | Не нужен | OLR шлет контрольную точку |
Oracle + LogMiner | Малый | Большой | 10000-30000 | Обязателен | — |
Tarantool CDC поддерживает подключение сторонних коннекторов, которые не входят в основную поставку TCDC. Коннектор может быть представлен как один или несколько взаимосвязанных JAR-файлов. Эти файлы нужно добавить на тот же уровень, на котором находятся коннекторы из основной поставки.
Добавление сторонних коннекторов различается для экземпляров Tarantool CDC, развернутых:
- В среде Kubernetes
- При помощи инструментов Ansible Tarantool Enterprise
Для того, чтобы добавить сторонние коннекторы к установке Tarantool CDC, развернутой в среде Kubernetes:
- В реестр контейнеров (Container Registry) вашего образа добавьте образ, который содержит один или несколько JAR-файлов вашего коннектора.
-
В файле конфигурации ссылка
values.ymlукажите этот образ в разделеconnectorPackages:connectorPackages:debezium-mysql:image:repository: registry.example.local/cdc-connectors/extra_connetcor # адрес в реестре контейнеровtag: "1.0"pullPolicy: IfNotPresentclasses:- io.debezium.connector.mysql.MySqlConnector # класс коннектора -
В файле конфигурации Helm-чарта
values.ymlукажите класс этого коннектора в разделеsinksилиsources:sources: # или sinks, в зависимости от типа коннектораmysql:common:connector:class: io.debezium.connector.mysql.MySqlConnector -
Проверьте, что Helm чарт корректно генерирует шаблоны с указанными параметрами:
helm template my-release ./helm-chart-cdc --values values.yml | grep -A5 "initContainers"helm template my-release ./helm-chart-cdc --values values.yml | grep -A3 "mountPath: /libs/connect" -
Перезапустите ваш экземпляр Tarantool CDC:
helm upgrade my-release ./helm-chart-cdc --values values.ymlkubectl rollout restart deployment -l app.kubernetes.io/instance=my-releasekubectl rollout status deployment -l app.kubernetes.io/instance=my-release -
После перезапуска в журнале Обработчика должна появиться строка с перечнем доступных коннекторов, в которой присутствует класс из вашего коннектора. Пример:
io.tarantool.cdc.source.SourceConfiguration : available source connectors: [PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector',version='2.7.0.Final', encodedVersion=2.7.0.Final, type=source,typeName='source', location='file:/libs/connect/source/debezium-mysql/'},...]Если в записи перечислены только коробочные коннекторы, проверьте параметр
source.plugin.path/sink.plugin.path.
1. Сборка и публикация образ коннектора.
Один JAR:
FROM busybox:1.36COPY my-connector-1.0.jar /connectors/
Несколько JAR:
Файлы .jar всех коннекторов должны быть собраны внутри папки /connectors/.
В файле pom.xml должны быть перечислены артефакты коннектора. Далее артефакты и их транзитивные зависимости устанавливаются
в одну общую папку.
FROM maven:3.9-eclipse-temurin-17 AS builderWORKDIR /buildCOPY pom.xml .RUN mvn -q dependency:copy-dependencies -DoutputDirectory=/connectorsFROM busybox:1.36COPY --from=builder /connectors /connectors/
Проверка содержимого:
docker run --rm registry.example.local/cdc-connectors/debezium-mysql:1.0 ls -la /connectors/docker run --rm registry.example.local/cdc-connectors/debezium-mysql:1.0 sh -c "cp -r /connectors/. /tmp/ && ls /tmp/"
2. Внесение изменений в values.yml Helm-чарта.
Добавьте раздел верхнего уровня connectorPackages, укажите в нем образ коннектора. Затем добавьте в
раздел sources описание подключения к Источнику, или в раздел sinks описание подключения к
Приемнику, с нужным классом коннектора.
в sources / sinks тот же connector.class, что перечислен в classes пакета.
3. Проверка Helm values перед установкой:
См. 4 шаг в Добавление сторонних коннекторов через Kubernetes
Проверка журнала после установки выполняется так же, как в шаге 6
общей инструкции — в строке
available source connectors: [...] (или available sink connectors: [...]) должен присутствовать
указанный класс коннектора.
Для того, чтобы добавить сторонние коннекторы к установке Tarantool CDC, развернутой при помощи
инструментов Ansible Tarantool Enterprise, выполните скрипт scripts/install-connector.sh с указанием класса коннектора и пути к архиву
или папке, в которой находится коннектор. Скрипт может содержать следующие параметры и команды:
--class <FQCN>- полное имя класса коннектора, напримерcom.example.MysqlSourceConnector. Обязательный параметр.--name <имя>- имя коннектора. По умолчанию определяется из имени JAR-файла, в котором найден указанный класс. Если класс находится в нескольких JAR-файлах и--nameне задан, для выбора JAR используется--version(см. ниже); если этого недостаточно, скрипт остановится и предложит указать--nameявно. Если класс не удалось найти ни в одном JAR, имя берется из имени архива/папки.--version <версия>- версия коннектора. По умолчанию определяется из имени JAR-файла, в котором найден указанный класс. Если класс найден в нескольких JAR-файлах,--versionиспользуется как фильтр для выбора нужного JAR.--install-dir <путь>- папка для установки сторонних коннекторов. Значение по умолчанию:/opt/tarantool-cdc/custom-connectors. Рекомендуется устанавливать сторонние коннекторы вне папки с основным архивом Tarantool CDC/releases, чтобы обновление продукта не приводило к их изменению или удалению.--tcdc-dir <путь>- папка, в которой установлен экземпляр Tarantool CDC. Значение по умолчанию:/opt/tarantool-cdc/releases/current.--role source\|sink- проверка, что коннектор соответствует требуемой роли (Источник или Приемник).--validate-only- выполняет статические проверки коннектора. Команда проверяет:- Целостность переданного архива (
.zip,.tar.gz); - Наличие JAR-файлов;
- Отсутствие дублирующихся артефактов;
- Наличие указанного класса коннектора внутри JAR-файлов;
- Совместимость байткода (не выше Java 17);
- Роль коннектора (
SourceConnectorилиSinkConnector).
- Целостность переданного архива (
Пример:
install-connector.sh \--class io.debezium.connector.mysql.MySqlConnector \--role source \debezium-connector-mysql-2.7.0.Final-plugin.zip
После успешной установки скрипт создает следующую структуру:
/opt/tarantool-cdc/custom-connectors/└── <имя-коннектора>/└── <версия>/├── connector.jar├── dependency-1.jar└── ... # все JAR-файлы из архива (не более 2 уровней вложенности)
И выводит готовый фрагмент для файла настроек:
=== Add to worker.yaml ===source: # или 'sink:', если это коннектор к Приемнику данныхplugin.path: /opt/tarantool-cdc/custom-connectors/<имя>/<версия>connector.class: <FQCN>
После установки необходимо указать путь к коннектору в конфигурации Универсального Обработчика.
В параметре source.plugin.path (или
sink.plugin.path для коннектора к Приемнику)
нужно указать путь до папки с версией коннектора:
source:plugin.path: /opt/tarantool-cdc/custom-connectors/debezium-connector-mysql/2.7.0.Finalconnector:class: io.debezium.connector.mysql.MySqlConnector# параметры коннектора...
После изменения конфигурации необходимо перезапустить Универсальный Обработчик через ATE.
Проверка архива без установки:
install-connector.sh \--class io.debezium.connector.mysql.MySqlConnector \--validate-only \debezium-connector-mysql-2.7.0.Final-plugin.zip
Установка в нестандартную директорию с выводом планируемых действий:
install-connector.sh \--class io.debezium.connector.mysql.MySqlConnector \--install-dir /app/cdc/custom-connectors \--dry-run \debezium-connector-mysql-2.7.0.Final-plugin.zip
Одиночные преобразователи (Single Message Transformations - SMT) - это модули, с помощью которых можно изменять события, обрабатываемые в Tarantool CDC. Изменения могут включать в себя: преобразование структуры тела или ключа события, добавление, удаление, переименование полей; смена типов полей, смена темы событий, изменение заголовков событий.
Преобразователи могут быть настроены как для коннектора к Источнику,
так и для коннектора к Приемнику. Первые, если они определены,
применяются к событиям после того, как они прочитаны из Источника. Вторые применяются перед отправкой событий в целевую систему.
Преобразователи настраиваются в конфигурации коннекторов в разделах transforms.
Параметры преобразователей:
chain— список имен преобразователей в том порядке, в котором они должны применяться к событию. Имена преобразователей задаются пользователем произвольно.config— словарь, где каждый ключ — имя преобразователя, а значение — словарь с его настройками. Имена преобразователей должны совпадать с именами вchain.type- класс преобразователя. Остальные параметры зависят от конкретного типа преобразователя и передаются в него как настройки.
В целом в установке TCDC может использоваться ни одного, один, или несколько преобразователей. В последнем случае
преобразователи объединяются в последовательную цепочку для обработки событий:
сначала преобразователи коннектора к Источнику, в порядке, указанном в параметре source.transforms.chain; затем преобразователи
коннектора к Приемнику данных, в порядке, указанном в параметре sink.transforms.chain.
Пример настройки:
source:connector:... # параметры коннектораtransforms:chain: # порядок применения преобразователей- mongodb-unwrapconfig: # настройки каждого преобразователя коннектора к Источникуmongodb-unwrap:type: io.debezium.connector.mongodb.transforms.ExtractNewDocumentStatedrop.tombstones: falsesink:connector:... # параметры коннектораtransforms:chain: # порядок применения преобразователей- remap- module_2- ...config: # настройки каждого преобразователя коннектора к Приемникуremap:type: io.tarantool.cdc.transforms.smt.Remapmessages:- topic: cdc.inventory.userskey:type: struct... # остальные настройки преобразователя Remapmodule_2:type: org.apache.kafka.connect.transforms.RegexRouterregex: storage_(.*)...
Remap — это модуль для преобразования событий путем создания нового выходящего события с заданной структурой тела и
ключа с последующим заполнением их значениями из входящего события. К значениям могут быть применены различные
преобразования: из строки в число, из строки с датой и временем в логический тип Timestamp, из числа в строку и так далее.
На входе могут быть как типизированные записи, структура которых описывается постоянной схемой, так и нетипизированные, без схемы. На выходе запись всегда типизирована.
Общий алгоритм работы модуля:
- По теме (топику, topic) входящей записи Remap ищет в своей конфигурации объявление структуры новой записи. Если для данной темы объявление отсутствует, запись передается дальше без изменений.
- По найденной структуре создается новая запись.
- Значения извлекаются из входящей записи, проходят через заданные преобразования и присваиваются
соответствующим полям новой записи. Полям, которые обозначены как необязательные и для которых
не указаны ссылки на значения, присваивается
null. - Новая запись передается дальше. Входящая запись отбрасывается.
На верхнем уровне конфигурации модуля Remap определяется набор тем (топиков) с заданными описаниями выходных событий. Каждое описание преобразования состоит из двух частей:
- Объявление компонента данных ключа записи
key; - Объявление компонента данных тела записи
value.
Эти объявления могут быть составными, включать в себя вложенные части, что позволяет описывать произвольные древовидные структуры Объявления компонентов данных ключа и тела записи задаются по одним и тем же правилам.
Атрибуты объявления компонента данных:
type- тип компонента данных. Обязательный параметр. В зависимости от его значения другие атрибуты могут стать обязательными.optional- необязательность компонента данных:true- компонент данных необязательный. Если компонент данных не удается вычислить, то модуль присвоит ему значениеnull.false- компонент данных обязательный. Это значение используется по умолчанию. Если при этом значении компонент данных, или вложенный обязательный компонент любого уровня не является вычислимым, преобразователь Remap выбросит ошибку конфигурации при запуске обработчика. Правила вычислимости компонентов данных описаны отдельно для каждого типа.
params- набор строковых пар ключ-значение для тонкой настройки логических типов. Состав набора пар зависит от логического типа.hints- набор строковых пар ключ-значение для тонкой настройки преобразования извлекаемых данных. Состав набора зависит от применяемого преобразования.modifiers- набор строковых пар ключ-значение, в который входит обязательный элементchain, для указания последовательности применяемых действий над полученным конечным значением перед записью его в поле выходного сообщения. Набор действий зависит от типа компонента данных.default- значение по умолчанию. Не является обязательным, если задан атрибутpath. Поддерживается только для простых типов.path- ссылка на компонент данных во входящей записи. Может быть необязательным, если задан атрибутdefault. Приtype: structатрибутpathне должен задаваться.version- версия схемы, не является обязательной.
Общий вид объявления компонента данных:
messages:- topic: <Тема сообщения>key: # Объявление компонента данных ключа новой записиtype: stringpath: $keyvalue: # Объявление компонента данных тела новой записиtype: <Тип>[/<Логический тип>]version: <Версия схемы>params:<Параметр-1>: <Значение-Параметра-1><Параметр-...>: <Значение-Параметра-...><Параметр-N>: <Значение-Параметра-N>optional: true | falsename: <Наименование поля в структуре>fields:- <Объявление поля 1>- <Объявление поля ...>- <Объявление поля N>items: <Объявление компонента данных элементов массива или значений словаря>key: <Объявление компонента данных ключей словаря>path: <Путь к значению во входящей записи>default: <Значение по умолчанию>hints:<Подсказка-1>: <Значение-Подсказки-1><Подсказка-...>: <Значение-подсказки-...><Подсказка-N>: <Значение-подсказки-N>modifiers:chain: <Цепочка действий над значением><Действие-1>: <Аргумент-Действия-1><Действие-...>: <Аргумент-Действия-...><Действие-N>: <Аргумент-Действия-N>- topic: topic_2key: <Объявление компонента данных ключа новой записи>value: <Объявление компонента данных тела новой записи>- topic: topic_Nkey: <Объявление компонента данных ключа новой записи>value: <Объявление компонента данных тела новой записи>
Тип struct (структура):
Структурный тип объединяет разнородные данные в единую структуру с постоянным набором полей. Для структурного типа
обязательно наличие не пустого списка полей fields. Объявления полей подобны
объявлениям компонентов данных, но для
каждого поля должен быть определен атрибут name. Тип поля может быть структурой, массивом или словарем, что позволяет
создавать древовидные структуры произвольной степени вложенности.
Пример:
type: structoptional: truefields: # Описания полей- name: atype: int32path: $value.a- name: btype: int64path: $key.b- name: ctype: structfields:- name: dtype: booleanpath: $value.enabled- name: etype: stringpath: $value.label
Компонент структурного типа является вычислимым при соблюдении хотя бы одного из следующих условий:
- Необязательная структура (
optional: true), у которой нет вычислимых полей — при вычислении компоненту будет присвоенnull. - Все поля структуры являются вычислимыми (рекурсивно).
Тип array (массив):
Этот тип объединяет однородные данные в ряд переменной длины.
Объявление в атрибуте items является объявлением компонента данных и не должно быть пустым. В нем и в его вложениях
атрибуты path могут отсутствовать или же указывать на значения. При этом первый элемент path может ссылаться как на
текущий обрабатываемый элемент входного массива ($), так и на входящую запись и ее служебные
поля ($value, $key, $header, $timestamp, $partition). Тип описываемого значения также может быть структурой,
массивом или словарем, что позволяет создавать вложенные структуры произвольной глубины.
Атрибут path объявления массива указывает на расположение во входящей записи массива, на основе которого
создается выходной массив в поле новой записи. Выходной массив получается путем преобразования элементов входного
массива в соответствии с объявлением из атрибута items.
Если path отсутствует, то должен быть объявлен атрибут optional: true.
Пример:
type: arrayitems: # Объявление компонента данных элементов массиваtype: stringpath: $.a
Значение компонента данных структурного типа может быть вычислено, если соблюдается хотя бы одно условие из следующих:
- У структуры нет вычислимых полей, значение атрибута
optionalравноtrue- значением этого компонента будетnull. - Все поля структуры являются вычислимыми.
Тип map (словарь):
Этот тип данных объединяет пары ключ-значение в контейнер данных переменной длины, где ключ и значение могут иметь свои типы.
Объявление словаря подобно объявлению массива, добавляется только атрибут key, описывающий структуру ключа. Объявления в
атрибутах key и items описывают компоненты данных и не должны быть пустыми. Для ключей словаря допустимы
только простые типы, а также не должен указываться атрибут path - ключи выходного словаря получаются из значений ключей входного
словаря. В объявлении из атрибута items и в его вложениях атрибуты path могут отсутствовать или же указывать на значения. При
этом первый элемент path может ссылаться как на текущее обрабатываемое значение входного словаря ($), так и на входящую запись
и ее служебные поля ($value, $key, $header, $timestamp, $partition). Тип описываемого значения также может быть структурой,
массивом или словарем, что позволяет создавать вложенные структуры произвольной глубины.
Атрибут path объявления словаря указывает на расположение во входящей записи словаря, на основе которого создается выходной словарь
в поле новой записи. Выходной словарь получается путем преобразования всех ключей и значений входного словаря в соответствии
с объявлениями из атрибутов key и items. Если path отсутствует, то должен быть объявлен атрибут optional: true.
Пример:
type: mapkey: # Объявление компонента данных ключей словаряtype: int32items: # Объявление компонента данных значений словаряtype: stringpath: $.a # Путь к исходному словарю во входящей записи
Значение компонента данных типа "словарь" может быть вычислено, если соблюдается хотя бы одного из следующих условий:
- Задан
pathи значения ключей (key) и значений (items) являются вычислимыми. - Не задан
path, но при этомoptional: true— при вычислении компоненту может быть присвоенnull.
Простые типы:
К простым типам относятся все остальные типы, которые представляют простые (атомарные или скалярные) значения:
int8- целое число со знаком, разрядность - 8 бит.int16- целое число со знаком, разрядность - 16 бит.int32- целое число со знаком, разрядность - 32 бита.int64- целое число со знаком, разрядность - 64 бита.float32- число с плавающей запятой и знаком, разрядность - 32 бита.float64- число с плавающей запятой и знаком, разрядность - 64 бита.string- строка (unicode).bytes- байтовая строка.boolean- двоичный тип (true,false).
Компонент простого типа является вычислимым при соблюдении хотя бы одного из следующих условий:
- Не заданы атрибуты
path,default, но заданoptional: true— при вычислении такому компоненту может быть присвоенnull. - Задан
default, заданoptional: false— при вычислении такому компоненту будет присвоено значение по умолчанию. - Задан
path, заданoptional: false— значение компонента будет извлечено из входящей записи.
Атрибут path отвечает за указание значения во входящей записи. Представляет собой последовательность имен полей, разделенных точкой.
Точка обозначает путь до нужного значения в исходной структуре. Первым элементом пути должен быть служебный символ, указывающий на
корневую структуру для поиска значения. Описание служебных символов:
$- в качестве корня для поиска значения по пути берется текущий обрабатываемый элемент - элемент входного массива или значение входного словаря.$key- в качестве корня для поиска значения по пути берется ключ входящей записи.$value- в качестве корня для поиска значения по пути берется тело входящей записи.$header- указывает на поиск значения в заголовках входящей записи. Путь, начинающийся с$header, состоит только из двух частей: самого служебного символа и названия заголовка, откуда брать значение.$timestamp- указывает на временную метку входящей записи. После этого служебного символа никаких других символов быть не должно.$partition- указывает на номер части топика (partition) входящей записи. После этого служебного символа никаких других символов быть не должно.
Примеры:
$- значение текущего элемента массива или словаря берется целиком. Например, для массивов данных простого типа (например, массив строк), необходимо просто взять строку как значение целиком.$.person.birthdate- дата рождения извлекается из поляbirthdateструктуры, вложенной в полеpersonструктуры текущего обрабатываемого элемента входящего массива.$value.before.price- цена извлекается из поляpriceструктуры, вложенной в полеbeforeструктуры тела записи.$key.user_id- номер пользователя расположен в полеuser_idструктуры ключа записи.$header.x-timestamp- временная метка извлекается из заголовка входящей записиx-timestamp.$timestamp- в качестве значения используется временная метка входящей записи.$partition- в качестве значения используется номер части топика входящей записи.
Нужен для точного описания выходных типов Kafka Connect в схеме конечной записи. Kafka Connect использует набор базовых типов,
поверх которого наложено множество логических типов, для каждого из которых может быть задан свой набор параметров.
Например, для логического типа org.apache.kafka.connect.data.Decimal необходимо задать параметр scale.
Любые другие логические типы из других экосистем, поддерживающих соглашения Kafka Connect,
используют эти наборы параметров для выполнения внутренних преобразований.
Модификаторы modifiers задают набор действий над конечным значением. Эти действия выполняются
после извлечения значения из исходной записи и приведения к целевому типу,
указанному в схеме.
Модификаторы определены только для простых типов. Для каждого типа поддерживается свой набор модификаторов.
Последовательность действий задается особым атрибутом chain, где строкой
через запятую перечисляются действия. Каждое действие представлено именем и
аргументом, тем самым, является функцией от двух аргументов, где первым
аргументом всегда является текущее значение.
Исключение составляет только двоичный тип: для него не требуется указание
chain, только указание флага negate.
Общий алгоритм работы модификаторов:
- Действие берется из списка
chain. - Указанное действие применяется к текущему значению (извлеченному и преобразованному). Получается новое значение.
- Переход к следующему действию из списка
chain:- Если действий больше нет, полученное значение является итоговым.
- Иначе повторяются шаги 1-3.
Модификаторы типа boolean:
Для двоичного типа есть смысл только в одном действии - в обращении исходного значения.
negate- логическое отрицание. Не требует указания аргумента.
Модификаторы типа int8:
Для байтового типа реализованы побитовые операции.
r_shift- сдвинуть все биты вправо на указанное число шагов.l_shift- сдвинуть все биты влево на указанное число шагов.b_and- двоичное "И".b_or- двоичное "ИЛИ".b_xor- двоичное исключающее "ИЛИ".b_not- двоичное отрицание. Не требует указания аргумента.
Модификаторы для типов int16, int32, int64, float32, float64:
add- прибавить к текущему значению некоторое число.sub- вычесть из текущего значения некоторое число.mul- умножить текущее значение на некоторое число.div- разделить текущее значение на некоторое число.
Пример:
type: int32path: $value.pricemodifiers:chain: add,muladd: 100mul: 2
Пример показывает вычисление выражения ((X + 100) * 2), где X - извлеченное
и приведенное к конечному типу значение.
Подсказки hints дают дополнительную информацию для точного преобразования значений.
Например, для преобразования строки в логические типы Kafka Connect для работы
с датой и временем, подсказки позволяют указать формат для разбора строки,
которая может содержать дату и время в формате, отличном от ISO8601. При
приведении строк и числовых данных к логическому типу Decimal в подсказках
можно указать правило округления.
Поддерживаемые подсказки:
math-round- для логического типаDecimal, указывающая на то, какое правило округления должно быть применено к получаемому значению типаDecimal. Допустимые значения (см. подробнее документацию):CEILING;DOWN;FLOOR;HALF_DOWN;HALF_EVEN;HALF_UP;UNNECESSARY;UP.
date-format- применяется для преобразований строк в тип датыorg.apache.kafka.connect.data.Date. В подсказке указывается формат даты для разбора строки.time-format- применяется для преобразований строк в тип времениorg.apache.kafka.connect.data.Time. В подсказке указывается формат времени для разбора строки.datetime-format- применяется для преобразований строк в тип даты и времениorg.apache.kafka.connect.data.Timestamp. В подсказке указывается формат даты и времени для разбора строки.
Логические типы - механизм системы типов Kafka Connect, позволяющий расширить множество поддерживаемых типов данных. Но полноценной поддержкой обладают только логические типы от самого Kafka Connect. Встроенная поддержка этих типов означает, что при сериализации данных в транспортный формат (AVRO или JSON) будет использовано значение основного типа, но при десериализации данных значение будет представлено экземпляром некоторого класса Java, представляющего значение логического типа.
Поддерживаемые логические типы Kafka Connect:
org.apache.kafka.connect.data.Decimal- значения представлены классомjava.math.BigDecimal;org.apache.kafka.connect.data.Date- значения представлены классомjava.util.Date;org.apache.kafka.connect.data.Time- значения представлены классомjava.util.Date;org.apache.kafka.connect.data.Timestamp- значения представлены классомjava.util.Date.
В преобразователе Remap они также поддержаны на низком уровне. Другие
логические типы не поддерживаются, но их можно задать в качестве указания на,
что значения можно обработать соответствующими классами, например, из
экосистемы Debezium. Логический тип указывается через косую черту (/) после
названия основного типа.
Пример:
type: int32/org.apache.kafka.connect.data.Datepath: $value.birthdate
В примере значение $value.birthdate приводится к логическому типу и
представляется экземпляром класса java.util.Date. Способ преобразования
зависит от типа исходного значения. Если исходное значение - число (int8,
int16, int32, int64, float32, float64), то оно будет приведено к
типу int32 (в зависимости от начального типа - с потерями или без), а затем
на основе полученного числа будет создан экземпляр класса java.util.Date.
Если исходное значение - строка и указана подсказка формата (format), то
преобразование выполняется прямым разбором строки по указанному формату. В
случае без указания формата строка приводится к числу int32 с последующим
созданием экземпляра класса java.util.Date.
Ниже представлен пример преобразования типизированного сообщения для уменьшения количества полей.
Ключ исходного и итогового сообщений из топика city_architects:
Примечание: описание схемы опущено для краткости для краткости.
{"schema": {...},"payload": "250d635d-5fea-4e13-ab71-05058398b690"}
Тело исходного сообщения:
Примечание: описание схемы опущено для краткости.
{"schema": {...},"payload": {"id": 0,"name": "Петр","surname": "Берш","patronymic": "Петрович","birthdate": "16.12.1933"}}
Преобразование:
messages:- topic: city_architectskey:type: stringpath: $keyvalue:type: structfields:- name: idtype: int64path: $value.id- name: nametype: stringpath: $value.name- name: surnametype: stringpath: $value.surname
Выходное сообщение:
Примечание: описание схемы опущено для краткости.
{"schema": {...},"payload": {"id": 0,"name": "Петр","surname": "Берш",}}
Начальный тип | Конечный тип | Параметры | Описание преобразования |
|---|---|---|---|
boolean | boolean | ||
string | boolean | Если входящая строка равна "true", то выходное значение равно ИСТИНА. Во всех остальных случаях выходное значение равно ЛОЖЬ. | |
int8 | boolean | Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ. | |
int16 | boolean | Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ. | |
int32 | boolean | Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ. | |
int64 | boolean | Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ. | |
Decimal | boolean | Если числовое значение не равно 0, то выходное значение равно ИСТИНА, иначе равно ЛОЖЬ. | |
bytes | bytes | Выходное значение равно входному значению. | |
string | bytes | К строке будет применяться преобразование из строки формата Base64 в байтовую строку. Если строка не является Base64, будет выброшено исключение. | |
boolean | bytes | Если входное значение равно ИСТИНА, то выходное значение - байтовая строка с одним байтом: 0 в случае значения ЛОЖЬ или 1 в случае значения ИСТИНА. | |
int8 | bytes | Байтовая строка, содержащая один байт, равный входному значению. | |
int16 | bytes | Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему. | |
int32 | bytes | Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему. | |
int64 | bytes | Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему. | |
float32 | bytes | Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему. | |
float64 | bytes | Байтовая строка, содержащая байтовое представление входного значения в порядке от старшего байта к младшему. | |
int8 | float32 | Приведение целочисленного типа к типу с плавающей запятой. | |
int16 | float32 | Приведение целочисленного типа к типу с плавающей запятой. | |
int32 | float32 | Приведение целочисленного типа к типу с плавающей запятой. | |
int64 | float32 | Приведение целочисленного типа к типу с плавающей запятой. | |
Decimal | float32 | Приведение типа к типу с плавающей запятой с потерей точности. | |
boolean | float32 | Если входящее значение - ИСТИНА, то выходящее значение - 1.0, иначе - 0.0. | |
string | float32 | Разбор строкового представления значения с плавающей запятой. Если строка сформирована неправильно, будет выброшено исключение. | |
int8 | float64 | Приведение целочисленного типа к типу с плавающей запятой. | |
int16 | float64 | Приведение целочисленного типа к типу с плавающей запятой. | |
int32 | float64 | Приведение целочисленного типа к типу с плавающей запятой. | |
int64 | float64 | Приведение целочисленного типа к типу с плавающей запятой. | |
org.apache.kafka.connect.data.Decimal | float64 | Приведение к типу с плавающей запятой с потерей точности. | |
boolean | float64 | Если входящее значение - ИСТИНА, то выходящее значение - 1.0, иначе - 0.0. | |
string | float64 | Разбор строкового представления значения с плавающей запятой. Если строка сформирована неправильно, будет выброшено исключение. | |
int8 | int8 | Выходное значение равно входному значению. | |
int16 | int8 | Выходное значение - младший байт входящего значения (возможна потеря данных). | |
int32 | int8 | Выходное значение - младший байт входящего значения (возможна потеря данных). | |
int64 | int8 | Выходное значение - младший байт входящего значения (возможна потеря данных). | |
org.apache.kafka.connect.data.Decimal | int8 | Выходное значение - младший байт значения, представляющего значение BigDecimal. | |
boolean | int8 | Если входящее значение - ИСТИНА, исходящее - 1. В противном случае - 0. | |
string | int8 | Разбор строкового представления байтового значения. Корректное строковое представление - множество строк, представляющих значения от -128 до 127. | |
int8 | int16 | Приведение к типу int16 (расширение разрядности). | |
int16 | int16 | Выходное значение равно входному значению. | |
int32 | int16 | Выходное значение - 2 младших байта входного значения (уменьшение разрядности с потерей данных). | |
int64 | int16 | Выходное значение - 2 младших байта входного значения (уменьшение разрядности с потерей данных). | |
org.apache.kafka.connect.data.Decimal | int16 | Выходное значение - 2 младших байта значения, представляющего значение BigDecimal. | |
boolean | int16 | Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0. | |
string | int16 | Разбор строкового представления короткого целого значения. Корректное строковое представление - множество строк, представляющих значения от -32768 до 32767. В противном случае будет выброшено исключение. | |
int8 | int32 | Приведение к типу int32 (расширение разрядности). | |
int16 | int32 | Приведение к типу int32 (расширение разрядности). | |
int32 | int32 | Выходное значение равно входному значению. | |
int64 | int32 | Выходное значение - 4 младших байта входного значения (уменьшение разрядности с потерей данных). | |
org.apache.kafka.connect.data.Decimal | int32 | Выходное значение - 4 младших байта значения, представляющего значение BigDecimal. | |
boolean | int32 | Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0. | |
string | int32 | Разбор строкового представления целого значения. Корректное строковое представление - множество строк, представляющих значения от -2147483648 до 2147483647. В противном случае будет выброшено исключение. | |
int8 | int64 | Приведение к типу int64 (расширение разрядности). | |
int16 | int64 | Приведение к типу int64 (расширение разрядности). | |
int32 | int64 | Приведение к типу int64 (расширение разрядности). | |
int64 | int64 | Выходное значение равно входному значению. | |
org.apache.kafka.connect.data.Decimal | int64 | Выходное значение - 8 байт значения, представляющего значение BigDecimal. | |
boolean | int64 | Если входное значение - ИСТИНА, то выходное - 1. В противном случае - 0. | |
string | int64 | Разбор строкового представления целого значения. Корректное строковое представление - множество строк, представляющих значения от -9223372036854775808 до 9223372036854775807. В противном случае будет выброшено исключение. | |
org.apache.kafka.connect.data.Date | org.apache.kafka.connect.data.Timestamp | Расширение типа c org.apache.kafka.connect.data.Date до org.apache.kafka.connect.data.Timestamp | |
org.apache.kafka.connect.data.Time | org.apache.kafka.connect.data.Timestamp | Расширение типа c org.apache.kafka.connect.data.Time до org.apache.kafka.connect.data.Timestamp | |
org.apache.kafka.connect.data.Timestamp | org.apache.kafka.connect.data.Timestamp | Выходное значение равно входному значению. | |
boolean | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
int8 | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
int16 | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
int32 | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
int64 | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
org.apache.kafka.connect.data.Decimal | org.apache.kafka.connect.data.Timestamp | Входное значение, приведенное к типу int64, трактуется как количество миллисекунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Timestamp. | |
string | org.apache.kafka.connect.data.Timestamp | hints.datetime-format | Если задан параметр hints.datetime-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int64 (см. преобразования к int64). |
org.apache.kafka.connect.data.Date | org.apache.kafka.connect.data.Date | Выходное значение равно входному значению. | |
org.apache.kafka.connect.data.Time | org.apache.kafka.connect.data.Date | Преобразование не имеет смысла, т.к. данные и по времени и дате обнуляются. | |
org.apache.kafka.connect.data.Timestamp | org.apache.kafka.connect.data.Date | Усекаются временные данные, остается только дата. | |
boolean | org.apache.kafka.connect.data.Date | Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date. | |
int8 | org.apache.kafka.connect.data.Date | Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date. | |
int16 | org.apache.kafka.connect.data.Date | Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date. | |
int32 | org.apache.kafka.connect.data.Date | Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date. | |
int64 | org.apache.kafka.connect.data.Date | Входное значение, приведенное к типу int32, трактуется как количество секунд с 1 января 1970 года. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Date. | |
string | org.apache.kafka.connect.data.Date | hints.date-format | Если задан параметр hints.date-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int32 (см. преобразования к int32). |
org.apache.kafka.connect.data.Date | org.apache.kafka.connect.data.Time | Преобразование не имеет смысла, т.к. данные и по времени и дате обнуляются. | |
org.apache.kafka.connect.data.Time | org.apache.kafka.connect.data.Time | Выходное значение равно входному значению. | |
org.apache.kafka.connect.data.Timestamp | org.apache.kafka.connect.data.Time | Время напрямую переводится как время с начала "эпохи". | |
boolean | org.apache.kafka.connect.data.Time | Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time. | |
int8 | org.apache.kafka.connect.data.Time | Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time. | |
int16 | org.apache.kafka.connect.data.Time | Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time. | |
int32 | org.apache.kafka.connect.data.Time | Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time. | |
int64 | org.apache.kafka.connect.data.Time | Входное значение, приведенное к типу int32, трактуется как количество секунд с начала суток. На основе этого числа будет создан экземпляр java.util.Date, являющийся логическим представлением типа org.apache.kafka.connect.data.Time. | |
string | org.apache.kafka.connect.data.Time | hints.time-format | Если задан параметр hints.time-format, то строка будет разбираться в соответствии с указанным форматом даты-времени напрямую в экземпляр java.util.Date. В противном случае строка будет приводиться к типу int32 (см. преобразования к int32). |
org.apache.kafka.connect.data.Decimal | org.apache.kafka.connect.data.Decimal | Выходное значение равно входному значению. | |
boolean | org.apache.kafka.connect.data.Decimal | hints.math-round | Если входное значение - ИСТИНА, то выходное значение - 1. В противном случае 0.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
float32 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
float64 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
int8 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
int16 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
int32 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
int64 | org.apache.kafka.connect.data.Decimal | hints.math-round | Приведение к типу BigDecimal, являющимся представлением логического типа org.apache.kafka.connect.data.Decimal.Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
string | org.apache.kafka.connect.data.Decimal | hints.math-round | Разбор строкового представления числа с ограниченной точностью. Если задан параметр hints.math-round, при создании экземпляра BigDecimal будет задано указанное правило округления. |
bytes | org.apache.kafka.connect.data.Decimal | Разбор байтовой строки, являющейся физическим представлением Decimal. | |
org.apache.kafka.connect.data.Date | string | hints.date-format | Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.date-format. |
org.apache.kafka.connect.data.Time | string | hints.time-format | Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.time-format. |
org.apache.kafka.connect.data.Timestamp | string | hints.datetime-format | Дата кодируется в строку. В качестве формата кодирования используется либо iso8601, либо формат, переданный в параметре hints.datetime-format. |
bytes | string | Преобразование байтовой строки в формат Base64. | |
int8 | string | Строковое представление. | |
int16 | string | Строковое представление. | |
int32 | string | Строковое представление. | |
int64 | string | Строковое представление. | |
float32 | string | Строковое представление. | |
float64 | string | Строковое представление. |
Средства мониторинга Tarantool CDC включают панель Grafana с показателями.
Для наблюдения за работой Tarantool CDC можно использовать специальную панель Grafana. Панель состоит из графиков и таблиц с показателями, которые обновляются в режиме реального времени. Графики и таблицы представлены в группе "Общая информация" и в группах метрик Универсальных Обработчиков, которые входят в установку. Ниже представлено описание элементов панели и нормальное поведение показателей. О том, на какие проблемы может указывать отход показателей от нормы, см. в статье.
В группу "Общая информация" входят следующие графики и таблицы:
- Workers - Показывает состав текущей установки TCDC. Количество запущенных Обработчиков и какие источники и приемники данных они обслуживают. В норме информация должна соответствовать заданной топологии установки Tarantool CDC.
- Total EPS - Общее количество событий в секунду, полученных из Источника и отправленных в Приемник. В норме значение возрастает.
- Events Per Second - Количество событий в секунду, полученных из Источника и отправленных в Приемник для каждого Обработчика. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Average Lag - Средняя задержка при переносе событий от Источника в Приемник для всех Обработчиков. Измеряется как разница во времени от появления события в Источнике до окончания его записи в Приемник Обработчиком. На точность показателя влияет рассинхронизация часов в распределенной системе. Для вычисления общего показателя используется среднее от максимальных значений среди всех компонентов системы. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Errors - Общая частота возникновения ошибок при обработке событий. В норме графики имеют значение 0.
Ошибки бывают следующих видов:
- Внутренние ошибки - любые ошибки, возникающие внутри Обработчиков во время их работы;
- Ошибки записи контрольных точек - ошибки, возникающие в моменты записи контрольных точек в хранилище состояния.
- Source to Connector Lag - Среднее время переноса событий от источников до коннекторов. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Connector to Worker Lag - Среднее время удержания событий в очередях коннекторов. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Worker to Target Lag - Среднее время записи событий в приемники. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.

Для каждого из Универсальных Обработчиков представлены следующие графики:
- Events Per Second - Количество событий в секунду, полученных из Источника и отправленных в Приемник Обработчиком. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
- Errors - Частота возникновения ошибок на Обработчике. В норме графики имеют значение 0. Ошибки бывают следующих видов:
- Внутренние ошибки - любые ошибки, возникающие внутри Обработчиков во время их работы;
- Ошибки записи контрольных точек - ошибки, возникающие в моменты записи контрольных точек в хранилище состояния.
- Throttling Time - Время, затраченное на ограничение трафика. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Lag - Задержка переноса событий от Источника в Приемник для Обработчика. Измеряется как разница во времени от появления события в Источнике до окончания его записи в Приемник Обработчиком. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Latencies - Сводный график задержек, из которых составляется график Lag. В норме все графики стабильные, их уровень колеблется вокруг некоторого значения.
- Source Task Poll Time Percentiles - Время опроса задачи Источника, в миллисекундах.
В норме при небольшом трафике
50,95и99перцентили принимают значение времени, заданное в настройках Обработчика (параметрworker.connector.poll.interval.ms), чаще всего это500миллисекунд. При большом трафике, если Обработчик успевает вычитывать сообщения из коннектора,95и99перцентили остаются на прежнем уровне или кратковременно снижаются,50-й перцентиль может упасть до миллисекунд. Когда Обработчик не успевает вычитывать сообщения, все перцентили, а также среднее время ожидания, снижаются до нескольких десятков миллисекунд. - Worker Batch Processing - Время, потраченное на обработку пакета событий Обработчиком. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
- Source to Connector Lag - Время переноса события с момента его появления в Источнике до момента его обработки коннектором к Источнику. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
- Connector to Worker Lag - Время удержания событий в очереди коннекторов Обработчика. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
- Worker to Target Lag - Время записи событий в Приемник. В норме графики стабильные, их уровень колеблется вокруг некоторого значения.
- Offsets - Количство событий в секунду, которое подтверждается Обработчиком. В норме уровень графика должен совпадать с уровнем графика Events Per Second Обработчика.
- Memory usage - Потребление памяти виртуальной машиной Java для Обработчика. В норме графики стабильные, их уровень
колеблется вокруг некоторого значения. Отображается три графика:
- Использовано - количество памяти в байтах, используемое обработчиками.
- Зарезервировано - количество памяти, зарезервированной виртуальной машиной Java для дальнейшего использования.
- Доступно - количество памяти, доступной для резервирования и использования виртуальной машиной Java.
- CPU Usage - Потребление процессорных ресурсов Обработчиком. В норме графики стабильные, их уровень колеблется вокруг
некоторого значения. Потребление показывается с трех точек зрения:
- На уровне процесса - сколько процессорного времени потребляется отдельно взятыми процессами Обработчика.
- На уровне системы - сколько процессорного времени потребляется процессами обработчика на уровне операционной системы.
- Среднее на уровне процесса за 15 минут - среднее потребление процессорных ресурсов на уровне отдельно взятых процессов за последние 15 минут.

Данная статья описывает возможные инциденты, их признаки и рекомендуемые действия для устранения неисправностей Tarantool CDC.
Возможные инциденты, отражающиеся в группе "Общая информация":
-
Workers
- Признак:
- Количество каких либо компонентов отличается от заданного в топологии.
- Реагирование:
- Проверить работу соответствующего компонента, получить логи.
- Признак:
-
Total EPS, Events Per Second
- Признак:
- Снижение или увеличение количества обрабатываемых событий.
- Реагирование:
-
Необходимо проверить работу Обработчиков:
- Ошибки репликации;
- Перезапуски - каждый перезапуск требует времени на восстановление работы и увеличивает задержку событий;
- Потребление памяти - высокое потребление памяти может привести к проблемам в сборке мусора;
- Сборщик мусора - пауза Stop The World;
- Сборщик мусора - частота;
- Сборщик мусора - поколения.
Также необходимо проверить:
- Состояние сети и инфраструктуры;
- Состояние очереди (если используется).
Если значения упали до 0 при работающей репликации, следует проверить синхронизацию часов в системе. Если показатели отсутствуют, следует проверить состояние Обработчиков, так как метрики, на основе которых считаются данные показатели, рассчитываются на них.
-
- Признак:
-
Average Lag
- Признак:
- Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
- Показатели принимают значение 0 при работающей репликации.
- Отсутствие показателей.
- Реагирование:
- См. описание реагирования на симптомы графиков Total EPS и Events Per Second выше.
- Признак:
-
Errors
- Признак:
- Значение больше 0.
- Реагирование:
- При выбросах ошибок Сети или ошибок превышения времени ожидания ответа нужно проверить состояние инфраструктуры и сети.
- При наличии внутренних ошибок CDC нужно проверить ресурсы, потребляемые Обработчиками.
- При наличии ошибок контрольных точек нужно проверить состояние очереди (если используется), инфраструктуры и сети.
- Признак:
-
Source to Connector Lag, Connector to Worker Lag, Worker to Target Lag
- Признак:
- Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
- Реагирование:
- См. описание реагирования на симптомы графиков Total EPS и Events Per Second выше.
- Признак:
Возможные инциденты, отражающиеся в группе графиков отдельного Универсального Обработчика:
-
Events Per Second
- Признак:
- Снижение или увеличение количества обрабатываемых событий.
- Реагирование:
-
Необходимо проверить работу Обработчика:
- Ошибки репликации;
- Перезапуски - каждый перезапуск требует времени на восстановление работы и увеличивает задержку событий;
- Потребление памяти - высокое потребление памяти может привести к проблемам в сборке мусора;
- Сборщик мусора - пауза Stop The World;
- Сборщик мусора - частота;
- Сборщик мусора - поколения.
Также необходимо проверить:
- Состояние сети и инфраструктуры;
- Состояние очереди (если используется).
Если значения упали до 0 при работающей репликации, следует проверить синхронизацию часов в системе. Если показатели отсутствуют, следует проверить состояние Обработчика, так как метрики, на основе которых считаются данные показатели, рассчитываются на нем.
-
- Признак:
-
Errors
- Признак:
- Значение больше 0.
- Реагирование:
- При выбросах ошибок Сети или ошибок превышения времени ожидания ответа нужно проверить состояние инфраструктуры и сети.
- При наличии внутренних ошибок CDC нужно проверить ресурсы, потребляемые Обработчиком.
- При наличии ошибок контрольных точек нужно проверить состояние очереди (если используется), инфраструктуры и сети.
- Признак:
-
Throttling Time
- Признак:
- Превышение значений показателей установленных пределов.
- Реагирование:
- См. описание реагирования на симптомы графика Events Per Second выше.
- Признак:
-
Lag
- Признак:
- Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
- Показатели принимают значение 0 при работающей репликации.
- Отсутствие показателей.
- Реагирование:
- См. описание реагирования на симптомы графика Events Per Second выше.
- Признак:
-
Latencies
- Признак:
- Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
- Реагирование:
- При превышении тех или иных зедержек допустимых значений, проверить остальные метрики, связанные со времением и ресурсами.
- Признак:
-
Source Task Poll Time Percentiles
- Признак:
- Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
- Реагирование:
- Информационный показатель для разбора инцидентов. Время обработки ожидания, превышающее время для ожидания, заданное в настройках коннектора, означает медленную работу Обработчика.
- Признак:
-
Worker Batch Processing
- Признак:
- Рост времени обработки события (устойчивый тренд или превышение порога в течение заданного окна).
- Реагирование:
- См. описание реагирования на симптомы графика Events Per Second выше.
- Признак:
-
Source to Connector Lag
- Признак:
- Какой либо из показателей вырос до некоторого значения выше допустимого предела.
- Рост показателей выше некоторого значения.
- Показатели равны 0.
- Реагирование:
- Стабильный, но высокий показатель времени переноса означает медленную сеть, необхоимо проверить нагрузку на систему и сетевые настройки.
- Рост показателя обозначает, что Обработчик не успевает вычитывать сообщения из коннектора. Необходимо подобрать более оптимальные значения параметров коннектора, учитывая данные нагрузочного тестирования.
- Если показатели равны 0 для повторного подключения к репликации, то необходимо проверить синхронизацию часов в системе. Для первого подключения к репликации начало обработки будет выдавать нулевые значения времени переноса.
- Признак:
-
Connector to Worker Lag
- Признак:
- Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
- Какой либо из показателей вырос до некоторого значения выше допустимого предела.
- Рост показателей выше некоторого значения.
- Реагирование:
- Большое время удержания событий в очереди коннектора означает, медленную работу Обработчика. Необходимо подобрать более оптимальные значения параметров коннектора, учитывая данные нагрузочного тестирования.
- Если показатели равны 0 для повторного подключения к репликации, то необходимо проверить синхронизацию часов в системе. Для первого подключения к репликации начало обработки будет выдавать нулевые значения времени переноса.
- Признак:
-
Worker to Target Lag
- Признак:
- Порог срабатывания сигнализации расчитывается экспериментально с учетом результатов нагрузочного тестирования.
- Какой либо из показателей вырос до некоторого значения выше допустимого предела.
- Рост показателей выше некоторого значения.
- Реагирование:
- Большое время записи событий в Приемник может влиять на работу Обработчика, замедляя его, и соответственно,
приводить к увеличению отставания всего процесса репликации. Необходимо проверить:
- Общую нагрузку на систему;
- Состояние сети.
- Большое время записи событий в Приемник может влиять на работу Обработчика, замедляя его, и соответственно,
приводить к увеличению отставания всего процесса репликации. Необходимо проверить:
- Признак:
-
Offsets
- Признак:
- Средний уровень количества сохраненных контрольных точек за некоторый период наблюдения (5 минут, 10 минут, 15 минут) ниже уровня количества обрабатываемых событий.
- Реагирование:
- Проверить ошибки контрольных точек, состояние очереди (если используется), инфраструктуры и сети.
- Признак:
-
Memory usage
- Признак:
- Превышение использованной памяти выше расчетного значения (в т.ч. по всем Обработчикам).
- Неограниченный рост использованной и зарезервированной памяти.
- Реагирование:
- Проверить показатели памяти для Обработчиков. Имеет смысла также проверить средний размер событий - количество потребляемой памяти может расти в зависимости от размера и количества обрабатываемых событий.
- Рост использованной и зарезервированной памяти указывает на утечку ресурсов.
- Признак:
-
CPU Usage
- Признак:
- Суммарное потребление ресурсов процессоров выше расчитанной по топологии квоты ядер.
- Превышение среднего потребления процессора за 15 минут за допустимый предел.
- Реагирование:
- Проверить показатели потребления ресурсов процессора для Воркеров. Имеет смысл проверить потребление памяти - при высоком
потреблении памяти и большом количестве обрабатываемых событий в секунду может наблюдаться возрастание нагрузки на сборщик мусора.
При наблюдении проблем на Обработчиках необходимо сбалансировать параметры Обработчика:
source.connector.max.batch.size;source.connector.batch.split.groups.
- При наблюдении проблем на Обработчике, у которого в качестве Источника данных выступает TQE, необходимо сбалансировать
параметр подписчика TQE
fetch_batch_sizeи скорость работы Обработчика, уменьшив значение параметра при необходимости.
- Проверить показатели потребления ресурсов процессора для Воркеров. Имеет смысл проверить потребление памяти - при высоком
потреблении памяти и большом количестве обрабатываемых событий в секунду может наблюдаться возрастание нагрузки на сборщик мусора.
При наблюдении проблем на Обработчиках необходимо сбалансировать параметры Обработчика:
- Признак:
Чтобы контролировать работоспособность системы, в Tarantool CDC осуществляется сбор двух групп метрик:
- Метрики Универсального Обработчика;
- Метрики Java/JVM.
Типы метрик:
counter- монотонно возрастающий счетчик значений. Не может быть уменьшен, но может быть сброшен до 0.gauge- изменяющееся значение. Может как увеличиваться, так и уменьшаться.histogram- распределение значений по заранее определенным группам (buckets).summary- агрегация гистограмм. Используется в случаях, когда невозможно заранее выделить группы, по которым необходимо распределить значение.
- Тип метрики: summary
- Описание: Время, проведенное событием в очереди самого коннектора.
- Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
- Аномальное поведение метрики: Значение индикатора возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: summary
- Описание: Время, потраченное на перенос события из Журнала Упреждающей Записи (ЖУЗ/WAL) Источника данных до коннектора к Источнику.
- Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
- Аномальное поведение метрики: Значение индикатора возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: counter
- Описание: Количество ошибок, возникающих при работе Универсального Обработчика.
- Ожидаемое поведение метрики: Счетчик не увеличивается.
- Аномальное поведение метрики: Скорость возрастания счетчика ненулевая.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее количество подтвержденных контрольных точек.
- Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
- Аномальное поведение метрики: Значение индикатора возрастает или постоянно равно
0. - Дата последнего обновления: 2026-03-31
- Тип метрики: counter
- Описание: Количество успешно записанных контрольных точек. Счетчик увеличивается на количество успешно подтвержденных сообщений каждый раз при успешной записи контрольных точек.
- Ожидаемое поведение метрики: Монотонно возрастает. Если брать скорость возрастания метрики на интервале бОльшем или равном периоду записи контрольных точек и сравнить со скоростью возрастания количества записанных событий в очередь на том же интервале, то эти метрики должны совпадать.
- Аномальное поведение метрики: Счетчик не возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: counter
- Описание: Количество ошибок записи контрольных точек. При возникновенни ошибки при записи пакета контрольных точек, счетчик ошибок увеличивается на количество точек, которые не были записаны из за ошибки.
- Ожидаемое поведение метрики: Равен
0или не возрастает. - Аномальное поведение метрики: Счетчик возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее количество неподтвержденных контрольных точек.
- Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
- Аномальное поведение метрики: Значение индикатора возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: histogram
- Описание: Время, затраченное на выполнение операций чтения данных из коннектора к Источнику данных. Метрика замеряется на уровне приложения.
- Метки:
- quantile:
0.5;0.95;0.99.
- quantile:
- Ожидаемое поведение метрики: Время чтения колеблется вокруг некоторого значения, в небольших пределах. Время чтения не должно превышать расчетных показателей больше, чем на некоторый процент. При увеличении размера сообщений время чтения также может увеличиваться.
- Аномальное поведение метрики: Возрастание без существенного изменения размера сообщений может указывать на следующие
причины:
- Деградация работы Источника.
- Деградация сети на участке между Источником данных и коннектором к Источнику.
- Проблемы коннектора к Источнику данных.
- Перегрузка процессора.
- Утечка, переполнение памяти.
- Дата последнего обновления: 2026-03-31
- Тип метрики: counter
- Описание: Количество событий, прошедших через Универсальный Обработчик, с детализацией по статусу.
- Метки:
- status:
received- Количество событий, полученных из Источника данных.retried- Количество событий, которые потребовали повторной попытки отправки.sent- Количество событий, успешно записанных в Приемник данных.
- status:
- Ожидаемое поведение метрики: Скорость возрастания счетчика стабильна или увеличивается. При небольшом или отсутствующем
потоке репликации, снижении потока сообщений в Приемник данных скорость возрастания счетчика близка к или равна
0на некоторых отрезках времени. - Аномальное поведение метрики: Скорость возрастания счетчика уменьшается или счетчик не увеличивается, причины:
- Ошибки получения данных.
- Увеличение времени чтения или записи данных.
- Деградация сети на участке между коннектором к Приемнику и Приемникмом данных.
- Перегрузка процессора.
- Утечка, переполнение памяти.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Индикатор активности процесса с соответствующими значениями тегов
connectorиtype. Значение этой метрики всегда равно1, когда работает процесс. Суммированием этой метрики можно посчитать количество работающих экземпляров. Тегconnectorобозначает загруженный данным процессом KafkaConnect-процессор. - Метки:
- connector:
source- Информация о запущенном коннекторе к Источнику данных.sink- Информация о запущенном коннекторе к Приемнику данных.
- connector:
- Ожидаемое поведение метрики: Для отдельного экземпляра - значение равно
1. - Аномальное поведение метрики: Значение, равное
0или отсутствие этой метрики в выборке при запросе к системе мониторинга означает, что ни один из экземпляров системы не работает. - Дата последнего обновления: 2026-03-31
- Тип метрики: summary
- Описание: Общее время, затраченное на перенос события от Источника до Приемника данных.
- Ожидаемое поведение метрики: Колеблется вокруг некоторого значения.
- Аномальное поведение метрики: Значение индикатора возрастает.
- Дата последнего обновления: 2026-03-31
- Тип метрики: summary
- Описание: Время, затраченное на выполнение операции записи данных в Приемник данных.
- Метки:
- quantile:
0.5;0.95;0.99.
- quantile:
- Ожидаемое поведение метрики: Время записи колеблется вокруг некоторого значения, в небольших пределах. Время записи не должно превышать расчетных показателей больше, чем на некоторый процент. При увеличении размера сообщений время записи также может увеличиваться.
- Аномальное поведение метрики: Возрастание времени записи сообщений в Приемник может указывать на следующие причины:
- Деградация сети на участке между коннектором к Приемнику и Приемникмом данных.
- Деградация работы Приемника.
- Проблемы коннектора к Приемнику.
- Перегрузка процессора.
- Утечка, переполнение памяти.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Время запуска приложения.
- Метки:
- main_application_class:
io.tarantool.worker.*- Java класс.
- main_application_class:
- Ожидаемое поведение метрики: Время запуска колеблется в небольших пределах вокруг некоторого значения, не занимает существенный процент от общей работы Tarantool CDC.
- Аномальное поведение метрики: При стабильном возрастании времени запуска приложения, если процессы уходят на перезапуск, это является негативной динамикой. Означает перегруз системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Приблизительный процент использования процессора сборщиком мусора относительно использования процессора за предыдущий период или с начала запуска процесса.
- Ожидаемое поведение метрики: Показатель стабилен, его значение колеблется с небольшой амплитудой вокруг некоторого значения.
- Аномальное поведение метрики: Возрастание данного показателя является признаком проблем с использованием памяти и сборкой мусора.
- Дата последнего обновления: 2026-03-31
- Тип метрики: summary
- Описание: Время, затраченное на паузы для сборщика мусора.
- Ожидаемое поведение метрики: Значение показателя стабильно, уменьшается или колеблется с небольшой амплитудой вокруг некоторого значения.
- Аномальное поведение метрики: Возрастание затрачиваемого времени на сборку мусора указывает на проблемы с использованием памяти и сборкой мусора. Большие и частые паузы могут влиять на общую пропускную способность системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Максимальная пауза в работе сборщика мусора.
- Ожидаемое поведение метрики: Значение показателя стабильно, уменьшается или колеблется с небольшой амплитудой вокруг некоторого значения.
- Аномальное поведение метрики: Возрастание затрачиваемого времени на сборку мусора указывает на проблемы с использованием памяти и сборкой мусора. Большие и частые паузы могут влиять на общую пропускную способность системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Количество использованной памяти.
- Метки:
- area:
nonheap;heap.
- id:
CodeHeap;G1 Survivor Space;G1 Old Gen;G1 Eden Space;Metaspace;Compressed Class Space.
- area:
- Ожидаемое поведение метрики: При возрастании потока сообщений и их размера, может наблюдаться пропорциональное увеличение использования памяти. Использование памяти при стабильном потоке сообщений должно быть также стабильно.
- Аномальное поведение метрики: Увеличение использования памяти при отсутствии соответствующих изменений в потоке данных может указывать на утечку памяти.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее количество потоков, запущенных в режиме daemon.
- Ожидаемое поведение метрики: Количество запущенных потоков в режиме daemon стабильно или колеблется в небольших пределах вокруг некоторого значения.
- Аномальное поведение метрики: Возрастание количества работающих потоков в режиме daemon может указывать на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее количество запущенных потоков в системе.
- Ожидаемое поведение метрики: Количество запущенных потоков стабильно или колеблется в небольших пределах вокруг некоторого значения.
- Аномальное поведение метрики: Возрастание количества работающих потоков может указывать на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Максимальное количество потоков, когда либо работавших в системе одновременно с момента запуска или с сброса индикатора.
- Ожидаемое поведение метрики: Показатель может возрастать в начале работы Tarantool CDC, но в остальное время должен оставаться без изменений.
- Аномальное поведение метрики: Возрастание максимального количества работающих потоков может указывать на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: counter
- Описание: Общее количество потоков в системе, запущенных и отработанных.
- Ожидаемое поведение метрики: Скорость возрастания метрики нулевая или близка к нулю.
- Аномальное поведение метрики: Возрастание общего количества потоков может указывать на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее количество потоков, запущенных Java-машиной.
- Метки:
- state:
blocked;runnable;waiting;terminated;timed-waiting;new.
- state:
- Ожидаемое поведение метрики: Количество потоков стабильно в течении длительного периода работы Tarantool CDC, или
колеблется с небольшой амплитудой вокруг некоторого значения. Количество потоков является примерной суммой
следующих показателей:
- Общие потоки Java Runtime.
- Потоки, запущенные коннекторами для обработки данных.
- Потоки сборщика мусора.
- Аномальное поведение метрики: Возрастание количества потоков может указывать на утечку ресурсов, перегрузку системы с последующим возрастанием нагрузки на сборщик мусора.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Текущее использование процессора Java-машиной.
- Ожидаемое поведение метрики: Низкий показатель использования процессора указывает на стабильную работу системы. Также снижение использования процессора может быть связано со снижением потока данных.
- Аномальное поведение метрики: Возрастание использования процессора может указывать на перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Количество открытых файловых дескрипторов. Носит информационный характер. Может понадобиться при расследовании причин падений и ошибок, случающихся в компонентах.
- Ожидаемое поведение метрики: Количество открытых файловых дескрипторов (сетевых сокетов) стабильно в течение продолжительного времени работы Tarantool CDC или колеблется с незначительной амплитудой вокруг некоторого значения.
- Аномальное поведение метрики: Стабильное увеличение данной метрики является аномальным, указывает на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31
- Тип метрики: gauge
- Описание: Средняя загрузка системы в течение одной минуты. С точки зрения JVM это интерпретируется как количество готовых к исполнению объектов, запланированных к выполнению плюс количество таких объектов, выполняющихся на процессоре в данный момент. Временное окно - 1 минута.
- Ожидаемое поведение метрики: Показатель средней загрузки стабилен или колеблется в небольших пределах вокруг некоторого значения, или пропорционально соответствует потоку сообщений.
- Аномальное поведение метрики: Стабильное возрастание средней загрузки может указывать на утечку ресурсов, перегрузку системы.
- Дата последнего обновления: 2026-03-31