Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3268,7 +3268,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
UnionExec
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe inexact_output_ordering ?

");
Ok(())
}
Expand All @@ -3286,7 +3286,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
UnionExec
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
");

Ok(())
Expand Down
19 changes: 16 additions & 3 deletions datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,23 @@ async fn test_fuzz_topk_filter_pushdown() {
.map(|col| orders.get(**col).unwrap())
.multi_cartesian_product()
{
// Add remaining columns as tiebreakers (ASC NULLS LAST)
// to ensure deterministic results when RG reorder changes
// the read order of rows with equal sort key values.
let tiebreakers: Vec<String> = ["id", "name", "department"]
.iter()
.filter(|c| {
!order_columns
.iter()
.take(num_order_by_columns)
.any(|oc| **oc == **c)
})
.map(|c| format!("{c} ASC NULLS LAST"))
.collect();
let all_orderings =
orderings.into_iter().chain(tiebreakers.iter()).join(", ");
let query = format!(
"SELECT * FROM test_table ORDER BY {} LIMIT {}",
orderings.into_iter().join(", "),
limit
"SELECT * FROM test_table ORDER BY {all_orderings} LIMIT {limit}",
);
queries.push(query);
}
Expand Down
70 changes: 43 additions & 27 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn test_sort_pushdown_basic_phase1() {
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -114,7 +114,7 @@ fn test_sort_with_limit_phase1() {
output:
Ok:
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -145,7 +145,7 @@ fn test_sort_multiple_columns_phase1() {
output:
Ok:
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC, b@1 DESC NULLS LAST]
"
);
}
Expand Down Expand Up @@ -180,7 +180,7 @@ fn test_prefix_match_single_column() {
output:
Ok:
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
"
);
}
Expand Down Expand Up @@ -214,7 +214,7 @@ fn test_prefix_match_with_limit() {
output:
Ok:
- SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST, b@1 ASC], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ fn test_prefix_match_through_transparent_nodes() {
Ok:
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
"
);
}
Expand Down Expand Up @@ -285,8 +285,13 @@ fn test_exact_prefix_match_same_direction() {
}

#[test]
fn test_no_prefix_match_longer_than_source() {
// Test that prefix matching does NOT work if requested is longer than source
fn test_inexact_pushdown_when_prefix_longer_than_source() {
// Source has [a DESC] ordering, request is [a ASC, b DESC] — longer
// than the source ordering so the prefix can't be matched. The
// primary sort column 'a' is in the file schema, so sort pushdown
// returns `Inexact` with `sort_order_for_reorder` set, drops the
// source's `output_ordering` (the runtime reorder invalidates it),
// and leaves the outer `SortExec` to enforce the full ordering.
let schema = schema();

// Source has [a DESC] ordering (single column)
Expand All @@ -310,7 +315,7 @@ fn test_no_prefix_match_longer_than_source() {
output:
Ok:
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC, b@1 DESC NULLS LAST]
"
);
}
Expand Down Expand Up @@ -343,7 +348,7 @@ fn test_sort_through_repartition() {
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -375,7 +380,7 @@ fn test_nested_sorts() {
Ok:
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -435,7 +440,7 @@ fn test_sort_through_coalesce_partitions() {
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- CoalescePartitionsExec
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -467,7 +472,7 @@ fn test_complex_plan_with_multiple_operators() {
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- CoalescePartitionsExec
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -501,14 +506,19 @@ fn test_multiple_sorts_different_columns() {
Ok:
- SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}

#[test]
fn test_no_pushdown_for_unordered_source() {
// Verify pushdown does NOT happen for sources without ordering
fn test_inexact_pushdown_for_unordered_source() {
// Source has no declared `output_ordering`, request is `[a ASC]`.
// The reversed-equivalence check can't fire (nothing to reverse),
// but 'a' is in the file schema — sort pushdown returns `Inexact`
// with `sort_order_for_reorder` set so the opener can sort row
// groups by `min(a)` at scan time. The surrounding `SortExec`
// stays in place to enforce the full ordering.
let schema = schema();
let source = parquet_exec(schema.clone()); // No output_ordering
let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
Expand All @@ -524,14 +534,20 @@ fn test_no_pushdown_for_unordered_source() {
output:
Ok:
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
"
);
}

#[test]
fn test_no_pushdown_for_non_reverse_sort() {
// Verify pushdown does NOT happen when sort doesn't reverse source ordering
fn test_inexact_pushdown_when_request_doesnt_match_source_ordering() {
// The requested sort column ('b') doesn't match the source's natural
// ordering ('a' ASC). Neither natural nor reversed satisfies the
// request, but 'b' is in the file schema — so sort pushdown returns
// `Inexact` with `sort_order_for_reorder` set, drops the source's
// claimed `output_ordering` (the runtime row-group reorder
// invalidates it), and keeps the surrounding `SortExec` for
// correctness.
let schema = schema();

// Source sorted by 'a' ASC
Expand All @@ -554,7 +570,7 @@ fn test_no_pushdown_for_non_reverse_sort() {
output:
Ok:
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[b@1 ASC]
"
);
}
Expand Down Expand Up @@ -630,7 +646,7 @@ fn test_pushdown_through_blocking_node() {
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -668,7 +684,7 @@ fn test_sort_pushdown_through_simple_projection() {
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -703,7 +719,7 @@ fn test_sort_pushdown_through_projection_with_alias() {
Ok:
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as id, b@1 as value]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -792,7 +808,7 @@ fn test_sort_pushdown_projection_reordered_columns() {
Ok:
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -826,7 +842,7 @@ fn test_sort_pushdown_projection_with_limit() {
Ok:
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -860,7 +876,7 @@ fn test_sort_pushdown_through_projection() {
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down Expand Up @@ -895,7 +911,7 @@ fn test_sort_pushdown_projection_subset_of_columns() {
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
"
);
}
Expand Down
Loading
Loading