diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a8db56a6..7d650010 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,7 +48,7 @@ jobs: strategy: max-parallel: 5 matrix: - python: [3.9, "3.10", "3.11", "3.12", "3.13"] + python: [3.9, "3.10", "3.11", "3.12", "3.13", "3.14"] timeout-minutes: 10 steps: - uses: actions/checkout@v2 diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index c6b2e1a4..17574a03 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -346,7 +346,7 @@ def assignment_callback( try: assignment: MutableSequence[ConfluentTopicPartition] = [] - for partition in self.__consumer.committed(partitions): + for partition in self.__consumer.committed(list(partitions)): if partition.offset >= 0: assignment.append(partition) elif partition.offset == OFFSET_INVALID: @@ -364,7 +364,7 @@ def assignment_callback( # Ensure that all partitions are resumed on assignment to avoid # carrying over state from a previous assignment. - self.resume([p for p in offsets]) + self.resume(list(offsets.keys())) except Exception: self.__state = KafkaConsumerState.ERROR @@ -382,13 +382,15 @@ def revocation_callback( ) -> None: self.__state = KafkaConsumerState.REVOKING - partitions = [Partition(Topic(i.topic), i.partition) for i in partitions] + revoked_partitions = [ + Partition(Topic(i.topic), i.partition) for i in partitions + ] try: if on_revoke is not None: - on_revoke(partitions) + on_revoke(revoked_partitions) finally: - for partition in partitions: + for partition in revoked_partitions: # Staged offsets are deleted during partition revocation to # prevent later committing offsets for partitions that are # no longer owned by this consumer. @@ -476,8 +478,11 @@ def poll( code = error.code() if code == KafkaError._PARTITION_EOF: raise EndOfPartition( - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition( + Topic(cast(str, message.topic())), + cast(int, message.partition()), + ), + cast(int, message.offset()), ) elif code == KafkaError._TRANSPORT: raise TransportError(str(error)) @@ -489,15 +494,33 @@ def poll( else: raise ConsumerError(str(error)) - headers: Optional[Headers] = message.headers() + raw_headers = message.headers() + if raw_headers is None: + headers = [] + elif isinstance(raw_headers, dict): + headers = [ + (k, v if isinstance(v, bytes) else b"") + for k, v in raw_headers.items() + ] + else: + headers = [ + (k, v if isinstance(v, bytes) else b"") + for k, v in raw_headers + ] + value = message.value() + if value is None: + value = b"" broker_value = BrokerValue( KafkaPayload( message.key(), - message.value(), - headers if headers is not None else [], + value, + headers, + ), + Partition( + Topic(cast(str, message.topic())), + cast(int, message.partition()), ), - Partition(Topic(message.topic()), message.partition()), - message.offset(), + cast(int, message.offset()), datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0), ) self.__offsets[broker_value.partition] = broker_value.next_offset @@ -737,8 +760,8 @@ class KafkaProducer(Producer[KafkaPayload]): def __init__( self, configuration: Mapping[str, Any], use_simple_futures: bool = False ) -> None: - self.__configuration = configuration - self.__producer = ConfluentKafkaProducer(configuration) + self.__configuration = dict(configuration) + self.__producer = ConfluentKafkaProducer(self.__configuration) self.__shutdown_requested = Event() # The worker must execute in a separate thread to ensure that callbacks @@ -778,8 +801,11 @@ def __delivery_callback( future.set_result( BrokerValue( payload, - Partition(Topic(message.topic()), message.partition()), - message.offset(), + Partition( + Topic(cast(str, message.topic())), + cast(int, message.partition()), + ), + cast(int, message.offset()), datetime.utcfromtimestamp(timestamp_value / 1000.0), ), ) @@ -815,7 +841,7 @@ def produce( produce( value=payload.value, key=payload.key, - headers=payload.headers, + headers=cast(Any, payload.headers), on_delivery=partial(self.__delivery_callback, future, payload), ) return future @@ -832,14 +858,15 @@ def close(self) -> Future[None]: METRICS_FREQUENCY_SEC = 1.0 -class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc] +class ConfluentProducer(ConfluentKafkaProducer): """ A thin wrapper for confluent_kafka.Producer that adds metrics reporting. """ def __init__(self, configuration: Mapping[str, Any]) -> None: - super().__init__(configuration) - self.producer_name = configuration.get("client.id") or None + config_dict = dict(configuration) + super().__init__(config_dict) + self.producer_name = config_dict.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) self.__reset_metrics() @@ -873,7 +900,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None: on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery wrapped_callback = self.__delivery_callback(user_callback) - super().produce(*args, on_delivery=wrapped_callback, **kwargs) + super().produce(*args, on_delivery=wrapped_callback, **kwargs) # type: ignore[misc] def __flush_metrics(self) -> None: for status, count in self.__produce_counters.items(): @@ -890,7 +917,7 @@ def __flush_metrics(self) -> None: def flush(self, timeout: float = -1) -> int: # Kafka producer flush should flush metrics too self.__flush_metrics() - return cast(int, super().flush(timeout)) + return super().flush(timeout) def __reset_metrics(self) -> None: self.__produce_counters.clear() diff --git a/arroyo/backends/local/backend.py b/arroyo/backends/local/backend.py index 760f7f22..d845b9a5 100644 --- a/arroyo/backends/local/backend.py +++ b/arroyo/backends/local/backend.py @@ -216,10 +216,12 @@ def unsubscribe(self) -> None: if self.__closed: raise RuntimeError("consumer is closed") + subscription = self.__subscription + assert subscription is not None self.__pending_callbacks.append( partial( self.__revoke, - self.__subscription, + subscription, self.__broker.unsubscribe(self), ) ) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index f3e3ffae..0ee408e0 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -111,12 +111,15 @@ def __getitem__(self, index: int) -> TBatchValue: # was still "alive" in a different part of the processing pipeline, the # contents of the message would be liable to be corrupted (at best -- # possibly causing a data leak/security issue at worst.) + buf = self.block.buf + if buf is None: + raise RuntimeError("shared memory block has no buffer") return cast( TBatchValue, pickle.loads( data, buffers=[ - self.block.buf[offset : offset + length].tobytes() + buf[offset : offset + length].tobytes() for offset, length in buffers ], ), @@ -159,7 +162,10 @@ def buffer_callback(buffer: PickleBuffer) -> None: f"Value exceeds available space in block, {length} " f"bytes needed but {self.block.size - offset} bytes free." ) - self.block.buf[offset : offset + length] = value + buf = self.block.buf + if buf is None: + raise RuntimeError("shared memory block has no buffer") + buf[offset : offset + length] = value self.__offset += length buffers.append((offset, length)) diff --git a/requirements-test.txt b/requirements-test.txt index 1327328a..9ad8d9ed 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,3 @@ pytest==8.4.1 pytest-benchmark==4.0.0 -mypy==0.961 +mypy>=1.19.1 diff --git a/requirements.txt b/requirements.txt index 7e4f7dfd..a958b727 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -confluent-kafka>=2.11.0,<2.12.0 +confluent-kafka>=2.12.1 diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 96b5783d..e10983ee 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -28,7 +28,9 @@ def test_metrics_callback_records_success(self) -> None: {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} ) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) + getattr( + producer, "_ConfluentProducer__metrics_delivery_callback" + )(None, mock_message) producer.flush() # Flush buffered metrics assert ( Increment( @@ -44,7 +46,9 @@ def test_metrics_callback_records_error(self) -> None: producer = ConfluentProducer({"bootstrap.servers": "fake:9092"}) mock_error = mock.Mock(spec=KafkaError) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) + getattr( + producer, "_ConfluentProducer__metrics_delivery_callback" + )(mock_error, mock_message) producer.flush() # Flush buffered metrics assert ( Increment("arroyo.producer.produce_status", 1, {"status": "error"}) @@ -63,7 +67,9 @@ def user_callback( ) -> None: user_callback_invoked.append((error, message)) - wrapped = producer._ConfluentProducer__delivery_callback(user_callback) + wrapped = getattr( + producer, "_ConfluentProducer__delivery_callback" + )(user_callback) mock_message = mock.Mock(spec=ConfluentMessage) wrapped(None, mock_message) producer.flush() # Flush buffered metrics diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 7fba26dd..e222cfc4 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -10,7 +10,7 @@ from unittest import mock import pytest -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import AdminClient, NewTopic # type: ignore[attr-defined] from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer from arroyo.backends.kafka.commit import CommitCodec diff --git a/tests/processing/strategies/test_filter.py b/tests/processing/strategies/test_filter.py index f3268f47..bc903833 100644 --- a/tests/processing/strategies/test_filter.py +++ b/tests/processing/strategies/test_filter.py @@ -170,7 +170,7 @@ def test_function(message: Message[bool]) -> bool: def test_backpressure_in_join() -> None: topic = Topic("topic") next_step = Mock() - next_step.submit.side_effect = [None] * 6 + [MessageRejected] # type: ignore + next_step.submit.side_effect = [None] * 6 + [MessageRejected] now = datetime.now() diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py index 63f8b983..532d0e91 100644 --- a/tests/test_kip848_e2e.py +++ b/tests/test_kip848_e2e.py @@ -6,7 +6,7 @@ from contextlib import closing from typing import Any, Iterator, Mapping -from confluent_kafka.admin import AdminClient, NewTopic +from confluent_kafka.admin import AdminClient, NewTopic # type: ignore[attr-defined] from arroyo.backends.kafka import KafkaProducer from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration