Back to Manticoresearch

Синхронизация из Kafka

manual/russian/Integration/Kafka.md

25.9.021.7 KB
Original Source

Синхронизация из Kafka

ПРИМЕЧАНИЕ: эта функциональность требует Manticore Buddy. Если она не работает, убедитесь, что Buddy установлен.

Manticore поддерживает интеграцию с Apache Kafka для приема данных в реальном времени через источники Kafka и материализованные представления, что позволяет индексировать и искать данные в реальном времени. В настоящее время протестированы и поддерживаются apache/kafka версии 3.7.0-4.1.0.

Чтобы начать, вам необходимо:

  1. Определить источник: Указать топик Kafka, из которого Manticore Search будет читать сообщения. Эта настройка включает такие детали, как хост брокера, порт и имя топика.
  2. Настроить целевую таблицу: Выбрать таблицу Manticore реального времени для хранения входящих данных Kafka.
  3. Создать материализованное представление: Настроить материализованное представление (mv) для обработки преобразования данных и сопоставления из Kafka в целевую таблицу в Manticore Search. Здесь вы определите сопоставления полей, преобразования данных, а также любые фильтры или условия для входящего потока данных.

Источник

<!-- example kafka_source -->

Конфигурация source позволяет определить broker, список топиков, группу потребителей и структуру сообщений.

Схема

Определите схему, используя типы полей Manticore, такие как int, float, text, json и т.д.

sql
CREATE SOURCE <source name> [(column type, ...)] [source_options]

Все ключи схемы нечувствительны к регистру, то есть Products, products и PrOdUcTs обрабатываются одинаково. Все они преобразуются в нижний регистр.

Если имена ваших полей не соответствуют синтаксису имен полей, разрешенному в Manticore Search (например, если они содержат специальные символы или начинаются с цифр), вы должны определить сопоставление схемы. Например, $keyName или 123field являются допустимыми ключами в JSON, но не являются допустимыми именами полей в Manticore Search. Если вы попытаетесь использовать недопустимые имена полей без правильного сопоставления, Manticore вернет ошибку, и создание источника завершится неудачей.

Для обработки таких случаев используйте следующий синтаксис схемы для сопоставления недопустимых имен полей с допустимыми:

allowed_field_name 'original JSON key name with special symbols' type

Например:

sql
price_field '$price' float    -- maps JSON key '$price' to field 'price_field'
field_123 '123field' text     -- maps JSON key '123field' to field 'field_123'
<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE SOURCE kafka
(id bigint, term text, abbrev '$abbrev' text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50
<!-- response -->
Query OK, 2 rows affected (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "CREATE SOURCE kafka (id bigint, term text, abbrev '$abbrev' text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='2' batch=50"
<!-- response JSON -->
JSON
[
  {
    "total": 2,
    "error": "",
    "warning": ""
  }
]
<!-- end -->

Опции

ОпцияДопустимые значенияОписание
typekafkaУстанавливает тип источника. В настоящее время поддерживается только kafka
broker_listхост:порт [, ...]Указывает URL-адреса брокеров Kafka
topic_listстрока [, ...]Список топиков Kafka для потребления
consumer_groupстрокаОпределяет группу потребителей Kafka, по умолчанию manticore.
num_consumersintКоличество потребителей для обработки сообщений.
partition_listint [, ...]Список разделов для чтения подробнее.
batchintКоличество сообщений для обработки перед переходом к следующей партии. По умолчанию 100; в противном случае оставшиеся сообщения обрабатываются по таймауту

Целевая таблица

<!-- example kafka_destination -->

Целевая таблица — это обычная таблица реального времени, в которой хранятся результаты обработки сообщений Kafka. Эта таблица должна быть определена в соответствии с требованиями схемы входящих данных и оптимизирована для потребностей производительности запросов вашего приложения. Подробнее о создании таблиц реального времени читайте здесь.

<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);
<!-- response -->
Query OK, 0 rows affected (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi)"
<!-- response JSON -->
JSON
[
  {
    "total": 0,
    "error": "",
    "warning": ""
  }
]
<!-- end -->

Материализованное представление

<!-- example kafka_mv -->

Материализованное представление позволяет преобразовывать данные из сообщений Kafka. Вы можете переименовывать поля, применять функции Manticore Search, а также выполнять сортировку, группировку и другие операции с данными.

Материализованное представление действует как запрос, который перемещает данные из источника Kafka в целевую таблицу, позволяя вам использовать синтаксис Manticore Search для настройки этих запросов. Убедитесь, что поля в select соответствуют полям в источнике.

CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>
<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
       UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka

<!-- response -->
sql
Query OK, 2 rows affected (0.02 sec)
<!-- end -->

Данные передаются из Kafka в Manticore Search пакетами, которые очищаются после каждого запуска. Для вычислений между пакетами, таких как AVG, будьте осторожны, так как они могут работать не так, как ожидается, из-за обработки пакет за пакетом.

Сопоставление полей

Вот таблица сопоставления на основе приведенных выше примеров:

KafkaИсточникБуферМВНазначение
ididididid
termtermtermterm as namename
unnecessary_key который нас не интересует--
$abbrevabbrevabbrevabbrev as short_nameshort_name
---UTC_TIMESTAMP() as received_atreceived_at
GlossDefglossdefglossdefglossdef.size as sizesize

Просмотр списка

<!-- example kafka_listing -->

Чтобы просмотреть источники и материализованные представления в Manticore Search, используйте следующие команды:

  • SHOW SOURCES: Выводит список всех настроенных источников.
  • SHOW MVS: Выводит список всех материализованных представлений.
  • SHOW MV view_table: Показывает подробную информацию о конкретном материализованном представлении.
<!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW SOURCES
<!-- response -->
+-------+
| name  |
+-------+
| kafka |
+-------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW SOURCES"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "name": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "name": "kafka"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_create_source --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW SOURCE kafka;
<!-- response -->
+--------+-------------------------------------------------------------------+
| Source | Create Table                                                      |
+--------+-------------------------------------------------------------------+
| kafka  | CREATE SOURCE kafka                                               |
|        | (id bigint, term text, abbrev '$abbrev' text, GlossDef json)      |
|        | type='kafka'                                                      |
|        | broker_list='kafka:9092'                                          |
|        | topic_list='my-data'                                              |
|        | consumer_group='manticore'                                        |
|        | num_consumers='2'                                                 |
|        | batch=50                                                          |
+--------+-------------------------------------------------------------------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW SOURCE kafka"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "Source": {
          "type": "string"
        }
      },
      {
        "Create Table": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "Source": "kafka",
        "Create Table": "CREATE SOURCE kafka \n(id bigint, term text, abbrev '' text, GlossDef json)\ntype='kafka'\nbroker_list='kafka:9092'\ntopic_list='my-data'\nconsumer_group='manticore'\nnum_consumers='2'\n batch=50"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_view --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW MVS
<!-- response -->
+------------+
| name       |
+------------+
| view_table |
+------------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW MVS"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "name": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "name": "view_table"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_show --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW MV view_table
<!-- response -->
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View       | Create Table                                                                                           | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS                                            | 0         |
|            | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size   |           |
|            | FROM kafka                                                                                             |           |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
<!-- intro -->
JSON:
<!-- request JSON -->
sql
POST /sql?mode=raw -d "SHOW MV view_table"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "View": {
          "type": "string"
        }
      },
      {
        "Create Table": {
          "type": "string"
        }
      },
      {
        "suspended": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "View": "view_table",
        "Create Table": "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka",
        "suspended": 0
      }
    ]
  }
]
<!-- end -->

