-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(parquet): row-group and row-range sampling on ParquetSource #22024
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -136,6 +136,9 @@ pub(super) struct ParquetMorselizer { | |
| pub max_predicate_cache_size: Option<usize>, | ||
| /// Whether to read row groups in reverse order | ||
| pub reverse_row_groups: bool, | ||
| /// Sampling config carried from `ParquetSource`. Applied lazily | ||
| /// inside the opener once the parquet metadata is available. | ||
| pub sampling: crate::sampling::ParquetSampling, | ||
| } | ||
|
|
||
| impl fmt::Debug for ParquetMorselizer { | ||
|
|
@@ -287,6 +290,7 @@ struct PreparedParquetOpen { | |
| max_predicate_cache_size: Option<usize>, | ||
| reverse_row_groups: bool, | ||
| preserve_order: bool, | ||
| sampling: crate::sampling::ParquetSampling, | ||
| #[cfg(feature = "parquet_encryption")] | ||
| file_decryption_properties: Option<Arc<FileDecryptionProperties>>, | ||
| } | ||
|
|
@@ -656,6 +660,7 @@ impl ParquetMorselizer { | |
| max_predicate_cache_size: self.max_predicate_cache_size, | ||
| reverse_row_groups: self.reverse_row_groups, | ||
| preserve_order: self.preserve_order, | ||
| sampling: self.sampling.clone(), | ||
| #[cfg(feature = "parquet_encryption")] | ||
| file_decryption_properties: None, | ||
| }) | ||
|
|
@@ -882,11 +887,33 @@ impl FiltersPreparedParquetOpen { | |
|
|
||
| // Determine which row groups to actually read. The idea is to skip | ||
| // as many row groups as possible based on the metadata and query | ||
| let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan( | ||
| let mut initial_plan = create_initial_plan( | ||
| &prepared.file_name, | ||
| prepared.extensions.clone(), | ||
| rg_metadata.len(), | ||
| )?); | ||
| )?; | ||
|
|
||
| // Apply optional row-group and row-range sampling now that we | ||
| // know the actual row-group count. Both calls are no-ops when | ||
| // their respective fraction is `None`. Selection is | ||
| // deterministic per `(partition_index, row_group_index, | ||
| // fraction, cluster_size)` so re-runs match. The execution | ||
| // `partition_index` is the stable per-file id we plumb in: | ||
| // it makes sampling reproducible across environments without | ||
| // depending on object-store paths, and decorrelates files | ||
| // assigned to different partitions. | ||
| prepared.sampling.apply_row_group_sampling( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The current plumbing passes |
||
| &mut initial_plan, | ||
| rg_metadata.len(), | ||
| prepared.partition_index, | ||
| ); | ||
| prepared.sampling.apply_row_fraction_sampling( | ||
| &mut initial_plan, | ||
| rg_metadata, | ||
| prepared.partition_index, | ||
| ); | ||
|
|
||
| let mut row_groups = RowGroupAccessPlanFilter::new(initial_plan); | ||
|
|
||
| // If there is a range restricting what parts of the file to read | ||
| if let Some(range) = prepared.file_range.as_ref() { | ||
|
|
@@ -1676,6 +1703,7 @@ mod test { | |
| max_predicate_cache_size: Option<usize>, | ||
| reverse_row_groups: bool, | ||
| preserve_order: bool, | ||
| sampling: crate::sampling::ParquetSampling, | ||
| } | ||
|
|
||
| impl ParquetMorselizerBuilder { | ||
|
|
@@ -1702,9 +1730,16 @@ mod test { | |
| max_predicate_cache_size: None, | ||
| reverse_row_groups: false, | ||
| preserve_order: false, | ||
| sampling: crate::sampling::ParquetSampling::default(), | ||
| } | ||
| } | ||
|
|
||
| /// Set the sampling config. | ||
| fn with_sampling(mut self, sampling: crate::sampling::ParquetSampling) -> Self { | ||
| self.sampling = sampling; | ||
| self | ||
| } | ||
|
|
||
| /// Set the object store (required for building). | ||
| fn with_store(mut self, store: Arc<dyn ObjectStore>) -> Self { | ||
| self.store = Some(store); | ||
|
|
@@ -1816,6 +1851,7 @@ mod test { | |
| encryption_factory: None, | ||
| max_predicate_cache_size: self.max_predicate_cache_size, | ||
| reverse_row_groups: self.reverse_row_groups, | ||
| sampling: self.sampling, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -2720,4 +2756,112 @@ mod test { | |
| "without page index all rows are returned" | ||
| ); | ||
| } | ||
|
|
||
| /// End-to-end: a parquet file with 4 row groups, scanned with | ||
| /// `row_group_fraction = 0.5`, should return rows from exactly 2 | ||
| /// of the 4 row groups. | ||
| #[tokio::test] | ||
| async fn row_group_sampling_end_to_end() { | ||
| let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>; | ||
|
|
||
| // 4 row groups of 3 rows each = 12 rows total. | ||
| let batches = (0..4) | ||
| .map(|g| { | ||
| record_batch!(( | ||
| "a", | ||
| Int32, | ||
| vec![Some(g * 10 + 1), Some(g * 10 + 2), Some(g * 10 + 3),] | ||
| )) | ||
| .unwrap() | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| let schema = batches[0].schema(); | ||
| let props = WriterProperties::builder() | ||
| .set_max_row_group_row_count(Some(3)) | ||
| .build(); | ||
|
|
||
| let data_len = write_parquet_batches( | ||
| Arc::clone(&store), | ||
| "rg_sampled.parquet", | ||
| batches, | ||
| Some(props), | ||
| ) | ||
| .await; | ||
|
|
||
| let file = PartitionedFile::new( | ||
| "rg_sampled.parquet".to_string(), | ||
| u64::try_from(data_len).unwrap(), | ||
| ); | ||
|
|
||
| let sampling = crate::sampling::ParquetSampling { | ||
| row_group_fraction: Some(0.5), | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| let opener = ParquetMorselizerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_sampling(sampling) | ||
| .build(); | ||
|
|
||
| let stream = open_file(&opener, file).await.unwrap(); | ||
| let (_num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
|
|
||
| // ceil(4 * 0.5) = 2 row groups kept, each with 3 rows. | ||
| assert_eq!( | ||
| num_rows, 6, | ||
| "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows" | ||
| ); | ||
| } | ||
|
|
||
| /// End-to-end: a single row group of 100 rows scanned with | ||
| /// `row_fraction = 0.1` and the default cluster size should yield | ||
| /// roughly 10 rows. The exact count depends on `ceil(100 * 0.1) = | ||
| /// 10` plus how the windows pack — we assert the count is in the | ||
| /// expected range and significantly less than 100. | ||
| #[tokio::test] | ||
| async fn row_fraction_end_to_end() { | ||
| let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>; | ||
|
|
||
| // One row group of 100 rows so we exercise the per-row-group | ||
| // RowSelection, not the row-group-level skip. | ||
| let values: Vec<Option<i32>> = (0..100).map(Some).collect(); | ||
| let batch = record_batch!(("a", Int32, values)).unwrap(); | ||
| let schema = batch.schema(); | ||
| let data_len = | ||
| write_parquet(Arc::clone(&store), "rf.parquet", batch.clone()).await; | ||
| let file = PartitionedFile::new( | ||
| "rf.parquet".to_string(), | ||
| u64::try_from(data_len).unwrap(), | ||
| ); | ||
|
|
||
| let sampling = crate::sampling::ParquetSampling { | ||
| row_fraction: Some(0.1), | ||
| row_cluster_size: 4, // small cluster -> several windows | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| let opener = ParquetMorselizerBuilder::new() | ||
| .with_store(Arc::clone(&store)) | ||
| .with_schema(Arc::clone(&schema)) | ||
| .with_projection_indices(&[0]) | ||
| .with_sampling(sampling) | ||
| .build(); | ||
|
|
||
| let stream = open_file(&opener, file).await.unwrap(); | ||
| let (_num_batches, num_rows) = count_batches_and_rows(stream).await; | ||
|
|
||
| // We asked for ~10% of 100 rows. ceil(10 / cluster=4) = 3 | ||
| // windows of ceil(10/3)=4 rows each, capped at the total -> | ||
| // up to 12 rows in practice. Assert the bounds. | ||
| assert!( | ||
| (1..100).contains(&num_rows), | ||
| "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}" | ||
| ); | ||
| assert!( | ||
| num_rows <= 16, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will it be flaky? |
||
| "row_fraction=0.1 should yield ~10-12 rows; got {num_rows}" | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the normal `ParquetSampling` syntax here rather than the fully qualified `crate::sampling::ParquetSampling`?