From 47dfb0547f9db1f6eef9530772bd0685fcafe345 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Mon, 9 Mar 2026 16:17:36 -0400 Subject: [PATCH 1/8] fix(confluent): Fix KIP-848 consumer assignment and seek handling With the KIP-848 consumer group protocol, calling seek() inside the on_assign callback fails with _UNKNOWN_PARTITION because rdkafka hasn't fully registered the partition yet. Fix this by deferring incremental_assign until after the user's on_assign callback, so any seek() calls made in the callback are incorporated into the starting offsets passed to rdkafka. Also fix the test_kip848_e2e topic cleanup to handle pre-existing topics from unclean test teardown, remove the xfail marker from test_pause_resume_rebalancing (it now passes), and bump confluent-kafka to >=2.13.2 with Kafka 4+ (cp-kafka:8.0.0) required for KIP-848 support. Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/ci.yml | 2 +- arroyo/backends/kafka/consumer.py | 90 +++++++++++++++++++++++-------- requirements.txt | 2 +- tests/backends/test_kafka.py | 4 -- tests/test_kip848_e2e.py | 9 ++++ 5 files changed, 80 insertions(+), 27 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a8db56a6..b3dd28a5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,7 +48,7 @@ jobs: strategy: max-parallel: 5 matrix: - python: [3.9, "3.10", "3.11", "3.12", "3.13"] + python: ["3.10", "3.11", "3.12", "3.13"] timeout-minutes: 10 steps: - uses: actions/checkout@v2 diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index c6b2e1a4..07a39903 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -163,7 +163,7 @@ def __init__( KafkaError.REQUEST_TIMED_OUT, KafkaError.NOT_COORDINATOR, KafkaError._WAIT_COORD, - KafkaError.STALE_MEMBER_EPOCH, # kip-848 + # KafkaError.STALE_MEMBER_EPOCH, # kip-848 KafkaError.COORDINATOR_LOAD_IN_PROGRESS, ) @@ -174,9 +174,10 @@ def __init__( and e.args[0].code() in 
retryable_errors, ) + self.__is_kip848 = configuration.get("group.protocol") == "consumer" self.__is_cooperative_sticky = ( configuration.get("partition.assignment.strategy") == "cooperative-sticky" - or configuration.get("group.protocol") == "consumer" + or self.__is_kip848 ) auto_offset_reset = configuration.get("auto.offset.reset", "largest") @@ -251,6 +252,8 @@ def __init__( self.__offsets: MutableMapping[Partition, int] = {} self.__staged_offsets: MutableMapping[Partition, int] = {} self.__paused: Set[Partition] = set() + # Seeks deferred during on_assign for KIP-848 (applied before next poll) + self.__pending_seeks: MutableMapping[Partition, int] = {} self.__commit_retry_policy = commit_retry_policy @@ -335,7 +338,7 @@ def subscribe( raise InvalidState(self.__state) def assignment_callback( - consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition] + consumer: ConfluentConsumer, partitions: list[ConfluentTopicPartition] ) -> None: if not partitions: logger.info("skipping empty assignment") @@ -374,15 +377,33 @@ def assignment_callback( if on_assign is not None: on_assign(offsets) finally: + if self.__is_kip848: + # For KIP-848, incremental_assign is called here (after the + # user's on_assign callback) so that any seek() calls made + # in the callback are incorporated into the starting offsets. + # Calling seek() inside on_assign for KIP-848 fails because + # rdkafka hasn't processed the partition yet at that point. 
+ kip848_partitions = [ + ConfluentTopicPartition( + p.topic.name, p.index, self.__offsets[p] + ) + for p in offsets + if p in self.__offsets + ] + self.__consumer.incremental_assign(kip848_partitions) + self.__pending_seeks.clear() self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after assignment: %s", self.__paused) def revocation_callback( - consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition] + consumer: ConfluentConsumer, + confluent_partitions: Sequence[ConfluentTopicPartition], ) -> None: self.__state = KafkaConsumerState.REVOKING - partitions = [Partition(Topic(i.topic), i.partition) for i in partitions] + partitions = [ + Partition(Topic(i.topic), i.partition) for i in confluent_partitions + ] try: if on_revoke is not None: @@ -411,6 +432,7 @@ def revocation_callback( ) self.__paused.discard(partition) + self.__pending_seeks.pop(partition, None) self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after revocation: %s", self.__paused) @@ -465,6 +487,20 @@ def poll( if self.__state is not KafkaConsumerState.CONSUMING: raise InvalidState(self.__state) + # For KIP-848 consumers, seek() calls made inside the on_assign callback + # cannot be applied immediately via rdkafka (the partition isn't yet + # registered as seekable at that point). Instead they are stored in + # __pending_seeks and applied here, before the next rdkafka poll, so + # the consumer fetches from the correct position. 
+ if self.__pending_seeks: + for partition, offset in self.__pending_seeks.items(): + self.__consumer.seek( + ConfluentTopicPartition( + partition.topic.name, partition.index, offset + ) + ) + self.__pending_seeks.clear() + message: Optional[ConfluentMessage] = self.__consumer.poll( *[timeout] if timeout is not None else [] ) @@ -476,8 +512,8 @@ def poll( code = error.code() if code == KafkaError._PARTITION_EOF: raise EndOfPartition( - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition(Topic(message.topic() or ""), message.partition() or 0), + message.offset() or 0, ) elif code == KafkaError._TRANSPORT: raise TransportError(str(error)) @@ -493,11 +529,11 @@ def poll( broker_value = BrokerValue( KafkaPayload( message.key(), - message.value(), + message.value() or b"", headers if headers is not None else [], ), - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition(Topic(message.topic() or ""), message.partition() or 0), + message.offset() or 0, datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0), ) self.__offsets[broker_value.partition] = broker_value.next_offset @@ -529,7 +565,12 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None: ConfluentTopicPartition(partition.topic.name, partition.index, offset) for partition, offset in offsets.items() ] - if self.__is_cooperative_sticky: + if self.__is_kip848: + # For KIP-848, incremental_assign is called after the on_assign + # callback so user seek()s in the callback are reflected in the + # starting offsets passed to rdkafka. 
+ pass + elif self.__is_cooperative_sticky: self.__consumer.incremental_assign(partitions) else: self.__consumer.assign(partitions) @@ -549,10 +590,17 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: self.__validate_offsets(offsets) - for partition, offset in offsets.items(): - self.__consumer.seek( - ConfluentTopicPartition(partition.topic.name, partition.index, offset) - ) + if self.__is_kip848 and self.__state == KafkaConsumerState.ASSIGNING: + # With KIP-848, seeking inside on_assign fails because rdkafka + # hasn't fully processed incremental_assign yet. Defer until poll(). + self.__pending_seeks.update(offsets) + else: + for partition, offset in offsets.items(): + self.__consumer.seek( + ConfluentTopicPartition( + partition.topic.name, partition.index, offset + ) + ) self.__offsets.update(offsets) def pause(self, partitions: Sequence[Partition]) -> None: @@ -729,7 +777,7 @@ def closed(self) -> bool: @property def member_id(self) -> str: - member_id: str = self.__consumer.memberid() + member_id: str = self.__consumer.memberid() or "" return member_id @@ -738,7 +786,7 @@ def __init__( self, configuration: Mapping[str, Any], use_simple_futures: bool = False ) -> None: self.__configuration = configuration - self.__producer = ConfluentKafkaProducer(configuration) + self.__producer = ConfluentKafkaProducer(dict(configuration)) self.__shutdown_requested = Event() # The worker must execute in a separate thread to ensure that callbacks @@ -815,7 +863,7 @@ def produce( produce( value=payload.value, key=payload.key, - headers=payload.headers, + headers=dict(payload.headers), on_delivery=partial(self.__delivery_callback, future, payload), ) return future @@ -838,7 +886,7 @@ class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc] """ def __init__(self, configuration: Mapping[str, Any]) -> None: - super().__init__(configuration) + super().__init__(dict(configuration)) self.producer_name = configuration.get("client.id") or None self.__metrics = 
get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) @@ -873,7 +921,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None: on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery wrapped_callback = self.__delivery_callback(user_callback) - super().produce(*args, on_delivery=wrapped_callback, **kwargs) + super().produce(*args, **kwargs, on_delivery=wrapped_callback) def __flush_metrics(self) -> None: for status, count in self.__produce_counters.items(): @@ -887,7 +935,7 @@ def __flush_metrics(self) -> None: ) self.__reset_metrics() - def flush(self, timeout: float = -1) -> int: + def flush(self, timeout: float | None = -1) -> int: # Kafka producer flush should flush metrics too self.__flush_metrics() return cast(int, super().flush(timeout)) diff --git a/requirements.txt b/requirements.txt index 7e4f7dfd..7196e292 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -confluent-kafka>=2.11.0,<2.12.0 +confluent-kafka>=2.13.2 diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 7fba26dd..f624df66 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -431,10 +431,6 @@ class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams): class TestKafkaStreamsKip848(TestKafkaStreams): kip_848 = True - @pytest.mark.xfail(reason="To be fixed") - def test_pause_resume_rebalancing(self) -> None: - super().test_pause_resume_rebalancing() - def test_commit_codec() -> None: commit = Commit( diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py index 63f8b983..430da295 100644 --- a/tests/test_kip848_e2e.py +++ b/tests/test_kip848_e2e.py @@ -28,6 +28,15 @@ def get_topic( name = TOPIC configuration = dict(configuration) client = AdminClient(configuration) + + # Delete the topic if it already exists from a previous test run + existing = client.list_topics(timeout=5).topics + if name in existing: + [[key, future]] = client.delete_topics([name]).items() 
+ assert key == name + future.result() + time.sleep(1) + [[key, future]] = client.create_topics( [NewTopic(name, num_partitions=partitions_count, replication_factor=1)] ).items() From 533a25f8165a51ef150767024f701083be486e04 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Tue, 10 Mar 2026 13:31:11 -0400 Subject: [PATCH 2/8] ref(confluent): Remove dead code from KIP-848 poll path The __pending_seeks block in poll() was never reached: seeks deferred during on_assign are cleared in the assignment callback's finally block (after incremental_assign), before poll() can execute. Remove it along with a leftover commented-out STALE_MEMBER_EPOCH line. Co-Authored-By: Claude Sonnet 4.6 --- arroyo/backends/kafka/consumer.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 07a39903..92505591 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -163,7 +163,6 @@ def __init__( KafkaError.REQUEST_TIMED_OUT, KafkaError.NOT_COORDINATOR, KafkaError._WAIT_COORD, - # KafkaError.STALE_MEMBER_EPOCH, # kip-848 KafkaError.COORDINATOR_LOAD_IN_PROGRESS, ) @@ -487,20 +486,6 @@ def poll( if self.__state is not KafkaConsumerState.CONSUMING: raise InvalidState(self.__state) - # For KIP-848 consumers, seek() calls made inside the on_assign callback - # cannot be applied immediately via rdkafka (the partition isn't yet - # registered as seekable at that point). Instead they are stored in - # __pending_seeks and applied here, before the next rdkafka poll, so - # the consumer fetches from the correct position. 
- if self.__pending_seeks: - for partition, offset in self.__pending_seeks.items(): - self.__consumer.seek( - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - ) - self.__pending_seeks.clear() - message: Optional[ConfluentMessage] = self.__consumer.poll( *[timeout] if timeout is not None else [] ) From 3df6f8b32eeb663e32bd3c52fae82e65265c8603 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Tue, 10 Mar 2026 13:34:38 -0400 Subject: [PATCH 3/8] ref(confluent): Remove unused __pending_seeks dict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the previous cleanup, __pending_seeks was written to in seek() but never read — its values were unused since incremental_assign reads from __offsets instead. Remove the dict and all its callsites. Co-Authored-By: Claude Sonnet 4.6 --- arroyo/backends/kafka/consumer.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 92505591..3a2a941d 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -251,8 +251,6 @@ def __init__( self.__offsets: MutableMapping[Partition, int] = {} self.__staged_offsets: MutableMapping[Partition, int] = {} self.__paused: Set[Partition] = set() - # Seeks deferred during on_assign for KIP-848 (applied before next poll) - self.__pending_seeks: MutableMapping[Partition, int] = {} self.__commit_retry_policy = commit_retry_policy @@ -390,7 +388,6 @@ def assignment_callback( if p in self.__offsets ] self.__consumer.incremental_assign(kip848_partitions) - self.__pending_seeks.clear() self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after assignment: %s", self.__paused) @@ -431,7 +428,6 @@ def revocation_callback( ) self.__paused.discard(partition) - self.__pending_seeks.pop(partition, None) self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after 
revocation: %s", self.__paused) @@ -577,8 +573,10 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: if self.__is_kip848 and self.__state == KafkaConsumerState.ASSIGNING: # With KIP-848, seeking inside on_assign fails because rdkafka - # hasn't fully processed incremental_assign yet. Defer until poll(). - self.__pending_seeks.update(offsets) + # hasn't fully processed incremental_assign yet. The seek offset + # is captured in self.__offsets and applied via incremental_assign + # after the callback completes. + pass else: for partition, offset in offsets.items(): self.__consumer.seek( From bf7c11516883e45d1fdd550892eaf1339123abd5 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 12 Mar 2026 10:56:22 -0400 Subject: [PATCH 4/8] ref(confluent): Address review feedback on KIP-848 implementation Remove cooperative-sticky rebalancing support, which was never used in practice and never worked properly. KIP-848 is the correct path for incremental rebalancing going forward. Simplify the KIP-848 incremental_assign flow: remove the seek()-in-on_assign workaround since no callers use that pattern. The finally block now uses the resolved offsets directly. Replace or-defaulting on topic/partition/offset with asserts in the EndOfPartition path, since these should always be populated. Upgrade mypy to 1.19.1 and update pre-commit hook to match, adding confluent-kafka as a dependency so type stubs are available. Configure the mypy hook to always check the full project for consistent results. 
Co-Authored-By: Claude Sonnet 4.6 --- .pre-commit-config.yaml | 6 +- arroyo/backends/kafka/consumer.py | 76 ++++++++----------- arroyo/backends/local/backend.py | 1 + .../run_task_with_multiprocessing.py | 4 +- requirements-test.txt | 2 +- tests/backends/mixins.py | 5 +- tests/backends/test_confluent_producer.py | 6 +- tests/backends/test_kafka.py | 17 ++--- tests/processing/strategies/test_filter.py | 2 +- tests/test_kip848_e2e.py | 5 +- 10 files changed, 57 insertions(+), 67 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 61bcd6b2..47870473 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,11 +22,13 @@ repos: - id: flake8 language_version: python3.12 - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.961' + rev: 'v1.19.1' hooks: - id: mypy args: [--config-file, mypy.ini, --strict] - additional_dependencies: [pytest==7.1.2] + additional_dependencies: [pytest==7.1.2, confluent-kafka==2.13.2] + pass_filenames: false + entry: mypy . 
--config-file mypy.ini --strict - repo: https://github.com/pycqa/isort rev: 5.12.0 hooks: diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 3a2a941d..8ab8b1f8 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -23,7 +23,6 @@ Tuple, Type, Union, - cast, ) from confluent_kafka import ( @@ -170,14 +169,11 @@ def __init__( 3, 1, lambda e: isinstance(e, KafkaException) + and isinstance(e.args[0], KafkaError) and e.args[0].code() in retryable_errors, ) self.__is_kip848 = configuration.get("group.protocol") == "consumer" - self.__is_cooperative_sticky = ( - configuration.get("partition.assignment.strategy") == "cooperative-sticky" - or self.__is_kip848 - ) auto_offset_reset = configuration.get("auto.offset.reset", "largest") # This is a special flag that controls the auto offset behavior for @@ -375,19 +371,17 @@ def assignment_callback( on_assign(offsets) finally: if self.__is_kip848: - # For KIP-848, incremental_assign is called here (after the - # user's on_assign callback) so that any seek() calls made - # in the callback are incorporated into the starting offsets. - # Calling seek() inside on_assign for KIP-848 fails because - # rdkafka hasn't processed the partition yet at that point. - kip848_partitions = [ - ConfluentTopicPartition( - p.topic.name, p.index, self.__offsets[p] - ) - for p in offsets - if p in self.__offsets - ] - self.__consumer.incremental_assign(kip848_partitions) + # For KIP-848, incremental_assign must be called after + # on_assign to set the starting offsets for newly assigned + # partitions. Calling it before on_assign (in __assign) + # would fail with _UNKNOWN_PARTITION because rdkafka hasn't + # fully processed the assignment at that point. 
+ self.__consumer.incremental_assign( + [ + ConfluentTopicPartition(p.topic.name, p.index, offsets[p]) + for p in offsets + ] + ) self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after assignment: %s", self.__paused) @@ -492,9 +486,15 @@ def poll( if error is not None: code = error.code() if code == KafkaError._PARTITION_EOF: + topic = message.topic() + partition = message.partition() + offset = message.offset() + assert topic is not None + assert partition is not None + assert offset is not None raise EndOfPartition( - Partition(Topic(message.topic() or ""), message.partition() or 0), - message.offset() or 0, + Partition(Topic(topic), partition), + offset, ) elif code == KafkaError._TRANSPORT: raise TransportError(str(error)) @@ -506,7 +506,7 @@ def poll( else: raise ConsumerError(str(error)) - headers: Optional[Headers] = message.headers() + headers: Optional[Headers] = message.headers() # type: ignore[assignment, unused-ignore] broker_value = BrokerValue( KafkaPayload( message.key(), @@ -547,12 +547,11 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None: for partition, offset in offsets.items() ] if self.__is_kip848: - # For KIP-848, incremental_assign is called after the on_assign - # callback so user seek()s in the callback are reflected in the - # starting offsets passed to rdkafka. + # For KIP-848, incremental_assign is deferred to after on_assign + # in the assignment_callback finally block. Calling it here would + # fail with _UNKNOWN_PARTITION because rdkafka hasn't fully + # processed the assignment yet at this point in the callback. 
pass - elif self.__is_cooperative_sticky: - self.__consumer.incremental_assign(partitions) else: self.__consumer.assign(partitions) self.__offsets.update(offsets) @@ -571,19 +570,10 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: self.__validate_offsets(offsets) - if self.__is_kip848 and self.__state == KafkaConsumerState.ASSIGNING: - # With KIP-848, seeking inside on_assign fails because rdkafka - # hasn't fully processed incremental_assign yet. The seek offset - # is captured in self.__offsets and applied via incremental_assign - # after the callback completes. - pass - else: - for partition, offset in offsets.items(): - self.__consumer.seek( - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - ) + for partition, offset in offsets.items(): + self.__consumer.seek( + ConfluentTopicPartition(partition.topic.name, partition.index, offset) + ) self.__offsets.update(offsets) def pause(self, partitions: Sequence[Partition]) -> None: @@ -846,7 +836,7 @@ def produce( produce( value=payload.value, key=payload.key, - headers=dict(payload.headers), + headers=list(payload.headers), on_delivery=partial(self.__delivery_callback, future, payload), ) return future @@ -863,7 +853,7 @@ def close(self) -> Future[None]: METRICS_FREQUENCY_SEC = 1.0 -class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc] +class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc, unused-ignore] """ A thin wrapper for confluent_kafka.Producer that adds metrics reporting. 
""" @@ -904,7 +894,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None: on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery wrapped_callback = self.__delivery_callback(user_callback) - super().produce(*args, **kwargs, on_delivery=wrapped_callback) + super().produce(*args, **kwargs, on_delivery=wrapped_callback) # type: ignore[misc] def __flush_metrics(self) -> None: for status, count in self.__produce_counters.items(): @@ -921,7 +911,7 @@ def __flush_metrics(self) -> None: def flush(self, timeout: float | None = -1) -> int: # Kafka producer flush should flush metrics too self.__flush_metrics() - return cast(int, super().flush(timeout)) + return super().flush(timeout) # type: ignore[arg-type, unused-ignore] def __reset_metrics(self) -> None: self.__produce_counters.clear() diff --git a/arroyo/backends/local/backend.py b/arroyo/backends/local/backend.py index 760f7f22..1b07a187 100644 --- a/arroyo/backends/local/backend.py +++ b/arroyo/backends/local/backend.py @@ -216,6 +216,7 @@ def unsubscribe(self) -> None: if self.__closed: raise RuntimeError("consumer is closed") + assert self.__subscription is not None self.__pending_callbacks.append( partial( self.__revoke, diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index f3e3ffae..82ef31a7 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -116,7 +116,7 @@ def __getitem__(self, index: int) -> TBatchValue: pickle.loads( data, buffers=[ - self.block.buf[offset : offset + length].tobytes() + self.block.buf[offset : offset + length].tobytes() # type: ignore[index] for offset, length in buffers ], ), @@ -159,7 +159,7 @@ def buffer_callback(buffer: PickleBuffer) -> None: f"Value exceeds available space in block, {length} " f"bytes needed but {self.block.size - offset} bytes free." 
) - self.block.buf[offset : offset + length] = value + self.block.buf[offset : offset + length] = value # type: ignore[index] self.__offset += length buffers.append((offset, length)) diff --git a/requirements-test.txt b/requirements-test.txt index 1327328a..0005b2ba 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,3 @@ pytest==8.4.1 pytest-benchmark==4.0.0 -mypy==0.961 +mypy==1.19.1 diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index fcee2043..c240a852 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -14,7 +14,6 @@ class StreamsTestMixin(ABC, Generic[TStrategyPayload]): - cooperative_sticky = False kip_848 = False @abstractmethod @@ -454,7 +453,7 @@ def wait_until_rebalancing( wait_until_rebalancing(consumer_a, consumer_b) - if self.cooperative_sticky or self.kip_848: + if self.kip_848: # within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused # Either partition 0 or 1 might be the paused one assert len(consumer_a.paused()) == 1 @@ -483,7 +482,7 @@ def wait_until_rebalancing( assert len(consumer_b.tell()) == 2 - if self.cooperative_sticky or self.kip_848: + if self.kip_848: consumer_a_on_assign.assert_has_calls( [ mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}), diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 96b5783d..91fb88bb 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None: {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} ) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) + producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( 
Increment( @@ -44,7 +44,7 @@ def test_metrics_callback_records_error(self) -> None: producer = ConfluentProducer({"bootstrap.servers": "fake:9092"}) mock_error = mock.Mock(spec=KafkaError) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) + producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment("arroyo.producer.produce_status", 1, {"status": "error"}) @@ -63,7 +63,7 @@ def user_callback( ) -> None: user_callback_invoked.append((error, message)) - wrapped = producer._ConfluentProducer__delivery_callback(user_callback) + wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined] mock_message = mock.Mock(spec=ConfluentMessage) wrapped(None, mock_message) producer.flush() # Flush buffered metrics diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index f624df66..4da707bf 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -10,7 +10,10 @@ from unittest import mock import pytest -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import ( # type: ignore[attr-defined, unused-ignore] + AdminClient, + NewTopic, +) from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer from arroyo.backends.kafka.commit import CommitCodec @@ -119,9 +122,6 @@ def get_consumer( if max_poll_interval_ms < 45000: configuration["session.timeout.ms"] = max_poll_interval_ms - if self.cooperative_sticky: - configuration["partition.assignment.strategy"] = "cooperative-sticky" - if self.kip_848: configuration["group.protocol"] = "consumer" configuration.pop("session.timeout.ms") @@ -233,8 +233,8 @@ def test_consumer_stream_processor_shutdown(self) -> None: @mock.patch("arroyo.processing.processor.BACKPRESSURE_THRESHOLD", 0) def 
test_assign_partition_during_pause(self) -> None: - if self.cooperative_sticky or self.kip_848: - pytest.skip("test does not work with cooperative-sticky rebalancing") + if self.kip_848: + pytest.skip("test does not work with incremental rebalancing") payloads = self.get_payloads() @@ -423,11 +423,6 @@ def test_auto_commit_mode(self) -> None: assert e.partition == Partition(topic, 0) -class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams): - # re-test the kafka consumer with cooperative-sticky rebalancing - cooperative_sticky = True - - class TestKafkaStreamsKip848(TestKafkaStreams): kip_848 = True diff --git a/tests/processing/strategies/test_filter.py b/tests/processing/strategies/test_filter.py index f3268f47..bc903833 100644 --- a/tests/processing/strategies/test_filter.py +++ b/tests/processing/strategies/test_filter.py @@ -170,7 +170,7 @@ def test_function(message: Message[bool]) -> bool: def test_backpressure_in_join() -> None: topic = Topic("topic") next_step = Mock() - next_step.submit.side_effect = [None] * 6 + [MessageRejected] # type: ignore + next_step.submit.side_effect = [None] * 6 + [MessageRejected] now = datetime.now() diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py index 430da295..03e62dca 100644 --- a/tests/test_kip848_e2e.py +++ b/tests/test_kip848_e2e.py @@ -6,7 +6,10 @@ from contextlib import closing from typing import Any, Iterator, Mapping -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import ( # type: ignore[attr-defined, unused-ignore] + AdminClient, + NewTopic, +) from arroyo.backends.kafka import KafkaProducer from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration From 08542046a2b1c0e83ea38970f476ac82322a44f2 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 12 Mar 2026 11:03:02 -0400 Subject: [PATCH 5/8] fix(confluent): Assert topic/partition/offset are populated on consumed messages Co-Authored-By: Claude Sonnet 4.6 --- 
arroyo/backends/kafka/consumer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 8ab8b1f8..3c347d10 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -507,14 +507,20 @@ def poll( raise ConsumerError(str(error)) headers: Optional[Headers] = message.headers() # type: ignore[assignment, unused-ignore] + topic = message.topic() + partition = message.partition() + offset = message.offset() + assert topic is not None + assert partition is not None + assert offset is not None broker_value = BrokerValue( KafkaPayload( message.key(), message.value() or b"", headers if headers is not None else [], ), - Partition(Topic(message.topic() or ""), message.partition() or 0), - message.offset() or 0, + Partition(Topic(topic), partition), + offset, datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0), ) self.__offsets[broker_value.partition] = broker_value.next_offset From 76a4a0f6510fb95e51fa05994db2ad84efac5814 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 12 Mar 2026 11:07:21 -0400 Subject: [PATCH 6/8] ci: Upgrade to Python 3.13 Update mypy.ini, CI linting and typing jobs to use Python 3.13. Suppress two new strict covariant TypeVar errors in types.py where the covariance is semantically correct but mypy (1.19.1, targeting Python 3.13) disallows covariant TypeVars in class attribute declarations. 
Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/ci.yml | 4 ++-- arroyo/types.py | 4 ++-- mypy.ini | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b3dd28a5..c18a0ed6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: name: Checkout code - uses: actions/setup-python@v2 with: - python-version: 3.12 + python-version: 3.13 - name: Install dependencies run: | python -m pip install --upgrade pip @@ -34,7 +34,7 @@ jobs: name: Checkout code - uses: actions/setup-python@v2 with: - python-version: 3.12 + python-version: 3.13 - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/arroyo/types.py b/arroyo/types.py index 4472fc56..eb430d0f 100644 --- a/arroyo/types.py +++ b/arroyo/types.py @@ -125,7 +125,7 @@ class Value(BaseValue[TMessagePayload]): """ __slots__ = ["__payload", "__committable"] - __payload: TMessagePayload + __payload: TMessagePayload # type: ignore[misc] __committable: Mapping[Partition, int] __timestamp: Optional[datetime] @@ -163,7 +163,7 @@ class BrokerValue(BaseValue[TMessagePayload]): """ __slots__ = ["__payload", "partition", "offset", "timestamp"] - __payload: TMessagePayload + __payload: TMessagePayload # type: ignore[misc] partition: Partition offset: int timestamp: datetime diff --git a/mypy.ini b/mypy.ini index aaf0ac98..8a3e6526 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version = 3.12 +python_version = 3.13 [mypy-avro.*] ignore_missing_imports = True From 6335fd6de0d7c89fd708e1a6d6e85b6fca728905 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 12 Mar 2026 11:15:34 -0400 Subject: [PATCH 7/8] ref: Add comments explaining memoryview type: ignore annotations Co-Authored-By: Claude Sonnet 4.6 --- arroyo/processing/strategies/run_task_with_multiprocessing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 82ef31a7..80ca0e83 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -116,6 +116,7 @@ def __getitem__(self, index: int) -> TBatchValue: pickle.loads( data, buffers=[ + # mypy doesn't support slice indexing on memoryview self.block.buf[offset : offset + length].tobytes() # type: ignore[index] for offset, length in buffers ], @@ -159,6 +160,7 @@ def buffer_callback(buffer: PickleBuffer) -> None: f"Value exceeds available space in block, {length} " f"bytes needed but {self.block.size - offset} bytes free." ) + # mypy doesn't support slice indexing on memoryview self.block.buf[offset : offset + length] = value # type: ignore[index] self.__offset += length buffers.append((offset, length)) From 6651a00344e281ac31cbef7cf44bdf962c9283cb Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Tue, 17 Mar 2026 12:56:52 -0400 Subject: [PATCH 8/8] fix(confluent): Call incremental_assign before on_assign for KIP-848 Move the incremental_assign call to after offset resolution but before the on_assign callback fires. This allows seeks performed inside on_assign to succeed, since the partition is now registered with rdkafka at that point. Previously, incremental_assign was deferred to the finally block after on_assign, which meant any seek() call inside on_assign would fail with _UNKNOWN_PARTITION. Calling it at the very top of the callback (inside __assign) also fails, but calling it after committed() and offset resolution are complete works correctly. 
Co-Authored-By: Claude Sonnet 4.6 --- arroyo/backends/kafka/consumer.py | 34 ++++++++++++++----------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 3c347d10..eca6cc24 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -358,6 +358,20 @@ def assignment_callback( self.__assign(offsets) + if self.__is_kip848: + # For KIP-848, incremental_assign must be called after + # offset resolution but before on_assign, so that seeks + # issued inside on_assign work correctly. Calling it + # earlier (inside __assign at the top of the callback) + # fails with _UNKNOWN_PARTITION because rdkafka hasn't + # fully processed the assignment at that point. + self.__consumer.incremental_assign( + [ + ConfluentTopicPartition(p.topic.name, p.index, offsets[p]) + for p in offsets + ] + ) + # Ensure that all partitions are resumed on assignment to avoid # carrying over state from a previous assignment. self.resume([p for p in offsets]) @@ -370,18 +384,6 @@ def assignment_callback( if on_assign is not None: on_assign(offsets) finally: - if self.__is_kip848: - # For KIP-848, incremental_assign must be called after - # on_assign to set the starting offsets for newly assigned - # partitions. Calling it before on_assign (in __assign) - # would fail with _UNKNOWN_PARTITION because rdkafka hasn't - # fully processed the assignment at that point. 
- self.__consumer.incremental_assign( - [ - ConfluentTopicPartition(p.topic.name, p.index, offsets[p]) - for p in offsets - ] - ) self.__state = KafkaConsumerState.CONSUMING logger.info("Paused partitions after assignment: %s", self.__paused) @@ -552,13 +554,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None: ConfluentTopicPartition(partition.topic.name, partition.index, offset) for partition, offset in offsets.items() ] - if self.__is_kip848: - # For KIP-848, incremental_assign is deferred to after on_assign - # in the assignment_callback finally block. Calling it here would - # fail with _UNKNOWN_PARTITION because rdkafka hasn't fully - # processed the assignment yet at this point in the callback. - pass - else: + if not self.__is_kip848: self.__consumer.assign(partitions) self.__offsets.update(offsets)