manual/russian/Integration/Kafka.md
ПРИМЕЧАНИЕ: эта функциональность требует Manticore Buddy. Если она не работает, убедитесь, что Buddy установлен.
Manticore поддерживает интеграцию с Apache Kafka для приема данных в реальном времени через источники Kafka и материализованные представления, что позволяет индексировать и искать данные в реальном времени. В настоящее время протестированы и поддерживаются apache/kafka версии 3.7.0-4.1.0.
Чтобы начать, вам необходимо:
mv) для обработки преобразования данных и сопоставления из Kafka в целевую таблицу в Manticore Search. Здесь вы определите сопоставления полей, преобразования данных, а также любые фильтры или условия для входящего потока данных.Конфигурация source позволяет определить broker, список топиков, группу потребителей и структуру сообщений.
Определите схему, используя типы полей Manticore, такие как int, float, text, json и т.д.
CREATE SOURCE <source name> [(column type, ...)] [source_options]
Все ключи схемы нечувствительны к регистру, то есть Products, products и PrOdUcTs обрабатываются одинаково. Все они преобразуются в нижний регистр.
Если имена ваших полей не соответствуют синтаксису имен полей, разрешенному в Manticore Search (например, если они содержат специальные символы или начинаются с цифр), вы должны определить сопоставление схемы. Например, $keyName или 123field являются допустимыми ключами в JSON, но не являются допустимыми именами полей в Manticore Search. Если вы попытаетесь использовать недопустимые имена полей без правильного сопоставления, Manticore вернет ошибку, и создание источника завершится неудачей.
Для обработки таких случаев используйте следующий синтаксис схемы для сопоставления недопустимых имен полей с допустимыми:
allowed_field_name 'original JSON key name with special symbols' type
Например:
price_field '$price' float -- maps JSON key '$price' to field 'price_field'
field_123 '123field' text -- maps JSON key '123field' to field 'field_123'
CREATE SOURCE kafka
(id bigint, term text, abbrev '$abbrev' text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50
Query OK, 2 rows affected (0.02 sec)
POST /sql?mode=raw -d "CREATE SOURCE kafka (id bigint, term text, abbrev '$abbrev' text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='2' batch=50"
[
{
"total": 2,
"error": "",
"warning": ""
}
]
| Опция | Допустимые значения | Описание |
|---|---|---|
type | kafka | Устанавливает тип источника. В настоящее время поддерживается только kafka |
broker_list | хост:порт [, ...] | Указывает URL-адреса брокеров Kafka |
topic_list | строка [, ...] | Список топиков Kafka для потребления |
consumer_group | строка | Определяет группу потребителей Kafka, по умолчанию manticore. |
num_consumers | int | Количество потребителей для обработки сообщений. |
partition_list | int [, ...] | Список разделов для чтения подробнее. |
batch | int | Количество сообщений для обработки перед переходом к следующей партии. По умолчанию 100; в противном случае оставшиеся сообщения обрабатываются по таймауту |
Целевая таблица — это обычная таблица реального времени, в которой хранятся результаты обработки сообщений Kafka. Эта таблица должна быть определена в соответствии с требованиями схемы входящих данных и оптимизирована для потребностей производительности запросов вашего приложения. Подробнее о создании таблиц реального времени читайте здесь.
<!-- intro -->CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);
Query OK, 0 rows affected (0.02 sec)
POST /sql?mode=raw -d "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi)"
[
{
"total": 0,
"error": "",
"warning": ""
}
]
Материализованное представление позволяет преобразовывать данные из сообщений Kafka. Вы можете переименовывать поля, применять функции Manticore Search, а также выполнять сортировку, группировку и другие операции с данными.
Материализованное представление действует как запрос, который перемещает данные из источника Kafka в целевую таблицу, позволяя вам использовать синтаксис Manticore Search для настройки этих запросов. Убедитесь, что поля в select соответствуют полям в источнике.
CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka
Query OK, 2 rows affected (0.02 sec)
Данные передаются из Kafka в Manticore Search пакетами, которые очищаются после каждого запуска. Для вычислений между пакетами, таких как AVG, будьте осторожны, так как они могут работать не так, как ожидается, из-за обработки пакет за пакетом.
Вот таблица сопоставления на основе приведенных выше примеров:
| Kafka | Источник | Буфер | МВ | Назначение |
|---|---|---|---|---|
id | id | id | id | id |
term | term | term | term as name | name |
unnecessary_key который нас не интересует | - | - | ||
$abbrev | abbrev | abbrev | abbrev as short_name | short_name |
| - | - | - | UTC_TIMESTAMP() as received_at | received_at |
GlossDef | glossdef | glossdef | glossdef.size as size | size |
Чтобы просмотреть источники и материализованные представления в Manticore Search, используйте следующие команды:
SHOW SOURCES: Выводит список всех настроенных источников.SHOW MVS: Выводит список всех материализованных представлений.SHOW MV view_table: Показывает подробную информацию о конкретном материализованном представлении.SHOW SOURCES
+-------+
| name |
+-------+
| kafka |
+-------+
POST /sql?mode=raw -d "SHOW SOURCES"
[
{
"total": 1,
"error": "",
"warning": "",
"columns": [
{
"name": {
"type": "string"
}
}
],
"data": [
{
"name": "kafka"
}
]
}
]
SHOW SOURCE kafka;
+--------+-------------------------------------------------------------------+
| Source | Create Table |
+--------+-------------------------------------------------------------------+
| kafka | CREATE SOURCE kafka |
| | (id bigint, term text, abbrev '$abbrev' text, GlossDef json) |
| | type='kafka' |
| | broker_list='kafka:9092' |
| | topic_list='my-data' |
| | consumer_group='manticore' |
| | num_consumers='2' |
| | batch=50 |
+--------+-------------------------------------------------------------------+
POST /sql?mode=raw -d "SHOW SOURCE kafka"
[
{
"total": 1,
"error": "",
"warning": "",
"columns": [
{
"Source": {
"type": "string"
}
},
{
"Create Table": {
"type": "string"
}
}
],
"data": [
{
"Source": "kafka",
"Create Table": "CREATE SOURCE kafka \n(id bigint, term text, abbrev '' text, GlossDef json)\ntype='kafka'\nbroker_list='kafka:9092'\ntopic_list='my-data'\nconsumer_group='manticore'\nnum_consumers='2'\n batch=50"
}
]
}
]
SHOW MVS
+------------+
| name |
+------------+
| view_table |
+------------+
POST /sql?mode=raw -d "SHOW MVS"
[
{
"total": 1,
"error": "",
"warning": "",
"columns": [
{
"name": {
"type": "string"
}
}
],
"data": [
{
"name": "view_table"
}
]
}
]
SHOW MV view_table
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View | Create Table | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS | 0 |
| | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size | |
| | FROM kafka | |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
POST /sql?mode=raw -d "SHOW MV view_table"
[
{
"total": 1,
"error": "",
"warning": "",
"columns": [
{
"View": {
"type": "string"
}
},
{
"Create Table": {
"type": "string"
}
},
{
"suspended": {
"type": "string"
}
}
],
"data": [
{
"View": "view_table",
"Create Table": "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka",
"suspended": 0
}
]
}
]
Вы можете приостановить потребление данных, изменив материализованные представления.
Если вы удалите source без удаления МП, он автоматически приостановится. После воссоздания источника снимите приостановку МП вручную с помощью команды ALTER.
В настоящее время можно изменять только материализованные представления. Чтобы изменить параметры source, удалите и создайте источник заново.
ALTER MATERIALIZED VIEW view_table suspended=1
Query OK (0.02 sec)
POST /sql?mode=raw -d "ALTER MATERIALIZED VIEW view_table suspended=1"
[
{
"total": 2,
"error": "",
"warning": ""
}
]
Вы также можете указать partition_list для каждой темы Kafka.
Одним из основных преимуществ такого подхода является возможность реализации шардинга вашей таблицы через Kafka.
Для этого следует создать отдельную цепочку source → materialized view → destination table для каждого шарда:
Источники:
CREATE SOURCE kafka_p1 (id bigint, term text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='1' partition_list='0' batch=50;
CREATE SOURCE kafka_p2 (id bigint, term text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='1' partition_list='1' batch=50;
Таблицы назначения:
CREATE TABLE destination_shard_1 (id bigint, name text);
CREATE TABLE destination_shard_2 (id bigint, name text);
Материализованные представления:
CREATE MATERIALIZED VIEW mv_1 TO destination_shard_1 AS SELECT id, term AS name FROM kafka_p1;
CREATE MATERIALIZED VIEW mv_2 TO destination_shard_2 AS SELECT id, term AS name FROM kafka_p2;
parse.key=truekey.separator={your_delimiter}Иначе Kafka будет распределять сообщения по своим внутренним правилам, что может привести к неравномерному разделению по партициям.
Фиксация смещений Kafka происходит после каждой партии или при истечении времени обработки. Если процесс неожиданно остановится во время выполнения запроса материализованного представления, могут появиться дубликаты записей. Чтобы избежать этого, включите в схему поле id, позволяющее Manticore Search предотвращать дублирование в таблице.