Back to Charts

⚠️ Repo Archive Notice

incubator/kafka/README.md

latest41.7 KB
Original Source

⚠️ Repo Archive Notice

As of Nov 13, 2020, charts in this repo will no longer be updated. For more information, see the Helm Charts Deprecation and Archive Notice, and Update.

Apache Kafka Helm Chart

This is an implementation of Kafka StatefulSet found here:

DEPRECATION NOTICE

This chart is deprecated and no longer supported.

Pre Requisites:

  • Kubernetes 1.3 with alpha APIs enabled and support for storage classes

  • PV support on underlying infrastructure

  • Requires at least v2.0.0-beta.1 version of helm to support dependency management with requirements.yaml

StatefulSet Details

StatefulSet Caveats

Chart Details

This chart will do the following:

  • Implement a dynamically scalable kafka cluster using Kubernetes StatefulSets

  • Implement a dynamically scalable zookeeper cluster as another Kubernetes StatefulSet required for the Kafka cluster above

  • Expose Kafka protocol endpoints via NodePort services (optional)

Installing the Chart

To install the chart with the release name my-kafka in the default namespace:

$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ helm install --name my-kafka incubator/kafka

If using a dedicated namespace(recommended) then make sure the namespace exists with:

$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ kubectl create ns kafka
$ helm install --name my-kafka --namespace kafka incubator/kafka

This chart includes a ZooKeeper chart as a dependency to the Kafka cluster in its requirement.yaml by default. The chart can be customized using the following configurable parameters:

