
Unify task identity: collapse _uuid into deterministic task_id #2036

Merged
abhinavg4 merged 15 commits into main from abhinavg/task-id-cleanup
Jun 6, 2026

Conversation

@abhinavg4

Contributor

Summary

Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a sha256-32 hash); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.

This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.

What changes

  • `Task` class (`nemo_curator/tasks/tasks.py`)
    • DROP `_uuid` field.
    • ADD `_lineage_path` field (`init=False, default=""`).
    • ADD `_set_lineage(parent_lineage_paths, child_segment)` method. Always overwrites `_lineage_path` and `task_id` (no idempotency check). `child_segment` accepts either a positional `int` index or a content-based `str` (for source-stage emissions).
    • ADD `get_deterministic_id() -> str | None` method (default `None`). Subclasses with stable content override.
  • `FileGroupTask` overrides `get_deterministic_id()` to return `get_deterministic_hash(sorted(self.data))` — stable across runs even if files are added/removed between launches.
  • `ProcessingStage.process_batch` default impl now calls `_set_lineage` per emitted child. Stages that override `process_batch` are responsible for calling it themselves (docstring updated).
  • Migrate 6 `_uuid` use sites to `task.task_id` (all dedup output-filename generators + AddId).
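The `_set_lineage` contract listed above can be sketched roughly as follows. This is a minimal illustration, not the code in `nemo_curator/tasks/tasks.py`: the `SketchTask` class is hypothetical, and the `"_"` separator and the 32-character truncated sha256 digest are assumptions based on how this PR is described rather than on the source file itself.

```python
import hashlib
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; only the identity fields are modeled."""

    task_id: str = ""  # still accepted by the constructor, but overwritten later
    _lineage_path: str = field(init=False, default="")

    def _set_lineage(self, parent_lineage_paths: List[str], child_segment: Union[int, str]) -> None:
        # Empty parent paths (e.g. from an empty root task) are filtered out.
        parts = [p for p in parent_lineage_paths if p]
        parts.append(str(child_segment))
        # Always overwrite both fields; there is no idempotency check.
        self._lineage_path = "_".join(parts)
        digest = hashlib.sha256(self._lineage_path.encode("utf-8")).hexdigest()
        self.task_id = digest[:32]  # assumed truncation length


task = SketchTask(task_id="file_group_3")
task._set_lineage(["", "5", ""], 3)
print(task._lineage_path)  # 5_3
print(task.task_id != "file_group_3")  # True: the user-supplied value was replaced
```

Because the id is a pure function of the lineage path, two runs that produce the same path produce the same `task_id`.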

Why no breaking change at the constructor

`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.

Test plan

  • `tests/tasks/test_tasks.py`: updated fanout test to call `process_batch` (the path that exercises the new lineage assignment).
  • `tests/tasks/test_lineage.py` (NEW):
    • `_set_lineage` filters empty parent paths, always overwrites, accepts `str` and `int` segments.
    • `get_deterministic_id` returns `None` by default; `FileGroupTask` returns a stable hash regardless of input list order.
    • Default `process_batch` assigns unique lineage paths to fan-out outputs.
    • Same pipeline shape on same inputs produces byte-identical `task_id`s.
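The order-independence property tested for `FileGroupTask` can be illustrated with a small sketch. The helper `sketch_deterministic_hash` is a hypothetical stand-in for `get_deterministic_hash`; its `"|"` join and 12-character truncation are assumptions, and the real helper may combine its inputs differently.

```python
import hashlib
from typing import List, Optional


def sketch_deterministic_hash(inputs: List[str], length: int = 12) -> str:
    """Hypothetical stand-in for get_deterministic_hash (the real one may differ)."""
    combined = "|".join(inputs)
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()[:length]


class SketchTask:
    def get_deterministic_id(self) -> Optional[str]:
        # Default: no stable content, so no deterministic id.
        return None


class SketchFileGroupTask(SketchTask):
    def __init__(self, data: List[str]) -> None:
        self.data = data

    def get_deterministic_id(self) -> Optional[str]:
        # Sorting first makes the id independent of the input list order.
        return sketch_deterministic_hash(sorted(self.data))


a = SketchFileGroupTask(["b.jsonl", "a.jsonl"])
b = SketchFileGroupTask(["a.jsonl", "b.jsonl"])
print(a.get_deterministic_id() == b.get_deterministic_id())  # True
print(SketchTask().get_deterministic_id())  # None
```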

Future work

  • "Strict" mode: make `task_id` `init=False` so users can't pass it at all. Requires migrating ~55 internal `task_id=...` constructor calls — mechanical but big diff. Separate PR.
  • Resumability (Pipeline resumability via source-level counter checkpointing #2033) rebases on top of this and uses `task.task_id` as the per-task dedup key.

🤖 Generated with Claude Code

@abhinavg4 abhinavg4 requested review from a team, ayushdg and praateekmahajan as code owners May 27, 2026 23:54
@copy-pr-bot

copy-pr-bot Bot commented May 27, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented May 27, 2026

Contributor

Greptile Summary

This PR replaces the random per-task _uuid with a deterministic task_id derived from a sha256 hash of the task's lineage path through the pipeline DAG. It also introduces is_source_stage/is_sink_stage role flags on ProcessingStage and a role-assignment step in Pipeline.build().

  • Core lineage mechanism (tasks.py, base.py): _set_lineage builds a _lineage_path by joining parent paths and a child segment with "_", then hashes it into a 32-char hex task_id. The default process_batch calls this for each emitted child; stages that override process_batch must call it themselves.
  • Source stage identity (file_group.py, file_partitioning.py): FileGroupTask overrides get_deterministic_id() to return a content hash of its sorted file paths; FilePartitioningStage is marked is_source_stage = True; Pipeline._assign_source_sink_roles() defaults the first/last stage to source/sink if none is explicit.
  • Migration (add_id.py, deduplication stages): six _uuid references swapped to task_id; three output sites in two deduplication process_batch overrides (one in ConnectedComponentsStage, two in KMeansReadFitWriteStage) do not yet call _set_lineage on their outputs (noted in previously posted comments).

Confidence Score: 4/5

Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could produce silently wrong source/sink role assignments if stage objects are reused across builds.

The core lineage hash and migration are mechanically correct. _assign_source_sink_roles mutates stage instances in-place during build(), so a stage reused across two pipelines at different positions will have the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments.

nemo_curator/pipeline/pipeline.py — the in-place mutation in _assign_source_sink_roles
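The stage-reuse hazard described above can be reproduced in a few lines. This is a hypothetical reduction: `SketchStage` and `assign_source_sink_roles` are illustrative names, and the multiple-explicit-marks validation is omitted.

```python
from typing import List


class SketchStage:
    # Class-level defaults; the role-assignment step shadows them per instance.
    is_source_stage = False
    is_sink_stage = False

    def __init__(self, name: str) -> None:
        self.name = name


def assign_source_sink_roles(stages: List[SketchStage]) -> None:
    """Hypothetical reduction of the role-assignment step (validation omitted)."""
    if not any(s.is_source_stage for s in stages):
        stages[0].is_source_stage = True  # in-place mutation of the instance
    if not any(s.is_sink_stage for s in stages):
        stages[-1].is_sink_stage = True


shared = SketchStage("shared")
first, last = SketchStage("first"), SketchStage("last")

assign_source_sink_roles([shared, last])   # pipeline A: shared is first
assign_source_sink_roles([first, shared])  # pipeline B: shared is last

print(first.is_source_stage)   # False: the stale flag on `shared` suppressed the default
print(shared.is_source_stage)  # True, although `shared` is the last stage in B
```

One way to avoid this, under the same assumptions, would be to compute roles into a per-pipeline mapping instead of writing them onto the stage instances.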

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/tasks/tasks.py | Drops _uuid, adds deterministic _set_lineage / _lineage_path contract. Core logic is sound; separator ambiguity is a latent design hazard for future subclasses. |
| nemo_curator/stages/base.py | Adds is_source_stage/is_sink_stage class attrs and calls _set_lineage in the default process_batch. Clean except that the class-level flags are instance-mutated by Pipeline.build(), causing role pollution across builds. |
| nemo_curator/pipeline/pipeline.py | Adds _assign_source_sink_roles() called inside build(). Mutates stage instances in-place, which silently corrupts role assignments when stages are shared across pipeline builds. |
| nemo_curator/tasks/file_group.py | Adds get_deterministic_id() returning a stable hash of sorted file paths. Clean implementation; correctly uses hex output (no underscores) that is safe with the _ separator. |
| nemo_curator/stages/deduplication/fuzzy/connected_components.py | Migrates _uuid to task_id for output filename. Overrides process_batch without calling _set_lineage, leaving the returned task with empty _lineage_path (previously flagged). |
| nemo_curator/stages/deduplication/semantic/kmeans.py | Migrates _uuid to task_id for filenames. Both _process_batch_single_pass and _predict_write_pass create output _EmptyTask objects without calling _set_lineage (previously flagged). |
| nemo_curator/stages/text/modules/add_id.py | Straightforward swap of _uuid to task_id for document ID prefix. Safe because task_id is always set by _set_lineage before AddId processes a task in a pipeline. |
| tests/tasks/test_lineage.py | New test file with comprehensive coverage of _set_lineage, get_deterministic_id, default process_batch, source stage segment selection, and cross-run determinism. Well-structured. |
| tests/pipelines/test_pipeline_roles.py | New tests for _assign_source_sink_roles covering defaults, explicit overrides, and error cases. Does not cover the stage-reuse mutation scenario. |
| tests/tasks/test_tasks.py | Updated fanout test to use process_batch path. The assertion on line 747 has a dead branch that can never be true (previously flagged). |

Sequence Diagram

sequenceDiagram
    participant P as Pipeline.build()
    participant S as ProcessingStage (default)
    participant T as Task._set_lineage
    participant FG as FileGroupTask.get_deterministic_id

    P->>P: _assign_source_sink_roles()
    Note over P: mutates is_source_stage / is_sink_stage on stage instances

    Note over S: process_batch(tasks)
    S->>S: process(task) to children

    loop for each child
        alt "is_source_stage and child.get_deterministic_id() != None"
            S->>FG: get_deterministic_id()
            FG-->>S: content hash (12-char hex)
            S->>T: _set_lineage([parent._lineage_path], content_hash)
        else
            S->>T: _set_lineage([parent._lineage_path], positional_index)
        end
        T->>T: "_lineage_path = join(parent_paths + segment)"
        T->>T: "task_id = sha256(_lineage_path)[:32]"
    end

    S-->>P: children with deterministic task_ids

Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..."

Comment thread tests/tasks/test_tasks.py Outdated
assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}"
# Each child has a non-empty lineage path with the parent's path as prefix.
for i, t in enumerate(output):
    assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path

Contributor

P2 The first branch of this assertion (f"_{i}") can never be true. The parent task starts with _lineage_path = "", which is filtered out by _set_lineage's empty-string guard, so children always get _lineage_path = str(i) (e.g. "0", not "_0"). The dead branch obscures the actual invariant being tested.

Suggested change
- assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
+ assert t._lineage_path == str(i), t._lineage_path

abhinavg4 added a commit that referenced this pull request May 28, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.

@praateekmahajan praateekmahajan left a comment

Contributor

Mostly LGTM, left some feedback re tests and clarifying questions

original_file = segments_sorted[0].get("original_file", "unknown")

- combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name)
+ combined = self._concatenate(original_file, segments_sorted, task.dataset_name)

Contributor

@mohammadaaftabv can you / someone from audio review if this is fine? We're removing task_id

  def extract_and_write(self) -> list[FileGroupTask]:
      self._check_actor_obj()
-     current_band_min, current_band_max = self._current_band_range
+     _current_band_min, _current_band_max = self._current_band_range

Contributor

Comment thread nemo_curator/tasks/file_group.py
Comment thread nemo_curator/stages/file_partitioning.py Outdated
Comment thread tests/pipelines/test_pipeline_roles.py Outdated
Comment on lines +42 to +58
def test_defaults_first_stage_to_source(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s0.is_source_stage is True
    assert s1.is_source_stage is False
    assert s2.is_source_stage is False

def test_defaults_last_stage_to_sink(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s2.is_sink_stage is True
    assert s0.is_sink_stage is False
    assert s1.is_sink_stage is False

Contributor

nit: AI overdoes the unit tests and writes too many lines. Can we consolidate these into a single test, maybe within test_pipelines.py, where we have class TestPipelineBuild? It could have a test named test_default_first_source_last_sink_stage(self) that covers both of these things.

That way any future changes to pipeline.build(..) also live there

Contributor Author

Yup probably we can do that. @oyilmaz-nvidia what do you think?

Comment thread nemo_curator/tasks/tasks.py Outdated
# limitations under the License.

- import uuid
+ from __future__ import annotations

Contributor

why do we need future?

Contributor Author

I think it was because we had int | str or something in the typedef. Probably an AI dump to support older Python versions too. It can be removed.

Comment thread nemo_curator/tasks/tasks.py Outdated
Comment thread tests/tasks/test_lineage.py Outdated
t = _new()
# Empty strings in parent paths are stripped (EmptyTask's default
# _lineage_path is "").
t._set_lineage(["", "5", ""], 3)

Contributor

When can this happen?

Contributor Author

It cannot. First empty is due to an Empty Task, and the second cannot occur.

Contributor Author

Wait, unless get_deterministic_hash gives "" as the output. I think it's unrelated, but this is a bug. We need to ensure that get_deterministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but does that sound correct to you?

Comment thread tests/tasks/test_task_id.py Outdated
assert t._lineage_path == "root_abc123"


class TestGetDeterministicId:

Contributor

Can this test live inside tasks/test_file_group_tasks.py, where we have the tests for file group task? We could do this under TestFileGroupTask as def test_deterministic_ids(..):, in which we test a few different asserts at once.

Comment thread tests/tasks/test_lineage.py Outdated
return None # always filter out


class TestDefaultProcessBatchAssignsLineage:

Contributor

These tests are good, but can we add a new test method here https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53

Where we do test_lineage() / test_task_ids()

abhinavg4 added a commit that referenced this pull request Jun 1, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 8fd345a to ac66f5f Compare June 2, 2026 02:05
@abhinavg4

Contributor Author

/ok to test 75b9298

@abhinavg4

Contributor Author

/ok to test 5757fba

Comment thread nemo_curator/pipeline/pipeline.py Outdated
def _assign_source_sink_roles(self) -> None:
    explicit_sources = [s for s in self.stages if s.is_source_stage]
    if len(explicit_sources) > 1:
        names = [type(s).__name__ for s in explicit_sources]

Contributor

Is type(s).__name__ different than stage.name property?

Contributor Author

They should be the same

Comment thread nemo_curator/pipeline/pipeline.py Outdated

explicit_sinks = [s for s in self.stages if s.is_sink_stage]
if len(explicit_sinks) > 1:
    names = [type(s).__name__ for s in explicit_sinks]

Contributor

Is type(s).__name__ different than stage.name property?

Comment on lines +156 to +158
for tid in task_ids:
    segments = tid.split("_")
    assert all(segments), f"task_id {tid!r} has an empty id segment"

Contributor

If the ids in these pipelines are deterministic, can we just test those ids? Or do we have a fan-in stage here making them non-deterministic?

Contributor Author

Hmm, wondering if we know the exact IDs...

@@ -0,0 +1,150 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.

Contributor

So far our tests have a one-to-one mapping of source file to test file (for the most part), which makes it super easy to reason, in a large code base, about where the logic of a particular thing is implemented. Would there be any hesitation to move this to a corresponding source test file?

Contributor

To me, a bunch of these tests seem like integration tests that run a series of stages for a given task and check the output, so can we try to replicate this in our tests/backends/test_integration.py?

Contributor Author

Sure. We can move these tests around.

Comment on lines +196 to +202
``task_id`` is framework-owned: stages must NOT set it. The executor
adapter (``BaseStageAdapter._post_process_task_ids``) assigns a
deterministic id to every emitted task — regardless of whether
a stage uses this default or overrides ``process_batch``. Where the
input→output mapping is ambiguous (e.g. a batch aggregation), the
adapter falls back to a random ``"r"``-prefixed id (see
``Task.task_id``); there is no way for a stage to supply its own.

Contributor

Are you sure ruff is running on this? Aren't the characters - / problematic? Can you ensure pre-commit did run on these because our CI still doesn't catch them

Contributor Author

The precommits are def running. Not sure why this was not caught. Can look at this later.

(returning a flat list rather than a per-input slot) cannot be mapped
positionally; if its length happens to equal the input length the 1:1
assumption may misattribute parents. That combination is unsupported
until per-slot sentinels (NoneTask/FailedTask) land in a later PR.

Contributor

Are NoneTask's or can we have None?

Contributor Author

We can have None; we replace None with NoneTask anyway in the next PR. Not sure what the question is, though.

parent_id = input_tasks[0].task_id
out: list[Task] = [t for t in output_tasks if t is not None]
for i, task in enumerate(out):
    suffix = (task.get_deterministic_id() or i) if is_source else i

Contributor

If something was not a source and wanted to still override get_deterministic_id do we not want to support that?

Contributor Author

Nope. I think the current code assumption is that only the source can have a deterministic ID. This makes a few assumptions easy; all other IDs will be numbers except one. Very easy to say which one is the source ID (We do store the source ID as well). Finally, I think there is no specific purpose that deterministic_ids are serving. Later down the road, we might implement a uniqueness check for the source ID, but doing it for 2 ids would be much more difficult.
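The suffix rule under discussion can be looked at in isolation. In this minimal sketch, `PlainTask`, `ContentTask`, and `choose_suffixes` are hypothetical names; only the expression `(task.get_deterministic_id() or i) if is_source else i` is taken from the snippet under review.

```python
from typing import List, Optional, Union


class PlainTask:
    def get_deterministic_id(self) -> Optional[str]:
        return None


class ContentTask(PlainTask):
    def __init__(self, content_id: str) -> None:
        self._content_id = content_id

    def get_deterministic_id(self) -> Optional[str]:
        return self._content_id


def choose_suffixes(tasks: List[PlainTask], is_source: bool) -> List[Union[str, int]]:
    # Same expression as in the reviewed snippet, applied per output task.
    return [(t.get_deterministic_id() or i) if is_source else i for i, t in enumerate(tasks)]


tasks = [ContentTask("abc123"), PlainTask(), ContentTask("")]
print(choose_suffixes(tasks, is_source=True))   # ['abc123', 1, 2]
print(choose_suffixes(tasks, is_source=False))  # [0, 1, 2]
```

Note that because of the `or`, an empty-string content id silently falls back to the positional index, and a non-source stage ignores `get_deterministic_id()` even when a task overrides it.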

Comment thread nemo_curator/pipeline/pipeline.py
Comment thread nemo_curator/tasks/tasks.py Outdated
"""Add performance stats for a stage."""
self._stage_perf.append(perf_stats)

def _set_task_id(self, parent_task_ids: list[str], current_task_id_suffix: str | int) -> None:

Contributor

Why is parent_task_ids a list? Since only 1->1 or N->1 is supported, will the parent ever be a list? It should be just that parent's single task id, right?

Contributor Author

Yeah it should just be a string

abhinavg4 and others added 14 commits June 5, 2026 16:29
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.

Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.

Changes:

- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))` — stable across runs
    even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`

Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
  `process_batch` (which exercises the new lineage assignment) and
  assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
  `_set_lineage` (always overwrites, filters empty parent paths,
  accepts string segments), `get_deterministic_id` (default None,
  FileGroupTask content-hash with stable sort), and the default
  `process_batch` lineage assignment.

This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Makes PR-A self-contained: `get_deterministic_id()` is now actually
used. The default `process_batch` calls it for stages flagged
`is_source_stage=True` and falls back to the positional index otherwise.

- Add `is_source_stage: bool = False` and `is_sink_stage: bool = False`
  ClassVars on `ProcessingStage`.
- `Pipeline.build()` now auto-assigns the first stage as source and the
  last as sink if no stage is explicitly marked. Raises on multiple
  explicit marks. The sink flag itself has no behavior in this PR — it
  exists for the resumability layer in a follow-up.
- Default `process_batch` uses `child.get_deterministic_id() or i` as
  the lineage segment when the stage is a source. Content-based ids
  make `task_id` stable across input reordering for stages whose Task
  subclass implements `get_deterministic_id` (currently `FileGroupTask`).
- Mark `FilePartitioningStage` as `is_source_stage = True` so the
  default reader path gets content-based ids out of the box.
- Tests: `TestSourceStageSegment` (3 tests covering content-hash use,
  positional fallback, and non-source-stage passthrough);
  `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py`
  (defaults, explicit overrides, multi-mark validation).
- Also: move `StagePerfStats` import to `TYPE_CHECKING` block in
  tasks.py (ruff TC001).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.

- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
  nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
  that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
  helpers, etc.) and dead loop counters that only existed to format task_ids.

Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
…ng source flag

- Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's
  own segment of the lineage path). Clearer per review.
- Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so
  non-text modalities (FileGroupTask, dedup) don't pull in text deps. The
  text `writer/utils.py` re-exports it for backward compat.
- Drop `is_source_stage = True` from FilePartitioningStage; rely on
  Pipeline.build()'s "first stage is source" default, so users who put a
  different stage first (e.g. URL generation) get the right behavior.
- Fix dead-branch assertion in test_tasks.py (parent lineage is always ""
  -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg

- task_id is now the readable underscore-joined lineage path itself (no
  sha256). Easier to debug; collapses the redundant `_lineage_path` field.
  `_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
  runtime StagePerfStats import (project requires py>=3.11, so `str | int`
  is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
  all call sites import it directly (no writer/utils re-export). Fix the
  3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
  * get_deterministic_hash tests -> tests/utils/test_hash_utils.py
  * FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
    (consolidated into one test_deterministic_ids)
  * role-assignment tests -> tests/pipelines/test_pipelines.py
    TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
  * add test_task_id_lineage to backends/test_integration.py
Patch the consuming module's name directly with a string target instead
of importing the module as megatron_mod just to use patch.object.
Move all task_id/lineage assignment into a single place —
BaseStageAdapter._post_process_task_ids — instead of the default
ProcessingStage.process_batch. Every backend adapter subclasses
BaseStageAdapter, so this runs for every stage on every backend and makes
no difference whether a stage defines `process` or overrides
`process_batch`. This closes the gap where overriding `process_batch`
produced empty task_ids and restores the old "every task always has an id"
guarantee.

- stages/base.py: process_batch reverts to the plain per-task loop (no
  lineage). is_source/is_sink flags + get_deterministic_id stay (declarations).
- backends/base.py: _post_process_task_ids assigns ids unconditionally —
  source content-id (rooted), 1:N, positional M:M, and a random uuid for
  ambiguous batch fan-out so task_id is never empty. None results dropped.
- tasks.py: _EmptyTask.task_id defaults to "0", the implicit lineage root.
- pipeline.py: assign_root_lineage (moved here) roots user initial_tasks at
  "0" → "0_0", "0_1", … ; run() calls it.
- tests: process_batch no longer assigns ids (test_tasks asserts that);
  lineage behavior moved to tests/backends/test_task_id_postprocess.py;
  test_lineage keeps the _set_lineage/root/get_deterministic_id primitives.
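The rooting of user-supplied initial tasks at "0" can be sketched as below. This assumes the ids are the readable underscore-joined path; `SketchTask` and `assign_root_ids_sketch` are hypothetical names, not the helper in pipeline.py.

```python
from dataclasses import dataclass
from typing import List

ROOT_TASK_ID = "0"  # the implicit lineage root named in the commit message


@dataclass
class SketchTask:
    payload: str
    task_id: str = ""


def assign_root_ids_sketch(initial_tasks: List[SketchTask]) -> List[SketchTask]:
    # Positional roots: the i-th initial task becomes a child of the root.
    for i, task in enumerate(initial_tasks):
        task.task_id = f"{ROOT_TASK_ID}_{i}"
    return initial_tasks


rooted = assign_root_ids_sketch([SketchTask("a"), SketchTask("b")])
print([t.task_id for t in rooted])  # ['0_0', '0_1']
```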
…ness

_post_process_task_ids conflated two independent things: the input→output
mapping (which determines an output's parent) and whether the stage is a
source (which determines the segment: content id vs index). The old
`if is_source` branch hard-coded "all outputs are children of tasks[0]",
which is only valid for 1→N — a source stage can also be N→N (each input
→ one partition), where each output must descend from its positional
parent. Now the mapping shape (1→N vs positional M:M vs ambiguous) picks
the parent and the content-id-vs-index choice is applied within each.
…id-move

- Ambiguous batch fan-out (lineage can't be derived) now assigns
  "r"+uuid; documented in Task.task_id that an "r" prefix means
  non-deterministic / lineage-not-tracked.
- assign_root_lineage: keep positional roots (NOT content-hash) to avoid
  double content-hashing when a source stage is also present; commented.
- Fix tests that assumed the old per-stage task_id / _uuid behavior
  (validated against a baseline run of main in the same container — these
  were the only PR regressions; all other suite failures are pre-existing
  env issues: image codecs, pdfium, video MOTION_VECTORS, ray version):
  * test_merge_file_prefixes: rename over-stripped helper arg task_id->prefix
  * writer jsonl/parquet/megatron: uuid call_count 2->1 (no more _uuid)
  * add_id: assign distinct lineage ids to the two batches (adapter does this)
  * drop task_id-suffix assertions in sortformer/dedup/aesthetic/nsfw/
    convert/nemotron_cc/multimodal_reader (stages no longer set task_id)
…ntract

- test_kmeans output-filename test: stop asserting output_task.task_id ==
  filename. KMeans is an aggregating stage so its terminal _EmptyTask output
  ids are the non-deterministic "r<uuid>" fallback; the written FILES remain
  deterministic ("0_<file_hash>_<subgroup>.parquet"). Assert that filename
  pattern + centroid partitioning instead.
- process_batch docstring: state explicitly that task_id is framework-owned
  and stages must not set it (no escape hatch).
main added nemo_curator/stages/audio/alm/pretrain after this branch's base;
those files were written against the old settable task_id API and broke the
stages-audio CPU job (_EmptyTask.__init__() got unexpected kwarg 'task_id').
Strip task_id= from the new production constructors (extraction.py, io.py)
and alm tests, and drop the now-unused task_id helper param. Same mechanical
change applied to the rest of the codebase. No task_id assertions in these
tests, so no further changes needed.
…rams

Per review on _post_process_task_ids:
- (1) Do NOT drop None outputs before the length check. A filter stage that
  returns None in a slot (e.g. [T1, None, T3] for 3 inputs) keeps the list
  length, so each surviving output still maps to its OWN parent instead of
  being shifted/misattributed (or wrongly falling into the ambiguous uuid
  branch). None slots are dropped from the returned list afterward.
  Per-slot sentinels (NoneTask/FailedTask) for the filter+fanout combo come
  in a later PR.
- Rename params tasks/results -> input_tasks/output_tasks for clarity.
- Add a regression test (test_filter_stage_keeps_positional_alignment).
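
To illustrate why the None slots must survive until after the length check, here is a small sketch. `sketch_align_outputs` is a hypothetical helper that uses plain strings in place of tasks; it is not the adapter code itself.

```python
def sketch_align_outputs(parent_ids, outputs):
    """Pair each surviving output with the parent in the same slot.

    None slots are kept during the length check and dropped only at the end.
    """
    if len(parent_ids) != len(outputs):
        raise ValueError("shape is not positional; parent cannot be derived")
    return [(p, o) for p, o in zip(parent_ids, outputs) if o is not None]


parents = ["0_0", "0_1", "0_2"]
filtered = ["T1", None, "T3"]  # the filter stage dropped the middle task

# Correct: T3 stays attached to its own parent "0_2".
print(sketch_align_outputs(parents, filtered))

# Dropping None first changes the length (2 vs 3), so the positional
# mapping is lost and the outputs would be treated as ambiguous.
pre_dropped = [o for o in filtered if o is not None]
print(len(pre_dropped) == len(parents))
```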
…path

task_id now embodies the parent->child id path, so there is no separate
"lineage" concept/field/method. Rename and de-jargon accordingly:

- _set_lineage -> _set_task_id
- assign_root_lineage -> assign_root_task_ids
- tests/tasks/test_lineage.py -> test_task_id.py (classes/methods renamed:
  TestSetTaskId, TestRootTaskIds; *_gets_id, *_is_reassigned, etc.)
- replace "lineage" wording in docstrings/comments with task_id / id-path /
  ancestry language across tasks.py, stages/base.py, backends/base.py,
  pipeline.py and the affected tests.

Pure rename + docs; no behavior change.
The merge of main (PR #2038, translation control knobs) re-introduced
task_id= constructor args in the translation segmentation/reassembly stages
and a translation pipeline test, breaking Unit_Test_stages-text_CPU
(DocumentBatch.__init__() got an unexpected keyword argument 'task_id').
Strip them — same mechanical change as the rest of the PR.
@abhinavg4

Contributor Author

/ok to test 8e373de

…idate task_id tests

- Task._set_task_id takes a single parent_task_id (str) instead of a list:
  every supported mapping (1->1, 1->N, N->N) gives an output exactly one
  parent; N->1 aggregations don't track ancestry (they get an "r"-prefixed
  id in the adapter). Update the two adapter call sites and the root
  assignment. (Praateek)
- _assign_source_sink_roles reports stage.name, not type(s).__name__, in the
  multiple-source/sink error messages — consistent with the rest of
  pipeline.py and the user-facing identifier. (Praateek)
- Consolidate the task_id tests next to the source they cover:
  * Task id primitives (_set_task_id, get_deterministic_id default) ->
    tests/tasks/test_tasks.py
  * assign_root_task_ids -> tests/pipelines/test_pipelines.py::TestRootTaskIds
  * delete tests/tasks/test_task_id.py (contents distributed)
  * slim tests/backends/test_task_id_postprocess.py to the cases a real
    pipeline can't easily reach (filter-None alignment, ambiguous->r-uuid,
    in-place re-derive, source content-id vs index)
  * enrich tests/backends/test_integration.py::test_task_ids to assert the
    deterministic id STRUCTURE end-to-end (rooted at "0", 12-hex source-hash
    segment, uniform path depth). Exact id strings aren't assertable: the
    source hashes temp-dir paths, so the hash varies per run. (Praateek)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
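
A structural check of the kind described for test_task_ids might look like the sketch below. It assumes segments are joined with "_" and that the source-hash segment is exactly 12 lowercase hex characters; `has_expected_structure` is a hypothetical helper, not the actual test code.

```python
import re

HEX12 = re.compile(r"[0-9a-f]{12}")


def has_expected_structure(task_ids):
    """Check id shape without depending on exact (per-run) hash values."""
    depths = set()
    for tid in task_ids:
        segments = tid.split("_")
        if segments[0] != "0":
            return False  # not rooted at "0"
        if not any(HEX12.fullmatch(s) for s in segments[1:]):
            return False  # no 12-hex source-hash segment
        depths.add(len(segments))
    # Every task passed through the same stages, so depth must be uniform.
    return len(depths) == 1


print(has_expected_structure(["0_0123456789ab_0", "0_ba9876543210_1"]))
```

Checking shape rather than exact strings keeps the test stable even though the hash segment changes whenever the temp-dir path does.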
@abhinavg4

Contributor Author

/ok to test 28fd4cd

@VibhuJawa VibhuJawa left a comment

Contributor


Lgtm


4 participants