From 4387364225c2f582487f866fecc66b8919cc7bbb Mon Sep 17 00:00:00 2001
From: Chris Lundquist
Date: Wed, 11 Mar 2026 23:24:55 -0700
Subject: [PATCH] refactor: remove GPU coordinator from parallel scheduler,
 fix streaming backpressure

The parallel GPU coordinator serialized entropy encoding on one thread,
bottlenecking at 28 MiB/s. The streaming path's GPU coordinator with
adaptive backpressure is the correct architecture for GPU acceleration.

Changes:
- Remove GPU coordinator from parallel.rs (~900 lines): GpuRequest enum,
  StageGpu/FusedGpu task variants, gpu_fused_span, pressure_inc/dec,
  should_route_block_to_gpu_stage0, complete_gpu_stage, and all GPU
  channel/routing logic. Parallel scheduler is now CPU-only.
- Fix one-way backpressure ratchet in streaming.rs: coordinator now
  decrements pressure by batch_len on completion, preventing permanent
  GPU lockout (routing went from ~16% to ~62% on mozilla corpus).
- Honor Backend::WebGpu in compress_with_options: multi-block GPU
  requests route through compress_stream with in-memory Cursor I/O,
  producing framed-format output (transparently decompressible).
- Flatten UnifiedTask from single-variant enum to type alias.
- Simplify complete_task_lifecycle (remove dead next_task parameter).
- Remove dead GPU telemetry methods and atomic fields from
  LocalSchedulerStats; retain public UnifiedSchedulerStats fields for
  API stability.
