Syncing from Kafka

NOTE: this functionality requires Manticore Buddy. If it doesn't work, make sure Buddy is installed.

Manticore supports real-time data ingestion from Apache Kafka through Kafka sources and materialized views, allowing for real-time data indexing and search. Currently, Apache Kafka versions 3.7.0-4.1.0 are tested and supported.

To get started, you need to:

  1. Define the source: Specify the Kafka topic from which Manticore Search will read messages. This setup includes details like the broker’s host, port, and topic name.
  2. Set up the destination table: Choose a Manticore real-time table to store the incoming Kafka data.
  3. Create a materialized view: Set up a materialized view (mv) to handle data transformation and mapping from Kafka to the destination table in Manticore Search. Here, you’ll define field mappings, data transformations, and any filters or conditions for the incoming data stream.
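
Putting the three steps together, a minimal end-to-end sketch looks like this (the names and columns are illustrative; each step is covered in detail below):

sql
-- 1. Source: read messages from the Kafka topic 'my-data'
CREATE SOURCE kafka (id bigint, term text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'
consumer_group='manticore' num_consumers='1' batch=50;

-- 2. Destination: a regular real-time table
CREATE TABLE destination_kafka (id bigint, name text);

-- 3. Materialized view: maps data from the source into the table
CREATE MATERIALIZED VIEW view_table TO destination_kafka AS
SELECT id, term AS name FROM kafka;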

Source

<!-- example kafka_source -->

The source configuration allows you to define the broker, topic list, consumer group, and the message structure.

Schema

Define the schema using Manticore field types like int, float, text, json, etc.

sql
CREATE SOURCE <source name> [(column type, ...)] [source_options]

All schema keys are case-insensitive, meaning Products, products, and PrOdUcTs are treated the same. They are all converted to lowercase.
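
For example (case_demo is a hypothetical source), a key declared as Products is matched case-insensitively and stored in lowercase:

sql
-- 'Products' matches Products, products, or PrOdUcTs in incoming
-- messages and is stored as the lowercase field 'products'
CREATE SOURCE case_demo (id bigint, Products text)
type='kafka' broker_list='kafka:9092' topic_list='my-data'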

If your field names don't match the field name syntax allowed in Manticore Search (for example, if they contain special characters or start with numbers), you must define a schema mapping. For instance, $keyName or 123field are valid keys in JSON but not valid field names in Manticore Search. If you try to use invalid field names without proper mapping, Manticore will return an error and the source creation will fail.

To handle such cases, use the following schema syntax to map invalid field names to valid ones:

allowed_field_name 'original JSON key name with special symbols' type

For example:

sql
price_field '$price' float    -- maps JSON key '$price' to field 'price_field'
field_123 '123field' text     -- maps JSON key '123field' to field 'field_123'
<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE SOURCE kafka
(id bigint, term text, abbrev '$abbrev' text, GlossDef json)
type='kafka'
broker_list='kafka:9092'
topic_list='my-data'
consumer_group='manticore'
num_consumers='2'
batch=50
<!-- response -->
Query OK, 2 rows affected (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "CREATE SOURCE kafka (id bigint, term text, abbrev '$abbrev' text, GlossDef json) type='kafka' broker_list='kafka:9092' topic_list='my-data' consumer_group='manticore' num_consumers='2' batch=50"
<!-- response JSON -->
JSON
[
  {
    "total": 2,
    "error": "",
    "warning": ""
  }
]
<!-- end -->

Options

| Option | Accepted Values | Description |
|----------------|--------------------|-------------|
| type | kafka | Sets the source type. Currently, only kafka is supported |
| broker_list | host:port [, ...] | Specifies Kafka broker URLs |
| topic_list | string [, ...] | Lists Kafka topics to consume from |
| consumer_group | string | Defines the Kafka consumer group; defaults to manticore |
| num_consumers | int | Number of consumers to handle messages |
| partition_list | int [, ...] | Lists the topic partitions to read from |
| batch | int | Number of messages to process before moving on. Defaults to 100; on timeout, any remaining messages are processed |

Destination table

<!-- example kafka_destination -->

The destination table is a regular real-time table where the results of Kafka message processing are stored. This table should be defined to match the schema requirements of the incoming data and optimized for the query performance needs of your application. Read more about creating real-time tables here.

<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE TABLE destination_kafka
(id bigint, name text, short_name text, received_at text, size multi);
<!-- response -->
Query OK, 0 rows affected (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "CREATE TABLE destination_kafka (id bigint, name text, short_name text, received_at text, size multi)"
<!-- response JSON -->
JSON
[
  {
    "total": 0,
    "error": "",
    "warning": ""
  }
]
<!-- end -->

Materialized view

<!-- example kafka_mv -->

A materialized view enables data transformation from Kafka messages. You can rename fields, apply Manticore Search functions, and perform sorting, grouping, and other data operations.

A materialized view acts as a query that moves data from the Kafka source to the destination table, letting you use Manticore Search syntax to customize these queries. Make sure that fields in the select match those in the source.

sql
CREATE MATERIALIZED VIEW <materialized view name>
TO <destination table name> AS
SELECT [column|function [as <new name>], ...] FROM <source name>
<!-- intro -->
SQL:
<!-- request SQL -->
sql
CREATE MATERIALIZED VIEW view_table
TO destination_kafka AS
SELECT id, term as name, abbrev as short_name,
       UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka

<!-- response -->
sql
Query OK, 2 rows affected (0.02 sec)
<!-- end -->

Data is transferred from Kafka to Manticore Search in batches, which are cleared after each run. Be careful with calculations that span batches, such as AVG: because data is processed batch by batch, such calculations may not work as expected.
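
For instance, in a sketch like the following (the price column and the price_stats table are hypothetical), each stored value is the average over one consumed batch only, not over the whole topic:

sql
-- AVG runs over the rows of the current batch only, so every batch
-- writes its own per-batch average instead of a running global one
CREATE MATERIALIZED VIEW mv_avg TO price_stats AS
SELECT id, AVG(price) AS avg_price FROM kafka GROUP BY id;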

Field Mapping

Here's a mapping table based on the examples above:

| Kafka | Source | Buffer | MV | Destination |
|-------|--------|--------|----|-------------|
| id | id | id | id | id |
| term | term | term | term as name | name |
| unnecessary_key (which we're not interested in) | - | - | - | - |
| $abbrev | abbrev | abbrev | abbrev as short_name | short_name |
| - | - | - | UTC_TIMESTAMP() as received_at | received_at |
| GlossDef | glossdef | glossdef | glossdef.size as size | size |

Listing

<!-- example kafka_listing -->

To view sources and materialized views in Manticore Search, use these commands:

  • SHOW SOURCES: Lists all configured sources.
  • SHOW MVS: Lists all materialized views.
  • SHOW MV view_table: Shows detailed information on a specific materialized view.
<!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW SOURCES
<!-- response -->
+-------+
| name  |
+-------+
| kafka |
+-------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW SOURCES"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "name": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "name": "kafka"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_create_source --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW SOURCE kafka;
<!-- response -->
+--------+-------------------------------------------------------------------+
| Source | Create Table                                                      |
+--------+-------------------------------------------------------------------+
| kafka  | CREATE SOURCE kafka                                               |
|        | (id bigint, term text, abbrev '$abbrev' text, GlossDef json)      |
|        | type='kafka'                                                      |
|        | broker_list='kafka:9092'                                          |
|        | topic_list='my-data'                                              |
|        | consumer_group='manticore'                                        |
|        | num_consumers='2'                                                 |
|        | batch=50                                                          |
+--------+-------------------------------------------------------------------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW SOURCE kafka"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "Source": {
          "type": "string"
        }
      },
      {
        "Create Table": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "Source": "kafka",
        "Create Table": "CREATE SOURCE kafka \n(id bigint, term text, abbrev '' text, GlossDef json)\ntype='kafka'\nbroker_list='kafka:9092'\ntopic_list='my-data'\nconsumer_group='manticore'\nnum_consumers='2'\n batch=50"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_view --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW MVS
<!-- response -->
+------------+
| name       |
+------------+
| view_table |
+------------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW MVS"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "name": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "name": "view_table"
      }
    ]
  }
]
<!-- end --> <!-- example kafka_show --> <!-- intro -->
SQL:
<!-- request SQL -->
sql
SHOW MV view_table
<!-- response -->
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| View       | Create Table                                                                                           | suspended |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
| view_table | CREATE MATERIALIZED VIEW view_table TO destination_kafka AS                                            | 0         |
|            | SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size   |           |
|            | FROM kafka                                                                                             |           |
+------------+--------------------------------------------------------------------------------------------------------+-----------+
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "SHOW MV view_table"
<!-- response JSON -->
JSON
[
  {
    "total": 1,
    "error": "",
    "warning": "",
    "columns": [
      {
        "View": {
          "type": "string"
        }
      },
      {
        "Create Table": {
          "type": "string"
        }
      },
      {
        "suspended": {
          "type": "string"
        }
      }
    ],
    "data": [
      {
        "View": "view_table",
        "Create Table": "CREATE MATERIALIZED VIEW view_table TO destination_kafka AS SELECT id, term as name, abbrev as short_name, UTC_TIMESTAMP() as received_at, GlossDef.size as size FROM kafka",
        "suspended": 0
      }
    ]
  }
]
<!-- end -->

Altering materialized views

<!-- example mv_suspend -->

You can suspend data consumption by altering materialized views.

If you remove the source without deleting the MV, it is automatically suspended. After recreating the source, unsuspend the MV manually using the ALTER command.

Currently, only materialized views can be altered. To change source parameters, drop and recreate the source.

<!-- intro -->
SQL:
<!-- request SQL -->
sql
ALTER MATERIALIZED VIEW view_table suspended=1
<!-- response -->
sql
Query OK (0.02 sec)
<!-- intro -->
JSON:
<!-- request JSON -->
JSON
POST /sql?mode=raw -d "ALTER MATERIALIZED VIEW view_table suspended=1"
<!-- response JSON -->
JSON
[
  {
    "total": 2,
    "error": "",
    "warning": ""
  }
]
<!-- end -->
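
To resume consumption, set suspended back to 0:

sql
ALTER MATERIALIZED VIEW view_table suspended=0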

Sharding with Kafka

You can also specify a partition_list for each Kafka topic. One of the main benefits of this approach is the ability to implement sharding for your table via Kafka. To achieve this, create a separate chain of source → materialized view → destination table for each shard:

Sources:

sql
CREATE SOURCE kafka_p1 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='0' batch=50;

CREATE SOURCE kafka_p2 (id bigint, term text)
  type='kafka' broker_list='kafka:9092' topic_list='my-data'
  consumer_group='manticore' num_consumers='1' partition_list='1' batch=50;

Destination Tables:

sql
CREATE TABLE destination_shard_1 (id bigint, name text);
CREATE TABLE destination_shard_2 (id bigint, name text);

Materialized Views:

sql
CREATE MATERIALIZED VIEW mv_1 TO destination_shard_1 AS SELECT id, term AS name FROM kafka_p1;
CREATE MATERIALIZED VIEW mv_2 TO destination_shard_2 AS SELECT id, term AS name FROM kafka_p2;
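
If you want to search both shards as a single table, one option is to put a distributed table on top of them (a sketch; the name all_shards is an assumption):

sql
-- Combines the two shard tables so they can be queried as one
CREATE TABLE all_shards type='distributed'
  local='destination_shard_1' local='destination_shard_2';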

⚠️ Important Notes:

  • In this setup, rebalancing must be managed manually.
  • Kafka does not distribute messages using a round-robin strategy by default.
  • To achieve round-robin-like distribution when sending data, make sure your Kafka producer is configured with:
    • parse.key=true
    • key.separator={your_delimiter}

Otherwise, Kafka will distribute messages based on its own internal rules, which may lead to uneven partitioning.
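
For example, with Kafka's stock console producer (a sketch; the ':' delimiter and the sample message are arbitrary):

bash
# parse.key=true makes the producer split each input line into a key and
# a value at key.separator; Kafka then picks a partition from the key hash
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic my-data \
  --property parse.key=true --property key.separator=:

Each input line then takes the form 1:{"id":1,"term":"example"}, where the part before the separator is the message key.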

Troubleshooting

Duplicate entries

Kafka offsets are committed after each batch or when processing times out. If the process stops unexpectedly during a materialized view query, you may see duplicate entries. To avoid this, include an id field in your schema, allowing Manticore Search to prevent duplicates in the table.

How it works internally

  • Worker initialization: After configuring a source and materialized view, Manticore Search sets up a dedicated worker to handle data ingestion from Kafka.
  • Message mapping: Messages are mapped according to the source configuration schema, transforming them into a structured format.
  • Batching: Messages are grouped into batches for efficient processing. Batch size can be adjusted to suit your performance and latency needs.
  • Buffering: Mapped data batches are stored in a buffer table for efficient bulk operations.
  • Materialized view processing: The view logic is applied to data in the buffer table, performing any transformations or filtering.
  • Data transfer: Processed data is then transferred to the destination real-time table.
  • Cleanup: The buffer table is cleared after each batch, ensuring it’s ready for the next set of data.
<!-- proofread -->