Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub fn main(init: std.process.Init) !void {
}

fn columnIndex(file: *File, name: []const u8) usize {
return file.findSchemaElement(&.{name}).?.index - 1;
return file.findSchemaElement(&.{name}).?.column_index;
}
```

Expand Down
2 changes: 1 addition & 1 deletion examples/nyc_taxi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ pub fn main(init: std.process.Init) !void {
}

fn columnIndex(file: *File, name: []const u8) usize {
return file.findSchemaElement(&.{name}).?.index - 1;
return file.findSchemaElement(&.{name}).?.column_index;
}
90 changes: 84 additions & 6 deletions src/parquet/File.zig
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ const nestedReader = @import("./nestedReader.zig");

const File = @This();

pub const SchemaInfo = struct {
column_index: usize,
max_definition_level: u8,
max_repetition_level: u8,
elem: parquet_schema.SchemaElement,
};

const MAGIC = "PAR1";
const METADATA_LENGTH_SIZE = 4;
const FOOTER_SIZE = MAGIC.len + METADATA_LENGTH_SIZE;
Expand Down Expand Up @@ -92,14 +99,16 @@ pub fn deinit(self: *File) void {
self.arena.deinit();
}

/// Find a schema element by path and return its index, definition level, repetition level, and the element itself.
/// Find a schema element by path and return its column index, definition level, repetition level, and the element itself.
/// The column_index corresponds to the index in the row group's columns array.
/// Returns null if the path is not found or invalid.
pub fn findSchemaElement(self: *File, path: []const []const u8) ?struct { index: usize, max_definition_level: u8, max_repetition_level: u8, elem: parquet_schema.SchemaElement } {
pub fn findSchemaElement(self: *File, path: []const []const u8) ?SchemaInfo {
if (path.len == 0 or self.metadata.schema.len < 2) {
return null;
}

var current_idx: usize = 1; // Skip the root element
var column_index: usize = 0;
var max_definition_level: u8 = 0;
var max_repetition_level: u8 = 0;
var elem: parquet_schema.SchemaElement = undefined;
Expand All @@ -113,13 +122,16 @@ pub fn findSchemaElement(self: *File, path: []const []const u8) ?struct { index:
break :blk current_idx;
}

// Skip this element and all its children
// Skip this element and all its children, counting leaf columns
var skip_idx = current_idx;
var nodes_to_skip: i32 = 1; // Need to skip the current one at least
var nodes_to_skip: i32 = 1;

while (nodes_to_skip > 0 and skip_idx < self.metadata.schema.len) {
if (self.metadata.schema[skip_idx].num_children) |num_children| {
const skip_elem = self.metadata.schema[skip_idx];
if (skip_elem.num_children) |num_children| {
nodes_to_skip += @as(i32, @intCast(num_children));
} else {
column_index += 1;
}

nodes_to_skip -= 1;
Expand Down Expand Up @@ -150,7 +162,7 @@ pub fn findSchemaElement(self: *File, path: []const []const u8) ?struct { index:
current_idx = found_idx + 1;
}

return .{ .index = current_idx - 1, .max_definition_level = max_definition_level, .max_repetition_level = max_repetition_level, .elem = elem };
return .{ .column_index = column_index, .max_definition_level = max_definition_level, .max_repetition_level = max_repetition_level, .elem = elem };
}

pub fn readLevelDataV1(_: *File, reader: *Reader, encoding: parquet_schema.Encoding, bit_width: u8, dest: []u16) !void {
Expand Down Expand Up @@ -319,3 +331,69 @@ test "reading simple file with nulls with dynamic types" {
try std.testing.expectEqualSlices(?i64, &[_]?i64{ 1, 2, null, 4 }, (try rg.readColumnDynamic(0)).int64);
try std.testing.expectEqualDeep(&[_]?[]const u8{ null, "foo", "bar", null }, (try rg.readColumnDynamic(1)).byte_array);
}

test "findSchemaElement returns correct column_index for flat schema" {
var reader_buf: [1024]u8 = undefined;
var file_reader = (try std.Io.Dir.cwd().openFile(std.testing.io, "testdata/simple.parquet", .{ .mode = .read_only })).reader(std.testing.io, &reader_buf);
var file = try File.read(std.testing.allocator, &file_reader);
defer file.deinit();

// Schema: root, foo, bar, ham (3 columns, indices 0, 1, 2)
const foo = file.findSchemaElement(&.{"foo"}).?;
try std.testing.expectEqual(0, foo.column_index);
try std.testing.expectEqualStrings("foo", foo.elem.name);

const bar = file.findSchemaElement(&.{"bar"}).?;
try std.testing.expectEqual(1, bar.column_index);
try std.testing.expectEqualStrings("bar", bar.elem.name);

const ham = file.findSchemaElement(&.{"ham"}).?;
try std.testing.expectEqual(2, ham.column_index);
try std.testing.expectEqualStrings("ham", ham.elem.name);
}

test "findSchemaElement returns null for non-existent path" {
var reader_buf: [1024]u8 = undefined;
var file_reader = (try std.Io.Dir.cwd().openFile(std.testing.io, "testdata/simple.parquet", .{ .mode = .read_only })).reader(std.testing.io, &reader_buf);
var file = try File.read(std.testing.allocator, &file_reader);
defer file.deinit();

try std.testing.expect(file.findSchemaElement(&.{"nonexistent"}) == null);
try std.testing.expect(file.findSchemaElement(&.{ "foo", "nested" }) == null);
try std.testing.expect(file.findSchemaElement(&.{}) == null);
}

test "findSchemaElement returns correct levels for optional columns" {
var reader_buf: [1024]u8 = undefined;
var file_reader = (try std.Io.Dir.cwd().openFile(std.testing.io, "testdata/simple_with_nulls.parquet", .{ .mode = .read_only })).reader(std.testing.io, &reader_buf);
var file = try File.read(std.testing.allocator, &file_reader);
defer file.deinit();

const col0 = file.findSchemaElement(&.{"a"}).?;
try std.testing.expectEqual(1, col0.max_definition_level);
try std.testing.expectEqual(0, col0.max_repetition_level);

const col1 = file.findSchemaElement(&.{"b"}).?;
try std.testing.expectEqual(1, col1.max_definition_level);
try std.testing.expectEqual(0, col1.max_repetition_level);
}

test "findSchemaElement returns correct column_index for nested schema" {
var reader_buf: [1024]u8 = undefined;
var file_reader = (try std.Io.Dir.cwd().openFile(std.testing.io, "testdata/parquet-testing/data/nested_structs.rust.parquet", .{ .mode = .read_only })).reader(std.testing.io, &reader_buf);
var file = try File.read(std.testing.allocator, &file_reader);
defer file.deinit();

// First struct: roll_num with 6 nested fields (min, max, mean, count, sum, variance)
// These are columns 0-5
const roll_num_min = file.findSchemaElement(&.{ "roll_num", "min" }).?;
try std.testing.expectEqual(0, roll_num_min.column_index);

const roll_num_variance = file.findSchemaElement(&.{ "roll_num", "variance" }).?;
try std.testing.expectEqual(5, roll_num_variance.column_index);

// Second struct: PC_CUR with 6 nested fields
// These are columns 6-11
const pc_cur_min = file.findSchemaElement(&.{ "PC_CUR", "min" }).?;
try std.testing.expectEqual(6, pc_cur_min.column_index);
}