Получение сообщений через Kafka. Настройка подключения, пути передачи данных и сценария
Содержание
Экспериментальная функция
Представленная здесь функция находится на стадии разработки. См. «Поддержка экспериментальных функций».
Введение
В этой статье представлены инструкции по настройке подключения, пути передачи данных и сценария для обмена данными с внешней системой посредством шины сообщений Kafka.
Настройка обмена данными для шин MSMQ и RabbitMQ осуществляется аналогичным образом.
Определения
- Подключения к шинам сообщений Kafka, MSMQ и RabbitMQ используются для обмена данными между и с внешними системами.
- Для обмена данными требуется настроить подключение и путь передачи данных, который обеспечивает преобразование данных между системами.
- Kafka — это распределённая система обмена сообщениями с высокой пропускной способностью.
- Продюсер — это сервис, который записывает сообщения в топик Kafka для передачи потребителям.
- Брокер — это сервис, который хранит и обрабатывает сообщения в топиках Kafka. Брокеры объединены в кластер.
- Потребитель, консюмер — это сервис, который получает сообщения из топиков Kafka.
- Топик Kafka — это хранилище сообщений, объединённых общей бизнес-логикой. Например, в одном топике можно объединить сообщения о заказах, а в другом сообщения о бухгалтерских проводках. К одному топику могут обращаться несколько продюсеров и потребителей. Топик может быть разбит на разделы.
- Раздел, партиция — составная часть топика. Раздел состоит из сегментов — файловых журналов сообщений. Сообщения нумеруются последовательно, записываются в конец сегмента и хранятся неизменными. Разделы могут находиться в разных брокерах.
- При получении сообщений через шину Kafka Comindware Business Application Platform является потребителем, а при отправке — продюсером.
Прикладная задача
Необходимо настроить интеграцию с шиной Kafka, которая будет опрашивать топик CLIENT_ORDERS
, извлекать из него данные о заказах клиентов и создавать для каждого полученного заказа запись в шаблоне «Заказы» с заполненными кодом и суммой заказа.
Исходные данные
- Имеется приложение «Обработка заказов».
- В приложении имеется шаблон записи «Заказы» со следующими атрибутами:
-
Название атрибута Тип данных Код заказа Текст Сумма заказа Число - Имеется сервер Kafka, доступный по адресу
12.34.56.78:9092
Настройка подключения к Kafka
- Откройте страницу «Администрирование» — «Подключения».
- Создайте или настройте подключение к шине сообщений типа «Получение сообщений через Kafka»:
- Системное имя — введите уникальное имя из латинских букв и цифр, например
kafka_receive_messages_connection
- Отключить — установите этот флажок, если требуется деактивировать подключение.
- Описание — введите наглядное описание подключения.
- Запись в файловые журналы — выберите способ записи сведений о работе подключения:
- Полные сведения об обработке сообщения
- Только ошибки
- Отключить
- Список пар хост/порт (разделённых запятой), используемых для подключения к кластеру Kafka — введите адреса одного или нескольких узлов кластера Kafka, например
123.45.67.89:9092
. - Максимальный объем данных (в байтах), который брокеры должны возвращать по запросу сообщений — введите лимит для ответа каждого брокера, при превышении которого ответ будет содержать только первые сообщения, поместившиеся в лимит, а остальные сообщения будут отброшены.
- Количество байтов, которые необходимо попытаться получить для каждого топика-партиции в каждом запросе сообщений — введите объём данных для получения от Kafka, не меньше максимального размера сообщения, допустимого сервером Kafka. При превышении лимита сообщения не будут переданы.
- Тайм-аут (в миллисекундах) для запросов на стороне сервера Kafka — введите лимит времени обработки запроса сервером Kafka. При превышении лимита обмен данными будет прерван.
- Тайм-аут на стороне клиента (в миллисекундах) — введите лимит времени ожидания ответа от сервера Kafka. При превышении лимита сообщения не будут переданы.
- Временной интервал (в миллисекундах) для пакетной обработки сообщений, используемых при запросах сообщений — введите интервал опроса Kafka.
- Имя пользователя и пароль — введите логин и пароль для подключения к серверу Kafka.
- Системное имя — введите уникальное имя из латинских букв и цифр, например
Настройка пути передачи данных через Kafka
- Откройте страницу «Администрирование» — «Пути передачи данных».
- Создайте или настройте путь передачи данных для подключения к шине сообщений типа «Получение сообщений через Kafka».
- Настройте параметры на вкладке «Основные свойства»:
- Подключение — выберите ранее настроенное подключение к шине Kafka:
kafka_receive_messages_connection
- Системное имя — введите уникальное имя из латинских букв и цифр, например
kafka_receive_messages_route
- Отключить — установите этот флажок, если требуется деактивировать путь передачи данных.
- Описание — введите наглядное описание пути передачи данных.
- Номер шины данных — выберите номер потока обработки сообщений, например 0. Если одновременно используется много путей передачи данных через Kafka, для распределения нагрузки может потребоваться выбрать для них разные номера шины данных.
- Подключение — выберите ранее настроенное подключение к шине Kafka:
- Настройте параметры на вкладке «Атрибуты сообщений»:
- Тип сообщения — выберите тип «Получение сообщений через Kafka» (в соответствии с типом пути передачи данных).
- Запрос — с помощью кнопки «Добавить» сформируйте структуру атрибутов, соответствующую структуре сообщения в формате XML/JSON. Значения этих атрибутов будут помещены в переменную и обработаны с помощью сценария, срабатывающего при наступлении события «Получение сообщения»:
- Системное имя — введите имя, совпадающее с именем поля в XML/JSON с точностью до строчных и прописных букв.
- Тип — выберите тип атрибута, соответствующий типу поля в XML/JSON.
- Описание — введите наглядное описание атрибута.
- Обязательный — установите этот флажок, если атрибут должен присутствовать в сообщении.
- Не пустой — установите этот флажок, если атрибут должен иметь значение.
- Массив — установите этот флажок, если атрибут может содержать список значений.
Примечание
Чтобы добавить дочерний атрибут, установите флажок в первом столбце таблицы у родительского атрибута и нажмите кнопку «Добавить».
Пример передачи массива объектов
- Сообщение содержит массив объектов
orders
в формате JSON:{
"orders": [
{
"code": "12-A",
"amount": 123
},
{
"code": "34-B"
"amount": 345
},
{
"code": "56-C",
"amount": 678
}
]
}
- Для получения данных из приведённого выше сообщения создайте следующие атрибуты:
Родительский атрибут Системное имя Тип данных Массив orders Объект Флажок установлен orders code Строка orders amount Число
- Сообщение содержит массив объектов
- Ответ и Ответ с ошибкой — аналогично запросу, сформируйте структуру атрибутов для отправки серверу Kafka ответа на полученный запрос. Значения этим атрибутам можно присвоить с помощью сценария, срабатывающего при наступлении события «Получение сообщения».
- Настройте параметры на вкладке «Интеграция»:
- Очередь — введите название топика Kafka, который необходимо прослушивать, например
CLIENT_ORDERS
. - Тип содержимого — выберите формат передачи данных:
- XML
- JSON
- Уникальная текстовая строка — укажите название группы потребителей. Например, введите имя экземпляра ПО Comindware Business Application Platform. Название группы потребителей служит для отслеживания новых сообщений в топике. Не назначайте это название группы другим потребителям, в противном случае будет утрачен прогресс считывания сообщений из топика.
- Очередь — введите название топика Kafka, который необходимо прослушивать, например
Настройка сценария
Для получения сообщений через шину Kafka и передачи данных в шаблон записи требуется настроить сценарий, срабатывающий при поступлении сообщения из Kafka.
- На странице администрирования приложения выберите пункт «Сценарии».
- Создайте сценарий:
- Название: Получение данных из Kafka
- Системное имя: заполняется автоматически.
- Контекст выполнения: От инициатора
- Статус: на время настройки триггера можно выбрать «Приостановлен», чтобы сценарий не срабатывал без необходимости. После настройки сценария, установите статус «Активен», чтобы сценарий обрабатывал сообщения, появляющиеся в топике Kafka.
- Отобразится конструктор сценария.
- Нажмите заголовок события «Нажатие кнопки».
- Настройте свойства события:
- Тип: Получение сообщения
- Контекстный шаблон: Заказы
- Подключение: kafka_receive_messages_connection
- Путь передачи данных: kafka_receive_messages_connection
- Имя переменной: kafka_message — в этой переменной будет храниться полученное сообщение.
- После события «Получение сообщения» создайте и настройте действие «Повторять по количеству объектов»:
- Переменная: order
- Атрибут или выражение для поиска объектов: формула
На каждой итерации цикла в переменную$$kafka_message->orders
order
будет помещаться объект из полученного в сообщении от Kafka массиваorders
.
- Внутри действия «Повторять по количеству объектов» создайте действие «Создать запись» с целевым шаблоном «Заказы».
- Внутри действия «Создать запись» создайте и настройте действие «Изменить значение атрибутов» следующим образом:
Атрибут Операция со значениями Значение Код заказа Заменить Формула: $$order->code
Сумма заказа Заменить Формула: $$order->amount
Создание топика и тестирование работы интеграции
- С помощью скрипта продюсера Kafka инициализируйте топик, например
CLIENT_ORDERS
:
Здесьsudo -i
cd /usr/share/kafka/bin
bash kafka-console-producer.sh --bootstrap-server 12.34.56.78:9092 --topic CLIENT_ORDER12.34.56/78:9092
— IP-адрес сервера Kafka. - В терминале отобразится запрос сообщения.
- Каждая введённая строка будет отправлена при нажатии клавиши Enter.
- Отправьте в топик сообщение с данными заказов, например, в формате JSON:
{ "orders": [ { "code": "12-A", "amount": 123 }, { "code": "34-B" "amount": 345 }, { "code": "56-C", "amount": 678 } ] }
- Продюсер отправит введённое сообщение в топик Kafka.
- Чтобы завершить работу продюсера нажмите в терминале клавиши Ctrl+C.
- Откройте шаблон записи «Заказы» и нажмите кнопку «Перейти к экземплярам».
- В шаблоне должны быть созданы записи, заполненные данными заказов из сообщения Kafka.
Просмотр журнала работы сценария
- Откройте страницу «Журналы событий».
- Выберите вкладку «Трассировка событий».
- Дважды нажмите событие сценария «Получение данных из Kafka».
- Отобразится цепочка событий со сведениями обо всех шагах сценария.
Связанные статьи
Эта статья была полезна 2 чел.