Skip to content

feat: implement async scheduling admission control#661

Open
eric-tramel wants to merge 25 commits into
epic/645-async-schedulingfrom
scheduling-yolo
Open

feat: implement async scheduling admission control#661
eric-tramel wants to merge 25 commits into
epic/645-async-schedulingfrom
scheduling-yolo

Conversation

@eric-tramel
Copy link
Copy Markdown
Contributor

@eric-tramel eric-tramel commented May 14, 2026

📋 Summary

Implements the issue 645 async scheduling epic by splitting runtime control into explicit scheduler task admission and concrete model-request admission, with typed scheduling metadata, AIMD-backed request leases, capacity snapshots, and correlated observability.

This PR now also includes the idle-time optimization pass on top of the original plan PR: adaptive row-group admission, request-pressure-aware task selection, richer scheduler telemetry, Perfetto trace export, and a canonical idle regression/report suite for future scheduler tuning.

🔗 Related Issue

Refs #645

🔄 Changes

✨ Added

  • Generator-facing SchedulingMetadata and validation in the config package.
  • Engine-side task scheduling bridge, fair ready queue views, and task admission leases.
  • ModelRequestExecutor and AdaptiveRequestAdmissionController for per-attempt provider/model/domain admission.
  • Scheduler/request observability events, runtime correlation, capacity snapshots, and deterministic benchmark harnesses.
  • Adaptive row-group admission for widening the runnable frontier when model capacity is idle and queued model demand is low.
  • Request-pressure advisory selection, which can skip a pressured model task when an eligible open peer exists.
  • Scheduler health snapshots, row-group checkpoint telemetry, advisory skip telemetry, and job-level scheduling diagnostics.
  • Canonical idle regression tooling and reports:
    • scripts/benchmarks/run_async_scheduling_idle_regression.py
    • scripts/benchmarks/generate_async_scheduling_idle_report.py
    • scripts/benchmarks/export_async_scheduling_perfetto.py
    • reports/async-scheduling-idle-regression.html
    • reports/async-scheduling-idle-analysis.html
  • Focused unit/regression tests for scheduling metadata, task admission, fair queue behavior, request admission, model request execution, async scheduler behavior, capacity reporting, idle regression guardrails, and benchmark validation.

🔧 Changed

  • Rewired the async scheduler so root and downstream work use the same ready queue, admission, task lease, worker spawn, and release flow.
  • Routed model clients through request admission and exposed request event sinks at the model-call boundary.
  • Updated architecture docs, devnotes, and Fern docs/assets from throttle-oriented language to request-admission terminology.
  • Tightened issue 645 plan contracts around task admission, request admission, migration cleanup, and capacity vocabulary.
  • Extended the benchmark harness to report total idle, starved idle, frontier/dependency-horizon idle, scheduler queue age, downstream ready gap, burstiness, request utilization, request idle, leased request wait, first model dispatch, and advisory skip counts.
  • Added combined adaptive + request-pressure benchmarking so the two scheduler adaptations are measured together rather than only in isolation.

🗑️ Removed

  • Removed legacy scheduling hint resolver code and tests.
  • Removed transport-level throttle manager/client wrapper code and tests in favor of request admission.

📈 Idle Optimization Evidence

Latest quick idle regression: PASS (0 errors, 0 warnings), 22 cases, 367 checks.

  • Adaptive row groups improved the fixed-low frontier case from 10.5% to 67.7% llm utilization and reduced frontier/dependency idle from 81.3% to 14.7%.
  • Request-pressure advisory changed first dispatch from a_pressured to z_open and reduced leased request wait from 49.8ms to 3.2ms in the dedicated pressure case.
  • Combined adaptive + request-pressure improved llm utilization from 82.4% to 84.4%, request utilization from 85.6% to 89.2%, and request idle by 3.7 pp, with 193 advisory skip events. Wall time did not improve in that combined case, so this is currently a flow/utilization improvement rather than a proven throughput improvement.

🔍 Attention Areas

⚠️ Reviewers: This is intentionally a large PR against the epic branch.

