DEVELOPER.md
This document provides information useful to developers working on confluent-kafka-python.
Fork and Clone
git clone https://github.com/your-username/confluent-kafka-python.git
cd confluent-kafka-python
Create Virtual Environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
Note: On Windows the variables for Visual Studio are named INCLUDE and LIB
See the main README.md for platform-specific installation instructions.
If librdkafka is installed in a non-standard location provide the include and library directories with:
C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
Install confluent-kafka-python (editable) with dev/test/docs extras
pip3 install -e .[dev,tests,docs]
Alternatively you can build the bundle independently with:
python3 -m build
Verify Setup
python3 -c "import confluent_kafka; print('Setup successful!')"
Alternative setup instructions tested with python 3.11
# Modify pyproject.toml to require python version >=3.11
# This fixes the cel-python dependency conflict
uv venv --python 3.11
source .venv/bin/activate
uv sync --extra dev --extra tests
uv pip install trivup setuptools
pytest tests/
# When making changes, change project.version in pyproject.toml before re-running:
uv sync --extra dev --extra tests
src/confluent_kafka/ — core sync client APIssrc/confluent_kafka/aio/ — AsyncIO Producer/Consumer (first-class asyncio, not generated)src/confluent_kafka/schema_registry/ — Schema Registry clients and serdestests/ — unit and integration tests (including async producer tests)examples/ — runnable samples (includes asyncio example)tools/unasync.py — SR-only sync code generation from async sourcesInstall docs dependencies:
pip3 install .[docs]
Build HTML docs:
make docs
Documentation will be generated in docs/_build/.
or:
python3 setup.py build_sphinx
Documentation will be generated in build/sphinx/html.
python3 tools/unasync.py
# Run the script with the --check flag to ensure the sync code is up to date
python3 tools/unasync.py --check
If you make any changes to the async code (in src/confluent_kafka/schema_registry/_async and tests/integration/schema_registry/_async), you must run this script to generate the sync counterparts (in src/confluent_kafka/schema_registry/_sync and tests/integration/schema_registry/_sync). Otherwise, this script will be run in CI with the --check flag and fail the build.
Note: The AsyncIO Producer/Consumer under src/confluent_kafka/aio/ are first-class asyncio implementations and are not generated using unasync.
Source:
src/confluent_kafka/aio/producer/_AIOProducer.py (public async API)src/confluent_kafka/aio/producer/ and helpers in src/confluent_kafka/aio/_common.pyFor a complete usage example, see examples/asyncio_example.py.
Architecture: See the AIOProducer Architecture Overview for component design and data flow details.
Design guidelines:
_common.async_call with a ThreadPoolExecutor.error_cb, throttle_cb, stats_cb, oauth_cb, logger) onto the event loop using _common.wrap_common_callbacks.await producer.flush() and await producer.close() are called in shutdown paths to stop background tasks.Performance considerations:
Event loop safety:
_common.async_call.asyncio.run_coroutine_threadsafe to re-enter the loop if invoked from non-loop threads.Unit tests:
pytest -q
Run async producer tests only:
pytest -q tests/test_AIOProducer.py
pytest -q -k AIOProducer
Integration tests (may require local/CI Kafka cluster; see tests/README.md):
pytest -q tests/integration
See tests/README.md for instructions on how to run tests.
We use automated tools to maintain consistent code style:
# Check formatting
make style-check
# Fix formatting
make style-fix
# Check only changed files
make style-check-changed
make style-fix-changed
# Check formatting
tox -e black,isort
# Check linting
tox -e flake8
# Check typing
tox -e mypy
# Run all formatting and linting checks
tox -e black,isort,flake8,mypy
See “Generate Documentation” above; ensure examples and code blocks compile where applicable.
feature/asyncio-improvements).C_INCLUDE_PATH and LIBRARY_PATH.await producer.close() is called to stop background tasks.