fix(confluent): Fix KIP-848 consumer assignment and seek handling #525
Conversation
With the KIP-848 consumer group protocol, calling seek() inside the on_assign callback fails with _UNKNOWN_PARTITION because rdkafka hasn't fully registered the partition yet. Fix this by deferring incremental_assign until after the user's on_assign callback, so any seek() calls made in the callback are incorporated into the starting offsets passed to rdkafka. Also fix the test_kip848_e2e topic cleanup to handle pre-existing topics from unclean test teardown, remove the xfail marker from test_pause_resume_rebalancing (it now passes), and bump confluent-kafka to >=2.13.2 with Kafka 4+ (cp-kafka:8.0.0) required for KIP-848 support. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The __pending_seeks block in poll() was never reached: seeks deferred during on_assign are cleared in the assignment callback's finally block (after incremental_assign), before poll() can execute. Remove it along with a leftover commented-out STALE_MEMBER_EPOCH line. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
After the previous cleanup, __pending_seeks was written to in seek() but never read — its values were unused since incremental_assign reads from __offsets instead. Remove the dict and all its callsites. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
arroyo/backends/kafka/consumer.py
Outdated
```diff
     for partition, offset in offsets.items()
 ]
-if self.__is_cooperative_sticky:
+if self.__is_kip848:
```
With cooperative-sticky, __assign is called from the assignment_callback before on_assign, and calling incremental_assign there is safe — rdkafka processes it immediately and seek() works fine afterwards inside the callback.
With KIP-848, calling incremental_assign at that point and then calling seek() inside the user's on_assign callback fails with _UNKNOWN_PARTITION. The root cause is that with KIP-848, rdkafka hasn't fully committed the incremental assignment to its internal state by the time the Python callback returns control to user code — so the partition isn't yet seekable.
The fix is to defer incremental_assign until after on_assign completes (in the finally block), at which point we use self.__offsets — which already reflects any seek() calls the user made — as the starting offsets. This way, we never need to call seek() at all; the desired position is baked directly into the incremental_assign call.
So the pass here means: skip incremental_assign for now, because it will be called with the correct final offsets later in the finally block, after on_assign has run.
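The deferred flow described here can be sketched in miniature. This is a hypothetical illustration of the control flow only, not arroyo's actual code: `SketchConsumer` stands in for the real consumer, and `calls` just records the order of operations so you can see that `seek()` during `on_assign` only updates `__offsets`, and `incremental_assign` fires once, afterwards, with the final offsets baked in.

```python
# Hypothetical sketch of the "defer incremental_assign to the finally block"
# approach. Names here (SketchConsumer, calls) are illustrative, not arroyo's.

class SketchConsumer:
    def __init__(self, on_assign):
        self.__on_assign = on_assign
        self.__offsets = {}  # partition -> resolved starting offset
        self.calls = []      # records the order of operations

    def seek(self, partition, offset):
        # During on_assign we only record the desired offset; the real seek
        # is folded into the later incremental_assign call.
        self.__offsets[partition] = offset
        self.calls.append(("seek", partition, offset))

    def assignment_callback(self, partitions):
        # 1. Resolve starting offsets (committed() lookup elided).
        for p in partitions:
            self.__offsets.setdefault(p, 0)
        try:
            # 2. Run the user callback; any seek() just updates __offsets.
            self.__on_assign(self)
        finally:
            # 3. Only now hand the assignment to rdkafka, with the final
            #    offsets already included -- no separate seek() needed.
            self.calls.append(("incremental_assign", dict(self.__offsets)))


c = SketchConsumer(lambda consumer: consumer.seek("topic-0", 42))
c.assignment_callback(["topic-0", "topic-1"])
```

After the callback runs, the single `incremental_assign` sees `topic-0` at the seeked offset 42 and `topic-1` at its resolved default.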
From Claude
With KIP-848, calling incremental_assign at that point and then calling seek() inside the user's on_assign callback fails with _UNKNOWN_PARTITION.
Do we actually call seek() from the user provided callback ? Do you know about use cases where that is legit ?
No, we don't have this use case. Simplified the code with that in mind.
fpacifici
left a comment
Some clarifications about the protocol.
I would consider removing all the sticky cooperative protocol. We are unlikely to ever rely on that.
Remove cooperative-sticky rebalancing support, which was never used in practice and never worked properly. KIP-848 is the correct path for incremental rebalancing going forward.

Simplify the KIP-848 incremental_assign flow: remove the seek()-in-on_assign workaround since no callers use that pattern. The finally block now uses the resolved offsets directly. Replace or-defaulting on topic/partition/offset with asserts in the EndOfPartition path, since these should always be populated.

Upgrade mypy to 1.19.1 and update pre-commit hook to match, adding confluent-kafka as a dependency so type stubs are available. Configure the mypy hook to always check the full project for consistent results.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
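The assert-over-defaulting change mentioned above can be illustrated with a small standalone sketch. `FakeMessage` and `end_of_partition_info` are hypothetical stand-ins (not arroyo's real types): the point is that asserting non-`None` makes the invariant explicit for both mypy and runtime, instead of `or`-defaulting masking an impossible state.

```python
# Hypothetical sketch: assert that EndOfPartition fields are populated rather
# than silently defaulting them. FakeMessage stands in for confluent_kafka.Message.
from typing import Optional, Tuple


class FakeMessage:
    def __init__(self, topic: Optional[str], partition: Optional[int], offset: Optional[int]):
        self._t, self._p, self._o = topic, partition, offset

    def topic(self) -> Optional[str]:
        return self._t

    def partition(self) -> Optional[int]:
        return self._p

    def offset(self) -> Optional[int]:
        return self._o


def end_of_partition_info(message: FakeMessage) -> Tuple[str, int, int]:
    topic, partition, offset = message.topic(), message.partition(), message.offset()
    # Previously: `message.topic() or ""` etc., which hides a broker bug.
    # Asserting documents that rdkafka always populates these for EOF events.
    assert topic is not None and partition is not None and offset is not None
    return topic, partition, offset
```

With a fully populated message this returns the tuple unchanged; a `None` field now fails loudly instead of flowing downstream as an empty default.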
…ed messages Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Update mypy.ini, CI linting and typing jobs to use Python 3.13. Suppress two new strict covariant TypeVar errors in types.py where the covariance is semantically correct but mypy 1.19.1 disallows covariant TypeVars in class attribute declarations. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move the incremental_assign call to after offset resolution but before the on_assign callback fires. This allows seeks performed inside on_assign to succeed, since the partition is now registered with rdkafka at that point. Previously, incremental_assign was deferred to the finally block after on_assign, which meant any seek() call inside on_assign would fail with _UNKNOWN_PARTITION. Calling it at the very top of the callback (inside __assign) also fails, but calling it after committed() and offset resolution are complete works correctly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
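The final ordering can be shown with a second minimal sketch (again hypothetical names, not arroyo's code): `incremental_assign` runs after offset resolution but before `on_assign`, so by the time user code calls `seek()` the partition is already registered and the `_UNKNOWN_PARTITION` failure cannot occur.

```python
# Hypothetical sketch of the final ordering: register the assignment with
# rdkafka first, then fire on_assign, where seek() is now valid.
events = []


def incremental_assign(offsets):
    events.append(("incremental_assign", dict(offsets)))


def seek(partition, offset):
    # seek() is only valid once the partition has been assigned; otherwise
    # the real consumer would raise _UNKNOWN_PARTITION.
    assigned = bool(events) and events[0][0] == "incremental_assign" and partition in events[0][1]
    if not assigned:
        raise RuntimeError("_UNKNOWN_PARTITION")
    events.append(("seek", partition, offset))


def assignment_callback(partitions, on_assign):
    # 1. Resolve starting offsets (committed() lookup elided).
    offsets = {p: 0 for p in partitions}
    # 2. Register the assignment with rdkafka first...
    incremental_assign(offsets)
    # 3. ...then run the user callback, where seek() now succeeds.
    on_assign(offsets)


assignment_callback(["topic-0"], lambda offsets: seek("topic-0", 7))
```

Swapping steps 2 and 3 in this sketch reproduces the failure mode the commit describes: the `seek` inside the callback raises before the assignment is registered.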
@fpacifici On further review some of the tests were failing because they did
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```diff
 KafkaPayload(
     message.key(),
-    message.value(),
+    message.value() or b"",
```
Tombstone messages silently converted to empty bytes
Medium Severity
message.value() or b"" converts Kafka tombstone messages (where value() returns None) into empty bytes b"", making them indistinguishable from legitimately empty messages. On compacted topics, tombstones signal record deletion, and any downstream logic that needs to detect tombstones loses that signal. While KafkaPayload.value is typed as bytes, the previous code did pass None through at runtime, so any existing consumers relying on that for tombstone detection will silently break.


Fix several bugs when using confluent-kafka 2.13.2+ with Kafka 4+ and the KIP-848 consumer group protocol.

Core fix: KIP-848 assignment and seek handling

With the old rebalancing protocols (eager and cooperative-sticky), `assign`/`incremental_assign` is called inside `__assign()` at the top of the assignment callback, and any `seek()` calls made by the user in `on_assign` work correctly because rdkafka already has the partition registered.

With KIP-848, calling `incremental_assign` that early (before offset resolution) fails with `_UNKNOWN_PARTITION`. The previous fix deferred `incremental_assign` to a `finally` block after `on_assign`, but this meant any `seek()` call inside `on_assign` also failed with `_UNKNOWN_PARTITION` since the partition still wasn't registered at that point.

The correct placement is after offset resolution but before `on_assign` fires. At that point rdkafka has fully committed the assignment internally, so both `incremental_assign` and any subsequent `seek()` calls in `on_assign` succeed.

Other changes

- Bump `confluent-kafka` to `>=2.13.2`; KIP-848 requires Kafka 4+, so update `run-kafka.sh` to use `cp-kafka:8.0.0` with `KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer`
- Fix `test_kip848_e2e` to handle a pre-existing topic from unclean test teardown
- Remove the `xfail` marker from `TestKafkaStreamsKip848::test_pause_resume_rebalancing` — it now passes
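For the broker-side piece of the change, a sketch of what the `run-kafka.sh` invocation might look like is below. The image tag and the `KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS` variable come from this PR; every other flag (container name, port mapping) is an assumption, and the script in the repo will differ (a KRaft single-node setup needs additional environment variables not shown here).

```shell
# Hypothetical sketch of the relevant run-kafka.sh fragment; exact flags in
# the repo differ. KIP-848 needs the "consumer" protocol enabled broker-side,
# which requires Kafka 4+ (cp-kafka:8.0.0).
docker run --rm -d \
  --name arroyo-kafka \
  -p 9092:9092 \
  -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer \
  confluentinc/cp-kafka:8.0.0
```

With `classic,consumer` enabled, older consumers keep working while clients that opt into `group.protocol=consumer` get the KIP-848 path.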