🧪 Testing

  • .venv/bin/ruff check packages scripts tests_e2e
  • .venv/bin/ruff format --check packages scripts tests_e2e
  • git diff --check
  • Config package tests: 570 passed
  • Engine package tests: 1,995 passed
  • Interface package tests: 899 passed, 1 skipped
  • Benchmark schema and derived metric checks passed
  • Stale throttle/scheduling term cleanup gates passed, excluding expected migration/removal references
  • Live GPT-5.5 benchmark lanes: max_parallel, AIMD, short/long, fan-in/fan-out, bottleneck workloads
  • Live GPT-5 Nano 1024+ benchmark lanes: cap scale, AIMD scale, fan scale, mixed GPT-5.5 -> Nano pipeline
  • uv run ruff check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run ruff format --check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run pytest packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py -q — 17 passed
  • uv run python scripts/benchmarks/run_async_scheduling_idle_regression.py --quick ... — PASS, 0 errors, 0 warnings
  • make test as a single aggregate command was not rerun; equivalent package suites above passed earlier, and focused checks passed for this optimization pass
  • E2E tests added/updated: N/A for this PR; live provider benchmark artifacts were collected instead

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated
  • Idle regression report and guardrails added

Notes

Raw live benchmark traces and large artifact trees remain local because the full artifact tree is large. This PR includes condensed reports and reusable benchmark tooling so reviewers can inspect the evidence and rerun the canonical regression suite without committing raw JSONL/timeline dumps.


Description updated with AI

eric-tramel and others added 10 commits May 14, 2026 10:46
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Resolve the second review pass over plans/645 by making the Markdown spec and UML source agree on task admission, request admission, capacity, telemetry, benchmark, migration, and issue-map contracts.

Key updates include canonical event names, richer AsyncCapacityPlan fields, request waiter and cancellation semantics, timed wakeups, retry/salvage lease ordering, and clearer public/internal documentation boundaries.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 14, 2026

MkDocs preview: https://faba4e95.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-661.docs.buildwithfern.com/nemo/datadesigner​

Notebook tutorials are rendered without execution outputs in previews.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@eric-tramel eric-tramel marked this pull request as ready for review May 18, 2026 16:39
@eric-tramel eric-tramel requested a review from a team as a code owner May 18, 2026 16:39
@nabinchha
Copy link
Copy Markdown
Contributor

Thanks for putting this together, @eric-tramel — this is a substantial reshaping of the runtime control surfaces and the new module ownership reads cleanly. Here are my thoughts.

Summary

This PR splits runtime control into explicit scheduler task admission (FairTaskQueue + TaskAdmissionController) and provider/model request admission (AdaptiveRequestAdmissionController + ModelRequestExecutor), retires the old ThrottleManager/ThrottledModelClient stack, adds an observability/capacity surface, and updates architecture docs and the published Fern site to match. The implementation matches the PR description and the contracts laid out under plans/645/.

Findings

Warnings — Worth addressing

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:347_stable_task_id duplicates stable_task_id and uses a function-local import

  • What: completion.py defines _stable_task_id(task) with the same hashing recipe as resources.stable_task_id(task), plus an in-function import hashlib.
  • Why: This is both a DRY violation and a STYLEGUIDE.md violation ("Place imports at module level, not inside functions"). It also creates real drift risk: _stable_task_id produces the IDs we compare against in mark_enqueued, while stable_task_id is the one the resolver/scheduler actually attaches to SchedulableTask. If either drifts (e.g. someone changes the hash algorithm), mark_enqueued will silently stop matching and frontier dedup breaks.
  • Suggestion: Drop the local function and import the public one from resources:
from data_designer.engine.dataset_builders.scheduling.resources import stable_task_id
# ...
self._frontier = {task for task in self._frontier if stable_task_id(task) not in wanted}

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py:93,108blocked accumulator is collected but never read

  • What: FairTaskQueue.select_next builds a blocked: list[tuple[float, int, TaskGroupKey]] and appends ineligible heap entries to it, but the list is never consumed before the function returns.
  • Why: Dead code. It's not flagged by F841 (the list is mutated, just never read), so it'll quietly stick around. It also reads as if it was meant to feed diagnostics or a stable backoff that didn't make it into the PR.
  • Suggestion: Either remove the local entirely if it's leftover from refactoring, or wire it into the _DispatchOutcome/explain_blocked path you already have so callers see which groups got skipped during selection.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:639-640, model_request_executor.py:245-246, dataset_builders/async_scheduler.py:369-370 — Event sink exceptions are silently swallowed

  • What: Three independent paths emit observability events through a try / except Exception: return (or continue).
  • Why: We lose any signal that the sink itself is misbehaving (e.g. a buggy custom RequestAdmissionEventSink that always raises). Given the new benchmark/QA story leans heavily on these events, a stuck sink would be effectively invisible.
  • Suggestion: Log at logging.WARNING once per kind (or with rate limiting), and document the contract on Sink.emit_* ("must not raise; failures are dropped"). At a minimum, attach exc_info=True so users can find the broken sink in their logs.
