Перейти к содержанию

Получение сообщений через Kafka. Настройка подключения, пути передачи данных и сценария

Экспериментальная функция

Представленная здесь функция находится на стадии разработки. См. «Поддержка экспериментальных функций».

Введение

Здесь представлены инструкции по настройке подключения, пути передачи данных и сценария для обмена данными с внешней системой посредством шины сообщений Apache Kafka.

Настройка обмена данными для шин MSMQ и RabbitMQ осуществляется аналогичным образом.

Определения

  • Подключения к шинам сообщений Apache Kafka, MSMQ и RabbitMQ используются для обмена данными между  и с внешними системами. 
  • Для обмена данными требуется настроить подключение и путь передачи данных, который обеспечивает преобразование данных между системами.
  • Apache Kafka — это распределённая система обмена сообщениями с высокой пропускной способностью.
  • Продюсер — это сервис, который записывает сообщения в топик Apache Kafka для передачи потребителям.
  • Брокер — это сервис, который хранит и обрабатывает сообщения в топиках Apache Kafka. Брокеры объединены в кластер.
  • Потребительконсюмер — это сервис, который получает сообщения из топиков Apache Kafka.
  • Топик Apache Kafka — это хранилище сообщений, объединённых общей бизнес-логикой. Например, в одном топике можно объединить сообщения о заказах, а в другом сообщения о бухгалтерских проводках. К одному топику могут обращаться несколько продюсеров и потребителей. Топик может быть разбит на разделы.

  • Разделпартиция — составная часть топика. Раздел состоит из сегментов — файловых журналов сообщений. Сообщения нумеруются последовательно, записываются в конец сегмента и хранятся неизменными. Разделы могут находиться в разных брокерах.

  • При получении сообщений через шину Apache Kafka Comindware Platform является потребителем, а при отправке — продюсером.

Прикладная задача

Необходимо настроить интеграцию с шиной Apache Kafka, которая будет опрашивать топик CLIENT_ORDERS, извлекать из него данные о заказах клиентов и создавать для каждого полученного заказа запись в шаблоне «Заказы» с заполненными кодом и суммой заказа.

Исходные данные

  • Имеется приложение «Обработка заказов».
  • В приложении имеется шаблон записи «Заказы» со следующими атрибутами:

    Название атрибута Тип данных
    Код заказа Текст
    Сумма заказа Число
  • Имеется сервер Apache Kafka, доступный по адресу 12.34.56.78:9092

Настройка подключения к Apache Kafka

  1. Откройте страницу «Администрирование» — «Подключения».
  2. Создайте или настройте подключение к шине сообщений типа «Получение сообщений через Kafka»:

    • Системное имя — введите уникальное имя из латинских букв и цифр, например kafka_receive_messages_connection
    • Отключить — установите этот флажок, если требуется деактивировать подключение.
    • Описание — введите наглядное описание подключения.
    • Запись в файловые журналы — выберите способ записи сведений о работе подключения:

      • Полные сведения об обработке сообщения
      • Только ошибки
      • Отключить
    • Список пар хост/порт (разделённых запятой), используемых для подключения к кластеру Kafka — введите адреса одного или нескольких узлов кластера Apache Kafka, например 12.34.56.78:9092.

    • Максимальный объем данных (в байтах), который брокеры должны возвращать по запросу сообщений — введите лимит для ответа каждого брокера, при превышении которого ответ будет содержать только первые сообщения, поместившиеся в лимит, а остальные сообщения будут отброшены.
    • Количество байтов, которые необходимо попытаться получить для каждого топика-партиции в каждом запросе сообщений — введите объём данных для получения от Apache Kafka, не меньше максимального размера сообщения, допустимого сервером Apache Kafka. При превышении лимита сообщения не будут переданы.
    • Тайм-аут (в миллисекундах) для запросов на стороне сервера Kafka — введите лимит времени обработки запроса сервером Apache Kafka. При превышении лимита обмен данными будет прерван.
    • Тайм-аут на стороне клиента (в миллисекундах) — введите лимит времени ожидания ответа от сервера Apache Kafka. При превышении лимита сообщения не будут переданы.
    • Временной интервал (в миллисекундах) для пакетной обработки сообщений, используемых при запросах сообщений — введите интервал опроса Apache Kafka.
    • Имя пользователя и пароль — введите логин и пароль для подключения к серверу Apache Kafka.

