From 7d3474ec68b9c3f0a03b7356af0f1665a2b8f026 Mon Sep 17 00:00:00 2001 From: Burak Varli Date: Tue, 3 Feb 2026 21:36:06 +0000 Subject: [PATCH] Fix Snappy decompressor buffer corruption and add ClickBench tests Snappy fix: - The decompressor was sharing the same buffer for both the circular window (for back-references) and the Reader's internal buffer. - When the Reader framework used the buffer, it corrupted the window. - Fix: Use separate buffer regions - first 128KB for the window, remaining 4KB for the Reader's output buffer. - Increased window to 128KB to support copy 4-byte test (offset 65540). ClickBench dataset: - Added ClickBench web analytics dataset (3 parquet files, ~450MB total) - 105 columns with diverse types, real-world 'dirty' data - CI-only tests that read all columns from all row groups - Updated download script and documentation --- scripts/download-public-datasets.sh | 23 +++++++++ src/compress/snappy.zig | 80 +++++++++++++++++++++-------- src/parquet/rowGroupReader.zig | 2 +- src/public_datasets_testing.zig | 29 +++++++++++ testdata/public-datasets/README.md | 18 +++++++ 5 files changed, 130 insertions(+), 22 deletions(-) diff --git a/scripts/download-public-datasets.sh b/scripts/download-public-datasets.sh index 249dbc6..e1220ae 100755 --- a/scripts/download-public-datasets.sh +++ b/scripts/download-public-datasets.sh @@ -53,6 +53,28 @@ download_nyc_taxi() { fi } +# ============================================================================= +# ClickBench Dataset +# Source: https://github.com/ClickHouse/ClickBench +# ============================================================================= +download_clickbench() { + local mode="$1" + local base_url="https://datasets.clickhouse.com/hits_compatible/athena_partitioned" + local dest="$DEST_DIR/clickbench" + mkdir -p "$dest" + + echo "=== ClickBench Dataset ===" + + if [[ "$mode" == "all" ]]; then + echo "Downloading ClickBench partitioned files (CI only)..." + for i in 0 1 2; do + download_file "$base_url/hits_$i.parquet" "$dest/hits_$i.parquet" + done + else + echo "Skipping ClickBench (CI only, use --all to download)" + fi +} + # ============================================================================= # Add more datasets here following the same pattern # ============================================================================= @@ -93,6 +115,7 @@ done mkdir -p "$DEST_DIR" download_nyc_taxi "$MODE" +download_clickbench "$MODE" echo "" echo "Done!" 
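For context, here is a minimal Zig sketch of the buffer layout the fix introduces. The constants mirror the new `Decompress` fields, but `Regions` and `splitBuffer` are illustrative helpers, not code from this patch: the point is simply that the 128KB circular window and the Reader's 4KB output buffer come from disjoint regions of one caller-provided allocation, so the Reader framework can no longer clobber back-reference history.

    const std = @import("std");

    const window_len = 65536 * 2; // 128KB circular window for back-references
    const reader_buffer_len = 4096; // 4KB output buffer for the Reader framework
    const buffer_len = window_len + reader_buffer_len;

    /// Illustrative helper (not in the patch): the two non-overlapping
    /// regions that Decompress.init carves out of one buffer.
    const Regions = struct {
        window: *[window_len]u8,
        reader: []u8,
    };

    fn splitBuffer(buffer: []u8) Regions {
        std.debug.assert(buffer.len >= buffer_len);
        return .{
            .window = buffer[0..window_len], // back-reference history lives here
            .reader = buffer[window_len..], // Reader framework may scribble here
        };
    }

    test "window and reader regions never alias" {
        var buf: [buffer_len]u8 = undefined;
        const r = splitBuffer(&buf);
        try std.testing.expect(@intFromPtr(r.reader.ptr) >= @intFromPtr(r.window) + window_len);
    }

This split is why `init` now asserts `buffer.len >= buffer_len`, and why `rowGroupReader.zig` switches its allocation from `window_len` to `buffer_len` below.
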
diff --git a/src/compress/snappy.zig b/src/compress/snappy.zig index 35662a2..a7aefd9 100644 --- a/src/compress/snappy.zig +++ b/src/compress/snappy.zig @@ -5,10 +5,14 @@ const Limit = Io.Limit; const Writer = Io.Writer; pub const Decompress = struct { - pub const window_len = 65536; + pub const window_len = 65536 * 2; + pub const reader_buffer_len = 4096; + pub const buffer_len = window_len + reader_buffer_len; input: *Reader, remaining: usize, + total_written: usize, + window: *[window_len]u8, reader: Reader, state: union(enum) { @@ -34,14 +38,17 @@ pub const Decompress = struct { }; pub fn init(input: *Reader, buffer: []u8) Decompress { + std.debug.assert(buffer.len >= buffer_len); return .{ .input = input, .remaining = 0, + .total_written = 0, + .window = buffer[0..window_len], .reader = .{ .vtable = &.{ .stream = Decompress.stream, }, - .buffer = buffer, + .buffer = buffer[window_len..], .seek = 0, .end = 0, }, @@ -71,40 +78,71 @@ pub const Decompress = struct { }, .literal => |*literal_remaining| { if (literal_remaining.* == 0) { + if (d.remaining == 0) { + d.state = .eof; + return error.EndOfStream; + } try d.readTag(); continue :read d.state; } - const out = limit.min(@enumFromInt(literal_remaining.*)).min(@enumFromInt(d.remaining)).slice(try w.writableSliceGreedyPreserve(Decompress.window_len, 1)); - try d.input.readSliceAll(out); + const n = @min(@min(literal_remaining.*, remaining), d.remaining); + if (n == 0) return 0; + + // Read directly into our window buffer first + const window_start = d.total_written % window_len; + const window_space = window_len - window_start; + const first_chunk = @min(n, window_space); + + try d.input.readSliceAll(d.window[window_start..][0..first_chunk]); + if (first_chunk < n) { + try d.input.readSliceAll(d.window[0 .. 
n - first_chunk]); + } + + // Now copy from window to output + const dst = try w.writableSlice(n); + for (0..n) |i| { + dst[i] = d.window[(d.total_written + i) % window_len]; + } - literal_remaining.* -= out.len; - d.remaining -= out.len; - w.advance(out.len); - return out.len; + literal_remaining.* -= n; + d.remaining -= n; + d.total_written += n; + + return n; }, .copy => |*copy| { if (copy.len == 0) { + if (d.remaining == 0) { + d.state = .eof; + return error.EndOfStream; + } try d.readTag(); continue :read d.state; } - const n = @min(copy.len, remaining); - std.debug.assert(n <= 64); + if (copy.offset > d.total_written or copy.offset == 0) { + return error.ReadFailed; + } - const start = w.end - copy.offset; - const end = @min(start + n, w.end); - const length = end - start; + const n = @min(@min(copy.len, remaining), 64); + if (n == 0) return 0; - const src = w.buffer[start..end]; - const dst = try w.writableSliceGreedyPreserve(Decompress.window_len, length); - @memmove(dst[0..length], src); + const dst = try w.writableSlice(n); + + // Copy from our circular window, byte by byte to handle overlapping copies + for (0..n) |i| { + const src_idx = (d.total_written - copy.offset + i) % window_len; + const byte = d.window[src_idx]; + d.window[(d.total_written + i) % window_len] = byte; + dst[i] = byte; + } - copy.len -= length; - d.remaining -= length; - w.advance(length); + copy.len -= n; + d.remaining -= n; + d.total_written += n; - return length; + return n; }, } } @@ -278,7 +316,7 @@ test "golden" { fn expectDecoded(input: []const u8, expected: []const u8) !void { var fixed: Reader = .fixed(input); - var decompress_buf: [1024]u8 = undefined; + var decompress_buf: [Decompress.buffer_len]u8 = undefined; var decompress = Decompress.init(&fixed, &decompress_buf); var decoded: Writer.Allocating = .init(testing.allocator); diff --git a/src/parquet/rowGroupReader.zig b/src/parquet/rowGroupReader.zig index fda2841..df4fb68 100644 --- a/src/parquet/rowGroupReader.zig +++ b/src/parquet/rowGroupReader.zig @@ -336,7 +336,7 @@ fn decoderForPage(arena: std.mem.Allocator, inner_reader: *Reader, codec: parque break :blk &decompress.reader; }, .SNAPPY => blk: { - const buf = try arena.alloc(u8, compress.snappy.Decompress.window_len); + const buf = try arena.alloc(u8, compress.snappy.Decompress.buffer_len); const decompress = try arena.create(compress.snappy.Decompress); decompress.* = compress.snappy.Decompress.init(inner_reader, buf); break :blk &decompress.reader; diff --git a/src/public_datasets_testing.zig b/src/public_datasets_testing.zig index 75566db..c0e81ea 100644 --- a/src/public_datasets_testing.zig +++ b/src/public_datasets_testing.zig @@ -115,3 +115,32 @@ test "nyc taxi: fhvhv tripdata 2025-10 (ci only)" { // TODO: This causes OOM on the CI. We probably need to have a seperate arena for each row group and de-allocate it between. 
// try readAllRowGroups(&file); } + +// ============================================================================= +// ClickBench Dataset - CI only +// Source: https://github.com/ClickHouse/ClickBench +// 105 columns, real web analytics data, diverse types +// ============================================================================= + +test "clickbench: hits (ci only)" { + if (!ci_tests) return error.SkipZigTest; + + const files = [_][]const u8{ + "testdata/public-datasets/clickbench/hits_0.parquet", + "testdata/public-datasets/clickbench/hits_1.parquet", + "testdata/public-datasets/clickbench/hits_2.parquet", + }; + + for (files) |path| { + var reader_buf: [4096]u8 = undefined; + var file_reader = (try Io.Dir.cwd().openFile(io, path, .{ .mode = .read_only })).reader(io, &reader_buf); + var file = try File.read(testing.allocator, &file_reader); + defer file.deinit(); + + try testing.expectEqual(105, file.metadata.schema.len - 1); // -1 for root schema element + try testing.expect(file.metadata.num_rows > 0); + try testing.expect(file.metadata.row_groups.len > 0); + + try readAllRowGroups(&file); + } +} diff --git a/testdata/public-datasets/README.md b/testdata/public-datasets/README.md index 973e621..7de34c0 100644 --- a/testdata/public-datasets/README.md +++ b/testdata/public-datasets/README.md @@ -12,6 +12,10 @@ public-datasets/ │ ├── fhv_tripdata_2025-10.parquet │ ├── yellow_tripdata_2025-10.parquet (CI only) │ └── fhvhv_tripdata_2025-10.parquet (CI only) +├── clickbench/ # ClickBench web analytics data (CI only) +│ ├── hits_0.parquet +│ ├── hits_1.parquet +│ └── hits_2.parquet └── / # More datasets can be added ``` @@ -28,6 +32,20 @@ Source: [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-recor | `yellow_tripdata_2025-10.parquet` | ~50MB | Yes | | `fhvhv_tripdata_2025-10.parquet` | ~400MB | Yes | +### ClickBench + +Source: [ClickHouse/ClickBench](https://github.com/ClickHouse/ClickBench) + +Real-world web analytics data with 105 columns covering diverse types. The dataset +is "intentionally dirty" with no bloom filters or proper logical types, making it +excellent for stress-testing edge cases. + +| File | Size | CI Only | +|------|------|---------| +| `hits_0.parquet` | ~150MB | Yes | +| `hits_1.parquet` | ~150MB | Yes | +| `hits_2.parquet` | ~150MB | Yes | + ## Download Run the download script from the repository root:
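
As a closing note on the `.copy` branch above: the byte-by-byte loop is deliberate, because Snappy back-references may overlap their own output (offset < length), e.g. offset 1 with length 4 repeats one byte four times. Below is a minimal sketch distilled from the patched branch, with `copyFromWindow` as a hypothetical standalone helper and the real `Decompress` state reduced to plain parameters:

    const std = @import("std");

    const window_len = 65536 * 2;

    /// Hypothetical helper distilled from the patched `.copy` branch: copy
    /// `len` bytes starting `offset` bytes back in the decoded stream,
    /// writing each byte both to `dst` and back into the circular window.
    /// Copying byte by byte (no memcpy) is what makes overlapping
    /// back-references work: when offset < len, later iterations re-read
    /// bytes written by earlier ones.
    fn copyFromWindow(window: *[window_len]u8, total_written: usize, offset: usize, len: usize, dst: []u8) void {
        std.debug.assert(offset > 0 and offset <= total_written); // mirrors the patch's ReadFailed check
        for (0..len) |i| {
            const byte = window[(total_written - offset + i) % window_len];
            window[(total_written + i) % window_len] = byte;
            dst[i] = byte;
        }
    }

    test "overlapping copy repeats a single byte" {
        var window: [window_len]u8 = undefined;
        window[0] = 'a'; // one literal byte already decoded into the window
        var dst: [4]u8 = undefined;
        copyFromWindow(&window, 1, 1, 4, &dst);
        try std.testing.expectEqualStrings("aaaa", &dst);
    }

Doubling the window to 128KB (rather than Snappy's nominal 64KB maximum offset) gives the modulo indexing headroom for the golden test's 4-byte-offset copy at offset 65540, per the commit message.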