ParameterDescriptionDefault
imageKafka Container image nameconfluentinc/cp-kafka
imageTagKafka Container image tag5.0.1
imagePullPolicyKafka Container pull policyIfNotPresent
replicasKafka Brokers3
componentKafka k8s selector keykafka
resourcesKafka resource requests and limits{}
securityContextKafka containers security context{}
kafkaHeapOptionsKafka broker JVM heap options-Xmx1G-Xms1G
logSubPathSubpath under persistence.mountPath where kafka logs will be placed.logs
schedulerNameName of Kubernetes scheduler (other than the default)nil
serviceAccountNameName of Kubernetes serviceAccount. Useful when needing to pull images from custom repositoriesnil
priorityClassNameName of Kubernetes Pod PriorityClass. https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/#priorityclassnil
affinityDefines affinities and anti-affinities for pods as defined in: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity preferences{}
tolerationsList of node tolerations for the pods. https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/[]
headless.annotationsList of annotations for the headless service. https://kubernetes.io/docs/concepts/services-networking/service/#headless-services[]
headless.targetPortTarget port to be used for the headless service. This is not a required value.nil
headless.portPort to be used for the headless service. https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/9092
external.enabledIf True, exposes Kafka brokers via NodePort (PLAINTEXT by default)false
external.dns.useInternalIf True, add Annotation for internal DNS servicefalse
external.dns.useExternalIf True, add Annotation for external DNS servicetrue
external.servicePortTCP port configured at external services (one per pod) to relay from NodePort to the external listener port.'19092'
external.firstListenerPortTCP port which is added pod index number to arrive at the port used for NodePort and external listener port.'31090'
external.domainDomain in which to advertise Kafka external listeners.cluster.local
external.typeService Type.NodePort
external.distinctDistinct DNS entries for each created A record.false
external.annotationsAdditional annotations for the external service.{}
external.labelsAdditional labels for the external service.{}
external.loadBalancerIPAdd Static IP to the type Load Balancer. Depends on the provider if enabled[]
external.loadBalancerSourceRangesAdd IP ranges that are allowed to access the Load Balancer.[]
podAnnotationsAnnotation to be added to Kafka pods{}
podLabelsLabels to be added to Kafka pods{}
podDisruptionBudgetDefine a Disruption Budget for the Kafka Pods{}
envOverridesAdd additional Environment Variables in the dictionary format{ zookeeper.sasl.enabled: "False" }
configurationOverridesKafka configuration setting overrides in the dictionary format{ "confluent.support.metrics.enable": false }
secretsPass any secrets to the kafka pods. Each secret will be passed as an environment variable by default. The secret can also be mounted to a specific path if required. Environment variable names are generated as: <secretName>_<secretKey> (All upper case){}
additionalPortsAdditional ports to expose on brokers. Useful when the image exposes metrics (like prometheus, etc.) through a javaagent instead of a sidecar{}
readinessProbe.initialDelaySecondsNumber of seconds before probe is initiated.30
readinessProbe.periodSecondsHow often (in seconds) to perform the probe.10
readinessProbe.timeoutSecondsNumber of seconds after which the probe times out.5
readinessProbe.successThresholdMinimum consecutive successes for the probe to be considered successful after having failed.1
readinessProbe.failureThresholdAfter the probe fails this many times, pod will be marked Unready.3
terminationGracePeriodSecondsWait up to this many seconds for a broker to shut down gracefully, after which it is killed60
updateStrategyStatefulSet update strategy to use.{ type: "OnDelete" }
podManagementPolicyStart and stop pods in Parallel or OrderedReady (one-by-one.) Can not change after first release.OrderedReady
persistence.enabledUse a PVC to persist datatrue
persistence.sizeSize of data volume1Gi
persistence.mountPathMount path of data volume/opt/kafka/data
persistence.storageClassStorage class of backing PVCnil
jmx.configMap.enabledEnable the default ConfigMap for JMXtrue
jmx.configMap.overrideConfigAllows config file to be generated by passing values to ConfigMap{}
jmx.configMap.overrideNameAllows setting the name of the ConfigMap to be used""
jmx.portThe jmx port which JMX style metrics are exposed (note: these are not scrapeable by Prometheus)5555
jmx.whitelistObjectNamesAllows setting which JMX objects you want to expose to via JMX stats to JMX Exporter(see values.yaml)
nodeSelectorNode labels for pod assignment{}
prometheus.jmx.resourcesAllows setting resource limits for jmx sidecar container{}
prometheus.jmx.enabledWhether or not to expose JMX metrics to Prometheusfalse
prometheus.jmx.imageJMX Exporter container imagesolsson/kafka-prometheus-jmx-exporter@sha256
prometheus.jmx.imageTagJMX Exporter container image taga23062396cd5af1acdf76512632c20ea6be76885dfc20cd9ff40fb23846557e8
prometheus.jmx.intervalInterval that Prometheus scrapes JMX metrics when using Prometheus Operator10s
prometheus.jmx.scrapeTimeoutTimeout that Prometheus scrapes JMX metrics when using Prometheus Operator10s
prometheus.jmx.portJMX Exporter Port which exposes metrics in Prometheus format for scraping5556
prometheus.kafka.enabledWhether or not to create a separate Kafka exporterfalse
prometheus.kafka.imageKafka Exporter container imagedanielqsj/kafka-exporter
prometheus.kafka.imageTagKafka Exporter container image tagv1.2.0
prometheus.kafka.intervalInterval that Prometheus scrapes Kafka metrics when using Prometheus Operator10s
prometheus.kafka.scrapeTimeoutTimeout that Prometheus scrapes Kafka metrics when using Prometheus Operator10s
prometheus.kafka.portKafka Exporter Port which exposes metrics in Prometheus format for scraping9308
prometheus.kafka.resourcesAllows setting resource limits for kafka-exporter pod{}
prometheus.kafka.affinityDefines affinities and anti-affinities for pods as defined in: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity preferences{}
prometheus.kafka.tolerationsList of node tolerations for the pods. https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/[]
prometheus.operator.enabledTrue if using the Prometheus Operator, False if notfalse
prometheus.operator.serviceMonitor.namespaceNamespace in which to install the ServiceMonitor resource. Default to kube-prometheus install.monitoring
prometheus.operator.serviceMonitor.releaseNamespaceSet namespace to release namespace. Default falsefalse
prometheus.operator.serviceMonitor.selectorDefault to kube-prometheus install (CoreOS recommended), but should be set according to Prometheus install{ prometheus: kube-prometheus }
prometheus.operator.prometheusRule.enabledTrue to create a PrometheusRule resource for Prometheus Operator, False if notfalse
prometheus.operator.prometheusRule.namespaceNamespace in which to install the PrometheusRule resource. Default to kube-prometheus install.monitoring
prometheus.operator.prometheusRule.releaseNamespaceSet namespace to release namespace. Default falsefalse
prometheus.operator.prometheusRule.selectorDefault to kube-prometheus install (CoreOS recommended), but should be set according to Prometheus install{ prometheus: kube-prometheus }
prometheus.operator.prometheusRule.rulesDefine the prometheus rules. See values file for examples{}
configJob.backoffLimitNumber of retries before considering kafka-config job as failed6
topicsList of topics to create & configure. Can specify name, partitions, replicationFactor, reassignPartitions, config. See values.yaml[] (Empty list)
testsEnabledEnable/disable the chart's teststrue
zookeeper.enabledIf True, installs Zookeeper Charttrue
zookeeper.resourcesZookeeper resource requests and limits{}
zookeeper.envEnvironmental variables provided to Zookeeper Zookeeper{ZK_HEAP_SIZE: "1G"}
zookeeper.storageZookeeper Persistent volume size2Gi
zookeeper.image.PullPolicyZookeeper Container pull policyIfNotPresent
zookeeper.urlURL of Zookeeper Cluster (unneeded if installing Zookeeper Chart)""
zookeeper.portPort of Zookeeper Cluster2181
zookeeper.affinityDefines affinities and anti-affinities for pods as defined in: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity preferences{}
zookeeper.nodeSelectorNode labels for pod assignment{}