except Exception:
    logger.warning("Admission event sink raised; dropping event", exc_info=True)
    return

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:229-281acquire_async polls every ~50 ms instead of being woken

  • What: The async path loops with self._lock: ... ; wait = self._wait_seconds_locked(...) then await asyncio.sleep(wait) where _wait_seconds_locked clamps wait to a minimum candidate of 0.05. The threading _condition.notify_all() paths only wake acquire_sync waiters; the async coroutine is on asyncio.sleep and isn't notified by them.
  • Why: Under contention, every async lease release adds up to ~50 ms of additional latency before the next waiter sees its assigned lease. The benchmark harness will mask this on hot endpoints, but it's a measurable ceiling on scheduler-driven async traffic, and "wakeup" is in the contracts language but not implemented for asyncio.
  • Suggestion: Add a per-resource asyncio.Event (or a small list of asyncio.Futures) that _admit_waiters_locked and release set when an async waiter is admitted; switch acquire_async to asyncio.wait_for(event.wait(), timeout=wait) so we still bound the wait by deadline but stop polling. Alternatively, document the polling cadence as intentional in plans/645/request-admission.md and pull 0.05 out as a named module constant (e.g. _ASYNC_WAIT_POLL_INTERVAL_S) so it's easy to find and tune.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:196-227acquire_sync will block the event loop if called from async code

  • What: acquire_sync calls self._condition.wait(timeout=wait), which holds a real OS-level wait inside a threading.Condition.
  • Why: There's no enforcement that this method is never reached from an asyncio task. _execute_sync in ModelRequestExecutor is the intended caller, but a future caller (e.g. a custom column generator that re-enters the controller) could deadlock the loop. The async/sync executor split assumes everyone reads the contract.
  • Suggestion: Either rename to acquire_blocking and add a docstring warning, or assert asyncio.get_running_loop() raises (i.e. we're not on a loop) at the top of the method:
def acquire_sync(self, item: RequestAdmissionItem) -> RequestAdmissionLease:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        raise RuntimeError(
            "acquire_sync would block the running event loop; use acquire_async instead."
        )
    ...

packages/data-designer-config/src/data_designer/config/run_config.pyRunConfig.throttle removed with no deprecation path

  • What: ThrottleConfig and RunConfig.throttle are deleted outright. ConfigBase enables extra="forbid", so any user code that still does RunConfig(throttle=...) will fail Pydantic validation with extra inputs are not permitted.
  • Why: This is a real public-API breaking change. The PR description correctly calls it out, but a sudden Pydantic error on first run will be confusing — users will see a validator stack trace, not "throttle has been replaced by request admission".
  • Suggestion: Add a model_validator(mode="before") that detects throttle in the input dict and raises a typed ConfigValidationError (or your equivalent canonical error) with a one-line "Replaced by adaptive request admission; see X." This keeps extra="forbid" honest while giving users a humane error. If we want to be even friendlier, accept and warn for one release. Either way, this should be in the release notes when the epic lands on main.

packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py:166-208 — Endpoint-bucket typing forces runtime isinstance casts

  • What: endpoints: dict[tuple[str, str, str], dict[str, object]] is built up with bucket["aliases"] and bucket["caps"], and the code defends each access with if isinstance(cast_aliases, list): cast_aliases.append(...).
  • Why: The runtime checks are there only to satisfy the loose dict[str, object] typing — they can't actually fail because the only writer is this same function. The result is harder to read than the underlying logic.
  • Suggestion: Lift a small private dataclass:
@dataclass
class _EndpointBucket:
    aliases: list[str] = field(default_factory=list)
    caps: list[int] = field(default_factory=list)

endpoints: dict[tuple[str, str, str], _EndpointBucket] = {}
...
bucket = endpoints.setdefault(endpoint, _EndpointBucket())
bucket.aliases.append(alias)
bucket.caps.append(cap)

This drops the isinstance ladder and tightens the typing without expanding the public surface.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:1042-1085 — Generic except Exception in task body classifies non-LLM bugs as drop-row events

  • What: When a task raises something that isn't in RETRYABLE_MODEL_ERRORS, the scheduler treats it as a non-retryable drop, logs a warning, and continues. So a KeyError/TypeError/AttributeError from our own scheduler/generator code looks identical to "model returned a JSON that fails validation".
  • Why: That makes legitimate engine bugs invisible behind row drops. The early-shutdown gate will eventually fire if this is widespread, but a one-off bug on a hot column path can quietly drain a large portion of a run.
  • Suggestion: Either bump the existing log to logger.error(..., exc_info=True) for non-retryable, non-ProviderError/Generation*Error types, or add an explicit allowlist of "expected provider/generator errors" and re-raise anything else into a typed DatasetGenerationError that the run knows is internal. Tests already exercise the success/failure mix; a unit test that asserts KeyError from a generator raises rather than silently drops the row would lock this in.

packages/data-designer-config/src/data_designer/config/run_config.py:55-57 — Note glued into the Attributes: section

  • What: The "Request admission is engine-internal in V1 and is not exposed as a public run-config knob." sentence sits inside the Google-style Attributes: block where every other entry documents an attribute.
  • Why: Tools that parse the docstring (and reviewers skimming) will read this as a malformed attribute entry. It also looks like a dangling note from a previous iteration.
  • Suggestion: Move it into a separate Notes: section below Attributes:, or up into the class-level summary paragraph so it's clearly a meta-note, not a field.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py:176-181 — Bounded-borrow release silently discards "credit" for non-borrowing tasks

  • What: BoundedBorrowTaskAdmissionPolicy.on_release always returns negative debt_changes for every leased resource, regardless of whether the lease originally borrowed. _apply_delta clamps to zero, so no debt goes negative — but a task that never borrowed still "repays" debt incurred by someone else.
  • Why: This is probably intentional cross-lease debt repayment (and repay_on_withheld_peer_pressure=True hints at that), but it's not obvious from the code or the test names. A future contributor reading this without architecture/ context could easily change it.
  • Suggestion: Add a one-line comment on on_release explaining "any release in the group repays group-level debt up to zero", and add a test that pins the behaviour: borrow once, release a non-borrowing lease, observe debt drops by the full lease size.

Suggestions — Take it or leave it

packages/data-designer-engine/src/data_designer/engine/observability.py:35-39RuntimeCorrelationProvider.set shadows the builtin

  • What: The method name set shadows Python's set() builtin within method bodies and reads oddly at call sites (runtime_correlation_provider.set(...)).
  • Suggestion: push or set_correlation would be more grep-able. Not a blocker.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/resolver.py:20-40RequestResourceResolver is a stateless one-method class

  • What: The class has no instance state and a single resolve(...) method.
  • Suggestion: A free function resolve_request_resource(...) would convey "no state" more directly. If the plan is to swap implementations later, a Protocol with one method captures that without the class indirection. YAGNI either way.

packages/data-designer-engine/src/data_designer/engine/observability.py:100-101,128-132snapshot/request_resource_key typed as object | None

  • What: SchedulerAdmissionEvent.snapshot, RequestAdmissionEvent.request_resource_key, request_group_key, and pressure_snapshot are all object | None (presumably to avoid circular imports at runtime).
  • Suggestion: With TYPE_CHECKING-only imports you can keep the precise types in static checkers without taking a runtime cost. Sinks consuming these events would then get autocomplete instead of poking at object.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/resources.py:57-61stable_task_id uses SHA-1 where a non-cryptographic hash would do

  • What: We hash a small key string with SHA-1 and truncate to 16 hex chars per dispatch.
  • Suggestion: For task identity only, hashlib.blake2b(raw, digest_size=8).hexdigest() is a touch faster and signals "not cryptographic" by name. Not measurable on its own; mostly a readability nudge.

Duplication between FairTaskQueue and RequestFairQueue (scheduling/queue.py and models/request_admission/queue.py)

  • What: Two virtual-time fair queues with very similar shape but slightly different commit contracts and item types.
  • Suggestion: Wait until a third one shows up before extracting a base class — STYLEGUIDE.md says rule-of-three for DRY. Just flagging that the next reviewer touching either will likely want to skim both.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:159-160 — Empty mark_complete body

  • What: The compatibility hook is just a docstring with no body, which works because docstrings count as a no-op statement, but reads as "did the author forget to implement this?".
  • Suggestion: Add an explicit pass or ... so it parses as intentionally empty at a glance.

packages/data-designer-config/src/data_designer/config/scheduling.py:103-118SchedulingMetadataError is defined after SchedulingMetadata

  • What: __post_init__ references SchedulingMetadataError which is defined a few classes below. It works (Python resolves at call time) but it's an unusual layout in this codebase.
  • Suggestion: Move the error class above the dataclass that raises it — mirrors the rest of data_designer.config.errors and makes the file read top-down.

What Looks Good

  • Lease lifecycle hygiene. Both controllers track controller_generation, distinguish duplicate / unknown_lease / stale_lease, and have tests that pin the duplicate-release-doesn't-double-credit invariant. That's exactly the kind of thing that bit the old throttle manager.
  • Two-phase fair queue (select_nextcommit). Non-mutating selection plus a sequence_version check makes "select, fail to admit, retry next eligible group" possible without losing fairness. Clean design.
  • Cancellation correctness in acquire_async. The race where CancelledError arrives after a lease was assigned but before the loop noticed is explicitly handled and tested (test_async_cancellation_after_waiter_assignment_releases_lease). Easy to get wrong; it's right here.
  • Doc + plan alignment. architecture/dataset-builders.md, architecture/models.md, the user-facing docs/concepts/architecture-and-performance.md, and the Fern mirrors all move in lockstep with the code, and plans/645/ documents the contracts at the level of detail this kind of refactor needs.
  • Linter clean. ruff check and ruff format --check pass on every file I touched.

Verdict

Needs changes — the duplicated _stable_task_id (with the in-function import), the dead blocked accumulator, the swallowed observability-sink exceptions, the polling/blocking semantics on acquire_async/acquire_sync, and the unguarded RunConfig.throttle removal are all worth addressing before this lands on main. None are critical blockers individually, but collectively they're the kind of subtle issues that become hard to track down once the epic ships.


This review was generated by an AI assistant.

- add adaptive row-group admission and request-pressure advisory telemetry
- add idle regression suite, HTML reports, and Perfetto export tooling
- add combined adaptive/request-pressure benchmark guardrails
- expand scheduler, request admission, and benchmark tests
@eric-tramel
Copy link
Copy Markdown
Contributor Author

Idle-time optimization and observability pass

This pass adds a layer on top of the original async scheduling plan PR. The original PR established task admission, request admission, scheduling metadata, correlated observability, and capacity snapshots. This follow-up focused on making scheduler idle time visible, measurable, and tunable.

What changed

  • Added adaptive row-group admission in the async scheduler. The scheduler can now start from a narrow admitted frontier and increase the row-group target when llm_wait capacity is idle and queued model demand is low, bounded by the configured hard cap and max-admitted-row guardrails.
  • Added request-pressure advisory task selection. When a candidate task targets a pressured request resource and an eligible peer can run against an open request resource, the scheduler can skip the pressured task for that selection pass and emit request_pressure_advisory_skipped telemetry.
  • Added richer scheduler telemetry:
    • scheduler_job_started
    • scheduler_job_completed
    • scheduler_health_snapshot
    • row_group_checkpointed
    • request_pressure_advisory_skipped
  • Added job-shape and live-health diagnostics to scheduling telemetry: active workers, active/admitted rows, active row groups, target row groups, queued demand by resource, leased/available resources, request pressure, row-group admission block reasons, and advisory skip counts.
  • Added Perfetto export tooling for benchmark timelines so scheduler events, row groups, task leases, request waits, request leases, and counters can be inspected as traces.
  • Added canonical idle regression tooling and reports:
    • scripts/benchmarks/run_async_scheduling_idle_regression.py
    • scripts/benchmarks/generate_async_scheduling_idle_report.py
    • scripts/benchmarks/export_async_scheduling_perfetto.py
    • reports/async-scheduling-idle-analysis.html
    • reports/async-scheduling-idle-regression.html
  • Extended the benchmark harness to report total idle, starved idle, frontier/dependency-horizon idle, scheduler queue age, downstream ready gap, burstiness, request utilization, request idle, request starved idle, leased request wait, first model dispatch, and advisory skip counts.
  • Added a combined adaptive-request-pressure scenario so adaptive row-group admission and request-pressure advisory are tested together instead of only in separate synthetic cases.

What improved

Latest quick idle regression result: PASS, 0 errors, 0 warnings, 22 cases, 367 checks.

  • Adaptive row-group admission materially improved the under-admitted frontier case:
    • fixed-low control: 10.5% llm utilization, 81.3% frontier/dependency idle, 4.48s wall
    • adaptive enabled: 67.7% llm utilization, 14.7% frontier/dependency idle, 0.92s wall
  • Request-pressure advisory improved the dedicated pressure scenario:
    • first model dispatch moved from a_pressured to z_open
    • leased request wait moved from 49.8ms to 3.2ms
    • advisory emitted pressure-skip telemetry (29 skips in the quick run)
  • Combined adaptive + request-pressure improved flow metrics:
    • llm utilization moved from 82.4% to 84.4%
    • request utilization moved from 85.6% to 89.2%
    • request idle improved by 3.7 pp
    • advisory emitted 193 skip events
    • first dispatch moved from a_pressured to z_open

One important caveat: the combined case did not improve wall time in the latest quick run (0.354s control vs 0.371s combined), and leased request wait moved slightly worse (488ms to 515ms). So the combined policy is currently a confirmed utilization/flow improvement, not yet a proven throughput improvement. The new regression/report suite makes that tradeoff explicit so future scheduler tuning can optimize against a stable bar.

Verification run in this pass

  • uv run ruff check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run ruff format --check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run pytest packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py -q (17 passed)
  • uv run python scripts/benchmarks/run_async_scheduling_idle_regression.py --quick ... (PASS, 0 errors, 0 warnings)

Raw timeline/perfetto artifacts remain local; the PR includes the reusable tooling and HTML reports rather than committing the large artifact trees.

Comment thread fern/assets/owning-the-model-stack/retry-boundary.png
Comment thread artifacts/645-live-bench-nano/agent-aimd-scale/README.md Outdated
@andreatgretel
Copy link
Copy Markdown
Contributor

Did one more shallow final pass on this, plus an independent Claude cross-review at head 70974fd. No approval-blocking findings from either pass. The follow-up fixes already merged into this branch (#679, #680, #681, #683, #684, #685) look present and sane.

A few non-blocking nits / follow-ups worth considering:

  1. Please fold/merge fix: tighten request controller release semantics #682 before final approval. It fixes the remaining request-controller release semantics I was still worried about: exact release telemetry outcome and AIMD rate-limit ceiling behavior.

  2. BoundedBorrowTaskAdmissionPolicy currently treats borrow debt as group-level debt that any completed lease in the group can repay. That may be intentional, but it is a softer contract than per-borrow/per-lease debt. If we keep it, I'd document that explicitly on the policy config so future tuning does not assume strict per-borrow accounting.

  3. Scheduler event diagnostics may contain Python objects such as TaskGroupKey. In-memory sinks are fine, but JSON-style external sinks could fail and drop events. Either normalizing diagnostics at the emit boundary or documenting the sink contract would make this easier to integrate later.

  4. Adaptive row-group admission appears additive-only: it can raise the target but does not actively shrink it under later sustained pressure. That seems okay as an optimization, but docs/comments should avoid implying full AIMD-style behavior at the row-group layer unless a decrease path is added.

  5. Minor defensive edge cases Claude noticed:

    • _dispatch_selected_task rollback on worker-spawn failure could also undo _dispatched and in_flight_count.
    • The pending_pre_batch continue path could yield if no progress is possible, to avoid a pathological tight loop.
    • It would be useful to add an end-to-end timeout classification test for native adapters raising ProviderError(kind=TIMEOUT, status_code=None).
    • Direct ThrottleConfig imports now fail with a generic missing-symbol error; a friendly migration error would be nicer.

Overall I'm comfortable with these as non-blocking, assuming #682 lands before we approve/merge this epic PR. The remaining items look like doc hardening, sink-contract polish, or targeted follow-up tests rather than reasons to hold the whole async scheduling PR.

@eric-tramel eric-tramel requested a review from nabinchha May 20, 2026 00:29
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
…scheduling-yolo

# Conflicts:
#	plans/645/module-ownership.md
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 20, 2026

Greptile Summary

This PR implements the async scheduling epic (#645) by introducing explicit task admission control, request-admission leases, AIMD-backed capacity management, adaptive row-group admission, and request-pressure advisory scheduling. It replaces legacy throttle-manager code with a new AdaptiveRequestAdmissionController and TaskAdmissionController, adds correlated observability events, and ships canonical idle regression tooling.

  • New admission layer: TaskAdmissionController owns scheduler-level leases and resource accounting; AdaptiveRequestAdmissionController provides AIMD-backed per-domain request leases with fair queuing, cooldown management, and provider-model aggregate caps.
  • Adaptive scheduler extensions: AsyncTaskScheduler gains adaptive row-group admission (widening the runnable frontier when LLM capacity is idle) and request-pressure advisory selection (skipping a pressure-saturated model task when an eligible open peer exists).
  • Observability and benchmarking: Typed SchedulerAdmissionEvent/RequestAdmissionEvent sinks, Perfetto trace export, and a new idle regression harness (22 cases, 367 checks) replace the removed throttle-manager telemetry.

Confidence Score: 4/5

Safe to merge against the epic branch; the two findings are diagnostic/telemetry inaccuracies that do not affect admission control decisions or data correctness.

The admission control logic, AIMD state machine, fair-queue implementation, lease lifecycle, and adaptive row-group admission are all structurally sound. One finding affects the block-reason label emitted to telemetry when LLM capacity is fully leased with no queued LLM demand — the admission still blocks correctly, only the string tag is wrong. The second finding is a missing KeyboardInterrupt-to-local_cancelled mapping in the async model-request path that the sync path handles; it affects only the last_outcome field in the pressure snapshot and the emitted event, not the AIMD limit adjustments.

async_scheduler.py (_adaptive_row_group_block_reason condition ordering) and model_request_executor.py (async BaseException outcome label).

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Central scheduler with adaptive row-group admission and request-pressure advisory; logic is generally correct but _adaptive_row_group_block_reason has a condition-ordering bug that misclassifies llm_wait_saturated as queued_llm_demand in a specific edge case.
packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py Core AIMD admission state machine; threading model is sound (lock + Condition for sync, asyncio.Event for async), lease accounting and AIMD decrease/increase logic is correct. Minor: in_flight and active_lease_count are always incremented/decremented together but tracked separately.
packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py Model-call boundary with admission lease acquire/release; sync and async paths are symmetric except the async BaseException handler always records unexpected_exception while the sync path correctly maps KeyboardInterrupt to local_cancelled.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py Task lease accounting and policy delegation; _remember_released bounded-history implementation is correct; BoundedBorrowTaskAdmissionPolicy borrow/debt lifecycle is sound.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py StrictFair and BoundedBorrow policy implementations look correct; peer-pressure resource detection and debt lifecycle (accumulate on borrow, repay on any group release, clamped to 0) is well-reasoned.
packages/data-designer-engine/src/data_designer/engine/observability.py New typed event contracts for scheduler and request admission; correlated timeline merge is correct; InMemoryAdmissionEventSink suitable for tests.
packages/data-designer-engine/src/data_designer/engine/capacity.py New capacity snapshot dataclasses for configured, runtime, and observed-maxima capacity; clean read-only structures with no logic issues.

Sequence Diagram

sequenceDiagram
    participant Sched as AsyncTaskScheduler
    participant TAC as TaskAdmissionController
    participant FQ as FairTaskQueue
    participant MRE as ModelRequestExecutor
    participant RAC as AdaptiveRequestAdmissionController

    Sched->>FQ: enqueue(schedulable_tasks)
    loop Main dispatch loop
        Sched->>FQ: select_next(is_dispatch_eligible)
        FQ-->>Sched: QueueSelection
        Sched->>TAC: try_acquire(item, queue_view)
        TAC-->>Sched: TaskAdmissionLease
        Sched->>FQ: commit(selection)
        Sched->>Sched: spawn_worker
    end

    Note over Sched: Worker coroutine
    Sched->>MRE: acompletion(request)
    MRE->>RAC: acquire_async(item)
    RAC-->>MRE: RequestAdmissionLease
    MRE->>MRE: call inner model client
    MRE->>RAC: release(lease, outcome)
    RAC-->>MRE: ReleaseResult
    MRE-->>Sched: result

    Sched->>TAC: release(task_lease)
    Sched->>Sched: wake_event.set()
Loading
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:919-922
Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When `llm_available == 0` and `queued_llm == 0` but `queued_total > 0`, `0 <= 0` satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The `llm_wait_saturated` branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in `_row_group_admission_blocked_reasons` telemetry and the scheduler health snapshot.

```suggestion
        if llm_available <= 0:
            return "llm_wait_saturated"
        if llm_available <= queued_llm and queue_view.queued_total > 0:
            return "queued_llm_demand"
```

### Issue 2 of 2
packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py:201-206
The async `BaseException` handler always records `"unexpected_exception"`, while the equivalent sync handler (`_execute_sync_attempt`, line 146) maps `KeyboardInterrupt` to `"local_cancelled"`. The outcome string is persisted in the request-admission domain state (`state.last_outcome`) and emitted in `model_request_completed` events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path `asyncio.CancelledError` is already handled above, but a `KeyboardInterrupt` delivered to the event loop would fall here and be misclassified.

```suggestion
        except BaseException as exc:
            outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception"
            self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome))
            self._emit_model_event(
                "model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome}
            )
            raise
```

Reviews (1): Last reviewed commit: "Merge remote-tracking branch 'origin/epi..." | Re-trigger Greptile

Comment on lines +919 to +922
if llm_available <= queued_llm and queue_view.queued_total > 0:
return "queued_llm_demand"
if llm_available <= 0:
return "llm_wait_saturated"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When llm_available == 0 and queued_llm == 0 but queued_total > 0, 0 <= 0 satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The llm_wait_saturated branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in _row_group_admission_blocked_reasons telemetry and the scheduler health snapshot.

Suggested change
if llm_available <= queued_llm and queue_view.queued_total > 0:
return "queued_llm_demand"
if llm_available <= 0:
return "llm_wait_saturated"
if llm_available <= 0:
return "llm_wait_saturated"
if llm_available <= queued_llm and queue_view.queued_total > 0:
return "queued_llm_demand"
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 919-922

Comment:
Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When `llm_available == 0` and `queued_llm == 0` but `queued_total > 0`, `0 <= 0` satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The `llm_wait_saturated` branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in `_row_group_admission_blocked_reasons` telemetry and the scheduler health snapshot.

```suggestion
        if llm_available <= 0:
            return "llm_wait_saturated"
        if llm_available <= queued_llm and queue_view.queued_total > 0:
            return "queued_llm_demand"
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +201 to +206
except BaseException:
self._request_admission.release(lease, RequestReleaseOutcome(kind="unexpected_exception"))
self._emit_model_event(
"model_request_completed", item=item, lease=lease, diagnostics={"outcome": "unexpected_exception"}
)
raise
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The async BaseException handler always records "unexpected_exception", while the equivalent sync handler (_execute_sync_attempt, line 146) maps KeyboardInterrupt to "local_cancelled". The outcome string is persisted in the request-admission domain state (state.last_outcome) and emitted in model_request_completed events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path asyncio.CancelledError is already handled above, but a KeyboardInterrupt delivered to the event loop would fall here and be misclassified.

Suggested change
except BaseException:
self._request_admission.release(lease, RequestReleaseOutcome(kind="unexpected_exception"))
self._emit_model_event(
"model_request_completed", item=item, lease=lease, diagnostics={"outcome": "unexpected_exception"}
)
raise
except BaseException as exc:
outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception"
self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome))
self._emit_model_event(
"model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome}
)
raise
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py
Line: 201-206

Comment:
The async `BaseException` handler always records `"unexpected_exception"`, while the equivalent sync handler (`_execute_sync_attempt`, line 146) maps `KeyboardInterrupt` to `"local_cancelled"`. The outcome string is persisted in the request-admission domain state (`state.last_outcome`) and emitted in `model_request_completed` events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path `asyncio.CancelledError` is already handled above, but a `KeyboardInterrupt` delivered to the event loop would fall here and be misclassified.

```suggestion
        except BaseException as exc:
            outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception"
            self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome))
            self._emit_model_event(
                "model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome}
            )
            raise
```

How can I resolve this? If you propose a fix, please make it concise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants