Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pz compresses 2.5–7x faster than gzip with ~2pp ratio gap. Decompress is faste

- `src/lib.rs` — crate root, `PzError`/`PzResult` types
- `src/lz_token.rs` — universal `LzToken` type, `TokenEncoder` trait, three encoder implementations
- `src/{algorithm}.rs` — one file per composable algorithm (bwt, crc32, fse, huffman, lz77, lzseq, lzss, mtf, rans, rle)
- `src/{algorithm}.rs` — one file per composable algorithm (bwt, crc32, fse, huffman, lz77, lzseq, lzss, lz_token, mtf, rans, rle, sortlz, recoil)
- `src/analysis.rs` — data profiling (entropy, match density, run ratio, autocorrelation)
- `src/optimal.rs` — optimal parsing (GPU top-K + backward DP)
- `src/simd.rs` — SIMD decode paths for rANS
Expand All @@ -102,8 +102,8 @@ pz compresses 2.5–7x faster than gzip with ~2pp ratio gap. Decompress is faste
Before optimizing GPU code paths, read this first — multiple agents have spent full sessions rediscovering these:

- **GPU entropy (rANS/FSE) is slower than CPU** — 0.77x on encode, 0.54x on decode. This has been proven across 500+ optimization iterations. The serial state dependency in rANS limits GPU to ~300 threads; saturation needs ~8K-16K. Do not attempt to batch, parallelize, or "pipeline" GPU entropy encoding.
- **`gpu_fused_span()` returning `Some((0,1))` is counterproductive** — it routes entropy to GPU (slower). It exists as architectural prep for if GPU entropy ever becomes competitive. The `GPU_ENTROPY_THRESHOLD` (256KB > default GPU block size 128KB) intentionally prevents this path from activating.
- **The CLI uses `streaming::compress_stream`, not `pipeline::compress_with_options`** — the parallel scheduler's GPU coordinator is not invoked by the CLI. The streaming path deliberately uses CPU rANS for entropy.
- **The parallel scheduler (`compress_with_options`) is CPU-only** — the GPU coordinator was removed because it serialized entropy encoding on one thread, bottlenecking at 28 MiB/s. GPU-accelerated compression uses the streaming path (`compress_stream`) which has a dedicated GPU coordinator with adaptive backpressure. Do not re-add a GPU coordinator to the parallel path.
- **The CLI uses `streaming::compress_stream`, not `pipeline::compress_with_options`** — the streaming path handles GPU match-finding via a coordinator thread with adaptive backpressure that decrements on batch completion. Workers use CPU for entropy.
- **The real GPU win (ring-buffered LZ77 batching) is already shipped** — delivers +7-17% throughput. See `docs/design-docs/gpu-strategy.md`.
- **GPU device init time skews throughput benchmarks** — first-call GPU init adds significant overhead that `bench.sh` captures but Criterion amortizes across iterations. When comparing GPU vs CPU throughput, use Criterion (`cargo bench`) for apples-to-apples; `bench.sh` reflects real-world cold-start cost. Don't chase "GPU is slower" regressions that are really just init time.
- **Compression ratio is limited by wire encoding overhead, not match quality** — the LZ match-finder finds good matches. The legacy Lz77Encoder (5-byte per match) was the worst offender; LzSeqEncoder (log2-coded, 6 streams) is much better but still ~2pp behind gzip on Silesia (34.4% vs 32.2%). Further ratio gains require encoding format work, not matcher tuning.
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ harness = false
name = "stages_match_finders"
harness = false

[[bench]]
name = "gpu_parallel_vs_streaming"
harness = false

[profile.profiling]
inherits = "release"
debug = true
Expand Down
142 changes: 142 additions & 0 deletions benches/gpu_parallel_vs_streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/// Criterion benchmark: GPU parallel (in-memory) vs GPU streaming for Lzf.
///
/// Compares four paths at 4 MB and 16 MB:
/// 1. parallel CPU-only (compress_with_options, threads=0)
/// 2. parallel GPU (compress_with_options, Backend::WebGpu, threads=0)
/// 3. streaming CPU-only (compress_stream, threads=0)
/// 4. streaming GPU (compress_stream, Backend::WebGpu, threads=0)
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use pz::pipeline::{self, CompressOptions, Pipeline};
use pz::streaming;
use std::io::Cursor;
use std::path::Path;
use std::time::Duration;

