fix(confluent): Fix KIP-848 consumer assignment and seek handling#525

Merged
evanh merged 10 commits into main from evanh/fix/update-confluent
Mar 25, 2026
Conversation

@evanh
Member

@evanh commented Mar 9, 2026

Fix several bugs when using confluent-kafka 2.13.2+ with Kafka 4+ and the KIP-848 consumer group protocol.

Core fix: KIP-848 assignment and seek handling

With the old rebalancing protocols (eager and cooperative-sticky), assign/incremental_assign is called inside __assign() at the top of the assignment callback, and any seek() calls made by the user in on_assign work correctly because rdkafka already has the partition registered.

With KIP-848, calling incremental_assign that early (before offset resolution) fails with _UNKNOWN_PARTITION. The previous fix deferred incremental_assign to a finally block after on_assign, but this meant any seek() call inside on_assign also failed with _UNKNOWN_PARTITION since the partition still wasn't registered at that point.

The correct placement is after offset resolution but before on_assign fires. At that point rdkafka has fully committed the assignment internally, so both incremental_assign and any subsequent seek() calls in on_assign succeed.
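A minimal sketch of that ordering, using hypothetical helper names (resolve_offsets, a duck-typed consumer object) rather than arroyo's or confluent-kafka's actual internals:

```python
# Illustrative sketch only: the consumer object, partition fields, and
# resolve_offsets are stand-ins, not the library's real API.

def assignment_callback(consumer, partitions, on_assign, resolve_offsets):
    # 1. Resolve starting offsets (e.g. from committed()) first.
    offsets = resolve_offsets(partitions)
    for tp in partitions:
        tp.offset = offsets[(tp.topic, tp.partition)]

    # 2. Register the assignment with rdkafka *before* user code runs,
    #    so the partitions are already seekable inside on_assign.
    consumer.incremental_assign(partitions)

    # 3. Only now invoke the user's callback; a seek() made here succeeds
    #    because the partitions are registered.
    if on_assign is not None:
        on_assign(partitions)
```

The key property is just the ordering: offset resolution, then incremental_assign, then on_assign.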

Other changes

  • Bump confluent-kafka to >=2.13.2; KIP-848 requires Kafka 4+, so update run-kafka.sh to use cp-kafka:8.0.0 with KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer
  • Drop Python 3.9 from the CI test matrix
  • Fix test_kip848_e2e to handle a pre-existing topic from unclean test teardown
  • Remove the xfail marker from TestKafkaStreamsKip848::test_pause_resume_rebalancing — it now passes
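For the broker-side change, the image tag and environment variable come from the bullet above; the surrounding flags are assumptions for illustration, not the repository's actual run-kafka.sh:

```shell
# Illustrative only: enabling both the classic and the new (KIP-848)
# group protocols on cp-kafka 8.0.0. A real single-node KRaft setup
# needs additional listener/node-id configuration omitted here.
docker run -d --name kafka \
  -e KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer \
  confluentinc/cp-kafka:8.0.0
```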

evanh and others added 3 commits March 9, 2026 16:17
With the KIP-848 consumer group protocol, calling seek() inside the
on_assign callback fails with _UNKNOWN_PARTITION because rdkafka hasn't
fully registered the partition yet. Fix this by deferring incremental_assign
until after the user's on_assign callback, so any seek() calls made in the
callback are incorporated into the starting offsets passed to rdkafka.

Also fix the test_kip848_e2e topic cleanup to handle pre-existing topics
from unclean test teardown, remove the xfail marker from
test_pause_resume_rebalancing (it now passes), and bump confluent-kafka
to >=2.13.2 with Kafka 4+ (cp-kafka:8.0.0) required for KIP-848 support.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The __pending_seeks block in poll() was never reached: seeks deferred
during on_assign are cleared in the assignment callback's finally block
(after incremental_assign), before poll() can execute. Remove it along
with a leftover commented-out STALE_MEMBER_EPOCH line.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
After the previous cleanup, __pending_seeks was written to in seek()
but never read — its values were unused since incremental_assign reads
from __offsets instead. Remove the dict and all its callsites.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
      for partition, offset in offsets.items()
  ]
- if self.__is_cooperative_sticky:
+ if self.__is_kip848:
Member Author

With cooperative-sticky, __assign is called from the assignment_callback before on_assign, and calling incremental_assign there is safe — rdkafka processes it immediately and seek() works fine afterwards inside the callback.

With KIP-848, calling incremental_assign at that point and then calling seek() inside the user's on_assign callback fails with _UNKNOWN_PARTITION. The root cause is that with KIP-848, rdkafka hasn't fully committed the incremental assignment to its internal state by the time the Python callback returns control to user code — so the partition isn't yet seekable.

The fix is to defer incremental_assign until after on_assign completes (in the finally block), at which point we use self.__offsets — which already reflects any seek() calls the user made — as the starting offsets. This way, we never need to call seek() at all; the desired position is baked directly into the incremental_assign call.

So the pass here means: skip incremental_assign for now, because it will be called with the correct final offsets later in the finally block, after on_assign has run.

From Claude
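The deferred approach this comment describes can be sketched roughly as follows (hypothetical names, and note this variant was later revised in the PR):

```python
# Hypothetical sketch of the deferred-assignment approach: the user's
# callback runs first, then incremental_assign fires in the finally block
# with the already-resolved offsets, so any "seek" is baked into the
# assignment instead of being issued against rdkafka directly.

def assignment_callback(consumer, partitions, on_assign, offsets):
    try:
        if on_assign is not None:
            # The callback may adjust `offsets` (e.g. via a seek wrapper
            # that writes into the dict instead of calling rdkafka).
            on_assign(partitions)
    finally:
        for tp in partitions:
            tp.offset = offsets[(tp.topic, tp.partition)]
        # Assignment happens last, with the final offsets applied.
        consumer.incremental_assign(partitions)
```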

Contributor


With KIP-848, calling incremental_assign at that point and then calling seek() inside the user's on_assign callback fails with _UNKNOWN_PARTITION.

Do we actually call seek() from the user provided callback ? Do you know about use cases where that is legit ?

Member Author


No, we don't have this use case. Simplified the code with that in mind.

@evanh marked this pull request as ready for review March 11, 2026 19:58
@evanh requested review from a team as code owners March 11, 2026 19:58
Contributor

@fpacifici left a comment


Some clarifications about the protocol.
I would consider removing the cooperative-sticky protocol entirely. We are unlikely to ever rely on it.

evanh and others added 2 commits March 12, 2026 10:56
Remove cooperative-sticky rebalancing support, which was never used
in practice and never worked properly. KIP-848 is the correct path for
incremental rebalancing going forward.

Simplify the KIP-848 incremental_assign flow: remove the seek()-in-on_assign
workaround since no callers use that pattern. The finally block now uses
the resolved offsets directly.

Replace or-defaulting on topic/partition/offset with asserts in the
EndOfPartition path, since these should always be populated.

Upgrade mypy to 1.19.1 and update pre-commit hook to match, adding
confluent-kafka as a dependency so type stubs are available. Configure
the mypy hook to always check the full project for consistent results.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ed messages

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
evanh and others added 2 commits March 12, 2026 11:07
Update mypy.ini, CI linting and typing jobs to use Python 3.13.
Suppress two new strict covariant TypeVar errors in types.py where
the covariance is semantically correct but mypy 1.19 disallows
covariant TypeVars in class attribute declarations.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
evanh and others added 2 commits March 17, 2026 11:06
Move the incremental_assign call to after offset resolution but before
the on_assign callback fires. This allows seeks performed inside on_assign
to succeed, since the partition is now registered with rdkafka at that point.

Previously, incremental_assign was deferred to the finally block after
on_assign, which meant any seek() call inside on_assign would fail with
_UNKNOWN_PARTITION. Calling it at the very top of the callback (inside
__assign) also fails, but calling it after committed() and offset resolution
are complete works correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@evanh
Member Author

evanh commented Mar 17, 2026

Do we actually call seek() from the user provided callback ? Do you know about use cases where that is legit ?

@fpacifici On further review, some of the tests were failing because they called seek() during the callback, so I added that functionality back.

@cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


  KafkaPayload(
      message.key(),
-     message.value(),
+     message.value() or b"",

Tombstone messages silently converted to empty bytes

Medium Severity

message.value() or b"" converts Kafka tombstone messages (where value() returns None) into empty bytes b"", making them indistinguishable from legitimately empty messages. On compacted topics, tombstones signal record deletion, and any downstream logic that needs to detect tombstones loses that signal. While KafkaPayload.value is typed as bytes, the previous code did pass None through at runtime, so any existing consumers relying on that for tombstone detection will silently break.


@evanh merged commit 62145b8 into main Mar 25, 2026
17 checks passed
@evanh deleted the evanh/fix/update-confluent branch March 25, 2026 20:09