Curator Pipeline Resumability: Design Discussion
Problem
Curator pipelines processing millions of source files routinely run for
hours. Any non-graceful termination — OOM, autoscale-induced worker kill,
a transient infra issue, or a Ctrl-C — currently means starting over
from scratch. We want users to be able to relaunch the same pipeline with
a checkpoint path (the new checkpoint_path argument on Pipeline.run) and have it pick up exactly where it left off.
This document describes the design we landed on. The companion PR
implements it as a small, isolated layer; if the user doesn't pass checkpoint_path, nothing in the new layer runs and existing pipelines
behave exactly as today.
Design choices
The four principles below drive every concrete decision.
1. Narrow scope; loud refusal outside it
We deliberately support a limited set of stage shapes:
- Per-task: process(task), 1 → 0|1|N.
- Batched: process_batch(tasks), N → N, strictly positional. The user marks individual slots with NoneTask (intentional filter) or FailedTask (retry on resume).
- Single-input fan-out: process_batch(tasks) where len(tasks)==1 (the typical case for readers emitting N source partitions from EmptyTask).

Everything else (cross-source commingling, batched fan-out where len(input)>1 and len(output)!=len(input), shuffle stages) is rejected
at the stage adapter with a clear ValueError pointing the user at NoneTask / FailedTask. No silent degradation: if we can't reason
about the lineage, we don't try.
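The shape rules above can be sketched as a check the stage adapter might run after each process_batch call. This is a minimal sketch, not the PR's code: Task, NoneTask, FailedTask and check_batch_shape are simplified hypothetical stand-ins, and only the length rules are checked (detecting cross-source commingling or shuffles is left out).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical, simplified stand-in for a Curator task."""
    source_id: str = ""


class NoneTask(Task):
    """Slot intentionally filtered out."""


class FailedTask(Task):
    """Slot to be retried on resume."""


def check_batch_shape(inputs: list[Task], outputs: list[Task]) -> str:
    """Classify one process_batch call, refusing shapes whose lineage is ambiguous."""
    if len(inputs) == 1:
        # Single-input fan-out (1 -> 0|1|N): every output descends from the one input.
        return "fan_out"
    if len(outputs) != len(inputs):
        # Batched fan-out or shuffle: slot i no longer maps back to input i.
        raise ValueError(
            f"process_batch got {len(inputs)} tasks but returned {len(outputs)}; "
            "batched stages must be strictly positional (N -> N). Mark dropped "
            "slots with NoneTask and slots to retry with FailedTask."
        )
    # Strictly positional: output slot i corresponds to input i.
    return "positional"
```

Refusing at this point, before any delta is sent, is what keeps the counters from ever absorbing a transition whose lineage is unknown.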
2. Near-zero user-facing surface
For the cases we do support, user stage code does not change. Curator's
existing convention of returning None from process() to mean "filter
this out" continues to work — internally we wrap it as NoneTask. The
two new flags is_source_stage and _is_terminal_stage default
sensibly (first stage and last stage, respectively) so most pipelines
never have to set them. The new public API is one optional argument on Pipeline.run plus two opt-in sentinels.
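A minimal sketch of how the legacy None return could be folded into the sentinel convention, assuming simplified, hypothetical Task and NoneTask dataclasses (the real ones live in nemo_curator/tasks) and a hypothetical helper name, normalize_process_output:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical, simplified stand-in for a Curator task."""
    data: dict = field(default_factory=dict)


@dataclass
class NoneTask(Task):
    """Sentinel for an intentionally filtered slot; msg says why."""
    msg: str = ""


def normalize_process_output(result: Task | list[Task] | None) -> list[Task]:
    """Turn a per-task process() return value into an explicit list of output slots."""
    if result is None:
        # Curator's existing "filter this out" convention becomes a sentinel.
        return [NoneTask(msg="process() returned None")]
    if isinstance(result, Task):
        # 1 -> 1
        return [result]
    # 1 -> N (possibly N == 0)
    return list(result)
```

An empty list passes through unchanged. Under the single-input fan-out rule of +(N-1), N = 0 nets the same -1 as an explicit filter, so both spellings of "drop this task" agree.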
3. Built for scale: fire-and-forget + write-once
Two scaling properties are non-negotiable:
- Workers never block on the resumability layer. Every counter update is fire-and-forget: the worker calls the actor's apply_deltas via .remote() and never calls ray.get. Backpressure is handled by Ray's _max_pending_calls cap on the actor, which is the only place a worker ever waits on the resumability layer, and only when the actor is genuinely behind.
- One LMDB write per source, at end-of-life. Counter math runs entirely in memory on the actor. We touch the database only when a source's pending counter hits zero; at that point we write a single row (source_id → b"1"). For a 10M-source pipeline that's 10M tiny writes across the whole run, not per-task or per-batch writes.
4. Minimum surface, single point of integration
The whole runtime is intentionally small: one file for the actor and
counter math, one for client helpers, one hook in BaseStageAdapter,
one method in Pipeline.run, one sentinels file. Per-backend changes
(Xenna, RayData, RayActorPool) are zero — all backends already route
through BaseStageAdapter, and Pipeline.run owns the actor lifecycle.
Algorithm
Each task carries two pieces of bookkeeping:
- _deterministic_lineage_path_hash: a hash of the task's index path through the pipeline DAG. Deterministic across runs: the same pipeline on the same inputs produces byte-identical hashes.
- _source_id: a string identifying which source the task descends from. Set explicitly at the source stage, inherited everywhere downstream.
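A sketch of the two properties the lineage hash needs, under stated assumptions: the path is modeled as a tuple of output indices, and sha256 truncated to 12 hex characters (48 bits, the get_deterministic_hash width raised under the open questions) is an assumed choice of hash. child_lineage and lineage_hash are hypothetical names. Python's built-in hash() would not do here, because string hashing is randomized per process unless PYTHONHASHSEED is fixed.

```python
from __future__ import annotations

import hashlib


def child_lineage(parent_path: tuple[int, ...], idx: int) -> tuple[int, ...]:
    """The idx-th output of a task extends its parent's index path by one step."""
    return parent_path + (idx,)


def lineage_hash(path: tuple[int, ...], n_hex: int = 12) -> str:
    """Stable digest of an index path: same path, same digest, in any process or run."""
    # "/" separators keep (1, 23) and (12, 3) distinct after encoding.
    encoded = "/".join(str(i) for i in path).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()[:n_hex]


# Source 3's root task, then output 0 of the next stage, then output 2 after that.
path = child_lineage(child_lineage((3,), 0), 2)
print(path, lineage_hash(path))
```

Because the digest depends only on the path, a worker retried on another node, or the same pipeline relaunched tomorrow, reports the same task_hash for the same task.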
A singleton Ray actor keeps two in-memory dicts:
- pending (dict[source_id → int]): how many in-flight tasks each source still has.
- applied (dict[task_hash → delta]): which deltas we've already counted, keyed by the input task's hash.
After each process_batch invocation, the stage adapter fires one entry
per task — (task_hash, source_id, delta) — at the actor and does not
wait. The actor processes each entry as follows:
- Same task_hash previously applied with the same delta: silent skip (Ray retry idempotency).
- Same task_hash previously applied with a different delta: error. The user's code isn't a pure function of its input, which resumability can't tolerate.
- New task_hash: record it in applied and apply the delta to pending[source_id].
When pending[source_id] hits zero, the actor writes source_id to
LMDB — that's the only durable state. When pending[source_id] would
go below zero, the actor records an error.
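The per-entry rules can be sketched as a plain Python class standing in for the Ray actor. Assumptions: no Ray, entries are handled in the order given, a dict named durable stands in for LMDB, and errors are collected in a list for the watchdog to poll rather than raised. CounterActor is a hypothetical name; apply_deltas echoes the method named in the design, but its signature here is a guess.

```python
from __future__ import annotations


class CounterActor:
    """In-memory stand-in for the resumability actor (plain Python, no Ray)."""

    def __init__(self) -> None:
        self.pending: dict[str, int] = {}    # source_id -> live tasks
        self.applied: dict[str, int] = {}    # task_hash -> delta already counted
        self.durable: dict[str, bytes] = {}  # stands in for the LMDB database
        self.errors: list[str] = []          # polled by the watchdog, never raised here

    def apply_deltas(self, entries: list[tuple[str, str, int]]) -> None:
        for task_hash, source_id, delta in entries:
            prev = self.applied.get(task_hash)
            if prev is not None:
                if prev != delta:
                    # Same input, different result: user code is not deterministic.
                    self.errors.append(
                        f"non-deterministic task {task_hash}: delta {prev} then {delta}"
                    )
                # Same delta means a retried call that was already counted.
                continue
            self.applied[task_hash] = delta
            count = self.pending.get(source_id, 0) + delta
            if count < 0:
                self.errors.append(f"negative pending count for source {source_id}")
                continue
            self.pending[source_id] = count
            if count == 0:
                # Source finished: its single durable write.
                self.durable[source_id] = b"1"
                del self.pending[source_id]
```

In this sketch a finished source is also dropped from pending, which keeps memory proportional to sources still in flight; whether the PR does the same is not stated here.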
The executor runs a background watchdog thread that polls actor.errors() every minute. The first non-empty result triggers _thread.interrupt_main(), which converts to KeyboardInterrupt in the
main thread; the main thread catches it and re-raises the actual error.
Counter math (the deltas)
| Stage kind | Output slot | Δ on pending[source_id] |
|---|---|---|
| Source stage (is_source_stage=True) | each emitted child | +1 |
| Non-terminal, len(input)>1, real Task | slot | 0 |
| Non-terminal, len(input)>1, NoneTask | slot | -1 |
| Non-terminal, len(input)>1, FailedTask | slot | no entry (counter unchanged) |
| Non-terminal, len(input)==1, fan-out of N | the list | +(N-1) |
| Terminal stage, real Task | slot | -1 |
| Terminal stage, NoneTask | slot | -1 |
| Terminal stage, FailedTask | slot | no entry |
Why this works: pending[S] = "number of live tasks in S's subtree."
Every transition either preserves the count (1:1), strictly reduces it
(filter or terminal sink), or grows it at controlled points (fan-out,
which is only allowed when len(input)==1). FailedTask does not
record an entry, so the source's counter never reaches zero in this run
and no completion row is written for it; on resume the source is emitted
again and that input is retried. That's the rerun-on-resume signal.
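The counter table can be turned into a small function that produces the (task_hash, source_id, delta) entries for one stage call. This is a hedged sketch with hypothetical names and simplified Task classes. Three details are assumptions the table does not spell out: fan-out N counts only non-NoneTask outputs (so a wrapped None still nets -1), a FailedTask anywhere in a single-input fan-out suppresses the entry, and source-stage entries are keyed by each child's hash, since every child starts a distinct source.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical, simplified task: just the bookkeeping fields."""
    task_hash: str
    source_id: str


class NoneTask(Task):
    """Intentionally filtered slot."""


class FailedTask(Task):
    """Slot to retry on resume."""


Entry = tuple[str, str, int]  # (task_hash, source_id, delta)


def stage_deltas(inputs: list[Task], outputs: list[Task], *,
                 is_source: bool = False, is_terminal: bool = False) -> list[Entry]:
    if is_source:
        # Each emitted child opens its source's counter at +1.
        return [(child.task_hash, child.source_id, 1) for child in outputs]
    if len(inputs) == 1 and not is_terminal:
        (parent,) = inputs
        if any(isinstance(o, FailedTask) for o in outputs):
            return []  # no entry: counter unchanged, so the input reruns on resume
        n_live = sum(not isinstance(o, NoneTask) for o in outputs)
        # One live input becomes n_live live outputs: +(N-1).
        return [(parent.task_hash, parent.source_id, n_live - 1)]
    if len(outputs) != len(inputs):
        raise ValueError("batched stages must be strictly positional (N -> N)")
    entries: list[Entry] = []
    for inp, out in zip(inputs, outputs):
        if isinstance(out, FailedTask):
            continue  # no entry: counter unchanged
        if is_terminal or isinstance(out, NoneTask):
            entries.append((inp.task_hash, inp.source_id, -1))  # sink or filter
        else:
            entries.append((inp.task_hash, inp.source_id, 0))  # 1:1 hand-off
    return entries
```

The sketch treats a terminal stage positionally, so a terminal fan-out is rejected; the table does not cover that case.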
What lives where
The PR adds a small set of new files and small additions to existing
ones. The full surface is:
| File | What changes |
|---|---|
| nemo_curator/tasks/tasks.py | Three new fields on Task (_lineage_path, _deterministic_lineage_path_hash, _source_id) and a _set_lineage(parents, idx) method. |
| nemo_curator/tasks/sentinels.py (new) | NoneTask and FailedTask Task subclasses with a msg field. |
| nemo_curator/stages/base.py | Two new ClassVars on ProcessingStage (is_source_stage, _is_terminal_stage), plus assign_root_lineage and assign_child_lineage helpers. |
| nemo_curator/utils/resumability_actor.py (new) | ResumabilityActor itself: counter math, LMDB writes, error recording. ~100 lines. |
| nemo_curator/utils/resumability_client.py (new) | _is_active, _flush_deltas, _skip_completed_sources: module-level helpers. ~30 lines. |
| nemo_curator/backends/base.py | One hook in BaseStageAdapter.process_batch that wraps the stage call with the resumability post-process. Pre-source tasks (_source_id == "") short-circuit. |
| nemo_curator/pipeline/pipeline.py | New checkpoint_path argument on Pipeline.run. Build-time validation that exactly one stage is the source. _run_with_resumability method that owns actor spawn, watchdog, final check, and cleanup. |
| pyproject.toml | Add lmdb>=1.4 dependency. |
The per-backend executors (Xenna, RayData, RayActorPool) are not
touched. The actor is detached, workers find it by name, and Pipeline.run is the single integration point.
How resume actually happens
On every run, fresh or resumed, the actor loads the existing LMDB (empty on the first run) into an in-memory _completed set on startup. When the source stage emits its candidate
sources, the source-stage adapter calls _skip_completed_sources,
a single RPC that returns the set of source_ids already in _completed. Those candidates are dropped from the output; the
remaining sources flow into the pipeline. Their counters start
at +1 and flow normally.
Because the source-stage adapter handles this transparently, no other
stage in the pipeline ever knows resumability is happening.
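The effect of skipping completed sources across a crash and a relaunch can be simulated in a few lines. In this sketch a Python set stands in for both the LMDB file and the actor's _completed set, the crash is simulated by stopping early, and skip_completed_sources and run are hypothetical names; in the design the lookup is one RPC to the actor.

```python
from __future__ import annotations


def skip_completed_sources(candidates: list[str], completed: set[str]) -> list[str]:
    """Drop candidates that already have a completion row, keeping emission order."""
    return [sid for sid in candidates if sid not in completed]


def run(sources: list[str], completed: set[str], crash_after: int | None = None) -> list[str]:
    """Process sources in order, recording each completion; optionally stop early."""
    finished: list[str] = []
    for sid in skip_completed_sources(sources, completed):
        if crash_after is not None and len(finished) == crash_after:
            break  # simulated OOM / worker kill: nothing else completes
        finished.append(sid)
        completed.add(sid)  # stands in for the source_id -> b"1" LMDB row
    return finished


store: set[str] = set()
sources = [str(i) for i in range(5)]
print(run(sources, store, crash_after=2))  # ['0', '1']: first launch dies partway
print(run(sources, store))                 # ['2', '3', '4']: relaunch picks up the rest
```

The simulation only works because the source list is emitted in the same order with the same ids on both launches, which is the ordering concern raised under the open questions.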
Error model
Two new error classes are introduced, and a third failure mode is surfaced the same way:
- NonDeterministicTaskError: the same task hash produced two different deltas across two executions of that task. User code isn't a pure function of its input.
- NegativePendingCountError: a source's pending counter went below zero. Should be impossible with deterministic user code; it's a hard invariant violation.
- RayActorError (surfaced via the watchdog): the actor died unexpectedly (OOM, kill).
All three surface through the same path: watchdog thread polls, _thread.interrupt_main() injects KeyboardInterrupt, the main thread
re-raises the real error. Worst-case detection latency: one polling
interval (60 seconds by default).
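A sketch of the watchdog path using only the standard library. Assumptions: poll_errors stands in for fetching actor.errors() (in Ray, a ray.get on the actor call) and returns exception instances, an exception raised while polling is treated as the actor having died, and on_error is injectable so the loop can be exercised without interrupting the interpreter. The two exception classes reuse the names from the list above but are otherwise hypothetical, as are start_watchdog and run_with_watchdog.

```python
from __future__ import annotations

import _thread
import threading
from typing import Callable


class NonDeterministicTaskError(RuntimeError):
    """The same task hash produced two different deltas."""


class NegativePendingCountError(RuntimeError):
    """A source's pending counter went below zero."""


def start_watchdog(poll_errors: Callable[[], list[BaseException]],
                   stop: threading.Event,
                   found: list[BaseException],
                   interval_s: float = 60.0,
                   on_error: Callable[[], None] = _thread.interrupt_main) -> threading.Thread:
    """Poll for recorded errors; on the first one, stash it and signal the main thread."""

    def loop() -> None:
        # stop.wait() returns True as soon as stop is set, which ends the loop.
        while not stop.wait(interval_s):
            try:
                errors = poll_errors()
            except Exception as exc:  # polling a dead actor fails: report that instead
                errors = [exc]
            if errors:
                found.append(errors[0])  # stash first: KeyboardInterrupt carries no payload
                on_error()
                return

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def run_with_watchdog(work: Callable[[], object],
                      poll_errors: Callable[[], list[BaseException]],
                      interval_s: float = 60.0) -> object:
    """Run work() in the main thread and re-raise the real error if the watchdog fires."""
    stop = threading.Event()
    found: list[BaseException] = []
    start_watchdog(poll_errors, stop, found, interval_s)
    try:
        return work()
    except KeyboardInterrupt:
        if found:
            raise found[0] from None  # the actual error, not the injected interrupt
        raise  # a genuine Ctrl-C
    finally:
        stop.set()
```

Stashing the error before interrupting is the key ordering: by the time the main thread sees KeyboardInterrupt, the real exception is already waiting to be re-raised, and an empty stash distinguishes a user's Ctrl-C.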
Open questions / design points we'd like feedback on in this thread
- Polling interval: currently defaulted to 60 seconds in Pipeline.run. The trade-off is detection latency vs. RPC load on the actor. Should this be tunable via Pipeline.run(checkpoint_path=..., resumability_poll_interval=...), or kept as a constant?
- Source-id collision with custom pipelines: source_id is the integer index of the output from the source stage. If the source stage's output order is non-deterministic across runs, source_ids shift and the resume map points to the wrong files. The PR enforces this only informally (the existing FilePartitioningStage sorts by file path); should we make the source stage explicitly declare its ordering invariant?
- 48-bit hash collision risk at 100M+ sources: get_deterministic_hash returns 12 hex chars (48 bits), so birthday collisions become likely around 16M items (√(2^48) = 2^24 ≈ 16.8M). For Curator's high-end scale we'd want 64 bits. Add a length parameter to the helper, or just bump globally?
- is_source_stage default behavior: currently defaults to the first stage in the pipeline if no stage is explicitly marked. Same for _is_terminal_stage defaulting to the last stage. Some users may expect a pipeline with a Filter as the first stage NOT to be treated as a source. Should we require an explicit is_source_stage marker on at least one stage to enable resumability?
- Actor death recovery: if the resumability actor itself crashes mid-run, we lose _pending and _applied. Currently we just fail the run. Should we instead periodically snapshot _pending to LMDB so the actor can recover? Probably not worth it for v1: actor death is rare and the existing source-level checkpoint is enough for recovery.
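The birthday figure in the hash question can be checked with the standard approximation P ≈ 1 - exp(-n^2 / (2 * 2^b)) for n distinct b-bit hashes. Since applied is keyed by task hash, n here is the number of distinct hashes in play, which is at least the number of sources. collision_probability is a hypothetical helper name.

```python
import math


def collision_probability(n_items: float, bits: int) -> float:
    """Birthday approximation: P(any collision) ~= 1 - exp(-n^2 / (2 * 2**bits))."""
    # -expm1(-x) == 1 - exp(-x), computed accurately even when x is tiny.
    return -math.expm1(-(n_items ** 2) / (2.0 * 2.0 ** bits))


for n in (2 ** 24, 100_000_000):
    for bits in (48, 64):
        print(f"n={n:>11,}  bits={bits}  P(collision)={collision_probability(n, bits):.3g}")
```

At 2^24 (about 16.8M) hashes and 48 bits this gives 1 - e^(-1/2), roughly 0.39, and at 100M it is effectively 1. At 64 bits, 100M hashes come out around 2.7e-4.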
Comments welcome on any of these.