fn get_test_data_sized(size: usize) -> Vec<u8> {
let manifest = Path::new(env!("CARGO_MANIFEST_DIR"));
let base = ["alice29.txt", "cantrbry.tar"]
.iter()
.filter_map(|name| {
let p = manifest.join("samples").join(name);
std::fs::read(&p).ok().filter(|d| !d.is_empty())
})
.next()
.unwrap_or_else(|| b"The quick brown fox jumps over the lazy dog. ".repeat(3000));

let mut data = Vec::with_capacity(size);
while data.len() < size {
let remaining = size - data.len();
data.extend_from_slice(&base[..remaining.min(base.len())]);
}
data
}

fn bench_parallel_vs_streaming(c: &mut Criterion) {
let pipeline = Pipeline::Lzf;

#[cfg(feature = "webgpu")]
let engine = {
use pz::webgpu::WebGpuEngine;
match WebGpuEngine::new() {
Ok(e) => Some(std::sync::Arc::new(e)),
Err(e) => {
eprintln!("gpu_parallel_vs_streaming: no GPU device ({e}), GPU benches skipped");
None
}
}
};

for &size in &[4_194_304usize, 16_777_216] {
let data = get_test_data_sized(size);
let label = format!("{}MB", size / (1024 * 1024));

let mut group = c.benchmark_group(format!("gpu_par_vs_stream_{label}"));
group.warm_up_time(Duration::from_secs(3));
group.measurement_time(Duration::from_secs(8));
group.sample_size(10);
group.throughput(Throughput::Bytes(size as u64));

// --- parallel CPU ---
{
let opts = CompressOptions {
threads: 0,
..Default::default()
};
group.bench_with_input(
BenchmarkId::new("parallel_cpu", &label),
&data,
|b, data| {
b.iter(|| pipeline::compress_with_options(data, pipeline, &opts).unwrap());
},
);
}

// --- streaming CPU ---
{
let opts = CompressOptions {
threads: 0,
..Default::default()
};
group.bench_with_input(
BenchmarkId::new("streaming_cpu", &label),
&data,
|b, data| {
b.iter(|| {
let input = Cursor::new(data);
let mut output = Vec::with_capacity(data.len());
streaming::compress_stream(input, &mut output, pipeline, &opts).unwrap();
output
});
},
);
}

// --- parallel GPU ---
#[cfg(feature = "webgpu")]
if let Some(ref engine) = engine {
use pz::pipeline::Backend;
let opts = CompressOptions {
backend: Backend::WebGpu,
threads: 0,
webgpu_engine: Some(engine.clone()),
..Default::default()
};
group.bench_with_input(
BenchmarkId::new("parallel_gpu", &label),
&data,
|b, data| {
b.iter(|| pipeline::compress_with_options(data, pipeline, &opts).unwrap());
},
);
}

// --- streaming GPU ---
#[cfg(feature = "webgpu")]
if let Some(ref engine) = engine {
use pz::pipeline::Backend;
let opts = CompressOptions {
backend: Backend::WebGpu,
threads: 0,
webgpu_engine: Some(engine.clone()),
..Default::default()
};
group.bench_with_input(
BenchmarkId::new("streaming_gpu", &label),
&data,
|b, data| {
b.iter(|| {
let input = Cursor::new(data);
let mut output = Vec::with_capacity(data.len());
streaming::compress_stream(input, &mut output, pipeline, &opts).unwrap();
output
});
},
);
}

group.finish();
}
}

criterion_group!(benches, bench_parallel_vs_streaming);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion docs/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ To understand how a pipeline works, trace a single block from `compress_block()`
- Account for staging buffers and padding/alignment
- Use `scripts/gpu-meminfo.sh` to analyze actual allocations

**GPU scheduling:** All GPU work is routed through a single unified scheduler (`compress_parallel_unified` in `pipeline/parallel.rs`). A dedicated GPU coordinator thread batch-collects requests from CPU workers. The `UnifiedTask` enum has three variants: `Stage` (CPU), `StageGpu` (single GPU stage), and `FusedGpu` (multi-stage GPU execution). Workers use `try_send()` on a bounded channel with CPU fallback to avoid deadlock.
**GPU scheduling:** GPU-accelerated compression uses the **streaming path** (`compress_stream` in `streaming.rs`), which spawns a dedicated GPU coordinator thread that batch-collects LZ77 match-finding requests from CPU workers via a bounded channel with adaptive backpressure. The **parallel scheduler** (`compress_parallel` in `pipeline/parallel.rs`) is CPU-only by design — it achieves higher throughput by avoiding GPU dispatch overhead. When `compress_with_options` is called with `Backend::WebGpu`, it routes through `compress_stream` internally.

