Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,7 @@ fn merges_local_limit_with_local_limit() -> Result<()> {
let optimized = format_plan(&after_optimize);
insta::assert_snapshot!(
optimized,
@r"
GlobalLimitExec: skip=0, fetch=10
EmptyExec
"
@"EmptyExec"
);

Ok(())
Expand Down
76 changes: 76 additions & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ use crate::PhysicalOptimizerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::utils::combine_limit;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
Expand Down Expand Up @@ -158,6 +162,25 @@ pub fn pushdown_limit_helper(
global_state.preserve_order = limit_info.preserve_order;
global_state.satisfied = false;

if let Some(fetch) = fetch
&& limit_satisfied_by_input(&limit_info.input, skip, fetch)?
{
// The input already produces at most `fetch` rows, so no new limit
// node is needed. Mark satisfied so downstream won't re-add one,
// but preserve skip/fetch so any nested limit nodes (e.g. an inner
// GlobalLimitExec) can still be merged with the outer constraint.
global_state.satisfied = true;

return Ok((
Transformed {
data: limit_info.input,
transformed: true,
tnr: TreeNodeRecursion::Stop,
},
global_state,
));
}

// Now the global state has the most recent information, we can remove
// the limit node. We will decide later if we should add it again or
// not.
Expand Down Expand Up @@ -284,6 +307,59 @@ pub fn pushdown_limit_helper(
}
}

/// Returns `true` when exact input statistics prove that applying the limit
/// (`skip`/`fetch`) would not remove any rows, i.e. the input already produces
/// at most `fetch` rows in a single partition with nothing skipped.
fn limit_satisfied_by_input(
    plan: &Arc<dyn ExecutionPlan>,
    skip: usize,
    fetch: usize,
) -> Result<bool> {
    // A non-zero skip always discards leading rows, and with more than one
    // partition a per-stream row-count bound does not bound the total output,
    // so in either case the limit cannot be proven redundant.
    if skip > 0 || plan.output_partitioning().partition_count() != 1 {
        return Ok(false);
    }

    // Only trust exact counts coming from the conservative whitelist below.
    match limit_eliminable_exact_num_rows(plan)? {
        Some(num_rows) => Ok(num_rows <= fetch),
        None => Ok(false),
    }
}

/// Returns exact row counts only from a conservative whitelist of operators
/// whose row-count guarantees are strong enough to remove a limit.
///
/// Returns `Ok(Some(n))` when the (projection-stripped) plan is proven to
/// produce exactly `n` rows, and `Ok(None)` when no such guarantee exists.
fn limit_eliminable_exact_num_rows(
    plan: &Arc<dyn ExecutionPlan>,
) -> Result<Option<usize>> {
    // Unwrap any wrapping ProjectionExec layers; projections preserve row count
    // but may derive statistics in ways that are not trustworthy, so we peek
    // through them to the underlying producer.
    //
    // NOTE: `dyn ExecutionPlan` has no inherent `downcast_ref`/`is`; concrete
    // types must be inspected through `ExecutionPlan::as_any()`.
    let mut current = plan;
    while let Some(projection) = current.as_any().downcast_ref::<ProjectionExec>() {
        current = projection.input();
    }

    // EmptyExec produces no rows by construction.
    if current.as_any().is::<EmptyExec>() {
        return Ok(Some(0));
    }

    // PlaceholderRowExec produces exactly one row by construction.
    if current.as_any().is::<PlaceholderRowExec>() {
        return Ok(Some(1));
    }

    // Beyond the structural cases above, only trust an *exact* zero-row
    // statistic; any other precision (or count) is not strong enough.
    if matches!(
        current.partition_statistics(None)?.num_rows,
        Precision::Exact(0)
    ) {
        return Ok(Some(0));
    }

    Ok(None)
}

/// Pushes down the limit through the plan.
pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
Expand Down
13 changes: 13 additions & 0 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,19 @@ physical_plan
07)│ PlaceholderRowExec │
08)└───────────────────────────┘

query TT
EXPLAIN select count(*) from (values ('a', 'b'), ('c', 'd')) as t (c1, c2) order by 1 limit 10
----
physical_plan
01)┌───────────────────────────┐
02)│ ProjectionExec │
03)│ -------------------- │
04)│ count(*): 2 │
05)└─────────────┬─────────────┘
06)┌─────────────┴─────────────┐
07)│ PlaceholderRowExec │
08)└───────────────────────────┘


# Test explain for large plans

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ STORED AS PARQUET;

statement ok
COPY (
SELECT * FROM (VALUES (3, 6, 90), (8, 12, 110)) AS v(a, b, c)
SELECT * FROM (VALUES (1, 6, 90), (8, 12, 110)) AS v(a, b, c)
) TO 'test_files/scratch/push_down_filter_regression/agg_dyn_mixed/file_1.parquet'
STORED AS PARQUET;

Expand Down
13 changes: 6 additions & 7 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,12 @@ physical_plan
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true
15)----ProjectionExec: expr=[1 as cnt]
16)------GlobalLimitExec: skip=0, fetch=3
17)--------PlaceholderRowExec
18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
19)------GlobalLimitExec: skip=0, fetch=3
20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
21)----------ProjectionExec: expr=[1 as c1]
22)------------PlaceholderRowExec
16)------PlaceholderRowExec
17)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
18)------GlobalLimitExec: skip=0, fetch=3
19)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
20)----------ProjectionExec: expr=[1 as c1]
21)------------PlaceholderRowExec


########
Expand Down
Loading