Перейти к содержанию

Статья для предыдущей поддерживаемой версии ПО — 4.7!

Текущая рекомендованная версия — Comindware Platform 5.0. См. документацию к версии 5.0.

Получение сообщений через Kafka. Настройка подключения, пути передачи данных и сценария

Экспериментальная функция

Представленная здесь функция находится на стадии разработки. См. «Поддержка экспериментальных функций».

Введение

В этой статье представлены инструкции по настройке подключения, пути передачи данных и сценария для обмена данными с внешней системой посредством шины сообщений Kafka.

Настройка обмена данными для шин MSMQ и RabbitMQ осуществляется аналогичным образом.

Определения

  • Подключения к шинам сообщений Kafka, MSMQ и RabbitMQ используются для обмена данными между  и с внешними системами. 
  • Для обмена данными требуется настроить подключение и путь передачи данных, который обеспечивает преобразование данных между системами.
  • Kafka — это распределённая система обмена сообщениями с высокой пропускной способностью.
  • Продюсер — это сервис, который записывает сообщения в топик Kafka для передачи потребителям.
  • Брокер — это сервис, который хранит и обрабатывает сообщения в топиках Kafka. Брокеры объединены в кластер.
  • Потребитель, консюмер — это сервис, который получает сообщения из топиков Kafka.
  • Топик Kafka — это хранилище сообщений, объединённых общей бизнес-логикой. Например, в одном топике можно объединить сообщения о заказах, а в другом сообщения о бухгалтерских проводках. К одному топику могут обращаться несколько продюсеров и потребителей. Топик может быть разбит на разделы.
  • Раздел, партиция — составная часть топика. Раздел состоит из сегментов — файловых журналов сообщений. Сообщения нумеруются последовательно, записываются в конец сегмента и хранятся неизменными. Разделы могут находиться в разных брокерах.
  • При получении сообщений через шину Kafka Comindware Business Application Platform является потребителем, а при отправке — продюсером.

Прикладная задача

Необходимо настроить интеграцию с шиной Kafka, которая будет опрашивать топик CLIENT_ORDERS, извлекать из него данные о заказах клиентов и создавать для каждого полученного заказа запись в шаблоне «Заказы» с заполненными кодом и суммой заказа.

Исходные данные

  • Имеется приложение «Обработка заказов».
  • В приложении имеется шаблон записи «Заказы» со следующими атрибутами:
  • Название атрибута Тип данных
    Код заказа Текст
    Сумма заказа Число
  • Имеется сервер Kafka, доступный по адресу 12.34.56.78:9092

Настройка подключения к Kafka

  1. Откройте страницу «Администрирование» — «Подключения».
  2. Создайте или настройте подключение к шине сообщений типа «Получение сообщений через Kafka»:
    • Системное имя — введите уникальное имя из латинских букв и цифр, например kafka_receive_messages_connection
    • Отключить — установите этот флажок, если требуется деактивировать подключение.
    • Описание — введите наглядное описание подключения.
    • Запись в файловые журналы — выберите способ записи сведений о работе подключения:
      • Полные сведения об обработке сообщения
      • Только ошибки
      • Отключить
    • Список пар хост/порт (разделённых запятой), используемых для подключения к кластеру Kafka — введите адреса одного или нескольких узлов кластера Kafka, например 123.45.67.89:9092.
    • Максимальный объем данных (в байтах), который брокеры должны возвращать по запросу сообщений — введите лимит для ответа каждого брокера, при превышении которого ответ будет содержать только первые сообщения, поместившиеся в лимит, а остальные сообщения будут отброшены.
    • Количество байтов, которые необходимо попытаться получить для каждого топика-партиции в каждом запросе сообщений — введите объём данных для получения от Kafka, не меньше максимального размера сообщения, допустимого сервером Kafka. При превышении лимита сообщения не будут переданы.
    • Тайм-аут (в миллисекундах) для запросов на стороне сервера Kafka — введите лимит времени обработки запроса сервером Kafka. При превышении лимита обмен данными будет прерван.
    • Тайм-аут на стороне клиента (в миллисекундах) — введите лимит времени ожидания ответа от сервера Kafka. При превышении лимита сообщения не будут переданы.
    • Временной интервал (в миллисекундах) для пакетной обработки сообщений, используемых при запросах сообщений — введите интервал опроса Kafka.
    • Имя пользователя и пароль — введите логин и пароль для подключения к серверу Kafka.