Настройка пути передачи данных через Apache Kafka

  1. Откройте страницу «Администрирование» — «Пути передачи данных».
  2. Создайте или настройте путь передачи данных для подключения к шине сообщений типа «Получение сообщений через Kafka».
  3. Настройте параметры на вкладке «Основные свойства»:

    • Подключение — выберите ранее настроенное подключение к шине Apache Kafkakafka_receive_messages_connection
    • Системное имя — введите уникальное имя из латинских букв и цифр, например kafka_receive_messages_route
    • Отключить — установите этот флажок, если требуется деактивировать путь передачи данных.
    • Описание — введите наглядное описание пути передачи данных.
    • Номер шины данных — выберите номер потока обработки сообщений, например 0. Если одновременно используется много путей передачи данных через Apache Kafka, для распределения нагрузки может потребоваться выбрать для них разные номера шины данных.
  4. Настройте параметры на вкладке «Атрибуты сообщений»:

    • Тип сообщения — выберите тип «Получение сообщений через Kafka» (в соответствии с типом пути передачи данных).
    • Запрос — с помощью кнопки «Добавить» сформируйте структуру атрибутов, соответствующую структуре сообщения в формате XML/JSON. Значения этих атрибутов будут помещены в переменную и обработаны с помощью сценария, срабатывающего при наступлении события «Получение сообщения»:

      • Системное имя — введите имя, совпадающее с именем поля в XML/JSON с точностью до строчных и прописных букв.
      • Тип — выберите тип атрибута, соответствующий типу поля в XML/JSON.
      • Описание — введите наглядное описание атрибута.
      • Обязательный — установите этот флажок, если атрибут должен присутствовать в сообщении.
      • Не пустой — установите этот флажок, если атрибут должен иметь значение.
      • Массив — установите этот флажок, если атрибут может содержать список значений.

      Примечание

      Чтобы добавить дочерний атрибут, установите флажок в первом столбце таблицы у родительского атрибута и нажмите кнопку «Добавить».

      Пример передачи массива объектов

      • Сообщение содержит массив объектов orders в формате JSON:

        { 
        "orders": [
        {
        "code": "12-A",
        "amount": 123
        },
        {
        "code": "34-B",
        "amount": 345
        },
        {
        "code": "56-C",
        "amount": 678
        }
        ]
        }
      • Для получения данных из приведённого выше сообщения создайте следующие атрибуты:

        Родительский атрибут Системное имя Тип данных Массив
        orders Объект Флажок установлен
        orders code Строка
        orders amount Число
    • Ответ и Ответ с ошибкой — аналогично запросу, сформируйте структуру атрибутов для отправки серверу Apache Kafka ответа на полученный запрос. Значения этим атрибутам можно присвоить с помощью сценария, срабатывающего при наступлении события «Получение сообщения».

  5. Настройте параметры на вкладке «Интеграция»:

    • Очередь — введите название топика Apache Kafka, который необходимо прослушивать, например CLIENT_ORDERS.
    • Тип содержимого — выберите формат передачи данных:
      • XML
      • JSON
    • Уникальная текстовая строка — укажите название группы потребителей. Например, введите имя экземпляра ПО Comindware Platform. Название группы потребителей служит для отслеживания новых сообщений в топике. Не назначайте это название группы другим потребителям, в противном случае будет утрачен прогресс считывания сообщений из топика.

Настройка сценария

Для получения сообщений через шину Apache Kafka и передачи данных в шаблон записи требуется настроить сценарий, срабатывающий при поступлении сообщения из Apache Kafka.

  1. На странице администрирования приложения выберите пункт «Сценарии».
  2. Создайте сценарий:

    • НазваниеПолучение данных из Apache Kafka
    • Системное имя: заполняется автоматически.
    • Контекст выполненияОт инициатора
    • Статус: на время настройки триггера можно выбрать «Приостановлен», чтобы сценарий не срабатывал без необходимости. После настройки сценария, установите статус «Активен», чтобы сценарий обрабатывал сообщения, появляющиеся в топике Apache Kafka.
  3. Отобразится конструктор сценария.

  4. Нажмите заголовок события «Нажатие кнопки».
  5. Настройте свойства события:

    • Тип: Получение сообщения
    • Контекстный шаблон: Заказы
    • Подключение: kafka_receive_messages_connection
    • Путь передачи данных: kafka_receive_messages_connection
    • Имя переменной: kafka_message — в этой переменной будет храниться полученное сообщение.
  6. После события «Получение сообщения» создайте и настройте действие «Повторять по количеству объектов»:

    • Переменная: order
    • Атрибут или выражение для поиска объектов: формула

      $$kafka_message->orders 

      На каждой итерации цикла в переменную order будет помещаться объект из полученного в сообщении от Apache Kafka массива orders.

  7. Внутри действия «Повторять по количеству объектов» создайте действие «Создать запись» с целевым шаблоном «Заказы».

  8. Внутри действия «Создать запись» создайте действие «Изменить значение атрибутов».
  9. В действии «Изменить значения атрибутов» на вкладке «Дополнительно» установите флажок «Сбрасывать кэш значений».
  10. На вкладке «Основные» настройте следующие атрибуты:

    Атрибут Операция со значениями Значение
    Код заказа Заменить Формула: $$order->code
    Сумма заказа Заменить Формула: $$order->amount

Настроенный сценарий для получения данных через шину Apache Kafka

Настроенный сценарий для получения данных через шину Apache Kafka

Создание топика и тестирование работы интеграции

  1. С помощью скрипта продюсера Apache Kafka инициализируйте топик, например CLIENT_ORDERS:

    sudo -i 
    cd /usr/share/kafka/bin
    bash kafka-console-producer.sh --bootstrap-server 12.34.56.78:9092 --topic CLIENT_ORDERS

    Здесь 12.34.56.78:9092 — IP-адрес сервера Apache Kafka.

  2. В терминале отобразится запрос сообщения.

  3. Каждая введённая строка будет отправлена при нажатии клавиши Ввод.
  4. Отправьте в топик сообщение с данными заказов, например, в формате JSON:

    { "orders": [ { "code": "12-A", "amount": 123 }, { "code": "34-B", "amount": 345 }, { "code": "56-C", "amount": 678 } ] } 
  5. Продюсер отправит введённое сообщение в топик Apache Kafka.

  6. Чтобы завершить работу продюсера нажмите в терминале клавиши Ctrl+C.
  7. Откройте шаблон записи «Заказы» и нажмите кнопку «Перейти к экземплярам».
  8. В шаблоне должны быть созданы записи, заполненные данными заказов из сообщения Apache Kafka.

Просмотр журнала работы сценария

  1. Откройте страницу «Журналы событий».
  2. Выберите вкладку «Трассировка событий».
  3. Дважды нажмите событие сценария «Получение данных из Apache Kafka».
  4. Отобразится цепочка событий со сведениями обо всех шагах сценария.
К началу