docs/kip-848-migration-guide.md
Starting with confluent-kafka-python 2.12.0 (GA release), the next-generation consumer group rebalance protocol defined in KIP-848 is production-ready.
Note: The new consumer group protocol defined in KIP-848 is not enabled by default. A few contract changes are associated with the new protocol, and they may be breaking. The `group.protocol` configuration property determines whether the new `consumer` protocol or the older `classic` protocol is used. It defaults to `classic` if not provided.
What changed:
The Group Leader role (consumer member) is removed. Assignments are calculated by the Group Coordinator (broker) and distributed via heartbeats.
Requirements:
Enablement (client-side):
- `group.protocol=consumer`
- `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`)

All KIP-848 features are supported, including:
- Upgrade from `classic` protocol or downgrade from `consumer` protocol

| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement |
|---|---|
| `partition.assignment.strategy` | `group.remote.assignor` |
| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` |
| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` |
| `group.protocol.type` | Not used in the new protocol |

Note: The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are no longer used when using the KIP-848 consumer protocol.
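
As a minimal sketch of the mapping in the table above, the helper below turns a classic-protocol configuration dictionary into one suitable for `group.protocol=consumer`. The function name `to_kip848_config`, the constants, and the sample values are hypothetical and not part of the client API; the code only manipulates a plain `dict` of the kind passed to `confluent_kafka.Consumer`.

```python
# Client properties that the table above lists as unused under KIP-848.
CLASSIC_ONLY_KEYS = (
    "partition.assignment.strategy",
    "session.timeout.ms",
    "heartbeat.interval.ms",
    "group.protocol.type",
)

# Remote assignors named in this guide.
REMOTE_ASSIGNORS = ("uniform", "range")


def to_kip848_config(classic_conf, remote_assignor=None):
    """Return (new_conf, dropped_keys) for group.protocol=consumer.

    Hypothetical helper: copies classic_conf, removes classic-only keys,
    and reports them so session/heartbeat tuning can move to the broker.
    """
    if remote_assignor is not None and remote_assignor not in REMOTE_ASSIGNORS:
        raise ValueError(f"unsupported remote assignor: {remote_assignor!r}")

    new_conf = {k: v for k, v in classic_conf.items() if k not in CLASSIC_ONLY_KEYS}
    dropped = sorted(k for k in classic_conf if k in CLASSIC_ONLY_KEYS)

    new_conf["group.protocol"] = "consumer"
    if remote_assignor is not None:
        # Leave unset to let the broker choose the assignor.
        new_conf["group.remote.assignor"] = remote_assignor
    return new_conf, dropped


classic = {
    "bootstrap.servers": "localhost:9092",  # placeholder address
    "group.id": "example-group",            # placeholder group name
    "partition.assignment.strategy": "range",
    "session.timeout.ms": 45000,
    "heartbeat.interval.ms": 15000,
}
conf, dropped = to_kip848_config(classic, remote_assignor="uniform")
print(conf)
print("Move to broker config or drop:", dropped)
```

The returned `dropped` list is a reminder of which settings need a broker-side equivalent (or no replacement at all) before the new dictionary is used.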
- `consumer.incremental_assign(partitions)` to assign new partitions
- `consumer.incremental_unassign(partitions)` to revoke partitions
- Do not use `consumer.assign()` or `consumer.unassign()` when using `group.protocol='consumer'` (KIP-848).
- The `partitions` list passed to `incremental_assign()` and `incremental_unassign()` contains only the incremental changes (partitions being added or revoked), not the full assignment, as was the case with `assign()` in the classic protocol.
- Assignors are sticky under KIP-848, including `range`, which was not sticky in the classic protocol.

`group.instance.id` handling:
- `group.instance.id`.

`close()` or unsubscribe with auto-commit enabled:
UNKNOWN_TOPIC_OR_PART (subscription case):
TOPIC_AUTHORIZATION_FAILED:
- `group.remote.assignor` and broker-controlled session/heartbeat configs
- `group.instance.id`
- `classic` and `consumer` protocols

```properties
# Optional; default is 'classic'
group.protocol=classic
partition.assignment.strategy=<range,roundrobin,sticky>
session.timeout.ms=45000
heartbeat.interval.ms=15000
```

```properties
group.protocol=consumer
# Optional: select a remote assignor
# Valid options currently: 'uniform' or 'range'
# group.remote.assignor=<uniform,range>
# If unset, broker chooses the assignor (default: 'uniform')
# Session & heartbeat now controlled by broker:
# group.consumer.session.timeout.ms
# group.consumer.heartbeat.interval.ms
```

```python
# Rebalance callbacks for the range assignor (classic protocol)
def on_assign(consumer, partitions):
    # The full partition list is provided under the classic protocol
    print(f"[Classic] Assigned partitions: {partitions}")
    consumer.assign(partitions)  # Optional: the client handles this if not called

def on_revoke(consumer, partitions):
    print(f"[Classic] Revoked partitions: {partitions}")
    consumer.unassign()  # Optional: the client handles this if not called
```

```python
# Rebalance callbacks for incremental assignment (KIP-848)
def on_assign(consumer, partitions):
    # Only the newly added partitions are passed here (not the full list)
    print(f"[KIP-848] Incrementally assigning: {partitions}")
    consumer.incremental_assign(partitions)  # Optional: the client handles this if not called

def on_revoke(consumer, partitions):
    print(f"[KIP-848] Incrementally revoking: {partitions}")
    consumer.incremental_unassign(partitions)  # Optional: the client handles this if not called
```

Note: The partitions list contains only partitions being added or revoked, not the full partition list as in the classic consumer.assign().
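
Because the callbacks receive only deltas, an application that needs the full current assignment has to accumulate it itself. The sketch below shows one possible way to do that. `AssignmentTracker`, `FakeConsumer`, and the `TP` tuple are hypothetical stand-ins so the example runs without a broker; with the real client the callbacks would receive `TopicPartition` objects and be registered through `consumer.subscribe(topics, on_assign=..., on_revoke=...)`.

```python
from collections import namedtuple

# Stand-in for confluent_kafka.TopicPartition (only topic/partition are used).
TP = namedtuple("TP", ["topic", "partition"])


class FakeConsumer:
    """Hypothetical test double exposing the two incremental methods."""

    def incremental_assign(self, partitions):
        pass

    def incremental_unassign(self, partitions):
        pass


class AssignmentTracker:
    """Accumulates incremental changes into the full current assignment."""

    def __init__(self):
        self.current = set()

    def on_assign(self, consumer, partitions):
        # partitions holds only the newly added partitions.
        self.current.update((p.topic, p.partition) for p in partitions)
        consumer.incremental_assign(partitions)

    def on_revoke(self, consumer, partitions):
        # partitions holds only the partitions being taken away.
        self.current.difference_update((p.topic, p.partition) for p in partitions)
        consumer.incremental_unassign(partitions)


tracker = AssignmentTracker()
consumer = FakeConsumer()
tracker.on_assign(consumer, [TP("orders", 0), TP("orders", 1)])
tracker.on_assign(consumer, [TP("orders", 2)])
tracker.on_revoke(consumer, [TP("orders", 1)])
print(sorted(tracker.current))  # [('orders', 0), ('orders', 2)]
```

Keeping the tracked state as a set of `(topic, partition)` pairs makes repeated or out-of-order deltas harmless in this sketch, since adding an existing pair or removing a missing one is a no-op.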
- A group made up entirely of `classic` consumers runs under the classic protocol.
- The group is upgraded when a `consumer` protocol member joins.
- The group is downgraded when the last `consumer` protocol member leaves while `classic` members remain.

- `group.protocol=consumer`
- `group.remote.assignor`; leave unspecified for broker-controlled (default: `uniform`), valid options: `uniform` or `range`
- `group.instance.id`
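
The upgrade and downgrade behavior for mixed groups can be illustrated with a small model: a group runs under the consumer protocol while at least one `consumer` protocol member is present, and under the classic protocol otherwise. This is a simplified sketch of that rule only, not broker code, and `effective_group_protocol` is a hypothetical name.

```python
def effective_group_protocol(member_protocols):
    """Return the protocol a group runs under, given each member's group.protocol.

    Simplified model: any 'consumer' member means the group runs under the
    consumer protocol; otherwise it runs under the classic protocol.
    """
    if not member_protocols:
        return None  # empty group: nothing to decide in this model
    return "consumer" if "consumer" in member_protocols else "classic"


members = ["classic", "classic"]
print(effective_group_protocol(members))  # classic

members.append("consumer")                # a consumer protocol member joins
print(effective_group_protocol(members))  # consumer (upgraded)

members.remove("consumer")                # the last consumer protocol member leaves
print(effective_group_protocol(members))  # classic (downgraded)
```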