diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 63506d99..8ce3a625 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: name: Checkout code - uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2 with: - python-version: 3.12 + python-version: 3.13 - name: Install dependencies run: | python -m pip install --upgrade pip @@ -34,7 +34,7 @@ jobs: name: Checkout code - uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2 with: - python-version: 3.12 + python-version: 3.13 - name: Install dependencies run: | python -m pip install --upgrade pip @@ -48,7 +48,7 @@ jobs: strategy: max-parallel: 5 matrix: - python: [3.9, "3.10", "3.11", "3.12", "3.13"] + python: ["3.10", "3.11", "3.12", "3.13"] timeout-minutes: 10 steps: - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 61bcd6b2..47870473 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,11 +22,13 @@ repos: - id: flake8 language_version: python3.12 - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.961' + rev: 'v1.19.1' hooks: - id: mypy args: [--config-file, mypy.ini, --strict] - additional_dependencies: [pytest==7.1.2] + additional_dependencies: [pytest==7.1.2, confluent-kafka==2.13.2] + pass_filenames: false + entry: mypy . --config-file mypy.ini --strict - repo: https://github.com/pycqa/isort rev: 5.12.0 hooks: diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index c6b2e1a4..eca6cc24 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -23,7 +23,6 @@ Tuple, Type, Union, - cast, ) from confluent_kafka import ( @@ -163,7 +162,6 @@ def __init__( KafkaError.REQUEST_TIMED_OUT, KafkaError.NOT_COORDINATOR, KafkaError._WAIT_COORD, - KafkaError.STALE_MEMBER_EPOCH, # kip-848 KafkaError.COORDINATOR_LOAD_IN_PROGRESS, ) @@ -171,13 +169,11 @@ def __init__( 3, 1, lambda e: isinstance(e, KafkaException) + and isinstance(e.args[0], KafkaError) and e.args[0].code() in retryable_errors, ) - self.__is_cooperative_sticky = ( - configuration.get("partition.assignment.strategy") == "cooperative-sticky" - or configuration.get("group.protocol") == "consumer" - ) + self.__is_kip848 = configuration.get("group.protocol") == "consumer" auto_offset_reset = configuration.get("auto.offset.reset", "largest") # This is a special flag that controls the auto offset behavior for @@ -335,7 +331,7 @@ def subscribe( raise InvalidState(self.__state) def assignment_callback( - consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition] + consumer: ConfluentConsumer, partitions: list[ConfluentTopicPartition] ) -> None: if not partitions: logger.info("skipping empty assignment") @@ -362,6 +358,20 @@ def assignment_callback( self.__assign(offsets) + if self.__is_kip848: + # For KIP-848, incremental_assign must be called after + # offset resolution but before on_assign, so that seeks + # issued inside on_assign work correctly. Calling it + # earlier (inside __assign at the top of the callback) + # fails with _UNKNOWN_PARTITION because rdkafka hasn't + # fully processed the assignment at that point. + self.__consumer.incremental_assign( + [ + ConfluentTopicPartition(p.topic.name, p.index, offsets[p]) + for p in offsets + ] + ) + # Ensure that all partitions are resumed on assignment to avoid # carrying over state from a previous assignment. self.resume([p for p in offsets]) @@ -378,11 +388,14 @@ def assignment_callback( logger.info("Paused partitions after assignment: %s", self.__paused) def revocation_callback( - consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition] + consumer: ConfluentConsumer, + confluent_partitions: Sequence[ConfluentTopicPartition], ) -> None: self.__state = KafkaConsumerState.REVOKING - partitions = [Partition(Topic(i.topic), i.partition) for i in partitions] + partitions = [ + Partition(Topic(i.topic), i.partition) for i in confluent_partitions + ] try: if on_revoke is not None: @@ -475,9 +488,15 @@ def poll( if error is not None: code = error.code() if code == KafkaError._PARTITION_EOF: + topic = message.topic() + partition = message.partition() + offset = message.offset() + assert topic is not None + assert partition is not None + assert offset is not None raise EndOfPartition( - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition(Topic(topic), partition), + offset, ) elif code == KafkaError._TRANSPORT: raise TransportError(str(error)) @@ -489,15 +508,21 @@ def poll( else: raise ConsumerError(str(error)) - headers: Optional[Headers] = message.headers() + headers: Optional[Headers] = message.headers() # type: ignore[assignment, unused-ignore] + topic = message.topic() + partition = message.partition() + offset = message.offset() + assert topic is not None + assert partition is not None + assert offset is not None broker_value = BrokerValue( KafkaPayload( message.key(), - message.value(), + message.value() or b"", headers if headers is not None else [], ), - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition(Topic(topic), partition), + offset, datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0), ) self.__offsets[broker_value.partition] = broker_value.next_offset @@ -529,9 +554,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None: ConfluentTopicPartition(partition.topic.name, partition.index, offset) for partition, offset in offsets.items() ] - if self.__is_cooperative_sticky: - self.__consumer.incremental_assign(partitions) - else: + if not self.__is_kip848: self.__consumer.assign(partitions) self.__offsets.update(offsets) @@ -729,7 +752,7 @@ def closed(self) -> bool: @property def member_id(self) -> str: - member_id: str = self.__consumer.memberid() + member_id: str = self.__consumer.memberid() or "" return member_id @@ -738,7 +761,7 @@ def __init__( self, configuration: Mapping[str, Any], use_simple_futures: bool = False ) -> None: self.__configuration = configuration - self.__producer = ConfluentKafkaProducer(configuration) + self.__producer = ConfluentKafkaProducer(dict(configuration)) self.__shutdown_requested = Event() # The worker must execute in a separate thread to ensure that callbacks @@ -815,7 +838,7 @@ def produce( produce( value=payload.value, key=payload.key, - headers=payload.headers, + headers=list(payload.headers), on_delivery=partial(self.__delivery_callback, future, payload), ) return future @@ -832,13 +855,13 @@ def close(self) -> Future[None]: METRICS_FREQUENCY_SEC = 1.0 -class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc] +class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc, unused-ignore] """ A thin wrapper for confluent_kafka.Producer that adds metrics reporting. """ def __init__(self, configuration: Mapping[str, Any]) -> None: - super().__init__(configuration) + super().__init__(dict(configuration)) self.producer_name = configuration.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) @@ -873,7 +896,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None: on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery wrapped_callback = self.__delivery_callback(user_callback) - super().produce(*args, on_delivery=wrapped_callback, **kwargs) + super().produce(*args, **kwargs, on_delivery=wrapped_callback) # type: ignore[misc] def __flush_metrics(self) -> None: for status, count in self.__produce_counters.items(): @@ -887,10 +910,10 @@ def __flush_metrics(self) -> None: ) self.__reset_metrics() - def flush(self, timeout: float = -1) -> int: + def flush(self, timeout: float | None = -1) -> int: # Kafka producer flush should flush metrics too self.__flush_metrics() - return cast(int, super().flush(timeout)) + return super().flush(timeout) # type: ignore[arg-type, unused-ignore] def __reset_metrics(self) -> None: self.__produce_counters.clear() diff --git a/arroyo/backends/local/backend.py b/arroyo/backends/local/backend.py index 760f7f22..1b07a187 100644 --- a/arroyo/backends/local/backend.py +++ b/arroyo/backends/local/backend.py @@ -216,6 +216,7 @@ def unsubscribe(self) -> None: if self.__closed: raise RuntimeError("consumer is closed") + assert self.__subscription is not None self.__pending_callbacks.append( partial( self.__revoke, diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index f3e3ffae..80ca0e83 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -116,7 +116,8 @@ def __getitem__(self, index: int) -> TBatchValue: pickle.loads( data, buffers=[ - self.block.buf[offset : offset + length].tobytes() + # mypy doesn't support slice indexing on memoryview + self.block.buf[offset : offset + length].tobytes() # type: ignore[index] for offset, length in buffers ], ), @@ -159,7 +160,8 @@ def buffer_callback(buffer: PickleBuffer) -> None: f"Value exceeds available space in block, {length} " f"bytes needed but {self.block.size - offset} bytes free." ) - self.block.buf[offset : offset + length] = value + # mypy doesn't support slice indexing on memoryview + self.block.buf[offset : offset + length] = value # type: ignore[index] self.__offset += length buffers.append((offset, length)) diff --git a/arroyo/types.py b/arroyo/types.py index 4472fc56..eb430d0f 100644 --- a/arroyo/types.py +++ b/arroyo/types.py @@ -125,7 +125,7 @@ class Value(BaseValue[TMessagePayload]): """ __slots__ = ["__payload", "__committable"] - __payload: TMessagePayload + __payload: TMessagePayload # type: ignore[misc] __committable: Mapping[Partition, int] __timestamp: Optional[datetime] @@ -163,7 +163,7 @@ class BrokerValue(BaseValue[TMessagePayload]): """ __slots__ = ["__payload", "partition", "offset", "timestamp"] - __payload: TMessagePayload + __payload: TMessagePayload # type: ignore[misc] partition: Partition offset: int timestamp: datetime diff --git a/mypy.ini b/mypy.ini index aaf0ac98..8a3e6526 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version = 3.12 +python_version = 3.13 [mypy-avro.*] ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt index 1327328a..0005b2ba 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,3 @@ pytest==8.4.1 pytest-benchmark==4.0.0 -mypy==0.961 +mypy==1.19.1 diff --git a/requirements.txt b/requirements.txt index 7e4f7dfd..7196e292 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -confluent-kafka>=2.11.0,<2.12.0 +confluent-kafka>=2.13.2 diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index fcee2043..c240a852 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -14,7 +14,6 @@ class StreamsTestMixin(ABC, Generic[TStrategyPayload]): - cooperative_sticky = False kip_848 = False @abstractmethod @@ -454,7 +453,7 @@ def wait_until_rebalancing( wait_until_rebalancing(consumer_a, consumer_b) - if self.cooperative_sticky or self.kip_848: + if self.kip_848: # within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused # Either partition 0 or 1 might be the paused one assert len(consumer_a.paused()) == 1 @@ -483,7 +482,7 @@ def wait_until_rebalancing( assert len(consumer_b.tell()) == 2 - if self.cooperative_sticky or self.kip_848: + if self.kip_848: consumer_a_on_assign.assert_has_calls( [ mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}), diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 96b5783d..91fb88bb 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None: {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} ) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) + producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment( @@ -44,7 +44,7 @@ def test_metrics_callback_records_error(self) -> None: producer = ConfluentProducer({"bootstrap.servers": "fake:9092"}) mock_error = mock.Mock(spec=KafkaError) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) + producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment("arroyo.producer.produce_status", 1, {"status": "error"}) @@ -63,7 +63,7 @@ def user_callback( ) -> None: user_callback_invoked.append((error, message)) - wrapped = producer._ConfluentProducer__delivery_callback(user_callback) + wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined] mock_message = mock.Mock(spec=ConfluentMessage) wrapped(None, mock_message) producer.flush() # Flush buffered metrics diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 7fba26dd..4da707bf 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -10,7 +10,10 @@ from unittest import mock import pytest -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import ( # type: ignore[attr-defined, unused-ignore] + AdminClient, + NewTopic, +) from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer from arroyo.backends.kafka.commit import CommitCodec @@ -119,9 +122,6 @@ def get_consumer( if max_poll_interval_ms < 45000: configuration["session.timeout.ms"] = max_poll_interval_ms - if self.cooperative_sticky: - configuration["partition.assignment.strategy"] = "cooperative-sticky" - if self.kip_848: configuration["group.protocol"] = "consumer" configuration.pop("session.timeout.ms") @@ -233,8 +233,8 @@ def test_consumer_stream_processor_shutdown(self) -> None: @mock.patch("arroyo.processing.processor.BACKPRESSURE_THRESHOLD", 0) def test_assign_partition_during_pause(self) -> None: - if self.cooperative_sticky or self.kip_848: - pytest.skip("test does not work with cooperative-sticky rebalancing") + if self.kip_848: + pytest.skip("test does not work with incremental rebalancing") payloads = self.get_payloads() @@ -423,18 +423,9 @@ def test_auto_commit_mode(self) -> None: assert e.partition == Partition(topic, 0) -class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams): - # re-test the kafka consumer with cooperative-sticky rebalancing - cooperative_sticky = True - - class TestKafkaStreamsKip848(TestKafkaStreams): kip_848 = True - @pytest.mark.xfail(reason="To be fixed") - def test_pause_resume_rebalancing(self) -> None: - super().test_pause_resume_rebalancing() - def test_commit_codec() -> None: commit = Commit( diff --git a/tests/processing/strategies/test_filter.py b/tests/processing/strategies/test_filter.py index f3268f47..bc903833 100644 --- a/tests/processing/strategies/test_filter.py +++ b/tests/processing/strategies/test_filter.py @@ -170,7 +170,7 @@ def test_function(message: Message[bool]) -> bool: def test_backpressure_in_join() -> None: topic = Topic("topic") next_step = Mock() - next_step.submit.side_effect = [None] * 6 + [MessageRejected] # type: ignore + next_step.submit.side_effect = [None] * 6 + [MessageRejected] now = datetime.now() diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py index 63f8b983..03e62dca 100644 --- a/tests/test_kip848_e2e.py +++ b/tests/test_kip848_e2e.py @@ -6,7 +6,10 @@ from contextlib import closing from typing import Any, Iterator, Mapping -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import ( # type: ignore[attr-defined, unused-ignore] + AdminClient, + NewTopic, +) from arroyo.backends.kafka import KafkaProducer from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration @@ -28,6 +31,15 @@ def get_topic( name = TOPIC configuration = dict(configuration) client = AdminClient(configuration) + + # Delete the topic if it already exists from a previous test run + existing = client.list_topics(timeout=5).topics + if name in existing: + [[key, future]] = client.delete_topics([name]).items() + assert key == name + future.result() + time.sleep(1) + [[key, future]] = client.create_topics( [NewTopic(name, num_partitions=partitions_count, replication_factor=1)] ).items()