Unify task identity: collapse _uuid into deterministic task_id#2036
Conversation
Greptile SummaryThis PR replaces the random per-task
Confidence Score: 4/5Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could produce silently wrong source/sink role assignments if stage objects are reused across builds. The core lineage hash and migration are mechanically correct. _assign_source_sink_roles mutates stage instances in-place during build(), so a stage reused across two pipelines at different positions will have the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments. nemo_curator/pipeline/pipeline.py — the in-place mutation in _assign_source_sink_roles Important Files Changed
Sequence DiagramsequenceDiagram
participant P as Pipeline.build()
participant S as ProcessingStage (default)
participant T as Task._set_lineage
participant FG as FileGroupTask.get_deterministic_id
P->>P: _assign_source_sink_roles()
Note over P: mutates is_source_stage / is_sink_stage on stage instances
Note over S: process_batch(tasks)
S->>S: process(task) to children
loop for each child
alt "is_source_stage and child.get_deterministic_id() != None"
S->>FG: get_deterministic_id()
FG-->>S: content hash (12-char hex)
S->>T: _set_lineage([parent._lineage_path], content_hash)
else
S->>T: _set_lineage([parent._lineage_path], positional_index)
end
T->>T: "_lineage_path = join(parent_paths + segment)"
T->>T: "task_id = sha256(_lineage_path)[:32]"
end
S-->>P: children with deterministic task_ids
Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..." | Re-trigger Greptile |
| assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}" | ||
| # Each child has a non-empty lineage path with the parent's path as prefix. | ||
| for i, t in enumerate(output): | ||
| assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path |
There was a problem hiding this comment.
The first branch of this assertion (
f"_{i}") can never be true. The parent task starts with _lineage_path = "", which is filtered out by _set_lineage's empty-string guard, so children always get _lineage_path = str(i) (e.g. "0", not "_0"). The dead branch obscures the actual invariant being tested.
| assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path | |
| assert t._lineage_path == str(i), t._lineage_path |
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
praateekmahajan
left a comment
There was a problem hiding this comment.
Mostly LGTM, left some feedback re tests and clarifying questions
| original_file = segments_sorted[0].get("original_file", "unknown") | ||
|
|
||
| combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name) | ||
| combined = self._concatenate(original_file, segments_sorted, task.dataset_name) |
There was a problem hiding this comment.
@mohammadaaftabv can you / someone from audio review if this is fine? We're removing task_id
| def extract_and_write(self) -> list[FileGroupTask]: | ||
| self._check_actor_obj() | ||
| current_band_min, current_band_max = self._current_band_range | ||
| _current_band_min, _current_band_max = self._current_band_range |
| def test_defaults_first_stage_to_source(self) -> None: | ||
| s0 = _NoopStage(name="s0") | ||
| s1 = _NoopStage(name="s1") | ||
| s2 = _NoopStage(name="s2") | ||
| Pipeline(name="t", stages=[s0, s1, s2]).build() | ||
| assert s0.is_source_stage is True | ||
| assert s1.is_source_stage is False | ||
| assert s2.is_source_stage is False | ||
|
|
||
| def test_defaults_last_stage_to_sink(self) -> None: | ||
| s0 = _NoopStage(name="s0") | ||
| s1 = _NoopStage(name="s1") | ||
| s2 = _NoopStage(name="s2") | ||
| Pipeline(name="t", stages=[s0, s1, s2]).build() | ||
| assert s2.is_sink_stage is True | ||
| assert s0.is_sink_stage is False | ||
| assert s1.is_sink_stage is False |
There was a problem hiding this comment.
nit: AI overdoes on the unittests and writes too many lines.. can we possibly consolidate into a single test maybe within test_pipelines.py where we have class TestPipelineBuild and that has a test that says tests_default_first_source_last_sing_stage(self) which tests both these things?
That way any future changes to pipeline.build(..) also live there
There was a problem hiding this comment.
Yup probably we can do that. @oyilmaz-nvidia what do you think?
| # limitations under the License. | ||
|
|
||
| import uuid | ||
| from __future__ import annotations |
There was a problem hiding this comment.
why do we need future?
There was a problem hiding this comment.
I think it was since we had int | str or something in the typedef. Probably AI dump to support older Python version too. Can we removed?
| t = _new() | ||
| # Empty strings in parent paths are stripped (EmptyTask's default | ||
| # _lineage_path is ""). | ||
| t._set_lineage(["", "5", ""], 3) |
There was a problem hiding this comment.
When can this happen?
There was a problem hiding this comment.
It cannot. First empty is due to an Empty Task, and the second cannot occur.
There was a problem hiding this comment.
Wait until get_deteministic_hash gives "" as the output. I think unrelated, but this is a bug. We need to ensure that get_deteministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but that sound correct to you/
| assert t._lineage_path == "root_abc123" | ||
|
|
||
|
|
||
| class TestGetDeterministicId: |
There was a problem hiding this comment.
Can this test be inside tasks/test_file_group_tasks.py where we have a test for file group task where we do this... under TestFileGroupTask def test_deterministic_ids(..): in which we gtest few different asserts at once..
| return None # always filter out | ||
|
|
||
|
|
||
| class TestDefaultProcessBatchAssignsLineage: |
There was a problem hiding this comment.
These tests are good, but can we add a new test method here https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53
Where we do test_lineage() / test_task_ids()
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
8fd345a to
ac66f5f
Compare
|
/ok to test 75b9298 |
|
/ok to test 5757fba |
| def _assign_source_sink_roles(self) -> None: | ||
| explicit_sources = [s for s in self.stages if s.is_source_stage] | ||
| if len(explicit_sources) > 1: | ||
| names = [type(s).__name__ for s in explicit_sources] |
There was a problem hiding this comment.
Is type(s).__name__ different than stage.name property?
There was a problem hiding this comment.
They should be the same
|
|
||
| explicit_sinks = [s for s in self.stages if s.is_sink_stage] | ||
| if len(explicit_sinks) > 1: | ||
| names = [type(s).__name__ for s in explicit_sinks] |
There was a problem hiding this comment.
Is type(s).name different than stage.name property?
| for tid in task_ids: | ||
| segments = tid.split("_") | ||
| assert all(segments), f"task_id {tid!r} has an empty id segment" |
There was a problem hiding this comment.
If in these pipeline they're deterministic can we just test those ids? Or do we have a fan-in stage here making them non-deterministic?
There was a problem hiding this comment.
Hmm, wondering if we know the exact IDs...
| @@ -0,0 +1,150 @@ | |||
| # Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. | |||
There was a problem hiding this comment.
So far our tests have one is to one mapping of source file to test file (for the most part), which makes it super easy to reason in a large code base where the logic of that particular thing is implemented? Would there be any hesitation to move this to a corresponding source test file?
There was a problem hiding this comment.
For me bunch of these tests seem like integration tests of running a series of stages for a given task and getting an output, so can we try to replicate this in our tests/backends/test_integration.py
There was a problem hiding this comment.
Sure. We can move these tests around.
| ``task_id`` is framework-owned: stages must NOT set it. The executor | ||
| adapter (``BaseStageAdapter._post_process_task_ids``) assigns a | ||
| deterministic id to every emitted task — regardless of whether | ||
| a stage uses this default or overrides ``process_batch``. Where the | ||
| input→output mapping is ambiguous (e.g. a batch aggregation), the | ||
| adapter falls back to a random ``"r"``-prefixed id (see | ||
| ``Task.task_id``); there is no way for a stage to supply its own. |
There was a problem hiding this comment.
Are you sure ruff is running on this? Aren't the characters - / → problematic? Can you ensure pre-commit did run on these because our CI still doesn't catch them
There was a problem hiding this comment.
The precommits are def running. Not sure why this was not caught. Can look at this later.
| (returning a flat list rather than a per-input slot) cannot be mapped | ||
| positionally; if its length happens to equal the input length the 1:1 | ||
| assumption may misattribute parents. That combination is unsupported | ||
| until per-slot sentinels (NoneTask/FailedTask) land in a later PR. |
There was a problem hiding this comment.
Are NoneTask's or can we have None?
There was a problem hiding this comment.
We can have None, we repalce None with NoneTask anyways in the next PR> not sure what is question is though
| parent_id = input_tasks[0].task_id | ||
| out: list[Task] = [t for t in output_tasks if t is not None] | ||
| for i, task in enumerate(out): | ||
| suffix = (task.get_deterministic_id() or i) if is_source else i |
There was a problem hiding this comment.
If something was not a source and wanted to still override get_deterministic_id do we not want to support that?
There was a problem hiding this comment.
Nope. I think the current code assumption is that only the source can have a deterministic ID. This makes a few assumptions easy; all other IDs will be numbers except one. Very easy to say which one is the source ID (We do store the source ID as well). Finally, I think there is no specific purpose that deterministic_ids are serving. Later down the road, we might implement a uniqueness check for the source ID, but doing it for 2 ids would be much more difficult.
| """Add performance stats for a stage.""" | ||
| self._stage_perf.append(perf_stats) | ||
|
|
||
| def _set_task_id(self, parent_task_ids: list[str], current_task_id_suffix: str | int) -> None: |
There was a problem hiding this comment.
Why is parent_task_ids a list? Since only 1-> 1 is supported or N->1 will parent ever be a list? It should be just that parent's single task id right?
There was a problem hiding this comment.
Yeah it should just be a string
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
- DROP `_uuid` field.
- ADD `_lineage_path` field (init=False, default="").
- ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
Always overwrites both `_lineage_path` and `task_id`. The
`child_segment` can be a positional index (int) for plain emissions
or a content-based string (e.g. from `get_deterministic_id()`) for
source-stage emissions where stability across input reordering
matters.
- ADD `get_deterministic_id() -> str | None` method (default None).
Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
- Override `get_deterministic_id()` to return
`get_deterministic_hash(sorted(self.data))` — stable across runs
even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
- Default implementation now calls `_set_lineage` on each emitted
child. Stages that override `process_batch` are responsible for
calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
- `stages/text/modules/add_id.py:71`
- `stages/deduplication/semantic/kmeans.py:235, 395`
- `stages/deduplication/fuzzy/buckets_to_edges.py:83`
- `stages/deduplication/fuzzy/connected_components.py:176`
- `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Makes PR-A self-contained: `get_deterministic_id()` is now actually used. The default `process_batch` calls it for stages flagged `is_source_stage=True` and falls back to the positional index otherwise. - Add `is_source_stage: bool = False` and `is_sink_stage: bool = False` ClassVars on `ProcessingStage`. - `Pipeline.build()` now auto-assigns the first stage as source and the last as sink if no stage is explicitly marked. Raises on multiple explicit marks. The sink flag itself has no behavior in this PR — it exists for the resumability layer in a follow-up. - Default `process_batch` uses `child.get_deterministic_id() or i` as the lineage segment when the stage is a source. Content-based ids make `task_id` stable across input reordering for stages whose Task subclass implements `get_deterministic_id` (currently `FileGroupTask`). - Mark `FilePartitioningStage` as `is_source_stage = True` so the default reader path gets content-based ids out of the box. - Tests: `TestSourceStageSegment` (3 tests covering content-hash use, positional fallback, and non-source-stage passthrough); `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py` (defaults, explicit overrides, multi-mark validation). - Also: move `StagePerfStats` import to `TYPE_CHECKING` block in tasks.py (ruff TC001). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.
- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
helpers, etc.) and dead loop counters that only existed to format task_ids.
Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
…ng source flag - Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's own segment of the lineage path). Clearer per review. - Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so non-text modalities (FileGroupTask, dedup) don't pull in text deps. The text `writer/utils.py` re-exports it for backward compat. - Drop `is_source_stage = True` from FilePartitioningStage; rely on Pipeline.build()'s "first stage is source" default, so users who put a different stage first (e.g. URL generation) get the right behavior. - Fix dead-branch assertion in test_tasks.py (parent lineage is always "" -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg
- task_id is now the readable underscore-joined lineage path itself (no
sha256). Easier to debug; collapses the redundant `_lineage_path` field.
`_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
runtime StagePerfStats import (project requires py>=3.11, so `str | int`
is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
all call sites import it directly (no writer/utils re-export). Fix the
3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
* get_deterministic_hash tests -> tests/utils/test_hash_utils.py
* FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
(consolidated into one test_deterministic_ids)
* role-assignment tests -> tests/pipelines/test_pipelines.py
TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
* add test_task_id_lineage to backends/test_integration.py
Patch the consuming module's name directly with a string target instead of importing the module as megatron_mod just to use patch.object.
Move all task_id/lineage assignment into a single place — BaseStageAdapter._post_process_task_ids — instead of the default ProcessingStage.process_batch. Every backend adapter subclasses BaseStageAdapter, so this runs for every stage on every backend and makes no difference whether a stage defines `process` or overrides `process_batch`. This closes the gap where overriding `process_batch` produced empty task_ids and restores the old "every task always has an id" guarantee. - stages/base.py: process_batch reverts to the plain per-task loop (no lineage). is_source/is_sink flags + get_deterministic_id stay (declarations). - backends/base.py: _post_process_task_ids assigns ids unconditionally — source content-id (rooted), 1:N, positional M:M, and a random uuid for ambiguous batch fan-out so task_id is never empty. None results dropped. - tasks.py: _EmptyTask.task_id defaults to "0", the implicit lineage root. - pipeline.py: assign_root_lineage (moved here) roots user initial_tasks at "0" → "0_0", "0_1", … ; run() calls it. - tests: process_batch no longer assigns ids (test_tasks asserts that); lineage behavior moved to tests/backends/test_task_id_postprocess.py; test_lineage keeps the _set_lineage/root/get_deterministic_id primitives.
…ness _post_process_task_ids conflated two independent things: the input→output mapping (which determines an output's parent) and whether the stage is a source (which determines the segment: content id vs index). The old `if is_source` branch hard-coded "all outputs are children of tasks[0]", which is only valid for 1→N — a source stage can also be N→N (each input → one partition), where each output must descend from its positional parent. Now the mapping shape (1→N vs positional M:M vs ambiguous) picks the parent and the content-id-vs-index choice is applied within each.
…id-move
- Ambiguous batch fan-out (lineage can't be derived) now assigns
"r"+uuid; documented in Task.task_id that an "r" prefix means
non-deterministic / lineage-not-tracked.
- assign_root_lineage: keep positional roots (NOT content-hash) to avoid
double content-hashing when a source stage is also present; commented.
- Fix tests that assumed the old per-stage task_id / _uuid behavior
(validated against a baseline run of main in the same container — these
were the only PR regressions; all other suite failures are pre-existing
env issues: image codecs, pdfium, video MOTION_VECTORS, ray version):
* test_merge_file_prefixes: rename over-stripped helper arg task_id->prefix
* writer jsonl/parquet/megatron: uuid call_count 2->1 (no more _uuid)
* add_id: assign distinct lineage ids to the two batches (adapter does this)
* drop task_id-suffix assertions in sortformer/dedup/aesthetic/nsfw/
convert/nemotron_cc/multimodal_reader (stages no longer set task_id)
…ntract
- test_kmeans output-filename test: stop asserting output_task.task_id ==
filename. KMeans is an aggregating stage so its terminal _EmptyTask output
ids are the non-deterministic "r<uuid>" fallback; the written FILES remain
deterministic ("0_<file_hash>_<subgroup>.parquet"). Assert that filename
pattern + centroid partitioning instead.
- process_batch docstring: state explicitly that task_id is framework-owned
and stages must not set it (no escape hatch).
main added nemo_curator/stages/audio/alm/pretrain after this branch's base; those files were written against the old settable task_id API and broke the stages-audio CPU job (_EmptyTask.__init__() got unexpected kwarg 'task_id'). Strip task_id= from the new production constructors (extraction.py, io.py) and alm tests, and drop the now-unused task_id helper param. Same mechanical change applied to the rest of the codebase. No task_id assertions in these tests, so no further changes needed.
…rams Per review on _post_process_task_ids: - (1) Do NOT drop None outputs before the length check. A filter stage that returns None in a slot (e.g. [T1, None, T3] for 3 inputs) keeps the list length, so each surviving output still maps to its OWN parent instead of being shifted/misattributed (or wrongly falling into the ambiguous uuid branch). None slots are dropped from the returned list afterward. Per-slot sentinels (NoneTask/FailedTask) for the filter+fanout combo come in a later PR. - Rename params tasks/results -> input_tasks/output_tasks for clarity. - Add a regression test (test_filter_stage_keeps_positional_alignment).
…path task_id now embodies the parent->child id path, so there is no separate "lineage" concept/field/method. Rename and de-jargon accordingly: - _set_lineage -> _set_task_id - assign_root_lineage -> assign_root_task_ids - tests/tasks/test_lineage.py -> test_task_id.py (classes/methods renamed: TestSetTaskId, TestRootTaskIds; *_gets_id, *_is_reassigned, etc.) - replace "lineage" wording in docstrings/comments with task_id / id-path / ancestry language across tasks.py, stages/base.py, backends/base.py, pipeline.py and the affected tests. Pure rename + docs; no behavior change.
The merge of main (PR #2038, translation control knobs) re-introduced task_id= constructor args in the translation segmentation/reassembly stages and a translation pipeline test, breaking Unit_Test_stages-text_CPU (DocumentBatch.__init__() got an unexpected keyword argument 'task_id'). Strip them — same mechanical change as the rest of the PR.
|
/ok to test 8e373de |
…idate task_id tests
- Task._set_task_id takes a single parent_task_id (str) instead of a list:
every supported mapping (1->1, 1->N, N->N) gives an output exactly one
parent; N->1 aggregations don't track ancestry (they get an "r"-prefixed
id in the adapter). Update the two adapter call sites and the root
assignment. (Praateek)
- _assign_source_sink_roles reports stage.name, not type(s).__name__, in the
multiple-source/sink error messages — consistent with the rest of
pipeline.py and the user-facing identifier. (Praateek)
- Consolidate the task_id tests next to the source they cover:
* Task id primitives (_set_task_id, get_deterministic_id default) ->
tests/tasks/test_tasks.py
* assign_root_task_ids -> tests/pipelines/test_pipelines.py::TestRootTaskIds
* delete tests/tasks/test_task_id.py (contents distributed)
* slim tests/backends/test_task_id_postprocess.py to the cases a real
pipeline can't easily reach (filter-None alignment, ambiguous->r-uuid,
in-place re-derive, source content-id vs index)
* enrich tests/backends/test_integration.py::test_task_ids to assert the
deterministic id STRUCTURE end-to-end (rooted at "0", 12-hex source-hash
segment, uniform path depth). Exact id strings aren't assertable: the
source hashes temp-dir paths, so the hash varies per run. (Praateek)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test 28fd4cd |
Summary
Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a sha256-32 hash); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.
This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.
What changes
Why no breaking change at the constructor
`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.
Test plan
Future work
🤖 Generated with Claude Code