fix: preserve ordered seed dataset position on resume #710

Open
nabinchha wants to merge 7 commits into main from nmulepati/fix-709-seed-resume-offset


@nabinchha
Contributor

@nabinchha nabinchha commented May 28, 2026

📋 Summary

ResumeMode.ALWAYS replayed the first seed row(s) for ordered seed datasets after an interrupted run, because the resumed process's SeedDatasetColumnGenerator always started reading at the configured IndexRange.start. This PR threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position when resuming, and adds regression coverage that mirrors the issue's minimal repro.

Scope. This fixes resume for SamplingStrategy.ORDERED with or without a selection strategy (IndexRange, PartitionBlock, or none). SamplingStrategy.SHUFFLE is unchanged: its underlying ORDER BY RANDOM() query is unseeded, so a resumed shuffled run already produces a fresh random order with potential row duplication. That's a pre-existing limitation, not introduced or addressed here.

🔗 Related Issue

Fixes #709

🔄 Changes

  • Add current_row_group_start_offset ContextVar so order-dependent generators can observe each row group's planned start offset without coupling to the engine. Comment on current_row_group updated to reflect that both engines now set it.
  • SeedDatasetColumnGenerator._reset_batch_reader accepts a record_offset and computes the seek range via _index_range_at_offset, including modulo wraparound for cycling through the selection.
  • Async resume: introduce RowGroupResumePlan (frozen + slots) and build_row_group_resume_plan. The plan is computed against the original row-group layout so per-group offsets stay stable when resume has holes. AsyncTaskScheduler auto-derives offsets for fresh runs (parallel-safe; removes the implicit shared-reader assumption).
  • Sync resume: thread row_group_start_offset through _run_batch. The method also sets current_row_group for both fresh and resumed runs so the (x/X) log prefix is consistent across engines, owns both ContextVars inside a single try/finally, and captures pre_batch_snapshot inside that try so any failure between set and the snapshot still resets tokens.
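
The ContextVar lifecycle described in the last bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `run_batch` and `read_offset` are hypothetical stand-ins for `_run_batch` and for whatever an order-dependent generator calls, and the `ContextVar` is redeclared locally so the snippet is self-contained.

```python
from __future__ import annotations

from contextvars import ContextVar

# Hypothetical local stand-in for the ContextVar this PR adds. The default of
# None means "no planned offset" (for example, a preview run).
current_row_group_start_offset: ContextVar[int | None] = ContextVar(
    "current_row_group_start_offset", default=None
)


def read_offset() -> int | None:
    """What an order-dependent generator might call to observe the offset."""
    return current_row_group_start_offset.get()


def run_batch(row_group_start_offset: int | None) -> int | None:
    """Set the ContextVar for one batch and always reset it afterwards."""
    token = None
    if row_group_start_offset is not None:
        token = current_row_group_start_offset.set(row_group_start_offset)
    try:
        # Anything that can fail (snapshots, generation) goes inside the try,
        # so the finally block still resets the token.
        return read_offset()
    finally:
        if token is not None:
            current_row_group_start_offset.reset(token)
```

In this sketch `run_batch(6)` observes `6` while the batch runs, and a later `read_offset()` outside the batch returns `None` again because the token was reset.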

🧪 Testing

  • Targeted engine suites pass: pytest tests/engine/column_generators/generators/test_seed_dataset.py tests/engine/dataset_builders/test_async_scheduler.py tests/engine/dataset_builders/test_dataset_builder.py → 251 passed
  • Unit tests added/updated:
    • test_seed_dataset.py: test_seed_dataset_column_generator_reset_batch_reader_applies_record_offset, test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary, test_seed_dataset_column_generator_ordered_generation_uses_row_group_offset
    • test_async_scheduler.py: test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs (fresh async runs auto-derive per-row-group offsets across multiple row groups, including a non-aligned last group)
    • test_dataset_builder.py: regression test mirroring the minimal repro from #709, "Resume replays ordered seed rows after completed checkpoints" (sync resume after a simulated interruption); test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary exercising the cycle-boundary branch end-to-end; test_build_resume_ordered_seed_dataset_with_partition_block_continues_within_partition covering ORDERED + PartitionBlock resume and exercising both the offset-into-partition and wraparound branches
  • E2E tests added/updated — N/A (covered by unit + integration regressions above)

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO) — will sign off via the dco-assistant bot comment after the PR is open
  • Architecture docs updated — N/A (fix preserves the existing public API; no architectural shift)

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

nabinchha added 4 commits May 28, 2026 09:52
Preserve planned row-group start offsets during resume so ordered seed datasets continue from the next seed row instead of replaying already-consumed rows.

Fixes #709
- Simplify _run_batch context-var setup so current_row_group is set
  consistently in fresh and resumed sync runs (matches the (x/X) log
  prefix the async path already emits) and add a docstring spelling
  out which ContextVars the function owns.
- Document RowGroupResumePlan and build_row_group_resume_plan, and
  make the plan dataclass frozen+slots since it is a one-shot value.
- Comment the modulo cycling logic in _index_range_at_offset.
- Add a scheduler test verifying fresh async runs auto-derive the
  per-row-group offsets from row-group sizes (no caller-supplied
  offsets) so ordered generators stay parallel-safe across row groups.
- Add a wraparound regression test that resumes past a full seed
  cycle, exercising the relative_offset == 0 branch the original
  #709 regression test missed.
- update current_row_group ContextVar comment to reflect that both the
  async scheduler and the sync engine's _run_batch set it
- move pre_batch_snapshot capture (and ran_pre_batch flag) inside the
  try/finally in _run_batch so a failure between ContextVar.set and the
  snapshot call still resets the tokens
- add a direct unit test for the relative_offset == 0 wraparound branch
  in _index_range_at_offset to lock in the fresh-cycle restart behavior
@nabinchha nabinchha requested a review from a team as a code owner May 28, 2026 19:37
@github-actions
Contributor

Code Review: PR #710 — fix: preserve ordered seed dataset position on resume

Summary

Fixes #709, where ResumeMode.ALWAYS replayed the first seed row(s) for ORDERED seed datasets after an interrupted run. The fix threads each row group's planned start offset through both the sync and async engines so ordered seed readers seek to the correct position on resume.

The change is well-scoped: a new current_row_group_start_offset ContextVar carries the planned offset to order-dependent generators without coupling them to the engine. SeedDatasetColumnGenerator._index_range_at_offset derives the seek range with modulo wraparound for cycling selections. The async path introduces a RowGroupResumePlan dataclass and a pure helper build_row_group_resume_plan; the sync path threads the offset through _run_batch and tightens its ContextVar lifecycle.

Findings

Correctness

  • _index_range_at_offset modulo wraparound is correct. The relative_offset == 0 branch returning self._index_range is the right call — without it, an exactly-cycled offset would yield a start=selected_end+1, end=selected_end empty range. Test test_seed_dataset_column_generator_reset_batch_reader_wraps_at_cycle_boundary covers this directly, and test_build_resume_ordered_seed_dataset_extension_wraps_at_cycle_boundary exercises it end-to-end.
  • Resume plan offsets are stable under holes. build_row_group_resume_plan snapshots offsets from the full original plan and then filters to the remaining groups (dataset_builder.py:222-232). This is the right design — recomputing offsets from remaining_row_groups would shift them when there are gaps. The dedicated unit test test_row_group_resume_plan_keeps_original_offsets_for_remaining_groups (with completed_ids={0, 2}) locks this in.
  • Sync _run_batch ContextVar lifecycle is sound. Both tokens are captured before the try, both reset in finally, and the snapshot/usage-deltas/post-batch logic now lives inside the same try so any failure between set and the rest of the body still resets tokens. The if token is not None guards correctly handle the preview path (no current_batch_number, no offset).
  • Fresh sync runs gain the (x/X) log prefix. Previously the prefix only appeared in async/resume paths; it's now consistent across engines, which the PR description calls out and the code matches (dataset_builder.py:1208-1209).
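
The wraparound behavior in the first finding can be illustrated with a small sketch. It assumes an inclusive `start`/`end` range and uses a hypothetical local `IndexRange` and a free function instead of the real `SeedDatasetColumnGenerator._index_range_at_offset`; the PR's actual arithmetic may differ.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class IndexRange:
    """Hypothetical inclusive range of seed-row indices."""

    start: int
    end: int

    @property
    def size(self) -> int:
        return self.end - self.start + 1


def index_range_at_offset(selection: IndexRange, record_offset: int) -> IndexRange:
    """Return the part of `selection` that remains after consuming `record_offset` rows."""
    relative_offset = record_offset % selection.size
    if relative_offset == 0:
        # Offset 0 or an exact multiple of the cycle: restart a fresh cycle
        # with the full selection rather than building an empty range.
        return selection
    # Otherwise seek into the current cycle and read to the end of the selection.
    return IndexRange(start=selection.start + relative_offset, end=selection.end)
```

With a selection of rows 10 through 13, an offset of 6 leaves one full cycle behind and seeks two rows into the next, so the reader starts at row 12. An offset of 4 or 8 returns the full selection.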

Async fresh-run behavioral change

The async scheduler now seeks to per-row-group offsets on fresh runs as well as resumed runs. The PR description frames this as removing an implicit shared-reader assumption (parallel-safe).

  • LocalFileSeedReader.create_batch_reader constructs a new query result and DuckDBSeedReaderBatchReader on each call (seed_reader.py:228-243), so creating multiple readers concurrently is safe in principle. Other implementations (DirectorySeedReader, FileContentsSeedReader, HuggingFaceSeedReader, AgentRolloutSeedReader) all derive from this same factory pattern, so the contract holds for them as well.
  • test_scheduler_auto_computes_row_group_start_offsets_for_fresh_runs only uses a synthetic in-memory generator; it locks in the offset propagation but does not exercise concurrent create_batch_reader calls against any real seed reader. Suggest: validate manually (or in a follow-up integration test) that the existing async ordered-seed flow against a real on-disk seed dataset still produces the expected sequence under buffer_size=1 with multiple concurrent row groups. The likelihood of regression is low, but this is the kind of change worth a smoke test before release.
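
For reference, auto-deriving per-row-group offsets on a fresh run amounts to a prefix sum over row-group sizes. The sketch below assumes row groups are given as `(row_group_id, size)` pairs in planned order. The function name mirrors the `_build_row_group_start_offsets` helper mentioned in the review, but the signature and body are assumptions.

```python
from __future__ import annotations

from itertools import accumulate


def build_row_group_start_offsets(row_groups: list[tuple[int, int]]) -> dict[int, int]:
    """Map each row-group id to the number of records planned before it."""
    sizes = [size for _, size in row_groups]
    # accumulate(..., initial=0) yields 0, s0, s0+s1, ...; zip drops the
    # trailing grand total because row_groups is one element shorter.
    starts = accumulate(sizes, initial=0)
    return {rg_id: start for (rg_id, _), start in zip(row_groups, starts)}
```

For two full groups of 4 and a non-aligned last group of 2, the offsets are 0, 4, and 8. Each group can therefore seek independently of the order in which groups execute.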

Style / project conventions

  • All new code uses from __future__ import annotations, modern typing, absolute imports, and @dataclass(frozen=True, slots=True) — consistent with the style guide.
  • RowGroupResumePlan and build_row_group_resume_plan extract a non-trivial chunk of logic from _build_async into a pure, testable helper. Net readability win.
  • Docstring on _run_batch is longer than the project's "default to no comments" guidance, but the lifecycle it documents is non-obvious (two ContextVars, asymmetric across call sites — fresh sync sets only current_row_group, resume sets both, preview sets neither). I'd keep it.
  • Inline comment on _index_range_at_offset is similarly load-bearing: the modulo branch behavior is exactly the kind of "would surprise a reader" case the style guide carves out.

Minor nits (non-blocking)

  • async_scheduler.py:292: row_group_start_offsets or self._build_row_group_start_offsets(row_groups) treats an empty dict as "missing." In practice the resume path always passes a non-empty dict and fresh runs pass None, so this is fine, but if row_group_start_offsets is None else row_group_start_offsets would be slightly more explicit about intent. Not worth a re-spin.
  • async_scheduler.py:1566 uses .get(task.row_group) returning None for missing keys. Defensive, but if the map is built from row_groups (auto-derived) or from RowGroupResumePlan.row_group_start_offsets (covers all remaining IDs), a missing key would indicate a logic bug. Either [task.row_group] (fail-fast) or a comment explaining why None is acceptable would be slightly clearer. Current behavior — falling back to None which the seed generator interprets as "no offset, use existing reader" — is reasonable.
  • dataset_builder.py:657: row_group_start_offset=sum(self.batch_manager.num_records_list[:batch_idx]) is recomputed in each loop iteration, which is O(n^2) over batches. For typical batch counts this is irrelevant, but a running accumulator would be marginally cleaner.
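
The first nit is easiest to see side by side. The two helpers below are hypothetical and exist only to contrast the `or` fallback with an explicit `is None` check. They are not taken from async_scheduler.py.

```python
from __future__ import annotations


def pick_with_or(supplied: dict[int, int] | None, derived: dict[int, int]) -> dict[int, int]:
    # An empty dict is falsy, so it is silently replaced by the derived map.
    return supplied or derived


def pick_with_is_none(supplied: dict[int, int] | None, derived: dict[int, int]) -> dict[int, int]:
    # Only None means "not supplied"; an explicit empty dict is respected.
    return derived if supplied is None else supplied
```

Both variants agree when the caller passes `None` or a non-empty dict. They differ only for an empty dict, which is the case the review flags as unreachable in practice.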

Test coverage

Strong. Coverage spans:

  • Direct unit tests on _index_range_at_offset (offset, cycle-boundary wrap).
  • current_row_group_start_offset end-to-end through generate_from_scratch.
  • build_row_group_resume_plan with holes in completed_ids.
  • Async scheduler propagation, both with caller-supplied offsets and auto-derived offsets across non-aligned row groups.
  • Two end-to-end regression tests against the minimal repro from #709 ("Resume replays ordered seed rows after completed checkpoints"): basic resume and the cycle-boundary extension case.

Test refactors (_make_sampler_only_builder, _write_incompatible_config_metadata) replace heavy patch.object stacks with real config-driven setup. Good cleanup, and the unrelated _json/_Path/_ArtifactStorage underscore aliases get folded into normal imports — small but welcome.

Security / performance

No security concerns. No new I/O, no string interpolation against external data, no deserialization. Performance impact negligible: one extra dict.get per task on the async side; one extra IndexRange construction per resumed batch on the sync side.

Verdict

Looks good. The fix targets the reported bug, the design (per-row-group offsets via ContextVar) is the right shape — order-dependent generators stay decoupled from the engine — and test coverage is thorough on both sync and async paths.

Recommend a manual smoke test of fresh async ORDERED seed generation against a real on-disk seed dataset before release to validate the behavioral change called out under "Async fresh-run behavioral change" above. Otherwise no blocking concerns.

@greptile-apps
Contributor

greptile-apps Bot commented May 28, 2026

Greptile Summary

This PR fixes ResumeMode.ALWAYS for SamplingStrategy.ORDERED seed datasets: previously the resumed process always re-read from IndexRange.start, duplicating already-written rows. The fix threads each row group's planned start offset through a new current_row_group_start_offset ContextVar so SeedDatasetColumnGenerator can seek to the exact seed row when resuming.

  • _index_range_at_offset computes the correct sub-range within the selection (or full dataset) using modulo-wraparound; the % selected_size == 0 branch safely returns the original full range to restart a fresh cycle instead of producing a degenerate empty range.
  • build_row_group_resume_plan (new public function) snapshots offsets from the original row-group layout so per-group positions stay stable when there are holes in the completed set; the async scheduler accepts these pre-computed offsets and auto-derives them for fresh runs.
  • _run_batch in the sync engine now owns both current_row_group and current_row_group_start_offset inside a single try/finally, and three new end-to-end regression tests cover the core bug, cycle-boundary wraparound, and PartitionBlock resume.

Confidence Score: 5/5

Safe to merge — all changed paths are covered by targeted unit and integration tests, ContextVar tokens are always reset in finally blocks, and the offset arithmetic is validated against three distinct scenarios including cycle wraparound and PartitionBlock.

The fix is mechanically sound: _index_range_at_offset correctly handles the modulo-zero edge case the PR calls out, ContextVar lifetimes are clean in both engines, and build_row_group_resume_plan correctly anchors offsets to the original plan rather than the remaining list. The regression suite covers the exact minimal repro from issue #709, the cycle-boundary branch, and the PartitionBlock path end-to-end. No pre-existing interfaces are broken.

No files require special attention — the most complex logic (_index_range_at_offset wraparound and RowGroupResumePlan offset stability) is directly exercised by the new tests.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/seed_dataset.py | Core fix: generate_from_scratch now seeks ORDERED readers to current_row_group_start_offset on every call; _index_range_at_offset correctly handles full-cycle wraparound (modulo == 0 returns original range instead of degenerate empty range). |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Extracts row-group resume logic into build_row_group_resume_plan; threads row_group_start_offset through _run_batch with proper try/finally ContextVar lifecycle; offsets are computed from the original plan so they stay stable when there are holes. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Adds _rg_start_offset_map (auto-derived for fresh runs, caller-supplied for resume); sets/resets current_row_group_start_offset per task in _execute_task_inner; removes the implicit shared-reader assumption for fresh ORDERED async runs. |
| packages/data-designer-engine/src/data_designer/engine/context.py | Adds current_row_group_start_offset ContextVar (default None) so ORDERED generators can observe their row group's planned start offset without coupling to the engine. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/progress_tracker.py | Adds initial_completed parameter; rate calculation now uses run_completed = completed - _initial_completed so resumed runs report throughput for this run only; next_log_at advances past the initial offset. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_progress_reporter.py | Adds scheduled_records parameter to log_start; when provided, uses it directly rather than summing tracker totals, giving correct task-count logging on resume. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds three end-to-end regression tests: basic #709 repro, cycle-boundary wraparound, and PartitionBlock resume; also cleans up several tests to use real builds instead of mocked internals. |
| packages/data-designer-engine/tests/engine/column_generators/generators/test_seed_dataset.py | Three new unit tests covering _reset_batch_reader with offset, cycle-boundary wraparound, and the full generate_from_scratch to current_row_group_start_offset flow. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Two new async tests: one verifying explicit offset injection for resume, one locking in auto-derived offsets for fresh runs; plus a progress-tracker resume test. |
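
The progress_tracker.py change can be pictured with a tiny calculation. `run_throughput` is a hypothetical helper, not the tracker's API. It only illustrates why subtracting the records completed before the resume keeps the reported rate honest.

```python
from __future__ import annotations


def run_throughput(completed: int, initial_completed: int, elapsed_seconds: float) -> float:
    """Records per second produced by this run only."""
    # Records finished before the resume are excluded from the numerator.
    run_completed = completed - initial_completed
    if elapsed_seconds <= 0:
        return 0.0
    return run_completed / elapsed_seconds
```

If a resumed run starts with 100 records already on disk and reaches 150 after 10 seconds, this gives 5 records per second. Dividing the raw total by the elapsed time would report 15.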

Sequence Diagram

sequenceDiagram
    participant DB as DatasetBuilder
    participant RP as build_row_group_resume_plan
    participant AS as AsyncTaskScheduler
    participant CV as ContextVar current_row_group_start_offset
    participant GEN as SeedDatasetColumnGenerator

    Note over DB: Resume path
    DB->>RP: original_target, num_records, buffer_size, completed_ids
    RP-->>DB: RowGroupResumePlan
    DB->>AS: "row_groups=remaining, row_group_start_offsets, initial_completed_records"

    Note over AS: Per task execution
    AS->>CV: set(rg_start_offset_map[task.row_group])
    AS->>GEN: generate_from_scratch(num_records)
    GEN->>CV: "get() -> row_group_start_offset"
    GEN->>GEN: _index_range_at_offset(offset)
    GEN->>GEN: _reset_batch_reader(num_records, record_offset)
    GEN-->>AS: DataFrame at correct seed position
    AS->>CV: reset(token)

    Note over DB: Sync resume path
    DB->>DB: "_run_batch(row_group_start_offset=sum(sizes before batch))"
    DB->>CV: set(row_group_start_offset)
    DB->>GEN: generate_from_scratch(num_records)
    GEN->>GEN: seek to correct seed row
    DB->>CV: reset(token) in finally

Reviews (4): Last reviewed commit: "fix async resume progress accounting"

nabinchha added 3 commits May 28, 2026 13:46
Companion to the existing IndexRange resume test. Locks in correct
behavior when the seed selection comes from PartitionBlock — its
to_index_range produces a contiguous range today, but nothing else
asserts that contract. The test crosses a cycle boundary inside the
partition (4 records over a 2-row partition) so it exercises both
the offset-into-partition branch and the relative_offset == 0
wraparound branch end-to-end.