diff --git a/fern/assets/owning-the-model-stack/request-keying.png b/fern/assets/owning-the-model-stack/throttle-keying.png similarity index 100% rename from fern/assets/owning-the-model-stack/request-keying.png rename to fern/assets/owning-the-model-stack/throttle-keying.png diff --git a/fern/versions/v0.5.8/pages/concepts/architecture-and-performance.mdx b/fern/versions/v0.5.8/pages/concepts/architecture-and-performance.mdx index cc9bd0638..fc0704cda 100644 --- a/fern/versions/v0.5.8/pages/concepts/architecture-and-performance.mdx +++ b/fern/versions/v0.5.8/pages/concepts/architecture-and-performance.mdx @@ -51,7 +51,7 @@ This guide explains the architecture, execution model, and how to tune performan ## Execution Model - The default execution path is the **async engine**, which dispatches work at the cell level and overlaps independent columns — see [Async Engine](#async-engine) below for its semantics. The legacy **sync engine** is still available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0` and is what this section describes. The public configuration knobs documented below (`buffer_size`, `max_parallel_requests`, error handling) apply to both engines; the differences are flagged inline. + The default execution path is the **async engine**, which dispatches work at the cell level and overlaps independent columns — see [Async Engine](#async-engine) below for its semantics. The legacy **sync engine** is still available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0` and is what this section describes. The configuration knobs documented below (`buffer_size`, `max_parallel_requests`, AIMD throttle config, error handling) apply to both engines; the differences are flagged inline. The sync engine processes datasets in **batches**, with **parallel** operations within each batch. 
@@ -108,12 +108,12 @@ At any moment, the number of concurrent LLM requests is: ```python concurrent_requests = min( buffer_size, # Records in current batch - current_request_limit, # AIMD-managed limit (≤ max_parallel_requests) + current_throttle_limit, # AIMD-managed limit (≤ max_parallel_requests) remaining_cells_in_column # Cells left to generate ) ``` -`max_parallel_requests` sets the **ceiling**. The actual limit (`current_request_limit`) is managed at runtime by adaptive request admission that reacts to rate-limit signals from the inference server: +`max_parallel_requests` sets the **ceiling**. The actual limit (`current_throttle_limit`) is managed at runtime by an AIMD (Additive Increase / Multiplicative Decrease) controller that reacts to rate-limit signals from the inference server: - **On the first 429 in a burst**: the limit is reduced by a configurable factor (default: 25% reduction) and a cooldown is applied. Further 429s from already in-flight requests in the same burst do not reduce the limit again — they release their permits and hold the limit steady. - **After consecutive successes**: the limit increases by 1 (by default) until it reaches the ceiling or a stabilized rate-limit threshold. @@ -121,7 +121,7 @@ concurrent_requests = min( This means Data Designer automatically finds the right concurrency level for your server without manual tuning. - Adaptive request admission is fully active on the default **async engine**. The legacy **sync engine** is available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0`; on that path 429s are first retried at the HTTP transport layer and AIMD only engages as a fallback. See [Async engine](#async-engine) below. + AIMD adaptive concurrency is fully active on the default **async engine**. The legacy **sync engine** is available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0`; on that path 429s are first retried at the HTTP transport layer and AIMD only engages as a fallback. 
See [Async engine](#async-engine) below. **Example**: With `buffer_size=100` and `max_parallel_requests=32`, Data Designer starts sending up to 32 requests in parallel. If the server returns 429s, concurrency drops automatically (e.g., to 24, then 18) and recovers once the server catches up. @@ -198,7 +198,7 @@ Only resume datasets from trusted artifact directories. Resume reads local `meta ### `max_parallel_requests` (InferenceParams) -Sets the **maximum** concurrent LLM API calls **per model**. This is the ceiling that adaptive request admission can ramp up to — the actual concurrency at runtime may be lower if the server signals rate limits. +Sets the **maximum** concurrent LLM API calls **per model**. This is the ceiling that the AIMD throttle controller can ramp up to — the actual concurrency at runtime may be lower if the server signals rate limits. ```python import data_designer.config as dd @@ -215,7 +215,7 @@ model = dd.ModelConfig( **Default**: 4 -**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With adaptive request admission, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. The salvage queue on the async engine (default) reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you've opted into sync. +**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With AIMD, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. 
The salvage queue on the async engine (default) reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you've opted into sync. **When to decrease**: You want to cap resource usage to a known safe level, or you want more predictable/debuggable execution. @@ -223,7 +223,7 @@ model = dd.ModelConfig( Finding the optimal value The right value depends on your inference stack and model. Self-hosted vLLM servers can often handle values as high as 256, 512, or even 1024 depending on your hardware. -With adaptive request admission, a practical approach is to set `max_parallel_requests` to the **upper bound** you're comfortable with and let the request controller find the sustainable level automatically. If you see frequent 429 → recovery cycles in the logs, your ceiling is above the server's true capacity but the system is handling it. If you never see any request-admission activity, you may have room to increase the ceiling further. +With AIMD, a practical approach is to set `max_parallel_requests` to the **upper bound** you're comfortable with and let the throttle controller find the sustainable level automatically. If you see frequent 429 → recovery cycles in the logs, your ceiling is above the server's true capacity but the system is handling it. If you never see any throttle activity, you may have room to increase the ceiling further. **Benchmark approach**: Run a small dataset (e.g., 100 records) with increasing `max_parallel_requests` values (4 → 8 → 16 → 32 → ...) and measure generation time. Stop increasing when the runtime stops decreasing—that's when your inference server is saturated. 
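The benchmark approach above can be sketched as a small sweep loop. This is only an illustration under stated assumptions: `find_saturation_point`, `timed`, and the 5% `min_improvement` threshold are hypothetical names and values chosen for this example, not part of Data Designer. The `run` callable you pass in is expected to build your pipeline with the given `max_parallel_requests` and generate the small dataset.

```python
import time
from typing import Callable, Iterable


def timed(run: Callable[[int], None]) -> Callable[[int], float]:
    """Wrap a run function so that calling it returns elapsed wall-clock seconds."""

    def measure(max_parallel_requests: int) -> float:
        start = time.perf_counter()
        run(max_parallel_requests)  # hypothetical: your own small generation run
        return time.perf_counter() - start

    return measure


def find_saturation_point(
    measure: Callable[[int], float],
    candidates: Iterable[int] = (4, 8, 16, 32, 64, 128),
    min_improvement: float = 0.05,  # assumed threshold: require a 5% speedup to continue
) -> tuple[int, dict[int, float]]:
    """Return the last ceiling that still reduced runtime, plus all measured timings."""
    timings: dict[int, float] = {}
    best_value = None
    best_time = None
    for value in candidates:
        elapsed = measure(value)
        timings[value] = elapsed
        # Stop once a larger ceiling no longer shortens the run meaningfully.
        if best_time is not None and elapsed > best_time * (1 - min_improvement):
            break
        best_value, best_time = value, elapsed
    if best_value is None:
        raise ValueError("candidates must not be empty")
    return best_value, timings
```

To use it, wrap your own run function, for example `find_saturation_point(timed(my_small_run))`, where `my_small_run` is a placeholder for code that sets `max_parallel_requests` on your model config and generates roughly 100 records. Single measurements are noisy, so repeating each candidate a few times and taking the median is a reasonable refinement.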
@@ -245,9 +245,39 @@ designer.set_run_config(run_config) --- -### Adaptive Request Admission +### Adaptive Throttling (RunConfig) -Data Designer uses AIMD (Additive Increase / Multiplicative Decrease) request admission to automatically adjust concurrency per provider/model/domain based on rate-limit feedback from the inference server. This is an internal runtime controller, not a public `RunConfig` knob. Set `max_parallel_requests` as the user-facing ceiling and inspect `AsyncCapacityPlan`/logs to understand the effective runtime limits. +Data Designer uses an AIMD (Additive Increase / Multiplicative Decrease) controller to automatically adjust concurrency per model based on rate-limit feedback from the inference server. The defaults work well for most workloads. Override them via `ThrottleConfig` only when you understand the trade-offs. + + + Adaptive throttling is fully active on the default **async engine**, where 429 responses propagate directly to the AIMD controller. On the legacy **sync engine** (`DATA_DESIGNER_ASYNC_ENGINE=0`), 429s are first retried at the HTTP transport layer; `ThrottleConfig` settings only take effect as a fallback if transport retries are exhausted. + + +```python +import data_designer.config as dd +from data_designer.interface import DataDesigner + +run_config = dd.RunConfig( + throttle=dd.ThrottleConfig( + reduce_factor=0.75, # Multiply limit by this on a 429 (default: 0.75) + additive_increase=1, # Add this many slots after success_window successes (default: 1) + success_window=25, # Consecutive successes before increasing (default: 25) + cooldown_seconds=2.0, # Pause after a 429 when no Retry-After header (default: 2.0) + ceiling_overshoot=0.10, # Probe 10% above observed server limit (default: 0.10) + ), +) + +designer = DataDesigner() +designer.set_run_config(run_config) +``` + +| Parameter | Default | Effect | +|-----------|---------|--------| +| `reduce_factor` | 0.75 | How aggressively to cut concurrency on a 429. 
Lower = more aggressive. | +| `additive_increase` | 1 | Slots added per recovery step. Higher = faster ramp-up, but riskier. | +| `success_window` | 25 | Consecutive successes required before each increase step. | +| `cooldown_seconds` | 2.0 | Pause duration after a 429 (used when the server doesn't send `Retry-After`). | +| `ceiling_overshoot` | 0.10 | Fraction above the observed rate-limit ceiling the controller is allowed to probe. | How it works in practice @@ -283,11 +313,11 @@ designer.set_run_config(run_config) ## Async Engine -The async engine is the default execution path. It dispatches work at the cell level rather than the column level, so independent columns overlap in time and provider/model/domain request resources tune themselves independently. See the [Async All the Way Down](/dev-notes/async-all-the-way-down) dev note for the full architecture. +The async engine is the default execution path. It dispatches work at the cell level rather than the column level, so independent columns overlap in time and per-(provider, model) AIMD pools tune themselves independently. See the [Async All the Way Down](/dev-notes/async-all-the-way-down) dev note for the full architecture. ### Per-model timeouts drive every deadline -The `inference_parameters.timeout` field on a `ModelConfig` sets the per-request HTTP timeout. The same value also drives the sync→async bridge that custom columns use when they call `model.generate()`. There is no separate queue-wait deadline — waits scale with provider speed and adaptive request admission. Slow self-hosted endpoints (e.g. large models on a single GPU) only need this one knob raised: +The `inference_parameters.timeout` field on a `ModelConfig` sets the per-request HTTP timeout. The same value also drives the sync→async bridge that custom columns use when they call `model.generate()`. There is no separate queue-wait deadline — waits scale with provider speed and AIMD's adaptive concurrency. Slow self-hosted endpoints (e.g. 
large models on a single GPU) only need this one knob raised: ```python import data_designer.config as dd @@ -336,8 +366,8 @@ DATA_DESIGNER_ASYNC_ENGINE=0 python my_pipeline.py | Problem | Symptom | Solution | |---------|---------|----------| -| **Low throughput** | Low GPU utilization | Increase `max_parallel_requests` and/or `buffer_size`. If request admission has self-reduced due to earlier 429s (check logs for "concurrency reduced" messages), the server may need more capacity or you can wait for AIMD recovery. | -| **Frequent 429 → recovery cycles** | Logs show repeated concurrency drops and ramp-ups | The `max_parallel_requests` ceiling is above the server's sustained capacity. This is handled automatically, but you can lower the ceiling to reduce the sawtooth. | +| **Low throughput** | Low GPU utilization | Increase `max_parallel_requests` and/or `buffer_size`. If the throttle has self-reduced due to earlier 429s (check logs for "concurrency reduced" messages), the server may need more capacity or you can wait for AIMD recovery. | +| **Frequent 429 → recovery cycles** | Logs show repeated concurrency drops and ramp-ups | The `max_parallel_requests` ceiling is above the server's sustained capacity. This is handled automatically, but you can lower the ceiling to reduce the sawtooth or tune `reduce_factor` / `success_window`. | | **Long tail of slow generations** | Most records fast, few very slow | Reduce `max_conversation_restarts`, simplify schemas, improve prompts | | **Multi-model idle periods** | One model busy, others idle | Reduce `buffer_size` for faster cycling, or consolidate models | | **Memory errors** | OOM crashes | Reduce `buffer_size` and `max_parallel_requests` | @@ -350,7 +380,7 @@ DATA_DESIGNER_ASYNC_ENGINE=0 python my_pipeline.py 1. **Start with defaults** for initial development — AIMD handles rate-limit adaptation automatically 2. **Profile your workload**: How many LLM columns? How many records? What models? 3. 
**Identify bottleneck**: Low GPU util → increase `max_parallel_requests` (AIMD will self-correct if you overshoot). Memory issues → decrease `buffer_size`. Long tails → tune retry settings. -4. **Check request-admission logs**: Look for "concurrency reduced" / "concurrency increased" messages to understand whether rate limits are the bottleneck +4. **Check throttle logs**: Look for "concurrency reduced" / "concurrency increased" messages to understand whether rate limits are the bottleneck 5. **Iterate**: Make one change at a time, measure impact before next change --- diff --git a/fern/versions/v0.5.8/pages/devnotes/index.mdx b/fern/versions/v0.5.8/pages/devnotes/index.mdx index fa5545d2c..2bab4a5fd 100644 --- a/fern/versions/v0.5.8/pages/devnotes/index.mdx +++ b/fern/versions/v0.5.8/pages/devnotes/index.mdx @@ -44,7 +44,7 @@ Welcome to NeMo Data Designer Dev Notes — in-depth guides, benchmark write-ups } diff --git a/fern/versions/v0.5.8/pages/devnotes/posts/async-all-the-way-down.mdx b/fern/versions/v0.5.8/pages/devnotes/posts/async-all-the-way-down.mdx index c81f1551d..631d1ab18 100644 --- a/fern/versions/v0.5.8/pages/devnotes/posts/async-all-the-way-down.mdx +++ b/fern/versions/v0.5.8/pages/devnotes/posts/async-all-the-way-down.mdx @@ -58,7 +58,7 @@ Getting this right required solving three problems at different levels of the st
-![Three layers: AsyncTaskScheduler for dependency dispatch, row-group admission for memory, and request admission for provider capacity](/assets/async-all-the-way-down/architecture-layers.svg) +![Three layers: AsyncTaskScheduler for dependency dispatch, row-group admission for memory, and ThrottleManager for rate limits](/assets/async-all-the-way-down/architecture-layers.svg)
@@ -66,42 +66,43 @@ Getting this right required solving three problems at different levels of the st At the top sits the `AsyncTaskScheduler`. It builds an `ExecutionGraph` from your column configs using [Kahn's algorithm](https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm) for topological ordering, then tracks per-cell completion via a `CompletionTracker`. When a cell completes, the tracker determines which downstream cells are now ready and pushes them onto the dispatch queue. -The scheduler maintains a *frontier* — the set of tasks whose inputs are all satisfied. Dispatch is a loop: enqueue ready tasks, ask `FairTaskQueue` to select the next eligible task, acquire a scheduler-resource lease from `TaskAdmissionController`, commit the queue selection, then spawn a worker. When the worker completes, mark the cell done, release the lease, and make any newly-ready downstream tasks visible. The loop runs until every cell in every row group has completed or been dropped. +The scheduler maintains a *frontier* — the set of tasks whose inputs are all satisfied. Dispatch is a loop: pull ready tasks from the frontier, acquire a [semaphore](https://en.wikipedia.org/wiki/Semaphore_(programming)) slot, spawn a worker. When the worker completes, mark the cell done, which may add new tasks to the frontier. The loop runs until every cell in every row group has completed or been dropped. -There's a subtlety in how the scheduler manages task-stage pressure. Scheduler resources and provider request resources are separate. The scheduler decides which dependency-ready cell may become a running worker; `ModelRequestExecutor` and request admission decide when an individual provider call may execute. Keeping those ledgers separate prevents provider cooldown or request backpressure from masquerading as DAG readiness. +There's a subtlety in how the scheduler manages its task slots, and getting it right required a delicate dance between two semaphores. 
A naïve approach would hold a submission slot for the entire lifetime of a task. That's fine for the outbound HTTP call — the slot is released before the request goes out. But the `ThrottleManager` can impose an internal timeout while waiting for a permit during AIMD cooldown, and *that* wait would hold the submission slot hostage. If enough tasks are blocked waiting for throttle permits, the scheduler can't dispatch new work even when the frontier has ready tasks. -The durable scheduler contract is selection plus admission plus commit: +The fix is a one-way semaphore handoff. The scheduler maintains two pools: a *submission* semaphore that caps how many tasks can be dispatched, and an *LLM-wait* semaphore (sized larger) for tasks that are blocked on a model call. When a task is about to call the model, it acquires an LLM-wait slot and releases its submission slot in the same atomic operation — stepping from one pool to the other mid-flight. The dispatch loop immediately sees a free submission slot and can send another task. When the LLM responds, the LLM-wait slot is released. Non-LLM generators (samplers, Jinja expressions) skip the handoff and hold their submission slot for the full duration, which is fine because they complete quickly. ```py -selection = queue.select_next(lambda item, view: admission.is_eligible(item, view)) -decision = admission.try_acquire(selection.item, selection.queue_view) -committed = queue.commit(selection) -spawn_worker(committed, decision) +if is_llm_bound: + await self._llm_wait_semaphore.acquire() + holds_llm_wait = True + self._submission_semaphore.release() + holds_submission = False ``` -This keeps ready-work ordering, scheduler-resource accounting, and request-capacity adaptation explicit. `SchedulingMetadata` describes static generator pressure, `TaskSchedulingResolver` turns it into scheduler inputs, `FairTaskQueue` handles ready ordering, and `TaskAdmissionController` owns leases for scheduler resources. 
+This keeps the dispatch loop saturated without unbounded coroutine growth — the submission semaphore controls how fast tasks enter, and the LLM-wait semaphore controls how many are waiting on the network. ### **Layer 2: Row-group admission** -Below the scheduler, the 10,000 rows you requested aren't all in memory at once. They're partitioned into row groups that checkpoint to parquet independently. Row-group admission limits how many groups are in flight simultaneously, preventing memory from growing unboundedly on large runs. +Below the scheduler, the 10,000 rows you requested aren't all in memory at once. They're partitioned into row groups that checkpoint to parquet independently. A semaphore limits how many row groups are in flight simultaneously, preventing memory from growing unboundedly on large runs. When a row group completes — all columns generated for all its rows — the buffer manager flushes it to disk and releases the memory. Partial results appear on disk during generation. If something fails, you keep everything that already checkpointed. This is also the basis for fault tolerance, discussed later — the unit of loss is a row group, not the entire run. ### **Layer 3: Adaptive rate limiting** -At the bottom, each provider/model/request-domain resource gets [additive-increase, multiplicative-decrease (AIMD)](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) request admission. When the provider returns a 429, `AdaptiveRequestAdmissionController` reduces admitted concurrency for that request resource. On streaks of successful requests, it gradually increases. Because this happens per request resource, a judge model running on one provider can saturate its endpoint while a generator on another provider is backing off. The [Owning the Model Stack](/dev-notes/owning-the-model-stack) dev note covers this layer in depth. 
+At the bottom, each (provider, model) pair gets an independent concurrency pool with [additive-increase, multiplicative-decrease (AIMD)](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) rate adaptation. When the provider returns a 429, the pool cuts its concurrency. On streaks of successful requests, it gradually increases. Because this happens per-model, a judge model running on one provider can saturate its endpoint while a generator on another provider is backing off. The [Owning the Model Stack](/dev-notes/owning-the-model-stack) dev note covers this layer in depth. ### **How they compose** -The layers are independent. The scheduler decides *what* to run next. The row-group layer decides *how much* to keep in memory at once. The request-admission layer discovers *how fast* each provider will accept requests. No layer needs to know about the others. +The layers are independent. The scheduler decides *what* to run next. The row-group layer decides *how much* to keep in memory at once. The throttle layer discovers *how fast* each provider will accept requests. No layer needs to know about the others. A single task's lifecycle makes the composition concrete:
- Task lifecycle: Frontier to fair queue to task admission to generator to request admission to LLM call to complete, with downstream cells looping back to the frontier + Task lifecycle: Frontier → Dispatch → Generator → Throttle → LLM Call → Complete, with downstream cells looping back to the frontier
-A cell enters the frontier when its upstream dependencies are satisfied. The scheduler enqueues it, selects it through `FairTaskQueue`, acquires a `TaskAdmissionLease`, commits the selection, and hands the task to a worker. The worker runs the generator; any model call goes through `ModelRequestExecutor`, which acquires and releases the request-admission lease around provider execution. On completion, leases are released, the cell is marked done in the `CompletionTracker`, and any downstream cells whose dependencies are now satisfied enter the frontier. The cycle continues until every cell has completed or been dropped. +A cell enters the frontier when its upstream dependencies are satisfied. The dispatch loop acquires a submission slot and hands it to a worker. The worker runs the generator, which acquires a throttle permit before making the LLM call. On completion, permits are released, the cell is marked done in the `CompletionTracker`, and any downstream cells whose dependencies are now satisfied enter the frontier. The cycle continues until every cell has completed or been dropped. --- @@ -136,13 +137,13 @@ The pattern is clear: speedup scales with the amount of parallelism available in The **narrow** workload is a sequential chain with no cross-column parallelism. The async engine still ekes out a small gain from overlapping row-level dispatch, but there's no structural parallelism to exploit. This is expected: async can't speed up a fundamentally serial pipeline. -The **dual-model** workload is the most interesting case. Three generation columns use one model, and three judge columns use another. Each model maps to its own request-admission resources. The judge model starts processing rows as soon as the first generator finishes, running at full concurrency while the generator is still producing. In the sync engine, all generation has to finish before any judging starts. +The **dual-model** workload is the most interesting case. 
Three generation columns use one model, and three judge columns use another. Each model gets its own `ThrottleManager` pool. The judge model starts processing rows as soon as the first generator finishes, running at full concurrency while the generator is still producing. In the sync engine, all generation has to finish before any judging starts. ### **At higher record counts** -The benchmarks above use 10 records deliberately — small batches isolate the scheduling benefit from rate-limit effects. At higher record counts, the bottleneck shifts. The async engine dispatches requests more aggressively, which means it discovers the provider's rate limits sooner. When a 429 hits, adaptive request admission backs off, and the backoff can cascade through downstream columns that were waiting on the rate-limited model's output. +The benchmarks above use 10 records deliberately — small batches isolate the scheduling benefit from rate-limit effects. At higher record counts, the bottleneck shifts. The async engine dispatches requests more aggressively, which means it discovers the provider's rate limits sooner. When a 429 hits, the AIMD controller backs off, and the backoff can cascade through downstream columns that were waiting on the throttled model's output. -This is where per-model request resources become important. Single-model pipelines are most susceptible to cascading backoff because all columns compete for the same provider capacity. Multi-model pipelines hold up well because each model adapts independently — a 429 on the generator model doesn't slow down the judge. In our larger runs, dual-model and multi-provider workloads consistently showed the largest async gains. +This is where the per-model throttle pools become important. Single-model pipelines are most susceptible to cascading backoff because all columns compete for the same pool. 
Multi-model pipelines hold up well because each model adapts independently — a 429 on the generator model doesn't slow down the judge. In our larger runs, dual-model and multi-provider workloads consistently showed the largest async gains. The primary tuning lever is `max_parallel_requests` per model. Set it to a generous upper bound and let AIMD find the real ceiling. See the [Owning the Model Stack](/dev-notes/owning-the-model-stack) dev note for the full story on adaptive concurrency. @@ -178,7 +179,7 @@ The performance numbers are satisfying, but raw throughput is only part of the p ### **Progress you can see** -Because rows complete out of order and row groups checkpoint independently, results start appearing on disk within seconds. The new progress bars — sticky ANSI bars that redraw in-place at the bottom of the terminal — update on every task completion rather than waiting for a full column to finish. Log messages from the scheduler and request-admission layer render above the bars, so you see both the high-level progress and the per-event detail. A 10-minute generation run no longer means staring at nothing until the end. +Because rows complete out of order and row groups checkpoint independently, results start appearing on disk within seconds. The new progress bars — sticky ANSI bars that redraw in-place at the bottom of the terminal — update on every task completion rather than waiting for a full column to finish. Log messages from the scheduler and throttle layer render above the bars, so you see both the high-level progress and the per-event detail. A 10-minute generation run no longer means staring at nothing until the end. 
``` column 'topic' ████████████████████████████████████░░░░ 89% | 890/1000 | 148.3 rec/s | eta 1s | 0 failed @@ -187,13 +188,13 @@ Because rows complete out of order and row groups checkpoint independently, resu column 'analysis' ██████████████░░░░░░░░░░░░░░░░░░░░░░░░░░ 35% | 350/1000 | 87.5 rec/s | eta 7s | 1 failed ``` -When tracing is enabled (`DATA_DESIGNER_ASYNC_TRACE=1` or `RunConfig(async_trace=True)`), the scheduler also records a `TaskTrace` for every task: when it was dispatched, when it acquired scheduler resources, when it completed, and its status. These traces are available on the result object after the run, so you can reconstruct the scheduler's timeline and understand where time was spent. +When tracing is enabled (`DATA_DESIGNER_ASYNC_TRACE=1` or `RunConfig(async_trace=True)`), the scheduler also records a `TaskTrace` for every task: when it was dispatched, when it acquired a semaphore slot, when it completed, and its status. These traces are available on the result object after the run, so you can reconstruct the scheduler's timeline and understand where time was spent. ### **Fault tolerance** Failures in a long-running pipeline are not exceptional — they're expected. Model endpoints return 429s, connections time out, prompts produce unparseable output. The scheduler classifies errors into two buckets. -Retryable errors (rate limits, timeouts, transient server errors) are deferred rather than dropped. The task stays on the frontier so it can be re-attempted. If a row group stalls — all of its pending tasks are deferred and nothing is in flight — the scheduler detects the deadlock and runs *salvage rounds*: it re-dispatches the deferred tasks inline, up to a configurable maximum number of attempts. Tasks that still fail after salvage are dropped, and the row group is checkpointed with whatever succeeded. This prevents a stalled row group from holding row-group admission capacity forever and blocking admission of new row groups. 
+Retryable errors (rate limits, timeouts, transient server errors) are deferred rather than dropped. The task stays on the frontier so it can be re-attempted. If a row group stalls — all of its pending tasks are deferred and nothing is in flight — the scheduler detects the deadlock and runs *salvage rounds*: it re-dispatches the deferred tasks inline, up to a configurable maximum number of attempts. Tasks that still fail after salvage are dropped, and the row group is checkpointed with whatever succeeded. This prevents a stalled row group from holding its semaphore slot forever and blocking admission of new row groups. Non-retryable errors (malformed output, validation failures) drop the row immediately. The `CompletionTracker` knows which downstream tasks depended on that row and removes them from the frontier, so no work is wasted on a row that's already lost. @@ -201,7 +202,7 @@ In both cases, completed row groups are already on disk. The unit of loss is at ### **Multi-model concurrency** -Multi-model pipelines are where the architecture pays for itself. With independent request resources per model, there's no reason not to use the right model for each job: a reasoning model for generation, a smaller model for judging, an embedding model for deduplication, each running at its own optimal concurrency. The previous engine supported multi-model configs, but running them concurrently is what makes them practical at scale. +Multi-model pipelines are where the architecture pays for itself. With independent throttle pools per model, there's no reason not to use the right model for each job: a reasoning model for generation, a smaller model for judging, an embedding model for deduplication, each running at its own optimal concurrency. The previous engine supported multi-model configs, but running them concurrently is what makes them practical at scale. ### **Adoption** @@ -213,7 +214,7 @@ Adoption is opt-in. Set `DATA_DESIGNER_ASYNC_ENGINE=1` in your environment. 
Your This was a ground-up rebuild of the execution layer, delivered across six PRs over four weeks. -It started with the data structures: `ExecutionGraph`, `CompletionTracker`, and task models ([#356](https://github.com/NVIDIA-NeMo/DataDesigner/pull/356)). Next came the generator migration ([#378](https://github.com/NVIDIA-NeMo/DataDesigner/pull/378)), where we added symmetric `generate()`/`agenerate()` bridging so every generator works in both modes without rewriting. The core scheduler and buffer manager followed in [#404](https://github.com/NVIDIA-NeMo/DataDesigner/pull/404), then integration into `DatasetBuilder` with callbacks and trace export ([#429](https://github.com/NVIDIA-NeMo/DataDesigner/pull/429)). The historical predecessor to request admission and task-stage capacity control landed in [#449](https://github.com/NVIDIA-NeMo/DataDesigner/pull/449), wiring AIMD concurrency control into every outbound model request. A final polish pass ([#456](https://github.com/NVIDIA-NeMo/DataDesigner/pull/456)) added async preview, unified lifecycle callbacks, and sticky ANSI progress bars. +It started with the data structures: `ExecutionGraph`, `CompletionTracker`, and task models ([#356](https://github.com/NVIDIA-NeMo/DataDesigner/pull/356)). Next came the generator migration ([#378](https://github.com/NVIDIA-NeMo/DataDesigner/pull/378)), where we added symmetric `generate()`/`agenerate()` bridging so every generator works in both modes without rewriting. The core scheduler and buffer manager followed in [#404](https://github.com/NVIDIA-NeMo/DataDesigner/pull/404), then integration into `DatasetBuilder` with callbacks and trace export ([#429](https://github.com/NVIDIA-NeMo/DataDesigner/pull/429)). The `ThrottledModelClient` and dual-semaphore scheduler landed in [#449](https://github.com/NVIDIA-NeMo/DataDesigner/pull/449), wiring AIMD concurrency control into every outbound model request. 
A final polish pass ([#456](https://github.com/NVIDIA-NeMo/DataDesigner/pull/456)) added async preview, unified lifecycle callbacks, and sticky ANSI progress bars. The symmetric bridging was critical for adoption. Every `ColumnGenerator` has both a `generate()` and an `agenerate()` method. Implement one, and the base class synthesizes the other: @@ -276,7 +277,7 @@ Key Resources: 1. [NeMo Data Designer on GitHub](https://github.com/NVIDIA-NeMo/DataDesigner) 2. [Data Designer Documentation](/getting-started/welcome) -3. [Owning the Model Stack: Adaptive Concurrency](/dev-notes/owning-the-model-stack) — companion dev note on the native client layer and AIMD request admission +3. [Owning the Model Stack: Adaptive Concurrency](/dev-notes/owning-the-model-stack) — companion dev note on the native client layer and AIMD throttling 4. [Async Engine Plan (#346)](https://github.com/NVIDIA-NeMo/DataDesigner/issues/346) — original issue and architecture plan *Want to learn more about NeMo Data Designer? Check out our [documentation](/getting-started/welcome) and start building your own synthetic data pipelines today.* diff --git a/fern/versions/v0.5.8/pages/devnotes/posts/owning-the-model-stack.mdx b/fern/versions/v0.5.8/pages/devnotes/posts/owning-the-model-stack.mdx index 0eec884c5..ad9136362 100644 --- a/fern/versions/v0.5.8/pages/devnotes/posts/owning-the-model-stack.mdx +++ b/fern/versions/v0.5.8/pages/devnotes/posts/owning-the-model-stack.mdx @@ -10,11 +10,11 @@ import { Authors } from "@/components/Authors"; Picture this: you're generating a million-record dataset. Thirty two concurrent requests per model, three models in the pipeline, two providers. Everything hums along for the first ten minutes — then one provider starts returning 429s, your retry logic kicks in, and suddenly you're in a feedback loop where retries *cause* more 429s. The run stalls. You restart with lower concurrency, waste throughput for hours, and wonder if there's a better way. -There is. 
This post is about the native model client layer we built with adaptive request admission (a system that discovers provider capacity at runtime) replacing our dependency on LiteLLM along the way. +There is. This post is about the native model client layer we built with adaptive throttling (a system that discovers provider capacity at runtime) replacing our dependency on LiteLLM along the way. {/* more */} -![From chaotic request flow to calibrated concurrency via adaptive request admission](/assets/owning-the-model-stack/native-model-client-hero.png) +![From chaotic request flow to calibrated concurrency via adaptive throttling](/assets/owning-the-model-stack/native-model-client-hero.png) ## **Why We Made the Move** @@ -40,9 +40,9 @@ From top to bottom: 1. **ModelFacade**: orchestrates correction loops, MCP tool-calling, and usage tracking. This is the public API. Column generators talk to this layer, and it was untouched during the migration. If you've written a Data Designer pipeline, nothing about your code changes. -2. **ModelRequestExecutor**: the request execution layer. It maps every outbound model call to a provider/model/domain request resource, acquires a request-admission lease before provider execution, releases it on every terminal path, and feeds the outcome (success, 429, or error) back to request admission. +2. **ThrottledModelClient**: the new layer. It's a decorator around `HttpModelClient` — same `ModelClient` protocol, but every outbound call is wrapped with a throttle permit: acquire a concurrency slot before the call, release it after, and feed the outcome (success, 429, or error) back to `ThrottleManager`. This is where adaptive throttling lives. -3. **AdaptiveRequestAdmissionController**: the Additive Increase / Multiplicative Decrease (AIMD) controller used by `ModelRequestExecutor`. A single shared controller is created at pipeline startup and shared across all model clients. 
It owns the mutable request-admission state — per-domain AIMD counters, global caps, cascade dampening, and cooldown timers. +3. **ThrottleManager**: the Additive Increase / Multiplicative Decrease (AIMD) controller that `ThrottledModelClient` delegates to. A single instance is created at pipeline startup and shared across all model clients. It owns all the mutable concurrency state — per-domain AIMD counters, global caps, cascade dampening, and cooldown timers. 4. **HttpModelClient**: an abstract base class that defines the interface for all provider adapters. It owns the shared `httpx` transport lifecycle — connection pooling, timeouts, and transport-level retries for transient failures (502, 503, 504). Boring but important. @@ -52,7 +52,7 @@ From top to bottom: The boundary between `ModelFacade` and the client layer is defined by canonical types. `ChatCompletionRequest`, `ChatCompletionResponse`, `EmbeddingRequest`, `EmbeddingResponse`, `ImageGenerationRequest`, `ImageGenerationResponse`, and `ProviderError`. These are plain dataclasses. No provider SDK objects cross this line. A `ModelClient` protocol defines the contract that all adapters implement, and that's the only interface the rest of the system sees. -## **Adaptive Request Admission: The Centerpiece** +## **Adaptive Throttling: The Centerpiece** With this client stack in place, we had the foundation to build something that wasn't possible before. Adaptive concurrency control. Let's start with the problem. @@ -97,23 +97,23 @@ Here's a subtlety that bit us during testing. When the system is running at capa Real pipelines aren't simple. A single provider+model combination might serve chat completions, embeddings, and image generation, potentially on different rate-limit budgets. And multiple [model aliases](/concepts/models/model-configs) in your pipeline might point to the same underlying provider and model (say, one alias for generation and another for judging, both hitting the same NVIDIA endpoint). 
-Request admission handles this with two-level keying:
+The throttle manager handles this with two-level keying:
-![Two-level request-resource keying: global cap per provider+model, independent domain states for chat, embedding, image](/assets/owning-the-model-stack/request-keying.png)
+![Two-level throttle keying: global cap per provider+model, independent domain states for chat, embedding, image](/assets/owning-the-model-stack/throttle-keying.png)
- **Global cap**: keyed by `(provider_name, model_id)`. When multiple model aliases target the same provider and model, the effective max is `min()` of their configured `max_parallel_requests`. This enforces the most conservative limit for shared upstream capacity, because the provider doesn't care what you *call* the model, it sees the same API key. -- **Domain state**: keyed by `(provider_name, model_id, request_domain)`. Each domain (`chat`, `embedding`, `image`, `healthcheck`) maintains its own AIMD state: `current_limit`, `in_flight`, `blocked_until`, `success_streak`, and `rate_limit_ceiling`. Domains float independently but are always capped by the global max. +- **Domain state**: keyed by `(provider_name, model_id, throttle_domain)`. Each domain (`chat`, `embedding`, `image`, `healthcheck`) maintains its own AIMD state: `current_limit`, `in_flight`, `blocked_until`, `success_streak`, and `rate_limit_ceiling`. Domains float independently but are always capped by the global max. The practical effect is that a burst of 429s on the chat route doesn't starve embedding requests, and vice versa. Each route adapts to its own capacity independently while respecting the shared upstream limit. ## **The Retry Boundary** -There's a design choice here that isn't obvious until you think about it, and getting it wrong would break the entire adaptive request-admission system. +There's a design choice here that isn't obvious until you think about it, and getting it wrong would break the entire throttling system. The transport layer (via `httpx` with `RetryTransport`) handles transient server failures like 502, 503, 504, and connection errors. These are hiccups. The server is temporarily broken. Retry with exponential backoff and jitter, and move on. @@ -121,25 +121,37 @@ But **429 is explicitly excluded from transport retries**.
-![Retry boundary: 502/503/504 retried at transport, 429 passed through to ModelRequestExecutor for AIMD feedback](/assets/owning-the-model-stack/retry-boundary.png)
+![Retry boundary: 502/503/504 retried at transport, 429 passed through to ThrottledModelClient for AIMD feedback](/assets/owning-the-model-stack/retry-boundary.png)
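The boundary pictured above can be sketched as a small routing function. This is an illustration only, not the library's code: `Route`, `TRANSIENT_STATUS_CODES`, and `route_response` are hypothetical names, and the status-code set is taken from the prose (502, 503, 504).

```python
from __future__ import annotations

from enum import Enum


class Route(Enum):
    """Where a failed response is handled (hypothetical names)."""

    TRANSPORT_RETRY = "transport_retry"      # retried with backoff; concurrency limit unchanged
    THROTTLE_FEEDBACK = "throttle_feedback"  # surfaced so the AIMD controller can react
    FAIL = "fail"                            # not retried at either layer in this sketch


# Assumption: the transient server failures named in the text.
TRANSIENT_STATUS_CODES = frozenset({502, 503, 504})


def route_response(status_code: int | None, connection_error: bool = False) -> Route:
    """Classify a failure as a server problem or a capacity problem."""
    # A 429 is a capacity signal, so it must never be absorbed by transport retries.
    if status_code == 429:
        return Route.THROTTLE_FEEDBACK
    # Connection errors and 502/503/504 are treated as transient server hiccups.
    if connection_error or status_code in TRANSIENT_STATUS_CODES:
        return Route.TRANSPORT_RETRY
    return Route.FAIL
```

Keeping the 429 check first is the point of the design: whichever layer owns retries, the rate-limit signal reaches the controller untouched.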
-Why? Because if the retry layer swallows 429s, request admission never learns the provider is overloaded. The whole AIMD feedback loop depends on seeing raw rate-limit signals. A 429 must bubble up to `ModelRequestExecutor` so it can release the request lease as rate-limited, cut the concurrency limit, apply the cooldown, and record the ceiling. The next attempt then re-enters the request-admission path before making another HTTP call. +Why? Because if the retry layer swallows 429s, the throttle manager never learns the provider is overloaded. The whole AIMD feedback loop depends on seeing raw rate-limit signals. A 429 must bubble up to `ThrottledModelClient` so it can call `release_rate_limited()`, cut the concurrency limit, apply the cooldown, and record the ceiling. The next attempt then re-enters the throttle acquire path, waiting for a permit, before making another HTTP call. -The split is clean and worth remembering. Transport retries handle *server problems*. Request admission handles *capacity problems*. The provider is working fine, you're just sending too many requests. Conflating the two is how you get retry storms. +The split is clean and worth remembering. Transport retries handle *server problems*. Throttle adaptation handles *capacity problems*. The provider is working fine, you're just sending too many requests. Conflating the two is how you get retry storms. -One caveat: this boundary behaves differently depending on the execution mode. In async mode (currently experimental, enabled with `DATA_DESIGNER_ASYNC_ENGINE=1`), 429s bypass transport retries entirely and flow straight to `ModelRequestExecutor` for AIMD feedback — this is the full adaptive loop described above. In sync mode, 429s are retried at the transport layer since there's no salvage queue to re-attempt failed rows. AIMD is still wired up but only fires if all transport retries are exhausted. 
This is temporary — once the async engine graduates from experimental, it will become the default path and the sync codepath will be retired. See [Async All the Way Down](/dev-notes/async-all-the-way-down) for the full story on the async engine. +One caveat: this boundary behaves differently depending on the execution mode. In async mode (currently experimental, enabled with `DATA_DESIGNER_ASYNC_ENGINE=1`), 429s bypass transport retries entirely and flow straight to `ThrottledModelClient` for AIMD feedback — this is the full adaptive loop described above. In sync mode, 429s are retried at the transport layer since there's no salvage queue to re-attempt failed rows. AIMD is still wired up but only fires if all transport retries are exhausted. This is temporary — once the async engine graduates from experimental, it will become the default path and the sync codepath will be retired. See [Async All the Way Down](/dev-notes/async-all-the-way-down) for the full story on the async engine. ## **Configuration** -Adaptive request admission is designed to work well out of the box. The defaults are conservative and handle most workloads without tuning. The primary user-facing knob is still `max_parallel_requests` on your model's inference parameters, which sets the hard upper bound for concurrency. AIMD floats below it. +The throttle system is designed to work well out of the box. The defaults are conservative and handle most workloads without tuning. The primary user-facing knob is still `max_parallel_requests` on your model's inference parameters, which sets the hard upper bound for concurrency. AIMD floats below it. 
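To make "AIMD floats below it" concrete, here is a minimal sketch of the additive-increase / multiplicative-decrease rule for a single route. It is a toy model under stated assumptions, not the controller shipped in Data Designer: `AimdLimit` and its methods are hypothetical names, the limit is assumed to start at the ceiling, and cooldowns, ceiling tracking, and 429-burst deduplication are left out.

```python
from dataclasses import dataclass


@dataclass
class AimdLimit:
    """Toy AIMD limit for one route (hypothetical; not the library's controller)."""

    ceiling: int                 # plays the role of max_parallel_requests
    reduce_factor: float = 0.75  # multiplicative decrease applied on a 429
    additive_increase: int = 1   # step added after a full success window
    success_window: int = 25     # consecutive successes required before increasing
    current_limit: int = 0
    success_streak: int = 0

    def __post_init__(self) -> None:
        # Assumption for this sketch: start at the configured ceiling.
        if self.current_limit <= 0:
            self.current_limit = self.ceiling

    def on_success(self) -> None:
        """Count a success; raise the limit by one step once the window fills."""
        self.success_streak += 1
        if self.success_streak >= self.success_window:
            self.success_streak = 0
            self.current_limit = min(self.ceiling, self.current_limit + self.additive_increase)

    def on_rate_limited(self) -> None:
        """Cut the limit multiplicatively, never dropping below one request."""
        self.success_streak = 0
        self.current_limit = max(1, int(self.current_limit * self.reduce_factor))


limit = AimdLimit(ceiling=32)
limit.on_rate_limited()   # 32 * 0.75 -> 24
for _ in range(25):
    limit.on_success()    # one full success window -> 25
```

The asymmetry is deliberate: one 429 removes a quarter of the limit at once, while recovery costs a full window of successes per step, so the limit backs off quickly and probes upward slowly.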
+ +For workloads where you want to fine-tune the adaptation behavior, `ThrottleConfig` is available on `RunConfig`: ```python import data_designer.config as dd from data_designer.interface import DataDesigner data_designer = DataDesigner() +data_designer.set_run_config( + dd.RunConfig( + throttle=dd.ThrottleConfig( + reduce_factor=0.75, + success_window=25, + cooldown_seconds=2.0, + ceiling_overshoot=0.10, + ) + ) +) config_builder = dd.DataDesignerConfigBuilder( model_configs=[ dd.ModelConfig( @@ -161,11 +173,21 @@ create_result = data_designer.create( ) ``` -Most users will never need more than `max_parallel_requests`. The system adapts automatically, and capacity diagnostics are exposed through runtime logs and `AsyncCapacityPlan` rather than public controller tuning knobs. +| Parameter | Default | What it does | +|---|---|---| +| `reduce_factor` | 0.75 | Multiplicative decrease on 429 (0.75 = reduce by 25%) | +| `additive_increase` | 1 | How much to increase the limit after a success window | +| `success_window` | 25 | Consecutive successes before additive increase | +| `cooldown_seconds` | 2.0 | Default cooldown when no `Retry-After` header | +| `ceiling_overshoot` | 0.10 | How far above the observed ceiling to probe (10%) | + +In practice, the parameter most worth adjusting is `success_window`. A smaller window (say, 10) makes the system more aggressive about reclaiming throughput after a pullback, useful when you know the provider's capacity fluctuates quickly. A larger window (say, 50) makes it more conservative, better for providers with strict, stable rate limits where you'd rather not probe at all. + +Most users will never need to touch any of these. The system adapts automatically. ## **What It Looks Like in the Logs** -Request admission logs every state transition at `INFO` level, so the adaptation story is visible in your terminal as the run progresses. 
+`ThrottleManager` logs every state transition at `INFO` level, so the adaptation story is visible in your terminal as the run progresses. ``` # When the system hits a 429 and cuts concurrency: @@ -188,9 +210,9 @@ Reading these lines in sequence tells you exactly what happened: where the syste ## **Where This Leaves Us** -This shipped in Data Designer v0.5.4. If you're using Data Designer today, nothing changes in your pipeline code. `ModelFacade` is the same API it's always been. What changes is what happens underneath. The system now discovers provider capacity at runtime, isolates request state per route, and separates retry logic from rate-limit adaptation. Adaptive request admission is enabled by default for all providers. You don't opt in or configure anything; it just starts learning. If you want to see this fully in action, turn on async mode — see [Async All the Way Down](/dev-notes/async-all-the-way-down) for details. +This shipped in Data Designer v0.5.4. If you're using Data Designer today, nothing changes in your pipeline code. `ModelFacade` is the same API it's always been. What changes is what happens underneath. The system now discovers provider capacity at runtime, isolates throttle state per route, and separates retry logic from rate-limit adaptation. Adaptive throttling is enabled by default for all providers. You don't opt in or configure anything; it just starts learning. If you want to see this fully in action, turn on async mode — see [Async All the Way Down](/dev-notes/async-all-the-way-down) for details. -For most workloads, the defaults are all you need. Set `max_parallel_requests` to a generous upper bound and let AIMD find the right level. If you're running against a stack that returns 429s, the system adapts to the available capacity without public controller tuning. +For most workloads, the defaults are all you need. Set `max_parallel_requests` to a generous upper bound and let AIMD find the right level. 
If you're running against a stack that returns 429s, the system adapts to the available capacity without any tuning. If you want finer control, `ThrottleConfig` is there — but the goal is that you spend your time designing datasets, not tuning concurrency knobs. Key Resources: diff --git a/fern/versions/v0.5.9/pages/devnotes/index.mdx b/fern/versions/v0.5.9/pages/devnotes/index.mdx index 504c4b6f7..1d5fe68da 100644 --- a/fern/versions/v0.5.9/pages/devnotes/index.mdx +++ b/fern/versions/v0.5.9/pages/devnotes/index.mdx @@ -44,7 +44,7 @@ Welcome to NeMo Data Designer Dev Notes — in-depth guides, benchmark write-ups } diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 231aec159..df19c5716 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -22,6 +22,7 @@ AsyncCapacityPlan, AsyncCapacityRuntimeSnapshot, CapacityValue, + RequestAdmissionConfigSnapshot, RowGroupAdmission, ) from data_designer.engine.context import current_row_group @@ -57,6 +58,9 @@ from data_designer.engine.errors import DataDesignerError from data_designer.engine.models.clients.errors import ProviderError from data_designer.engine.models.errors import RETRYABLE_MODEL_ERRORS, GenerationValidationFailureError +from data_designer.engine.models.request_admission.config import RequestAdmissionConfig +from data_designer.engine.models.request_admission.resources import RequestResourceKey +from data_designer.engine.models.resources import ProviderModelKey, ProviderModelStaticCap from data_designer.engine.observability import ( RuntimeCorrelation, SchedulerAdmissionEvent, @@ -280,6 +284,10 @@ def __init__( self._observed_max_row_groups_in_flight = 0 self._observed_max_task_leases_by_resource: dict[str, int] = {} 
self._observed_max_queued_by_group: dict[str, int] = {} + self._observed_max_request_waiters_by_resource: dict[RequestResourceKey, int] = {} + self._observed_max_request_in_flight_by_resource: dict[RequestResourceKey, int] = {} + self._observed_max_provider_model_aggregate_in_flight: dict[ProviderModelKey, int] = {} + self._observed_max_request_domain_current_limits: dict[RequestResourceKey, int] = {} self._adaptive_row_group_admission = adaptive_row_group_admission self._row_group_admission_hard_cap = max(1, max_concurrent_row_groups) self._row_group_admission_target = ( @@ -438,6 +446,26 @@ def _record_observed_task_state(self) -> None: for group, count in queue_view.queued_by_group.items(): label = f"{group.kind}:{'/'.join(group.identity)}" self._observed_max_queued_by_group[label] = max(self._observed_max_queued_by_group.get(label, 0), count) + if self._request_pressure_provider is None: + return + for resource, snapshot in self._request_pressure_provider.snapshots().items(): + self._observed_max_request_waiters_by_resource[resource] = max( + self._observed_max_request_waiters_by_resource.get(resource, 0), + snapshot.waiters, + ) + self._observed_max_request_in_flight_by_resource[resource] = max( + self._observed_max_request_in_flight_by_resource.get(resource, 0), + snapshot.in_flight_count, + ) + self._observed_max_request_domain_current_limits[resource] = max( + self._observed_max_request_domain_current_limits.get(resource, 0), + snapshot.current_limit, + ) + for provider_model, snapshot in self._request_pressure_provider.global_snapshots().items(): + self._observed_max_provider_model_aggregate_in_flight[provider_model] = max( + self._observed_max_provider_model_aggregate_in_flight.get(provider_model, 0), + snapshot.aggregate_in_flight, + ) def _emit_scheduler_health_snapshot(self, reason: str) -> None: self._emit_scheduler_event( @@ -1758,6 +1786,53 @@ def task_admission_snapshot(self) -> object: def capacity_plan(self) -> AsyncCapacityPlan: """Return the 
scheduler-side async capacity explanation for this run.""" task_view = self._task_admission.view() + request_snapshots = ( + dict(self._request_pressure_provider.snapshots()) if self._request_pressure_provider is not None else {} + ) + provider_snapshots = ( + dict(self._request_pressure_provider.global_snapshots()) + if self._request_pressure_provider is not None + else {} + ) + request_resources = tuple(sorted(request_snapshots)) + provider_model_static_caps = { + provider_model: ProviderModelStaticCap( + cap=snapshot.static_cap, + aliases=snapshot.aliases, + raw_caps=snapshot.raw_caps, + ) + for provider_model, snapshot in provider_snapshots.items() + } + request_config = self._request_pressure_provider.config if self._request_pressure_provider is not None else None + request_config_snapshot = ( + RequestAdmissionConfigSnapshot.from_config(request_config) + if isinstance(request_config, RequestAdmissionConfig) + else None + ) + request_domain_initial_limits: dict[RequestResourceKey, int] = {} + if request_config_snapshot is not None: + request_domain_initial_limits.update(request_config_snapshot.initial_limits) + for resource, snapshot in request_snapshots.items(): + configured_initial = ( + request_config_snapshot.initial_limits.get(resource) if request_config_snapshot is not None else None + ) + request_domain_initial_limits[resource] = ( + max(1, min(configured_initial, snapshot.effective_max)) + if configured_initial is not None + else snapshot.effective_max + ) + request_domain_current_limits = { + resource: snapshot.current_limit for resource, snapshot in request_snapshots.items() + } + request_domain_effective_max = { + resource: snapshot.effective_max for resource, snapshot in request_snapshots.items() + } + request_domain_blocked_until = { + resource: snapshot.blocked_until_monotonic for resource, snapshot in request_snapshots.items() + } + provider_model_aggregate_in_flight = { + provider_model: snapshot.aggregate_in_flight for provider_model, snapshot 
in provider_snapshots.items() + } return AsyncCapacityPlan( configured=AsyncCapacityConfigured( buffer_size=CapacityValue(value=self._buffer_size, source="run_config"), @@ -1779,24 +1854,28 @@ def capacity_plan(self) -> AsyncCapacityPlan: source="engine_internal_config", ), request_resources=CapacityValue( - value=(), + value=request_resources, source="runtime_snapshot", - missing_reason="request admission resources are reported by the model registry request controller", + missing_reason=None if request_resources else "request admission has not observed any resources", ), provider_model_static_caps=CapacityValue( - value={}, - source="runtime_snapshot", - missing_reason="request admission caps are reported by the model registry request controller", + value=provider_model_static_caps, + source="model_metadata", + missing_reason=None if provider_model_static_caps else "request admission has no registered models", ), request_domain_initial_limits=CapacityValue( - value={}, - source="runtime_snapshot", - missing_reason="request admission limits are reported by the model registry request controller", + value=request_domain_initial_limits, + source="engine_internal_config" if request_config_snapshot is not None else "runtime_snapshot", + missing_reason=None + if request_domain_initial_limits + else "request admission has not observed any domain limits", ), request_admission_config=CapacityValue( - value=None, - source="runtime_snapshot", - missing_reason="request admission config is owned by the model registry request controller", + value=request_config_snapshot, + source="engine_internal_config", + missing_reason=None + if request_config_snapshot is not None + else "request admission config is not exposed by the pressure provider", ), transport_pool_limits=CapacityValue( value={}, @@ -1804,11 +1883,30 @@ def capacity_plan(self) -> AsyncCapacityPlan: missing_reason="transport pool utilization is adapter-specific", ), ), - runtime_snapshot=AsyncCapacityRuntimeSnapshot(), 
+ runtime_snapshot=AsyncCapacityRuntimeSnapshot( + request_domain_current_limits=request_domain_current_limits, + request_domain_effective_max=request_domain_effective_max, + request_domain_blocked_until=request_domain_blocked_until, + provider_model_aggregate_in_flight=provider_model_aggregate_in_flight, + ), observed_maxima=AsyncCapacityObservedMaxima( row_groups_in_flight=self._observed_max_row_groups_in_flight, queued_tasks_by_group=dict(self._observed_max_queued_by_group), task_leases_by_resource=dict(self._observed_max_task_leases_by_resource or task_view.leased_resources), + request_waiters_by_resource=dict( + self._observed_max_request_waiters_by_resource + or {resource: snapshot.waiters for resource, snapshot in request_snapshots.items()} + ), + request_in_flight_by_resource=dict( + self._observed_max_request_in_flight_by_resource + or {resource: snapshot.in_flight_count for resource, snapshot in request_snapshots.items()} + ), + provider_model_aggregate_in_flight=dict( + self._observed_max_provider_model_aggregate_in_flight or provider_model_aggregate_in_flight + ), + request_domain_current_limits=dict( + self._observed_max_request_domain_current_limits or request_domain_current_limits + ), transport_pool_utilization=None, ), ) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 73737629e..b19871319 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -1045,6 +1045,7 @@ def on_before_checkpoint(rg_id: int, rg_size: int) -> None: buffer_size=buffer_size, progress_interval=self._resource_provider.run_config.progress_interval, progress_bar=self._resource_provider.run_config.progress_bar, + 
request_pressure_provider=self._resource_provider.model_registry.request_admission, ) return scheduler, buffer_manager diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py index 4dffb47f7..89fb3e280 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_admission.py @@ -5,7 +5,7 @@ import time import uuid -from collections import Counter, defaultdict +from collections import Counter, defaultdict, deque from collections.abc import Mapping from dataclasses import dataclass, field from typing import Literal @@ -35,6 +35,7 @@ "wrong_controller_generation", "unknown_lease", ] +RELEASED_TASK_LEASE_HISTORY_LIMIT = 8192 @dataclass(frozen=True) @@ -113,6 +114,7 @@ def __init__( self._generation = uuid.uuid4().hex self._leases: dict[str, TaskAdmissionLease] = {} self._released: set[str] = set() + self._released_order: deque[str] = deque(maxlen=RELEASED_TASK_LEASE_HISTORY_LIMIT) self._leased_by_resource: Counter[SchedulerResourceKey] = Counter() self._leased_by_group: dict[TaskGroupKey, Counter[SchedulerResourceKey]] = defaultdict(Counter) self._running_by_group: Counter[TaskGroupKey] = Counter() @@ -184,7 +186,7 @@ def release(self, lease: TaskAdmissionLease) -> ReleaseResult: self._release_diagnostics["stale_lease"] += 1 return ReleaseResult(released=False, reason="stale_lease") - self._released.add(lease.lease_id) + self._remember_released(lease.lease_id) for resource, amount in active.resources.items(): self._leased_by_resource[resource] = max(0, self._leased_by_resource[resource] - amount) self._leased_by_group[active.item.group.key][resource] = max( @@ -259,3 +261,12 @@ def _missing_resources( def _apply_delta(self, delta: PolicyStateDelta) -> None: for key, 
change in delta.debt_changes.items(): self._policy_debt[key] = max(0, self._policy_debt[key] + change) + + def _remember_released(self, lease_id: str) -> None: + if lease_id in self._released: + return + maxlen = self._released_order.maxlen + if maxlen is not None and len(self._released_order) >= maxlen: + self._released.discard(self._released_order[0]) + self._released.add(lease_id) + self._released_order.append(lease_id) diff --git a/packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py b/packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py index 57483ec01..398d151a4 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py @@ -18,6 +18,7 @@ from data_designer.engine.secret_resolver import SecretResolver _SUPPORTED_PROVIDER_TYPES = ("openai", "anthropic") +_NO_TRANSPORT_RETRY_CONFIG = RetryConfig(max_retries=0, retryable_status_codes=frozenset()) def create_model_client( @@ -71,13 +72,14 @@ def create_model_client( max_parallel = model_config.inference_parameters.max_parallel_requests raw_timeout = model_config.inference_parameters.timeout timeout_s = float(raw_timeout if raw_timeout is not None else 60) + adapter_retry_config = _NO_TRANSPORT_RETRY_CONFIG if request_admission is not None else retry_config if provider.provider_type == "openai": client: ModelClient = OpenAICompatibleClient( provider_name=provider.name, endpoint=provider.endpoint, api_key=api_key, - retry_config=retry_config, + retry_config=adapter_retry_config, max_parallel_requests=max_parallel, timeout_s=timeout_s, concurrency_mode=client_concurrency_mode, @@ -87,7 +89,7 @@ def create_model_client( provider_name=provider.name, endpoint=provider.endpoint, api_key=api_key, - retry_config=retry_config, + retry_config=adapter_retry_config, max_parallel_requests=max_parallel, timeout_s=timeout_s, 
concurrency_mode=client_concurrency_mode, @@ -111,6 +113,7 @@ def create_model_client( provider_name=provider.name, model_id=model_config.model, event_sink=request_event_sink, + retry_config=retry_config, ) return client diff --git a/packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py b/packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py index 01f917585..6782c16bc 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py @@ -5,11 +5,13 @@ import asyncio import logging +import time import uuid from typing import TYPE_CHECKING, TypeVar from data_designer.engine.models.clients.base import ModelClient from data_designer.engine.models.clients.errors import ProviderError, ProviderErrorKind +from data_designer.engine.models.clients.retry import RetryConfig from data_designer.engine.models.clients.types import ( ChatCompletionRequest, ChatCompletionResponse, @@ -56,6 +58,7 @@ def __init__( model_id: str, event_sink: RequestAdmissionEventSink | None = None, resource_resolver: RequestResourceResolver | None = None, + retry_config: RetryConfig | None = None, ) -> None: self._inner = inner self._request_admission = request_admission @@ -63,6 +66,7 @@ def __init__( self._model_id = model_id self._event_sink = event_sink self._resource_resolver = resource_resolver or RequestResourceResolver() + self._retry_config = retry_config or RetryConfig() self._event_sequence = 0 @property @@ -103,6 +107,16 @@ async def agenerate_image(self, request: ImageGenerationRequest) -> ImageGenerat return await self._execute_async(self._image_domain(request), lambda: self._inner.agenerate_image(request)) def _execute_sync(self, domain: RequestDomain, call: Callable[[], _T]) -> _T: + for attempt in range(self._max_attempts()): + try: + return 
self._execute_sync_attempt(domain, call) + except ProviderError as exc: + if not self._should_retry(exc, attempt): + raise + self._sleep_before_retry(attempt) + raise RuntimeError("unreachable request retry state") + + def _execute_sync_attempt(self, domain: RequestDomain, call: Callable[[], _T]) -> _T: item = self._item(domain) try: lease = self._request_admission.acquire_sync(item) @@ -141,6 +155,16 @@ def _execute_sync(self, domain: RequestDomain, call: Callable[[], _T]) -> _T: return result async def _execute_async(self, domain: RequestDomain, call: Callable[[], Awaitable[_T]]) -> _T: + for attempt in range(self._max_attempts()): + try: + return await self._execute_async_attempt(domain, call) + except ProviderError as exc: + if not self._should_retry(exc, attempt): + raise + await self._async_sleep_before_retry(attempt) + raise RuntimeError("unreachable request retry state") + + async def _execute_async_attempt(self, domain: RequestDomain, call: Callable[[], Awaitable[_T]]) -> _T: item = self._item(domain) try: lease = await self._request_admission.acquire_async(item) @@ -187,6 +211,36 @@ async def _execute_async(self, domain: RequestDomain, call: Callable[[], Awaitab ) return result + def _max_attempts(self) -> int: + return max(1, self._retry_config.max_retries + 1) + + def _should_retry(self, exc: ProviderError, attempt: int) -> bool: + if attempt >= self._max_attempts() - 1: + return False + if isinstance(exc.__cause__, RequestAdmissionError): + return False + if exc.kind == ProviderErrorKind.RATE_LIMIT: + return False + if exc.status_code is not None: + return exc.status_code in self._retry_config.retryable_status_codes + return exc.kind == ProviderErrorKind.API_CONNECTION + + def _sleep_before_retry(self, attempt: int) -> None: + delay = self._retry_delay_seconds(attempt) + if delay > 0.0: + time.sleep(delay) + + async def _async_sleep_before_retry(self, attempt: int) -> None: + delay = self._retry_delay_seconds(attempt) + if delay > 0.0: + await 
asyncio.sleep(delay) + + def _retry_delay_seconds(self, attempt: int) -> float: + if self._retry_config.backoff_factor <= 0.0: + return 0.0 + delay = self._retry_config.backoff_factor * (2**attempt) + return min(delay, self._retry_config.max_backoff_wait) + def _release_provider_error(self, lease: RequestAdmissionLease, exc: ProviderError) -> None: if exc.kind == ProviderErrorKind.RATE_LIMIT: outcome = RequestReleaseOutcome(kind="rate_limited", retry_after_seconds=exc.retry_after) diff --git a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py index 9d63858fe..09e4dd512 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py @@ -9,7 +9,7 @@ import threading import time import uuid -from collections import Counter +from collections import Counter, deque from collections.abc import Mapping from dataclasses import dataclass, field from typing import Literal, Protocol @@ -46,6 +46,8 @@ "shutdown", "hard_policy_denial", ] +RELEASED_LEASE_HISTORY_LIMIT = 8192 +_TERMINAL_DENIAL_REASONS: frozenset[RequestDenyReason] = frozenset({"hard_policy_denial", "shutdown"}) @dataclass(frozen=True) @@ -119,6 +121,7 @@ def __init__( self._domains: dict[RequestResourceKey, AdaptiveRequestLimitState] = {} self._active_leases: dict[str, RequestAdmissionLease] = {} self._released: set[str] = set() + self._released_order: deque[str] = deque(maxlen=RELEASED_LEASE_HISTORY_LIMIT) self._aggregate_in_flight: Counter[ProviderModelKey] = Counter() self._aggregate_active_leases: Counter[ProviderModelKey] = Counter() self._sequence = 0 @@ -223,6 +226,11 @@ def acquire_sync(self, item: RequestAdmissionItem) -> RequestAdmissionLease: if waiter.assigned_lease is not None: return waiter.assigned_lease now = 
time.monotonic() + if (denied := self._terminal_denial_for(item, now)) is not None: + self._remove_waiter_locked(waiter) + events.append(self._request_event_locked("request_acquire_denied", item=item, decision=denied)) + self._condition.notify_all() + raise RequestAdmissionError(denied) if deadline is not None and now >= deadline: self._remove_waiter_locked(waiter) denied = RequestAdmissionDenied( @@ -264,6 +272,11 @@ async def acquire_async(self, item: RequestAdmissionItem) -> RequestAdmissionLea if waiter.assigned_lease is not None: return waiter.assigned_lease now = time.monotonic() + if (denied := self._terminal_denial_for(item, now)) is not None: + self._remove_waiter_locked(waiter) + events.append(self._request_event_locked("request_acquire_denied", item=item, decision=denied)) + self._condition.notify_all() + raise RequestAdmissionError(denied) if deadline is not None and now >= deadline: self._remove_waiter_locked(waiter) denied = RequestAdmissionDenied( @@ -335,7 +348,7 @@ def release(self, lease: RequestAdmissionLease, outcome: RequestReleaseOutcome) ) ) else: - self._released.add(lease.lease_id) + self._remember_released_locked(lease.lease_id) resource = active.item.resource provider_model = resource.provider_model_key state = self._get_or_create_state(resource) @@ -472,6 +485,21 @@ def _denial_for(self, item: RequestAdmissionItem, now: float) -> RequestAdmissio ) return None + def _terminal_denial_for(self, item: RequestAdmissionItem, now: float) -> RequestAdmissionDenied | None: + denied = self._denial_for(item, now) + if denied is None or denied.reason not in _TERMINAL_DENIAL_REASONS: + return None + return denied + + def _remember_released_locked(self, lease_id: str) -> None: + if lease_id in self._released: + return + maxlen = self._released_order.maxlen + if maxlen is not None and len(self._released_order) >= maxlen: + self._released.discard(self._released_order[0]) + self._released.add(lease_id) + self._released_order.append(lease_id) + def 
_acquire_locked(self, item: RequestAdmissionItem, now: float) -> RequestAdmissionLease: resource = item.resource provider_model = resource.provider_model_key diff --git a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/pressure.py b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/pressure.py index 9f0386dc7..a268f8898 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/request_admission/pressure.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/request_admission/pressure.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from typing import Protocol +from data_designer.engine.models.request_admission.config import RequestAdmissionConfig from data_designer.engine.models.request_admission.resources import RequestDomain, RequestResourceKey from data_designer.engine.models.resources import ProviderModelKey @@ -43,6 +44,9 @@ class ProviderModelPressureSnapshot: class RequestPressureSnapshotProvider(Protocol): + @property + def config(self) -> RequestAdmissionConfig | None: ... + def snapshot(self, resource: RequestResourceKey) -> RequestPressureSnapshot | None: ... def snapshots(self) -> Mapping[RequestResourceKey, RequestPressureSnapshot]: ... 
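Reviewer note on the controller.py hunks above: the new `_remember_released_locked` helper pairs the existing `_released` set with a `deque(maxlen=RELEASED_LEASE_HISTORY_LIMIT)` so duplicate-release detection keeps working while memory stays bounded. The sketch below isolates that idea so the eviction order is easier to check. It is only an illustration under stated assumptions: the class `BoundedReleasedHistory`, its `remember` method, and the `limit >= 1` check are hypothetical names and behavior introduced here, not part of the patch.

```python
from collections import deque


class BoundedReleasedHistory:
    """Hypothetical stand-in for the controller's _released / _released_order pair."""

    def __init__(self, limit: int) -> None:
        # Assumption for this sketch: reject a zero or negative limit up front,
        # because peeking at the oldest entry of an empty deque would fail.
        if limit < 1:
            raise ValueError("limit must be >= 1")
        self._released: set[str] = set()
        self._released_order: deque[str] = deque(maxlen=limit)

    def remember(self, lease_id: str) -> None:
        # Already remembered: keep the original position in the history.
        if lease_id in self._released:
            return
        maxlen = self._released_order.maxlen
        if maxlen is not None and len(self._released_order) >= maxlen:
            # The deque drops its oldest entry on append, so remove that
            # entry from the set first to keep both structures in sync.
            self._released.discard(self._released_order[0])
        self._released.add(lease_id)
        self._released_order.append(lease_id)

    def __contains__(self, lease_id: object) -> bool:
        return lease_id in self._released

    def __len__(self) -> int:
        return len(self._released)


history = BoundedReleasedHistory(limit=3)
for index in range(5):
    history.remember(f"lease-{index}")
```

With a limit of 3 and five distinct ids, only the three most recent ids remain. This matches what the new tests assert: once a lease id has aged out of the history, releasing it again is reported as `unknown_lease` rather than as a duplicate release.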
diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py index e1b6382a4..fbb2fd469 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py @@ -12,6 +12,7 @@ stable_task_id, ) from data_designer.engine.dataset_builders.scheduling.task_admission import ( + RELEASED_TASK_LEASE_HISTORY_LIMIT, TaskAdmissionConfig, TaskAdmissionController, TaskAdmissionDenied, @@ -85,6 +86,23 @@ def test_task_admission_duplicate_release_does_not_increase_capacity() -> None: assert controller.view().resources_available["submission"] == 1 +def test_task_admission_released_history_is_bounded() -> None: + controller = TaskAdmissionController(TaskAdmissionConfig(submission_capacity=1)) + first_lease: TaskAdmissionLease | None = None + for index in range(RELEASED_TASK_LEASE_HISTORY_LIMIT + 5): + item = _item(f"task-{index}") + lease = controller.try_acquire(item, _queue_view(item)) + assert isinstance(lease, TaskAdmissionLease) + first_lease = first_lease or lease + controller.release(lease) + + assert len(controller._released) == RELEASED_TASK_LEASE_HISTORY_LIMIT + assert len(controller._released_order) == RELEASED_TASK_LEASE_HISTORY_LIMIT + assert controller._released_order.maxlen == RELEASED_TASK_LEASE_HISTORY_LIMIT + assert first_lease is not None + assert controller.release(first_lease).reason == "unknown_lease" + + def test_task_admission_group_cap_yields_to_peer_pressure() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) controller = TaskAdmissionController(TaskAdmissionConfig(submission_capacity=2)) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py 
b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index a3fe5f902..83791e276 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -46,8 +46,19 @@ ModelRateLimitError, ModelTimeoutError, ) +from data_designer.engine.models.request_admission.config import RequestAdmissionConfig +from data_designer.engine.models.request_admission.controller import ( + AdaptiveRequestAdmissionController, + RequestAdmissionLease, +) +from data_designer.engine.models.request_admission.outcomes import RequestReleaseOutcome from data_designer.engine.models.request_admission.pressure import RequestPressureSnapshot -from data_designer.engine.models.request_admission.resources import RequestDomain, RequestResourceKey +from data_designer.engine.models.request_admission.resources import ( + RequestAdmissionItem, + RequestDomain, + RequestGroupSpec, + RequestResourceKey, +) from data_designer.engine.models.resources import ProviderModelKey from data_designer.engine.observability import InMemoryAdmissionEventSink from data_designer.engine.resources.resource_provider import ResourceProvider @@ -2321,6 +2332,10 @@ class _StaticRequestPressureProvider: def __init__(self, snapshots: dict[RequestResourceKey, RequestPressureSnapshot]) -> None: self._snapshots = snapshots + @property + def config(self) -> RequestAdmissionConfig | None: + return None + def snapshot(self, resource: RequestResourceKey) -> RequestPressureSnapshot | None: return self._snapshots.get(resource) @@ -2645,6 +2660,59 @@ async def test_scheduler_capacity_plan_observes_buffer_backpressure() -> None: assert max(plan.observed_maxima.task_leases_by_resource.values()) <= 2 +def test_scheduler_capacity_plan_reports_request_admission_state() -> None: + resource = RequestResourceKey("provider", "model", RequestDomain.CHAT) + request_admission = 
AdaptiveRequestAdmissionController( + RequestAdmissionConfig(initial_limits={resource: 2}, max_limit_clamps={resource: 3}) + ) + request_admission.register( + provider_name="provider", + model_id="model", + alias="primary", + max_parallel_requests=4, + ) + lease = request_admission.try_acquire(RequestAdmissionItem(resource, RequestGroupSpec(resource))) + assert isinstance(lease, RequestAdmissionLease) + + scheduler, _tracker = _build_simple_pipeline() + scheduler._request_pressure_provider = request_admission + scheduler._record_observed_task_state() + plan = scheduler.capacity_plan() + + assert plan.configured.request_resources.value == (resource,) + assert plan.configured.request_domain_initial_limits.value[resource] == 2 + assert plan.configured.request_admission_config.value is not None + assert plan.configured.provider_model_static_caps.value[ProviderModelKey("provider", "model")].cap == 4 + assert plan.runtime_snapshot.request_domain_current_limits[resource] == 2 + assert plan.runtime_snapshot.request_domain_effective_max[resource] == 3 + assert plan.runtime_snapshot.provider_model_aggregate_in_flight[ProviderModelKey("provider", "model")] == 1 + assert plan.observed_maxima.request_in_flight_by_resource[resource] == 1 + assert plan.observed_maxima.provider_model_aggregate_in_flight[ProviderModelKey("provider", "model")] == 1 + request_admission.release(lease, RequestReleaseOutcome(kind="success")) + + +def test_scheduler_capacity_plan_reports_default_request_initial_limit_after_aimd_drop() -> None: + resource = RequestResourceKey("provider", "model", RequestDomain.CHAT) + request_admission = AdaptiveRequestAdmissionController() + request_admission.register( + provider_name="provider", + model_id="model", + alias="primary", + max_parallel_requests=4, + ) + lease = request_admission.try_acquire(RequestAdmissionItem(resource, RequestGroupSpec(resource))) + assert isinstance(lease, RequestAdmissionLease) + request_admission.release(lease, 
RequestReleaseOutcome(kind="rate_limited")) + + scheduler, _tracker = _build_simple_pipeline() + scheduler._request_pressure_provider = request_admission + plan = scheduler.capacity_plan() + + assert plan.configured.request_domain_initial_limits.value[resource] == 4 + assert plan.runtime_snapshot.request_domain_effective_max[resource] == 4 + assert plan.runtime_snapshot.request_domain_current_limits[resource] == 3 + + @pytest.mark.asyncio(loop_scope="session") async def test_scheduler_emits_job_health_and_row_group_telemetry() -> None: provider = _mock_provider() diff --git a/packages/data-designer-engine/tests/engine/models/clients/test_factory.py b/packages/data-designer-engine/tests/engine/models/clients/test_factory.py index df86e12d8..f809db8be 100644 --- a/packages/data-designer-engine/tests/engine/models/clients/test_factory.py +++ b/packages/data-designer-engine/tests/engine/models/clients/test_factory.py @@ -187,15 +187,18 @@ def test_request_admission_wraps_openai_client( openai_registry: ModelProviderRegistry, ) -> None: controller = AdaptiveRequestAdmissionController() + retry_config = RetryConfig(max_retries=5) client = create_model_client( openai_model_config, secret_resolver, openai_registry, - retry_config=RetryConfig(), + retry_config=retry_config, request_admission=controller, ) assert isinstance(client, ModelRequestExecutor) assert isinstance(client._inner, OpenAICompatibleClient) + assert client._retry_config is retry_config + assert client._inner._retry_config.max_retries == 0 def test_request_admission_wraps_anthropic_client( diff --git a/packages/data-designer-engine/tests/engine/models/clients/test_model_request_executor.py b/packages/data-designer-engine/tests/engine/models/clients/test_model_request_executor.py index 9eb310edd..2c44a0c00 100644 --- a/packages/data-designer-engine/tests/engine/models/clients/test_model_request_executor.py +++ b/packages/data-designer-engine/tests/engine/models/clients/test_model_request_executor.py @@ -10,6 
+10,7 @@ from data_designer.engine.models.clients.errors import ProviderError, ProviderErrorKind from data_designer.engine.models.clients.model_request_executor import ModelRequestExecutor +from data_designer.engine.models.clients.retry import RetryConfig from data_designer.engine.models.clients.types import ( AssistantMessage, ChatCompletionRequest, @@ -100,6 +101,40 @@ async def agenerate_image(self, request: ImageGenerationRequest) -> ImageGenerat return ImageGenerationResponse(images=[ImagePayload("image")]) +class _FlakyClient(_Client): + def __init__( + self, + *, + failures: int, + kind: ProviderErrorKind = ProviderErrorKind.INTERNAL_SERVER, + status_code: int | None = 503, + ) -> None: + super().__init__() + self.failures = failures + self.calls = 0 + self.kind = kind + self.status_code = status_code + + def _maybe_fail(self) -> None: + self.calls += 1 + if self.calls <= self.failures: + raise ProviderError( + kind=self.kind, + message="temporarily unavailable", + status_code=self.status_code, + provider_name="nvidia", + model_name="nemotron", + ) + + def completion(self, request: ChatCompletionRequest) -> ChatCompletionResponse: + self._maybe_fail() + return ChatCompletionResponse(AssistantMessage(content="ok")) + + async def acompletion(self, request: ChatCompletionRequest) -> ChatCompletionResponse: + self._maybe_fail() + return ChatCompletionResponse(AssistantMessage(content="ok")) + + def _executor() -> tuple[ModelRequestExecutor, AdaptiveRequestAdmissionController, _Client]: controller = AdaptiveRequestAdmissionController() controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) @@ -138,6 +173,76 @@ def test_model_request_executor_classifies_rate_limit() -> None: assert snapshot.cooldown_remaining_seconds > 0 +def test_model_request_executor_retries_provider_503_with_fresh_leases() -> None: + sink = InMemoryAdmissionEventSink() + controller = AdaptiveRequestAdmissionController(event_sink=sink) + 
controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) + client = _FlakyClient(failures=1) + executor = ModelRequestExecutor( + client, + controller, + "nvidia", + "nemotron", + event_sink=sink, + retry_config=RetryConfig(max_retries=1, backoff_factor=0.0), + ) + + response = executor.completion(ChatCompletionRequest(model="nemotron", messages=[])) + + assert response.message.content == "ok" + assert client.calls == 2 + acquired = [event for event in sink.request_events if event.event_kind == "request_lease_acquired"] + released = [event for event in sink.request_events if event.event_kind == "request_lease_released"] + assert len(acquired) == 2 + assert len(released) == 2 + assert {event.request_lease_id for event in acquired} == {event.request_lease_id for event in released} + + +def test_model_request_executor_does_not_retry_provider_timeout_without_status() -> None: + controller = AdaptiveRequestAdmissionController() + controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) + client = _FlakyClient(failures=2, kind=ProviderErrorKind.TIMEOUT, status_code=None) + executor = ModelRequestExecutor( + client, + controller, + "nvidia", + "nemotron", + retry_config=RetryConfig(max_retries=2, backoff_factor=0.0), + ) + + with pytest.raises(ProviderError) as exc_info: + executor.completion(ChatCompletionRequest(model="nemotron", messages=[])) + + assert exc_info.value.kind == ProviderErrorKind.TIMEOUT + assert client.calls == 1 + + +@pytest.mark.asyncio(loop_scope="session") +async def test_model_request_executor_retries_async_provider_503_with_fresh_leases() -> None: + sink = InMemoryAdmissionEventSink() + controller = AdaptiveRequestAdmissionController(event_sink=sink) + controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) + client = _FlakyClient(failures=1) + executor = ModelRequestExecutor( + client, + controller, 
+ "nvidia", + "nemotron", + event_sink=sink, + retry_config=RetryConfig(max_retries=1, backoff_factor=0.0), + ) + + response = await executor.acompletion(ChatCompletionRequest(model="nemotron", messages=[])) + + assert response.message.content == "ok" + assert client.calls == 2 + acquired = [event for event in sink.request_events if event.event_kind == "request_lease_acquired"] + released = [event for event in sink.request_events if event.event_kind == "request_lease_released"] + assert len(acquired) == 2 + assert len(released) == 2 + assert {event.request_lease_id for event in acquired} == {event.request_lease_id for event in released} + + @pytest.mark.asyncio(loop_scope="session") async def test_model_request_executor_releases_async_cancellation() -> None: class _SlowClient(_Client): diff --git a/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py b/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py index b62237a48..6fc65d227 100644 --- a/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py +++ b/packages/data-designer-engine/tests/engine/models/request_admission/test_controller.py @@ -10,6 +10,7 @@ from data_designer.engine.models.request_admission.config import RequestAdmissionConfig from data_designer.engine.models.request_admission.controller import ( + RELEASED_LEASE_HISTORY_LIMIT, AdaptiveRequestAdmissionController, RequestAdmissionDenied, RequestAdmissionError, @@ -154,6 +155,18 @@ def test_request_admission_zero_sync_timeout_is_immediate() -> None: controller.release(lease, RequestReleaseOutcome(kind="success")) +def test_request_admission_sync_unregistered_provider_raises_hard_denial() -> None: + controller = AdaptiveRequestAdmissionController() + + with pytest.raises(RequestAdmissionError) as exc_info: + controller.acquire_sync(_item()) + + assert exc_info.value.decision.reason == "hard_policy_denial" + snapshot = 
controller.pressure.snapshot(_item().resource) + assert snapshot is not None + assert snapshot.waiters == 0 + + def test_request_admission_logs_sink_failures(caplog: pytest.LogCaptureFixture) -> None: caplog.set_level(logging.WARNING, logger="data_designer.engine.models.request_admission.controller") controller = AdaptiveRequestAdmissionController(event_sink=_BrokenRequestSink()) @@ -235,24 +248,34 @@ async def test_acquire_async_wakes_when_release_assigns_lease(monkeypatch: pytes @pytest.mark.asyncio(loop_scope="session") -async def test_register_wakes_unregistered_async_waiter(monkeypatch: pytest.MonkeyPatch) -> None: +async def test_acquire_async_unregistered_provider_raises_hard_denial(monkeypatch: pytest.MonkeyPatch) -> None: controller = AdaptiveRequestAdmissionController() monkeypatch.setattr(controller, "_wait_seconds_locked", lambda _item, _now, _deadline: 10.0) queued = _item(RequestDomain.CHAT, timeout=30.0) - queued_task = asyncio.create_task(controller.acquire_async(queued)) - for _ in range(20): - snapshot = controller.pressure.snapshot(queued.resource) - if snapshot is not None and snapshot.waiters == 1: - break - await asyncio.sleep(0) - else: - raise AssertionError("async waiter did not enqueue") + with pytest.raises(RequestAdmissionError) as exc_info: + await asyncio.wait_for(controller.acquire_async(queued), timeout=0.5) - controller.register(provider_name="nvidia", model_id="nemotron", alias="default", max_parallel_requests=1) - queued_lease = await asyncio.wait_for(queued_task, timeout=0.5) + assert exc_info.value.decision.reason == "hard_policy_denial" + snapshot = controller.pressure.snapshot(queued.resource) + assert snapshot is not None + assert snapshot.waiters == 0 - controller.release(queued_lease, RequestReleaseOutcome(kind="success")) + +def test_request_admission_released_history_is_bounded() -> None: + controller = _controller(cap=1) + first_lease: RequestAdmissionLease | None = None + for _ in range(RELEASED_LEASE_HISTORY_LIMIT + 
5): + lease = controller.try_acquire(_item()) + assert isinstance(lease, RequestAdmissionLease) + first_lease = first_lease or lease + controller.release(lease, RequestReleaseOutcome(kind="success")) + + assert len(controller._released) == RELEASED_LEASE_HISTORY_LIMIT + assert len(controller._released_order) == RELEASED_LEASE_HISTORY_LIMIT + assert controller._released_order.maxlen == RELEASED_LEASE_HISTORY_LIMIT + assert first_lease is not None + assert controller.release(first_lease, RequestReleaseOutcome(kind="success")).reason == "unknown_lease" @pytest.mark.asyncio(loop_scope="session") diff --git a/packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py b/packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py index 595977464..f3088293c 100644 --- a/packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py +++ b/packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py @@ -39,6 +39,11 @@ def _load_idle_regression_module() -> ModuleType: return module +def _load_idle_report_module() -> ModuleType: + _load_idle_regression_module() + return sys.modules["generate_async_scheduling_idle_report"] + + def _capacity_plan() -> SimpleNamespace: return SimpleNamespace( observed_maxima=SimpleNamespace( @@ -264,6 +269,24 @@ def test_idle_regression_detects_bad_idle_partition() -> None: assert any(not check.passed and check.name == "row-scale/rows-64 idle partition" for check in checks) +def test_idle_regression_skip_run_does_not_generate_missing_artifacts(tmp_path: Path) -> None: + report = _load_idle_report_module() + case = report.IdleBenchmarkCase( + name="missing", + sweep="reuse", + record_count=1, + buffer_size=1, + row_group_concurrency=1, + task_admission_capacity=1, + fanout_width=1, + upstream_latency_seconds=0.0, + downstream_latency_seconds=0.0, + ) + + with pytest.raises(FileNotFoundError, match="Cannot reuse benchmark artifact"): + report._run_or_load_case(case, tmp_path, 
skip_run=True) + + def test_idle_regression_requires_adaptation_controls() -> None: regression = _load_idle_regression_module() summary = _idle_regression_summary() diff --git a/scripts/benchmarks/generate_async_scheduling_idle_report.py b/scripts/benchmarks/generate_async_scheduling_idle_report.py index 707706090..cfdb98feb 100644 --- a/scripts/benchmarks/generate_async_scheduling_idle_report.py +++ b/scripts/benchmarks/generate_async_scheduling_idle_report.py @@ -308,6 +308,7 @@ def _build_cases(quick: bool) -> list[IdleBenchmarkCase]: fanout_width=1, upstream_latency_seconds=0.006, downstream_latency_seconds=0.0003, + iterations=3, ), replace( base, @@ -320,6 +321,7 @@ def _build_cases(quick: bool) -> list[IdleBenchmarkCase]: fanout_width=4, upstream_latency_seconds=0.006, downstream_latency_seconds=0.0003, + iterations=3, ), ) ) @@ -432,7 +434,11 @@ def _build_cases(quick: bool) -> list[IdleBenchmarkCase]: def _run_or_load_case(case: IdleBenchmarkCase, artifact_dir: Path, *, skip_run: bool) -> IdleBenchmarkResult: output_dir = artifact_dir / case.sweep / case.name json_path = output_dir / "async_scheduling_benchmark.json" - if not skip_run or not json_path.exists(): + if skip_run and not json_path.exists(): + raise FileNotFoundError( + f"Cannot reuse benchmark artifact for {case.sweep}/{case.name}: {json_path} does not exist." 
+ ) + if not skip_run: output_dir.mkdir(parents=True, exist_ok=True) command = [ sys.executable, diff --git a/scripts/benchmarks/run_async_scheduling_idle_regression.py b/scripts/benchmarks/run_async_scheduling_idle_regression.py index 482f6a8e1..164459b55 100644 --- a/scripts/benchmarks/run_async_scheduling_idle_regression.py +++ b/scripts/benchmarks/run_async_scheduling_idle_regression.py @@ -322,13 +322,14 @@ def _suite_behavior_checks(cases: Mapping[str, Mapping[str, Any]], *, mode: str) ) ) if "stress-shape/wide-frontier-high-cap" in cases: + floor = _wide_high_capacity_stress_utilization_floor(mode) checks.append( _check( "wide high-capacity stress utilization", category="optimization", - passed=_metric(cases["stress-shape/wide-frontier-high-cap"], "llm_wait_utilization_ratio") >= 0.55, + passed=_metric(cases["stress-shape/wide-frontier-high-cap"], "llm_wait_utilization_ratio") >= floor, observed=_metric(cases["stress-shape/wide-frontier-high-cap"], "llm_wait_utilization_ratio"), - expected=">= 0.55 llm_wait utilization", + expected=f">= {floor:.2f} llm_wait utilization", ) ) if { @@ -574,6 +575,10 @@ def _wide_row_group_utilization_floor(mode: str) -> float: return 0.55 if mode == "quick" else 0.70 +def _wide_high_capacity_stress_utilization_floor(mode: str) -> float: + return 0.53 if mode == "quick" else 0.55 + + def _checks_payload(checks: Sequence[RegressionCheck]) -> dict[str, Any]: return { "checks_schema_version": CHECKS_SCHEMA_VERSION, diff --git a/tests_e2e/tests/test_mcp_demo.py b/tests_e2e/tests/test_mcp_demo.py index 163e904cb..d7bca2e9e 100644 --- a/tests_e2e/tests/test_mcp_demo.py +++ b/tests_e2e/tests/test_mcp_demo.py @@ -101,25 +101,25 @@ def test_mcp_server_tool_usage_with_nvidia_text(tmp_path: Path) -> None: assert tool_call_messages tool_calls: list[dict[str, object]] = [] - tool_call_indices: dict[str, int] = {} + tool_call_positions: dict[str, tuple[int, int]] = {} for msg_index, msg in enumerate(trace): if not isinstance(msg, dict): 
continue if msg.get("role") != "assistant": continue - for tool_call in msg.get("tool_calls") or []: + for tool_call_index, tool_call in enumerate(msg.get("tool_calls") or []): if not isinstance(tool_call, dict): continue tool_calls.append(tool_call) function = tool_call.get("function") or {} if isinstance(function, dict): name = function.get("name") - if isinstance(name, str) and name not in tool_call_indices: - tool_call_indices[name] = msg_index + if isinstance(name, str) and name not in tool_call_positions: + tool_call_positions[name] = (msg_index, tool_call_index) - assert tool_call_indices.get("get_fact") is not None - assert tool_call_indices.get("add_numbers") is not None - assert tool_call_indices["get_fact"] < tool_call_indices["add_numbers"] + assert tool_call_positions.get("get_fact") is not None + assert tool_call_positions.get("add_numbers") is not None + assert tool_call_positions["get_fact"] < tool_call_positions["add_numbers"] def _tool_call_to_name_args(tool_call: dict[str, object]) -> tuple[str | None, dict[str, object]]: function = tool_call.get("function")