diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 5f9b7e50848fd..572ae83540892 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -467,10 +467,7 @@ fn merges_local_limit_with_local_limit() -> Result<()> {
     let optimized = format_plan(&after_optimize);
     insta::assert_snapshot!(
         optimized,
-        @r"
-    GlobalLimitExec: skip=0, fetch=10
-      EmptyExec
-    "
+        @"EmptyExec"
     );
 
     Ok(())
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index c5fa0cc3ee78c..6164d86e5342a 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -67,10 +67,14 @@ use crate::PhysicalOptimizerRule;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
+use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
 use datafusion_common::utils::combine_limit;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
+use datafusion_physical_plan::projection::ProjectionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 /// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
@@ -158,6 +162,25 @@ pub fn pushdown_limit_helper(
         global_state.preserve_order = limit_info.preserve_order;
         global_state.satisfied = false;
 
+        if let Some(fetch) = fetch
+            && limit_satisfied_by_input(&limit_info.input, skip, fetch)?
+        {
+            // The input already produces at most `fetch` rows, so no new limit
+            // node is needed. Mark satisfied so downstream won't re-add one,
+            // but preserve skip/fetch so any nested limit nodes (e.g. an inner
+            // GlobalLimitExec) can still be merged with the outer constraint.
+            global_state.satisfied = true;
+
+            return Ok((
+                Transformed {
+                    data: limit_info.input,
+                    transformed: true,
+                    tnr: TreeNodeRecursion::Stop,
+                },
+                global_state,
+            ));
+        }
+
         // Now the global state has the most recent information, we can remove
         // the limit node. We will decide later if we should add it again or
         // not.
@@ -284,6 +307,59 @@ pub fn pushdown_limit_helper(
     }
 }
 
+/// Returns true if exact input statistics prove that applying the limit would
+/// not remove any rows.
+fn limit_satisfied_by_input(
+    plan: &Arc<dyn ExecutionPlan>,
+    skip: usize,
+    fetch: usize,
+) -> Result<bool> {
+    if skip > 0 {
+        return Ok(false);
+    }
+
+    if plan.output_partitioning().partition_count() != 1 {
+        return Ok(false);
+    }
+
+    let Some(num_rows) = limit_eliminable_exact_num_rows(plan)? else {
+        return Ok(false);
+    };
+
+    Ok(num_rows <= fetch)
+}
+
+/// Returns exact row counts only from a conservative whitelist of operators
+/// whose row-count guarantees are strong enough to remove a limit.
+fn limit_eliminable_exact_num_rows(
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Result<Option<usize>> {
+    // Unwrap any wrapping ProjectionExec layers; projections preserve row count
+    // but may derive statistics in ways that are not trustworthy, so we peek
+    // through them to the underlying producer.
+    let mut current = plan;
+    while let Some(projection) = current.downcast_ref::<ProjectionExec>() {
+        current = projection.input();
+    }
+
+    if current.is::<EmptyExec>() {
+        return Ok(Some(0));
+    }
+
+    if current.is::<PlaceholderRowExec>() {
+        return Ok(Some(1));
+    }
+
+    if matches!(
+        current.partition_statistics(None)?.num_rows,
+        Precision::Exact(0)
+    ) {
+        return Ok(Some(0));
+    }
+
+    Ok(None)
+}
+
 /// Pushes down the limit through the plan.
 pub(crate) fn pushdown_limits(
     pushdown_plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt
index 46d01f39a920b..a7d3bead0e8e5 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -1803,6 +1803,19 @@ physical_plan
 07)│ PlaceholderRowExec │
 08)└───────────────────────────┘
 
+query TT
+EXPLAIN select count(*) from (values ('a', 'b'), ('c', 'd')) as t (c1, c2) order by 1 limit 10
+----
+physical_plan
+01)┌───────────────────────────┐
+02)│ ProjectionExec │
+03)│ -------------------- │
+04)│ count(*): 2 │
+05)└─────────────┬─────────────┘
+06)┌─────────────┴─────────────┐
+07)│ PlaceholderRowExec │
+08)└───────────────────────────┘
+
 # Test explain for large plans
 
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
index 7334054ff2c04..923a51afc8df9 100644
--- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -393,7 +393,7 @@ STORED AS PARQUET;
 
 statement ok
 COPY (
-  SELECT * FROM (VALUES (3, 6, 90), (8, 12, 110)) AS v(a, b, c)
+  SELECT * FROM (VALUES (1, 6, 90), (8, 12, 110)) AS v(a, b, c)
 ) TO 'test_files/scratch/push_down_filter_regression/agg_dyn_mixed/file_1.parquet'
 STORED AS PARQUET;
 
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index 3871468411c4b..e1ede91ba9d6b 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -530,13 +530,12 @@ physical_plan
 13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true
 15)----ProjectionExec: expr=[1 as cnt]
-16)------GlobalLimitExec: skip=0, fetch=3
-17)--------PlaceholderRowExec
-18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-19)------GlobalLimitExec: skip=0, fetch=3
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+16)------PlaceholderRowExec
+17)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+18)------GlobalLimitExec: skip=0, fetch=3
+19)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+20)----------ProjectionExec: expr=[1 as c1]
+21)------------PlaceholderRowExec
 
 ########