Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ jobs:
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.

# For details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality


# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
Expand All @@ -56,7 +56,7 @@ jobs:
# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun

# If the Autobuild fails above, remove it and uncomment the following three lines.
# If the Autobuild fails above, remove it and uncomment the following three lines,
# and modify them (or add more) to build your code if your project requires a custom build; please refer to the EXAMPLE below for guidance.

# - run: |
Expand Down
17 changes: 3 additions & 14 deletions arroyo/backends/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,9 @@ def seek(self, offsets: Mapping[Partition, int]) -> None:
@abstractmethod
def stage_offsets(self, offsets: Mapping[Partition, int]) -> None:
"""
Stage offsets to be committed. If an offset has already been staged
for a given partition, that offset is overwritten (even if the offset
moves in reverse).
"""
raise NotImplementedError

@abstractmethod
def commit_offsets(self) -> Optional[Mapping[Partition, int]]:
"""
Commit staged offsets. The return value of this method is a mapping
of streams with their committed offsets as values.

When auto-commit is enabled (in Kafka consumers), returns None since
the broker handles commits automatically.
Stage offsets to be committed. Offsets are automatically committed
by the broker. If an offset has already been staged for a given
partition, that offset is overwritten (even if the offset moves in reverse).
"""
raise NotImplementedError

Expand Down
7 changes: 2 additions & 5 deletions arroyo/backends/kafka/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ def build_kafka_consumer_configuration(
bootstrap_servers: Optional[Sequence[str]] = None,
override_params: Optional[Mapping[str, Any]] = None,
strict_offset_reset: Optional[bool] = None,
enable_auto_commit: bool = False,
retry_handle_destroyed: bool = False,
) -> KafkaBrokerConfig:

Expand All @@ -254,16 +253,14 @@ def build_kafka_consumer_configuration(
default_config, bootstrap_servers, override_params
)

