Pipeline resumability via source-level counter checkpointing#2063
Open
abhinavg4 wants to merge 1 commit into
Open
Pipeline resumability via source-level counter checkpointing#2063abhinavg4 wants to merge 1 commit into
abhinavg4 wants to merge 1 commit into
Conversation
Contributor
Author
|
/ok to test a560bc1 |
Contributor
Author
|
/ok to test 49edeba |
Builds the opt-in resumability layer on the SentinelTask hierarchy from the base PR (#2062): NoneTask/FailedTask are bare SentinelTask subclasses (no identity of their own — dataset_name "none"/"failed", task_id assigned by the adapter like any task). The layer is a thin, counter-only step that runs after PR-2062's always-on task_id assignment. - backends/base.py process_batch flow (always-on, no resumability needed): a returned None ("filter this slot") becomes a NoneTask via a single inline comprehension so every output is a real Task and gets a task_id; ids are assigned by _post_process_task_ids (incl. sentinels); NoneTask/FailedTask are stripped before returning to the next stage. - _apply_resumability_counters (gated on _is_active, counter-only — no task_id work): a source stage stamps _source_id (= the id's last segment), skips already-completed sources, and fires +1; a non-source fires -1/0 per output (NoneTask -> -1, FailedTask -> no delta so its source stays pending and reruns), inheriting _source_id; ambiguous M->K batches warn + skip rather than misattribute. Counters key on the PARENT's identity, which is why the sentinels need no identity of their own. - Task gains _source_id; LMDB actor + client; Pipeline.run(checkpoint_path=...) owns the detached actor lifecycle (Ray 2.54: max_pending_calls). - Make lmdb opt-in at import time: it stays a (locked) dependency, but ACTOR_NAME lives in resumability_client so the always-imported worker path (BaseStageAdapter -> client) never imports resumability_actor / lmdb; lmdb loads only when resumability is actually used. uv.lock adds lmdb 2.2.1. - Resumability state lives in a <checkpoint_path>/.nemo_curator_metadata directory with ONE LMDB file per writer (host-pid), not a single shared file: LMDB can't be safely written by multiple processes across hosts on a networked FS like Lustre. Each actor writes only its own file and reads the union of completed sources across all files on startup, so the tasks of a SLURM array can checkpoint to the same directory without contention. - Tests: sentinels test covers the SentinelTask hierarchy (bare construction, payload rejection, EmptyTask rooted at "0", task_id not user-settable); the adapter test covers the counter math + an end-to-end None->NoneTask->strip case; the actor test covers counter dedup, anomaly recovery, lifecycle, and multi-writer union (SLURM-array safety). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
Contributor
Author
|
/ok to test 3a6ef8d |
sarahyurick
reviewed
Jun 11, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Discussion (Design Doc)
#2034
What
Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).
Pipeline.run(checkpoint_path=...)tracks which source partitions have fullydrained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.
How
SentinelTaskbase):NoneTask/FailedTaskarebare subclasses — no identity of their own (
dataset_name"none"/"failed",task_idassigned by the adapter like any task).backends/base.pyprocess_batch(always-on): a returnedNone("filter this slot") becomes a
NoneTaskvia a single inline comprehension soevery output is a real
Taskand gets atask_id; sentinels are strippedbefore the next stage.
_apply_resumability_counters(gated on_is_active, counter-only): asource stamps
_source_id, skips completed sources, fires+1; a non-sourcefires
-1/0per output (NoneTask→-1,FailedTask→ no delta, so itssource stays pending and reruns). Counters key on the parent's identity —
which is why the sentinels need none of their own. Ambiguous
M→Kbatcheswarn + skip rather than misattribute.
lmdbis a (locked) dependency but stays opt-in atimport time:
ACTOR_NAMElives inresumability_clientso the always-importedworker path never imports
lmdb; it loads only when resumability is used.<checkpoint_path>/.nemo_curator_metadatawith one LMDB file per writer (
<host>-<pid>), not a single shared file(LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
actor writes only its own file and reads the union of completed sources
across all files on startup, so the tasks of a SLURM array can checkpoint to
the same directory without contention.
Testing
tests/tasks/test_sentinels.py—SentinelTaskhierarchy (bare construction,payload rejection,
EmptyTaskrooted at"0",task_idnot user-settable).tests/backends/test_resumability_adapter.py— counter math + an end-to-endNone→NoneTask→strip case (actor RPCs mocked).tests/utils/test_resumability_actor.py— counter dedup, anomaly recovery,lifecycle, and multi-writer union (SLURM-array safety).
nemo-curatorcontainer: aSource → Flaky(random FailedTask) → Sinkpipeline re-run against one on-diskLMDB checkpoint converges, skips already-completed sources on resume, and
processes each source exactly once.