feat: globally reorder files and row groups by statistics for TopK queries#21956
feat: globally reorder files and row groups by statistics for TopK queries#21956zhuqi-lucas wants to merge 6 commits into
Conversation
864a3d3 to
6e56cae
Compare
There was a problem hiding this comment.
Pull request overview
This PR improves TopK (ORDER BY ... LIMIT K) performance for Parquet scans by using statistics to reorder work (files and row groups) so the dynamic filter threshold converges faster and more data can be pruned during execution.
Changes:
- Propagate TopK sort metadata (
sort_options,fetch) viaDynamicFilterPhysicalExprand fixSortExec::with_fetchordering so the dynamic filter sees the correct K. - Add file-level reordering hook (
FileSource::reorder_files) and implement statistics-based file reordering for Parquet. - Add row-group access plan optimization plumbing and statistics-based row-group reordering (composable with reverse scanning), plus SLT coverage.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT coverage for row-group/file reordering behavior across multiple TopK scenarios |
| datafusion/physical-plan/src/sorts/sort.rs | Passes sort options + fetch into dynamic filter and adjusts fetch/filter initialization order |
| datafusion/physical-expr/src/expressions/dynamic_filters.rs | Extends DynamicFilterPhysicalExpr with optional sort metadata and fetch limit |
| datafusion/datasource/src/file_stream/work_source.rs | Calls FileSource::reorder_files before queueing shared work |
| datafusion/datasource/src/file.rs | Introduces FileSource::reorder_files default extension point |
| datafusion/datasource-parquet/src/source.rs | Implements Parquet file reordering using column statistics; wires fallback sort info |
| datafusion/datasource-parquet/src/opener.rs | Applies row-group access plan optimizers (reorder + reverse) based on TopK metadata |
| datafusion/datasource-parquet/src/mod.rs | Registers new access plan optimizer module |
| datafusion/datasource-parquet/src/access_plan_optimizer.rs | Adds AccessPlanOptimizer trait plus ReverseRowGroups / ReorderByStatistics implementations |
| datafusion/datasource-parquet/src/access_plan.rs | Adds PreparedAccessPlan::reorder_by_statistics (min-stat based RG ordering) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
6e56cae to
f0f4058
Compare
587297d to
235c4e1
Compare
|
run benchmark sort_pushdown_inexact |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (235c4e1) to 0144570 (merge-base) diff using: sort_pushdown_inexact File an issue against this benchmark runner |
235c4e1 to
0c9c3d8
Compare
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagesort_pushdown_inexact — base (merge-base)
sort_pushdown_inexact — branch
File an issue against this benchmark runner |
0c9c3d8 to
f7d9156
Compare
|
The benchmark results are expected — RG reorder alone doesn't skip any row groups, it only changes the read order so that TopK's dynamic filter threshold converges faster. The significant speedup (2-3x on
Without reorder, cumulative prune might truncate the wrong RGs. Reorder ensures the best RGs come first, making truncation safe and effective. |
|
Sorry @zhuqi-lucas - I am really struggling these days tor review all these large PRs. Last year it was very rare to see a more than 1000 line PR and now we get multiple such PRs a day. I will try and find the time to review this more carefuly |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (f7d9156) to 0144570 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Thanks @alamb, no rush at all, I really appreciate you taking the time. |
|
the benchmark results look mixed -- I'll see if that reproduces on a second run |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (f7d9156) to 0144570 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Thanks @alamb , i update the PR to skip reorder when overlap is heavy, and let me trigger again to see if it's the reason. |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (c8fd321) to 0c38ebb (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
c8fd321 to
ab09242
Compare
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (4fac37b) to 3b634aa (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing feat/rg-reorder-by-statistics (6987aa2) to 3b634aa (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
The benchmark is ok now. |
adriangb
left a comment
There was a problem hiding this comment.
Main concerns:
- I don't think we should be (IMO) abusing dynamic filters to pass sort information.
- I don't think the traits introduced are helpful.
Can you explain why we need to use dynamic filters like this?
| match arrow::compute::sort_to_indices(&stat_mins, Some(sort_options), None) { | ||
| Ok(indices) => indices, | ||
| Err(e) => { | ||
| debug!("Skipping RG reorder: sort failed: {e}"); |
There was a problem hiding this comment.
Is this expected to happen normally? If not maybe this should be an info or a warn. Maybe we even add a debug assertion so it fails in our CI?
| let stat_mins = match converter.row_group_mins(rg_metadata.iter().copied()) { | ||
| Ok(vals) => vals, | ||
| Err(e) => { | ||
| debug!("Skipping RG reorder: cannot get min values: {e}"); |
There was a problem hiding this comment.
Same note as https://github.com/apache/datafusion/pull/21956/changes#r3244789303.
If this is expected a docstring explaining why would be great. If unexpected... then we should at least error in CI.
| file_metadata: &ParquetMetaData, | ||
| _arrow_schema: &Schema, | ||
| ) -> Result<PreparedAccessPlan> { | ||
| plan.reverse(file_metadata) |
There was a problem hiding this comment.
I like the idea but is it really worth having a trait just to hide 1 line behind it?
| // 1. reorder_by_statistics: sort RGs by min values (ASC) to align | ||
| // with the file's declared output ordering. This fixes out-of-order |
There was a problem hiding this comment.
This comment doesn't make sense to me. A file either declares an output ordering or not. If it does the row groups should already be sorted by it. Should this say the query's desired output ordering?
| } else if let Some(predicate) = &prepared.predicate | ||
| && let Some(df) = find_dynamic_filter(predicate) | ||
| && let Some(sort_options) = df.sort_options() | ||
| && !sort_options.is_empty() | ||
| { | ||
| // Build a sort order from DynamicFilter for non-sort-pushdown TopK. | ||
| // Quick bail: check if the sort column exists in file schema. |
There was a problem hiding this comment.
Why is this needed? It seems kinda hacky / a side channel when we already have sort pushdown APIs. What is "non-sort-pushdown TopK"?
| let reverse_optimizer: Option< | ||
| Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>, | ||
| > = if is_descending { | ||
| Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups)) | ||
| } else { | ||
| None | ||
| }; |
There was a problem hiding this comment.
I'm not convinced by this abstraction. The implementations are trivial, it's more code overall and it also obscures the code execution.
| &self, | ||
| plan: PreparedAccessPlan, | ||
| file_metadata: &ParquetMetaData, | ||
| _arrow_schema: &Schema, |
There was a problem hiding this comment.
It's also smelly that we have 2 implementations and the arguments / signatures already don't match.
| } | ||
|
|
||
| /// Find a `DynamicFilterPhysicalExpr` in the expression tree. | ||
| fn find_dynamic_filter( |
There was a problem hiding this comment.
Is it safe to just blindly recurse like this? What if the expression is something like NOT (dynamic_filter) or whatever?
| /// | ||
| /// Sort options indicate the sort direction for each child expression, | ||
| /// enabling downstream consumers (e.g., parquet readers) to reorder | ||
| /// row groups by statistics for TopK queries. |
There was a problem hiding this comment.
Again not a fan of this way of passing information around.
|
I wonder if this belongs on Concretely: in A few reasons I think this factoring is cleaner:
Happy to be wrong here — is there a case I'm missing where the sort metadata can reach the parquet source through the dynamic filter but not through |
|
Thanks @adriangb, you're right. Routing through Can't think of a case where the sort metadata reaches parquet through the dynamic filter but not through Let me refactor along the lines you sketched. |
When a parquet file has multiple row groups with out-of-order or overlapping statistics, TopK queries benefit from reading "best" row groups first so the dynamic filter threshold tightens quickly. This PR adds: 1. `reorder_by_statistics`: sorts row groups by min values (ASC) based on parquet column statistics. Direction (DESC) is handled by the existing `reverse()` applied after reorder. The two steps compose: - Sorted data: reorder is a no-op, reverse gives perfect DESC order - Unsorted data: reorder fixes the order, reverse flips for DESC 2. `AccessPlanOptimizer` trait: extensible interface for row group access plan optimizations (reorder, reverse) applied after pruning. 3. `DynamicFilterPhysicalExpr.sort_options/fetch`: SortExec now passes sort direction and fetch limit to the dynamic filter, enabling the parquet reader to determine reorder direction for any TopK query. 4. `FileSource::reorder_files`: file-level reordering in the shared work queue so multi-file TopK reads the most promising files first. 5. Fix `SortExec::with_fetch` ordering: fetch must be set before `create_filter()` so the DynamicFilter gets the correct K value.
Adds an overlap guard to `PreparedAccessPlan::reorder_by_statistics`: when sorted-by-min adjacent row-group `[min, max]` ranges overlap above 50%, reordering cannot enable RG-level pruning (every "later" RG still has values that could appear in TopK results) so the reorder cost — CPU sort, lost IO sequential locality, and parallel scheduling pessimization across workers all pulling "best" RGs first — dominates, producing a net regression. This addresses the ClickBench `hits_partitioned` regressions (Q24, Q25, Q26: 1.11x-1.44x slower) where `EventTime` and `SearchPhrase` are uniformly distributed across row groups via the user-id hash partitioning, so RG min/max ranges fully overlap and reorder cannot help. Sorted / non-overlapping data continues to take the reorder path — verified by `sort_pushdown.slt` Test H still passing. The overlap helper treats null mins/maxes (RGs without statistics) conservatively as overlaps, so missing stats discourage rather than silently disable the guard. Adds 7 unit tests covering: fully disjoint, disjoint after reorder, fully overlapping, partial overlap, null max in previous, null min in next, and the n<2 edge case.
Upstream `proto: serialize and dedupe dynamic filters v2 (apache#21807)` added a new `DynamicFilterPhysicalExpr::from_parts` constructor that builds the struct via a `Self { ... }` literal. After this PR's rebase brings that commit in, the new initializer is missing the `sort_options` and `fetch` fields this PR adds, breaking the build (E0063). Default both to `None` in `from_parts` — the proto wire format does not yet carry these fields, so reconstruction cannot recover them. Callers that need sort metadata still go through `new_with_sort_options`. Proto round-trip tests (162 in `datafusion-proto`) continue to pass.
Mirrors the row-group-level overlap guard at the file level. When adjacent file `[min, max]` ranges (in sorted-by-min order) overlap above 50%, file-level reorder cannot enable file pruning and the reorder cost (CPU sort + lost IO sequential locality + parallel work-stealing pessimization) dominates. This addresses the remaining ClickBench `hits_partitioned` regressions (Q24, Q25, Q26 still 1.10x-1.31x slower after the RG-level guard landed). `hits_partitioned` is partitioned by user-id hash, so each file covers the full range of `EventTime` and `SearchPhrase` — file ranges fully overlap and reorder cannot help. Files lacking statistics are conservatively counted as overlaps so missing stats discourage rather than silently disable the guard. Adds 6 unit tests (disjoint sorted, disjoint after reorder, fully overlapping, partial below threshold, missing stats, n<2).
Per @adriangb's review on PR apache#21956, the sort-direction + fetch metadata that drives parquet row-group reorder belongs on the sort-pushdown channel that already exists, not bolted onto `DynamicFilterPhysicalExpr` whose job is runtime threshold pruning. Concretely: - `ParquetSource::try_pushdown_sort` grows a third branch. After the existing Exact and reverse-Inexact checks fall through, if the requested sort column is a plain `Column` present in the file schema, the source now returns `Inexact` with `sort_order_for_reorder` set (and `reverse_row_groups` for DESC), instead of `Unsupported`. The reorder benefit now applies to any `ORDER BY` on a sorted source, not just TopK. - `parquet/opener.rs` drops the `find_dynamic_filter` / `find_column_in_expr` walk under AND/wrappers and the `df.sort_options()` extraction. Both reorder direction and reverse-pass activation come from the source config alone. `is_descending` collapses to just `prepared.reverse_row_groups`. - `ParquetSource::extract_topk_sort_info` (used by `reorder_files` in the shared work queue) reads `sort_order_for_reorder` directly, no dynamic-filter fallback. - `DynamicFilterPhysicalExpr` loses its `sort_options` field, `fetch` field, `new_with_sort_options` constructor, and both getters. The proto round-trip carve-out (`from_parts` defaulting these to None) goes away. - `SortExec::create_filter` reverts to `DynamicFilterPhysicalExpr::new`. `SortExec::with_fetch` reverts to building the filter from `self.create_filter()` before assigning `new_sort.fetch = fetch` — the ordering coupling introduced earlier is no longer needed because `create_filter` no longer reads `fetch`. Tests: - Two existing pushdown_sort tests (`test_no_pushdown_for_non_reverse_sort`, `test_no_prefix_match_longer_than_source`) renamed and re-snapshotted to reflect the new `Inexact` outcome — pushdown now succeeds even when neither natural nor reversed ordering matches, as long as the sort column is in the file schema. The surrounding `SortExec` still stays in place; only the source picks up the reorder hint. - Four new unit tests in `source.rs` for the new branch: - column in schema + ASC -> Inexact, sort_order_for_reorder set, reverse_row_groups not set - column in schema + DESC -> Inexact, both set - non-Column sort expression (e.g. `a + 1`) -> Unsupported - column not in file schema -> Unsupported Test results: datasource-parquet (116 + 4 passed), physical-plan sorts (60 passed), core_integration physical_optimizer (454 passed). clippy --all-targets -D warnings clean. cargo fmt clean.
6987aa2 to
45d775a
Compare
…ORDER BY ... DESC LIMIT` shapes After routing the stats-based RG reorder through `try_pushdown_sort` (commit before this), the new `Inexact` branch trips whenever the requested sort column is in the file schema — not only when the dynamic filter path could derive sort direction. As a result, more `ORDER BY col DESC LIMIT N` queries on parquet sources without a declared natural ordering now emit `reverse_row_groups=true` on the `DataSourceExec` line. This is the behavioural change Adrian's review intended: the optimisation applies to any `ORDER BY` shape that has a column-in-schema basis for the reorder, not just TopK queries with a dynamic filter. The plan results are unchanged; only the `reverse_row_groups` decorator appears in more EXPLAIN outputs. Regenerated with `cargo test --test sqllogictests -- ... --complete`, then verified the suites pass without --complete.
Which issue does this PR close?
Rationale for this change
TopK queries (
ORDER BY col LIMIT K) on parquet files with multiple out-of-order row groups are suboptimal — the dynamic filter threshold converges slowly because row groups are read in arbitrary order. By reordering row groups so the "best" ones (containing optimal values) are read first, the threshold tightens quickly and subsequent row groups are pruned at runtime.What changes are included in this PR?
Row group reorder by statistics:
PreparedAccessPlan::reorder_by_statistics(): sorts row groups by min values (ASC) using parquet column statistics. Direction (DESC) is handled by existingreverse()applied after reorder. The two compose correctly for both sorted and unsorted data.AccessPlanOptimizertrait: extensible interface for row group access plan optimizations applied after pruning.DynamicFilter sort metadata:
DynamicFilterPhysicalExprgainssort_optionsandfetchfields, set bySortExec::create_filter(). This lets the parquet reader determine reorder direction for any TopK query (not just sort-pushdown path).SortExec::with_fetchnow sets fetch before callingcreate_filter()so the DynamicFilter gets the correct K value.File-level reorder in shared work queue:
FileSource::reorder_files()trait method + parquet implementation: reorders files in the shared work queue by statistics so multi-file TopK reads the most promising files first across all partitions.Are these changes tested?
Are there any user-facing changes?
No API changes. TopK queries on parquet with multiple row groups will automatically benefit from better row group ordering. This is a performance optimization only — query results are unchanged.