Pipeline resumability via source-level counter checkpointing#2063

Open
abhinavg4 wants to merge 1 commit into main from abhinavg/resumability

@abhinavg4

Discussion (Design Doc)

#2034

Supersedes #2033, which was inadvertently squash-merged into the
abhinavg/sentinel-task feature branch by a stale auto-merge rule when its
base was retargeted. The feature branch was reset to the approved state and
resumability is tracked here instead. Same diff, stacked on #2062.

What

Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).

Pipeline.run(checkpoint_path=...) tracks which source partitions have fully
drained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.
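
The skip-on-rerun behavior described above can be sketched with a toy loop. This is only an illustration, not the Pipeline.run implementation: `run`, `fail_on_c`, and the in-memory `checkpoint` set (standing in for the on-disk checkpoint) are hypothetical names.

```python
def run(sources, process, completed):
    """Process each source unless the checkpoint already lists it as completed."""
    processed = []
    for source_id in sources:
        if source_id in completed:
            continue  # fully drained in an earlier run, so skip it
        process(source_id)  # may raise, which simulates an interruption
        completed.add(source_id)  # record only after the source has finished
        processed.append(source_id)
    return processed


def fail_on_c(source_id):
    """Toy stage that crashes partway through the run."""
    if source_id == "c":
        raise RuntimeError("simulated interruption")


checkpoint = set()  # hypothetical stand-in for the persisted checkpoint
try:
    run(["a", "b", "c", "d"], fail_on_c, checkpoint)
except RuntimeError:
    pass  # the first run dies after finishing "a" and "b"

# The rerun sees "a" and "b" in the checkpoint and processes only the rest.
resumed = run(["a", "b", "c", "d"], lambda source_id: None, checkpoint)
```

After the rerun, `resumed` holds only the sources that had not finished before the interruption.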

How

  • Sentinels (on #2062's SentinelTask base): NoneTask / FailedTask are
    bare subclasses with no identity of their own (dataset_name "none"/"failed",
    task_id assigned by the adapter like any task).
  • backends/base.py process_batch (always-on): a returned None
    ("filter this slot") becomes a NoneTask via a single inline comprehension so
    every output is a real Task and gets a task_id; sentinels are stripped
    before the next stage.
  • _apply_resumability_counters (gated on _is_active, counter-only): a
    source stamps _source_id, skips completed sources, fires +1; a non-source
    fires -1/0 per output (NoneTask → -1, FailedTask → no delta, so its
    source stays pending and reruns). Counters key on the parent's identity —
    which is why the sentinels need none of their own. Ambiguous M→K batches
    warn + skip rather than misattribute.
  • LMDB actor + client. lmdb is a (locked) dependency but stays opt-in at
    import time: ACTOR_NAME lives in resumability_client so the always-imported
    worker path never imports lmdb; it loads only when resumability is used.
  • SLURM-array safe. State lives in <checkpoint_path>/.nemo_curator_metadata
    with one LMDB file per writer (<host>-<pid>), not a single shared file
    (LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
    actor writes only its own file and reads the union of completed sources
    across all files on startup, so the tasks of a SLURM array can checkpoint to
    the same directory without contention.
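
The counter rules in the list above can be sketched as a small ledger keyed on the parent source. This is a minimal illustration under stated assumptions, not the code in this PR: `CounterLedger`, `source_emitted`, `stage_output`, and `is_last_stage` are hypothetical names, the three task classes are toy stand-ins for the real ones, and the rule that a real task fires -1 once it reaches the last stage is an assumption (the description only specifies +1 at the source, -1 for NoneTask, and no delta for FailedTask).

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Task:
    """Toy stand-in: carries only the id of the source it came from."""
    source_id: str


class NoneTask(Task):
    """Toy sentinel for a filtered slot."""


class FailedTask(Task):
    """Toy sentinel for a failed slot."""


class CounterLedger:
    """Hypothetical per-source pending counter; 0 means fully drained."""

    def __init__(self):
        self.pending = defaultdict(int)
        self.seen = set()

    def source_emitted(self, source_id):
        # A source stage fires +1 for each source it emits.
        self.seen.add(source_id)
        self.pending[source_id] += 1

    def stage_output(self, task, is_last_stage):
        # FailedTask: no delta, so the source stays pending and reruns.
        if isinstance(task, FailedTask):
            return 0
        # NoneTask: the slot was filtered, so its pending unit is done.
        # Assumption: a real task also fires -1 once it leaves the last stage.
        if isinstance(task, NoneTask) or is_last_stage:
            self.pending[task.source_id] -= 1
            return -1
        return 0

    def completed(self):
        return {s for s in self.seen if self.pending[s] == 0}


ledger = CounterLedger()
for sid in ("s0", "s1", "s2"):
    ledger.source_emitted(sid)
ledger.stage_output(NoneTask("s0"), is_last_stage=False)    # filtered: -1
ledger.stage_output(FailedTask("s1"), is_last_stage=False)  # failed: no delta
ledger.stage_output(Task("s2"), is_last_stage=True)         # drained: -1
done = ledger.completed()
```

Because every delta is keyed on `source_id`, the sentinels in this sketch need no identity beyond the parent they inherit, which mirrors the reasoning in the description.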

Testing

  • tests/tasks/test_sentinels.py: SentinelTask hierarchy (bare construction,
    payload rejection, EmptyTask rooted at "0", task_id not user-settable).
  • tests/backends/test_resumability_adapter.py: counter math + an end-to-end
    None→NoneTask→strip case (actor RPCs mocked).
  • tests/utils/test_resumability_actor.py: counter dedup, anomaly recovery,
    lifecycle, and multi-writer union (SLURM-array safety).
  • Verified end-to-end in the nemo-curator container: a
    Source → Flaky(random FailedTask) → Sink pipeline re-run against one on-disk
    LMDB checkpoint converges, skips already-completed sources on resume, and
    processes each source exactly once.
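
The multi-writer union exercised by the actor test can be illustrated with one state file per writer. This is a sketch under assumptions: plain JSON files stand in for the per-writer LMDB files, and `writer_file`, `mark_completed`, and `load_completed` are hypothetical helpers, not functions from this PR. Only the `.nemo_curator_metadata` directory name and the `<host>-<pid>` writer naming come from the description.

```python
import json
import os
import socket
import tempfile
from pathlib import Path

METADATA_DIR = ".nemo_curator_metadata"  # directory name from the PR description


def writer_file(checkpoint_path, writer_id=None):
    """Return this writer's own state file, named <host>-<pid> by default."""
    writer_id = writer_id or f"{socket.gethostname()}-{os.getpid()}"
    meta = Path(checkpoint_path) / METADATA_DIR
    meta.mkdir(parents=True, exist_ok=True)
    return meta / f"{writer_id}.json"  # JSON is a stand-in for LMDB here


def mark_completed(checkpoint_path, source_id, writer_id=None):
    """Each writer appends only to its own file, so writers never contend."""
    path = writer_file(checkpoint_path, writer_id)
    done = set(json.loads(path.read_text())) if path.exists() else set()
    done.add(source_id)
    path.write_text(json.dumps(sorted(done)))


def load_completed(checkpoint_path):
    """On startup, read the union of completed sources across all writers."""
    meta = Path(checkpoint_path) / METADATA_DIR
    done = set()
    if meta.is_dir():
        for path in meta.glob("*.json"):
            done |= set(json.loads(path.read_text()))
    return done


with tempfile.TemporaryDirectory() as ckpt:
    # Two array tasks on different hosts checkpoint to the same directory.
    mark_completed(ckpt, "part-0", writer_id="node1-101")
    mark_completed(ckpt, "part-1", writer_id="node2-202")
    resumed_view = load_completed(ckpt)
```

A restarted writer in this sketch sees both `part-0` and `part-1` as completed even though it wrote neither file itself.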

@copy-pr-bot

copy-pr-bot Bot commented Jun 10, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

@abhinavg4

/ok to test a560bc1

@abhinavg4

/ok to test 49edeba

Builds the opt-in resumability layer on the SentinelTask hierarchy from the
base PR (#2062): NoneTask/FailedTask are bare SentinelTask subclasses (no
identity of their own — dataset_name "none"/"failed", task_id assigned by the
adapter like any task). The layer is a thin, counter-only step that runs after
PR-2062's always-on task_id assignment.

- backends/base.py process_batch flow (always-on, no resumability needed):
  a returned None ("filter this slot") becomes a NoneTask via a single inline
  comprehension so every output is a real Task and gets a task_id; ids are
  assigned by _post_process_task_ids (incl. sentinels); NoneTask/FailedTask are
  stripped before returning to the next stage.
- _apply_resumability_counters (gated on _is_active, counter-only — no task_id
  work): a source stage stamps _source_id (= the id's last segment), skips
  already-completed sources, and fires +1; a non-source fires -1/0 per output
  (NoneTask -> -1, FailedTask -> no delta so its source stays pending and
  reruns), inheriting _source_id; ambiguous M->K batches warn + skip rather
  than misattribute. Counters key on the PARENT's identity, which is why the
  sentinels need no identity of their own.
- Task gains _source_id; LMDB actor + client; Pipeline.run(checkpoint_path=...)
  owns the detached actor lifecycle (Ray 2.54: max_pending_calls).
- Make lmdb opt-in at import time: it stays a (locked) dependency, but
  ACTOR_NAME lives in resumability_client so the always-imported worker path
  (BaseStageAdapter -> client) never imports resumability_actor / lmdb; lmdb
  loads only when resumability is actually used. uv.lock adds lmdb 2.2.1.
- Resumability state lives in a <checkpoint_path>/.nemo_curator_metadata
  directory with ONE LMDB file per writer (host-pid), not a single shared
  file: LMDB can't be safely written by multiple processes across hosts on a
  networked FS like Lustre. Each actor writes only its own file and reads the
  union of completed sources across all files on startup, so the tasks of a
  SLURM array can checkpoint to the same directory without contention.
- Tests: sentinels test covers the SentinelTask hierarchy (bare construction,
  payload rejection, EmptyTask rooted at "0", task_id not user-settable); the
  adapter test covers the counter math + an end-to-end None->NoneTask->strip
  case; the actor test covers counter dedup, anomaly recovery, lifecycle, and
  multi-writer union (SLURM-array safety).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
@abhinavg4

/ok to test 3a6ef8d

Comment thread nemo_curator/tasks/sentinels.py