This tutorial describes how to configure a YugabyteDB connector to publish changes to Confluent Cloud, and assumes some familiarity with Docker and YAML files.
Set up a Confluent Cloud cluster on the provider of your choice. For more information, refer to Manage Kafka Clusters on Confluent Cloud in the Confluent documentation.
Create an API key for your Confluent cluster and save it. Refer to Use API Keys to Control Access in Confluent Cloud for instructions.
To create a Kafka Connect image with the YugabyteDB connector, start with the Confluent Server Docker Image for Kafka Connect.
Create a directory to store all the related files:

```sh
mkdir kafka-connect-ccloud && cd kafka-connect-ccloud
```
Download the YugabyteDB connector jar:

```sh
curl -so debezium-connector-yugabytedb-dz.1.9.5.yb.grpc.2024.2.2.jar https://github.com/yugabyte/debezium-connector-yugabytedb/releases/download/vdz.1.9.5.yb.grpc.2024.2.2/debezium-connector-yugabytedb-dz.1.9.5.yb.grpc.2024.2.2.jar
```
Create a Dockerfile with the following contents:

```Dockerfile
FROM confluentinc/cp-server-connect:7.4.0
ADD debezium-connector-yugabytedb-dz.1.9.5.yb.grpc.2024.2.2.jar /usr/share/java/kafka/
USER 1001
```
To build the image, execute the following command:

```sh
docker build . -t custom-connect:latest
```
Create a `docker-compose.yaml` file with the following contents, replacing `CCLOUD-BROKER-ENDPOINT`, `CCLOUD_USER`, and `CCLOUD_PASSWORD` with your Confluent Cloud broker endpoint and API key credentials:

```yaml
version: '3'
services:
  kafka-connect-ccloud:
    image: custom-connect:latest
    container_name: kafka-connect-ccloud
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "CCLOUD-BROKER-ENDPOINT.confluent.cloud:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01-v04
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/usr/share/java/kafka/'
      # Confluent Cloud config
      CONNECT_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_RETRY_BACKOFF_MS: "500"
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
      CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='CCLOUD_USER' password='CCLOUD_PASSWORD';"
      CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
      CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
    command:
      - bash
      - -c
      - |
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        #
        sleep infinity
```
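Before starting the stack, it is easy to miss one of the `CCLOUD-*` placeholders. As a sanity check, a hypothetical helper (not part of the tutorial's tooling) that scans the compose file text for any placeholders you haven't replaced yet could look like this:

```python
# Hypothetical helper: scan docker-compose text for Confluent Cloud
# placeholders that still need to be replaced with real values.
PLACEHOLDERS = ("CCLOUD-BROKER-ENDPOINT", "CCLOUD_USER", "CCLOUD_PASSWORD")

def find_placeholders(text: str) -> list:
    """Return the placeholders that still appear in the given text."""
    return [p for p in PLACEHOLDERS if p in text]

sample = 'CONNECT_BOOTSTRAP_SERVERS: "CCLOUD-BROKER-ENDPOINT.confluent.cloud:9092"'
print(find_placeholders(sample))  # ['CCLOUD-BROKER-ENDPOINT']
```

Running it over the whole file (for example, `find_placeholders(open("docker-compose.yaml").read())`) should return an empty list once every value has been filled in.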
This configuration uses SASL for client authentication. For more information about using SASL with Confluent, refer to Authentication with SASL using JAAS in the Confluent documentation.
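The three JAAS entries in the compose file are identical formatted strings. If you generate the compose file from a template rather than editing it by hand, a small helper (a sketch, not an official API) can build them from your API key and secret:

```python
def jaas_config(username: str, password: str) -> str:
    # Format the SASL/PLAIN JAAS line used by CONNECT_SASL_JAAS_CONFIG,
    # CONNECT_CONSUMER_SASL_JAAS_CONFIG, and CONNECT_PRODUCER_SASL_JAAS_CONFIG.
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{username}' password='{password}';"
    )

print(jaas_config("MY-API-KEY", "MY-API-SECRET"))
```

The username and password here are the Confluent Cloud API key and secret created earlier; the trailing semicolon is required by the JAAS syntax.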
{{< note title="Using Schema Registry" >}}
If your configuration also needs Schema Registry, add the following environment variables to the `environment` section of the docker-compose file, replacing the endpoint and credential placeholders with your Schema Registry values:

```yaml
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "https://SCHEMA-REGISTRY-CCLOUD-ENDPOINT.confluent.cloud"
CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "https://SCHEMA-REGISTRY-CCLOUD-ENDPOINT.confluent.cloud"
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
```

For more information on using Schema Registry, refer to the Confluent documentation.
{{< /note >}}
{{< note title="Using authentication and authorization" >}}
To configure authentication, follow the instructions in Configure Authentication for Confluent Platform with Ansible Playbooks.
To configure authorization, follow the instructions in Configure Authorization for Confluent Platform with Ansible Playbooks.
{{< /note >}}
Start the Kafka Connect cluster:

```sh
docker compose up
```
Deploy the connector. For more information, refer to Deployment.
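For reference, a YugabyteDB source connector registration typically takes the shape of the following JSON, which you POST to the worker's REST API (`http://localhost:8083/connectors`). The host names, credentials, stream ID, and table list below are placeholders for illustration; use the values from your own YugabyteDB cluster and CDC stream:

```json
{
  "name": "ybconnector",
  "config": {
    "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
    "database.hostname": "YB-TSERVER-HOST",
    "database.port": "5433",
    "database.master.addresses": "YB-MASTER-HOST:7100",
    "database.user": "yugabyte",
    "database.password": "yugabyte",
    "database.dbname": "yugabyte",
    "database.server.name": "dbserver1",
    "database.streamid": "YOUR-CDC-STREAM-ID",
    "table.include.list": "public.orders"
  }
}
```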