Specify parameters using --set key=value[,key=value] argument to helm install

Alternatively a YAML file that specifies the values for the parameters can be provided like this:

bash
$ helm install --name my-kafka -f values.yaml incubator/kafka

Connecting to Kafka from inside Kubernetes

You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:

yaml
apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: kafka
spec:
  containers:
  - name: kafka
    image: solsson/kafka:0.11.0.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka topics with:

kubectl -n kafka exec -ti testclient -- ./bin/kafka-topics.sh --zookeeper my-release-zookeeper:2181 --list

Where my-release is the name of your helm release.

Extensions

Kafka has a rich ecosystem, with lots of tools. This sections is intended to compile all of those tools for which a corresponding Helm chart has already been created.

  • Schema-registry - A confluent project that provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas.

Connecting to Kafka from outside Kubernetes

NodePort External Service Type

Review and optionally override to enable the example text concerned with external access in values.yaml.

Once configured, you should be able to reach Kafka via NodePorts, one per replica. In kops where private, topology is enabled, this feature publishes an internal round-robin DNS record using the following naming scheme. The external access feature of this chart was tested with kops on AWS using flannel networking. If you wish to enable external access to Kafka running in kops, your security groups will likely need to be adjusted to allow non-Kubernetes nodes (e.g. bastion) to access the Kafka external listener port range.

{{ .Release.Name }}.{{ .Values.external.domain }}

If external.distinct is set theses entries will be prefixed with the replica number or broker id.

{{ .Release.Name }}-<BROKER_ID>.{{ .Values.external.domain }}

Port numbers for external access used at container and NodePort are unique to each container in the StatefulSet. Using the default external.firstListenerPort number with a replicas value of 3, the following container and NodePorts will be opened for external access: 31090, 31091, 31092. All of these ports should be reachable from any host to NodePorts are exposed because Kubernetes routes each NodePort from entry node to pod/container listening on the same port (e.g. 31091).

The external.servicePort at each external access service (one such service per pod) is a relay toward the a containerPort with a number matching its respective NodePort. The range of NodePorts is set, but should not actually listen, on all Kafka pods in the StatefulSet. As any given pod will listen only one such port at a time, setting the range at every Kafka pod is a reasonably safe configuration.

Example values.yml for external service type NodePort

The + lines are with the updated values.

 external:
-  enabled: false
+  enabled: true
   # type can be either NodePort or LoadBalancer
   type: NodePort
   # annotations:
@@ -170,14 +170,14 @@ configurationOverrides:
   ##
   ## Setting "advertised.listeners" here appends to "PLAINTEXT://${POD_IP}:9092,", ensure you update the domain
   ## If external service type is Nodeport:
-  # "advertised.listeners": |-
-  #   EXTERNAL://kafka.cluster.local:$((31090 + ${KAFKA_BROKER_ID}))
+  "advertised.listeners": |-
+    EXTERNAL://kafka.cluster.local:$((31090 + ${KAFKA_BROKER_ID}))
   ## If external service type is LoadBalancer and distinct is true:
   # "advertised.listeners": |-
   #   EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).cluster.local:19092
   ## If external service type is LoadBalancer and distinct is false:
   # "advertised.listeners": |-
   #   EXTERNAL://EXTERNAL://${LOAD_BALANCER_IP}:31090
   ## Uncomment to define the EXTERNAL Listener protocol
