
# Kafka Source

<Badge type="info" vertical="middle" text="License Required"/>

## Overview

The Kafka Source Connector enables you to consume messages from an existing Kafka topic and append them into a KurrentDB stream.

## Quick Start

You can create the Kafka Source connector as follows. Replace `{id}` with your desired connector ID:

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "{{topic}}",
    "consumer:bootstrapServers": "localhost:9092",
    "schemaName": "{{schemaName}}"
  }
}
```

## Settings

### Core Options

| Name | Details |
|------|---------|
| `topic` | *required*<br>Description: Kafka topic to consume from. |
| `schemaName` | *required*<br>Description: Name of the schema used for messages. |
| `partition` | Description: Specific partition to consume from (leave empty for all partitions). |
| `offset` | Description: Starting offset position (leave empty for default behavior). |
| `preserveMagicByte` | Description: Whether to preserve Kafka's schema registry magic byte.<br>Default: `false` |
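
As an illustration, the request below pins the connector to a single partition and starting offset. The connector ID, topic name, schema name, partition, and offset values are hypothetical; substitute your own.

```http
POST /connectors/orders-kafka-source
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "orders",
    "schemaName": "order-event",
    "consumer:bootstrapServers": "localhost:9092",
    "partition": "2",
    "offset": "100"
  }
}
```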

### Concurrency Configuration

The Kafka Source Connector can process messages in parallel using multiple consumer tasks. Each task runs its own consumer and bounded channel, and all tasks share the consumer group `kurrentdb-{ConnectorId}`. The channels are read independently and merged into a single output stream, which allows efficient handling of multi-partition topics.

| Name | Details |
|------|---------|
| `concurrency:channelCapacity` | Description: Capacity of each consumer's bounded channel. Higher values provide more buffering but consume more memory.<br>Default: `10000` |
| `concurrency:tasks` | Description: The maximum number of tasks to create for this connector.<br>Default: `1` |
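
For example, to spread consumption across several tasks you might raise `concurrency:tasks` as sketched below. The values are illustrative and assume the topic has enough partitions to keep four consumers busy.

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "{{topic}}",
    "schemaName": "{{schemaName}}",
    "consumer:bootstrapServers": "localhost:9092",
    "concurrency:tasks": "4",
    "concurrency:channelCapacity": "5000"
  }
}
```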

### Consumer Configuration

The Consumer section accepts standard Confluent Kafka consumer configuration options. Key options include:

| Name | Details |
|------|---------|
| `consumer:bootstrapServers` | Description: Comma-separated list of Kafka broker addresses.<br>Default: `"localhost:9092"` |
| `consumer:autoOffsetReset` | Description: Offset reset behavior when there is no initial offset.<br>Default: `Earliest` |
| `consumer:enableAutoCommit` | Description: Enable automatic offset commits.<br>Default: `true` |
| `consumer:autoCommitIntervalMs` | Description: Auto-commit interval in milliseconds.<br>Default: `5000` |
| `consumer:sessionTimeoutMs` | Description: Session timeout in milliseconds.<br>Default: `45000` |
| `consumer:enablePartitionEof` | Description: Enable end-of-partition notifications.<br>Default: `true` |
| `consumer:securityProtocol` | Description: Protocol used to communicate with brokers.<br>Accepted Values: `plaintext`, `ssl`, `saslPlaintext`<br>Default: `plaintext` |
| `consumer:saslMechanism` | Description: SASL mechanism to use for authentication.<br>Accepted Values: `plain`<br>Default: `""` |
| `consumer:sslCaPem` | Description: CA certificate string (PEM format) for verifying the broker's key. |
| `consumer:sslCertificatePem` | Description: Client's public key string (PEM format) used for authentication. |
| `consumer:sslKeyPem` | Description: Client's private key string (PEM format) used for authentication. |
| `consumer:saslUsername` | Description: SASL username for use with the PLAIN mechanism. |
| `consumer:saslPassword` | Description: SASL password for use with the PLAIN mechanism. |

### Stream Routing Configuration

The stream section configures how Kafka messages are routed to KurrentDB streams:

| Name | Details |
|------|---------|
| `stream:strategy` | Description: Stream routing strategy.<br>Accepted Values:<ul><li>`topic`: route to a stream named after the Kafka topic</li><li>`partitionKey`: route based on the Kafka message key</li><li>`fixed`: route all messages to a single stream</li><li>`header`: extract the stream name from message headers</li></ul>Default: `topic` |
| `stream:expression` | Description: Expression for custom routing, depending on strategy.<ul><li>For `fixed`: the stream name (defaults to the topic name if not provided)</li><li>For `header`: comma-separated header names to check</li><li>Otherwise: not used</li></ul> |
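
For instance, a fixed routing setup that appends every Kafka message to one KurrentDB stream might look like the sketch below; the stream name `kafka-imported-events` is only an example.

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "{{topic}}",
    "schemaName": "{{schemaName}}",
    "consumer:bootstrapServers": "localhost:9092",
    "stream:strategy": "fixed",
    "stream:expression": "kafka-imported-events"
  }
}
```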

## Authentication

The default security protocol is `plaintext`. You can configure the authentication method by setting the `consumer:securityProtocol` option.

### SSL/TLS

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "customers",

    "consumer:securityProtocol": "ssl",
    "consumer:sslCaPem": "...",
    "consumer:sslCertificatePem": "...",
    "consumer:sslKeyPem": "..."
  }
}
```

### SASL/PLAIN

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "customers",

    "consumer:securityProtocol": "saslPlaintext",
    "consumer:saslMechanism": "plain",
    "consumer:saslUsername": "user",
    "consumer:saslPassword": "pass"
  }
}
```