docs/kip-848-migration-guide.rst
Starting with confluent-kafka-python 2.12.0 (GA release), the next generation consumer group rebalance protocol defined in KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>_ is production-ready.
Note: The new consumer group protocol defined in KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>_ is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. group.protocol configuration property dictates whether to use the new consumer protocol or older classic protocol. It defaults to classic if not provided.
Overview
What changed:
The Group Leader role (consumer member) is removed. Assignments are calculated by the Group Coordinator (broker) and distributed via heartbeats.
Requirements:
Enablement (client-side):
group.protocol=consumergroup.remote.assignor=<assignor> (optional; broker-controlled
if unset; default broker assignor is uniform)Available Features
All KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>_ features are supported including:
classic protocol or downgrade from consumer
protocolContract Changes
Client Configuration changes ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+------------------------------------+-----------------------------------------+
| Classic Protocol (Deprecated | KIP-848 / Next-Gen Replacement |
| Configs in KIP-848) | |
+====================================+=========================================+
| partition.assignment.strategy | group.remote.assignor |
+------------------------------------+-----------------------------------------+
| session.timeout.ms | Broker config: |
| | group.consumer.session.timeout.ms |
+------------------------------------+-----------------------------------------+
| heartbeat.interval.ms | Broker config: |
| | group.consumer.heartbeat.interval.ms|
+------------------------------------+-----------------------------------------+
| group.protocol.type | Not used in the new protocol |
+------------------------------------+-----------------------------------------+
Note: The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are no longer used when using the KIP-848 consumer protocol.
Rebalance Callback Changes ^^^^^^^^^^^^^^^^^^^^^^^^^^
The protocol is fully incremental in KIP-848.
In the rebalance callbacks, you must only use (optional - if not used, client will handle it internally):
consumer.incremental_assign(partitions) to assign new
partitionsconsumer.incremental_unassign(partitions) to revoke partitionsDo not use consumer.assign() or consumer.unassign() when
using group.protocol='consumer' (KIP-848).
If you don't provide incremental assign/unassign inside rebalance callbacks, the client will automatically use incremental assign/unassign internally.
⚠️ The partitions list passed to incremental_assign() and
incremental_unassign() contains only the incremental changes
— partitions being added or revoked — not the full
assignment, as was the case with assign() in the classic
protocol.
All assignors under KIP-848 are now sticky, including range,
which was not sticky in the classic protocol.
Static Group Membership ^^^^^^^^^^^^^^^^^^^^^^^
Duplicate group.instance.id handling:
Implications:
group.instance.id.Session Timeout & Fetching ^^^^^^^^^^^^^^^^^^^^^^^^^^
Session timeout is broker-controlled:
In the classic protocol, the client stopped fetching when session timeout expired.
Closing / Auto-Commit ^^^^^^^^^^^^^^^^^^^^^
On close() or unsubscribe with auto-commit enabled:
Error Handling Changes ^^^^^^^^^^^^^^^^^^^^^^
UNKNOWN_TOPIC_OR_PART (subscription case):
TOPIC_AUTHORIZATION_FAILED:
Summary of Key Differences (Classic vs Next-Gen) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
group.remote.assignor and broker-controlled session/heartbeat
configsgroup.instance.idclassic and consumer protocolsMinimal Example Config
Classic Protocol ^^^^^^^^^^^^^^^^
.. code:: properties
group.protocol=classic
partition.assignment.strategy=<range,roundrobin,sticky> session.timeout.ms=45000 heartbeat.interval.ms=15000
Next-Gen Protocol / KIP-848 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code:: properties
group.protocol=consumer
Rebalance Callback Migration
Range Assignor (Classic) ^^^^^^^^^^^^^^^^^^^^^^^^
.. code:: python
def on_assign(consumer, partitions): # Full partition list is provided under the classic protocol print(f"[Classic] Assigned partitions: {partitions}") consumer.assign(partitions) # Optional: client handles if not used
def on_revoke(consumer, partitions): print(f"[Classic] Revoked partitions: {partitions}") consumer.unassign() # Optional: client handles if not used
Incremental Assignor (Including Range in Consumer / KIP-848, Any Protocol) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code:: python
def on_assign(consumer, partitions): # Only incremental partitions are passed here (not full list) print(f"[KIP-848] Incrementally assigning: {partitions}") consumer.incremental_assign(partitions) # Optional: client handles if not used
def on_revoke(consumer, partitions): print(f"[KIP-848] Incrementally revoking: {partitions}") consumer.incremental_unassign(partitions) # Optional: client handles if not used
Note: The partitions list contains only partitions being added or revoked, not the full partition list as in the classic consumer.assign().
Upgrade and Downgrade
classic consumers runs under the
classic protocol.consumer protocol member joins.consumer protocol member leaves while classic members remain.Migration Checklist (Next-Gen Protocol / KIP-848 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol>_)
group.protocol=consumergroup.remote.assignor; leave unspecified for
broker-controlled (default: uniform), valid options: uniform
or rangegroup.instance.id)