23 changes: 23 additions & 0 deletions scripts/download-public-datasets.sh
@@ -53,6 +53,28 @@ download_nyc_taxi() {
fi
}

# =============================================================================
# ClickBench Dataset
# Source: https://github.com/ClickHouse/ClickBench
# =============================================================================
download_clickbench() {
local mode="$1"
local base_url="https://datasets.clickhouse.com/hits_compatible/athena_partitioned"
local dest="$DEST_DIR/clickbench"
mkdir -p "$dest"

echo "=== ClickBench Dataset ==="

if [[ "$mode" == "all" ]]; then
echo "Downloading ClickBench partitioned files (CI only)..."
for i in 0 1 2; do
download_file "$base_url/hits_$i.parquet" "$dest/hits_$i.parquet"
done
else
echo "Skipping ClickBench (CI only, use --all to download)"
fi
}

# =============================================================================
# Add more datasets here following the same pattern
# =============================================================================
@@ -93,6 +115,7 @@ done
mkdir -p "$DEST_DIR"

download_nyc_taxi "$MODE"
download_clickbench "$MODE"

echo ""
echo "Done!"
80 changes: 59 additions & 21 deletions src/compress/snappy.zig
@@ -5,10 +5,14 @@ const Limit = Io.Limit;
const Writer = Io.Writer;

pub const Decompress = struct {
pub const window_len = 65536;
pub const window_len = 65536 * 2;
pub const reader_buffer_len = 4096;
pub const buffer_len = window_len + reader_buffer_len;

input: *Reader,
remaining: usize,
total_written: usize,
window: *[window_len]u8,
reader: Reader,

state: union(enum) {
@@ -34,14 +38,17 @@
};

pub fn init(input: *Reader, buffer: []u8) Decompress {
std.debug.assert(buffer.len >= buffer_len);
return .{
.input = input,
.remaining = 0,
.total_written = 0,
.window = buffer[0..window_len],
.reader = .{
.vtable = &.{
.stream = Decompress.stream,
},
.buffer = buffer,
.buffer = buffer[window_len..],
.seek = 0,
.end = 0,
},
@@ -71,40 +78,71 @@
},
.literal => |*literal_remaining| {
if (literal_remaining.* == 0) {
if (d.remaining == 0) {
d.state = .eof;
return error.EndOfStream;
}
try d.readTag();
continue :read d.state;
}

const out = limit.min(@enumFromInt(literal_remaining.*)).min(@enumFromInt(d.remaining)).slice(try w.writableSliceGreedyPreserve(Decompress.window_len, 1));
try d.input.readSliceAll(out);
const n = @min(@min(literal_remaining.*, remaining), d.remaining);
if (n == 0) return 0;

// Read directly into our window buffer first
const window_start = d.total_written % window_len;
const window_space = window_len - window_start;
const first_chunk = @min(n, window_space);

try d.input.readSliceAll(d.window[window_start..][0..first_chunk]);
if (first_chunk < n) {
try d.input.readSliceAll(d.window[0 .. n - first_chunk]);
}

// Now copy from window to output
const dst = try w.writableSlice(n);
for (0..n) |i| {
dst[i] = d.window[(d.total_written + i) % window_len];
}

literal_remaining.* -= out.len;
d.remaining -= out.len;
w.advance(out.len);
return out.len;
literal_remaining.* -= n;
d.remaining -= n;
d.total_written += n;

return n;
},
.copy => |*copy| {
if (copy.len == 0) {
if (d.remaining == 0) {
d.state = .eof;
return error.EndOfStream;
}
try d.readTag();
continue :read d.state;
}

const n = @min(copy.len, remaining);
std.debug.assert(n <= 64);
if (copy.offset > d.total_written or copy.offset == 0) {
return error.ReadFailed;
}

const start = w.end - copy.offset;
const end = @min(start + n, w.end);
const length = end - start;
const n = @min(@min(copy.len, remaining), 64);
if (n == 0) return 0;

const src = w.buffer[start..end];
const dst = try w.writableSliceGreedyPreserve(Decompress.window_len, length);
@memmove(dst[0..length], src);
const dst = try w.writableSlice(n);

// Copy from our circular window, byte by byte to handle overlapping copies
for (0..n) |i| {
const src_idx = (d.total_written - copy.offset + i) % window_len;
const byte = d.window[src_idx];
d.window[(d.total_written + i) % window_len] = byte;
dst[i] = byte;
}

copy.len -= length;
d.remaining -= length;
w.advance(length);
copy.len -= n;
d.remaining -= n;
d.total_written += n;

return length;
return n;
},
}
}
@@ -278,7 +316,7 @@ test "golden" {
fn expectDecoded(input: []const u8, expected: []const u8) !void {
var fixed: Reader = .fixed(input);

var decompress_buf: [1024]u8 = undefined;
var decompress_buf: [Decompress.buffer_len]u8 = undefined;
var decompress = Decompress.init(&fixed, &decompress_buf);

var decoded: Writer.Allocating = .init(testing.allocator);
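Both the literal and copy paths in `stream` above route bytes through a circular window indexed by `total_written % window_len`, and the copy loop deliberately reads each source byte before writing its destination so that overlapping back-references (offset smaller than the copy length) replay bytes produced earlier in the same copy. A minimal standalone sketch of that indexing scheme, with a toy window size in place of the PR's `65536 * 2`:

```zig
const std = @import("std");

test "overlapping copy through a circular window" {
    const window_len = 8; // toy size for illustration only
    var window = [_]u8{0} ** window_len;
    var total_written: usize = 0;

    // Seed the window with the literal "ab".
    for ("ab") |b| {
        window[total_written % window_len] = b;
        total_written += 1;
    }

    // Back-reference: copy 6 bytes from offset 2. Because the source
    // region overlaps the destination, reading before writing expands
    // "ab" into "ababab" -- Snappy's run-length semantics.
    var out: [6]u8 = undefined;
    const offset: usize = 2;
    for (0..out.len) |i| {
        const byte = window[(total_written - offset + i) % window_len];
        window[(total_written + i) % window_len] = byte;
        out[i] = byte;
    }

    try std.testing.expectEqualStrings("ababab", &out);
}
```

A bulk `@memmove` copies as if through a temporary buffer, so it cannot produce this repeat-expansion when the regions overlap; that is why the copy path works byte by byte.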
2 changes: 1 addition & 1 deletion src/parquet/rowGroupReader.zig
@@ -336,7 +336,7 @@ fn decoderForPage(arena: std.mem.Allocator, inner_reader: *Reader, codec: parque
break :blk &decompress.reader;
},
.SNAPPY => blk: {
const buf = try arena.alloc(u8, compress.snappy.Decompress.window_len);
const buf = try arena.alloc(u8, compress.snappy.Decompress.buffer_len);
const decompress = try arena.create(compress.snappy.Decompress);
decompress.* = compress.snappy.Decompress.init(inner_reader, buf);
break :blk &decompress.reader;
29 changes: 29 additions & 0 deletions src/public_datasets_testing.zig
@@ -115,3 +115,32 @@ test "nyc taxi: fhvhv tripdata 2025-10 (ci only)" {
// TODO: This causes OOM on the CI. We probably need a separate arena for each row group and deallocate it between row groups.
// try readAllRowGroups(&file);
}

// =============================================================================
// ClickBench Dataset - CI only
// Source: https://github.com/ClickHouse/ClickBench
// 105 columns, real web analytics data, diverse types
// =============================================================================

test "clickbench: hits (ci only)" {
if (!ci_tests) return error.SkipZigTest;

const files = [_][]const u8{
"testdata/public-datasets/clickbench/hits_0.parquet",
"testdata/public-datasets/clickbench/hits_1.parquet",
"testdata/public-datasets/clickbench/hits_2.parquet",
};

for (files) |path| {
var reader_buf: [4096]u8 = undefined;
var file_reader = (try Io.Dir.cwd().openFile(io, path, .{ .mode = .read_only })).reader(io, &reader_buf);
var file = try File.read(testing.allocator, &file_reader);
defer file.deinit();

try testing.expectEqual(105, file.metadata.schema.len - 1); // -1 for root schema element
try testing.expect(file.metadata.num_rows > 0);
try testing.expect(file.metadata.row_groups.len > 0);

try readAllRowGroups(&file);
}
}
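
The TODO in the nyc taxi test above suggests a separate arena per row group to avoid the CI OOM. A hedged sketch of that allocation pattern, where `decodeRowGroup` is a hypothetical stand-in for this repo's per-group decoding:

```zig
const std = @import("std");

// Hypothetical stand-in for decoding one row group; it only allocates,
// to show where the per-group memory would live.
fn decodeRowGroup(allocator: std.mem.Allocator, group: usize) !void {
    const scratch = try allocator.alloc(u8, 1024 * (group + 1));
    _ = scratch; // decoded column data would go here
}

test "per-row-group arena bounds peak memory" {
    const num_row_groups = 4;
    for (0..num_row_groups) |i| {
        var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
        // Deinit runs at the end of each iteration, so peak memory is
        // one row group's worth rather than the whole file's.
        defer arena.deinit();
        try decodeRowGroup(arena.allocator(), i);
    }
}
```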
18 changes: 18 additions & 0 deletions testdata/public-datasets/README.md
@@ -12,6 +12,10 @@ public-datasets/
│ ├── fhv_tripdata_2025-10.parquet
│ ├── yellow_tripdata_2025-10.parquet (CI only)
│ └── fhvhv_tripdata_2025-10.parquet (CI only)
├── clickbench/ # ClickBench web analytics data (CI only)
│ ├── hits_0.parquet
│ ├── hits_1.parquet
│ └── hits_2.parquet
└── <future-dataset>/ # More datasets can be added
```

@@ -28,6 +32,20 @@ Source: [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-recor
| `yellow_tripdata_2025-10.parquet` | ~50MB | Yes |
| `fhvhv_tripdata_2025-10.parquet` | ~400MB | Yes |

### ClickBench

Source: [ClickHouse/ClickBench](https://github.com/ClickHouse/ClickBench)

Real-world web analytics data with 105 columns covering diverse types. The dataset
is "intentionally dirty" with no bloom filters or proper logical types, making it
excellent for stress-testing edge cases.

| File | Size | CI Only |
|------|------|---------|
| `hits_0.parquet` | ~150MB | Yes |
| `hits_1.parquet` | ~150MB | Yes |
| `hits_2.parquet` | ~150MB | Yes |

## Download

Run the download script from the repository root:
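
The command block itself is collapsed in this view; based on the script path and the `--all` mode it handles, the invocation is presumably:

```sh
# Default: fetch only the small files; CI-only datasets are skipped.
./scripts/download-public-datasets.sh

# CI: also fetch the large CI-only files (ClickBench partitions, etc.).
./scripts/download-public-datasets.sh --all
```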