# Default configuration with manual commit management
# Configuration with auto-commit enabled
config_update = {
"enable.auto.commit": False,
"enable.auto.commit": True,
"enable.auto.offset.store": False,
"group.id": group_id,
"auto.offset.reset": auto_offset_reset,
# this is an arroyo specific flag that only affects the consumer.
"arroyo.strict.offset.reset": strict_offset_reset,
# this is an arroyo specific flag to enable auto-commit mode
"arroyo.enable.auto.commit": enable_auto_commit,
# arroyo specific flag to enable retries when hitting `KafkaError._DESTROY` while committing
"arroyo.retry.broker.handle.destroyed": retry_handle_destroyed,
# overridden to reduce memory usage when there's a large backlog
Expand Down
153 changes: 7 additions & 146 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
from arroyo.types import BrokerValue, Partition, Topic
from arroyo.utils.concurrent import execute
from arroyo.utils.metrics import get_metrics
from arroyo.utils.retries import BasicRetryPolicy

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -159,30 +158,6 @@ def __init__(
) -> None:
configuration = dict(configuration)

# Feature flag to enable retrying on `Broker handle destroyed` errors
# which can occur if we attempt to commit during a rebalance when
# the consumer group coordinator changed
self.__retry_handle_destroyed = as_kafka_configuration_bool(
configuration.pop("arroyo.retry.broker.handle.destroyed", False)
)

retryable_errors: Tuple[int, ...] = (
KafkaError.REQUEST_TIMED_OUT,
KafkaError.NOT_COORDINATOR,
KafkaError._WAIT_COORD,
KafkaError.STALE_MEMBER_EPOCH, # kip-848
KafkaError.COORDINATOR_LOAD_IN_PROGRESS,
)
if self.__retry_handle_destroyed:
retryable_errors += (KafkaError._DESTROY,)

commit_retry_policy = BasicRetryPolicy(
3,
1,
lambda e: isinstance(e, KafkaException)
and e.args[0].code() in retryable_errors,
)

self.__is_cooperative_sticky = (
configuration.get("partition.assignment.strategy") == "cooperative-sticky"
)
Expand All @@ -197,13 +172,6 @@ def __init__(
if self.__strict_offset_reset is None:
self.__strict_offset_reset = True

# Feature flag to enable rdkafka auto-commit with store_offsets
# When enabled, offsets are stored via store_offsets() and rdkafka
# automatically commits them periodically
self.__use_auto_commit = as_kafka_configuration_bool(
configuration.pop("arroyo.enable.auto.commit", False)
)

if auto_offset_reset in {"smallest", "earliest", "beginning"}:
self.__resolve_partition_starting_offset = (
self.__resolve_partition_offset_earliest
Expand All @@ -219,32 +187,9 @@ def __init__(
else:
raise ValueError("invalid value for 'auto.offset.reset' configuration")

# When auto-commit is disabled (default), we require explicit configuration
# When auto-commit is enabled, we allow rdkafka to handle commits
if not self.__use_auto_commit:
if (
as_kafka_configuration_bool(
configuration.get("enable.auto.commit", "true")
)
is not False
):
raise ValueError("invalid value for 'enable.auto.commit' configuration")

if (
as_kafka_configuration_bool(
configuration.get("enable.auto.offset.store", "true")
)
is not False
):
raise ValueError(
"invalid value for 'enable.auto.offset.store' configuration"
)
else:
# In auto-commit mode, enable auto.commit and keep auto.offset.store disabled
# We'll use store_offsets() manually to control which offsets get committed
configuration["enable.auto.commit"] = True
configuration["enable.auto.offset.store"] = False
configuration["on_commit"] = self.__on_commit_callback
configuration["enable.auto.commit"] = True
configuration["enable.auto.offset.store"] = False
configuration["on_commit"] = self.__on_commit_callback

# NOTE: Offsets are explicitly managed as part of the assignment
# callback, so preemptively resetting offsets is not enabled when
Expand All @@ -257,11 +202,8 @@ def __init__(
)

self.__offsets: MutableMapping[Partition, int] = {}
self.__staged_offsets: MutableMapping[Partition, int] = {}
self.__paused: Set[Partition] = set()

self.__commit_retry_policy = commit_retry_policy

self.__state = KafkaConsumerState.CONSUMING

self.__metrics = get_metrics()
Expand Down Expand Up @@ -397,16 +339,6 @@ def revocation_callback(
on_revoke(partitions)
finally:
for partition in partitions:
# Staged offsets are deleted during partition revocation to
# prevent later committing offsets for partitions that are
# no longer owned by this consumer.
if partition in self.__staged_offsets:
logger.warning(
"Dropping staged offset for revoked partition (%r)!",
partition,
)
del self.__staged_offsets[partition]

try:
self.__offsets.pop(partition)
except KeyError:
Expand Down Expand Up @@ -634,86 +566,15 @@ def stage_offsets(self, offsets: Mapping[Partition, int]) -> None:
# TODO: Maybe log a warning if these offsets exceed the current
# offsets, since that's probably a side effect of an incorrect usage
# pattern?
if self.__use_auto_commit:
# When auto-commit is enabled, use store_offsets to stage offsets
# for rdkafka to auto-commit
if offsets:
self.__consumer.store_offsets(
offsets=[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
)
else:
# Default behavior: manually track staged offsets
self.__staged_offsets.update(offsets)

def __commit(self) -> Mapping[Partition, int]:
if self.__state in {KafkaConsumerState.CLOSED, KafkaConsumerState.ERROR}:
raise InvalidState(self.__state)

if self.__staged_offsets.keys() - self.__offsets.keys():
raise ConsumerError("cannot stage offsets for unassigned partitions")

self.__validate_offsets(self.__staged_offsets)

result: Optional[Sequence[ConfluentTopicPartition]]

if self.__staged_offsets:
result = self.__consumer.commit(
if offsets:
self.__consumer.store_offsets(
offsets=[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in self.__staged_offsets.items()
],
asynchronous=False,
for partition, offset in offsets.items()
]
)
else:
result = []

assert result is not None # synchronous commit should return result immediately

offsets: MutableMapping[Partition, int] = {}

for value in result:
# The Confluent Kafka Consumer will include logical offsets in the
# sequence of ``Partition`` objects returned by ``commit``. These
# are an implementation detail of the Kafka Consumer, so we don't
# expose them here.
# NOTE: These should no longer be seen now that we are forcing
# offsets to be set as part of the assignment callback.
if value.offset in self.LOGICAL_OFFSETS:
continue

assert value.offset >= 0, "expected non-negative offset"
partition = Partition(Topic(value.topic), value.partition)
offsets[partition] = value.offset

self.__staged_offsets.clear()

return offsets

def commit_offsets(self) -> Optional[Mapping[Partition, int]]:
"""
Commit staged offsets for all partitions that this consumer is
assigned to. The return value of this method is a mapping of
partitions with their committed offsets as values.

When auto-commit is enabled, returns None since rdkafka handles
commits automatically and we don't track which offsets were committed.

Raises an ``InvalidState`` if called on a closed consumer.
"""
if self.__use_auto_commit:
# When auto-commit is enabled, rdkafka commits automatically
# We don't track what was committed, so return None
# The offsets have already been staged via store_offsets()
return None
else:
return self.__commit_retry_policy.call(self.__commit)

def close(self, timeout: Optional[float] = None) -> None:
"""
Expand Down
13 changes: 0 additions & 13 deletions arroyo/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ def __init__(
self.__pending_callbacks: Deque[Callable[[], None]] = deque()

self.__offsets: MutableMapping[Partition, int] = {}
self.__staged_offsets: MutableMapping[Partition, int] = {}

self.__paused: Set[Partition] = set()

Expand Down Expand Up @@ -208,7 +207,6 @@ def subscribe(
)
)

self.__staged_offsets.clear()
self.__last_eof_at.clear()

def unsubscribe(self) -> None:
Expand All @@ -225,7 +223,6 @@ def unsubscribe(self) -> None:
)
self.__subscription = None

self.__staged_offsets.clear()
self.__last_eof_at.clear()

def poll(
Expand Down Expand Up @@ -321,18 +318,10 @@ def seek(self, offsets: Mapping[Partition, int]) -> None:
self.__offsets.update(offsets)

def stage_offsets(self, offsets: Mapping[Partition, int]) -> None:
with self.__lock:
# XXX: can we remove the locking? dictionary updates might be
# atomic
self.__staged_offsets.update(offsets)

def commit_offsets(self) -> Optional[Mapping[Partition, int]]:
with self.__lock:
if self.__closed:
raise RuntimeError("consumer is closed")

offsets = {**self.__staged_offsets}

if offsets.keys() - self.__offsets.keys():
raise ConsumerError("cannot stage offsets for unassigned partitions")

Expand All @@ -341,10 +330,8 @@ def commit_offsets(self) -> Optional[Mapping[Partition, int]]:
self,
offsets,
)
self.__staged_offsets.clear()

self.commit_offsets_calls += 1
return offsets

def close(self, timeout: Optional[float] = None) -> None:
with self.__lock:
Expand Down
Loading
Loading