### Feature Flags and Build Configurations

Expand Down
34 changes: 21 additions & 13 deletions docs/design-docs/gpu-strategy.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,37 @@ The GPU rANS performance gate (Slice 4 in PLAN-p0a) remains FAIL. Recommendation

GPU Huffman encode (`huffman_encode.wgsl`) and FSE encode/decode (`fse_encode.wgsl`, `fse_decode.wgsl`) exist in production. These have the same serial-per-stream limitation as rANS but are used for specific pipelines (Deflate uses Huffman, Lzf uses FSE). Their per-stream performance has not been systematically compared to CPU in the same way as rANS.

## Current architecture: unified scheduler
## Current architecture: dual-path GPU support

All GPU work routes through `compress_parallel_unified()` in `src/pipeline/parallel.rs` (PR #101). The architecture:
GPU-accelerated compression uses the **streaming path** (`compress_stream` in `streaming.rs`), not the parallel scheduler. The parallel scheduler (`compress_parallel` in `parallel.rs`) is CPU-only by design — its GPU coordinator was removed after proving it bottlenecked at 28 MiB/s due to serializing entropy encoding on a single thread.

When `compress_with_options` is called with `Backend::WebGpu`, it routes through `compress_stream` internally (producing framed-format output, which `decompress` handles natively).

### Streaming GPU coordinator

For LZ-demux pipelines (Lzf, LzSeqR, etc.), `compress_stream_parallel` spawns a GPU coordinator thread:

```
CPU workers: pick up tasks from shared VecDeque<UnifiedTask>
├─ Stage(stage_idx, block_idx) → run on CPU
├─ StageGpu(stage_idx, block_idx) → send to GPU coordinator via bounded channel
└─ FusedGpu(start, end, block_idx) → send to GPU coordinator
Workers: read blocks → if GPU available and pressure < limit,
try_send to GPU coordinator via bounded channel
→ else, compress on CPU

GPU coordinator thread:
├─ Batch-collects Stage 0 requests → find_matches_batched() (ring-buffered)
├─ Processes Stage N requests → run_compress_stage() individually
└─ Processes Fused requests → runs stages start..=end sequentially
1. Block on gpu_rx.recv() for first request
2. Drain additional requests via gpu_rx.try_recv()
3. Batch blocks → engine.find_matches_batched()
4. Demux matches → compress_block_from_demux() (CPU entropy)
5. Send results to output_tx for ordered writing
6. Decrement backpressure by batch_len (enables worker → GPU flow)
```

**Deadlock prevention**: Workers use `try_send()` on the bounded channel. If the channel is full, the block falls back to CPU processing.
**Adaptive backpressure**: An `AtomicUsize` pressure score gates worker → GPU routing. Workers increment on `try_send` Full (+2), decrement on Ok (-1). The coordinator decrements by `batch_len` after completing each batch, preventing the one-way ratchet that would permanently lock out GPU.

**GPU failure recovery**: If any GPU operation fails, the block is re-enqueued as `Stage(0, block_idx)` for CPU retry.
**GPU failure recovery**: If `find_matches_batched` fails, all blocks in the batch fall back to CPU via `compress_block`.

### The FusedGpu path problem
### GPU entropy is not used

`gpu_fused_span()` currently returns `Some((0, 1))` for Lzr and LzSeqR, routing both stages to GPU. But since GPU entropy is 0.77x CPU encode, this actually makes those pipelines *slower* than the decomposed path (GPU LZ77 + CPU entropy). The fused path is architectural preparation for the case where GPU entropy becomes competitive — it is not a performance win today.
GPU entropy (rANS/FSE) is 0.54-0.77x CPU throughput due to serial state dependencies. All entropy encoding runs on CPU the GPU is only used for LZ77 match-finding.

## What would need to change for GPU to win

Expand Down
33 changes: 14 additions & 19 deletions docs/design-docs/pipeline-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,35 +189,30 @@ Synchronization: Join all threads before writing container

**When used:** Default for >256KB inputs with multiple CPU cores

### 2. Unified GPU Coordinator
### 2. GPU Coordinator (Streaming Path)

**File:** `src/pipeline/parallel.rs` - `compress_parallel_unified()`
**File:** `src/streaming.rs` - `compress_stream_parallel()`

A single GPU coordinator thread handles all GPU work. CPU workers send GPU requests via a bounded `SyncSender`; the coordinator batch-collects and dispatches:
The parallel scheduler (`compress_parallel` in `parallel.rs`) is **CPU-only**. GPU-accelerated compression uses the streaming path, which spawns a dedicated GPU coordinator thread for LZ-demux pipelines:

```
CPU workers: split inputenqueue StageGpu(0, block_idx)
or FusedGpu(0, 1, block_idx)
Workers: read blockstry_send to GPU coordinator (bounded channel)
→ CPU fallback on Full or when pressure >= limit

GPU coordinator:
1. Block on rx.recv() for first request
2. Drain up to ring_depth more Stage0 requests via try_recv()
3. Batch Stage0 blocks → find_matches_batched() (ring-buffered overlap)
4. Demux resultspush Stage(1, block_idx) to unified queue
5. Process StageN requests via run_compress_stage()
6. Process Fused requests: run stages start..=end sequentially on GPU
1. Block on gpu_rx.recv() for first request
2. Drain additional requests via gpu_rx.try_recv()
3. Batch blocks → engine.find_matches_batched()
4. Demux matchescompress_block_from_demux() (CPU entropy)
5. Send compressed results to output_tx for ordered writing
6. Decrement backpressure by batch_len
```

**Task variants:** `UnifiedTask` enum routes work:
- `Stage(stage_idx, block_idx)` — CPU worker picks up
- `StageGpu(stage_idx, block_idx)` — GPU coordinator handles
- `FusedGpu(start, end, block_idx)` — GPU runs multiple stages without queue round-trips
**Adaptive backpressure:** AtomicUsize pressure score ramps up on Full (+2), down on Ok (-1) per worker, and coordinator decrements by batch_len on completion. Prevents one-way ratchet lockout.

**Deadlock prevention:** Workers use `try_send()` with CPU fallback when the bounded GPU channel is full
**GPU-to-CPU fallback:** If `find_matches_batched` fails, all batch blocks fall back to CPU via `compress_block`.

**GPU-to-CPU fallback:** If GPU operations fail, blocks are re-enqueued as `Stage(0, block_idx)` for CPU retry

**When used:** GPU available AND input ≥ 256KB
**When used:** `compress_stream` with `Backend::WebGpu` + LZ-demux pipeline. Also used internally by `compress_with_options` when `Backend::WebGpu` is requested (routes through `compress_stream` with in-memory I/O).

## Auto-Selection Strategy

Expand Down
32 changes: 28 additions & 4 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,16 @@ pub fn compress(input: &[u8], pipeline: Pipeline) -> PzResult<Vec<u8>> {
/// When `threads` is 1 or the input fits in a single block, a single block
/// is compressed without thread overhead.
///
/// The output always uses the multi-block container format (V2) with a
/// block table, even for single-block streams.
/// Output format depends on the path taken:
/// - **CPU parallel**: table-mode V2 container (block table + block data)
/// - **GPU parallel**: framed V2 container (self-framing blocks, routed
/// through the streaming path's GPU coordinator)
///
/// Both formats are readable by [`decompress`] and [`decompress_with_options`].
///
/// When `options.backend` is `Backend::WebGpu` and an engine is provided,
/// GPU-amenable stages (e.g., LZ77 match finding) run on the GPU.
/// GPU-amenable stages (e.g., LZ77 match finding) run on the GPU via the
/// streaming path's coordinator thread with adaptive backpressure.
/// Other stages and decompression always use the CPU.
pub fn compress_with_options(
input: &[u8],
Expand Down Expand Up @@ -483,7 +488,26 @@ pub fn compress_with_options(
return Ok(output);
}

// Multi-block parallel: unified scheduler dispatches all stages from a shared work queue.
// Multi-block parallel with GPU: route through the streaming path which
// has the GPU coordinator thread with adaptive backpressure. The parallel
// scheduler is CPU-only by design (see CLAUDE.md). Output uses framed
// format, which decompress()/decompress_with_options() handle natively.
#[cfg(feature = "webgpu")]
if matches!(options.backend, Backend::WebGpu) && options.webgpu_engine.is_some() {
let reader = std::io::Cursor::new(input);
let mut buf = Vec::new();
crate::streaming::compress_stream(reader, &mut buf, pipeline, options).map_err(
|e| match e {
crate::streaming::StreamError::Pz(pe) => pe,
// Unreachable: in-memory Cursor I/O cannot produce IO errors.
crate::streaming::StreamError::Io(_) => PzError::InvalidInput,
},
)?;
return Ok(buf);
}

// Multi-block parallel (CPU-only): unified scheduler dispatches all stages
// from a shared work queue.
compress_parallel(input, pipeline, options, num_threads)
}

Expand Down
Loading
Loading