Pipeline resumability via source-level counter checkpointing #2033

Merged
abhinavg4 merged 1 commit into abhinavg/sentinel-task from abhinavg/resumability
Jun 10, 2026
Conversation

abhinavg4 (Contributor) commented May 27, 2026

What

Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).

Pipeline.run(checkpoint_path=...) tracks which source partitions have fully
drained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.

How

  • Sentinels (on #2062's SentinelTask base): NoneTask / FailedTask are
    bare subclasses with no identity of their own (dataset_name "none"/"failed",
    task_id assigned by the adapter like any task).
  • backends/base.py process_batch (always-on): a returned None
    ("filter this slot") becomes a NoneTask via a single inline comprehension so
    every output is a real Task and gets a task_id; sentinels are stripped
    before the next stage.
  • _apply_resumability_counters (gated on _is_active, counter-only): a
    source stamps _source_id, skips completed sources, fires +1; a non-source
    fires -1/0 per output (NoneTask → -1, FailedTask → no delta, so its
    source stays pending and reruns). Counters key on the parent's identity,
    which is why the sentinels need none of their own. Ambiguous M→K batches
    warn + skip rather than misattribute.
  • LMDB actor + client. lmdb is a (locked) dependency but stays opt-in at
    import time: ACTOR_NAME lives in resumability_client so the always-imported
    worker path never imports lmdb; it loads only when resumability is used.
  • SLURM-array safe. State lives in <checkpoint_path>/.nemo_curator_metadata
    with one LMDB file per writer (<host>-<pid>), not a single shared file
    (LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
    actor writes only its own file and reads the union of completed sources
    across all files on startup, so the tasks of a SLURM array can checkpoint to
    the same directory without contention.
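
The per-writer layout in the last bullet can be illustrated without LMDB. This is a minimal sketch, not the PR's implementation: it swaps LMDB for plain text files (one completed source id per line), and the helpers `writer_file`, `mark_completed`, and `load_completed_union` are hypothetical names. Only the directory name and the `<host>-<pid>` file naming come from the description above.

```python
from __future__ import annotations

import os
import socket
import tempfile
from pathlib import Path

METADATA_DIR = ".nemo_curator_metadata"  # directory name taken from the PR description


def writer_file(checkpoint_path: str, host: str | None = None, pid: int | None = None) -> Path:
    """Return this writer's private file, named <host>-<pid> (hypothetical helper)."""
    host = host or socket.gethostname()
    pid = os.getpid() if pid is None else pid
    meta = Path(checkpoint_path) / METADATA_DIR
    meta.mkdir(parents=True, exist_ok=True)
    return meta / f"{host}-{pid}"


def mark_completed(path: Path, source_ids: list[str]) -> None:
    """Append completed source ids to this writer's own file only."""
    with path.open("a", encoding="utf-8") as f:
        for sid in source_ids:
            f.write(sid + "\n")


def load_completed_union(checkpoint_path: str) -> set[str]:
    """Read every writer's file and return the union of completed source ids."""
    meta = Path(checkpoint_path) / METADATA_DIR
    completed: set[str] = set()
    if not meta.is_dir():
        return completed
    for file in meta.iterdir():
        if file.is_file():
            completed.update(line for line in file.read_text(encoding="utf-8").splitlines() if line)
    return completed


with tempfile.TemporaryDirectory() as ckpt:
    # Two simulated SLURM array tasks write to separate files in one directory.
    mark_completed(writer_file(ckpt, host="node-a", pid=101), ["src-0", "src-1"])
    mark_completed(writer_file(ckpt, host="node-b", pid=202), ["src-1", "src-2"])
    resumed = load_completed_union(ckpt)
```

Because each writer appends only to its own file, no two processes ever write to the same file; the read side tolerates overlap because it takes a set union.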

Testing

  • tests/tasks/test_sentinels.py: SentinelTask hierarchy (bare construction,
    payload rejection, EmptyTask rooted at "0", task_id not user-settable).
  • tests/backends/test_resumability_adapter.py: counter math plus an end-to-end
    None → NoneTask → strip case (actor RPCs mocked).
  • tests/utils/test_resumability_actor.py: counter dedup, anomaly recovery,
    lifecycle, and multi-writer union (SLURM-array safety).
  • Verified end-to-end in the nemo-curator container: a
    Source → Flaky(random FailedTask) → Sink pipeline re-run against one on-disk
    LMDB checkpoint converges, skips already-completed sources on resume, and
    processes each source exactly once.

copy-pr-bot Bot commented May 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

greptile-apps Bot (Contributor) commented May 27, 2026

Greptile Summary

This PR adds opt-in pipeline resumability: when Pipeline.run(checkpoint_path=…) is supplied, a detached Ray actor tracks per-source pending counters in an LMDB file and skips already-completed sources on rerun. The integration point is a single hook in BaseStageAdapter.process_batch; pipelines that omit checkpoint_path pay no overhead.

  • ResumabilityActor maintains in-memory counters and flushes to LMDB only when a source's counter reaches zero; "never-raises" design handles Ray-retry dedup, rewrite-on-conflict, and late-delta anomalies with logged warnings and in-memory recovery.
  • BaseStageAdapter gains None → NoneTask wrapping, _apply_resumability_counters, and sentinel stripping; fan-out delta math (len(real) − 1) and positional 1:1 handling correctly account for filtered/failed slots.
  • Pipeline.run owns the full actor lifecycle (spawn → execute → close/kill) via _run_with_resumability; several lifecycle edge-cases (stale actor reuse via get_if_exists=True, discarded spawn handle, unreachable ray.kill after close timeout, single-stage pipeline never completing) were flagged in previous review rounds and remain open.
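
The counter arithmetic summarized in these bullets can be traced with a toy model. This is a sketch under assumptions: `non_source_delta` and `run` are hypothetical helpers, and treating the sink as a flat -1 per task is one reading of the review comments. The rules themselves (source +1, fan-out `len(real) - 1`, NoneTask -1, FailedTask no delta) are taken from this PR.

```python
from __future__ import annotations


def non_source_delta(real_outputs: int, failed: bool = False, is_sink: bool = False) -> int | None:
    """Delta for one input task at a non-source stage (hypothetical helper).

    Returns None when nothing should fire, which keeps the source pending.
    """
    if failed:
        return None          # FailedTask: no delta, so the source reruns on resume
    if is_sink:
        return -1            # the task has left the pipeline
    return real_outputs - 1  # 1:1 -> 0, filtered (NoneTask) -> -1, fan-out of k -> k - 1


def run(events: list[int | None]) -> int:
    """Start from the source's +1 and apply each delta that actually fired."""
    pending = 1
    for delta in events:
        if delta is not None:
            pending += delta
    return pending


# One source task fans out to 3 children; one child is filtered, two reach the sink.
clean = run([
    non_source_delta(3),                # fan-out: +2  -> pending 3
    non_source_delta(0),                # filtered: -1 -> pending 2
    non_source_delta(1, is_sink=True),  # sink: -1     -> pending 1
    non_source_delta(1, is_sink=True),  # sink: -1     -> pending 0 (source complete)
])

# Same shape, but one child fails before the sink, so its -1 never fires.
with_failure = run([
    non_source_delta(3),
    non_source_delta(0),
    non_source_delta(1, is_sink=True),
    non_source_delta(0, failed=True),
])
```

In the clean run the counter returns to zero, which is the condition for flushing the source to LMDB. In the failing run it stays at 1, so the source is not marked complete and is reprocessed on resume.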

Confidence Score: 3/5

The pipeline execution path is unchanged for existing users; all new code is gated behind checkpoint_path. However, the synchronous are_completed call inside _source_counters can raise an uncaught RayActorError when the actor's pending queue saturates, and several actor-lifecycle issues from earlier reviews remain unresolved.

The are_completed synchronous call inside the worker's critical path can throw RayActorError under queue pressure—an exception that nothing in the call stack catches, causing the source stage to crash. Combined with open lifecycle issues that could silently misdirect checkpointing state to the wrong LMDB file, the resumability feature is not yet reliable for production use.

nemo_curator/utils/resumability_client.py (_skip_completed_sources synchronous ray.get), nemo_curator/pipeline/pipeline.py (actor lifecycle: stale reuse, discarded handle, unreachable kill)

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/utils/resumability_actor.py | New ResumabilityActor: in-memory counter + per-writer LMDB persistence; 'never raises' design handles retry dedup, rewrite-on-conflict, and un-completion anomalies well. |
| nemo_curator/utils/resumability_client.py | _skip_completed_sources does a synchronous ray.get() that can raise RayActorError when the actor's max_pending_calls queue is saturated. |
| nemo_curator/backends/base.py | Fan-out delta math and positional handling are correct; _post_process_task_ids now has dead if t is not None guards. |
| nemo_curator/pipeline/pipeline.py | Several lifecycle issues from prior review remain: discarded spawn handle, stale actor reuse, unreachable ray.kill, single-stage never completing. |
| nemo_curator/tasks/sentinels.py | New SentinelTask hierarchy (EmptyTask, NoneTask, FailedTask); clean dataclass design with correct invariants. |
| nemo_curator/tasks/tasks.py | Adds _source_id field to Task; removes _EmptyTask (moved to sentinels.py). Clean refactor. |
| pyproject.toml | Adds lmdb>=1.4 as mandatory core dependency for an opt-in feature. |
| tests/utils/test_resumability_actor.py | Thorough unit coverage of counter math, dedup, rewrite-on-conflict, LMDB persistence, and multi-writer union. |
| tests/backends/test_resumability_adapter.py | Good coverage of the BaseStageAdapter resumability layer with mocked actor RPCs. |

Reviews (12): Last reviewed commit: "Add pipeline resumability on top of the ..."

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment on lines +347 to +356
try:
    ray.init(ignore_reinit_error=True)
    actor_handle = ray.get_actor(ACTOR_NAME)
    final_errs = ray.get(actor_handle.errors.remote(), timeout=30)  # type: ignore[attr-defined]
    if final_errs and not collected_error:
        raise final_errs[0]
except ray.exceptions.RayActorError:  # type: ignore[attr-defined]
    pass
except Exception as e:  # noqa: BLE001
    logger.warning(f"final resumability error check failed: {e}")

Contributor

P1 Final error check silently swallowed by except Exception — the raise final_errs[0] on line 352 lives inside the try block that owns the broad except Exception as e handler. When the actor records a NonDeterministicTaskError or NegativePendingCountError that only surfaces at teardown (e.g., from the last in-flight batch), the raise is immediately re-caught and demoted to a logger.warning(...). The pipeline then exits cleanly with return results even though data-integrity errors were detected.

Suggested change
- try:
-     ray.init(ignore_reinit_error=True)
-     actor_handle = ray.get_actor(ACTOR_NAME)
-     final_errs = ray.get(actor_handle.errors.remote(), timeout=30)  # type: ignore[attr-defined]
-     if final_errs and not collected_error:
-         raise final_errs[0]
- except ray.exceptions.RayActorError:  # type: ignore[attr-defined]
-     pass
- except Exception as e:  # noqa: BLE001
-     logger.warning(f"final resumability error check failed: {e}")
+ _deferred_error: Exception | None = None
+ try:
+     ray.init(ignore_reinit_error=True)
+     actor_handle = ray.get_actor(ACTOR_NAME)
+     final_errs = ray.get(actor_handle.errors.remote(), timeout=30)  # type: ignore[attr-defined]
+     if final_errs and not collected_error:
+         _deferred_error = final_errs[0]
+ except ray.exceptions.RayActorError:  # type: ignore[attr-defined]
+     pass
+ except Exception as e:  # noqa: BLE001
+     logger.warning(f"final resumability error check failed: {e}")
+ if _deferred_error is not None:
+     raise _deferred_error
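
The swallowed-raise behavior is easy to reproduce outside Ray. A minimal sketch with hypothetical function names (`swallowed`, `deferred`); `ValueError` stands in for the errors recorded on the actor.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("sketch")


def swallowed(errors: list[Exception]) -> str:
    """Raise inside the try: the broad handler catches it and only logs."""
    try:
        if errors:
            raise errors[0]
    except Exception as e:  # noqa: BLE001
        logger.warning("final check failed: %s", e)
    return "exited cleanly"


def deferred(errors: list[Exception]) -> str:
    """Record the error inside the try, then raise it after the handlers."""
    pending_error: Exception | None = None
    try:
        if errors:
            pending_error = errors[0]
    except Exception as e:  # noqa: BLE001
        logger.warning("final check failed: %s", e)
    if pending_error is not None:
        raise pending_error
    return "exited cleanly"
```

Calling `swallowed([ValueError("bad")])` returns normally, while `deferred([ValueError("bad")])` propagates the `ValueError` to the caller.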

Comment on lines +135 to +168
def _apply_impl(self, per_task: list[tuple[str, str, int]]) -> None:
    newly_done: list[str] = []
    for task_hash, sid, d in per_task:
        existing = self._applied.get(task_hash)
        if existing is not None:
            if existing != d:
                msg = (
                    f"Task {task_hash} produced delta={d} but previously "
                    f"produced delta={existing}. User code must be "
                    f"deterministic w.r.t. its input for resumability "
                    f"(e.g. consistent NoneTask vs real-output decisions)."
                )
                raise NonDeterministicTaskError(msg)
            continue  # idempotent re-fire
        self._applied[task_hash] = d
        if sid in self._completed:
            continue
        self._pending[sid] = self._pending.get(sid, 0) + d
        if self._pending[sid] == 0:
            newly_done.append(sid)
        elif self._pending[sid] < 0:
            msg = (
                f"source {sid!r} pending count went to {self._pending[sid]}. "
                f"This shouldn't happen with deterministic user code — likely "
                f"a fan-out stage returned a different number of outputs "
                f"between runs, or a NoneTask was issued more times than "
                f"the source had tasks. Last delta applied: {d} for task "
                f"{task_hash}."
            )
            raise NegativePendingCountError(msg)
    if newly_done:
        self._persist_completed(newly_done)
        for sid in newly_done:
            self._completed.add(sid)

Contributor

P2 Inconsistent _pending state after mid-batch error

_apply_impl accumulates newly_done entries while iterating the batch, then calls _persist_completed only if the loop finishes cleanly. If the loop raises NegativePendingCountError (e.g., two -1 deltas for the same source in one batch), any source that had already reached zero earlier in that same batch is never persisted and never added to _completed, but _pending[sid] for the failing source is left at a negative value. Subsequent apply_deltas calls on the same actor see neither the correct _completed entry nor a sane counter, which can lead to a false "source completed" if a later delta happens to bring the negative value back to zero.
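
One way to avoid a half-applied batch is to stage the deltas on a copy and commit only when the whole batch is valid. This is a sketch under assumptions: `apply_batch_atomically` is a hypothetical free function over plain dicts and sets rather than the actor's method, and it raises `ValueError` where the reviewed code raised `NegativePendingCountError`.

```python
from __future__ import annotations


def apply_batch_atomically(
    pending: dict[str, int],
    completed: set[str],
    batch: list[tuple[str, int]],
) -> list[str]:
    """Apply (source_id, delta) pairs; mutate state only if no counter goes negative."""
    staged = dict(pending)  # work on a copy so a failure leaves `pending` untouched
    touched: list[str] = []
    for sid, delta in batch:
        if sid in completed:
            continue
        staged[sid] = staged.get(sid, 0) + delta
        if staged[sid] < 0:
            raise ValueError(f"source {sid!r} pending count went to {staged[sid]}")
        if sid not in touched:
            touched.append(sid)
    # Commit phase: reached only when every delta in the batch was valid.
    newly_done = [sid for sid in touched if staged[sid] == 0]
    pending.clear()
    pending.update(staged)
    completed.update(newly_done)
    return newly_done


pending = {"a": 1, "b": 1}
completed: set[str] = set()
try:
    # The second -1 for "b" is invalid, so the whole batch is rejected.
    apply_batch_atomically(pending, completed, [("a", -1), ("b", -1), ("b", -1)])
except ValueError:
    pass
state_after_failure = (dict(pending), set(completed))
done = apply_batch_atomically(pending, completed, [("a", -1)])
```

After the rejected batch neither counter has moved, so a later delta cannot bring a negative value back to zero and produce a false completion.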

Comment on lines +37 to +39
def _is_active() -> bool:
    """True if a resumability actor is registered in this Ray cluster."""
    return _actor() is not None

Contributor

P2 ray.get_actor() GCS round-trip on every processed batch

_is_active() delegates to _actor(), which calls ray.get_actor(ACTOR_NAME) unconditionally whenever Ray is initialized — even for pipelines that never set checkpoint_path. For an executor that runs hundreds of thousands of small batches this adds a GCS lookup per batch. Caching the handle (or the False sentinel) at module level after the first lookup would eliminate the per-batch overhead.
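
The caching idea can be sketched with an injected lookup so that the example runs without Ray. The names `get_actor_cached` and `reset_actor_cache` are hypothetical; in the real client the `lookup` callable would wrap `ray.get_actor(ACTOR_NAME)`, which raises `ValueError` when no actor with that name exists.

```python
from __future__ import annotations

from typing import Any, Callable

_UNSET = object()  # distinguishes "never looked up" from "looked up, found nothing"
_cached_actor: Any = _UNSET


def get_actor_cached(lookup: Callable[[], Any]) -> Any:
    """Return the actor handle (or None), calling `lookup` at most once per process."""
    global _cached_actor
    if _cached_actor is _UNSET:
        try:
            _cached_actor = lookup()
        except ValueError:  # the named actor is not registered
            _cached_actor = None
    return _cached_actor


def reset_actor_cache() -> None:
    """Forget the cached result, for example after a new actor is spawned."""
    global _cached_actor
    _cached_actor = _UNSET
```

Caching the negative result is what removes the per-batch cost for pipelines without `checkpoint_path`, but it also means a worker that cached `None` will not notice an actor created later unless the cache is reset.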

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment on lines +300 to +305
actor = ResumabilityActor.options(  # type: ignore[attr-defined]
    name=ACTOR_NAME,
    lifetime="detached",
    get_if_exists=True,
    _max_pending_calls=100,
).remote(str(checkpoint_path))

Contributor

P2 get_if_exists=True can silently reuse a stale detached actor

When get_if_exists=True, Ray returns the existing actor handle without calling the constructor if an actor named nemo_curator_resumability is already alive — for instance from a previous run that crashed before the cleanup block ran. That stale actor owns a different LMDB file (from its own checkpoint_path), so all new deltas accumulate in the wrong place and are_completed queries reflect the wrong state. The caller has no indication that the path argument was ignored. Consider checking whether the returned actor is pointing at the expected path (e.g., via a path() accessor), or kill any pre-existing actor and spawn fresh.

@abhinavg4 abhinavg4 left a comment

Contributor Author

Improvements we should probably apply to v0.

Priority:

  1. Rename terminal stage to sink. "Source and sink" is better than "source and terminal".
  2. Here, instead of erroring out, can we just rewrite with the new value and remove the whole daemon logic?
  3. Unify task_id, _uuid and _deterministic_lineage_path_hash.
  4. Support a dynamic dataset.

Comment thread nemo_curator/tasks/tasks.py Outdated
# hashed into `_deterministic_lineage_path_hash`, the deterministic
# task id used for retry-deduplication on the resumability actor.
_lineage_path: str = field(init=False, default="")
_deterministic_lineage_path_hash: str = field(init=False, default="")

Contributor Author

Do we really need this? We have task_id, _uuid and _deterministic_lineage_path_hash. They can be combined into a single variable. But before that we need to make sure existing uses of task_id and _uuid are not affected. E.g., the Writer stage might use task_id right now; we should change it to use this.

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment thread nemo_curator/backends/base.py Outdated
Comment thread nemo_curator/utils/resumability_actor.py Outdated
abhinavg4 added a commit that referenced this pull request May 27, 2026
Two changes from review feedback on PR #2033:

1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source"
   and "sink" reads better than "source" and "terminal".

2. Resumability never fails the pipeline.
   - Drop `NonDeterministicTaskError`, `NegativePendingCountError`,
     `_errors`, `errors()`, `_record_error` on the actor.
   - `apply_deltas` is now never-raising: on a retry firing a different
     delta for the same task hash, rewrite the pending counter via
     `(-old + new)` rather than erroring. The newest observation wins.
   - On the two anomaly cases (different delta for a task whose source
     is already completed, OR a never-seen task for an already-completed
     source), log a loud warning AND remove the source from the
     completed set (in-memory + LMDB) so it's reprocessed cleanly on
     the next run. The warning points users at
     https://github.com/NVIDIA-NeMo/Curator to file an issue.
   - Drop the watchdog thread and `_thread.interrupt_main` in
     `Pipeline._run_with_resumability`. The method shrinks from ~80
     lines to ~25 — just spawn, run, close.

Tests updated to assert: rewrite-on-conflict math; un-complete-on-
anomaly with LMDB-survival; `apply_deltas` never raises under any
input.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
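
The rewrite-on-conflict rule from this commit message can be sketched in memory. This is an illustration under assumptions, not the actor's code: `CounterSketch` is a hypothetical class with no LMDB or Ray, and on an anomaly it only un-completes the source and skips the counter update, which is one possible reading of the message.

```python
from __future__ import annotations


class CounterSketch:
    """In-memory stand-in for the actor's counters (no LMDB, no Ray)."""

    def __init__(self) -> None:
        self.applied: dict[str, int] = {}  # task_id -> last delta observed
        self.pending: dict[str, int] = {}  # source_id -> outstanding count
        self.completed: set[str] = set()

    def apply_deltas(self, per_task: list[tuple[str, str, int]]) -> None:
        """Apply (task_id, source_id, delta) triples without ever raising."""
        for task_id, sid, new in per_task:
            old = self.applied.get(task_id)
            if old == new:
                continue  # idempotent re-fire from a retry
            if sid in self.completed:
                # Anomaly: a different or never-seen delta for a finished source.
                # Un-complete it so the next run reprocesses it.
                self.completed.discard(sid)
                continue
            self.applied[task_id] = new
            # First observation adds `new`; a conflicting retry adds (-old + new).
            self.pending[sid] = self.pending.get(sid, 0) + new - (old or 0)
            if self.pending[sid] == 0:
                self.completed.add(sid)


counters = CounterSketch()
counters.apply_deltas([("src", "s0", 1)])  # source emits: pending 1
counters.apply_deltas([("t1", "s0", 2)])   # fan-out observed as +2: pending 3
counters.apply_deltas([("t1", "s0", 0)])   # retry reports 0 instead: pending 3 - 2 + 0 = 1
pending_after_rewrite = counters.pending["s0"]
```

The newest observation wins: the retry's delta replaces the earlier one rather than stacking on top of it.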
Comment thread nemo_curator/backends/base.py Outdated
abhinavg4 added a commit that referenced this pull request May 28, 2026
@abhinavg4 abhinavg4 force-pushed the abhinavg/resumability branch from 733fa37 to ec75bf2 Compare May 28, 2026 18:39
@abhinavg4 abhinavg4 changed the base branch from main to abhinavg/task-id-cleanup May 28, 2026 18:49

if checkpoint_path is None:
    return executor.execute(self.stages, initial_tasks)
return self._run_with_resumability(executor, initial_tasks, checkpoint_path)

Contributor Author

I do not like this. The reason this is being done is probably because we call ray.init inside the executor and not here, so we need to differentiate the two cases. Not ideal for sure. Maybe we can add a _start_resumability_actor function and call it inside each executor. Design choice.

abhinavg4 added a commit that referenced this pull request Jun 1, 2026
@abhinavg4 abhinavg4 force-pushed the abhinavg/resumability branch from 5af7e17 to b6a6c7f Compare June 1, 2026 22:59
Comment thread nemo_curator/backends/base.py Outdated
Comment on lines +141 to +142
if all(not t._source_id for t in tasks):
    return [r for r in results if not isinstance(r, (NoneTask, FailedTask))]

Contributor

P1 The pre-source sentinel filter must also exclude raw None values; otherwise a fan-out stage that returns None in its output list will pass those values through to the add_stage_perf loop, causing an AttributeError at runtime.

Suggested change
- if all(not t._source_id for t in tasks):
-     return [r for r in results if not isinstance(r, (NoneTask, FailedTask))]
+ if all(not t._source_id for t in tasks):
+     return [r for r in results if r is not None and not isinstance(r, (NoneTask, FailedTask))]

Comment on lines +286 to +291
ResumabilityActor.options(  # type: ignore[attr-defined]
    name=ACTOR_NAME,
    lifetime="detached",
    get_if_exists=True,
    _max_pending_calls=100,
).remote(str(checkpoint_path))

Contributor

P1 Fire-and-forget delta loss when _max_pending_calls=100 is exceeded

_max_pending_calls=100 on the actor causes Ray to embed a RayActorError in the returned ObjectRef when the queue is full. Because _flush_deltas never calls ray.get() on that ref, the rejection is silently swallowed and the delta is permanently lost. For a pipeline processing more than 100 batches before the actor can drain its queue (trivially possible at high throughput), any source whose counter delta was lost will have _pending stuck above 0 and will never be written to LMDB. The PR description claims "_max_pending_calls provides backpressure", but the actual behavior is silent delta loss, not blocking — the two have opposite throughput implications. A limit of 100 is also very low relative to a typical pipeline's batch concurrency. Consider either raising the limit substantially, adding a ray.get() health check in the watchdog thread to surface errors, or verifying the actual Ray behavior for the fire-and-forget case.

abhinavg4 added a commit that referenced this pull request Jun 2, 2026
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.

Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.

Changes:

- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))` — stable across runs
    even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`

Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
  `process_batch` (which exercises the new lineage assignment) and
  assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
  `_set_lineage` (always overwrites, filters empty parent paths,
  accepts string segments), `get_deterministic_id` (default None,
  FileGroupTask content-hash with stable sort), and the default
  `process_batch` lineage assignment.

This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
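
A deterministic id derived from a lineage path can be sketched as follows. The details are assumptions rather than the commit's code: the `/` and `+` separators, the use of SHA-256, and the names `lineage_path` and `task_id_from_lineage` are all hypothetical. Only the 12-character hex length and the rule that empty parent paths are filtered come from the message above.

```python
from __future__ import annotations

import hashlib


def lineage_path(parent_paths: list[str], child_segment: int | str) -> str:
    """Join non-empty parent paths and append the child's segment."""
    parents = "+".join(p for p in parent_paths if p)  # empty parent paths are filtered out
    return f"{parents}/{child_segment}" if parents else str(child_segment)


def task_id_from_lineage(path: str) -> str:
    """12-character hex digest of the lineage path (hash choice is an assumption)."""
    return hashlib.sha256(path.encode("utf-8")).hexdigest()[:12]


root = lineage_path([], "filegroup-abc")  # content-based segment at the source
child = lineage_path([root], 0)           # positional segment for a plain emission
child_id = task_id_from_lineage(child)
```

Because the id depends only on the path, two runs over the same inputs produce the same `task_id` for the same position in the pipeline, which is what makes it usable as a dedup key.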
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 75b9298 to 976f16d Compare June 2, 2026 16:45
abhinavg4 added a commit that referenced this pull request Jun 4, 2026
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 07036af to 5757fba Compare June 4, 2026 18:00
@abhinavg4 abhinavg4 force-pushed the abhinavg/resumability branch 2 times, most recently from 88b146e to d73ebd0 Compare June 4, 2026 22:06
Comment on lines +319 to +325
try:
    ray.init(ignore_reinit_error=True)
    actor_handle = ray.get_actor(ACTOR_NAME)
    ray.get(actor_handle.close.remote(), timeout=10)  # type: ignore[attr-defined]
    ray.kill(actor_handle)
except Exception as e:  # noqa: BLE001
    logger.warning(f"resumability actor cleanup failed: {e}")

Contributor

P1 ray.kill is unreachable when the close() RPC times out. ray.get(..., timeout=10) raises GetTimeoutError (a subclass of Exception), which is caught by the outer except Exception block — so ray.kill(actor_handle) on the next line is never executed. The detached actor keeps running and holds the LMDB file open indefinitely. On the next Pipeline.run(checkpoint_path=...) call, get_if_exists=True returns the leaked actor, which is pointing at the original path but may have stale in-memory state.

Separate the kill so it always runs regardless of whether close() succeeds.

Suggested change
- try:
-     ray.init(ignore_reinit_error=True)
-     actor_handle = ray.get_actor(ACTOR_NAME)
-     ray.get(actor_handle.close.remote(), timeout=10)  # type: ignore[attr-defined]
-     ray.kill(actor_handle)
- except Exception as e:  # noqa: BLE001
-     logger.warning(f"resumability actor cleanup failed: {e}")
+ try:
+     ray.init(ignore_reinit_error=True)
+     actor_handle = ray.get_actor(ACTOR_NAME)
+     try:
+         ray.get(actor_handle.close.remote(), timeout=10)  # type: ignore[attr-defined]
+     except Exception as e:  # noqa: BLE001
+         logger.warning(f"resumability actor close failed: {e}")
+     finally:
+         ray.kill(actor_handle)
+ except Exception as e:  # noqa: BLE001
+     logger.warning(f"resumability actor cleanup failed: {e}")
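
The difference between the two shapes can be checked without Ray. In this sketch `FakeActor`, `kill`, `cleanup_flat`, and `cleanup_nested` are hypothetical stand-ins, and a built-in `TimeoutError` plays the role of Ray's timeout on `close()`.

```python
class FakeActor:
    """Stand-in for a detached actor whose close() call times out."""

    def __init__(self) -> None:
        self.killed = False

    def close(self) -> None:
        raise TimeoutError("close() did not return in time")


def kill(actor: FakeActor) -> None:
    """Stand-in for ray.kill: mark the actor as terminated."""
    actor.killed = True


def cleanup_flat(actor: FakeActor) -> None:
    """Original shape: close and kill share one try, so a close failure skips kill."""
    try:
        actor.close()
        kill(actor)
    except Exception:  # noqa: BLE001
        pass


def cleanup_nested(actor: FakeActor) -> None:
    """Suggested shape: kill sits in a finally, so it runs even if close fails."""
    try:
        try:
            actor.close()
        except Exception:  # noqa: BLE001
            pass
        finally:
            kill(actor)
    except Exception:  # noqa: BLE001
        pass


leaked, reaped = FakeActor(), FakeActor()
cleanup_flat(leaked)
cleanup_nested(reaped)
```

After both calls, `leaked.killed` is still `False` while `reaped.killed` is `True`, which matches the leak described in the comment.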

@abhinavg4

Contributor Author

/ok to test 5cb8a87

Comment on lines +305 to +310
ResumabilityActor.options(  # type: ignore[attr-defined]
    name=ACTOR_NAME,
    lifetime="detached",
    get_if_exists=True,
    max_pending_calls=100,
).remote(str(checkpoint_path))

Contributor

P1 Actor handle is discarded; silent resumability bypass on spawn failure

ResumabilityActor.options(...).remote(str(checkpoint_path)) returns an actor handle that is immediately discarded. Ray actor creation is asynchronous — if the actor fails to initialize (e.g., resource pressure, LMDB permission error), the .remote() call still returns a handle, the failure is deferred, and subsequent ray.get_actor(ACTOR_NAME) in _is_active() will raise ValueError returning None. The pipeline then runs without resumability and without any user-visible error; completed sources are never checkpointed.

Capturing the handle and making at least one synchronous ray.get call on a lightweight probe (e.g., are_completed([])) before the executor runs would surface initialization failures eagerly.

@abhinavg4

Contributor Author

/ok to test 24fdafc

Comment on lines +203 to +212
def _apply_resumability_counters(self, input_tasks: list[Task], output_tasks: list[Task]) -> list[Task]:  # noqa: C901
    stage = self.stage
    if getattr(stage, "is_source_stage", False):
        return self._source_counters(output_tasks)

    # Pre-source stages: inputs carry no _source_id, so there's nothing to
    # track yet. Leave outputs untouched.
    if all(not t._source_id for t in input_tasks):
        return output_tasks

Contributor

P1 Single-stage pipeline never marks sources complete

_apply_resumability_counters short-circuits to _source_counters whenever is_source_stage is True, returning before the is_sink_stage branch can fire a −1 delta. Pipeline.build() assigns both flags to the same stage when only one stage is present (stages[0] == stages[-1]). As a result, every source fires +1 but nothing ever fires −1, so _pending stays at 1 for every source indefinitely and the checkpoint is never written. Every resume reprocesses the full pipeline.
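
The early-return problem can be reduced to a few lines. This is a simplified sketch with hypothetical helpers; it ignores the fan-out deltas of middle stages and models only the source and sink roles.

```python
def deltas_with_early_return(is_source: bool, is_sink: bool) -> list[int]:
    """Shape flagged in the review: the source branch returns before the sink check."""
    if is_source:
        return [+1]
    return [-1] if is_sink else []


def deltas_checking_both_roles(is_source: bool, is_sink: bool) -> list[int]:
    """Alternative shape: each role contributes its delta independently."""
    deltas = []
    if is_source:
        deltas.append(+1)  # the task enters the pipeline
    if is_sink:
        deltas.append(-1)  # the task leaves the pipeline
    return deltas


# A single-stage pipeline assigns both roles to the same stage.
stuck = sum(deltas_with_early_return(is_source=True, is_sink=True))
balanced = sum(deltas_checking_both_roles(is_source=True, is_sink=True))
```

With the early return the net delta is +1 per task, so the counter never reaches zero; checking both roles yields a net delta of 0, which lets the source complete.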

@abhinavg4

Contributor Author

/ok to test 90f6834

@abhinavg4

Contributor Author

/ok to test 2eaa29b

Comment thread nemo_curator/backends/base.py Outdated
case (lengths differ) ``None`` children are simply dropped."""
if len(results) == len(tasks):
    return [NoneTask.from_input(tasks[i]) if r is None else r for i, r in enumerate(results)]
return [r for r in results if r is not None]

Contributor

Not really clear to me why we would just drop None children during a fanout stage.

Contributor

Furthermore it looks like _post_process_task_ids will already drop any None objects right?

Comment thread nemo_curator/backends/base.py Outdated
return results

@staticmethod
def _normalize_none(tasks: list[Task], results: list[Task | None]) -> list[Task | None]:

Contributor

Nit but everywhere else is calling them input and output tasks:

Suggested change
- def _normalize_none(tasks: list[Task], results: list[Task | None]) -> list[Task | None]:
+ def _normalize_none(input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task | None]:

# ------------------------------------------------------------------ #
def _apply_resumability_counters(self, input_tasks: list[Task], output_tasks: list[Task]) -> list[Task]: # noqa: C901
    stage = self.stage
    if getattr(stage, "is_source_stage", False):

Contributor

General question: why would the user ever want something other than source stage being the first stage and sink stage being the last stage of the pipeline? Like if the last stage failed but the second to last stage was the sink stage, they just don't want to rerun the last stage?


is_sink = stage.is_sink_stage
per_task: list[tuple[str, str, int]] = []
real = [t for t in output_tasks if not isinstance(t, (NoneTask, FailedTask))]

Contributor

It seems like since real is only used in the if len(input_tasks) == 1 and len(output_tasks) != 1: block, let's move it there for easier reading?

surviving source fires a ``+1``."""
sources = [t for t in output_tasks if not isinstance(t, (NoneTask, FailedTask))]
for t in sources:
    t._source_id = t.task_id.rsplit("_", 1)[-1]

Contributor

I am a little confused why the source ID here is the last part of the X_Y_Z chain? I guess maybe I am confused about how task_id versus _source_id are constructed/formatted.
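
For concreteness, this is what the expression in the hunk evaluates to. The ids below are made up for illustration; the real task_id format is whatever the adapter assigns and is not confirmed here. `last_segment` is a hypothetical helper name.

```python
def last_segment(task_id: str) -> str:
    """Return everything after the final underscore, or the whole id if there is none."""
    # rsplit with maxsplit=1 splits once, from the right.
    return task_id.rsplit("_", 1)[-1]


examples = {
    "0_3_7": last_segment("0_3_7"),  # three-part chain
    "0_12": last_segment("0_12"),    # two-part chain
    "5": last_segment("5"),          # no underscore at all
}
```

Only the final segment is kept, so two tasks whose ids differ only in an earlier segment would receive the same `_source_id` under this rule.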

Builds the opt-in resumability layer on the SentinelTask hierarchy from the
base PR (#2062): NoneTask/FailedTask are bare SentinelTask subclasses (no
identity of their own — dataset_name "none"/"failed", task_id assigned by the
adapter like any task). The layer is a thin, counter-only step that runs after
PR-2062's always-on task_id assignment.

- backends/base.py process_batch flow (always-on, no resumability needed):
  a returned None ("filter this slot") becomes a NoneTask via a single inline
  comprehension so every output is a real Task and gets a task_id; ids are
  assigned by _post_process_task_ids (incl. sentinels); NoneTask/FailedTask are
  stripped before returning to the next stage.
- _apply_resumability_counters (gated on _is_active, counter-only — no task_id
  work): a source stage stamps _source_id (= the id's last segment), skips
  already-completed sources, and fires +1; a non-source fires -1/0 per output
  (NoneTask -> -1, FailedTask -> no delta so its source stays pending and
  reruns), inheriting _source_id; ambiguous M->K batches warn + skip rather
  than misattribute. Counters key on the PARENT's identity, which is why the
  sentinels need no identity of their own.
- Task gains _source_id; LMDB actor + client; Pipeline.run(checkpoint_path=...)
  owns the detached actor lifecycle (Ray 2.54: max_pending_calls).
- Make lmdb opt-in at import time: it stays a (locked) dependency, but
  ACTOR_NAME lives in resumability_client so the always-imported worker path
  (BaseStageAdapter -> client) never imports resumability_actor / lmdb; lmdb
  loads only when resumability is actually used. uv.lock adds lmdb 2.2.1.
- Resumability state lives in a <checkpoint_path>/.nemo_curator_metadata
  directory with ONE LMDB file per writer (host-pid), not a single shared
  file: LMDB can't be safely written by multiple processes across hosts on a
  networked FS like Lustre. Each actor writes only its own file and reads the
  union of completed sources across all files on startup, so the tasks of a
  SLURM array can checkpoint to the same directory without contention.
- Tests: sentinels test covers the SentinelTask hierarchy (bare construction,
  payload rejection, EmptyTask rooted at "0", task_id not user-settable); the
  adapter test covers the counter math + an end-to-end None->NoneTask->strip
  case; the actor test covers counter dedup, anomaly recovery, lifecycle, and
  multi-writer union (SLURM-array safety).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

Contributor Author

/ok to test a560bc1

@abhinavg4

Contributor Author

Heads-up: this PR was inadvertently squash-merged into the abhinavg/sentinel-task feature branch (not main) by a stale auto-merge rule that fired when I retargeted its base. No code reached main. I reset the feature branch to its approved state, so the sentinel refactor (#2062) is unaffected. Resumability now continues in #2063 (same diff, stacked on #2062).

Comment on lines +60 to +67
def _skip_completed_sources(source_ids: list[str]) -> set[str]:
    """Synchronous lookup of which source_ids are already marked complete
    in LMDB. Used by the source-stage adapter to drop already-done
    sources from its output before downstream stages see them."""
    a = _actor()
    if a is None or not source_ids:
        return set()
    flags = ray.get(a.are_completed.remote(source_ids)) # type: ignore[attr-defined]

Contributor

P1 are_completed synchronous call unguarded against max_pending_calls rejection

ray.get(a.are_completed.remote(...)) shares the actor's 100-call pending queue with all the fire-and-forget apply_deltas calls fired by downstream stages. If 100 apply_deltas are queued when the source stage invokes _skip_completed_sources, Ray embeds a RayActorError in the returned ObjectRef; ray.get() then raises it. Unlike _flush_deltas, this error is NOT silently discarded: it propagates up through _source_counters → _apply_resumability_counters → process_batch, crashing the source-stage batch. In a long-running pipeline where the source stage can fire while downstream stages are actively queuing deltas, this is a concrete failure path. Consider increasing max_pending_calls substantially, or wrapping this call in a try/except and falling back gracefully (e.g. assume no sources are complete, skip the filter step).
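
A sketch of the try/except fallback, kept Ray-free so it runs standalone. `completed_sources_or_empty` is a hypothetical name, `lookup` stands in for `ray.get(a.are_completed.remote(source_ids))`, and the assumption that the lookup returns one boolean per source id, in order, is inferred from the `flags` variable in the hunk.

```python
import logging

logger = logging.getLogger(__name__)


def completed_sources_or_empty(lookup, source_ids: list[str]) -> set[str]:
    """Return the source ids already marked complete, or an empty set if the lookup fails."""
    if not source_ids:
        return set()
    try:
        flags = lookup(source_ids)  # assumed: one bool per id, same order
    except Exception as exc:
        # Fall back to "nothing is complete": the batch is reprocessed
        # instead of crashing the source stage.
        logger.warning("completed-source lookup failed (%s); not skipping any source", exc)
        return set()
    return {sid for sid, done in zip(source_ids, flags) if done}
```

The trade-off is duplicate work for sources that were in fact complete, in exchange for the source stage surviving a rejected call.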
