Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
name: Checkout code
- uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2
with:
python-version: 3.12
python-version: 3.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -34,7 +34,7 @@ jobs:
name: Checkout code
- uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2
with:
python-version: 3.12
python-version: 3.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -48,7 +48,7 @@ jobs:
strategy:
max-parallel: 5
matrix:
python: [3.9, "3.10", "3.11", "3.12", "3.13"]
python: ["3.10", "3.11", "3.12", "3.13"]
timeout-minutes: 10
steps:
- uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2
Expand Down
6 changes: 4 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ repos:
- id: flake8
language_version: python3.12
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.961'
rev: 'v1.19.1'
hooks:
- id: mypy
args: [--config-file, mypy.ini, --strict]
additional_dependencies: [pytest==7.1.2]
additional_dependencies: [pytest==7.1.2, confluent-kafka==2.13.2]
pass_filenames: false
entry: mypy . --config-file mypy.ini --strict
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
Expand Down
75 changes: 49 additions & 26 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
Tuple,
Type,
Union,
cast,
)

from confluent_kafka import (
Expand Down Expand Up @@ -163,21 +162,18 @@ def __init__(
KafkaError.REQUEST_TIMED_OUT,
KafkaError.NOT_COORDINATOR,
KafkaError._WAIT_COORD,
KafkaError.STALE_MEMBER_EPOCH, # kip-848
KafkaError.COORDINATOR_LOAD_IN_PROGRESS,
)

commit_retry_policy = BasicRetryPolicy(
3,
1,
lambda e: isinstance(e, KafkaException)
and isinstance(e.args[0], KafkaError)
and e.args[0].code() in retryable_errors,
)

self.__is_cooperative_sticky = (
configuration.get("partition.assignment.strategy") == "cooperative-sticky"
or configuration.get("group.protocol") == "consumer"
)
self.__is_kip848 = configuration.get("group.protocol") == "consumer"
auto_offset_reset = configuration.get("auto.offset.reset", "largest")

# This is a special flag that controls the auto offset behavior for
Expand Down Expand Up @@ -335,7 +331,7 @@ def subscribe(
raise InvalidState(self.__state)

def assignment_callback(
consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition]
consumer: ConfluentConsumer, partitions: list[ConfluentTopicPartition]
) -> None:
if not partitions:
logger.info("skipping empty assignment")
Expand All @@ -362,6 +358,20 @@ def assignment_callback(

self.__assign(offsets)

if self.__is_kip848:
# For KIP-848, incremental_assign must be called after
# offset resolution but before on_assign, so that seeks
# issued inside on_assign work correctly. Calling it
# earlier (inside __assign at the top of the callback)
# fails with _UNKNOWN_PARTITION because rdkafka hasn't
# fully processed the assignment at that point.
self.__consumer.incremental_assign(
[
ConfluentTopicPartition(p.topic.name, p.index, offsets[p])
for p in offsets
]
)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.resume([p for p in offsets])
Expand All @@ -378,11 +388,14 @@ def assignment_callback(
logger.info("Paused partitions after assignment: %s", self.__paused)

def revocation_callback(
consumer: ConfluentConsumer, partitions: Sequence[ConfluentTopicPartition]
consumer: ConfluentConsumer,
confluent_partitions: Sequence[ConfluentTopicPartition],
) -> None:
self.__state = KafkaConsumerState.REVOKING

partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]
partitions = [
Partition(Topic(i.topic), i.partition) for i in confluent_partitions
]

try:
if on_revoke is not None:
Expand Down Expand Up @@ -475,9 +488,15 @@ def poll(
if error is not None:
code = error.code()
if code == KafkaError._PARTITION_EOF:
topic = message.topic()
partition = message.partition()
offset = message.offset()
assert topic is not None
assert partition is not None
assert offset is not None
raise EndOfPartition(
Partition(Topic(message.topic()), message.partition()),
message.offset(),
Partition(Topic(topic), partition),
offset,
)
elif code == KafkaError._TRANSPORT:
raise TransportError(str(error))
Expand All @@ -489,15 +508,21 @@ def poll(
else:
raise ConsumerError(str(error))

headers: Optional[Headers] = message.headers()
headers: Optional[Headers] = message.headers() # type: ignore[assignment, unused-ignore]
topic = message.topic()
partition = message.partition()
offset = message.offset()
assert topic is not None
assert partition is not None
assert offset is not None
broker_value = BrokerValue(
KafkaPayload(
message.key(),
message.value(),
message.value() or b"",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tombstone messages silently converted to empty bytes

Medium Severity

message.value() or b"" converts Kafka tombstone messages (where value() returns None) into empty bytes b"", making them indistinguishable from legitimately empty messages. On compacted topics, tombstones signal record deletion, and any downstream logic that needs to detect tombstones loses that signal. While KafkaPayload.value is typed as bytes, the previous code did pass None through at runtime, so any existing consumers relying on that for tombstone detection will silently break.

Fix in Cursor Fix in Web

headers if headers is not None else [],
),
Partition(Topic(message.topic()), message.partition()),
message.offset(),
Partition(Topic(topic), partition),
offset,
datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0),
)
self.__offsets[broker_value.partition] = broker_value.next_offset
Expand Down Expand Up @@ -529,9 +554,7 @@ def __assign(self, offsets: Mapping[Partition, int]) -> None:
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
if self.__is_cooperative_sticky:
self.__consumer.incremental_assign(partitions)
else:
if not self.__is_kip848:
self.__consumer.assign(partitions)
self.__offsets.update(offsets)

Expand Down Expand Up @@ -729,7 +752,7 @@ def closed(self) -> bool:

@property
def member_id(self) -> str:
member_id: str = self.__consumer.memberid()
member_id: str = self.__consumer.memberid() or ""
return member_id


Expand All @@ -738,7 +761,7 @@ def __init__(
self, configuration: Mapping[str, Any], use_simple_futures: bool = False
) -> None:
self.__configuration = configuration
self.__producer = ConfluentKafkaProducer(configuration)
self.__producer = ConfluentKafkaProducer(dict(configuration))
self.__shutdown_requested = Event()

# The worker must execute in a separate thread to ensure that callbacks
Expand Down Expand Up @@ -815,7 +838,7 @@ def produce(
produce(
value=payload.value,
key=payload.key,
headers=payload.headers,
headers=list(payload.headers),
on_delivery=partial(self.__delivery_callback, future, payload),
)
return future
Expand All @@ -832,13 +855,13 @@ def close(self) -> Future[None]:
METRICS_FREQUENCY_SEC = 1.0


class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc]
class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc, unused-ignore]
"""
A thin wrapper for confluent_kafka.Producer that adds metrics reporting.
"""

def __init__(self, configuration: Mapping[str, Any]) -> None:
super().__init__(configuration)
super().__init__(dict(configuration))
self.producer_name = configuration.get("client.id") or None
self.__metrics = get_metrics()
self.__produce_counters: MutableMapping[str, int] = defaultdict(int)
Expand Down Expand Up @@ -873,7 +896,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None:
on_delivery = kwargs.pop("on_delivery", None)
user_callback = callback or on_delivery
wrapped_callback = self.__delivery_callback(user_callback)
super().produce(*args, on_delivery=wrapped_callback, **kwargs)
super().produce(*args, **kwargs, on_delivery=wrapped_callback) # type: ignore[misc]

def __flush_metrics(self) -> None:
for status, count in self.__produce_counters.items():
Expand All @@ -887,10 +910,10 @@ def __flush_metrics(self) -> None:
)
self.__reset_metrics()

def flush(self, timeout: float = -1) -> int:
def flush(self, timeout: float | None = -1) -> int:
# Kafka producer flush should flush metrics too
self.__flush_metrics()
return cast(int, super().flush(timeout))
return super().flush(timeout) # type: ignore[arg-type, unused-ignore]

def __reset_metrics(self) -> None:
self.__produce_counters.clear()
Expand Down
1 change: 1 addition & 0 deletions arroyo/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def unsubscribe(self) -> None:
if self.__closed:
raise RuntimeError("consumer is closed")

assert self.__subscription is not None
self.__pending_callbacks.append(
partial(
self.__revoke,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def __getitem__(self, index: int) -> TBatchValue:
pickle.loads(
data,
buffers=[
self.block.buf[offset : offset + length].tobytes()
# mypy doesn't support slice indexing on memoryview
self.block.buf[offset : offset + length].tobytes() # type: ignore[index]
for offset, length in buffers
],
),
Expand Down Expand Up @@ -159,7 +160,8 @@ def buffer_callback(buffer: PickleBuffer) -> None:
f"Value exceeds available space in block, {length} "
f"bytes needed but {self.block.size - offset} bytes free."
)
self.block.buf[offset : offset + length] = value
# mypy doesn't support slice indexing on memoryview
self.block.buf[offset : offset + length] = value # type: ignore[index]
self.__offset += length
buffers.append((offset, length))

Expand Down
4 changes: 2 additions & 2 deletions arroyo/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class Value(BaseValue[TMessagePayload]):
"""

__slots__ = ["__payload", "__committable"]
__payload: TMessagePayload
__payload: TMessagePayload # type: ignore[misc]
__committable: Mapping[Partition, int]
__timestamp: Optional[datetime]

Expand Down Expand Up @@ -163,7 +163,7 @@ class BrokerValue(BaseValue[TMessagePayload]):
"""

__slots__ = ["__payload", "partition", "offset", "timestamp"]
__payload: TMessagePayload
__payload: TMessagePayload # type: ignore[misc]
partition: Partition
offset: int
timestamp: datetime
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
python_version = 3.12
python_version = 3.13

[mypy-avro.*]
ignore_missing_imports = True
Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pytest==8.4.1
pytest-benchmark==4.0.0
mypy==0.961
mypy==1.19.1
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
confluent-kafka>=2.11.0,<2.12.0
confluent-kafka>=2.13.2
5 changes: 2 additions & 3 deletions tests/backends/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@


class StreamsTestMixin(ABC, Generic[TStrategyPayload]):
cooperative_sticky = False
kip_848 = False

@abstractmethod
Expand Down Expand Up @@ -454,7 +453,7 @@ def wait_until_rebalancing(

wait_until_rebalancing(consumer_a, consumer_b)

if self.cooperative_sticky or self.kip_848:
if self.kip_848:
# within incremental rebalancing, only one partition should have been reassigned to the consumer_b, and consumer_a should remain paused
# Either partition 0 or 1 might be the paused one
assert len(consumer_a.paused()) == 1
Expand Down Expand Up @@ -483,7 +482,7 @@ def wait_until_rebalancing(

assert len(consumer_b.tell()) == 2

if self.cooperative_sticky or self.kip_848:
if self.kip_848:
consumer_a_on_assign.assert_has_calls(
[
mock.call({Partition(topic, 0): 0, Partition(topic, 1): 0}),
Expand Down
6 changes: 3 additions & 3 deletions tests/backends/test_confluent_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None:
{"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"}
)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(None, mock_message)
producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined]
producer.flush() # Flush buffered metrics
assert (
Increment(
Expand All @@ -44,7 +44,7 @@ def test_metrics_callback_records_error(self) -> None:
producer = ConfluentProducer({"bootstrap.servers": "fake:9092"})
mock_error = mock.Mock(spec=KafkaError)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message)
producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined]
producer.flush() # Flush buffered metrics
assert (
Increment("arroyo.producer.produce_status", 1, {"status": "error"})
Expand All @@ -63,7 +63,7 @@ def user_callback(
) -> None:
user_callback_invoked.append((error, message))

wrapped = producer._ConfluentProducer__delivery_callback(user_callback)
wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined]
mock_message = mock.Mock(spec=ConfluentMessage)
wrapped(None, mock_message)
producer.flush() # Flush buffered metrics
Expand Down
Loading
Loading