feat: implement async scheduling admission control#661
Conversation
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Resolve the second review pass over plans/645 by making the Markdown spec and UML source agree on task admission, request admission, capacity, telemetry, benchmark, migration, and issue-map contracts. Key updates include canonical event names, richer AsyncCapacityPlan fields, request waiter and cancellation semantics, timed wakeups, retry/salvage lease ordering, and clearer public/internal documentation boundaries. Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
|
MkDocs preview: https://faba4e95.dd-docs-preview.pages.dev Fern preview: https://nvidia-preview-pr-661.docs.buildwithfern.com/nemo/datadesigner
|
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
|
Thanks for putting this together, @eric-tramel — this is a substantial reshaping of the runtime control surfaces and the new module ownership reads cleanly. Here are my thoughts. SummaryThis PR splits runtime control into explicit scheduler task admission ( FindingsWarnings — Worth addressing
from data_designer.engine.dataset_builders.scheduling.resources import stable_task_id
# ...
self._frontier = {task for task in self._frontier if stable_task_id(task) not in wanted}
except Exception:
logger.warning("Admission event sink raised; dropping event", exc_info=True)
return
def acquire_sync(self, item: RequestAdmissionItem) -> RequestAdmissionLease:
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
raise RuntimeError(
"acquire_sync would block the running event loop; use acquire_async instead."
)
...
@dataclass
class _EndpointBucket:
aliases: list[str] = field(default_factory=list)
caps: list[int] = field(default_factory=list)
endpoints: dict[tuple[str, str, str], _EndpointBucket] = {}
...
bucket = endpoints.setdefault(endpoint, _EndpointBucket())
bucket.aliases.append(alias)
bucket.caps.append(cap)This drops the
Suggestions — Take it or leave it
Duplication between
What Looks Good
VerdictNeeds changes — the duplicated This review was generated by an AI assistant. |
- add adaptive row-group admission and request-pressure advisory telemetry - add idle regression suite, HTML reports, and Perfetto export tooling - add combined adaptive/request-pressure benchmark guardrails - expand scheduler, request admission, and benchmark tests
Idle-time optimization and observability passThis pass adds a layer on top of the original async scheduling plan PR. The original PR established task admission, request admission, scheduling metadata, correlated observability, and capacity snapshots. This follow-up focused on making scheduler idle time visible, measurable, and tunable. What changed
What improvedLatest quick idle regression result: PASS,
One important caveat: the combined case did not improve wall time in the latest quick run ( Verification run in this pass
Raw timeline/perfetto artifacts remain local; the PR includes the reusable tooling and HTML reports rather than committing the large artifact trees. |
|
Did one more shallow final pass on this, plus an independent Claude cross-review at head A few non-blocking nits / follow-ups worth considering:
Overall I'm comfortable with these as non-blocking, assuming #682 lands before we approve/merge this epic PR. The remaining items look like doc hardening, sink-contract polish, or targeted follow-up tests rather than reasons to hold the whole async scheduling PR. |
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
…scheduling-yolo # Conflicts: # plans/645/module-ownership.md
Greptile SummaryThis PR implements the async scheduling epic (#645) by introducing explicit task admission control, request-admission leases, AIMD-backed capacity management, adaptive row-group admission, and request-pressure advisory scheduling. It replaces legacy throttle-manager code with a new
|
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Central scheduler with adaptive row-group admission and request-pressure advisory; logic is generally correct but _adaptive_row_group_block_reason has a condition-ordering bug that misclassifies llm_wait_saturated as queued_llm_demand in a specific edge case. |
| packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py | Core AIMD admission state machine; threading model is sound (lock + Condition for sync, asyncio.Event for async), lease accounting and AIMD decrease/increase logic is correct. Minor: in_flight and active_lease_count are always incremented/decremented together but tracked separately. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py | Model-call boundary with admission lease acquire/release; sync and async paths are symmetric except the async BaseException handler always records unexpected_exception while the sync path correctly maps KeyboardInterrupt to local_cancelled. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py | Task lease accounting and policy delegation; _remember_released bounded-history implementation is correct; BoundedBorrowTaskAdmissionPolicy borrow/debt lifecycle is sound. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py | StrictFair and BoundedBorrow policy implementations look correct; peer-pressure resource detection and debt lifecycle (accumulate on borrow, repay on any group release, clamped to 0) is well-reasoned. |
| packages/data-designer-engine/src/data_designer/engine/observability.py | New typed event contracts for scheduler and request admission; correlated timeline merge is correct; InMemoryAdmissionEventSink suitable for tests. |
| packages/data-designer-engine/src/data_designer/engine/capacity.py | New capacity snapshot dataclasses for configured, runtime, and observed-maxima capacity; clean read-only structures with no logic issues. |
Sequence Diagram
sequenceDiagram
participant Sched as AsyncTaskScheduler
participant TAC as TaskAdmissionController
participant FQ as FairTaskQueue
participant MRE as ModelRequestExecutor
participant RAC as AdaptiveRequestAdmissionController
Sched->>FQ: enqueue(schedulable_tasks)
loop Main dispatch loop
Sched->>FQ: select_next(is_dispatch_eligible)
FQ-->>Sched: QueueSelection
Sched->>TAC: try_acquire(item, queue_view)
TAC-->>Sched: TaskAdmissionLease
Sched->>FQ: commit(selection)
Sched->>Sched: spawn_worker
end
Note over Sched: Worker coroutine
Sched->>MRE: acompletion(request)
MRE->>RAC: acquire_async(item)
RAC-->>MRE: RequestAdmissionLease
MRE->>MRE: call inner model client
MRE->>RAC: release(lease, outcome)
RAC-->>MRE: ReleaseResult
MRE-->>Sched: result
Sched->>TAC: release(task_lease)
Sched->>Sched: wake_event.set()
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:919-922
Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When `llm_available == 0` and `queued_llm == 0` but `queued_total > 0`, `0 <= 0` satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The `llm_wait_saturated` branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in `_row_group_admission_blocked_reasons` telemetry and the scheduler health snapshot.
```suggestion
if llm_available <= 0:
return "llm_wait_saturated"
if llm_available <= queued_llm and queue_view.queued_total > 0:
return "queued_llm_demand"
```
### Issue 2 of 2
packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py:201-206
The async `BaseException` handler always records `"unexpected_exception"`, while the equivalent sync handler (`_execute_sync_attempt`, line 146) maps `KeyboardInterrupt` to `"local_cancelled"`. The outcome string is persisted in the request-admission domain state (`state.last_outcome`) and emitted in `model_request_completed` events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path `asyncio.CancelledError` is already handled above, but a `KeyboardInterrupt` delivered to the event loop would fall here and be misclassified.
```suggestion
except BaseException as exc:
outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception"
self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome))
self._emit_model_event(
"model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome}
)
raise
```
Reviews (1): Last reviewed commit: "Merge remote-tracking branch 'origin/epi..." | Re-trigger Greptile
| if llm_available <= queued_llm and queue_view.queued_total > 0: | ||
| return "queued_llm_demand" | ||
| if llm_available <= 0: | ||
| return "llm_wait_saturated" |
There was a problem hiding this comment.
Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When
llm_available == 0 and queued_llm == 0 but queued_total > 0, 0 <= 0 satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The llm_wait_saturated branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in _row_group_admission_blocked_reasons telemetry and the scheduler health snapshot.
| if llm_available <= queued_llm and queue_view.queued_total > 0: | |
| return "queued_llm_demand" | |
| if llm_available <= 0: | |
| return "llm_wait_saturated" | |
| if llm_available <= 0: | |
| return "llm_wait_saturated" | |
| if llm_available <= queued_llm and queue_view.queued_total > 0: | |
| return "queued_llm_demand" |
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 919-922
Comment:
Condition ordering causes "queued_llm_demand" to shadow "llm_wait_saturated". When `llm_available == 0` and `queued_llm == 0` but `queued_total > 0`, `0 <= 0` satisfies the first guard and returns "queued_llm_demand" even though there is no queued LLM demand — the real cause is that all LLM slots are in-flight. The `llm_wait_saturated` branch is therefore unreachable whenever any non-LLM tasks occupy the queue. This produces a misleading diagnostic label in `_row_group_admission_blocked_reasons` telemetry and the scheduler health snapshot.
```suggestion
if llm_available <= 0:
return "llm_wait_saturated"
if llm_available <= queued_llm and queue_view.queued_total > 0:
return "queued_llm_demand"
```
How can I resolve this? If you propose a fix, please make it concise.| except BaseException: | ||
| self._request_admission.release(lease, RequestReleaseOutcome(kind="unexpected_exception")) | ||
| self._emit_model_event( | ||
| "model_request_completed", item=item, lease=lease, diagnostics={"outcome": "unexpected_exception"} | ||
| ) | ||
| raise |
There was a problem hiding this comment.
The async
BaseException handler always records "unexpected_exception", while the equivalent sync handler (_execute_sync_attempt, line 146) maps KeyboardInterrupt to "local_cancelled". The outcome string is persisted in the request-admission domain state (state.last_outcome) and emitted in model_request_completed events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path asyncio.CancelledError is already handled above, but a KeyboardInterrupt delivered to the event loop would fall here and be misclassified.
| except BaseException: | |
| self._request_admission.release(lease, RequestReleaseOutcome(kind="unexpected_exception")) | |
| self._emit_model_event( | |
| "model_request_completed", item=item, lease=lease, diagnostics={"outcome": "unexpected_exception"} | |
| ) | |
| raise | |
| except BaseException as exc: | |
| outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception" | |
| self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome)) | |
| self._emit_model_event( | |
| "model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome} | |
| ) | |
| raise |
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py
Line: 201-206
Comment:
The async `BaseException` handler always records `"unexpected_exception"`, while the equivalent sync handler (`_execute_sync_attempt`, line 146) maps `KeyboardInterrupt` to `"local_cancelled"`. The outcome string is persisted in the request-admission domain state (`state.last_outcome`) and emitted in `model_request_completed` events, so the two paths produce inconsistent telemetry for the same interruption scenario. In the async path `asyncio.CancelledError` is already handled above, but a `KeyboardInterrupt` delivered to the event loop would fall here and be misclassified.
```suggestion
except BaseException as exc:
outcome = "local_cancelled" if isinstance(exc, KeyboardInterrupt) else "unexpected_exception"
self._request_admission.release(lease, RequestReleaseOutcome(kind=outcome))
self._emit_model_event(
"model_request_completed", item=item, lease=lease, diagnostics={"outcome": outcome}
)
raise
```
How can I resolve this? If you propose a fix, please make it concise.
📋 Summary
Implements the issue 645 async scheduling epic by splitting runtime control into explicit scheduler task admission and concrete model-request admission, with typed scheduling metadata, AIMD-backed request leases, capacity snapshots, and correlated observability.
This PR now also includes the idle-time optimization pass on top of the original plan PR: adaptive row-group admission, request-pressure-aware task selection, richer scheduler telemetry, Perfetto trace export, and a canonical idle regression/report suite for future scheduler tuning.
🔗 Related Issue
Refs #645
🔄 Changes
✨ Added
SchedulingMetadataand validation in the config package.ModelRequestExecutorandAdaptiveRequestAdmissionControllerfor per-attempt provider/model/domain admission.scripts/benchmarks/run_async_scheduling_idle_regression.pyscripts/benchmarks/generate_async_scheduling_idle_report.pyscripts/benchmarks/export_async_scheduling_perfetto.pyreports/async-scheduling-idle-regression.htmlreports/async-scheduling-idle-analysis.html🔧 Changed
🗑️ Removed
📈 Idle Optimization Evidence
Latest quick idle regression:
PASS (0 errors, 0 warnings),22cases,367checks.10.5%to67.7%llm utilization and reduced frontier/dependency idle from81.3%to14.7%.a_pressuredtoz_openand reduced leased request wait from49.8msto3.2msin the dedicated pressure case.82.4%to84.4%, request utilization from85.6%to89.2%, and request idle by3.7 pp, with193advisory skip events. Wall time did not improve in that combined case, so this is currently a flow/utilization improvement rather than a proven throughput improvement.🔍 Attention Areas
async_scheduler.py— central runtime control flow, lease lifecycle, adaptive row-group admission, request-pressure advisory, and scheduler telemetry.model_request_executor.py— concrete model-call attempt boundary and release outcome classification.controller.py— AIMD request admission state machine and exact request lease accounting.observability.py— scheduler/request event contracts used by benchmark evidence and Perfetto export.benchmark_async_scheduling.py— deterministic benchmark scenarios and derived idle/request metrics.async-scheduling-idle-regression.html— current regression report with adaptation and combined-case figures.async-scheduling-epic-benchmark-report.html— high-level QA and live benchmark report.🧪 Testing
.venv/bin/ruff check packages scripts tests_e2e.venv/bin/ruff format --check packages scripts tests_e2egit diff --checkuv run ruff check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.pyuv run ruff format --check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.pyuv run pytest packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py -q— 17 passeduv run python scripts/benchmarks/run_async_scheduling_idle_regression.py --quick ...— PASS, 0 errors, 0 warningsmake testas a single aggregate command was not rerun; equivalent package suites above passed earlier, and focused checks passed for this optimization pass✅ Checklist
Notes
Raw live benchmark traces and large artifact trees remain local because the full artifact tree is large. This PR includes condensed reports and reusable benchmark tooling so reviewers can inspect the evidence and rerun the canonical regression suite without committing raw JSONL/timeline dumps.
Description updated with AI