From a727e9fefce9a7b1f0d876a9945b8abc45dfa609 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 11 Nov 2025 19:22:31 +0100 Subject: [PATCH] ref(consumers): Remove manual commit entirely (WIP) --- .github/workflows/codeql-analysis.yml | 6 +- arroyo/backends/abstract.py | 17 +- arroyo/backends/kafka/configuration.py | 7 +- arroyo/backends/kafka/consumer.py | 153 +---------- arroyo/backends/local/backend.py | 13 - arroyo/commit.py | 56 +--- arroyo/processing/processor.py | 28 +- arroyo/processing/strategies/__init__.py | 2 +- arroyo/processing/strategies/batching.py | 18 +- arroyo/processing/strategies/buffer.py | 10 +- arroyo/processing/strategies/commit.py | 5 +- arroyo/processing/strategies/filter.py | 78 +----- arroyo/processing/strategies/healthcheck.py | 4 +- arroyo/processing/strategies/reduce.py | 2 +- arroyo/types.py | 2 +- docs/source/getstarted.rst | 2 - docs/source/offsets.rst | 7 +- setup.py | 2 +- tests/backends/mixins.py | 10 - tests/backends/test_kafka.py | 25 +- tests/backends/test_kafka_commit_callback.py | 2 - tests/processing/strategies/test_all.py | 5 +- tests/processing/strategies/test_batching.py | 4 +- tests/processing/strategies/test_buffer.py | 2 +- tests/processing/strategies/test_filter.py | 191 +------------- .../strategies/test_run_task_in_threads.py | 5 +- tests/processing/test_processor.py | 243 +----------------- tests/test_commit.py | 46 ---- tests/test_kip848_e2e.py | 19 +- 29 files changed, 84 insertions(+), 880 deletions(-) delete mode 100644 tests/test_commit.py diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 097677cc..57536185 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -43,11 +43,11 @@ jobs: # If you wish to specify custom queries, you can do so here or in a config file. # By default, queries listed here will override any specified in a config file. 
# Prefix the list here with "+" to use these queries and those in the config file. - + # Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs # queries: security-extended,security-and-quality - + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild @@ -56,7 +56,7 @@ jobs: # â„šī¸ Command-line programs to run using the OS shell. # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun - # If the Autobuild fails above, remove it and uncomment the following three lines. + # If the Autobuild fails above, remove it and uncomment the following three lines. # modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. # - run: | diff --git a/arroyo/backends/abstract.py b/arroyo/backends/abstract.py index 4b1ca8bb..e6532eb3 100644 --- a/arroyo/backends/abstract.py +++ b/arroyo/backends/abstract.py @@ -146,20 +146,9 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: @abstractmethod def stage_offsets(self, offsets: Mapping[Partition, int]) -> None: """ - Stage offsets to be committed. If an offset has already been staged - for a given partition, that offset is overwritten (even if the offset - moves in reverse.) - """ - raise NotImplementedError - - @abstractmethod - def commit_offsets(self) -> Optional[Mapping[Partition, int]]: - """ - Commit staged offsets. The return value of this method is a mapping - of streams with their committed offsets as values. - - When auto-commit is enabled (in Kafka consumers), returns None since - the broker handles commits automatically. + Stage offsets to be committed. Offsets are automatically committed + by the broker. 
If an offset has already been staged for a given + partition, that offset is overwritten (even if the offset moves in reverse.) """ raise NotImplementedError diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index 3b4eb089..0fd914c9 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -237,7 +237,6 @@ def build_kafka_consumer_configuration( bootstrap_servers: Optional[Sequence[str]] = None, override_params: Optional[Mapping[str, Any]] = None, strict_offset_reset: Optional[bool] = None, - enable_auto_commit: bool = False, retry_handle_destroyed: bool = False, ) -> KafkaBrokerConfig: @@ -254,16 +253,14 @@ def build_kafka_consumer_configuration( default_config, bootstrap_servers, override_params ) - # Default configuration with manual commit management + # Configuration with auto-commit enabled config_update = { - "enable.auto.commit": False, + "enable.auto.commit": True, "enable.auto.offset.store": False, "group.id": group_id, "auto.offset.reset": auto_offset_reset, # this is an arroyo specific flag that only affects the consumer. 
"arroyo.strict.offset.reset": strict_offset_reset, - # this is an arroyo specific flag to enable auto-commit mode - "arroyo.enable.auto.commit": enable_auto_commit, # arroyo specific flag to enable retries when hitting `KafkaError._DESTROY` while committing "arroyo.retry.broker.handle.destroyed": retry_handle_destroyed, # overridden to reduce memory usage when there's a large backlog diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 6b3d1cfc..6b3d264e 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -54,7 +54,6 @@ from arroyo.types import BrokerValue, Partition, Topic from arroyo.utils.concurrent import execute from arroyo.utils.metrics import get_metrics -from arroyo.utils.retries import BasicRetryPolicy logger = logging.getLogger(__name__) @@ -159,30 +158,6 @@ def __init__( ) -> None: configuration = dict(configuration) - # Feature flag to enable retrying on `Broker handle destroyed` errors - # which can occur if we attempt to commit during a rebalance when - # the consumer group coordinator changed - self.__retry_handle_destroyed = as_kafka_configuration_bool( - configuration.pop("arroyo.retry.broker.handle.destroyed", False) - ) - - retryable_errors: Tuple[int, ...] 
= ( - KafkaError.REQUEST_TIMED_OUT, - KafkaError.NOT_COORDINATOR, - KafkaError._WAIT_COORD, - KafkaError.STALE_MEMBER_EPOCH, # kip-848 - KafkaError.COORDINATOR_LOAD_IN_PROGRESS, - ) - if self.__retry_handle_destroyed: - retryable_errors += (KafkaError._DESTROY,) - - commit_retry_policy = BasicRetryPolicy( - 3, - 1, - lambda e: isinstance(e, KafkaException) - and e.args[0].code() in retryable_errors, - ) - self.__is_cooperative_sticky = ( configuration.get("partition.assignment.strategy") == "cooperative-sticky" ) @@ -197,13 +172,6 @@ def __init__( if self.__strict_offset_reset is None: self.__strict_offset_reset = True - # Feature flag to enable rdkafka auto-commit with store_offsets - # When enabled, offsets are stored via store_offsets() and rdkafka - # automatically commits them periodically - self.__use_auto_commit = as_kafka_configuration_bool( - configuration.pop("arroyo.enable.auto.commit", False) - ) - if auto_offset_reset in {"smallest", "earliest", "beginning"}: self.__resolve_partition_starting_offset = ( self.__resolve_partition_offset_earliest @@ -219,32 +187,9 @@ def __init__( else: raise ValueError("invalid value for 'auto.offset.reset' configuration") - # When auto-commit is disabled (default), we require explicit configuration - # When auto-commit is enabled, we allow rdkafka to handle commits - if not self.__use_auto_commit: - if ( - as_kafka_configuration_bool( - configuration.get("enable.auto.commit", "true") - ) - is not False - ): - raise ValueError("invalid value for 'enable.auto.commit' configuration") - - if ( - as_kafka_configuration_bool( - configuration.get("enable.auto.offset.store", "true") - ) - is not False - ): - raise ValueError( - "invalid value for 'enable.auto.offset.store' configuration" - ) - else: - # In auto-commit mode, enable auto.commit and keep auto.offset.store disabled - # We'll use store_offsets() manually to control which offsets get committed - configuration["enable.auto.commit"] = True - 
configuration["enable.auto.offset.store"] = False - configuration["on_commit"] = self.__on_commit_callback + configuration["enable.auto.commit"] = True + configuration["enable.auto.offset.store"] = False + configuration["on_commit"] = self.__on_commit_callback # NOTE: Offsets are explicitly managed as part of the assignment # callback, so preemptively resetting offsets is not enabled when @@ -257,11 +202,8 @@ def __init__( ) self.__offsets: MutableMapping[Partition, int] = {} - self.__staged_offsets: MutableMapping[Partition, int] = {} self.__paused: Set[Partition] = set() - self.__commit_retry_policy = commit_retry_policy - self.__state = KafkaConsumerState.CONSUMING self.__metrics = get_metrics() @@ -397,16 +339,6 @@ def revocation_callback( on_revoke(partitions) finally: for partition in partitions: - # Staged offsets are deleted during partition revocation to - # prevent later committing offsets for partitions that are - # no longer owned by this consumer. - if partition in self.__staged_offsets: - logger.warning( - "Dropping staged offset for revoked partition (%r)!", - partition, - ) - del self.__staged_offsets[partition] - try: self.__offsets.pop(partition) except KeyError: @@ -634,86 +566,15 @@ def stage_offsets(self, offsets: Mapping[Partition, int]) -> None: # TODO: Maybe log a warning if these offsets exceed the current # offsets, since that's probably a side effect of an incorrect usage # pattern? 
- if self.__use_auto_commit: - # When auto-commit is enabled, use store_offsets to stage offsets - # for rdkafka to auto-commit - if offsets: - self.__consumer.store_offsets( - offsets=[ - ConfluentTopicPartition( - partition.topic.name, partition.index, offset - ) - for partition, offset in offsets.items() - ] - ) - else: - # Default behavior: manually track staged offsets - self.__staged_offsets.update(offsets) - - def __commit(self) -> Mapping[Partition, int]: - if self.__state in {KafkaConsumerState.CLOSED, KafkaConsumerState.ERROR}: - raise InvalidState(self.__state) - - if self.__staged_offsets.keys() - self.__offsets.keys(): - raise ConsumerError("cannot stage offsets for unassigned partitions") - - self.__validate_offsets(self.__staged_offsets) - - result: Optional[Sequence[ConfluentTopicPartition]] - - if self.__staged_offsets: - result = self.__consumer.commit( + if offsets: + self.__consumer.store_offsets( offsets=[ ConfluentTopicPartition( partition.topic.name, partition.index, offset ) - for partition, offset in self.__staged_offsets.items() - ], - asynchronous=False, + for partition, offset in offsets.items() + ] ) - else: - result = [] - - assert result is not None # synchronous commit should return result immediately - - offsets: MutableMapping[Partition, int] = {} - - for value in result: - # The Confluent Kafka Consumer will include logical offsets in the - # sequence of ``Partition`` objects returned by ``commit``. These - # are an implementation detail of the Kafka Consumer, so we don't - # expose them here. - # NOTE: These should no longer be seen now that we are forcing - # offsets to be set as part of the assignment callback. 
- if value.offset in self.LOGICAL_OFFSETS: - continue - - assert value.offset >= 0, "expected non-negative offset" - partition = Partition(Topic(value.topic), value.partition) - offsets[partition] = value.offset - - self.__staged_offsets.clear() - - return offsets - - def commit_offsets(self) -> Optional[Mapping[Partition, int]]: - """ - Commit staged offsets for all partitions that this consumer is - assigned to. The return value of this method is a mapping of - partitions with their committed offsets as values. - - When auto-commit is enabled, returns None since rdkafka handles - commits automatically and we don't track which offsets were committed. - - Raises an ``InvalidState`` if called on a closed consumer. - """ - if self.__use_auto_commit: - # When auto-commit is enabled, rdkafka commits automatically - # We don't track what was committed, so return None - # The offsets have already been staged via store_offsets() - return None - else: - return self.__commit_retry_policy.call(self.__commit) def close(self, timeout: Optional[float] = None) -> None: """ diff --git a/arroyo/backends/local/backend.py b/arroyo/backends/local/backend.py index 760f7f22..3eb303e0 100644 --- a/arroyo/backends/local/backend.py +++ b/arroyo/backends/local/backend.py @@ -153,7 +153,6 @@ def __init__( self.__pending_callbacks: Deque[Callable[[], None]] = deque() self.__offsets: MutableMapping[Partition, int] = {} - self.__staged_offsets: MutableMapping[Partition, int] = {} self.__paused: Set[Partition] = set() @@ -208,7 +207,6 @@ def subscribe( ) ) - self.__staged_offsets.clear() self.__last_eof_at.clear() def unsubscribe(self) -> None: @@ -225,7 +223,6 @@ def unsubscribe(self) -> None: ) self.__subscription = None - self.__staged_offsets.clear() self.__last_eof_at.clear() def poll( @@ -321,18 +318,10 @@ def seek(self, offsets: Mapping[Partition, int]) -> None: self.__offsets.update(offsets) def stage_offsets(self, offsets: Mapping[Partition, int]) -> None: - with self.__lock: - # XXX: 
can we remove the locking? dictionary updates might be - # atomic - self.__staged_offsets.update(offsets) - - def commit_offsets(self) -> Optional[Mapping[Partition, int]]: with self.__lock: if self.__closed: raise RuntimeError("consumer is closed") - offsets = {**self.__staged_offsets} - if offsets.keys() - self.__offsets.keys(): raise ConsumerError("cannot stage offsets for unassigned partitions") @@ -341,10 +330,8 @@ def commit_offsets(self) -> Optional[Mapping[Partition, int]]: self, offsets, ) - self.__staged_offsets.clear() self.commit_offsets_calls += 1 - return offsets def close(self, timeout: Optional[float] = None) -> None: with self.__lock: diff --git a/arroyo/commit.py b/arroyo/commit.py index 25b97577..b798ae89 100644 --- a/arroyo/commit.py +++ b/arroyo/commit.py @@ -1,63 +1,11 @@ from __future__ import annotations -import time -from dataclasses import dataclass, field -from typing import Mapping, MutableMapping, Optional +from dataclasses import dataclass +from typing import Optional from arroyo.types import Partition -@dataclass(frozen=True) -class CommitPolicy: - min_commit_frequency_sec: Optional[float] - min_commit_messages: Optional[int] - - def __post_init__(self) -> None: - assert ( - self.min_commit_frequency_sec is not None - or self.min_commit_messages is not None - ), "Must provide either min_commit_frequency_sec or min_commit_messages" - - def get_state_machine(self) -> CommitPolicyState: - return CommitPolicyState(self) - - -@dataclass -class CommitPolicyState: - policy: CommitPolicy - - __committed_offsets: MutableMapping[Partition, int] = field(default_factory=dict) - __last_committed_time: float = field(default_factory=time.time) - - def should_commit(self, now: float, offsets: Mapping[Partition, int]) -> bool: - if self.policy.min_commit_frequency_sec is not None: - elapsed = now - self.__last_committed_time - if elapsed >= self.policy.min_commit_frequency_sec: - return True - - if self.policy.min_commit_messages is not None: - 
messages_since_last_commit = 0 - for partition, pos in offsets.items(): - prev_offset = self.__committed_offsets.setdefault(partition, pos - 1) - messages_since_last_commit += pos - prev_offset - - # XXX: is it faster to do this check in the loop and - # potentially early-return, or do it outside and keep - # the loop small? - if messages_since_last_commit >= self.policy.min_commit_messages: - return True - - return False - - def did_commit(self, now: float, offsets: Mapping[Partition, int]) -> None: - self.__last_committed_time = now - self.__committed_offsets.update(offsets) - - -IMMEDIATE = CommitPolicy(None, 1) -ONCE_PER_SECOND = CommitPolicy(1, None) - - @dataclass(frozen=True) class Commit: __slots__ = ["group", "partition", "offset", "orig_message_ts", "received_p99"] diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 2e1d3dea..0c3e9ee6 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -19,7 +19,6 @@ ) from arroyo.backends.abstract import Consumer -from arroyo.commit import ONCE_PER_SECOND, CommitPolicy from arroyo.dlq import BufferedMessages, DlqPolicy, DlqPolicyWrapper, InvalidMessage from arroyo.errors import RecoverableError from arroyo.processing.strategies.abstract import ( @@ -137,7 +136,6 @@ def __init__( consumer: Consumer[TStrategyPayload], topic: Topic, processor_factory: ProcessingStrategyFactory[TStrategyPayload], - commit_policy: CommitPolicy = ONCE_PER_SECOND, dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None, join_timeout: Optional[float] = None, shutdown_strategy_before_consumer: bool = False, @@ -158,8 +156,6 @@ def __init__( # Consumer is paused after it is in backpressure state for > BACKPRESSURE_THRESHOLD seconds self.__is_paused = False - self.__commit_policy_state = commit_policy.get_state_machine() - # Default join_timeout to DEFAULT_JOIN_TIMEOUT if not provided self.__join_timeout = ( join_timeout if join_timeout is not None else DEFAULT_JOIN_TIMEOUT @@ -312,31 
+308,17 @@ def _close_processing_strategy(self) -> None: ) self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value) - def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> None: + def __commit(self, offsets: Mapping[Partition, int]) -> None: """ - If force is passed, commit immediately and do not throttle. This should - be used during consumer shutdown where we do not want to wait before committing. + Stage offsets for auto-commit. """ for partition, offset in offsets.items(): self.__buffered_messages.pop(partition, offset - 1) - self.__consumer.stage_offsets(offsets) - now = time.time() - - if force or self.__commit_policy_state.should_commit( - now, - offsets, - ): - if self.__dlq_policy: - self.__dlq_policy.flush(offsets) + if self.__dlq_policy: + self.__dlq_policy.flush(offsets) - self.__consumer.commit_offsets() - logger.debug( - "Waited %0.4f seconds for offsets to be committed to %r.", - time.time() - now, - self.__consumer, - ) - self.__commit_policy_state.did_commit(now, offsets) + self.__consumer.stage_offsets(offsets) def run(self) -> None: "The main run loop, see class docstring for more information." 
diff --git a/arroyo/processing/strategies/__init__.py b/arroyo/processing/strategies/__init__.py index 05f81626..675150f2 100644 --- a/arroyo/processing/strategies/__init__.py +++ b/arroyo/processing/strategies/__init__.py @@ -3,10 +3,10 @@ ProcessingStrategy, ProcessingStrategyFactory, ) -from arroyo.processing.strategies.healthcheck import Healthcheck from arroyo.processing.strategies.batching import BatchStep, UnbatchStep from arroyo.processing.strategies.commit import CommitOffsets from arroyo.processing.strategies.filter import FilterStep +from arroyo.processing.strategies.healthcheck import Healthcheck from arroyo.processing.strategies.produce import Produce from arroyo.processing.strategies.reduce import Reduce from arroyo.processing.strategies.run_task import RunTask diff --git a/arroyo/processing/strategies/batching.py b/arroyo/processing/strategies/batching.py index 4c099c0d..e5eb00b6 100644 --- a/arroyo/processing/strategies/batching.py +++ b/arroyo/processing/strategies/batching.py @@ -51,15 +51,15 @@ def accumulator( result.append(value) return result - self.__reduce_step: Reduce[TStrategyPayload, ValuesBatch[TStrategyPayload]] = ( - Reduce( - max_batch_size, - max_batch_time, - accumulator, - lambda: [], - next_step, - compute_batch_size, - ) + self.__reduce_step: Reduce[ + TStrategyPayload, ValuesBatch[TStrategyPayload] + ] = Reduce( + max_batch_size, + max_batch_time, + accumulator, + lambda: [], + next_step, + compute_batch_size, ) def submit( diff --git a/arroyo/processing/strategies/buffer.py b/arroyo/processing/strategies/buffer.py index 4965fa65..103311af 100644 --- a/arroyo/processing/strategies/buffer.py +++ b/arroyo/processing/strategies/buffer.py @@ -1,14 +1,6 @@ import time from datetime import datetime -from typing import ( - Generic, - MutableMapping, - Optional, - TypeVar, - Union, - cast, - Protocol, -) +from typing import Generic, MutableMapping, Optional, Protocol, TypeVar, Union, cast from arroyo.processing.strategies import 
MessageRejected, ProcessingStrategy from arroyo.types import BaseValue, FilteredPayload, Message, Partition, Value diff --git a/arroyo/processing/strategies/commit.py b/arroyo/processing/strategies/commit.py index e79cdbd5..7705a183 100644 --- a/arroyo/processing/strategies/commit.py +++ b/arroyo/processing/strategies/commit.py @@ -21,7 +21,7 @@ def __init__(self, commit: Commit) -> None: self.__last_record_time: Optional[float] = None def poll(self) -> None: - self.__commit({}) + pass def submit(self, message: Message[Any]) -> None: now = time.time() @@ -40,5 +40,4 @@ def terminate(self) -> None: pass def join(self, timeout: Optional[float] = None) -> None: - # Commit all previously staged offsets - self.__commit({}, force=True) + pass diff --git a/arroyo/processing/strategies/filter.py b/arroyo/processing/strategies/filter.py index 3407e073..2b6997a0 100644 --- a/arroyo/processing/strategies/filter.py +++ b/arroyo/processing/strategies/filter.py @@ -1,17 +1,8 @@ import logging -import time -from typing import Callable, MutableMapping, Optional, Union, cast +from typing import Callable, Optional, Union, cast -from arroyo.commit import CommitPolicy, CommitPolicyState -from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy -from arroyo.types import ( - FILTERED_PAYLOAD, - FilteredPayload, - Message, - Partition, - TStrategyPayload, - Value, -) +from arroyo.processing.strategies.abstract import ProcessingStrategy +from arroyo.types import FilteredPayload, Message, TStrategyPayload from arroyo.utils.metrics import get_metrics logger = logging.getLogger(__name__) @@ -23,45 +14,15 @@ class FilterStep(ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]): `FilterStep` takes a callback, `function`, and if that callback returns `False`, the message is dropped. - - Sometimes that behavior is not actually desirable because streams of - messages is what makes the consumer commit in regular intervals. 
If you - filter 100% of messages for a period of time, your consumer may not commit - its offsets as a result. - - For that scenario, you can pass your `CommitPolicy` to `FilterStep`. That - will cause `FilterStep` to emit "sentinel messages" that contain no - payload, but only carry forward partition offsets for later strategies to - commit. Those messages have a payload of type `FilteredPayload`. - - For that reason, basically every strategy needs to be able to handle - `Message[Union[FilteredPayload, T]]` instead of `Message[T]`, i.e. it needs - to subtype `ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]`. - If it doesn't, and rather just handles the regular `Message[T]`, it cannot - be composed with this step, and many other default strategies of arroyo. - - If no `CommitPolicy` is passed, no "sentinel messages" are emitted and - downstream steps do not have to deal with such messages (despite the type - system telling them so). """ def __init__( self, function: Callable[[Message[TStrategyPayload]], bool], next_step: ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]], - commit_policy: Optional[CommitPolicy] = None, ): self.__test_function = function self.__next_step = next_step - - if commit_policy is not None: - self.__commit_policy_state: Optional[CommitPolicyState] = ( - commit_policy.get_state_machine() - ) - else: - self.__commit_policy_state = None - - self.__uncommitted_offsets: MutableMapping[Partition, int] = {} self.__closed = False self.__metrics = get_metrics() @@ -73,42 +34,12 @@ def submit( ) -> None: assert not self.__closed - policy = self.__commit_policy_state - now = time.time() - if policy is not None and policy.should_commit(now, self.__uncommitted_offsets): - self.__flush_uncommitted_offsets(now, can_backpressure=True) - if not isinstance(message.payload, FilteredPayload) and self.__test_function( cast(Message[TStrategyPayload], message) ): - for partition in message.committable: - 
self.__uncommitted_offsets.pop(partition, None) self.__next_step.submit(message) else: self.__metrics.increment("arroyo.strategies.filter.dropped_messages") - if self.__commit_policy_state is not None: - self.__uncommitted_offsets.update(message.committable) - - def __flush_uncommitted_offsets(self, now: float, can_backpressure: bool) -> None: - if not self.__uncommitted_offsets: - return - - new_message: Message[Union[FilteredPayload, TStrategyPayload]] = Message( - Value(FILTERED_PAYLOAD, self.__uncommitted_offsets) - ) - try: - self.__next_step.submit(new_message) - except MessageRejected: - if can_backpressure: - raise - # We have little to gain from reattempting the submission. - # Filtering is not supposed to be that expensive. - return - - if self.__commit_policy_state is not None: - self.__commit_policy_state.did_commit(now, self.__uncommitted_offsets) - - self.__uncommitted_offsets = {} def close(self) -> None: self.__closed = True @@ -120,8 +51,5 @@ def terminate(self) -> None: self.__next_step.terminate() def join(self, timeout: Optional[float] = None) -> None: - # We cannot let MessageRejected propagate here. join() is not supposed - # to raise this exception at all. 
- self.__flush_uncommitted_offsets(time.time(), can_backpressure=False) self.__next_step.close() self.__next_step.join(timeout=timeout) diff --git a/arroyo/processing/strategies/healthcheck.py b/arroyo/processing/strategies/healthcheck.py index 7731e84b..421eebf2 100644 --- a/arroyo/processing/strategies/healthcheck.py +++ b/arroyo/processing/strategies/healthcheck.py @@ -1,8 +1,8 @@ -from typing import Optional import time +from typing import Optional from arroyo.processing.strategies.abstract import ProcessingStrategy -from arroyo.types import TStrategyPayload, Message +from arroyo.types import Message, TStrategyPayload from arroyo.utils.metrics import get_metrics HEALTHCHECK_MAX_FREQUENCY_SEC = 1.0 # In seconds diff --git a/arroyo/processing/strategies/reduce.py b/arroyo/processing/strategies/reduce.py index b8d00fa8..59af852a 100644 --- a/arroyo/processing/strategies/reduce.py +++ b/arroyo/processing/strategies/reduce.py @@ -1,8 +1,8 @@ import time from typing import Callable, Generic, Optional, TypeVar, Union -from arroyo.processing.strategies.buffer import Buffer from arroyo.processing.strategies import ProcessingStrategy +from arroyo.processing.strategies.buffer import Buffer from arroyo.types import BaseValue, FilteredPayload, Message TPayload = TypeVar("TPayload") diff --git a/arroyo/types.py b/arroyo/types.py index 4472fc56..f1d7ab3e 100644 --- a/arroyo/types.py +++ b/arroyo/types.py @@ -197,5 +197,5 @@ def next_offset(self) -> int: class Commit(Protocol): - def __call__(self, offsets: Mapping[Partition, int], force: bool = False) -> None: + def __call__(self, offsets: Mapping[Partition, int]) -> None: pass diff --git a/docs/source/getstarted.rst b/docs/source/getstarted.rst index d0ce1308..f26c1af8 100644 --- a/docs/source/getstarted.rst +++ b/docs/source/getstarted.rst @@ -187,13 +187,11 @@ The code above is orchestrated by the Arroyo runtime called `StreamProcessor`. .. 
code-block:: Python from arroyo.processing import StreamProcessor - from arroyo.commit import ONCE_PER_SECOND processor = StreamProcessor( consumer=consumer, topic=TOPIC, processor_factory=ConsumerStrategyFactory(), - commit_policy=ONCE_PER_SECOND, ) processor.run() diff --git a/docs/source/offsets.rst b/docs/source/offsets.rst index 6316e653..0a31df69 100644 --- a/docs/source/offsets.rst +++ b/docs/source/offsets.rst @@ -13,11 +13,8 @@ The offset to be committed in Kafka is always the next offset to be consumed fro In Arroyo, this means you should commit `Message.next_offset` and never `Message.offset` when done processing that message. Arroyo exposes `Message.position_to_commit` to make this easier. -It is not safe to commit every offset in a high throughput consumer as this will add a lot of load to the system. -Commits should generally be throttled. `CommitPolicy` is the Arroyo way of specifying commit frequency. A `CommitPolicy` -must be passed to the stream processor, which allows specifying a minimum commit frequency (or messages between commits). -Commit throttling can be skipped when needed (i.e. during consumer shutdown) by passing `force=True` to the commit callback. -If you are not sure how often to commit, `ONCE_PER_SECOND` is a reasonable option. +Offsets staged via `stage_offsets` are committed automatically by the underlying Kafka consumer, so there is +no manual commit step and no `force` flag; any stored offsets are also committed when the consumer is closed. The easiest way is to use the `CommitOffsets` strategy as the last step in a chain of processing strategies to commit offsets. 
diff --git a/setup.py b/setup.py index 03d17aa1..20373cf3 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ def get_requirements() -> Sequence[str]: long_description=open("README.md").read(), long_description_content_type="text/markdown", packages=find_packages(exclude=["tests", "examples"]), - package_data={"arroyo": ["py.typed","utils/metricDefs.json"]}, + package_data={"arroyo": ["py.typed", "utils/metricDefs.json"]}, zip_safe=False, install_requires=get_requirements(), classifiers=[ diff --git a/tests/backends/mixins.py b/tests/backends/mixins.py index fcee2043..c9a666ca 100644 --- a/tests/backends/mixins.py +++ b/tests/backends/mixins.py @@ -113,17 +113,10 @@ def _revocation_callback(partitions: Sequence[Partition]) -> None: assert value.offset == messages[0].offset assert value.payload == messages[0].payload - assert consumer.commit_offsets() == {} - consumer.stage_offsets(value.committable) - assert consumer.commit_offsets() == {Partition(topic, 0): value.next_offset} - consumer.stage_offsets({Partition(Topic("invalid"), 0): 0}) - with pytest.raises(ConsumerError): - consumer.commit_offsets() - assert consumer.tell() == {Partition(topic, 0): messages[1].offset} consumer.unsubscribe() @@ -173,9 +166,6 @@ def _revocation_callback(partitions: Sequence[Partition]) -> None: # stage_positions does not validate anything consumer.stage_offsets({}) - with pytest.raises(RuntimeError): - consumer.commit_offsets() - consumer.close() # should be safe, even if the consumer is already closed consumer = self.get_consumer(group) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index f01e2aa3..0d2acda7 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -19,7 +19,7 @@ build_kafka_configuration, ) from arroyo.backends.kafka.consumer import as_kafka_configuration_bool -from arroyo.commit import IMMEDIATE, Commit +from arroyo.commit import Commit from arroyo.errors import ConsumerError, EndOfPartition from 
arroyo.processing.processor import StreamProcessor from arroyo.processing.strategies.abstract import MessageRejected @@ -192,7 +192,6 @@ def test_lenient_offset_reset_latest(self) -> None: consumer.stage_offsets( {partition: message.offset}, ) - consumer.commit_offsets() with closing(self.get_consumer(strict_offset_reset=False)) as consumer: consumer.subscribe([topic]) @@ -201,9 +200,6 @@ def test_lenient_offset_reset_latest(self) -> None: assert result_message.payload.key == b"a" assert result_message.payload.value == b"0" - # make sure we reset our offset now - consumer.commit_offsets() - def test_auto_offset_reset_error(self) -> None: with self.get_topic() as topic: with closing(self.get_producer()) as producer: @@ -226,7 +222,7 @@ def test_consumer_stream_processor_shutdown(self) -> None: producer.produce(topic, next(self.get_payloads())).result(5.0) with closing(self.get_consumer()) as consumer: - processor = StreamProcessor(consumer, topic, factory, IMMEDIATE) + processor = StreamProcessor(consumer, topic, factory) with pytest.raises(RuntimeError): processor.run() @@ -262,7 +258,7 @@ def test_assign_partition_during_pause(self) -> None: ) processor_a = StreamProcessor( - consumer_a, topic, factory, IMMEDIATE, handle_poll_while_paused=True + consumer_a, topic, factory, handle_poll_while_paused=True ) def wait_until_consumer_pauses(processor: StreamProcessor[Any]) -> None: @@ -290,7 +286,7 @@ def wait_until_consumer_pauses(processor: StreamProcessor[Any]) -> None: # subscribe with another consumer, now we should have rebalancing in the next few polls processor_b = StreamProcessor( - consumer_b, topic, factory, IMMEDIATE, handle_poll_while_paused=True + consumer_b, topic, factory, handle_poll_while_paused=True ) for _ in range(10): @@ -331,7 +327,7 @@ def test_consumer_polls_when_paused(self) -> None: ) as consumer: producer.produce(topic, next(self.get_payloads())).result(5.0) - processor = StreamProcessor(consumer, topic, factory, IMMEDIATE) + processor = 
StreamProcessor(consumer, topic, factory) # Wait for the consumer to subscribe and first message to be processed for _ in range(1000): @@ -363,7 +359,7 @@ def test_consumer_polls_when_paused(self) -> None: assert consumer.paused() == [] def test_auto_commit_mode(self) -> None: - """Test that auto-commit mode uses store_offsets and commits on close""" + """Test that auto-commit uses store_offsets and commits on close""" group_id = uuid.uuid1().hex with self.get_topic() as topic: @@ -373,11 +369,10 @@ def test_auto_commit_mode(self) -> None: payload = KafkaPayload(None, f"msg_{i}".encode("utf8"), []) producer.produce(topic, payload).result(5.0) - # Create consumer with auto-commit enabled + # Create consumer configuration = { **self.configuration, "auto.offset.reset": "earliest", - "arroyo.enable.auto.commit": True, "group.id": group_id, "session.timeout.ms": 10000, } @@ -393,13 +388,9 @@ def test_auto_commit_mode(self) -> None: assert value is not None consumed_offsets.append(value.offset) - # Stage offsets (will use store_offsets internally in auto-commit mode) + # Stage offsets (will use store_offsets internally) consumer.stage_offsets(value.committable) - # commit_offsets should return None in auto-commit mode - result = consumer.commit_offsets() - assert result is None - # Close will commit any stored offsets # Verify we consumed offsets 0-4 diff --git a/tests/backends/test_kafka_commit_callback.py b/tests/backends/test_kafka_commit_callback.py index 5f5e0a9f..16ef94f0 100644 --- a/tests/backends/test_kafka_commit_callback.py +++ b/tests/backends/test_kafka_commit_callback.py @@ -11,7 +11,6 @@ def test_commit_callback_success_metric() -> None: { "bootstrap.servers": "localhost:9092", "group.id": "test-group", - "arroyo.enable.auto.commit": True, } ) @@ -42,7 +41,6 @@ def test_commit_callback_error_metric() -> None: { "bootstrap.servers": "localhost:9092", "group.id": "test-group", - "arroyo.enable.auto.commit": True, } ) diff --git 
a/tests/processing/strategies/test_all.py b/tests/processing/strategies/test_all.py index 54b1bac2..793454e5 100644 --- a/tests/processing/strategies/test_all.py +++ b/tests/processing/strategies/test_all.py @@ -10,7 +10,6 @@ import pytest -from arroyo.commit import IMMEDIATE from arroyo.dlq import InvalidMessage from arroyo.processing.strategies import ProcessingStrategy from arroyo.processing.strategies.filter import FilterStep @@ -111,9 +110,7 @@ def strategy( if raises_invalid_message: pytest.skip("does not support invalid message") - return FilterStep( - lambda message: message.payload, next_step, commit_policy=IMMEDIATE - ) + return FilterStep(lambda message: message.payload, next_step) def shutdown(self) -> None: pass diff --git a/tests/processing/strategies/test_batching.py b/tests/processing/strategies/test_batching.py index 031f42e0..a23cae02 100644 --- a/tests/processing/strategies/test_batching.py +++ b/tests/processing/strategies/test_batching.py @@ -1,6 +1,6 @@ import time from datetime import datetime -from typing import Any, Callable, Mapping, Sequence, cast, Optional +from typing import Any, Callable, Mapping, Optional, Sequence, cast from unittest.mock import Mock, call, patch import pytest @@ -8,7 +8,7 @@ from arroyo.processing.strategies.abstract import MessageRejected from arroyo.processing.strategies.batching import BatchStep, UnbatchStep, ValuesBatch from arroyo.processing.strategies.run_task import RunTask -from arroyo.types import BrokerValue, Message, Partition, Topic, Value, BaseValue +from arroyo.types import BaseValue, BrokerValue, Message, Partition, Topic, Value NOW = datetime(2022, 1, 1, 0, 0, 1) diff --git a/tests/processing/strategies/test_buffer.py b/tests/processing/strategies/test_buffer.py index e9e5c7fd..5cf26703 100644 --- a/tests/processing/strategies/test_buffer.py +++ b/tests/processing/strategies/test_buffer.py @@ -1,6 +1,6 @@ from datetime import datetime -from unittest.mock import Mock, call from typing import List 
+from unittest.mock import Mock, call from arroyo.processing.strategies.buffer import Buffer from arroyo.types import BaseValue, Message, Partition, Topic, Value diff --git a/tests/processing/strategies/test_filter.py b/tests/processing/strategies/test_filter.py index f3268f47..4178b55c 100644 --- a/tests/processing/strategies/test_filter.py +++ b/tests/processing/strategies/test_filter.py @@ -1,19 +1,8 @@ from datetime import datetime from unittest.mock import Mock, call -import pytest - -from arroyo.commit import CommitPolicy -from arroyo.processing.strategies.abstract import MessageRejected from arroyo.processing.strategies.filter import FilterStep -from arroyo.types import ( - FILTERED_PAYLOAD, - FilteredPayload, - Message, - Partition, - Topic, - Value, -) +from arroyo.types import FilteredPayload, Message, Partition, Topic, Value from tests.assertions import assert_changes, assert_does_not_change @@ -49,99 +38,6 @@ def test_function(message: Message[bool]) -> bool: filter_step.join() -def test_commit_policy_basic() -> None: - topic = Topic("topic") - next_step = Mock() - - def test_function(message: Message[bool]) -> bool: - return message.payload - - filter_step = FilterStep( - test_function, next_step, commit_policy=CommitPolicy(None, 3) - ) - - now = datetime.now() - - init_message = Message(Value(False, {Partition(topic, 1): 1}, now)) - - filter_step.submit(init_message) - assert next_step.submit.call_count == 0 - - for i in range(2): - fail_message = Message(Value(False, {Partition(topic, 0): i}, now)) - - filter_step.submit(fail_message) - assert next_step.submit.call_count == 0 - - fail_message = Message(Value(False, {Partition(topic, 0): 2}, now)) - filter_step.submit(fail_message) - - # Assert that the filter message kept track of the new offsets across - # partitions, and is flushing them all out since this is the third message - # and according to our commit policy we are supposed to commit at this - # point, roughly. 
- assert next_step.submit.mock_calls == [ - call( - Message( - Value( - FILTERED_PAYLOAD, {Partition(topic, 1): 1, Partition(topic, 0): 1} - ) - ) - ) - ] - - next_step.submit.reset_mock() - filter_step.join() - assert next_step.submit.mock_calls == [ - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 0): 2}))) - ] - next_step.submit.reset_mock() - - fail_message = Message(Value(False, {Partition(topic, 0): 3}, now)) - filter_step.submit(fail_message) - assert next_step.submit.call_count == 0 - assert next_step.join.call_count == 1 - - # Since there was a filtered message with no flush inbetween, join() needs - # to send a filter message to flush out uncommitted offsets. If we do not - # do that forcibly, __commit(force=True) in downstream strategies will do - # nothing. - filter_step.join() - - assert next_step.submit.mock_calls == [ - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 0): 3}))) - ] - - assert next_step.join.call_count == 2 - - -def test_commit_policy_filtered_messages_alternating() -> None: - topic = Topic("topic") - next_step = Mock() - now = datetime.now() - - def test_function(message: Message[bool]) -> bool: - return message.payload - - filter_step = FilterStep( - test_function, next_step, commit_policy=CommitPolicy(None, 3) - ) - - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}, now))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}, now))) - - assert next_step.submit.mock_calls == [ - call(Message(Value(True, {Partition(topic, 1): 1}, now))), - call(Message(Value(True, {Partition(topic, 1): 3}, now))), - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 4}))), - call(Message(Value(True, 
{Partition(topic, 1): 5}, now))), - ] - - def test_no_commit_policy_does_not_forward_filtered_messages() -> None: topic = Topic("topic") next_step = Mock() @@ -165,88 +61,3 @@ def test_function(message: Message[bool]) -> bool: call(Message(Value(True, {Partition(topic, 1): 3}, now))), call(Message(Value(True, {Partition(topic, 1): 5}, now))), ] - - -def test_backpressure_in_join() -> None: - topic = Topic("topic") - next_step = Mock() - next_step.submit.side_effect = [None] * 6 + [MessageRejected] # type: ignore - - now = datetime.now() - - def test_function(message: Message[bool]) -> bool: - return message.payload - - filter_step = FilterStep( - test_function, next_step, commit_policy=CommitPolicy(None, 3) - ) - - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}, now))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}, now))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}, now))) - - filter_step.join() - - assert next_step.submit.mock_calls == [ - call(Message(Value(True, {Partition(topic, 1): 1}, now))), - call(Message(Value(True, {Partition(topic, 1): 3}, now))), - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 4}))), - call(Message(Value(True, {Partition(topic, 1): 5}, now))), - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 6}))), - ] - - -def test_backpressure_in_submit() -> None: - """ - Assert that MessageRejected is propagated for the right messages, and - handled correctly in join() (i.e. 
suppressed) - """ - topic = Topic("topic") - next_step = Mock() - next_step.submit.side_effect = [ - MessageRejected, - None, - MessageRejected, - MessageRejected, - None, - ] - - now = datetime.now() - - def test_function(message: Message[bool]) -> bool: - return message.payload - - filter_step = FilterStep( - test_function, next_step, commit_policy=CommitPolicy(None, 3) - ) - - with pytest.raises(MessageRejected): - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) - - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) - - filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now))) - - assert next_step.submit.mock_calls == [ - call(Message(Value(True, {Partition(topic, 1): 1}, now))), - call(Message(Value(True, {Partition(topic, 1): 1}, now))), - ] - - next_step.submit.mock_calls.clear() - - filter_step.join() - - assert next_step.submit.mock_calls == [ - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 2}))), - ] - - next_step.submit.mock_calls.clear() - - filter_step.join() - - assert next_step.submit.mock_calls == [ - call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 2}))), - ] diff --git a/tests/processing/strategies/test_run_task_in_threads.py b/tests/processing/strategies/test_run_task_in_threads.py index aac7334f..67762efb 100644 --- a/tests/processing/strategies/test_run_task_in_threads.py +++ b/tests/processing/strategies/test_run_task_in_threads.py @@ -1,10 +1,11 @@ -from datetime import datetime import time +from datetime import datetime from unittest.mock import Mock + import pytest from arroyo.processing.strategies.run_task_in_threads import RunTaskInThreads -from arroyo.types import BrokerValue, Message, Topic, Partition +from arroyo.types import BrokerValue, Message, Partition, Topic @pytest.mark.parametrize("poll_after_msg", (True, False)) diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index af7599c6..5573ee64 100644 --- 
a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -1,24 +1,15 @@ import time -from datetime import datetime, timedelta -from typing import Any, Mapping, Optional, Sequence, cast +from datetime import datetime +from typing import Any, Optional from unittest import mock import pytest -from arroyo.backends.local.backend import LocalBroker -from arroyo.backends.local.storages.abstract import MessageStorage -from arroyo.backends.local.storages.memory import MemoryMessageStorage -from arroyo.commit import IMMEDIATE, CommitPolicy from arroyo.dlq import DlqPolicy, InvalidMessage from arroyo.processing.processor import InvalidStateError, StreamProcessor from arroyo.processing.strategies import Healthcheck -from arroyo.processing.strategies.abstract import ( - MessageRejected, - ProcessingStrategy, - ProcessingStrategyFactory, -) +from arroyo.processing.strategies.abstract import MessageRejected, ProcessingStrategy from arroyo.types import BrokerValue, Commit, Message, Partition, Topic -from arroyo.utils.clock import MockedClock from tests.assertions import assert_changes, assert_does_not_change from tests.metrics import Increment, TestingMetricsBackend, Timing @@ -34,9 +25,7 @@ def test_stream_processor_lifecycle() -> None: metrics = TestingMetricsBackend with assert_changes(lambda: int(consumer.subscribe.call_count), 0, 1): - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) # The processor should accept heartbeat messages without an assignment or # active processor. 
@@ -177,9 +166,7 @@ def test_stream_processor_termination_on_error() -> None: factory = mock.Mock() factory.create_with_partitions.return_value = strategy - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] assignment_callback({Partition(topic, 0): 0}) @@ -212,9 +199,7 @@ def test_stream_processor_invalid_message_from_poll() -> None: factory = mock.Mock() factory.create_with_partitions.return_value = strategy - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] assignment_callback({Partition(topic, 0): 0}) @@ -254,9 +239,7 @@ def test_stream_processor_invalid_message_from_submit() -> None: factory = mock.Mock() factory.create_with_partitions.return_value = strategy - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] assignment_callback({Partition(topic, 0): 0}) @@ -292,9 +275,7 @@ def test_stream_processor_create_with_partitions() -> None: factory.create_with_partitions.return_value = strategy with assert_changes(lambda: int(consumer.subscribe.call_count), 0, 1): - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) subscribe_args, subscribe_kwargs = consumer.subscribe.call_args assert subscribe_args[0] == [topic] @@ -344,16 +325,7 @@ def __init__(self, commit: Commit) -> None: self.__commit = commit def submit(self, message: Message[int]) -> None: - # If we get a message with value 
of 1, force commit - if message.payload == 1: - self.__commit( - message.committable, - force=True, - ) - - self.__commit( - message.committable, - ) + self.__commit(message.committable) def poll(self) -> None: pass @@ -368,191 +340,6 @@ def terminate(self) -> None: pass -class CommitOffsetsFactory(ProcessingStrategyFactory[int]): - def create_with_partitions( - self, - commit: Commit, - partitions: Mapping[Partition, int], - ) -> ProcessingStrategy[int]: - return CommitOffsets(commit) - - -def run_commit_policy_test( - topic: Topic, given_messages: Sequence[Message[int]], policy: CommitPolicy -) -> Sequence[int]: - commit = mock.Mock() - consumer = mock.Mock() - consumer.tell.return_value = {} - consumer.commit_offsets = commit - - factory = CommitOffsetsFactory() - - processor: StreamProcessor[int] = StreamProcessor( - consumer, - topic, - factory, - policy, - ) - - # Assignment - subscribe_args, subscribe_kwargs = consumer.subscribe.call_args - assert subscribe_args[0] == [topic] - assignment_callback = subscribe_kwargs["on_assign"] - offsets = {Partition(topic, 0): 0} - assignment_callback(offsets) - - assert commit.call_count == 0 - - commit_calls = [] - - for message in given_messages: - consumer.poll.return_value = message.value - message_timestamp = cast(BrokerValue[int], message.value).timestamp - with mock.patch("time.time", return_value=message_timestamp.timestamp()): - processor._run_once() - commit_calls.append(commit.call_count) - - return commit_calls - - -def test_stream_processor_commit_policy() -> None: - topic = Topic("topic") - - commit_every_second_message = CommitPolicy(None, 2) - - assert run_commit_policy_test( - topic, - [ - Message(BrokerValue(0, Partition(topic, 0), 0, datetime.now())), - Message(BrokerValue(0, Partition(topic, 0), 1, datetime.now())), - Message(BrokerValue(0, Partition(topic, 0), 2, datetime.now())), - Message(BrokerValue(0, Partition(topic, 0), 5, datetime.now())), - Message(BrokerValue(0, Partition(topic, 0), 10, 
datetime.now())), - ], - commit_every_second_message, - ) == [ - # Does not commit first message - 0, - # Does commit second message - 1, - # Does not commit third message - 1, - # Should always commit if we are committing more than 2 messages at once. - 2, - 3, - ] - - -def test_stream_processor_commit_policy_multiple_partitions() -> None: - topic = Topic("topic") - - commit_every_second_message = CommitPolicy(None, 2) - - assert run_commit_policy_test( - topic, - [ - Message(BrokerValue(0, Partition(topic, 0), 200, datetime.now())), - Message(BrokerValue(0, Partition(topic, 1), 400, datetime.now())), - Message(BrokerValue(0, Partition(topic, 0), 400, datetime.now())), - Message(BrokerValue(0, Partition(topic, 1), 400, datetime.now())), - ], - commit_every_second_message, - ) == [ - # Does not commit first message even if the offset is super large - 0, - # Does not commit first message on other partition even if the offset is super large - 0, - # Does commit second message on first partition since the offset delta is super large - 1, - # Does not commit second message on second partition since the offset delta is zero - 1, - ] - - -def test_stream_processor_commit_policy_always() -> None: - topic = Topic("topic") - - assert run_commit_policy_test( - topic, - [Message(BrokerValue(0, Partition(topic, 0), 200, datetime.now()))], - IMMEDIATE, - ) == [ - # IMMEDIATE policy can commit on the first message (even - # though there is no previous offset stored) - # - # Indirectly assert that an offset delta of 1 is passed to - # the commit policy, not 0 - 1 - ] - - -def test_stream_processor_commit_policy_every_two_seconds() -> None: - topic = Topic("topic") - commit_every_two_seconds = CommitPolicy(2, None) - - now = datetime.now() - - assert run_commit_policy_test( - topic, - [ - Message(BrokerValue(0, Partition(topic, 0), 0, now)), - Message(BrokerValue(0, Partition(topic, 0), 1, now + timedelta(seconds=1))), - Message(BrokerValue(0, Partition(topic, 0), 2, now + 
timedelta(seconds=2))), - Message(BrokerValue(0, Partition(topic, 0), 3, now + timedelta(seconds=3))), - Message(BrokerValue(0, Partition(topic, 0), 4, now + timedelta(seconds=4))), - Message(BrokerValue(0, Partition(topic, 0), 5, now + timedelta(seconds=5))), - Message(BrokerValue(0, Partition(topic, 0), 6, now + timedelta(seconds=6))), - ], - commit_every_two_seconds, - ) == [ - 0, - 0, - 0, - 1, - 1, - 2, - 2, - ] - - -@pytest.mark.parametrize( - "commit_seconds", [0, 1000], ids=("commit_always", "commit_never") -) -@pytest.mark.parametrize("num_messages", [1, 100, 1000]) -def test_commit_policy_bench( - benchmark: Any, commit_seconds: int, num_messages: int -) -> None: - topic = Topic("topic") - commit_policy = CommitPolicy(commit_seconds, None) - num_partitions = 1 - now = datetime.now() - - storage: MessageStorage[int] = MemoryMessageStorage() - storage.create_topic(topic, num_partitions) - - broker = LocalBroker(storage, MockedClock()) - - consumer = broker.get_consumer("test-group", enable_end_of_partition=True) - - factory = CommitOffsetsFactory() - - processor: StreamProcessor[int] = StreamProcessor( - consumer, - topic, - factory, - commit_policy, - ) - - def inner() -> None: - for i in range(num_messages): - storage.produce(Partition(topic, i % num_partitions), i, now) - - for _ in range(num_messages): - processor._run_once() - - benchmark(inner) - - def test_dlq() -> None: topic = Topic("topic") partition = Partition(topic, 0) @@ -567,7 +354,7 @@ def test_dlq() -> None: dlq_policy: Any = DlqPolicy(producer=mock.Mock()) processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE, dlq_policy + consumer, topic, factory, dlq_policy ) # Assignment @@ -605,7 +392,6 @@ def test_healthcheck(tmp_path: Any) -> None: consumer, topic, factory, - IMMEDIATE, ) # Assignment @@ -632,9 +418,7 @@ def test_processor_pause_with_invalid_message() -> None: factory = mock.Mock() factory.create_with_partitions.return_value = strategy - processor: 
StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE - ) + processor: StreamProcessor[int] = StreamProcessor(consumer, topic, factory) # Subscribe to topic subscribe_args, subscribe_kwargs = consumer.subscribe.call_args @@ -691,6 +475,7 @@ def test_processor_pause_with_invalid_message() -> None: processor._run_once() assert strategy.submit.call_args_list[-1] == mock.call(new_message) + def test_processor_poll_while_paused() -> None: topic = Topic("topic") @@ -701,7 +486,7 @@ def test_processor_poll_while_paused() -> None: factory.create_with_partitions.return_value = strategy processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE, handle_poll_while_paused=True + consumer, topic, factory, handle_poll_while_paused=True ) # Subscribe to topic @@ -755,7 +540,7 @@ def test_processor_poll_while_paused() -> None: new_message = Message(BrokerValue(0, new_partition, 1, datetime.now())) consumer.poll.return_value = new_message.value processor._run_once() - assert processor._StreamProcessor__is_paused is False # type:ignore + assert processor._StreamProcessor__is_paused is False # type:ignore strategy.submit.return_value = None strategy.submit.side_effect = None diff --git a/tests/test_commit.py b/tests/test_commit.py deleted file mode 100644 index 4f81d06c..00000000 --- a/tests/test_commit.py +++ /dev/null @@ -1,46 +0,0 @@ -import time - -import pytest - -from arroyo.commit import IMMEDIATE, ONCE_PER_SECOND, CommitPolicy -from arroyo.types import Partition, Topic - - -def test_commit_policy() -> None: - partition = Partition(Topic("test"), 0) - - state = IMMEDIATE.get_state_machine() - ts = time.time() + 0.5 - offset = 1 - assert state.should_commit(ts, {partition: offset}) is True - ts += 2 - offset += 1 - assert state.should_commit(ts, {partition: offset}) is True - ts += 0.5 - offset += 5 - assert state.should_commit(ts, {partition: offset}) is True - - state = ONCE_PER_SECOND.get_state_machine() - ts = 
time.time() + 0.5 - offset = 10 - assert state.should_commit(ts, {partition: offset}) is False - ts += 1 - offset += 10 - assert state.should_commit(ts, {partition: offset}) is True - ts += 1.5 - offset += 10 - assert state.should_commit(ts, {partition: offset}) is True - - state = CommitPolicy(2, 100).get_state_machine() - ts = time.time() + 1 - offset = 99 - assert state.should_commit(ts, {partition: offset}) is False - ts += 2 - offset += 1 - assert state.should_commit(ts, {partition: offset}) is True - ts += 1 - offset += 101 - assert state.should_commit(ts, {partition: offset}) is True - - with pytest.raises(Exception): - CommitPolicy(None, None) diff --git a/tests/test_kip848_e2e.py b/tests/test_kip848_e2e.py index 2cc4a5dc..63f8b983 100644 --- a/tests/test_kip848_e2e.py +++ b/tests/test_kip848_e2e.py @@ -1,21 +1,20 @@ -from typing import Any - -import time import contextlib -from contextlib import closing +import logging import os import threading -import logging -from typing import Iterator, Mapping +import time +from contextlib import closing +from typing import Any, Iterator, Mapping from confluent_kafka.admin import AdminClient, NewTopic -from arroyo.types import Commit, Message, Partition, Topic + +from arroyo.backends.kafka import KafkaProducer from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload -from arroyo.processing.strategies import RunTask, CommitOffsets, ProcessingStrategy -from arroyo.processing.strategies.abstract import ProcessingStrategyFactory from arroyo.processing.processor import StreamProcessor -from arroyo.backends.kafka import KafkaProducer +from arroyo.processing.strategies import CommitOffsets, ProcessingStrategy, RunTask +from arroyo.processing.strategies.abstract import ProcessingStrategyFactory +from arroyo.types import Commit, Message, Partition, Topic logging.basicConfig(level=logging.INFO)