-
Notifications
You must be signed in to change notification settings - Fork 9
Deprecate zero-copy keyword argument in @ez.subscriber
#216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/sync-lowlevel-api
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import time | ||
| import inspect | ||
| import functools | ||
| import warnings | ||
| from .stream import InputStream, OutputStream | ||
| from .component import ComponentMeta, Component | ||
| from .settings import Settings | ||
|
|
@@ -23,6 +24,8 @@ | |
| LEAKY_ATTR = "__ez_leaky__" | ||
| MAX_QUEUE_ATTR = "__ez_max_queue__" | ||
|
|
||
| _ZERO_COPY_SENTINEL = object() | ||
|
|
||
|
|
||
| class UnitMeta(ComponentMeta): | ||
| def __init__( | ||
|
|
@@ -162,17 +165,18 @@ def pub_factory(func): | |
| return pub_factory | ||
|
|
||
|
|
||
| def subscriber(stream: InputStream, zero_copy: bool = False): | ||
| def subscriber(stream: InputStream, zero_copy: Any = _ZERO_COPY_SENTINEL): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
| """ | ||
| A decorator for a method that subscribes to a stream in the task/messaging thread. | ||
|
|
||
| An async function will run once per message received from the :obj:`InputStream` | ||
| it subscribes to. A function can have both ``@subscriber`` and ``@publisher`` decorators. | ||
|
|
||
| The ``zero_copy`` argument is deprecated and ignored. Subscribers always receive | ||
| zero-copy messages, so callers can omit it. | ||
|
Comment on lines
+175
to
+176
|
||
|
|
||
| :param stream: The input stream to subscribe to | ||
| :type stream: InputStream | ||
| :param zero_copy: Whether to use zero-copy message passing (default: False) | ||
| :type zero_copy: bool | ||
| :return: Decorated function that can subscribe to the stream | ||
| :rtype: collections.abc.Callable | ||
| :raises ValueError: If stream is not an InputStream | ||
|
|
@@ -183,20 +187,30 @@ def subscriber(stream: InputStream, zero_copy: bool = False): | |
|
|
||
| INPUT = ez.InputStream(Message) | ||
|
|
||
| @subscriber(INPUT) | ||
| async def print_message(self, message: Message) -> None: | ||
| print(message) | ||
| @subscriber(INPUT) | ||
| async def print_message(self, message: Message) -> None: | ||
| print(message) | ||
| """ | ||
|
|
||
| if not isinstance(stream, InputStream): | ||
| raise ValueError(f"Cannot subscribe to object of type {type(stream)}") | ||
|
|
||
| if zero_copy is not _ZERO_COPY_SENTINEL: | ||
| warnings.warn( | ||
| "The `zero_copy` argument to @subscriber is deprecated and ignored. " | ||
| "Zero-copy behavior is now determined by the InputStream's `leaky` property " | ||
| "(non-leaky subscribers use zero-copy; leaky subscribers receive deep-copied " | ||
| "messages). Remove any explicit `zero_copy=...` usage.", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| def sub_factory(func): | ||
| subscribed_streams: InputStream | None = getattr(func, SUBSCRIBES_ATTR, None) | ||
| if subscribed_streams is not None: | ||
| raise Exception(f"{func} cannot subscribe to more than one stream") | ||
| setattr(func, SUBSCRIBES_ATTR, stream) | ||
| setattr(func, ZERO_COPY_ATTR, zero_copy) | ||
| setattr(func, ZERO_COPY_ATTR, True) | ||
| setattr(func, LEAKY_ATTR, stream.leaky) | ||
| setattr(func, MAX_QUEUE_ATTR, stream.max_queue) | ||
| return task(func) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This removes the safety check that prevented zero-copy message pass-through when
obj is msg. Previously, if a subscriber yielded the exact same object it received, a deepcopy would be created to prevent unintended mutations. Now, the same object reference can be passed through to downstream subscribers. This is a significant behavior change that could lead to mutation bugs if multiple subscribers modify shared message objects. If this is intentional as part of the new "zero-copy by default" design, it should be clearly documented. Otherwise, consider restoring this safety check without the ZERO_COPY_ATTR condition.