-  # "listener.security.protocol.map": |-
-  #   PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
+  "listener.security.protocol.map": |-
+    PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT


$ kafkacat -b kafka.cluster.local:31090 -L
Metadata for all topics (from broker 0: kafka.cluster.local:31090/0):
 3 brokers:
  broker 2 at kafka.cluster.local:31092
  broker 1 at kafka.cluster.local:31091
  broker 0 at kafka.cluster.local:31090
 0 topics:

$ kafkacat -b kafka.cluster.local:31090 -P -t test1 -p 0
msg01 from external producer to topic test1

$ kafkacat -b kafka.cluster.local:31090 -C -t test1 -p 0
msg01 from external producer to topic test1

LoadBalancer External Service Type

The load balancer external service type differs from the node port type by routing to the external.servicePort specified in the service for each statefulset container (if external.distinct is set). If external.distinct is false, external.servicePort is unused and will be set to the sum of external.firstListenerPort and the replica number. It is important to note that external.firstListenerPort does not have to be within the configured node port range for the cluster, however a node port will be allocated.

Example values.yml and DNS setup for external service type LoadBalancer with external.distinct: true

The + lines are with the updated values.

 external:
-  enabled: false
+  enabled: true
   # type can be either NodePort or LoadBalancer
-  type: NodePort
+  type: LoadBalancer
   # annotations:
   #  service.beta.kubernetes.io/openstack-internal-load-balancer: "true"
   dns:
@@ -138,10 +138,10 @@ external:
   # If using external service type LoadBalancer and external dns, set distinct to true below.
   # This creates an A record for each statefulset pod/broker. You should then map the
   # A record of the broker to the EXTERNAL IP given by the LoadBalancer in your DNS server.
-  distinct: false
+  distinct: true
   servicePort: 19092
   firstListenerPort: 31090
-  domain: cluster.local
+  domain: example.com
   loadBalancerIP: []
   init:
     image: "lwolf/kubectl_deployer"
@@ -173,11 +173,11 @@ configurationOverrides:
   # "advertised.listeners": |-
   #   EXTERNAL://kafka.cluster.local:$((31090 + ${KAFKA_BROKER_ID}))
   ## If external service type is LoadBalancer and distinct is true:
-  # "advertised.listeners": |-
-  #   EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).cluster.local:19092
+  "advertised.listeners": |-
+    EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).example.com:19092
   ## Uncomment to define the EXTERNAL Listener protocol
-  # "listener.security.protocol.map": |-
-  #   PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
+  "listener.security.protocol.map": |-
+    PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT

$ kubectl -n kafka get svc
NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP      PORT(S)                      AGE
kafka                      ClusterIP      10.39.241.217   <none>           9092/TCP                     2m39s
kafka-0-external           LoadBalancer   10.39.242.45    35.200.238.174   19092:30108/TCP              2m39s
kafka-1-external           LoadBalancer   10.39.241.90    35.244.44.162    19092:30582/TCP              2m39s
kafka-2-external           LoadBalancer   10.39.243.160   35.200.149.80    19092:30539/TCP              2m39s
kafka-headless             ClusterIP      None            <none>           9092/TCP                     2m39s
kafka-zookeeper            ClusterIP      10.39.249.70    <none>           2181/TCP                     2m39s
kafka-zookeeper-headless   ClusterIP      None            <none>           2181/TCP,3888/TCP,2888/TCP   2m39s

DNS A record entries:
kafka-0.example.com A record 35.200.238.174 TTL 60sec
kafka-1.example.com A record 35.244.44.162 TTL 60sec
kafka-2.example.com A record 35.200.149.80 TTL 60sec

$ ping kafka-0.example.com
PING kafka-0.example.com (35.200.238.174): 56 data bytes

$ kafkacat -b kafka-0.example.com:19092 -L
Metadata for all topics (from broker 0: kafka-0.example.com:19092/0):
 3 brokers:
  broker 2 at kafka-2.example.com:19092
  broker 1 at kafka-1.example.com:19092
  broker 0 at kafka-0.example.com:19092
 0 topics:

$ kafkacat -b kafka-0.example.com:19092 -P -t gkeTest -p 0
msg02 for topic gkeTest

$ kafkacat -b kafka-0.example.com:19092 -C -t gkeTest -p 0
msg02 for topic gkeTest

