examples/README.md
The scripts in this directory provide various examples of using the Confluent Python client for Kafka. While these examples work with any Kafka deployment, they are optimized for Confluent Cloud and Confluent Platform, which provide additional features, security, and enterprise support.
Why These Examples Use confluent-kafka-python: Unlike the basic Apache Kafka Python client (kafka-python), this client provides production-ready performance via librdkafka, built-in Schema Registry integration, AsyncIO support, and enterprise features like transactions and exactly-once semantics.
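As a taste of those enterprise features, here is a minimal sketch of the transactional producer API (assuming a broker that supports transactions; the `transactional.id` and topic name are illustrative):

```python
from confluent_kafka import Producer

# Minimal transactional-producer sketch: everything between
# begin_transaction() and commit_transaction() is committed atomically.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "example-tx-producer",  # illustrative id
})
producer.init_transactions()
producer.begin_transaction()
producer.produce("tx-topic", value=b"exactly-once payload")
producer.commit_transaction()
```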
The examples include, among others:

- Context manager (`with` statement) usage for Producer, Consumer, and AdminClient, showing automatic resource cleanup when exiting the `with` block (see the sketch below).
- An AsyncIO Avro producer using `AsyncSchemaRegistryClient` and `AsyncAvroSerializer` (supports Confluent Cloud via `--sr-api-key`/`--sr-api-secret`).

Architecture: for implementation details and component design, see the AIOProducer Architecture Overview.
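A minimal sketch of that context-manager usage (broker address and topic are illustrative):

```python
from confluent_kafka import Producer

# Resource cleanup happens automatically when the `with` block exits.
with Producer({"bootstrap.servers": "localhost:9092"}) as producer:
    producer.produce("events", value=b"hello")
```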
The AsyncIO producer works seamlessly with popular Python web frameworks:
FastAPI/Starlette:
```python
from fastapi import FastAPI
from confluent_kafka.aio import AIOProducer

app = FastAPI()
producer = None

@app.on_event("startup")
async def startup():
    global producer
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})

@app.post("/events")
async def create_event(data: dict):
    delivery_future = await producer.produce("events", value=str(data))
    message = await delivery_future
    return {"offset": message.offset()}
```
aiohttp:
```python
from aiohttp import web
from confluent_kafka.aio import AIOProducer

async def init_app():
    app = web.Application()
    app['producer'] = AIOProducer({"bootstrap.servers": "localhost:9092"})
    return app

async def handle_event(request):
    producer = request.app['producer']
    delivery_future = await producer.produce("events", value="data")
    message = await delivery_future
    return web.json_response({"offset": message.offset()})
```
For more details, see Integrating Apache Kafka With Python Asyncio Web Applications.
The AsyncIO producer and consumer work seamlessly with async Schema Registry serializers:
```python
from confluent_kafka.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

async def setup_async_avro_producer():
    schema_registry_client = AsyncSchemaRegistryClient({"url": "http://localhost:8081"})
    avro_serializer = await AsyncAvroSerializer(
        schema_registry_client=schema_registry_client,
        schema_str=user_schema  # an Avro schema string defined elsewhere
    )
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})

    # Serialize and produce
    serialized_data = await avro_serializer(user_data, SerializationContext("users", MessageField.VALUE))
    delivery_future = await producer.produce("users", value=serialized_data)
    message = await delivery_future
```
The following async serializers and deserializers are available:

- `AsyncAvroSerializer` / `AsyncAvroDeserializer`
- `AsyncJSONSerializer` / `AsyncJSONDeserializer`
- `AsyncProtobufSerializer` / `AsyncProtobufDeserializer`

Note: async deserialization follows the same pattern; use the corresponding `Async*Deserializer` with `AIOConsumer` (a consumer sketch follows the imports below).
```python
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer, AsyncAvroDeserializer
from confluent_kafka.schema_registry._async.json_schema import AsyncJSONSerializer, AsyncJSONDeserializer
from confluent_kafka.schema_registry._async.protobuf import AsyncProtobufSerializer, AsyncProtobufDeserializer
```
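A hedged sketch of the consumer side, assuming `AIOConsumer` exposes the blocking `Consumer` methods (`subscribe`, `poll`, `close`) as awaitables the same way `AIOProducer` wraps `Producer`; topic, group id, and the deserializer argument are illustrative:

```python
from confluent_kafka.aio import AIOConsumer
from confluent_kafka.serialization import SerializationContext, MessageField

async def consume_users(avro_deserializer):
    consumer = AIOConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "users-reader",  # illustrative group id
        "auto.offset.reset": "earliest",
    })
    await consumer.subscribe(["users"])
    try:
        while True:
            msg = await consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(msg.error())
                continue
            # Deserialize with the corresponding Async*Deserializer.
            user = await avro_deserializer(
                msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
            print(user)
    finally:
        await consumer.close()
```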
API Consistency: the async Schema Registry client and serializers maintain 100% interface parity with their synchronous counterparts. All configuration options, method signatures, and behaviors are identical; just add `await` and use the `Async`-prefixed classes.
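For instance, the two styles differ only in the added awaits (a sketch; `client`, `schema_str`, `obj`, and `ctx` stand in for values from the examples above, and the async calls must run inside a coroutine):

```python
# Sync
serializer = AvroSerializer(client, schema_str)
payload = serializer(obj, ctx)

# Async: same arguments, just awaited and Async-prefixed
serializer = await AsyncAvroSerializer(client, schema_str)
payload = await serializer(obj, ctx)
```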
For Client-Side Field Level Encryption (CSFLE) with Data Contract rules, see the encryption examples:
- avro_producer_encryption.py
- json_producer_encryption.py
- protobuf_producer_encryption.py

This client provides serializers for Avro, JSON Schema, and Protobuf that integrate with Schema Registry on both Confluent Platform and Confluent Cloud. Both synchronous and asynchronous versions are available.
```python
# For Confluent Platform (local)
schema_registry_conf = {'url': 'http://localhost:8081'}

# For Confluent Cloud
# schema_registry_conf = {
#     'url': 'https://<ccloud-schema-registry-url>',
#     'basic.auth.user.info': '<sr-api-key>:<sr-api-secret>'
# }
```
Notes (Confluent Cloud): Find the Schema Registry URL and create an SR API key/secret in the Confluent Cloud Console (Cluster -> Schema Registry). Reference: Confluent Cloud Schema Registry docs
Note for Confluent Cloud users: use the synchronous serializers (`AvroSerializer`, `JSONSerializer`, `ProtobufSerializer`) with the standard `Producer`, as in the excerpt below. The serializer handles schema registration and caching automatically.
```python
# From examples/avro_producer.py
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_registry_conf = {'url': 'http://localhost:8081'}
# For Confluent Cloud you can pass `--sr-api-key` and `--sr-api-secret` to the script.
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# user_schema (an Avro schema string) and some_user_object are defined in the full example.
avro_serializer = AvroSerializer(schema_registry_client,
                                 user_schema,
                                 lambda user, ctx: user.to_dict())

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
}
producer = Producer(producer_conf)

# Simplified example of serialize and produce
serialized_value = avro_serializer(some_user_object,
                                   SerializationContext('my-topic', MessageField.VALUE))
producer.produce('my-topic', key='user1', value=serialized_value)
producer.flush()
```
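If you want confirmation that each message actually reached the broker, you can attach a delivery callback via `produce`'s `on_delivery` parameter; a minimal sketch building on the excerpt above:

```python
def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the final outcome.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer.produce('my-topic', key='user1', value=serialized_value,
                 on_delivery=delivery_report)
producer.flush()  # flush() serves delivery callbacks before returning
```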
Use async serializers with AIOProducer and AIOConsumer. Note that you must
instantiate the serializer and then call it to serialize the data before
producing.
```python
# Async variant of the excerpt above
from confluent_kafka.aio import AIOProducer
from confluent_kafka.schema_registry import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry._async.avro import AsyncAvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

async def setup_async_avro_producer():
    # Use the appropriate configuration from the section above
    schema_registry_conf = {"url": "http://localhost:8081"}
    schema_registry_client = AsyncSchemaRegistryClient(schema_registry_conf)
    avro_serializer = await AsyncAvroSerializer(
        schema_registry_client=schema_registry_client,
        schema_str=user_schema  # an Avro schema string defined elsewhere
    )
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})

    # Serialize and produce
    serialized_data = await avro_serializer(user_data, SerializationContext("users", MessageField.VALUE))
    delivery_future = await producer.produce("users", value=serialized_data)
    message = await delivery_future
```
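Since `setup_async_avro_producer` is a coroutine, you can drive it from a synchronous entry point with `asyncio.run`:

```python
import asyncio

asyncio.run(setup_async_avro_producer())
```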
Most examples require a running Kafka cluster. You can use Confluent Cloud or a local deployment (see the `docker/` subdirectory).

```bash
# Most examples follow this pattern:
python3 <example_name>.py <bootstrap_servers> [additional_args]

# For example:
python3 producer.py localhost:9092
python3 consumer.py localhost:9092
python3 asyncio_example.py localhost:9092 my-topic
```
For Avro, JSON, and Protobuf examples, you'll also need a Schema Registry:
```bash
python3 avro_producer.py localhost:9092 http://localhost:8081
```
Check each example's source code for specific command-line arguments and configuration requirements.
It's usually a good idea to install Python dependencies in a virtual environment to avoid conflicts between projects.
To set up a venv with the latest release version of confluent-kafka and the dependencies of all examples installed:

```bash
python3 -m venv venv_examples
source venv_examples/bin/activate
python3 -m pip install confluent_kafka
python3 -m pip install -r requirements/requirements-examples.txt
```
To set up a venv that uses the current source tree version of confluent_kafka, you need a C compiler and librdkafka installed (from a package, or from source). Then:

```bash
python3 -m venv venv_examples
source venv_examples/bin/activate
python3 -m pip install .[examples]
```

When you're finished with the venv:

```bash
deactivate
```