- Add criterion benchmark comparing parallel vs streaming GPU paths.
- Update CLAUDE.md, DESIGN.md, gpu-strategy.md, pipeline-architecture.md.

Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 6 +- Cargo.toml | 4 + benches/gpu_parallel_vs_streaming.rs | 142 ++++ docs/DESIGN.md | 2 +- docs/design-docs/gpu-strategy.md | 34 +- docs/design-docs/pipeline-architecture.md | 33 +- src/pipeline/mod.rs | 32 +- src/pipeline/parallel.rs | 942 ++-------------------- src/pipeline/parallel_tests.rs | 63 +- src/pipeline/telemetry.rs | 35 +- src/streaming.rs | 19 +- 11 files changed, 285 insertions(+), 1027 deletions(-) create mode 100644 benches/gpu_parallel_vs_streaming.rs diff --git a/CLAUDE.md b/CLAUDE.md index d79ca2b..de7dfbd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,7 +84,7 @@ pz compresses 2.5–7x faster than gzip with ~2pp ratio gap. Decompress is faste - `src/lib.rs` — crate root, `PzError`/`PzResult` types - `src/lz_token.rs` — universal `LzToken` type, `TokenEncoder` trait, three encoder implementations -- `src/{algorithm}.rs` — one file per composable algorithm (bwt, crc32, fse, huffman, lz77, lzseq, lzss, mtf, rans, rle) +- `src/{algorithm}.rs` — one file per composable algorithm (bwt, crc32, fse, huffman, lz77, lzseq, lzss, lz_token, mtf, rans, rle, sortlz, recoil) - `src/analysis.rs` — data profiling (entropy, match density, run ratio, autocorrelation) - `src/optimal.rs` — optimal parsing (GPU top-K + backward DP) - `src/simd.rs` — SIMD decode paths for rANS @@ -102,8 +102,8 @@ pz compresses 2.5–7x faster than gzip with ~2pp ratio gap. Decompress is faste Before optimizing GPU code paths, read this first — multiple agents have spent full sessions rediscovering these: - **GPU entropy (rANS/FSE) is slower than CPU** — 0.77x on encode, 0.54x on decode. This has been proven across 500+ optimization iterations. The serial state dependency in rANS limits GPU to ~300 threads; saturation needs ~8K-16K. Do not attempt to batch, parallelize, or "pipeline" GPU entropy encoding. -- **`gpu_fused_span()` returning `Some((0,1))` is counterproductive** — it routes entropy to GPU (slower). 
It exists as architectural prep for if GPU entropy ever becomes competitive. The `GPU_ENTROPY_THRESHOLD` (256KB > default GPU block size 128KB) intentionally prevents this path from activating. -- **The CLI uses `streaming::compress_stream`, not `pipeline::compress_with_options`** — the parallel scheduler's GPU coordinator is not invoked by the CLI. The streaming path deliberately uses CPU rANS for entropy. +- **The parallel scheduler (`compress_with_options`) is CPU-only** — the GPU coordinator was removed because it serialized entropy encoding on one thread, bottlenecking at 28 MiB/s. GPU-accelerated compression uses the streaming path (`compress_stream`) which has a dedicated GPU coordinator with adaptive backpressure. Do not re-add a GPU coordinator to the parallel path. +- **The CLI uses `streaming::compress_stream`, not `pipeline::compress_with_options`** — the streaming path handles GPU match-finding via a coordinator thread with adaptive backpressure that decrements on batch completion. Workers use CPU for entropy. - **The real GPU win (ring-buffered LZ77 batching) is already shipped** — delivers +7-17% throughput. See `docs/design-docs/gpu-strategy.md`. - **GPU device init time skews throughput benchmarks** — first-call GPU init adds significant overhead that `bench.sh` captures but Criterion amortizes across iterations. When comparing GPU vs CPU throughput, use Criterion (`cargo bench`) for apples-to-apples; `bench.sh` reflects real-world cold-start cost. Don't chase "GPU is slower" regressions that are really just init time. - **Compression ratio is limited by wire encoding overhead, not match quality** — the LZ match-finder finds good matches. The legacy Lz77Encoder (5-byte per match) was the worst offender; LzSeqEncoder (log2-coded, 6 streams) is much better but still ~2pp behind gzip on Silesia (34.4% vs 32.2%). Further ratio gains require encoding format work, not matcher tuning. 
diff --git a/Cargo.toml b/Cargo.toml index d0ae0e6..78ecbcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,10 @@ harness = false name = "stages_match_finders" harness = false +[[bench]] +name = "gpu_parallel_vs_streaming" +harness = false + [profile.profiling] inherits = "release" debug = true diff --git a/benches/gpu_parallel_vs_streaming.rs b/benches/gpu_parallel_vs_streaming.rs new file mode 100644 index 0000000..dd6ad90 --- /dev/null +++ b/benches/gpu_parallel_vs_streaming.rs @@ -0,0 +1,142 @@ +/// Criterion benchmark: GPU parallel (in-memory) vs GPU streaming for Lzf. +/// +/// Compares four paths at 4 MB and 16 MB: +/// 1. parallel CPU-only (compress_with_options, threads=0) +/// 2. parallel GPU (compress_with_options, Backend::WebGpu, threads=0) +/// 3. streaming CPU-only (compress_stream, threads=0) +/// 4. streaming GPU (compress_stream, Backend::WebGpu, threads=0) +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use pz::pipeline::{self, CompressOptions, Pipeline}; +use pz::streaming; +use std::io::Cursor; +use std::path::Path; +use std::time::Duration; + +fn get_test_data_sized(size: usize) -> Vec { + let manifest = Path::new(env!("CARGO_MANIFEST_DIR")); + let base = ["alice29.txt", "cantrbry.tar"] + .iter() + .filter_map(|name| { + let p = manifest.join("samples").join(name); + std::fs::read(&p).ok().filter(|d| !d.is_empty()) + }) + .next() + .unwrap_or_else(|| b"The quick brown fox jumps over the lazy dog. 
".repeat(3000)); + + let mut data = Vec::with_capacity(size); + while data.len() < size { + let remaining = size - data.len(); + data.extend_from_slice(&base[..remaining.min(base.len())]); + } + data +} + +fn bench_parallel_vs_streaming(c: &mut Criterion) { + let pipeline = Pipeline::Lzf; + + #[cfg(feature = "webgpu")] + let engine = { + use pz::webgpu::WebGpuEngine; + match WebGpuEngine::new() { + Ok(e) => Some(std::sync::Arc::new(e)), + Err(e) => { + eprintln!("gpu_parallel_vs_streaming: no GPU device ({e}), GPU benches skipped"); + None + } + } + }; + + for &size in &[4_194_304usize, 16_777_216] { + let data = get_test_data_sized(size); + let label = format!("{}MB", size / (1024 * 1024)); + + let mut group = c.benchmark_group(format!("gpu_par_vs_stream_{label}")); + group.warm_up_time(Duration::from_secs(3)); + group.measurement_time(Duration::from_secs(8)); + group.sample_size(10); + group.throughput(Throughput::Bytes(size as u64)); + + // --- parallel CPU --- + { + let opts = CompressOptions { + threads: 0, + ..Default::default() + }; + group.bench_with_input( + BenchmarkId::new("parallel_cpu", &label), + &data, + |b, data| { + b.iter(|| pipeline::compress_with_options(data, pipeline, &opts).unwrap()); + }, + ); + } + + // --- streaming CPU --- + { + let opts = CompressOptions { + threads: 0, + ..Default::default() + }; + group.bench_with_input( + BenchmarkId::new("streaming_cpu", &label), + &data, + |b, data| { + b.iter(|| { + let input = Cursor::new(data); + let mut output = Vec::with_capacity(data.len()); + streaming::compress_stream(input, &mut output, pipeline, &opts).unwrap(); + output + }); + }, + ); + } + + // --- parallel GPU --- + #[cfg(feature = "webgpu")] + if let Some(ref engine) = engine { + use pz::pipeline::Backend; + let opts = CompressOptions { + backend: Backend::WebGpu, + threads: 0, + webgpu_engine: Some(engine.clone()), + ..Default::default() + }; + group.bench_with_input( + BenchmarkId::new("parallel_gpu", &label), + &data, + |b, data| { 
+ b.iter(|| pipeline::compress_with_options(data, pipeline, &opts).unwrap()); + }, + ); + } + + // --- streaming GPU --- + #[cfg(feature = "webgpu")] + if let Some(ref engine) = engine { + use pz::pipeline::Backend; + let opts = CompressOptions { + backend: Backend::WebGpu, + threads: 0, + webgpu_engine: Some(engine.clone()), + ..Default::default() + }; + group.bench_with_input( + BenchmarkId::new("streaming_gpu", &label), + &data, + |b, data| { + b.iter(|| { + let input = Cursor::new(data); + let mut output = Vec::with_capacity(data.len()); + streaming::compress_stream(input, &mut output, pipeline, &opts).unwrap(); + output + }); + }, + ); + } + + group.finish(); + } +} + +criterion_group!(benches, bench_parallel_vs_streaming); +criterion_main!(benches); diff --git a/docs/DESIGN.md b/docs/DESIGN.md index a4fb08c..1fa0880 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -180,7 +180,7 @@ To understand how a pipeline works, trace a single block from `compress_block()` - Account for staging buffers and padding/alignment - Use `scripts/gpu-meminfo.sh` to analyze actual allocations -**GPU scheduling:** All GPU work is routed through a single unified scheduler (`compress_parallel_unified` in `pipeline/parallel.rs`). A dedicated GPU coordinator thread batch-collects requests from CPU workers. The `UnifiedTask` enum has three variants: `Stage` (CPU), `StageGpu` (single GPU stage), and `FusedGpu` (multi-stage GPU execution). Workers use `try_send()` on a bounded channel with CPU fallback to avoid deadlock. +**GPU scheduling:** GPU-accelerated compression uses the **streaming path** (`compress_stream` in `streaming.rs`), which spawns a dedicated GPU coordinator thread that batch-collects LZ77 match-finding requests from CPU workers via a bounded channel with adaptive backpressure. The **parallel scheduler** (`compress_parallel` in `pipeline/parallel.rs`) is CPU-only by design — it achieves higher throughput by avoiding GPU dispatch overhead. 
When `compress_with_options` is called with `Backend::WebGpu`, it routes through `compress_stream` internally. ### Feature Flags and Build Configurations diff --git a/docs/design-docs/gpu-strategy.md b/docs/design-docs/gpu-strategy.md index 7678d24..127f526 100644 --- a/docs/design-docs/gpu-strategy.md +++ b/docs/design-docs/gpu-strategy.md @@ -72,29 +72,37 @@ The GPU rANS performance gate (Slice 4 in PLAN-p0a) remains FAIL. Recommendation GPU Huffman encode (`huffman_encode.wgsl`) and FSE encode/decode (`fse_encode.wgsl`, `fse_decode.wgsl`) exist in production. These have the same serial-per-stream limitation as rANS but are used for specific pipelines (Deflate uses Huffman, Lzf uses FSE). Their per-stream performance has not been systematically compared to CPU in the same way as rANS. -## Current architecture: unified scheduler +## Current architecture: dual-path GPU support -All GPU work routes through `compress_parallel_unified()` in `src/pipeline/parallel.rs` (PR #101). The architecture: +GPU-accelerated compression uses the **streaming path** (`compress_stream` in `streaming.rs`), not the parallel scheduler. The parallel scheduler (`compress_parallel` in `parallel.rs`) is CPU-only by design — its GPU coordinator was removed after proving it bottlenecked at 28 MiB/s due to serializing entropy encoding on a single thread. + +When `compress_with_options` is called with `Backend::WebGpu`, it routes through `compress_stream` internally (producing framed-format output, which `decompress` handles natively). 
+ +### Streaming GPU coordinator + +For LZ-demux pipelines (Lzf, LzSeqR, etc.), `compress_stream_parallel` spawns a GPU coordinator thread: ``` -CPU workers: pick up tasks from shared VecDeque - ├─ Stage(stage_idx, block_idx) → run on CPU - ├─ StageGpu(stage_idx, block_idx) → send to GPU coordinator via bounded channel - └─ FusedGpu(start, end, block_idx) → send to GPU coordinator +Workers: read blocks → if GPU available and pressure < limit, + try_send to GPU coordinator via bounded channel + → else, compress on CPU GPU coordinator thread: - ├─ Batch-collects Stage 0 requests → find_matches_batched() (ring-buffered) - ├─ Processes Stage N requests → run_compress_stage() individually - └─ Processes Fused requests → runs stages start..=end sequentially + 1. Block on gpu_rx.recv() for first request + 2. Drain additional requests via gpu_rx.try_recv() + 3. Batch blocks → engine.find_matches_batched() + 4. Demux matches → compress_block_from_demux() (CPU entropy) + 5. Send results to output_tx for ordered writing + 6. Decrement backpressure by batch_len (enables worker → GPU flow) ``` -**Deadlock prevention**: Workers use `try_send()` on the bounded channel. If the channel is full, the block falls back to CPU processing. +**Adaptive backpressure**: An `AtomicUsize` pressure score gates worker → GPU routing. Workers increment on `try_send` Full (+2), decrement on Ok (-1). The coordinator decrements by `batch_len` after completing each batch, preventing the one-way ratchet that would permanently lock out GPU. -**GPU failure recovery**: If any GPU operation fails, the block is re-enqueued as `Stage(0, block_idx)` for CPU retry. +**GPU failure recovery**: If `find_matches_batched` fails, all blocks in the batch fall back to CPU via `compress_block`. -### The FusedGpu path problem +### GPU entropy is not used -`gpu_fused_span()` currently returns `Some((0, 1))` for Lzr and LzSeqR, routing both stages to GPU. 
But since GPU entropy is 0.77x CPU encode, this actually makes those pipelines *slower* than the decomposed path (GPU LZ77 + CPU entropy). The fused path is architectural preparation for the case where GPU entropy becomes competitive — it is not a performance win today. +GPU entropy (rANS/FSE) is 0.54-0.77x CPU throughput due to serial state dependencies. All entropy encoding runs on CPU — the GPU is only used for LZ77 match-finding. ## What would need to change for GPU to win diff --git a/docs/design-docs/pipeline-architecture.md b/docs/design-docs/pipeline-architecture.md index ecd47f4..260d5ad 100644 --- a/docs/design-docs/pipeline-architecture.md +++ b/docs/design-docs/pipeline-architecture.md @@ -189,35 +189,30 @@ Synchronization: Join all threads before writing container **When used:** Default for >256KB inputs with multiple CPU cores -### 2. Unified GPU Coordinator +### 2. GPU Coordinator (Streaming Path) -**File:** `src/pipeline/parallel.rs` - `compress_parallel_unified()` +**File:** `src/streaming.rs` - `compress_stream_parallel()` -A single GPU coordinator thread handles all GPU work. CPU workers send GPU requests via a bounded `SyncSender`; the coordinator batch-collects and dispatches: +The parallel scheduler (`compress_parallel` in `parallel.rs`) is **CPU-only**. GPU-accelerated compression uses the streaming path, which spawns a dedicated GPU coordinator thread for LZ-demux pipelines: ``` -CPU workers: split input → enqueue StageGpu(0, block_idx) - or FusedGpu(0, 1, block_idx) +Workers: read blocks → try_send to GPU coordinator (bounded channel) + → CPU fallback on Full or when pressure >= limit GPU coordinator: - 1. Block on rx.recv() for first request - 2. Drain up to ring_depth more Stage0 requests via try_recv() - 3. Batch Stage0 blocks → find_matches_batched() (ring-buffered overlap) - 4. Demux results → push Stage(1, block_idx) to unified queue - 5. Process StageN requests via run_compress_stage() - 6. 
Process Fused requests: run stages start..=end sequentially on GPU + 1. Block on gpu_rx.recv() for first request + 2. Drain additional requests via gpu_rx.try_recv() + 3. Batch blocks → engine.find_matches_batched() + 4. Demux matches → compress_block_from_demux() (CPU entropy) + 5. Send compressed results to output_tx for ordered writing + 6. Decrement backpressure by batch_len ``` -**Task variants:** `UnifiedTask` enum routes work: -- `Stage(stage_idx, block_idx)` — CPU worker picks up -- `StageGpu(stage_idx, block_idx)` — GPU coordinator handles -- `FusedGpu(start, end, block_idx)` — GPU runs multiple stages without queue round-trips +**Adaptive backpressure:** AtomicUsize pressure score ramps up on Full (+2), down on Ok (-1) per worker, and coordinator decrements by batch_len on completion. Prevents one-way ratchet lockout. -**Deadlock prevention:** Workers use `try_send()` with CPU fallback when the bounded GPU channel is full +**GPU-to-CPU fallback:** If `find_matches_batched` fails, all batch blocks fall back to CPU via `compress_block`. -**GPU-to-CPU fallback:** If GPU operations fail, blocks are re-enqueued as `Stage(0, block_idx)` for CPU retry - -**When used:** GPU available AND input ≥ 256KB +**When used:** `compress_stream` with `Backend::WebGpu` + LZ-demux pipeline. Also used internally by `compress_with_options` when `Backend::WebGpu` is requested (routes through `compress_stream` with in-memory I/O). ## Auto-Selection Strategy diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 510eb07..0a88541 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -417,11 +417,16 @@ pub fn compress(input: &[u8], pipeline: Pipeline) -> PzResult> { /// When `threads` is 1 or the input fits in a single block, a single block /// is compressed without thread overhead. /// -/// The output always uses the multi-block container format (V2) with a -/// block table, even for single-block streams. 
+/// Output format depends on the path taken: +/// - **CPU parallel**: table-mode V2 container (block table + block data) +/// - **GPU parallel**: framed V2 container (self-framing blocks, routed +/// through the streaming path's GPU coordinator) +/// +/// Both formats are readable by [`decompress`] and [`decompress_with_options`]. /// /// When `options.backend` is `Backend::WebGpu` and an engine is provided, -/// GPU-amenable stages (e.g., LZ77 match finding) run on the GPU. +/// GPU-amenable stages (e.g., LZ77 match finding) run on the GPU via the +/// streaming path's coordinator thread with adaptive backpressure. /// Other stages and decompression always use the CPU. pub fn compress_with_options( input: &[u8], @@ -483,7 +488,26 @@ pub fn compress_with_options( return Ok(output); } - // Multi-block parallel: unified scheduler dispatches all stages from a shared work queue. + // Multi-block parallel with GPU: route through the streaming path which + // has the GPU coordinator thread with adaptive backpressure. The parallel + // scheduler is CPU-only by design (see CLAUDE.md). Output uses framed + // format, which decompress()/decompress_with_options() handle natively. + #[cfg(feature = "webgpu")] + if matches!(options.backend, Backend::WebGpu) && options.webgpu_engine.is_some() { + let reader = std::io::Cursor::new(input); + let mut buf = Vec::new(); + crate::streaming::compress_stream(reader, &mut buf, pipeline, options).map_err( + |e| match e { + crate::streaming::StreamError::Pz(pe) => pe, + // Unreachable: in-memory Cursor I/O cannot produce IO errors. + crate::streaming::StreamError::Io(_) => PzError::InvalidInput, + }, + )?; + return Ok(buf); + } + + // Multi-block parallel (CPU-only): unified scheduler dispatches all stages + // from a shared work queue. 
compress_parallel(input, pipeline, options, num_threads) } diff --git a/src/pipeline/parallel.rs b/src/pipeline/parallel.rs index 5b14410..bbf1a0d 100644 --- a/src/pipeline/parallel.rs +++ b/src/pipeline/parallel.rs @@ -1,19 +1,17 @@ //! Multi-block parallel compression and decompression. //! //! **Unified scheduler**: a single shared work queue executes all pipeline stages from a -//! worker pool. Supports N-stage pipelines (2-stage LZ, 4-stage BWT) and GPU dispatch -//! via a dedicated GPU coordinator thread when a WebGPU device is available. +//! CPU worker pool. Workers process blocks through all stages using local continuation, +//! only round-tripping through the queue when stealing work from other blocks. //! -//! GPU and CPU encoders are interchangeable: the same queue handles both, with the -//! coordinator thread dispatching GPU work via `run_compress_stage` and feeding -//! results back into the queue for subsequent stages. +//! GPU-accelerated compression is handled by the **streaming** path +//! (`streaming::compress_stream`), which has a dedicated GPU coordinator thread with +//! adaptive backpressure. The in-memory parallel path is CPU-only by design — it +//! achieves higher throughput (6+ GiB/s) by avoiding GPU dispatch overhead. use crate::{PzError, PzResult}; use std::collections::VecDeque; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Condvar, Mutex, -}; +use std::sync::{atomic::Ordering, Arc, Condvar, Mutex}; use std::time::Instant; use super::stages::{run_compress_stage, StageBlock, StageMetadata}; @@ -24,9 +22,7 @@ use super::{write_header, CompressOptions, DecompressOptions, Pipeline, BLOCK_HE /// Multi-block parallel compression. /// -/// All pipelines flow through the unified scheduler. When a WebGPU device is -/// available, a dedicated GPU coordinator thread handles ring-buffered GPU -/// submissions while the CPU worker pool continues processing other blocks. 
+/// All pipelines flow through the unified scheduler with CPU-only workers. /// /// Format after the 8-byte container header: /// - num_blocks: u32 LE @@ -67,15 +63,8 @@ fn assemble_multiblock_container( output } -#[derive(Copy, Clone)] -#[cfg_attr(not(feature = "webgpu"), allow(dead_code))] -enum UnifiedTask { - Stage(usize, usize), // (stage_idx, block_idx) — CPU execution - StageGpu(usize, usize), // (stage_idx, block_idx) — route to GPU coordinator - /// Fused GPU execution: run stages start..=end on the GPU coordinator - /// without intermediate queue round-trips. (stage_start, stage_end, block_idx) - FusedGpu(usize, usize, usize), -} +/// A task in the unified work queue: `(stage_idx, block_idx)`. +type UnifiedTask = (usize, usize); /// Number of compression stages for a pipeline in the unified scheduler. fn unified_stage_count(pipeline: Pipeline) -> usize { @@ -94,179 +83,39 @@ struct UnifiedQueueState { failed: bool, } -fn should_route_block_to_gpu_entropy_with_backpressure( - block_len: usize, - stage1_backend: super::BackendAssignment, - has_gpu_entropy: bool, - auto_backpressure_score: usize, - auto_backpressure_limit: usize, -) -> bool { - #[cfg(feature = "webgpu")] - { - use super::{BackendAssignment, GPU_ENTROPY_THRESHOLD}; - match stage1_backend { - BackendAssignment::Gpu => has_gpu_entropy, - BackendAssignment::Cpu => false, - BackendAssignment::Auto => { - has_gpu_entropy - && block_len >= GPU_ENTROPY_THRESHOLD - && auto_backpressure_score < auto_backpressure_limit - } - } - } - #[cfg(not(feature = "webgpu"))] - { - let _ = block_len; - let _ = stage1_backend; - let _ = has_gpu_entropy; - let _ = auto_backpressure_score; - let _ = auto_backpressure_limit; - false - } -} - -#[cfg(feature = "webgpu")] -#[inline] -fn pressure_inc(score: &AtomicUsize, delta: usize) { - score.fetch_add(delta, Ordering::Relaxed); -} - -#[cfg(feature = "webgpu")] -#[inline] -fn pressure_dec(score: &AtomicUsize) { - let _ = score.fetch_update(Ordering::Relaxed, 
Ordering::Relaxed, |v| { - Some(v.saturating_sub(1)) - }); -} - -/// Determine whether to route a block's Stage 0 (LZ77/LzSeq match-finding) to GPU. -/// -/// Returns true for LZ77-based and LzSeq-based pipelines when the GPU backend -/// is active, the block meets size constraints, and the stage0_backend allows it. -#[cfg(feature = "webgpu")] -fn should_route_block_to_gpu_stage0( - block: &[u8], - pipeline: Pipeline, - options: &CompressOptions, -) -> bool { - use super::BackendAssignment; - - // Only LZ-based pipelines have GPU Stage 0 support - let is_gpu_stage0_pipeline = matches!( - pipeline, - Pipeline::Lzf | Pipeline::Lzfi | Pipeline::LzssR | Pipeline::LzSeqR | Pipeline::LzSeqH - ); - - if !is_gpu_stage0_pipeline { - return false; - } - - // Optimal parse uses GPU top-K which runs through run_compress_stage already - if options.parse_strategy == super::ParseStrategy::Optimal { - return false; - } - - match options.stage0_backend { - BackendAssignment::Gpu => options.webgpu_engine.is_some(), - BackendAssignment::Cpu => false, - BackendAssignment::Auto => { - if let Some(ref engine) = options.webgpu_engine { - block.len() >= crate::webgpu::MIN_GPU_INPUT_SIZE - && block.len() <= engine.max_dispatch_input_size() - } else { - false - } - } - } -} - -/// Determine whether a pipeline can fuse all its stages on GPU. -/// -/// Returns `Some((start, end))` when every stage in `start..=end` has a GPU -/// implementation, allowing the GPU coordinator to run them sequentially -/// without intermediate queue round-trips or CPU readback. -/// -/// **WARNING — GPU entropy is currently SLOWER than CPU** (0.77x encode, 0.54x decode). -/// This function exists as architectural preparation for when GPU entropy becomes -/// competitive. In practice, `GPU_ENTROPY_THRESHOLD` (256KB) is deliberately set above -/// `DEFAULT_GPU_BLOCK_SIZE` (128KB) so the fused path is never activated for GPU blocks. 
-/// Lowering the threshold or bypassing it will REGRESS throughput by ~10-15x because -/// it serializes all work onto the single coordinator thread AND uses the slower GPU -/// entropy path. See `docs/design-docs/gpu-strategy.md` and -/// `.claude/feedback/2026-03-01-gpu-wins-on-lz77-loses-on-entropy.md`. -#[cfg(feature = "webgpu")] -fn gpu_fused_span(pipeline: Pipeline) -> Option<(usize, usize)> { - match pipeline { - // Both stages have GPU paths: LzSeq fused match+demux + rANS encode - // NOTE: GPU rANS is slower than CPU — fused path is gated by GPU_ENTROPY_THRESHOLD - Pipeline::LzSeqR => Some((0, 1)), - _ => None, - } -} - -/// Message sent from unified workers to the GPU coordinator thread. -#[cfg(feature = "webgpu")] -enum GpuRequest { - /// Stage 0: LZ77/LzSeq match-finding on GPU. - /// Carries only `block_idx`: the coordinator reads immutable input slices - /// directly from the parent-scoped `blocks` array to avoid payload copies. - Stage0(usize), - /// Stage 1+: entropy encoding on GPU. - /// (stage_idx, block_idx, stage_block) - StageN(usize, usize, StageBlock), - /// Fused: run stages start..=end on GPU without queue round-trips. - /// (stage_start, stage_end, block_idx) - Fused(usize, usize, usize), -} - -/// Apply unified queue completion semantics for one finished task. +/// Retire one task from the unified queue after a block's final stage completes. /// /// Returns `(mark_invalid_after_lock, should_return_worker)`. 
fn complete_task_lifecycle( guard: &mut UnifiedQueueState, queue_cv: &Condvar, - next_task: Option, stage_failed: bool, - mark_invalid_on_failed_final: bool, ) -> (bool, bool) { let mut mark_invalid_after_lock = false; - let mut should_return = false; - if stage_failed { - if !guard.failed { - guard.failed = true; - let dropped = guard.queue.len(); - guard.queue.clear(); - guard.pending_tasks = guard.pending_tasks.saturating_sub(dropped); - queue_cv.notify_all(); - } - debug_assert!(guard.pending_tasks > 0); - guard.pending_tasks -= 1; - } else if let Some(task) = next_task { - if !guard.failed { - // Current task transitions into next stage; pending is unchanged. - guard.queue.push_back(task); - queue_cv.notify_one(); - } else { - // Scheduler already failed; retire this task. - debug_assert!(guard.pending_tasks > 0); - guard.pending_tasks -= 1; - mark_invalid_after_lock = true; - } - } else { - if mark_invalid_on_failed_final && guard.failed { - mark_invalid_after_lock = true; - } - // Final-stage success retires one pending task. - debug_assert!(guard.pending_tasks > 0); - guard.pending_tasks -= 1; + if stage_failed && !guard.failed { + guard.failed = true; + let dropped = guard.queue.len(); + guard.queue.clear(); + guard.pending_tasks = guard.pending_tasks.saturating_sub(dropped); + queue_cv.notify_all(); + } + + if guard.failed && !stage_failed { + // Block completed successfully but scheduler already failed elsewhere. 
+ mark_invalid_after_lock = true; } - if guard.pending_tasks == 0 { + debug_assert!(guard.pending_tasks > 0); + guard.pending_tasks -= 1; + + let should_return = if guard.pending_tasks == 0 { guard.closed = true; queue_cv.notify_all(); - should_return = true; - } + true + } else { + false + }; (mark_invalid_after_lock, should_return) } @@ -294,6 +143,14 @@ fn compress_parallel_unified( resolved_options.max_match_len = Some(super::resolve_max_match_len(pipeline, options)); } + // Safety net: compress_with_options routes GPU requests through + // compress_stream before reaching here, but force CPU as defense-in-depth. + #[cfg(feature = "webgpu")] + { + resolved_options.backend = super::Backend::Cpu; + resolved_options.webgpu_engine = None; + } + // Per-block intermediate slot: holds the StageBlock between stages. // A block is only in one stage at a time, so one slot per block suffices. let intermediate_slots: Vec>> = @@ -301,60 +158,8 @@ fn compress_parallel_unified( let results: Vec>>>> = (0..num_blocks).map(|_| Mutex::new(None)).collect(); - // Determine if any GPU routing is possible for initial task enqueue. - #[cfg(feature = "webgpu")] - let has_gpu = - matches!(options.backend, super::Backend::WebGpu) && options.webgpu_engine.is_some(); - #[cfg(not(feature = "webgpu"))] - let has_gpu = false; - #[cfg(feature = "webgpu")] - let gpu_auto_backpressure = if has_gpu { - Some(Arc::new(AtomicUsize::new(0))) - } else { - None - }; - #[cfg(not(feature = "webgpu"))] - let gpu_auto_backpressure: Option> = None; - #[cfg(feature = "webgpu")] - let gpu_auto_backpressure_limit = worker_count.saturating_mul(2).max(4); - #[cfg(not(feature = "webgpu"))] - let gpu_auto_backpressure_limit = 0usize; - - // Determine if this pipeline can fuse all GPU stages. 
- #[cfg(feature = "webgpu")] - let fused_span = if has_gpu { - gpu_fused_span(pipeline) - } else { - None - }; - #[cfg(not(feature = "webgpu"))] - let fused_span: Option<(usize, usize)> = None; - - // Build initial task queue with GPU routing for Stage 0 where applicable. - let initial_tasks: VecDeque = (0..num_blocks) - .map(|i| { - #[cfg(feature = "webgpu")] - if has_gpu && should_route_block_to_gpu_stage0(blocks[i], pipeline, &resolved_options) { - // If the pipeline supports fusion and entropy also qualifies for GPU, - // fuse all stages into a single GPU coordinator dispatch. - if let Some((start, end)) = fused_span { - if should_route_block_to_gpu_entropy_with_backpressure( - blocks[i].len(), - resolved_options.stage1_backend, - resolved_options.webgpu_engine.is_some(), - 0, - gpu_auto_backpressure_limit, - ) { - return UnifiedTask::FusedGpu(start, end, i); - } - } - return UnifiedTask::StageGpu(0, i); - } - let _ = has_gpu; - let _ = fused_span; - UnifiedTask::Stage(0, i) - }) - .collect(); + // All blocks start at Stage 0 on CPU. + let initial_tasks: VecDeque = (0..num_blocks).map(|i| (0, i)).collect(); let queue = Mutex::new(UnifiedQueueState { queue: initial_tasks, @@ -364,367 +169,8 @@ fn compress_parallel_unified( }); let queue_cv = Condvar::new(); - // GPU channel: created before the scope so workers can borrow the sender. - // The GPU coordinator receives from `gpu_rx`, workers send via `gpu_tx`. - #[cfg(feature = "webgpu")] - let (gpu_tx, gpu_rx) = if has_gpu { - // Use a modestly deeper channel than the previous fixed depth=4 to - // reduce transient try_send(Full) fallbacks under bursty mixed-stage - // loads without unbounded buffering. 
- let ring_depth = num_blocks - .min(worker_count.saturating_mul(2).max(1)) - .clamp(1, 16); - let (tx, rx) = std::sync::mpsc::sync_channel::(ring_depth); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - #[cfg(not(feature = "webgpu"))] - let gpu_tx: Option<()> = None; - std::thread::scope(|scope| { - // GPU coordinator thread: spawned only when GPU is available. - // Receives GPU work via a bounded channel, manages ring buffers, - // and feeds completed results back into the unified queue. - #[cfg(feature = "webgpu")] - if let Some(rx) = gpu_rx { - let queue_ref = &queue; - let cv_ref = &queue_cv; - let slots_ref = &intermediate_slots; - let results_ref = &results; - let opts = resolved_options.clone(); - let stats_ref = stats_local.clone(); - let gpu_pressure_ref = gpu_auto_backpressure.clone(); - let gpu_pressure_limit = gpu_auto_backpressure_limit; - - scope.spawn(move || { - let engine = opts.webgpu_engine.as_ref().unwrap(); - let uses_lz77_demux = false; // No current pipeline uses the Lz77 demuxer - let uses_sortlz_match_finder = - opts.match_finder == super::MatchFinder::SortLz && uses_lz77_demux; - - while let Ok(first) = rx.recv() { - // Batch-collect: drain additional pending requests. - let mut stage0_batch: Vec = Vec::new(); - let mut stage_n_queue: Vec<(usize, usize, StageBlock)> = Vec::new(); - let mut fused_queue: Vec<(usize, usize, usize)> = Vec::new(); - - // Classify the first request. - match first { - GpuRequest::Stage0(b) => stage0_batch.push(b), - GpuRequest::StageN(s, b, sb) => stage_n_queue.push((s, b, sb)), - GpuRequest::Fused(s, e, b) => fused_queue.push((s, e, b)), - } - - // Non-blocking drain of additional requests. 
- while let Ok(req) = rx.try_recv() { - match req { - GpuRequest::Stage0(b) => stage0_batch.push(b), - GpuRequest::StageN(s, b, sb) => stage_n_queue.push((s, b, sb)), - GpuRequest::Fused(s, e, b) => fused_queue.push((s, e, b)), - } - } - - // Process Stage N requests first (fairness): these are - // downstream continuations and completing them reduces - // in-flight work / pending pressure. - for (stage_idx, block_idx, sb) in stage_n_queue { - let t0 = Instant::now(); - let result = run_compress_stage(pipeline, stage_idx, sb, &opts); - if let Some(stats) = stats_ref.as_ref() { - stats.add_stage_compute(t0.elapsed()); - } - if result.is_err() { - // GPU entropy failed — restart from stage 0 on CPU. - // The intermediate StageBlock is consumed so we cannot - // retry just this stage; re-enqueue from scratch. - eprintln!( - "[pz-gpu] stage {stage_idx} failed for block {block_idx}; \ - retrying from stage 0 on CPU" - ); - let lock_start = Instant::now(); - let mut guard = queue_ref.lock().expect("unified queue poisoned"); - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_wait(lock_start.elapsed()); - } - let admin_start = Instant::now(); - if !guard.failed { - guard.queue.push_back(UnifiedTask::Stage(0, block_idx)); - cv_ref.notify_one(); - } - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_admin(admin_start.elapsed()); - } - drop(guard); - } else { - complete_gpu_stage( - result, - stage_idx, - block_idx, - last_stage, - blocks, - &opts, - slots_ref, - results_ref, - queue_ref, - cv_ref, - stats_ref.as_deref(), - gpu_pressure_ref.as_deref(), - gpu_pressure_limit, - ); - } - } - - // Process fused requests next: run stages start..=end sequentially - // on GPU without intermediate queue round-trips. 
- for (stage_start, stage_end, block_idx) in fused_queue { - let block = StageBlock { - block_index: block_idx, - original_len: blocks[block_idx].len(), - data: blocks[block_idx].to_vec(), - streams: None, - metadata: StageMetadata::default(), - }; - let mut result: PzResult = Ok(block); - let mut final_stage = stage_start; - for stage in stage_start..=stage_end { - match result { - Ok(sb) => { - let t0 = Instant::now(); - result = run_compress_stage(pipeline, stage, sb, &opts); - if let Some(stats) = stats_ref.as_ref() { - stats.add_stage_compute(t0.elapsed()); - } - final_stage = stage; - } - Err(_) => break, - } - } - if result.is_err() { - // GPU fused path failed — fall back to per-stage CPU. - // Re-enqueue from stage 0 since intermediate data is consumed. - eprintln!( - "[pz-gpu] fused stages {stage_start}..={stage_end} failed \ - for block {block_idx}; retrying on CPU" - ); - let lock_start = Instant::now(); - let mut guard = queue_ref.lock().expect("unified queue poisoned"); - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_wait(lock_start.elapsed()); - } - let admin_start = Instant::now(); - if !guard.failed { - guard.queue.push_back(UnifiedTask::Stage(0, block_idx)); - cv_ref.notify_one(); - } - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_admin(admin_start.elapsed()); - } - drop(guard); - } else { - complete_gpu_stage( - result, - final_stage, - block_idx, - last_stage, - blocks, - &opts, - slots_ref, - results_ref, - queue_ref, - cv_ref, - stats_ref.as_deref(), - gpu_pressure_ref.as_deref(), - gpu_pressure_limit, - ); - } - } - - // Process Stage 0 batch last to avoid starving queued StageN/Fused - // continuations when bursts arrive together. 
- if !stage0_batch.is_empty() && uses_sortlz_match_finder { - // SortLZ GPU match finding: per-block dispatch → parse_matches → demux_tokens - let sortlz_config = crate::sortlz::SortLzConfig::for_lz77( - opts.max_match_len.unwrap_or(crate::lz77::LZ77_MAX_MATCH), - ); - for block_idx in stage0_batch { - let t0 = Instant::now(); - let result = engine - .sortlz_find_matches(blocks[block_idx], &sortlz_config) - .and_then(|raw_matches| { - let tokens = crate::sortlz::parse_matches( - blocks[block_idx], - &raw_matches, - true, // lazy parsing - ); - let demux = super::demux::demux_tokens( - blocks[block_idx], - &tokens, - pipeline, - )?; - Ok(StageBlock { - block_index: block_idx, - original_len: blocks[block_idx].len(), - data: Vec::new(), - streams: Some(demux.streams), - metadata: StageMetadata { - pre_entropy_len: Some(demux.pre_entropy_len), - demux_meta: demux.meta, - ..StageMetadata::default() - }, - }) - }); - if let Some(stats) = stats_ref.as_ref() { - stats.add_stage_compute(t0.elapsed()); - } - complete_gpu_stage( - result, - 0, - block_idx, - last_stage, - blocks, - &opts, - slots_ref, - results_ref, - queue_ref, - cv_ref, - stats_ref.as_deref(), - gpu_pressure_ref.as_deref(), - gpu_pressure_limit, - ); - } - } else if !stage0_batch.is_empty() && uses_lz77_demux { - let batch_blocks: Vec<&[u8]> = - stage0_batch.iter().map(|&b| blocks[b]).collect(); - let t0 = Instant::now(); - let batch_results = engine.find_matches_batched(&batch_blocks); - if let Some(stats) = stats_ref.as_ref() { - stats.add_stage_compute(t0.elapsed()); - } - match batch_results { - Ok(all_matches) => { - for (matches, block_idx) in - all_matches.into_iter().zip(stage0_batch.iter().copied()) - { - let result = super::demux::demux_lz77_matches( - blocks[block_idx], - matches, - pipeline, - ) - .map(|demux| StageBlock { - block_index: block_idx, - original_len: blocks[block_idx].len(), - data: Vec::new(), - streams: Some(demux.streams), - metadata: StageMetadata { - pre_entropy_len: 
Some(demux.pre_entropy_len), - demux_meta: demux.meta, - ..StageMetadata::default() - }, - }); - complete_gpu_stage( - result, - 0, - block_idx, - last_stage, - blocks, - &opts, - slots_ref, - results_ref, - queue_ref, - cv_ref, - stats_ref.as_deref(), - gpu_pressure_ref.as_deref(), - gpu_pressure_limit, - ); - } - } - Err(e) => { - // GPU batch failed — fall back to CPU for each block. - eprintln!( - "[pz-gpu] find_matches_batched failed: {e}; \ - retrying {} blocks on CPU", - stage0_batch.len() - ); - let lock_start = Instant::now(); - let mut guard = queue_ref.lock().expect("unified queue poisoned"); - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_wait(lock_start.elapsed()); - } - let admin_start = Instant::now(); - if !guard.failed { - for block_idx in &stage0_batch { - guard.queue.push_back(UnifiedTask::Stage(0, *block_idx)); - } - cv_ref.notify_all(); - } - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_admin(admin_start.elapsed()); - } - drop(guard); - } - } - } else { - // Non-LZ77 pipelines (LzSeq, LZSS): dispatch individually - // through run_compress_stage which calls lzseq_encode_gpu etc. - for block_idx in stage0_batch { - let block = StageBlock { - block_index: block_idx, - original_len: blocks[block_idx].len(), - data: blocks[block_idx].to_vec(), - streams: None, - metadata: StageMetadata::default(), - }; - let t0 = Instant::now(); - let result = run_compress_stage(pipeline, 0, block, &opts); - if let Some(stats) = stats_ref.as_ref() { - stats.add_stage_compute(t0.elapsed()); - } - if result.is_err() { - // GPU stage 0 failed — retry on CPU. 
- eprintln!( - "[pz-gpu] stage 0 failed for block {block_idx}; \ - retrying on CPU" - ); - let lock_start = Instant::now(); - let mut guard = queue_ref.lock().expect("unified queue poisoned"); - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_wait(lock_start.elapsed()); - } - let admin_start = Instant::now(); - if !guard.failed { - guard.queue.push_back(UnifiedTask::Stage(0, block_idx)); - cv_ref.notify_one(); - } - if let Some(stats) = stats_ref.as_ref() { - stats.add_queue_admin(admin_start.elapsed()); - } - drop(guard); - } else { - complete_gpu_stage( - result, - 0, - block_idx, - last_stage, - blocks, - &opts, - slots_ref, - results_ref, - queue_ref, - cv_ref, - stats_ref.as_deref(), - gpu_pressure_ref.as_deref(), - gpu_pressure_limit, - ); - } - } - } - } - }); - } - - // CPU worker threads — each gets a clone of the GPU sender (if any). - // Clones are cheap (Arc internally) and avoid borrow issues with scoped threads. + // CPU worker threads. for _ in 0..worker_count { let queue_ref = &queue; let cv_ref = &queue_cv; @@ -732,21 +178,18 @@ fn compress_parallel_unified( let results_ref = &results; let opts = resolved_options.clone(); let stats_ref = stats_local.clone(); - let gpu_pressure_ref = gpu_auto_backpressure.clone(); - let gpu_pressure_limit = gpu_auto_backpressure_limit; - #[cfg(feature = "webgpu")] - let gpu_tx_clone = gpu_tx.clone(); scope.spawn(move || { // Local continuation state for this worker. // When set, process the next stage directly instead of // round-tripping through the shared queue. 
- let mut local_task: Option<(usize, usize, Option)> = None; + let mut local_task: Option<(usize, usize, StageBlock)> = None; loop { - let (stage_idx, block_idx, inline_block) = if let Some(task) = local_task.take() + let (stage_idx, block_idx, inline_block) = if let Some((s, b, sb)) = + local_task.take() { - task + (s, b, Some(sb)) } else { let task = { let lock_start = Instant::now(); @@ -776,121 +219,8 @@ fn compress_parallel_unified( } }; - match task { - UnifiedTask::Stage(s, b) => (s, b, None), - #[allow(unused_variables)] - UnifiedTask::FusedGpu(start, end, b) => { - // Route to GPU coordinator for fused multi-stage execution. - #[cfg(feature = "webgpu")] - if let Some(ref tx) = gpu_tx_clone { - let handoff_start = Instant::now(); - let request = GpuRequest::Fused(start, end, b); - match tx.try_send(request) { - Ok(()) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_dec(score); - } - continue; - } - Err(std::sync::mpsc::TrySendError::Full(_)) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_full(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 2); - } - // Channel full — fall through to CPU stage start. - } - Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_disconnected(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 1); - } - // Channel closed — fall through to CPU stage start. - } - } - } - (start, b, None) - } - UnifiedTask::StageGpu(s, b) => { - #[cfg(feature = "webgpu")] - { - // Route to GPU coordinator — send and continue to next task. 
- if let Some(ref tx) = gpu_tx_clone { - let request = if s == 0 { - GpuRequest::Stage0(b) - } else { - let stage_block = slots_ref[b] - .lock() - .expect("intermediate slot poisoned") - .take() - .expect("intermediate result missing"); - GpuRequest::StageN(s, b, stage_block) - }; - let handoff_start = Instant::now(); - let inline_stage_block = match tx.try_send(request) { - Ok(()) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_dec(score); - } - continue; - } - Err(std::sync::mpsc::TrySendError::Full(req)) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_full(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 2); - } - match req { - // Keep StageN payload local for CPU fallback - // to avoid slot round-trips under pressure. - GpuRequest::StageN(_, _, sb) => Some(sb), - _ => None, - } - } - Err(std::sync::mpsc::TrySendError::Disconnected( - req, - )) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_disconnected(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 1); - } - match req { - GpuRequest::StageN(_, _, sb) => Some(sb), - _ => None, - } - } - }; - if let Some(sb) = inline_stage_block { - (s, b, Some(sb)) - } else { - (s, b, None) - } - } else { - (s, b, None) - } - } - #[cfg(not(feature = "webgpu"))] - { - (s, b, None) - } - } - } + let (s, b) = task; + (s, b, None) }; // Build or retrieve the StageBlock for this stage. 
@@ -926,85 +256,9 @@ fn compress_parallel_unified( *results_ref[block_idx].lock().expect("result slot poisoned") = Some(Ok(stage_block.data)); } else { - let next_stage = stage_idx + 1; - let backpressure_score = gpu_pressure_ref - .as_ref() - .map_or(0usize, |s| s.load(Ordering::Relaxed)); - let has_gpu = { - #[cfg(feature = "webgpu")] - { - opts.webgpu_engine.is_some() - } - #[cfg(not(feature = "webgpu"))] - { - false - } - }; - let route_next_to_gpu = next_stage == last_stage - && should_route_block_to_gpu_entropy_with_backpressure( - blocks[block_idx].len(), - opts.stage1_backend, - has_gpu, - backpressure_score, - gpu_pressure_limit, - ); - - if route_next_to_gpu { - // Directly hand off StageN to GPU coordinator from this - // worker, avoiding queue and slot round-trips. - #[cfg(feature = "webgpu")] - if let Some(ref tx) = gpu_tx_clone { - let handoff_start = Instant::now(); - let request = - GpuRequest::StageN(next_stage, block_idx, stage_block); - match tx.try_send(request) { - Ok(()) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_dec(score); - } - continue; - } - Err(std::sync::mpsc::TrySendError::Full(req)) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_full(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 2); - } - if let GpuRequest::StageN(_, _, sb) = req { - local_task = - Some((next_stage, block_idx, Some(sb))); - continue; - } - unreachable!("StageN request expected"); - } - Err(std::sync::mpsc::TrySendError::Disconnected( - req, - )) => { - if let Some(stats) = stats_ref.as_ref() { - stats.add_gpu_handoff(handoff_start.elapsed()); - stats.inc_gpu_try_send_disconnected(); - } - if let Some(score) = gpu_pressure_ref.as_ref() { - pressure_inc(score, 1); - } - if let GpuRequest::StageN(_, _, sb) = req { - local_task = - 
Some((next_stage, block_idx, Some(sb))); - continue; - } - unreachable!("StageN request expected"); - } - } - } - } - // CPU continuation: keep processing the same block locally. - local_task = Some((next_stage, block_idx, Some(stage_block))); + let next_stage = stage_idx + 1; + local_task = Some((next_stage, block_idx, stage_block)); continue; } } @@ -1023,7 +277,7 @@ fn compress_parallel_unified( } let admin_start = Instant::now(); let (mark_invalid_after_lock, should_return) = - complete_task_lifecycle(&mut guard, cv_ref, None, stage_failed, true); + complete_task_lifecycle(&mut guard, cv_ref, stage_failed); if let Some(stats) = stats_ref.as_ref() { stats.add_queue_admin(admin_start.elapsed()); } @@ -1039,15 +293,6 @@ fn compress_parallel_unified( } }); } - - // Drop the original sender so it doesn't keep the channel open. - // Workers hold clones; when the last worker exits the scope, - // all clones drop, closing the channel and unblocking the - // GPU coordinator's recv(). - #[cfg(feature = "webgpu")] - drop(gpu_tx); - #[cfg(not(feature = "webgpu"))] - let _ = gpu_tx; }); let mut block_data_vec = Vec::with_capacity(num_blocks); @@ -1069,93 +314,6 @@ fn compress_parallel_unified( )) } -/// Handle the result of a GPU stage execution: store results or errors, -/// enqueue the next stage, and manage queue lifecycle (pending_tasks, closed, failed). -/// -/// Shared by all GPU request types to avoid duplicating the stage-completion logic. 
-#[cfg(feature = "webgpu")] -#[allow(clippy::too_many_arguments)] -fn complete_gpu_stage( - result: PzResult, - stage_idx: usize, - block_idx: usize, - last_stage: usize, - blocks: &[&[u8]], - options: &CompressOptions, - intermediate_slots: &[Mutex>], - results: &[Mutex>>>], - queue: &Mutex, - queue_cv: &Condvar, - stats: Option<&LocalSchedulerStats>, - gpu_pressure: Option<&AtomicUsize>, - gpu_pressure_limit: usize, -) { - let mut next_task: Option = None; - let mut stage_failed = false; - - match result { - Ok(sb) => { - if stage_idx == last_stage { - *results[block_idx].lock().expect("result slot poisoned") = Some(Ok(sb.data)); - } else { - *intermediate_slots[block_idx] - .lock() - .expect("intermediate slot poisoned") = Some(sb); - let next_stage = stage_idx + 1; - let backpressure_score = gpu_pressure.map_or(0usize, |s| s.load(Ordering::Relaxed)); - let has_gpu = { - #[cfg(feature = "webgpu")] - { - options.webgpu_engine.is_some() - } - #[cfg(not(feature = "webgpu"))] - { - false - } - }; - next_task = Some( - if next_stage == last_stage - && should_route_block_to_gpu_entropy_with_backpressure( - blocks[block_idx].len(), - options.stage1_backend, - has_gpu, - backpressure_score, - gpu_pressure_limit, - ) - { - UnifiedTask::StageGpu(next_stage, block_idx) - } else { - UnifiedTask::Stage(next_stage, block_idx) - }, - ); - } - } - Err(e) => { - *results[block_idx].lock().expect("result slot poisoned") = Some(Err(e)); - stage_failed = true; - } - } - - // Single completion lock per GPU-finished task. 
- let lock_start = Instant::now(); - let mut guard = queue.lock().expect("unified queue poisoned"); - if let Some(stats) = stats { - stats.add_queue_wait(lock_start.elapsed()); - } - let admin_start = Instant::now(); - let (mark_invalid_after_lock, _) = - complete_task_lifecycle(&mut guard, queue_cv, next_task, stage_failed, false); - if let Some(stats) = stats { - stats.add_queue_admin(admin_start.elapsed()); - } - drop(guard); - - if mark_invalid_after_lock { - *results[block_idx].lock().expect("result slot poisoned") = - Some(Err(PzError::InvalidInput)); - } -} - /// Multi-block parallel decompression. pub(crate) fn decompress_parallel( payload: &[u8], diff --git a/src/pipeline/parallel_tests.rs b/src/pipeline/parallel_tests.rs index df0cdfd..753d006 100644 --- a/src/pipeline/parallel_tests.rs +++ b/src/pipeline/parallel_tests.rs @@ -60,67 +60,8 @@ fn test_stage1_routing_respects_size_threshold() { assert_eq!(decompressed, input, "round-trip should match"); } -#[cfg(feature = "webgpu")] -#[test] -fn test_stage1_auto_backpressure_biases_to_cpu() { - use super::super::BackendAssignment; - use super::super::GPU_ENTROPY_THRESHOLD; - - let block_len = GPU_ENTROPY_THRESHOLD * 2; - let limit = 8usize; - - assert!( - should_route_block_to_gpu_entropy_with_backpressure( - block_len, - BackendAssignment::Auto, - true, - 0, - limit, - ), - "auto should route to GPU when pressure is low" - ); - assert!( - !should_route_block_to_gpu_entropy_with_backpressure( - block_len, - BackendAssignment::Auto, - true, - limit, - limit, - ), - "auto should bias to CPU when pressure reaches limit" - ); -} - -#[cfg(feature = "webgpu")] -#[test] -fn test_stage1_backpressure_does_not_override_explicit_backend() { - use super::super::BackendAssignment; - use super::super::GPU_ENTROPY_THRESHOLD; - - let block_len = GPU_ENTROPY_THRESHOLD * 2; - let high_pressure = 1_000usize; - - assert!( - should_route_block_to_gpu_entropy_with_backpressure( - block_len, - BackendAssignment::Gpu, - true, - 
high_pressure, - 1, - ), - "explicit GPU assignment should remain GPU regardless of pressure" - ); - assert!( - !should_route_block_to_gpu_entropy_with_backpressure( - block_len, - BackendAssignment::Cpu, - true, - 0, - 1, - ), - "explicit CPU assignment should remain CPU regardless of pressure" - ); -} +// GPU backpressure unit tests removed — parallel path is now CPU-only. +// GPU routing with backpressure is handled by the streaming path. // --- Task 3 tests: Round-trip correctness and threshold boundary --- diff --git a/src/pipeline/telemetry.rs b/src/pipeline/telemetry.rs index 8c2c3b0..8a3c92d 100644 --- a/src/pipeline/telemetry.rs +++ b/src/pipeline/telemetry.rs @@ -54,9 +54,6 @@ pub(super) struct LocalSchedulerStats { stage_compute_ns: AtomicU64, queue_wait_ns: AtomicU64, queue_admin_ns: AtomicU64, - gpu_handoff_ns: AtomicU64, - gpu_try_send_full_count: AtomicU64, - gpu_try_send_disconnected_count: AtomicU64, } impl LocalSchedulerStats { @@ -74,23 +71,6 @@ impl LocalSchedulerStats { self.queue_admin_ns .fetch_add(duration_to_ns(d), Ordering::Relaxed); } - - #[cfg(feature = "webgpu")] - pub(super) fn add_gpu_handoff(&self, d: Duration) { - self.gpu_handoff_ns - .fetch_add(duration_to_ns(d), Ordering::Relaxed); - } - - #[cfg(feature = "webgpu")] - pub(super) fn inc_gpu_try_send_full(&self) { - self.gpu_try_send_full_count.fetch_add(1, Ordering::Relaxed); - } - - #[cfg(feature = "webgpu")] - pub(super) fn inc_gpu_try_send_disconnected(&self) { - self.gpu_try_send_disconnected_count - .fetch_add(1, Ordering::Relaxed); - } } pub(super) fn duration_to_ns(d: Duration) -> u64 { @@ -136,18 +116,9 @@ impl Drop for SchedulerRunRecorder { guard.queue_admin_ns = guard .queue_admin_ns .saturating_add(local.queue_admin_ns.load(Ordering::Relaxed)); - guard.gpu_handoff_ns = guard - .gpu_handoff_ns - .saturating_add(local.gpu_handoff_ns.load(Ordering::Relaxed)); - guard.gpu_try_send_full_count = guard - .gpu_try_send_full_count - 
.saturating_add(local.gpu_try_send_full_count.load(Ordering::Relaxed)); - guard.gpu_try_send_disconnected_count = - guard.gpu_try_send_disconnected_count.saturating_add( - local - .gpu_try_send_disconnected_count - .load(Ordering::Relaxed), - ); + // GPU telemetry fields (gpu_handoff_ns, gpu_try_send_*) are retained + // in UnifiedSchedulerStats for API stability but always 0: the parallel + // scheduler is CPU-only; GPU work uses the streaming path. } } diff --git a/src/streaming.rs b/src/streaming.rs index 6ed400d..9075388 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -231,8 +231,9 @@ fn compress_stream_parallel( }; // Adaptive backpressure: workers check this before try_send to GPU. - // Score ramps up on Full (+2), down on Ok (-1). When score >= limit, - // workers skip GPU entirely. Same mechanism as parallel.rs. + // Score ramps up on Full (+2), down on Ok (-1) per worker send, and + // the coordinator decrements by batch_len after completing work. + // When score >= limit, workers skip GPU entirely. #[cfg(feature = "webgpu")] let gpu_pressure = if has_gpu { Some(Arc::new(std::sync::atomic::AtomicUsize::new(0))) @@ -253,6 +254,7 @@ fn compress_stream_parallel( let gpu_output_tx = output_tx.clone(); let opts = options.clone(); let pl = pipeline; + let coordinator_pressure = gpu_pressure.clone(); Some(scope.spawn(move || -> StreamResult<()> { let engine = opts.webgpu_engine.as_ref().unwrap(); @@ -266,6 +268,8 @@ fn compress_stream_parallel( batch_blocks.push(data); } + let batch_len = batch_blocks.len(); + // Run GPU match-finding on the batch. let block_refs: Vec<&[u8]> = batch_blocks.iter().map(|b| b.as_slice()).collect(); @@ -310,6 +314,17 @@ fn compress_stream_parallel( } } } + + // Signal capacity: decrement pressure so workers resume + // sending to GPU. Without this, pressure is a one-way ratchet + // that permanently locks out GPU after initial Full events. 
+ if let Some(ref p) = coordinator_pressure { + let _ = p.fetch_update( + std::sync::atomic::Ordering::Relaxed, + std::sync::atomic::Ordering::Relaxed, + |v| Some(v.saturating_sub(batch_len)), + ); + } } Ok(()) }))