# Kafka Source Connector
The Kafka Source Connector enables you to consume messages from an existing Kafka topic and append them into a KurrentDB stream.
You can create the Kafka Source connector as follows. Replace `{id}` with your desired connector ID:

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "{{topic}}",
    "consumer:bootstrapServers": "localhost:9092",
    "schemaName": "{{schemaName}}"
  }
}
```
| Name | Details |
|---|---|
| `topic` | *required*<br><br>**Description**: Kafka topic to consume from. |
| `schemaName` | *required*<br><br>**Description**: Name of the schema used for messages. |
| `partition` | **Description**: Specific partition to consume from (leave empty for all partitions). |
| `offset` | **Description**: Starting offset position (leave empty for default behavior). |
| `preserveMagicByte` | **Description**: Whether to preserve Kafka's schema registry magic byte.<br><br>**Default**: `false` |
The Kafka Source Connector processes messages in parallel using multiple consumer tasks. Each task has its own consumer and buffer, and all tasks share the consumer group `kurrentdb-{ConnectorId}`. Their channels run independently and are merged into a single output stream for efficient multi-partition handling.
| Name | Details |
|---|---|
| `concurrency:channelCapacity` | **Description**: Capacity of each consumer's bounded channel. Higher values provide more buffering but consume more memory.<br><br>**Default**: `10000` |
| `concurrency:tasks` | **Description**: The maximum number of tasks to create for this connector.<br><br>**Default**: `1` |
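As a sketch, a connector tuned to fan out across four consumer tasks with smaller per-task buffers might look like this (the connector topic and schema names here are placeholders, and values are shown as strings as in the creation example above):

```json
{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "orders",
    "consumer:bootstrapServers": "localhost:9092",
    "schemaName": "orders-schema",
    "concurrency:tasks": "4",
    "concurrency:channelCapacity": "5000"
  }
}
```

Each of the four tasks joins the shared `kurrentdb-{ConnectorId}` consumer group, so Kafka balances partitions across them automatically.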
The Consumer section accepts standard Confluent Kafka consumer configuration options. Key options include:
| Name | Details |
|---|---|
| `consumer:bootstrapServers` | **Description**: Comma-separated list of Kafka broker addresses.<br><br>**Default**: `"localhost:9092"` |
| `consumer:autoOffsetReset` | **Description**: Offset reset behavior when there is no initial offset.<br><br>**Default**: `Earliest` |
| `consumer:enableAutoCommit` | **Description**: Enable automatic offset commits.<br><br>**Default**: `true` |
| `consumer:autoCommitIntervalMs` | **Description**: Auto-commit interval in milliseconds.<br><br>**Default**: `5000` |
| `consumer:sessionTimeoutMs` | **Description**: Session timeout in milliseconds.<br><br>**Default**: `45000` |
| `consumer:enablePartitionEof` | **Description**: Enable end-of-partition notifications.<br><br>**Default**: `true` |
| `consumer:securityProtocol` | **Description**: Protocol used to communicate with brokers.<br><br>**Accepted Values**: `plaintext`, `ssl`, `saslPlaintext`<br><br>**Default**: `plaintext` |
| `consumer:saslMechanism` | **Description**: SASL mechanism to use for authentication.<br><br>**Accepted Values**: `plain`<br><br>**Default**: `""` |
| `consumer:sslCaPem` | **Description**: CA certificate string (PEM format) for verifying the broker's key. |
| `consumer:sslCertificatePem` | **Description**: Client's public key string (PEM format) used for authentication. |
| `consumer:sslKeyPem` | **Description**: Client's private key string (PEM format) used for authentication. |
| `consumer:saslUsername` | **Description**: SASL username for use with the PLAIN mechanism. |
| `consumer:saslPassword` | **Description**: SASL password for use with the PLAIN mechanism. |
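As a sketch of overriding the consumer defaults, the fragment below connects to two brokers and starts from the latest offset with a shorter session timeout (broker addresses, topic, and schema name are placeholders):

```json
{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "orders",
    "schemaName": "orders-schema",
    "consumer:bootstrapServers": "broker-1:9092,broker-2:9092",
    "consumer:autoOffsetReset": "Latest",
    "consumer:sessionTimeoutMs": "30000"
  }
}
```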
The stream section configures how Kafka messages are routed to KurrentDB streams:

| Name | Details |
|---|---|
| `stream:strategy` | **Description**: Stream routing strategy.<br><br>**Accepted Values**:<br>`topic`: Route to a stream named after the Kafka topic<br>`partitionKey`: Route based on the Kafka message key<br>`fixed`: Route all messages to a single stream<br>`header`: Extract the stream name from message headers<br><br>**Default**: `topic` |
| `stream:expression` | **Description**: Expression for custom routing, depending on strategy.<br><br>`fixed`: stream name (defaults to topic name if not provided)<br>`header`: comma-separated header names to check |

The default authentication method is `plaintext`. You can configure the authentication method by setting the `consumer:securityProtocol` option.
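The routing options above can be combined; as a sketch, routing every message to one fixed stream (the stream, topic, and schema names are placeholders) could look like:

```json
{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "orders",
    "schemaName": "orders-schema",
    "consumer:bootstrapServers": "localhost:9092",
    "stream:strategy": "fixed",
    "stream:expression": "imported-orders"
  }
}
```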
For example, to connect to brokers over SSL:

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "customers",
    "consumer:securityProtocol": "ssl",
    "consumer:sslCaPem": "...",
    "consumer:sslCertificatePem": "...",
    "consumer:sslKeyPem": "..."
  }
}
```
To authenticate with SASL/PLAIN over plaintext:

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-source",
    "topic": "customers",
    "consumer:securityProtocol": "saslPlaintext",
    "consumer:saslMechanism": "plain",
    "consumer:saslUsername": "user",
    "consumer:saslPassword": "pass"
  }
}
```