Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming apps. Configure Kapacitor to send alert messages to a Kafka cluster.
Configuration as well as default option values for the Kafka event
handler are set in your kapacitor.conf.
Below is an example configuration:
[[kafka]]
enabled = true
id = "localhost"
brokers = []
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = false
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
insecure-skip-verify = false
# Optional SASL configuration
sasl-username = "xxxxx"
sasl-password = "xxxxxxxx"
sasl-extensions = {}
sasl-mechanism = ""
sasl-version = ""
# Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
sasl-gssapi-service-name = ""
sasl-gssapi-auth-type = "KRB5_USER_AUTH"
sasl-gssapi-disable-pafxfast = false
sasl-gssapi-kerberos-config-path = "/"
sasl-gssapi-key-tab-path = ""
sasl-gssapi-realm = "realm"
# Options if sasl-mechanism is OAUTHBEARER
sasl-oauth-service = "auth0"
sasl-oauth-client-id = "xxxxxxx"
sasl-oauth-client-secret = "xxxxxxxx"
sasl-oauth-token-url = "dedicated-auth0-token-url"
sasl-oauth-token-expiry-margin = "10s"
sasl-oauth-scopes = ""
sasl-oauth-tenant-id = ""
sasl-access-token = ""
[kafka.sasl-oauth-parameters]
audience = "development"
{{% note %}}
Multiple Kafka clients may be configured with multiple [[kafka]] sections in TOML.
The id acts as a unique identifier for each configured Kafka client.
{{% /note %}}
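
As a minimal sketch of the note above, the fragment below defines two Kafka clients in one kapacitor.conf. The ids `infra-monitoring` and `app-events` and the broker addresses are placeholders chosen for illustration, not values Kapacitor requires.

```toml
# First Kafka client (placeholder id and broker address)
[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092"]

# Second Kafka client; the id must differ from the first
[[kafka]]
  enabled = true
  id = "app-events"
  brokers = ["123.45.67.90:9092"]
```

A handler would then pick one of these clients through its cluster option, which is assumed here to match the id value.
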
| Option | Description |
|---|---|
| enabled | Set to true to enable the Kafka event handler. |
| id | A unique identifier for the Kafka cluster. |
| brokers | List of Kafka broker addresses using the host:port format. |
| timeout | Timeout on network operations with the Kafka brokers. If set to 0, a default of 10s is used. |
| batch-size | The number of messages batched before being sent to Kafka. If set to 0, a default of 100 is used. |
| batch-timeout | The maximum amount of time to wait before flushing an incomplete batch. If set to 0, a default of 1s is used. |
| use-ssl | Enable SSL communication. Must be true for other SSL options to take effect. |
| ssl-ca | Path to certificate authority file. |
| ssl-cert | Path to host certificate file. |
| ssl-key | Path to certificate private key file. |
| insecure-skip-verify | Use SSL but skip chain and host verification. (Required if using a self-signed certificate.) |
| sasl-username | Username to use for SASL authentication. |
| sasl-password | Password to use for SASL authentication. |
| sasl-extensions | Arbitrary key-value string pairs, passed as a TOML table. |
| sasl-mechanism | SASL mechanism type. Options are: GSSAPI, OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. |
| sasl-version | SASL protocol version. |
| sasl-gssapi-service-name | The service name for GSSAPI. |
| sasl-gssapi-auth-type | The authorization type for GSSAPI. |
| sasl-gssapi-disable-pafxfast | Set to true or false. |
| sasl-gssapi-kerberos-config-path | Path to the Kerberos config file. |
| sasl-gssapi-key-tab-path | Path to the Kerberos key tab. |
| sasl-gssapi-realm | Default Kerberos realm. |
| sasl-oauth-service | The service name to use when authenticating with SASL/OAUTH. One of: "" (empty) or custom, auth0, azuread. |
| sasl-oauth-client-id | The client ID to use when authenticating with SASL/OAUTH. |
| sasl-oauth-client-secret | The client secret to use when authenticating with SASL/OAUTH. |
| sasl-oauth-token-url | The token URL to use when sasl-oauth-service is custom or auth0. Leave empty otherwise. |
| sasl-oauth-token-expiry-margin | The margin for the token's expiration time. |
| sasl-oauth-scopes | Optional scopes to use when authenticating with SASL/OAUTH. |
| sasl-oauth-tenant-id | Tenant ID for the AzureAD service. |
| sasl-oauth-parameters | Optional key-value parameters for SASL/OAUTH, for example, audience for Auth0. |
| sasl-access-token | Static OAUTH token. Use this instead of other OAUTH parameters. |
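
To show how the OAUTHBEARER options fit together, here is a hedged sketch of a client that authenticates through the azuread service. All ids, secrets, and the broker address are placeholders, and the combination is an assumption based on the option descriptions above rather than a tested configuration.

```toml
[[kafka]]
  enabled = true
  id = "infra-monitoring"               # placeholder id
  brokers = ["123.45.67.89:9092"]       # placeholder broker address
  sasl-mechanism = "OAUTHBEARER"
  sasl-oauth-service = "azuread"
  sasl-oauth-client-id = "xxxxxxx"      # placeholder
  sasl-oauth-client-secret = "xxxxxxxx" # placeholder
  sasl-oauth-tenant-id = "xxxxxxx"      # placeholder; used by the azuread service
  # Left empty because the token URL applies only to the custom and auth0 services
  sasl-oauth-token-url = ""
  sasl-oauth-token-expiry-margin = "10s"
```

If a static token is available, setting sasl-access-token instead of the other OAUTH options should make the client ID, secret, and tenant ID unnecessary.
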
The following Kafka event handler options can be set in a
handler file or when using
.kafka() in a TICKscript.
| Name | Type | Description |
|---|---|---|
| cluster | string | Name of the Kafka cluster. |
| topic | string | Kafka topic. In TICKscripts, this is set using .kafkaTopic(). |
| template | string | Message template. |
| disablePartitionById | boolean | Disable partitioning Kafka messages by message ID. |
| partitionAlgorithm | string | Algorithm to use to assign message IDs to Kafka partitions (crc32 (default), murmur2, or fnv-1a). |
{{% note %}}
In Kapacitor 1.6+, messages with the same ID are sent to the same Kafka partition. Previously, messages were sent to the Kafka partition with the least amount of data, regardless of the message ID. Messages with no ID are spread randomly between partitions. This aligns the Kapacitor concept of message IDs with the Kafka concept of message keys.
To revert to the previous behavior, use the disablePartitionById option.
When partitioning by ID, use the partitionAlgorithm option to specify the method used to assign message IDs to Kafka partitions. Kapacitor supports the following partitioning algorithms:

- crc32 (default): aligns with librdkafka and confluent-kafka-go
- murmur2: aligns with the canonical Java client partitioning logic
- fnv-1a: aligns with the sarama project

{{% /note %}}
id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32
|alert()
  // ...
  .kafka()
    .cluster('kafka-cluster')
    .kafkaTopic('kafka-topic-name')
    .template('kafka-template-name')
    .disablePartitionById(FALSE)
    .partitionAlgorithm('crc32')
With the Kafka event handler enabled in your kapacitor.conf, use the .kafka()
attribute in your TICKscripts to send alerts to a Kafka cluster or define a
Kafka handler that subscribes to a topic and sends published alerts to Kafka.
The examples below use the following Kafka configuration defined in the kapacitor.conf:
Kafka settings in kapacitor.conf
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true
The following TICKscript uses the .kafka() event handler to send the message,
"Hey, check your CPU", whenever idle CPU usage drops below 10%.
It publishes the messages to the cpu-alerts topic in the infra-monitoring
Kafka cluster defined in the kapacitor.conf.
kafka-cpu-alert.tick
stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')
The following setup sends an alert to the cpu topic with the message, "Hey,
check your CPU". A Kafka handler is added that subscribes to the cpu topic and
publishes all alert messages to the cpu-alerts topic associated with the
infra-monitoring Kafka cluster defined in the kapacitor.conf.
Create a TICKscript that publishes alert messages to a topic.
The TICKscript below sends an alert message to the cpu topic any time CPU
idle usage drops below 10% (or CPU usage is above 90%).
stream
  |from()
    .measurement('cpu')
  |alert()
    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')
Add and enable the TICKscript:
kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert
Create a handler file that subscribes to the cpu topic and uses the Kafka
event handler to send alerts to the cpu-alerts topic in Kafka.
id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'
Add the handler:
kapacitor define-topic-handler kafka_cpu_handler.yaml
To authenticate with a method other than SSL, configure Kapacitor to use SASL. For example, Kapacitor can authenticate directly against Kafka with a username and password. Several SASL options are available, but a username and password is the most common setup, as shown in the following example:
[[kafka]]
enabled = true
id = "infra-monitoring"
brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
timeout = "10s"
batch-size = 100
batch-timeout = "1s"
use-ssl = true
ssl-ca = "/etc/ssl/certs/ca.crt"
ssl-cert = "/etc/ssl/certs/cert.crt"
ssl-key = "/etc/ssl/certs/cert-key.key"
insecure-skip-verify = true
sasl-username = "kafka"
sasl-password = "kafkapassword"
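
As a variation on the username and password setup, the sketch below also names an explicit SASL mechanism. It assumes the brokers are configured to accept SCRAM-SHA-512, which the source does not state. The credentials and addresses are the same placeholders used in the example above.

```toml
[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["123.45.67.89:9092", "123.45.67.90:9092"]
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  # Assumption: the brokers accept SCRAM-SHA-512; use the mechanism your cluster supports
  sasl-mechanism = "SCRAM-SHA-512"
  sasl-username = "kafka"
  sasl-password = "kafkapassword"
```

The mechanism value must be one of the options listed for sasl-mechanism (GSSAPI, OAUTHBEARER, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512).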