Изменение материализованных представлений

<!-- example mv_suspend -->

Вы можете приостановить потребление данных, изменив материализованные представления.

Если вы удалите source без удаления МП, он автоматически приостановится. После воссоздания источника снимите приостановку МП вручную с помощью команды ALTER.

В настоящее время можно изменять только материализованные представления. Чтобы изменить параметры source, удалите и создайте источник заново.

<!-- intro -->
SQL:
<!-- request SQL -->
sql
ALTER MATERIALIZED VIEW view_table suspended=1
<!-- response -->
sql
Query OK (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "ALTER MATERIALIZED VIEW view_table suspended=1"
<!-- response JSON -->
JSON
[
  {
    "total": 2,
    "error": "",
    "warning": ""
  }
]
<!-- end -->

Шардинг с Kafka

Вы также можете указать partition_list для каждой темы Kafka. Одним из основных преимуществ такого подхода является возможность реализации шардинга вашей таблицы через Kafka. Для этого следует создать отдельную цепочку sourcematerialized viewdestination table для каждого шарда:

Источники:

sql
CREATE SOURCE kafka_p1 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='0' batch=50;

CREATE SOURCE kafka_p2 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='1' batch=50;

Таблицы назначения:

sql
CREATE TABLE destination_shard_1 (id bigint, name text);
CREATE TABLE destination_shard_2 (id bigint, name text);

Материализованные представления:

sql
CREATE MATERIALIZED VIEW mv_1 TO destination_shard_1 AS SELECT id, term AS name FROM kafka_p1;
CREATE MATERIALIZED VIEW mv_2 TO destination_shard_2 AS SELECT id, term AS name FROM kafka_p2;

⚠️ Важные замечания:

  • В этой конфигурации ребалансировка должна управляться вручную.
  • По умолчанию Kafka не распределяет сообщения по схеме round-robin.
  • Чтобы добиться распределения, похожего на round-robin при отправке данных, убедитесь, что ваш Kafka producer настроен с:
    • parse.key=true
    • key.separator={your_delimiter}

Иначе Kafka будет распределять сообщения по своим внутренним правилам, что может привести к неравномерному разделению по партициям.

Устранение неполадок

Дублирование записей

Фиксация смещений Kafka происходит после каждой партии или при истечении времени обработки. Если процесс неожиданно остановится во время выполнения запроса материализованного представления, могут появиться дубликаты записей. Чтобы избежать этого, включите в схему поле id, позволяющее Manticore Search предотвращать дублирование в таблице.

Как это работает внутри

  • Инициализация работника: После настройки источника и материализованного представления Manticore Search создает выделенного работника для обработки поступления данных из Kafka.
  • Отображение сообщений: Сообщения сопоставляются согласно схеме конфигурации источника, преобразуясь в структурированный формат.
  • Группировка в партии: Сообщения группируются в партии для эффективной обработки. Размер партии можно настроить в зависимости от ваших требований к производительности и задержкам.
  • Буферизация: Партии преобразованных данных сохраняются во вспомогательной таблице буфера для эффективных массовых операций.
  • Обработка материализованного представления: К данным во вспомогательной таблице применяется логика представления, выполняются преобразования или фильтрация.
  • Передача данных: Обработанные данные затем передаются в конечную таблицу реального времени.
  • Очистка: Вспомогательная таблица очищается после каждой партии, чтобы подготовиться к приему следующего набора данных.
<!-- proofread -->