Настройка пути передачи данных через Kafka

  1. Откройте страницу «Администрирование» — «Пути передачи данных».
  2. Создайте или настройте путь передачи данных для подключения к шине сообщений типа «Получение сообщений через Kafka».
  3. Настройте параметры на вкладке «Основные свойства»:
    • Подключение — выберите ранее настроенное подключение к шине Kafkakafka_receive_messages_connection
    • Системное имя — введите уникальное имя из латинских букв и цифр, например kafka_receive_messages_route
    • Отключить — установите этот флажок, если требуется деактивировать путь передачи данных.
    • Описание — введите наглядное описание пути передачи данных.
    • Номер шины данных — выберите номер потока обработки сообщений, например 0. Если одновременно используется много путей передачи данных через Kafka, для распределения нагрузки может потребоваться выбрать для них разные номера шины данных.
  4. Настройте параметры на вкладке «Атрибуты сообщений»:
    • Тип сообщения — выберите тип «Получение сообщений через Kafka» (в соответствии с типом пути передачи данных).
    • Запрос — с помощью кнопки «Добавить» сформируйте структуру атрибутов, соответствующую структуре сообщения в формате XML/JSON. Значения этих атрибутов будут помещены в переменную и обработаны с помощью сценария, срабатывающего при наступлении события «Получение сообщения»:
      • Системное имя — введите имя, совпадающее с именем поля в XML/JSON с точностью до строчных и прописных букв.
      • Тип — выберите тип атрибута, соответствующий типу поля в XML/JSON.
      • Описание — введите наглядное описание атрибута.
      • Обязательный — установите этот флажок, если атрибут должен присутствовать в сообщении.
      • Не пустой — установите этот флажок, если атрибут должен иметь значение.
      • Массив — установите этот флажок, если атрибут может содержать список значений.

        Примечание

        Чтобы добавить дочерний атрибут, установите флажок в первом столбце таблицы у родительского атрибута и нажмите кнопку «Добавить».

        Пример передачи массива объектов

        • Сообщение содержит массив объектов orders в формате JSON:
          {  
            "orders": [
              {
              "code": "12-A",
              "amount": 123
              },
              {
              "code": "34-B"
              "amount": 345
              },
              {
              "code": "56-C",
              "amount": 678 
              }
            ]
          }
        • Для получения данных из приведённого выше сообщения создайте следующие атрибуты:
          Родительский атрибут Системное имя Тип данных Массив
            orders Объект Флажок установлен
          orders code Строка  
          orders amount Число  
    • Ответ и Ответ с ошибкой — аналогично запросу, сформируйте структуру атрибутов для отправки серверу Kafka ответа на полученный запрос. Значения этим атрибутам можно присвоить с помощью сценария, срабатывающего при наступлении события «Получение сообщения».
  5. Настройте параметры на вкладке «Интеграция»:
    • Очередь — введите название топика Kafka, который необходимо прослушивать, например CLIENT_ORDERS.
    • Тип содержимого — выберите формат передачи данных:
      • XML
      • JSON
    • Уникальная текстовая строка — укажите название группы потребителей. Например, введите имя экземпляра ПО Comindware Business Application Platform. Название группы потребителей служит для отслеживания новых сообщений в топике. Не назначайте это название группы другим потребителям, в противном случае будет утрачен прогресс считывания сообщений из топика.

Настройка сценария

Для получения сообщений через шину Kafka и передачи данных в шаблон записи требуется настроить сценарий, срабатывающий при поступлении сообщения из Kafka.

  1. На странице администрирования приложения выберите пункт «Сценарии».
  2. Создайте сценарий:
    • НазваниеПолучение данных из Kafka
    • Системное имя: заполняется автоматически.
    • Контекст выполненияОт инициатора
    • Статус: на время настройки триггера можно выбрать «Приостановлен», чтобы сценарий не срабатывал без необходимости. После настройки сценария, установите статус «Активен», чтобы сценарий обрабатывал сообщения, появляющиеся в топике Kafka.
  3. Отобразится конструктор сценария.
  4. Нажмите заголовок события «Нажатие кнопки».
  5. Настройте свойства события:
    • Тип: Получение сообщения
    • Контекстный шаблон: Заказы
    • Подключение: kafka_receive_messages_connection
    • Путь передачи данных: kafka_receive_messages_connection
    • Имя переменной: kafka_message — в этой переменной будет храниться полученное сообщение.
  6. После события «Получение сообщения» создайте и настройте действие «Повторять по количеству объектов»:
    • Переменная: order
    • Атрибут или выражение для поиска объектов: формула
      $$kafka_message->orders
      На каждой итерации цикла в переменную order будет помещаться объект из полученного в сообщении от Kafka массива orders.
  7. Внутри действия «Повторять по количеству объектов» создайте действие «Создать запись» с целевым шаблоном «Заказы».
  8. Внутри действия «Создать запись» создайте и настройте действие «Изменить значение атрибутов» следующим образом:
    Атрибут Операция со значениями Значение
    Код заказа Заменить Формула: $$order->code
    Сумма заказа Заменить Формула: $$order->amount

Настроенный сценарий для получения данных через шину Kafka

Настроенный сценарий для получения данных через шину Kafka

Создание топика и тестирование работы интеграции

  1. С помощью скрипта продюсера Kafka инициализируйте топик, например CLIENT_ORDERS:
    sudo -i
    cd /usr/share/kafka/bin
    bash kafka-console-producer.sh --bootstrap-server 12.34.56.78:9092 --topic CLIENT_ORDER
    Здесь 12.34.56/78:9092 — IP-адрес сервера Kafka.
  2. В терминале отобразится запрос сообщения.
  3. Каждая введённая строка будет отправлена при нажатии клавиши Enter.
  4. Отправьте в топик сообщение с данными заказов, например, в формате JSON:
    { "orders": [ { "code": "12-A", "amount": 123 }, { "code": "34-B" "amount": 345 }, { "code": "56-C", "amount": 678 } ] }
  5. Продюсер отправит введённое сообщение в топик Kafka.
  6. Чтобы завершить работу продюсера нажмите в терминале клавиши Ctrl+C.
  7. Откройте шаблон записи «Заказы» и нажмите кнопку «Перейти к экземплярам».
  8. В шаблоне должны быть созданы записи, заполненные данными заказов из сообщения Kafka.

Просмотр журнала работы сценария

  1. Откройте страницу «Журналы событий».
  2. Выберите вкладку «Трассировка событий».
  3. Дважды нажмите событие сценария «Получение данных из Kafka».
  4. Отобразится цепочка событий со сведениями обо всех шагах сценария.

Связанные статьи