Example values.yml and DNS setup for external service type LoadBalancer with external.distinct: false

The + lines are with the updated values.

 external:
-  enabled: false
+  enabled: true
   # type can be either NodePort or LoadBalancer
-  type: NodePort
+  type: LoadBalancer
   # annotations:
   #  service.beta.kubernetes.io/openstack-internal-load-balancer: "true"
   dns:
@@ -138,10 +138,10 @@ external:
   distinct: false
   servicePort: 19092
   firstListenerPort: 31090
   domain: cluster.local
   loadBalancerIP: [35.200.238.174,35.244.44.162,35.200.149.80]
   init:
     image: "lwolf/kubectl_deployer"
@@ -173,11 +173,11 @@ configurationOverrides:
   # "advertised.listeners": |-
   #   EXTERNAL://kafka.cluster.local:$((31090 + ${KAFKA_BROKER_ID}))
   ## If external service type is LoadBalancer and distinct is true:
-  # "advertised.listeners": |-
-  #   EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).cluster.local:19092
+  "advertised.listeners": |-
+    EXTERNAL://${LOAD_BALANCER_IP}:31090
   ## Uncomment to define the EXTERNAL Listener protocol
-  # "listener.security.protocol.map": |-
-  #   PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
+  "listener.security.protocol.map": |-
+    PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT

$ kubectl -n kafka get svc
NAME                       TYPE           CLUSTER-IP      EXTERNAL-IP      PORT(S)                      AGE
kafka                      ClusterIP      10.39.241.217   <none>           9092/TCP                     2m39s
kafka-0-external           LoadBalancer   10.39.242.45    35.200.238.174   31090:30108/TCP              2m39s
kafka-1-external           LoadBalancer   10.39.241.90    35.244.44.162    31090:30582/TCP              2m39s
kafka-2-external           LoadBalancer   10.39.243.160   35.200.149.80    31090:30539/TCP              2m39s
kafka-headless             ClusterIP      None            <none>           9092/TCP                     2m39s
kafka-zookeeper            ClusterIP      10.39.249.70    <none>           2181/TCP                     2m39s
kafka-zookeeper-headless   ClusterIP      None            <none>           2181/TCP,3888/TCP,2888/TCP   2m39s

$ kafkacat -b 35.200.238.174:31090 -L
Metadata for all topics (from broker 0: 35.200.238.174:31090/0):
 3 brokers:
  broker 2 at 35.200.149.80:31090
  broker 1 at 35.244.44.162:31090
  broker 0 at 35.200.238.174:31090
 0 topics:

$ kafkacat -b 35.200.238.174:31090 -P -t gkeTest -p 0
msg02 for topic gkeTest

$ kafkacat -b 35.200.238.174:31090 -C -t gkeTest -p 0
msg02 for topic gkeTest

Known Limitations

  • Only supports storage options that have backends for persistent volume claims (tested mostly on AWS)
  • KAFKA_PORT will be created as an envvar and brokers will fail to start when there is a service named kafka in the same namespace. We work around this be unsetting that envvar unset KAFKA_PORT.

Prometheus Stats

Prometheus vs Prometheus Operator

Standard Prometheus is the default monitoring option for this chart. This chart also supports the CoreOS Prometheus Operator, which can provide additional functionality like automatically updating Prometheus and Alert Manager configuration. If you are interested in installing the Prometheus Operator please see the CoreOS repository for more information or read through the CoreOS blog post introducing the Prometheus Operator

JMX Exporter

The majority of Kafka statistics are provided via JMX and are exposed via the Prometheus JMX Exporter.

The JMX Exporter is a general purpose prometheus provider which is intended for use with any Java application. Because of this, it produces a number of statistics which may not be of interest. To help in reducing these statistics to their relevant components we have created a curated whitelist whitelistObjectNames for the JMX exporter. This whitelist may be modified or removed via the values configuration.

To accommodate compatibility with the Prometheus metrics, this chart performs transformations of raw JMX metrics. For example, broker names and topics names are incorporated into the metric name instead of becoming a label. If you are curious to learn more about any default transformations to the chart metrics, please have reference the configmap template.

Kafka Exporter

The Kafka Exporter is a complementary metrics exporter to the JMX Exporter. The Kafka Exporter provides additional statistics on Kafka Consumer Groups.