docs/server/features/connectors/sinks/kafka.md
The Kafka sink writes events to a Kafka topic using an idempotent producer for reliable delivery. It can derive the partition key from sources such as the stream ID, headers, or record key, and it also supports SASL authentication and resilience features for handling transient errors.
Before using the Kafka sink connector, ensure you have:
::: tip
See the Data Protection documentation for instructions on configuring the encryption token.
:::
You can create the Kafka Sink connector as follows. Replace `{id}` with your desired connector ID:

```http
POST /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "kafka-sink",
    "bootstrapServers": "localhost:9092",
    "topic": "customers",
    "subscription:filter:scope": "stream",
    "subscription:filter:filterType": "streamId",
    "subscription:filter:expression": "example-stream"
  }
}
```
After creating and starting the Kafka sink connector, every time an event is appended to `example-stream`, the connector sends the record to the specified Kafka topic. You can find a list of available management API endpoints in the API Reference.
Adjust these settings to control how the Kafka sink connector behaves and interacts with KurrentDB so that it operates according to your requirements.
::: tip
The Kafka sink inherits a set of common settings that are used to configure the connector. These settings can be found on the Sink Options page.
:::
The Kafka sink can be configured with the following options:

| Name | Details |
|------|---------|
| `topic` | _required_<br><br>**Description:** The Kafka topic to produce records to. |
| `bootstrapServers` | **Description:** Comma-separated list of Kafka broker addresses.<br><br>**Default:** `"localhost:9092"` |
| `defaultHeaders` | **Description:** Headers included in all produced messages.<br><br>**Default:** Empty |

| Name | Details |
|------|---------|
| `authentication:securityProtocol` | **Description:** Protocol used for Kafka broker communication.<br><br>**Default:** `"plaintext"`<br><br>**Accepted Values:** `"plaintext"`, `"saslPlaintext"`, or `"saslSsl"` |
| `authentication:saslMechanism` | **Description:** SASL mechanism to use for authentication.<br><br>**Default:** `"plain"`<br><br>**Accepted Values:** `"plain"`, `"scramSha256"`, or `"scramSha512"` |
| `authentication:username` | **Description:** SASL username. |
| `authentication:password` | **Description:** SASL password. |

| Name | Details |
|------|---------|
| `partitionKeyExtraction:enabled` | **Description:** Enables partition key extraction.<br><br>**Default:** `"false"` |
| `partitionKeyExtraction:source` | **Description:** Source for extracting the partition key. See Partitioning.<br><br>**Accepted Values:** `"partitionKey"`, `"stream"`, `"streamSuffix"`, or `"headers"`<br><br>**Default:** `"partitionKey"` |
| `partitionKeyExtraction:expression` | **Description:** Regular expression for extracting the partition key. |

See the Partitioning section for examples.
The Kafka sink connector relies on Kafka's own retry mechanism and does not use the common Resilience configuration.

| Name | Details |
|------|---------|
| `waitForBrokerAck` | **Description:** Whether the producer waits for broker acknowledgment before considering the send operation complete. See Broker Acknowledgment.<br><br>**Default:** `"false"` |
| `resilience:reconnectBackoffMaxMs` | **Description:** The maximum time to wait before reconnecting to a broker after the connection has been closed.<br><br>**Default:** `"20000"` |
| `resilience:messageSendMaxRetries` | **Description:** How many times to retry sending a failing message.<br><br>**Default:** `"2147483647"` |
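
As an illustrative sketch (the values are placeholders, not tuned recommendations), these retry-related settings can be adjusted through the same settings endpoint used elsewhere on this page:

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "resilience:reconnectBackoffMaxMs": "10000",
  "resilience:messageSendMaxRetries": "5"
}
```
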

| Name | Details |
|------|---------|
| `brokerAddressFamily` | **Description:** Allowed broker IP address families.<br><br>**Default:** `"V4"`<br><br>**Accepted Values:** `"Any"`, `"V4"`, or `"V6"` |
| `compression:type` | **Description:** Kafka compression type.<br><br>**Default:** `"Zstd"`<br><br>**Accepted Values:** `"None"`, `"Gzip"`, `"Lz4"`, `"Zstd"`, or `"Snappy"` |
| `compression:level` | **Description:** Kafka compression level.<br><br>**Default:** `"6"` |
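
For example, a minimal sketch of overriding the compression and broker address family defaults could look like the following; the specific values are only illustrative:

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "brokerAddressFamily": "V6",
  "compression:type": "Gzip",
  "compression:level": "4"
}
```
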
The Kafka sink provides at-least-once delivery by using Kafka's idempotent producer and retry settings. Messages are only checkpointed after successful delivery confirmation from Kafka.
The `waitForBrokerAck` setting controls whether the connector waits for broker acknowledgment before considering a send operation complete.
The Kafka sink uses an idempotent producer by default, so when writing to a single partition, Kafka preserves message order regardless of which produce API is used.
If a failure occurs before acknowledgment, the retry mechanism will redeliver. After a restart, the connector resumes from the last checkpointed position, which may cause previously sent—but uncheckpointed—messages to be reprocessed.
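
If delivery guarantees matter more than throughput, you can enable broker acknowledgment. This is a minimal sketch that only toggles the `waitForBrokerAck` setting described above:

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "waitForBrokerAck": "true"
}
```
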
The Kafka sink connector lets you include custom headers in the messages it sends to your topic. To add custom headers, use the `defaultHeaders` setting in your connector configuration. Each custom header is specified with the prefix `defaultHeaders:` followed by the header name.
Example:

```http
PUT /connectors/{id}
Host: localhost:2113
Content-Type: application/json

{
  "defaultHeaders:X-API-Key": "your-api-key-here",
  "defaultHeaders:X-Tenant-ID": "production-tenant",
  "defaultHeaders:X-Source-System": "KurrentDB"
}
```
These headers will be included in every message sent by the connector, in addition to the default headers automatically added by the connector's plugin.
The Kafka sink connector supports secure communication with Kafka brokers using SASL authentication. By default, the connector communicates in plaintext without authentication. However, you can configure it to use SASL with different security protocols and authentication mechanisms.
::: note
When using saslSsl, the connector uses your system's trusted CA certificates
for SSL/TLS encryption. This works with managed services like AWS MSK,
Confluent Cloud, and Azure Event Hubs. For self-signed or private CA
certificates, add them to your system's trust store first.
:::
For connections without TLS encryption, use the `saslPlaintext` security protocol together with the `plain`, `scramSha256`, or `scramSha512` mechanism:

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslPlaintext",
  "authentication:saslMechanism": "plain",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslPlaintext",
  "authentication:saslMechanism": "scramSha256",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslPlaintext",
  "authentication:saslMechanism": "scramSha512",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```
For production environments with encryption (recommended for managed Kafka services):

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslSsl",
  "authentication:saslMechanism": "plain",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslSsl",
  "authentication:saslMechanism": "scramSha256",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "authentication:securityProtocol": "saslSsl",
  "authentication:saslMechanism": "scramSha512",
  "authentication:username": "my-username",
  "authentication:password": "my-password"
}
```
The Kafka sink connector lets you customize the partition key that is sent with each message. By default, the source is `"partitionKey"`, and messages are distributed across the available partitions in the topic using round-robin partitioning.
### Partition using Stream ID
You can extract part of the stream name using a regular expression (regex) to
define the partition key. The expression is optional and can be customized based
on your naming convention. In this example, the expression captures the stream
name up to _data.

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "partitionKeyExtraction:enabled": "true",
  "partitionKeyExtraction:source": "stream",
  "partitionKeyExtraction:expression": "^(.*)_data$"
}
```
Alternatively, if you only need the last segment of the stream name (after a
hyphen), you can use the streamSuffix source. This
doesn't require an expression since it automatically extracts the suffix.

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "partitionKeyExtraction:enabled": "true",
  "partitionKeyExtraction:source": "streamSuffix"
}
```
The `streamSuffix` source is useful when stream names follow a structured format, and you want to use only the trailing part as the partition key. For example, if the stream is named `user-123`, the partition key would be `123`.
### Partition using header values
You can create partition keys by combining values from a record's metadata.

```http
PUT /connectors/{id}/settings
Host: localhost:2113
Content-Type: application/json

{
  "partitionKeyExtraction:enabled": "true",
  "partitionKeyExtraction:source": "headers",
  "partitionKeyExtraction:expression": "key1,key2"
}
```
Specify the header keys you want to use in the `partitionKeyExtraction:expression` field (e.g., `key1,key2`). The connector will concatenate the header values with a hyphen (`-`) to create the partition key.
For example, if your event has headers `key1: regionA` and `key2: zone1`, the partition key will be `regionA-zone1`.
Learn how to set up and use a Kafka Sink connector in KurrentDB through a tutorial.