fix: preserve ordered seed dataset position on resume #710
Preserve planned row-group start offsets during resume so ordered seed datasets continue from the next seed row instead of replaying already-consumed rows. Fixes #709
- Simplify _run_batch context-var setup so current_row_group is set consistently in fresh and resumed sync runs (matches the (x/X) log prefix the async path already emits) and add a docstring spelling out which ContextVars the function owns.
- Document RowGroupResumePlan and build_row_group_resume_plan, and make the plan dataclass frozen+slots since it is a one-shot value.
- Comment the modulo cycling logic in _index_range_at_offset.
- Add a scheduler test verifying fresh async runs auto-derive the per-row-group offsets from row-group sizes (no caller-supplied offsets) so ordered generators stay parallel-safe across row groups.
- Add a wraparound regression test that resumes past a full seed cycle, exercising the relative_offset == 0 branch the original #709 regression test missed.
- update current_row_group ContextVar comment to reflect that both the async scheduler and the sync engine's _run_batch set it
- move pre_batch_snapshot capture (and ran_pre_batch flag) inside the try/finally in _run_batch so a failure between ContextVar.set and the snapshot call still resets the tokens
- add a direct unit test for the relative_offset == 0 wraparound branch in _index_range_at_offset to lock in the fresh-cycle restart behavior
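The token lifecycle described in the commit above can be sketched as follows. This is a minimal illustration, not the engine's actual `_run_batch`: the function `run_batch` and its `take_snapshot` and `generate` callbacks are hypothetical, and the two ContextVars are local stand-ins that only borrow the names used in this PR.

```python
from contextvars import ContextVar
from typing import Callable, Optional

# Hypothetical stand-ins for the engine's ContextVars (names taken from the PR text).
current_row_group: ContextVar[Optional[int]] = ContextVar(
    "current_row_group", default=None
)
current_row_group_start_offset: ContextVar[Optional[int]] = ContextVar(
    "current_row_group_start_offset", default=None
)


def run_batch(
    row_group: int,
    start_offset: int,
    take_snapshot: Callable[[], object],
    generate: Callable[[object], object],
) -> object:
    """Set both ContextVars, run the batch, and always reset the tokens."""
    rg_token = current_row_group.set(row_group)
    offset_token = current_row_group_start_offset.set(start_offset)
    try:
        # The snapshot is captured inside the try block, so an exception
        # raised here still reaches the finally clause below.
        snapshot = take_snapshot()
        return generate(snapshot)
    finally:
        # Reset in reverse order of the set() calls.
        current_row_group_start_offset.reset(offset_token)
        current_row_group.reset(rg_token)
```

If the snapshot call were placed between the `set` calls and the `try`, a failure there would leave both variables pointing at a row group that never ran.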
Code Review: PR #710 (fix: preserve ordered seed dataset position on resume)
Summary
Fixes #709, where The change is well-scoped: a new
Findings
Correctness
Async fresh-run behavioral change
The async scheduler now seeks to per-row-group offsets on fresh runs as well as resumed runs. The PR description frames this as removing an implicit shared-reader assumption (parallel-safe).
Style / project conventions
Minor nits (non-blocking)
Test coverage
Strong. Coverage spans:
Test refactors (
Security / performance
No security concerns. No new I/O, no string interpolation against external data, no deserialization. Performance impact negligible: one extra
Verdict
Looks good. The fix targets the reported bug; the design (per-row-group offsets via ContextVar) is the right shape, keeping order-dependent generators decoupled from the engine; and test coverage is thorough on both sync and async paths. Recommend a manual smoke test of fresh async ORDERED seed generation against a real on-disk seed dataset before release to validate the behavioral change called out under "Async fresh-run behavioral change" above. Otherwise no blocking concerns.
Greptile Summary
This PR fixes
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py | Core fix: generate_from_scratch now seeks ORDERED readers to current_row_group_start_offset on every call; _index_range_at_offset correctly handles full-cycle wraparound (modulo == 0 returns original range instead of degenerate empty range). |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Extracts row-group resume logic into build_row_group_resume_plan; threads row_group_start_offset through _run_batch with proper try/finally ContextVar lifecycle; offsets are computed from the original plan so they stay stable when there are holes. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Adds _rg_start_offset_map (auto-derived for fresh runs, caller-supplied for resume); sets/resets current_row_group_start_offset per task in _execute_task_inner; removes the implicit shared-reader assumption for fresh ORDERED async runs. |
| packages/data-designer-engine/src/data_designer/engine/context.py | Adds current_row_group_start_offset ContextVar (default None) so ORDERED generators can observe their row group's planned start offset without coupling to the engine. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py | Adds initial_completed parameter; rate calculation now uses run_completed = completed - _initial_completed so resumed runs report throughput for this run only; next_log_at advances past the initial offset. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py | Adds scheduled_records parameter to log_start; when provided, uses it directly rather than summing tracker totals, giving correct task-count logging on resume. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds three end-to-end regression tests: basic #709 repro, cycle-boundary wraparound, and PartitionBlock resume; also cleans up several tests to use real builds instead of mocked internals. |
| packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py | Three new unit tests covering _reset_batch_reader with offset, cycle-boundary wraparound, and the full generate_from_scratch to current_row_group_start_offset flow. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Two new async tests: one verifying explicit offset injection for resume, one locking in auto-derived offsets for fresh runs; plus a progress-tracker resume test. |
Sequence Diagram
sequenceDiagram
participant DB as DatasetBuilder
participant RP as build_row_group_resume_plan
participant AS as AsyncTaskScheduler
participant CV as ContextVar current_row_group_start_offset
participant GEN as SeedDatasetColumnGenerator
Note over DB: Resume path
DB->>RP: original_target, num_records, buffer_size, completed_ids
RP-->>DB: RowGroupResumePlan
DB->>AS: "row_groups=remaining, row_group_start_offsets, initial_completed_records"
Note over AS: Per task execution
AS->>CV: set(rg_start_offset_map[task.row_group])
AS->>GEN: generate_from_scratch(num_records)
GEN->>CV: "get() -> row_group_start_offset"
GEN->>GEN: _index_range_at_offset(offset)
GEN->>GEN: _reset_batch_reader(num_records, record_offset)
GEN-->>AS: DataFrame at correct seed position
AS->>CV: reset(token)
Note over DB: Sync resume path
DB->>DB: "_run_batch(row_group_start_offset=sum(sizes before batch))"
DB->>CV: set(row_group_start_offset)
DB->>GEN: generate_from_scratch(num_records)
GEN->>GEN: seek to correct seed row
DB->>CV: reset(token) in finally
Reviews (4): Last reviewed commit: "fix async resume progress accounting"
Companion to the existing IndexRange resume test. Locks in correct behavior when the seed selection comes from PartitionBlock — its to_index_range produces a contiguous range today, but nothing else asserts that contract. The test crosses a cycle boundary inside the partition (4 records over a 2-row partition) so it exercises both the offset-into-partition branch and the relative_offset == 0 wraparound branch end-to-end.
📋 Summary
ResumeMode.ALWAYS replayed the first seed row(s) for ordered seed datasets after an interrupted run, because the resumed process's SeedDatasetColumnGenerator always started reading at the configured IndexRange.start. This PR threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position when resuming, and adds regression coverage that mirrors the issue's minimal repro.
Scope. This fixes resume for SamplingStrategy.ORDERED with or without a selection strategy (IndexRange, PartitionBlock, or none). SamplingStrategy.SHUFFLE is unchanged: its underlying ORDER BY RANDOM() query is unseeded, so a resumed shuffled run already produces a fresh random order with potential row duplication. That's a pre-existing limitation, not introduced or addressed here.
🔗 Related Issue
Fixes #709
🔄 Changes
- current_row_group_start_offset ContextVar so order-dependent generators can observe each row group's planned start offset without coupling to the engine. Comment on current_row_group updated to reflect that both engines now set it.
- SeedDatasetColumnGenerator._reset_batch_reader accepts a record_offset and computes the seek range via _index_range_at_offset, including modulo wraparound for cycling through the selection.
- RowGroupResumePlan (frozen + slots) and build_row_group_resume_plan. The plan is computed against the original row-group layout so per-group offsets stay stable when resume has holes.
- AsyncTaskScheduler auto-derives offsets for fresh runs (parallel-safe; removes the implicit shared-reader assumption).
- Threads row_group_start_offset through _run_batch. The method also sets current_row_group for both fresh and resumed runs so the (x/X) log prefix is consistent across engines, owns both ContextVars inside a single try/finally, and captures pre_batch_snapshot inside that try so any failure between set and the snapshot still resets tokens.
🧪 Testing
- pytest tests/engine/column_generators/generators/test_seed_dataset.py tests/engine/dataset_builders/test_async_scheduler.py tests/engine/dataset_builders/test_dataset_builder.py → 251 passed
- test_seed_dataset.py: test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset, test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary, test_seed_dataset_column_generator_ordered_generation_uses_row_group_offset
- test_async_scheduler.py: test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs (fresh async runs auto-derive per-row-group offsets across multiple row groups, including a non-aligned last group)
- test_dataset_builder.py: regression test mirroring the minimal repro from #709, "Resume replays ordered seed rows after completed checkpoints" (sync resume after a simulated interruption); test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary exercising the cycle-boundary branch end-to-end; test_build_resume_ordered_seed_dataset_with_partition_block_continues_within_partition covering ORDERED + PartitionBlock resume and exercising both the offset-into-partition and wraparound branches
✅ Checklist
🔍 Attention Areas
- packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py: RowGroupResumePlan semantics (offsets snapshot the original plan, not the remaining list, so they stay stable when there are holes) and the _run_batch ContextVar lifecycle.
- packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py: _index_range_at_offset modulo wraparound. record_offset % selected_size == 0 must return the original full range so the next read restarts a fresh cycle (not a degenerate empty range).
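For reviewers checking the wraparound case, here is a rough sketch of the modulo logic under stated assumptions. `RangeSketch` is a hypothetical half-open range, not the library's `IndexRange` (whose bounds convention is not shown on this page), and `range_at_offset` only approximates what `_index_range_at_offset` is described as doing.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RangeSketch:
    """Hypothetical half-open range [start, stop); not the library's IndexRange."""

    start: int
    stop: int

    @property
    def size(self) -> int:
        return self.stop - self.start


def range_at_offset(selected: RangeSketch, record_offset: int) -> RangeSketch:
    """Return the slice of `selected` that the first read after a seek should cover."""
    if selected.size <= 0:
        raise ValueError("selection must contain at least one row")
    # Position inside the current cycle through the selection.
    relative_offset = record_offset % selected.size
    if relative_offset == 0:
        # Exactly on a cycle boundary: restart with the full selection.
        # Written out explicitly to mirror the branch named in the PR; in this
        # half-open sketch the expression below would yield the same range.
        return selected
    # Mid-cycle: skip the rows already consumed in this cycle.
    return RangeSketch(selected.start + relative_offset, selected.stop)
```

With a three-row selection `[10, 13)`, offsets 0, 3, and 6 all map to the full range, while offset 7 maps to `[11, 13)`. Skipping the modulo and simply advancing `start` by `record_offset` would produce an empty range at offset 3, which is the degenerate case the attention note warns about.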