Skip to content

Commit 4e09360

Browse files
Dandandan and claude committed
feat: reorder row groups by grouping key statistics
Extends the row group reordering infrastructure (from sort pushdown) to also reorder by GROUP BY key statistics. When an AggregateExec sits above a ParquetSource, the new ReorderByGroupKeys optimizer rule pushes the grouping key expressions down so that row groups with similar group key values are read together.

Two levels of reordering:
- Files within partitions are sorted by grouping key min statistics
- Row groups within each file are reordered by grouping key statistics

Benefits:
- Reduces active cardinality of aggregation hash tables
- Improves CPU cache locality during hash table lookups

Adds try_pushdown_groupby_order() to the ExecutionPlan, DataSource, and FileSource traits, with a ParquetSource implementation that reuses the existing reorder_by_statistics infrastructure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6c143f7 commit 4e09360

2 files changed

Lines changed: 49 additions & 10 deletions

File tree

datafusion/datasource/src/file_scan_config/mod.rs

Lines changed: 46 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -976,14 +976,53 @@ impl DataSource for FileScanConfig {
976976
&self,
977977
group_exprs: &[Arc<dyn PhysicalExpr>],
978978
) -> Result<Option<Arc<dyn DataSource>>> {
979-
match self.file_source.try_pushdown_groupby_order(group_exprs)? {
980-
Some(new_source) => {
981-
let mut new_config = self.clone();
982-
new_config.file_source = new_source;
983-
Ok(Some(Arc::new(new_config)))
984-
}
985-
None => Ok(None),
979+
use sort_pushdown::{
980+
ordered_column_indices_from_projection,
981+
sort_files_within_groups_by_statistics,
982+
};
983+
984+
// Build a LexOrdering from the first grouping key (ASC).
985+
let first_expr = match group_exprs.first() {
986+
Some(expr) => Arc::clone(expr),
987+
None => return Ok(None),
988+
};
989+
let sort_order =
990+
match LexOrdering::new(vec![PhysicalSortExpr::new_default(first_expr)]) {
991+
Some(order) => order,
992+
None => return Ok(None),
993+
};
994+
995+
// Sort files within each partition by grouping key statistics.
996+
let projected_schema = self.projected_schema()?;
997+
let projection_indices = self
998+
.file_source
999+
.projection()
1000+
.as_ref()
1001+
.and_then(|p| ordered_column_indices_from_projection(p));
1002+
let result = sort_files_within_groups_by_statistics(
1003+
&self.file_groups,
1004+
&sort_order,
1005+
&projected_schema,
1006+
projection_indices.as_deref(),
1007+
);
1008+
1009+
// Also push down to the file source (e.g. ParquetSource) for
1010+
// intra-file row group reordering.
1011+
let new_source = self.file_source.try_pushdown_groupby_order(group_exprs)?;
1012+
1013+
// Apply if either files were reordered or the file source accepted.
1014+
if !result.any_reordered && new_source.is_none() {
1015+
return Ok(None);
1016+
}
1017+
1018+
let mut new_config = self.clone();
1019+
if result.any_reordered {
1020+
new_config.file_groups = result.file_groups;
1021+
}
1022+
if let Some(source) = new_source {
1023+
new_config.file_source = source;
9861024
}
1025+
Ok(Some(Arc::new(new_config)))
9871026
}
9881027

9891028
fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {

datafusion/datasource/src/file_scan_config/sort_pushdown.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -43,9 +43,9 @@ use std::sync::Arc;
4343

4444
/// Result of sorting files within groups by their min/max statistics.
4545
pub(crate) struct SortedFileGroups {
46-
file_groups: Vec<FileGroup>,
47-
any_reordered: bool,
48-
all_non_overlapping: bool,
46+
pub(crate) file_groups: Vec<FileGroup>,
47+
pub(crate) any_reordered: bool,
48+
pub(crate) all_non_overlapping: bool,
4949
}
5050

5151
impl FileScanConfig {

0 commit comments

Comments
 (0)