Unify task identity: collapse _uuid into deterministic task_id #2036
Changes from all commits
f8012b6
a8f2c40
429530d
d4de31c
735c334
9ae3b9d
4c6de4b
4c9d0d5
1792a35
2ebec4a
d72847e
e4f4f18
cec16f6
24bc08c
28fd4cd
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import uuid | ||
| from abc import ABC, abstractmethod | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
@@ -84,6 +85,9 @@ def process_batch(self, tasks: list[Task]) -> list[Task]: | |
| # Use the batch processing logic | ||
| results = self.stage.process_batch(tasks) | ||
|
|
||
| # Guarantee every emitted task has a task_id (derived id, or uuid fallback). | ||
| results = self._post_process_task_ids(tasks, results) | ||
|
|
||
| # Log performance stats and add to result tasks | ||
| _, stage_perf_stats = self._timer.log_stats() | ||
| # Consume and attach any custom metrics recorded by the stage during this call | ||
|
|
@@ -95,6 +99,75 @@ def process_batch(self, tasks: list[Task]) -> list[Task]: | |
|
|
||
| return results | ||
|
|
||
| def _post_process_task_ids(self, input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task]: | ||
| """Assign a deterministic ``task_id`` to every emitted task. | ||
|
|
||
| This is the single place task ids are assigned — it runs for every | ||
| stage on every backend (all backend adapters subclass this), so it | ||
| makes no difference whether a stage defines ``process`` or overrides | ||
| ``process_batch``. ``task_id`` is the task's id path (parents + own segment); ids are | ||
| re-derived at each stage boundary so the same object passing through | ||
| N stages gets N ids. | ||
|
|
||
| The input→output mapping decides each output's PARENT; whether the | ||
| stage is a source decides each output's SEGMENT (content id vs index) | ||
| — the two are independent. ``None`` outputs (Curator's "return None to | ||
| filter") are NOT removed before the length check — keeping them in | ||
| place preserves positional alignment for filter stages — and are then | ||
| dropped from the returned list. | ||
|
|
||
| - single input → every output is its child (fan-out): ``parent_<seg>`` | ||
| - ``len(output) == len(input)`` → positional 1:1: each ``parent_i_<seg>``; | ||
| a ``None`` slot just means input ``i`` was filtered. | ||
| - any other (ambiguous) cardinality across a batch → a random ``uuid`` | ||
| prefixed with ``"r"`` (e.g. ``"r3f9a…"``), so ``task_id`` is never | ||
| empty even when a derived id is not possible. The ``"r"`` prefix flags | ||
| the id as non-deterministic / ancestry-not-tracked (see | ||
| ``Task.task_id`` docstring). | ||
|
|
||
| ``seg`` is the output's content id (``Task.get_deterministic_id()``) | ||
| for a source stage when available, else the positional index — so a | ||
| source partition keeps a stable id across reorderings regardless of | ||
| whether the source is 1→N or N→N. | ||
|
|
||
| Note: a stage that BOTH filters and fans out within a single batch | ||
| (returning a flat list rather than a per-input slot) cannot be mapped | ||
| positionally; if its length happens to equal the input length the 1:1 | ||
| assumption may misattribute parents. That combination is unsupported | ||
| until per-slot sentinels (NoneTask/FailedTask) land in a later PR. | ||
| """ | ||
| is_source = getattr(self.stage, "is_source_stage", False) | ||
|
|
||
| if len(input_tasks) == 1: | ||
| # Fan-out (incl. a source reading from EmptyTask): every non-None | ||
| # output is a child of the single input. | ||
| parent_id = input_tasks[0].task_id | ||
| out: list[Task] = [t for t in output_tasks if t is not None] | ||
| for i, task in enumerate(out): | ||
| suffix = (task.get_deterministic_id() or i) if is_source else i | ||
|
Contributor
If something is not a source but still wants to override get_deterministic_id, do we not want to support that?
Contributor
Author
Nope. The current code assumes that only the source can have a deterministic ID. This keeps a few things simple: every ID segment except one is a number, so it is very easy to tell which one is the source ID (we do store the source ID as well). Finally, I don't think the deterministic IDs are serving any specific purpose at the moment. Later down the road we might implement a uniqueness check for the source ID, but doing that for two IDs would be much more difficult. |
||
| task._set_task_id(parent_id, suffix) | ||
| return out | ||
|
|
||
| if len(output_tasks) == len(input_tasks): | ||
| # Positional 1:1. None is kept above so a filtered slot still lines | ||
| # up with its own parent; drop the None slots from the result. | ||
| out = [] | ||
| for parent, task in zip(input_tasks, output_tasks, strict=True): | ||
| if task is None: | ||
| continue | ||
| suffix = (task.get_deterministic_id() or 0) if is_source else 0 | ||
| task._set_task_id(parent.task_id, suffix) | ||
| out.append(task) | ||
| return out | ||
|
|
||
| # Ambiguous cardinality across a batch: a derived id is not possible. Use a | ||
| # random "r"-prefixed uuid so task_id is non-empty but clearly flagged | ||
| # non-deterministic. | ||
| out = [t for t in output_tasks if t is not None] | ||
| for task in out: | ||
| task.task_id = "r" + uuid.uuid4().hex | ||
| return out | ||
|
|
||
| def setup_on_node(self, node_info: NodeInfo | None = None, worker_metadata: WorkerMetadata | None = None) -> None: | ||
| """Setup the stage on a node. | ||
|
|
||
|
|
||
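The three cardinality rules in the `_post_process_task_ids` docstring can be sketched in isolation. This is a minimal illustration, not the PR's implementation: `ToyTask`, `join_id`, and `assign_ids` are hypothetical names, the `parent_segment` underscore format is assumed from the docstring's `parent_<seg>` notation, and the source-stage branch (`get_deterministic_id()`) is left out so that every segment is a positional index.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass
class ToyTask:
    """Hypothetical stand-in for Task with only the fields this sketch needs."""

    payload: str
    task_id: str = ""


def join_id(parent_id: str, segment: object) -> str:
    # Assumed id format: parent path, underscore, own segment.
    return f"{parent_id}_{segment}"


def assign_ids(inputs: list[ToyTask], outputs: list[ToyTask | None]) -> list[ToyTask]:
    """Assign an id to every non-None output and return the non-None outputs."""
    if len(inputs) == 1:
        # Fan-out: every surviving output is a child of the single input.
        kept = [t for t in outputs if t is not None]
        for i, task in enumerate(kept):
            task.task_id = join_id(inputs[0].task_id, i)
        return kept

    if len(outputs) == len(inputs):
        # Positional 1:1: None slots stay in place until after pairing,
        # so each survivor lines up with its own parent.
        kept = []
        for parent, task in zip(inputs, outputs):
            if task is None:
                continue
            task.task_id = join_id(parent.task_id, 0)
            kept.append(task)
        return kept

    # Ambiguous cardinality: fall back to a random, "r"-prefixed id.
    kept = [t for t in outputs if t is not None]
    for task in kept:
        task.task_id = "r" + uuid.uuid4().hex
    return kept
```

With a single input whose id is `root`, two surviving outputs get `root_0` and `root_1`. With three inputs `p0`, `p1`, `p2` and outputs `[task, None, task]`, the survivors get `p0_0` and `p2_0`. Two inputs and three outputs fall through to the random `r…` ids.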
Are these NoneTask's, or can we have None?
We can have None; we replace None with NoneTask anyway in the next PR. Not sure what the question is, though.
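For context on why plain `None` slots matter here, the following sketch shows how keeping `None` in place preserves parent alignment, and how the unsupported filter-plus-fan-out case from the docstring's note can silently misattribute parents. It is an illustration only: `pair_with_parents` is a hypothetical helper, and the ids are plain strings rather than real `Task` objects.

```python
from __future__ import annotations


def pair_with_parents(
    parent_ids: list[str], outputs: list[str | None]
) -> list[tuple[str, str]]:
    """Positional 1:1 pairing; a None output marks a filtered slot."""
    if len(parent_ids) != len(outputs):
        raise ValueError("not a positional 1:1 batch")
    return [(p, o) for p, o in zip(parent_ids, outputs) if o is not None]


parents = ["p0", "p1", "p2"]

# A filter stage that keeps its slots: input p1 was filtered out.
aligned = pair_with_parents(parents, ["a", None, "c"])

# A hypothetical stage that dropped p0 and p1 and fanned p2 out into three
# outputs, returned as a flat list. The length matches only by coincidence,
# so the outputs are paired with the wrong parents.
flat = pair_with_parents(parents, ["c1", "c2", "c3"])
```

Here `aligned` pairs `a` with `p0` and `c` with `p2`, while `flat` attributes `c1` and `c2` to `p0` and `p1` even though, in this scenario, all three outputs came from `p2`.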