Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
591d803
docs: add async scheduling epic UML reference
eric-tramel May 14, 2026
c03bedc
docs: align async scheduling issue map
eric-tramel May 14, 2026
7125435
docs: base async scheduling artifacts in plans
eric-tramel May 14, 2026
0552243
docs: clarify async scheduler control ownership
eric-tramel May 14, 2026
451c1ff
docs: align task admission class flow
eric-tramel May 14, 2026
f617101
Update plans/645/async-scheduling-epic.puml
eric-tramel May 14, 2026
844bfbb
Update plans/645/async-scheduling-epic.puml
eric-tramel May 14, 2026
ad7bb87
docs: add async scheduling source-of-truth plan
eric-tramel May 14, 2026
59dbb47
docs: tighten async scheduling plan contracts
eric-tramel May 14, 2026
d40e27c
feat: implement async scheduling admission control
eric-tramel May 14, 2026
8d1f2e1
refactor async scheduling module ownership
eric-tramel May 18, 2026
ea3d4a0
improve async scheduler idle observability
eric-tramel May 19, 2026
9a8312a
fix request pressure domain metadata (#679)
andreatgretel May 19, 2026
2c4379a
fix: harden async scheduling admission follow-ups (#680)
nabinchha May 19, 2026
387d5a5
fix request waiter deadline admission (#681)
andreatgretel May 19, 2026
0ee8f6d
fix: keep async idle benchmark artifacts in scratch (#683)
andreatgretel May 19, 2026
0d2050b
test: pin custom generator error boundary (#684)
andreatgretel May 19, 2026
70974fd
fix: request admission edge cases (#685)
nabinchha May 19, 2026
05852e4
fix: tighten request controller release semantics (#682)
andreatgretel May 20, 2026
1fcc6b5
chore: remove generated benchmark artifacts
eric-tramel May 20, 2026
d5edf05
chore: remove local benchmark scripts
eric-tramel May 20, 2026
87a2143
chore: restore historical devnote posts
eric-tramel May 20, 2026
74eb432
chore: restore fern image assets
eric-tramel May 20, 2026
bc83ae3
chore: restore latest devnotes index
eric-tramel May 20, 2026
538891c
Merge remote-tracking branch 'origin/epic/645-async-scheduling' into …
eric-tramel May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ venv.bak/
# Local scratch space
.scratch/

# Generated benchmark/report output
/artifacts/
/reports/
/scripts/benchmarks/benchmark_async_scheduling.py
/scripts/benchmarks/export_async_scheduling_perfetto.py
/scripts/benchmarks/generate_async_scheduling_idle_report.py
/scripts/benchmarks/run_async_scheduling_idle_regression.py

docs/notebooks/
docs/notebook_source/*.ipynb
docs/notebook_source/*.csv
Expand Down
6 changes: 3 additions & 3 deletions architecture/dataset-builders.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Preparation (`_prepare_async_run`):
4. Constructs `CompletionTracker`, `RowGroupBufferManager`, `AsyncTaskScheduler`
5. Hooks `ProcessorRunner` for pre-batch and post-batch stages

`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, semaphore-based capacity limits, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks are admitted through a virtual-time fair queue so one hot column or model-backed generator cannot consume the whole submission window before peer work gets a turn.
`AsyncTaskScheduler` runs on a dedicated async loop with frontier-driven dispatch, task-admission leases, salvage rounds for failed tasks, and order-dependent locks for columns that must execute sequentially. Ready frontier tasks enter `FairTaskQueue`, are selected through virtual-time ordering, and are committed only after `TaskAdmissionController` acquires the required scheduler resources.

### Execution Graph

Expand Down Expand Up @@ -121,7 +121,7 @@ DatasetBuilder.build()
β†’ _prepare_async_run()
β†’ ExecutionGraph.create()
β†’ CompletionTracker.with_graph()
β†’ AsyncTaskScheduler(semaphores, salvage_rounds)
β†’ AsyncTaskScheduler(task admission, fair queue, salvage_rounds)
β†’ scheduler.run()
β†’ for each row group, fairly admit ready tasks from frontier
β†’ tasks execute generators, update CompletionTracker
Expand All @@ -133,7 +133,7 @@ DatasetBuilder.build()

- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. Global semaphores still bound memory/coroutine growth, while per-group virtual-time queues prevent a large ready frontier from degenerating into a column-by-column wave. LLM admission caps are peer-sensitive: a solo model group can fill available global capacity, but once another scheduling group has queued work the saturated group yields until peers get admission slots or admitted tasks complete.
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.

Expand Down
28 changes: 14 additions & 14 deletions architecture/models.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Models

The model subsystem provides a unified interface for LLM access: chat completions, embeddings, and image generation. It handles client creation, retry, rate-limit throttling, usage tracking, and MCP tool integration.
The model subsystem provides a unified interface for LLM access: chat completions, embeddings, and image generation. It handles client creation, retry, request admission, usage tracking, and MCP tool integration.

Source: `packages/data-designer-engine/src/data_designer/engine/models/`

Expand All @@ -11,12 +11,12 @@ The model subsystem is layered:
```
ModelRegistry (lazy facade-per-alias)
└── ModelFacade (completion, embeddings, image gen, MCP tool loops)
└── ThrottledModelClient (AIMD rate limiting)
└── ModelRequestExecutor (request admission + provider execution)
└── ModelClient (OpenAI-compatible or Anthropic adapter)
└── RetryTransport (httpx-level retries)
```

Generators never interact with HTTP clients directly. They request a `ModelFacade` by alias from the `ModelRegistry`, which handles lazy construction and shared throttle state.
Generators never interact with HTTP clients directly. They request a `ModelFacade` by alias from the `ModelRegistry`, which handles lazy construction, request-resource canonicalization, and shared adaptive request admission state.

## Key Components

Expand All @@ -31,13 +31,13 @@ Defines the contract: sync/async chat, embeddings, image generation, `supports_*

`create_model_client` routes by provider type to the appropriate adapter. Optionally wraps with:
- **`RetryTransport`** β€” httpx-level retries via `httpx_retries.RetryTransport`. `HttpModelClient` sets `strip_rate_limit_codes=True` for the async client and `False` for the sync client (`http_model_client.py`), which controls whether 429 responses are eligible for transport-layer retries.
- **`ThrottledModelClient`** β€” AIMD (Additive Increase, Multiplicative Decrease) concurrency control per throttle domain.
- **`ModelRequestExecutor`** β€” maps model-call attempts to request-admission items, acquires request leases, invokes the provider client, and releases the exact lease on every terminal path.

### ThrottleManager
### Request Admission

Manages concurrency limits per `ThrottleDomain` (CHAT, EMBEDDING, IMAGE, HEALTHCHECK), keyed by `(provider_name, model_id)`. Thread-safe with a shared lock for sync/async access.
`RequestAdmissionController` manages provider/model/domain request resources. `AdaptiveRequestAdmissionController` adds AIMD (Additive Increase, Multiplicative Decrease) adaptation per `RequestDomain` (`chat`, `embedding`, `image`, `healthcheck`) under the provider/model static cap.

`ThrottledModelClient` wraps each API call in a context manager that acquires/releases throttle capacity and adjusts limits on success (additive increase) or rate-limit errors (multiplicative decrease).
`ModelRequestExecutor` wraps each provider call with a request-admission lease and feeds success or rate-limit outcomes back to the controller. `RequestResourceResolver` owns canonical provider/model/domain identity so aliases that target the same endpoint share request capacity.

### ModelFacade

Expand All @@ -50,7 +50,7 @@ The primary interface for generators. Holds a `ModelConfig`, `ModelClient`, opti

### ModelRegistry

Lazy `ModelFacade` construction per alias. Registers a shared `ThrottleManager` across all facades for coordinated rate limiting. Provides `get_model_usage_stats` and `log_model_usage` for post-build reporting.
Lazy `ModelFacade` construction per alias. Registers shared request-admission state across all facades for coordinated provider/model/domain capacity. Provides `get_model_usage_stats` and `log_model_usage` for post-build reporting.

### Usage Tracking

Expand All @@ -59,18 +59,18 @@ Lazy `ModelFacade` construction per alias. Registers a shared `ThrottleManager`
## Data Flow

1. Generator requests a model by alias from `ModelRegistry`
2. Registry lazily creates `ModelFacade` with the appropriate client and throttle config
2. Registry lazily creates `ModelFacade` with the appropriate client and request-admission executor
3. Generator calls `completion()` with prompt/messages
4. `ModelFacade` builds kwargs, calls `ThrottledModelClient`
5. Throttle layer acquires capacity, delegates to `ModelClient`
4. `ModelFacade` builds kwargs, calls `ModelRequestExecutor`
5. Request admission acquires a provider/model/domain lease, delegates to `ModelClient`
6. `ModelClient` makes the HTTP request through `RetryTransport`
7. Response flows back; usage is tracked; if MCP tools are configured, tool calls are executed and results fed back for another completion round

## Design Decisions

- **Facade pattern** hides HTTP, retry, throttle, and MCP complexity from generators. Generators see `completion()` and get back parsed results.
- **AIMD throttling at the application layer** rather than relying solely on HTTP retries. This provides smoother throughput under rate limits β€” the transport layer still handles many transient failures, while the throttle manager adjusts concurrency to avoid sustained 429 storms.
- **429 handling depends on sync vs async `HttpModelClient`** β€” The async client uses `strip_rate_limit_codes=True`, so 429s are not retried at the transport layer and rate-limit signals reach `ThrottledModelClient` / AIMD quickly. The sync client uses `strip_rate_limit_codes=False`, so 429s may still be retried transparently at the transport layer before surfacing to callers.
- **Facade pattern** hides HTTP, retry, request admission, and MCP complexity from generators. Generators see `completion()` and get back parsed results.
- **AIMD request admission at the application layer** rather than relying solely on HTTP retries. This provides smoother throughput under rate limits: the transport layer still handles many transient failures, while adaptive request admission adjusts concurrency to avoid sustained 429 storms.
- **429 handling depends on sync vs async `HttpModelClient`** β€” The async client uses `strip_rate_limit_codes=True`, so 429s are not retried at the transport layer and rate-limit signals reach `ModelRequestExecutor` / request admission quickly. The sync client uses `strip_rate_limit_codes=False`, so 429s may still be retried transparently at the transport layer before surfacing to callers.
- **Distribution-valued inference parameters** (`temperature`, `top_p` as `UniformDistribution` or `ManualDistribution`) enable controlled randomness across a dataset without per-row config changes.
- **Lazy facade construction** avoids health-checking or connecting to models that are configured but never used in a particular generation run.

Expand Down
6 changes: 3 additions & 3 deletions architecture/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Users declare what their data should look like through config objects (columns,
| `DataDesigner` | `data-designer` | Public API β€” `create()`, `preview()`, `validate()` |
| `DataDesignerConfigBuilder` | `data-designer-config` | Fluent builder for dataset configs |
| `DatasetBuilder` | `data-designer-engine` | Orchestrates generation (sync or async) |
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, throttle, usage tracking |
| `ModelFacade` / `ModelRegistry` | `data-designer-engine` | LLM client abstraction with retry, request admission, usage tracking |
| `MCPFacade` / `MCPRegistry` | `data-designer-engine` | Tool execution via Model Context Protocol |
| `ColumnGeneratorRegistry` | `data-designer-engine` | Maps column types to generator implementations |
| `PluginRegistry` | `data-designer-config` | Discovers and registers entry-point plugins |
Expand All @@ -44,7 +44,7 @@ Users declare what their data should look like through config objects (columns,

3. **Generation** β€” `DatasetBuilder` instantiates column generators from the registry, then executes one of two paths:
- **Sequential** (default): batch loop over columns in topological order. Each generator produces its column via `CELL_BY_CELL` (threaded fan-out) or `FULL_COLUMN` strategy.
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with semaphore-based concurrency, salvage rounds, and per-row-group checkpointing.
- **Async** (`DATA_DESIGNER_ASYNC_ENGINE=1`): builds an `ExecutionGraph`, partitions rows into groups, and dispatches tasks via `AsyncTaskScheduler` with `FairTaskQueue` selection, `TaskAdmissionController` scheduler-resource leases, salvage rounds, and per-row-group checkpointing.

4. **Post-processing** β€” `ProcessorRunner` applies transformations (pre-batch, post-batch, after-generation). Profilers analyze the generated dataset.

Expand All @@ -61,7 +61,7 @@ Users declare what their data should look like through config objects (columns,

- [Config Layer](config.md) β€” builder API, column types, model configs, plugin system
- [Engine Layer](engine.md) β€” compilation, generators, registries
- [Models](models.md) β€” model facade, adapters, retry/throttle
- [Models](models.md) β€” model facade, adapters, retry, request admission
- [Dataset Builders](dataset-builders.md) β€” sync/async orchestration, DAG, batching
- [MCP](mcp.md) β€” tool execution, session pooling
- [Sampling](sampling.md) β€” statistical generators, person/entity data
Expand Down
Loading
Loading