Pipeline resumability via source-level counter checkpointing#2033
Conversation
Greptile SummaryThis PR adds opt-in pipeline resumability: when
Confidence Score: 3/5The pipeline execution path is unchanged for existing users; all new code is gated behind checkpoint_path. However, the synchronous are_completed call inside _source_counters can raise an uncaught RayActorError when the actor's pending queue saturates, and several actor-lifecycle issues from earlier reviews remain unresolved. The are_completed synchronous call inside the worker's critical path can throw RayActorError under queue pressure—an exception that nothing in the call stack catches, causing the source stage to crash. Combined with open lifecycle issues that could silently misdirect checkpointing state to the wrong LMDB file, the resumability feature is not yet reliable for production use. nemo_curator/utils/resumability_client.py (_skip_completed_sources synchronous ray.get), nemo_curator/pipeline/pipeline.py (actor lifecycle: stale reuse, discarded handle, unreachable kill) Important Files Changed
Reviews (12): Last reviewed commit: "Add pipeline resumability on top of the ..." | Re-trigger Greptile |
| try: | ||
| ray.init(ignore_reinit_error=True) | ||
| actor_handle = ray.get_actor(ACTOR_NAME) | ||
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | ||
| if final_errs and not collected_error: | ||
| raise final_errs[0] | ||
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | ||
| pass | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning(f"final resumability error check failed: {e}") |
There was a problem hiding this comment.
Final error check silently swallowed by
except Exception — the raise final_errs[0] on line 352 lives inside the try block that owns the broad except Exception as e handler. When the actor records a NonDeterministicTaskError or NegativePendingCountError that only surfaces at teardown (e.g., from the last in-flight batch), the raise is immediately re-caught and demoted to a logger.warning(...). The pipeline then exits cleanly with return results even though data-integrity errors were detected.
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | |
| if final_errs and not collected_error: | |
| raise final_errs[0] | |
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | |
| pass | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"final resumability error check failed: {e}") | |
| _deferred_error: Exception | None = None | |
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | |
| if final_errs and not collected_error: | |
| _deferred_error = final_errs[0] | |
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | |
| pass | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"final resumability error check failed: {e}") | |
| if _deferred_error is not None: | |
| raise _deferred_error |
| def _apply_impl(self, per_task: list[tuple[str, str, int]]) -> None: | ||
| newly_done: list[str] = [] | ||
| for task_hash, sid, d in per_task: | ||
| existing = self._applied.get(task_hash) | ||
| if existing is not None: | ||
| if existing != d: | ||
| msg = ( | ||
| f"Task {task_hash} produced delta={d} but previously " | ||
| f"produced delta={existing}. User code must be " | ||
| f"deterministic w.r.t. its input for resumability " | ||
| f"(e.g. consistent NoneTask vs real-output decisions)." | ||
| ) | ||
| raise NonDeterministicTaskError(msg) | ||
| continue # idempotent re-fire | ||
| self._applied[task_hash] = d | ||
| if sid in self._completed: | ||
| continue | ||
| self._pending[sid] = self._pending.get(sid, 0) + d | ||
| if self._pending[sid] == 0: | ||
| newly_done.append(sid) | ||
| elif self._pending[sid] < 0: | ||
| msg = ( | ||
| f"source {sid!r} pending count went to {self._pending[sid]}. " | ||
| f"This shouldn't happen with deterministic user code — likely " | ||
| f"a fan-out stage returned a different number of outputs " | ||
| f"between runs, or a NoneTask was issued more times than " | ||
| f"the source had tasks. Last delta applied: {d} for task " | ||
| f"{task_hash}." | ||
| ) | ||
| raise NegativePendingCountError(msg) | ||
| if newly_done: | ||
| self._persist_completed(newly_done) | ||
| for sid in newly_done: | ||
| self._completed.add(sid) |
There was a problem hiding this comment.
Inconsistent
_pending state after mid-batch error
_apply_impl accumulates newly_done entries while iterating the batch, then calls _persist_completed only if the loop finishes cleanly. If the loop raises NegativePendingCountError (e.g., two -1 deltas for the same source in one batch), any source that had already reached zero earlier in that same batch is never persisted and never added to _completed, but _pending[sid] for the failing source is left at a negative value. Subsequent apply_deltas calls on the same actor see neither the correct _completed entry nor a sane counter, which can lead to a false "source completed" if a later delta happens to bring the negative value back to zero.
| def _is_active() -> bool: | ||
| """True if a resumability actor is registered in this Ray cluster.""" | ||
| return _actor() is not None |
There was a problem hiding this comment.
ray.get_actor() GCS round-trip on every processed batch
_is_active() delegates to _actor(), which calls ray.get_actor(ACTOR_NAME) unconditionally whenever Ray is initialized — even for pipelines that never set checkpoint_path. For an executor that runs hundreds of thousands of small batches this adds a GCS lookup per batch. Caching the handle (or the False sentinel) at module level after the first lookup would eliminate the per-batch overhead.
| actor = ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| _max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
get_if_exists=True can silently reuse a stale detached actor
When get_if_exists=True, Ray returns the existing actor handle without calling the constructor if an actor named nemo_curator_resumability is already alive — for instance from a previous run that crashed before the cleanup block ran. That stale actor owns a different LMDB file (from its own checkpoint_path), so all new deltas accumulate in the wrong place and are_completed queries reflect the wrong state. The caller has no indication that the path argument was ignored. Consider checking whether the returned actor is pointing at the expected path (e.g., via a path() accessor), or kill any pre-existing actor and spawn fresh.
There was a problem hiding this comment.
Improvements we should probably apply to v0.
Priority:
- Rename terminal stage to sink. Source and sink is better than source and terminal
- Here, instead of erroring out, can we just rewrite the new value? and removing the whole demon logic
- Unify task_id, _uuid and _deterministic_lineage_path_hash.
- support a dynamic dataset
| # hashed into `_deterministic_lineage_path_hash`, the deterministic | ||
| # task id used for retry-deduplication on the resumability actor. | ||
| _lineage_path: str = field(init=False, default="") | ||
| _deterministic_lineage_path_hash: str = field(init=False, default="") |
There was a problem hiding this comment.
Do we really need this? We have task_id, _uuid and _deterministic_lineage_path_hash. They can be combined into a single variable. But before that we need to ensure the usage of task_id and uuid is not affected. Eg:- Writer stage might use task_id right now, we should change to this.
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
733fa37 to
ec75bf2
Compare
|
|
||
| if checkpoint_path is None: | ||
| return executor.execute(self.stages, initial_tasks) | ||
| return self._run_with_resumability(executor, initial_tasks, checkpoint_path) |
There was a problem hiding this comment.
I do not like this, the reason this is being done is probably coz we call ray init inside the executor and not here, so we need to differentiate that. Not ideal for sure. Maybe we can make a function for _start_resumability_actor and call it inside each executor. Design choice.
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
5af7e17 to
b6a6c7f
Compare
| if all(not t._source_id for t in tasks): | ||
| return [r for r in results if not isinstance(r, (NoneTask, FailedTask))] |
There was a problem hiding this comment.
The pre-source sentinel filter must also exclude raw
None values; otherwise a fan-out stage that returns None in its output list will pass those values through to the add_stage_perf loop, causing an AttributeError at runtime.
| if all(not t._source_id for t in tasks): | |
| return [r for r in results if not isinstance(r, (NoneTask, FailedTask))] | |
| if all(not t._source_id for t in tasks): | |
| return [r for r in results if r is not None and not isinstance(r, (NoneTask, FailedTask))] |
| ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| _max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
Fire-and-forget delta loss when
_max_pending_calls=100 is exceeded
_max_pending_calls=100 on the actor causes Ray to embed a RayActorError in the returned ObjectRef when the queue is full. Because _flush_deltas never calls ray.get() on that ref, the rejection is silently swallowed and the delta is permanently lost. For a pipeline processing more than 100 batches before the actor can drain its queue (trivially possible at high throughput), any source whose counter delta was lost will have _pending stuck above 0 and will never be written to LMDB. The PR description claims "_max_pending_calls provides backpressure", but the actual behavior is silent delta loss, not blocking — the two have opposite throughput implications. A limit of 100 is also very low relative to a typical pipeline's batch concurrency. Consider either raising the limit substantially, adding a ray.get() health check in the watchdog thread to surface errors, or verifying the actual Ray behavior for the fire-and-forget case.
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
- DROP `_uuid` field.
- ADD `_lineage_path` field (init=False, default="").
- ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
Always overwrites both `_lineage_path` and `task_id`. The
`child_segment` can be a positional index (int) for plain emissions
or a content-based string (e.g. from `get_deterministic_id()`) for
source-stage emissions where stability across input reordering
matters.
- ADD `get_deterministic_id() -> str | None` method (default None).
Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
- Override `get_deterministic_id()` to return
`get_deterministic_hash(sorted(self.data))` — stable across runs
even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
- Default implementation now calls `_set_lineage` on each emitted
child. Stages that override `process_batch` are responsible for
calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
- `stages/text/modules/add_id.py:71`
- `stages/deduplication/semantic/kmeans.py:235, 395`
- `stages/deduplication/fuzzy/buckets_to_edges.py:83`
- `stages/deduplication/fuzzy/connected_components.py:176`
- `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
75b9298 to
976f16d
Compare
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
- DROP `_uuid` field.
- ADD `_lineage_path` field (init=False, default="").
- ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
Always overwrites both `_lineage_path` and `task_id`. The
`child_segment` can be a positional index (int) for plain emissions
or a content-based string (e.g. from `get_deterministic_id()`) for
source-stage emissions where stability across input reordering
matters.
- ADD `get_deterministic_id() -> str | None` method (default None).
Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
- Override `get_deterministic_id()` to return
`get_deterministic_hash(sorted(self.data))` — stable across runs
even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
- Default implementation now calls `_set_lineage` on each emitted
child. Stages that override `process_batch` are responsible for
calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
- `stages/text/modules/add_id.py:71`
- `stages/deduplication/semantic/kmeans.py:235, 395`
- `stages/deduplication/fuzzy/buckets_to_edges.py:83`
- `stages/deduplication/fuzzy/connected_components.py:176`
- `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
07036af to
5757fba
Compare
88b146e to
d73ebd0
Compare
| try: | ||
| ray.init(ignore_reinit_error=True) | ||
| actor_handle = ray.get_actor(ACTOR_NAME) | ||
| ray.get(actor_handle.close.remote(), timeout=10) # type: ignore[attr-defined] | ||
| ray.kill(actor_handle) | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning(f"resumability actor cleanup failed: {e}") |
There was a problem hiding this comment.
ray.kill is unreachable when the close() RPC times out. ray.get(..., timeout=10) raises GetTimeoutError (a subclass of Exception), which is caught by the outer except Exception block — so ray.kill(actor_handle) on the next line is never executed. The detached actor keeps running and holds the LMDB file open indefinitely. On the next Pipeline.run(checkpoint_path=...) call, get_if_exists=True returns the leaked actor, which is pointing at the original path but may have stale in-memory state.
Separate the kill so it always runs regardless of whether close() succeeds.
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| ray.get(actor_handle.close.remote(), timeout=10) # type: ignore[attr-defined] | |
| ray.kill(actor_handle) | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"resumability actor cleanup failed: {e}") | |
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| try: | |
| ray.get(actor_handle.close.remote(), timeout=10) # type: ignore[attr-defined] | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"resumability actor close failed: {e}") | |
| finally: | |
| ray.kill(actor_handle) | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"resumability actor cleanup failed: {e}") |
|
/ok to test 5cb8a87 |
| ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
Actor handle is discarded; silent resumability bypass on spawn failure
ResumabilityActor.options(...).remote(str(checkpoint_path)) returns an actor handle that is immediately discarded. Ray actor creation is asynchronous — if the actor fails to initialize (e.g., resource pressure, LMDB permission error), the .remote() call still returns a handle, the failure is deferred, and subsequent ray.get_actor(ACTOR_NAME) in _is_active() will raise ValueError returning None. The pipeline then runs without resumability and without any user-visible error; completed sources are never checkpointed.
Capturing the handle and making at least one synchronous ray.get call on a lightweight probe (e.g., are_completed([])) before the executor runs would surface initialization failures eagerly.
|
/ok to test 24fdafc |
| def _apply_resumability_counters(self, input_tasks: list[Task], output_tasks: list[Task]) -> list[Task]: # noqa: C901 | ||
| stage = self.stage | ||
| if getattr(stage, "is_source_stage", False): | ||
| return self._source_counters(output_tasks) | ||
|
|
||
| # Pre-source stages: inputs carry no _source_id, so there's nothing to | ||
| # track yet. Leave outputs untouched. | ||
| if all(not t._source_id for t in input_tasks): | ||
| return output_tasks | ||
|
|
There was a problem hiding this comment.
Single-stage pipeline never marks sources complete
_apply_resumability_counters short-circuits to _source_counters whenever is_source_stage is True, returning before the is_sink_stage branch can fire a −1 delta. Pipeline.build() assigns both flags to the same stage when only one stage is present (stages[0] == stages[-1]). As a result, every source fires +1 but nothing ever fires −1, so _pending stays at 1 for every source indefinitely and the checkpoint is never written. Every resume reprocesses the full pipeline.
|
/ok to test 90f6834 |
|
/ok to test 2eaa29b |
| case (lengths differ) ``None`` children are simply dropped.""" | ||
| if len(results) == len(tasks): | ||
| return [NoneTask.from_input(tasks[i]) if r is None else r for i, r in enumerate(results)] | ||
| return [r for r in results if r is not None] |
There was a problem hiding this comment.
Not really clear to me why we would just drop None children during a fanout stage.
There was a problem hiding this comment.
Furthermore it looks like _post_process_task_ids will already drop any None objects right?
| return results | ||
|
|
||
| @staticmethod | ||
| def _normalize_none(tasks: list[Task], results: list[Task | None]) -> list[Task | None]: |
There was a problem hiding this comment.
Nit but everywhere else is calling them input and output tasks:
| def _normalize_none(tasks: list[Task], results: list[Task | None]) -> list[Task | None]: | |
| def _normalize_none(input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task | None]: |
| # ------------------------------------------------------------------ # | ||
| def _apply_resumability_counters(self, input_tasks: list[Task], output_tasks: list[Task]) -> list[Task]: # noqa: C901 | ||
| stage = self.stage | ||
| if getattr(stage, "is_source_stage", False): |
There was a problem hiding this comment.
General question: why would the user ever want something other than source stage being the first stage and sink stage being the last stage of the pipeline? Like if the last stage failed but the second to last stage was the sink stage, they just don't want to rerun the last stage?
|
|
||
| is_sink = stage.is_sink_stage | ||
| per_task: list[tuple[str, str, int]] = [] | ||
| real = [t for t in output_tasks if not isinstance(t, (NoneTask, FailedTask))] |
There was a problem hiding this comment.
It seems like since real is only used in the if len(input_tasks) == 1 and len(output_tasks) != 1: block, let's move it there for easier reading?
| surviving source fires a ``+1``.""" | ||
| sources = [t for t in output_tasks if not isinstance(t, (NoneTask, FailedTask))] | ||
| for t in sources: | ||
| t._source_id = t.task_id.rsplit("_", 1)[-1] |
There was a problem hiding this comment.
I am a little confused why the source ID here is the last part of the X_Y_Z chain? I guess maybe I am confused about how task_id versus _source_id are constructed/formatted.
Builds the opt-in resumability layer on the SentinelTask hierarchy from the base PR (#2062): NoneTask/FailedTask are bare SentinelTask subclasses (no identity of their own — dataset_name "none"/"failed", task_id assigned by the adapter like any task). The layer is a thin, counter-only step that runs after PR-2062's always-on task_id assignment. - backends/base.py process_batch flow (always-on, no resumability needed): a returned None ("filter this slot") becomes a NoneTask via a single inline comprehension so every output is a real Task and gets a task_id; ids are assigned by _post_process_task_ids (incl. sentinels); NoneTask/FailedTask are stripped before returning to the next stage. - _apply_resumability_counters (gated on _is_active, counter-only — no task_id work): a source stage stamps _source_id (= the id's last segment), skips already-completed sources, and fires +1; a non-source fires -1/0 per output (NoneTask -> -1, FailedTask -> no delta so its source stays pending and reruns), inheriting _source_id; ambiguous M->K batches warn + skip rather than misattribute. Counters key on the PARENT's identity, which is why the sentinels need no identity of their own. - Task gains _source_id; LMDB actor + client; Pipeline.run(checkpoint_path=...) owns the detached actor lifecycle (Ray 2.54: max_pending_calls). - Make lmdb opt-in at import time: it stays a (locked) dependency, but ACTOR_NAME lives in resumability_client so the always-imported worker path (BaseStageAdapter -> client) never imports resumability_actor / lmdb; lmdb loads only when resumability is actually used. uv.lock adds lmdb 2.2.1. - Resumability state lives in a <checkpoint_path>/.nemo_curator_metadata directory with ONE LMDB file per writer (host-pid), not a single shared file: LMDB can't be safely written by multiple processes across hosts on a networked FS like Lustre. Each actor writes only its own file and reads the union of completed sources across all files on startup, so the tasks of a SLURM array can checkpoint to the same directory without contention. - Tests: sentinels test covers the SentinelTask hierarchy (bare construction, payload rejection, EmptyTask rooted at "0", task_id not user-settable); the adapter test covers the counter math + an end-to-end None->NoneTask->strip case; the actor test covers counter dedup, anomaly recovery, lifecycle, and multi-writer union (SLURM-array safety). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Abhinav Garg <abhgarg@nvidia.com>
|
/ok to test a560bc1 |
|
Heads-up: this PR was inadvertently squash-merged into the |
| def _skip_completed_sources(source_ids: list[str]) -> set[str]: | ||
| """Synchronous lookup of which source_ids are already marked complete | ||
| in LMDB. Used by the source-stage adapter to drop already-done | ||
| sources from its output before downstream stages see them.""" | ||
| a = _actor() | ||
| if a is None or not source_ids: | ||
| return set() | ||
| flags = ray.get(a.are_completed.remote(source_ids)) # type: ignore[attr-defined] |
There was a problem hiding this comment.
are_completed synchronous call unguarded against max_pending_calls rejection
ray.get(a.are_completed.remote(...)) shares the actor's 100-call pending queue with all the fire-and-forget apply_deltas calls fired by downstream stages. If 100 apply_deltas are queued when the source stage invokes _skip_completed_sources, Ray embeds a RayActorError in the returned ObjectRef; ray.get() then raises it. Unlike _flush_deltas, this error is NOT silently discarded — it propagates up through _source_counters → _apply_resumability_counters → process_batch, crashing the source-stage batch. In a long-running pipeline where the source stage can fire while downstream stages are actively queuing deltas this is a concrete failure path. Consider increasing max_pending_calls substantially, or wrapping this call in a try/except and falling back gracefully (e.g. assume no sources are complete, skip the filter step).
What
Adds opt-in pipeline resumability via source-level counter checkpointing,
built on top of the sentinel-task refactor in #2062 (this PR is stacked on
that branch — review/merge #2062 first).
Pipeline.run(checkpoint_path=...)tracks which source partitions have fullydrained through the pipeline and skips already-completed ones on a rerun, so an
interrupted run resumes without reprocessing finished work.
How
SentinelTaskbase):NoneTask/FailedTaskarebare subclasses — no identity of their own (
dataset_name"none"/"failed",task_idassigned by the adapter like any task).backends/base.pyprocess_batch(always-on): a returnedNone("filter this slot") becomes a
NoneTaskvia a single inline comprehension soevery output is a real
Taskand gets atask_id; sentinels are strippedbefore the next stage.
_apply_resumability_counters(gated on_is_active, counter-only): asource stamps
_source_id, skips completed sources, fires+1; a non-sourcefires
-1/0per output (NoneTask→-1,FailedTask→ no delta, so itssource stays pending and reruns). Counters key on the parent's identity —
which is why the sentinels need none of their own. Ambiguous
M→Kbatcheswarn + skip rather than misattribute.
lmdbis a (locked) dependency but stays opt-in atimport time:
ACTOR_NAMElives inresumability_clientso the always-importedworker path never imports
lmdb; it loads only when resumability is used.<checkpoint_path>/.nemo_curator_metadatawith one LMDB file per writer (
<host>-<pid>), not a single shared file(LMDB can't be safely shared across hosts on a networked FS like Lustre). Each
actor writes only its own file and reads the union of completed sources
across all files on startup, so the tasks of a SLURM array can checkpoint to
the same directory without contention.
Testing
tests/tasks/test_sentinels.py—SentinelTaskhierarchy (bare construction,payload rejection,
EmptyTaskrooted at"0",task_idnot user-settable).tests/backends/test_resumability_adapter.py— counter math + an end-to-endNone→NoneTask→strip case (actor RPCs mocked).tests/utils/test_resumability_actor.py— counter dedup, anomaly recovery,lifecycle, and multi-writer union (SLURM-array safety).
nemo-curatorcontainer: aSource → Flaky(random FailedTask) → Sinkpipeline re-run against one on-diskLMDB checkpoint converges, skips already-completed sources on resume, and
processes each source exactly once.