From 45620e982a620c77a3e8f0916b248c4835199f15 Mon Sep 17 00:00:00 2001 From: Wyatt Herkamp Date: Thu, 30 Apr 2026 14:49:30 -0400 Subject: [PATCH 1/5] Enhance sort pushdown logic to include distribution requirements for SortExec --- .../src/enforce_sorting/sort_pushdown.rs | 80 ++++++++++++++++--- 1 file changed, 70 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 267faeda0c1bb..0e0a39324ae7c 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -19,7 +19,8 @@ use std::fmt::Debug; use std::sync::Arc; use crate::utils::{ - add_sort_above, is_sort, is_sort_preserving_merge, is_union, is_window, + add_sort_above_with_distribution, is_sort, is_sort_preserving_merge, is_union, + is_window, }; use arrow::datatypes::SchemaRef; @@ -29,7 +30,7 @@ use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ - EquivalenceProperties, add_offset_to_physical_sort_exprs, + Distribution, EquivalenceProperties, add_offset_to_physical_sort_exprs, }; use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr, @@ -55,10 +56,26 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; /// of the parent node as its data. /// /// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting -#[derive(Default, Clone, Debug)] +#[derive(Clone, Debug)] pub struct ParentRequirements { ordering_requirement: Option, fetch: Option, + /// The distribution required by whatever consumer will sit above any + /// `SortExec` we materialise here. When a sort is added by `add_sort_above` + /// over a multi-partition input, we use this to decide whether the new + /// sort needs a `SortPreservingMergeExec` wrapper to produce a single + /// partition. + distribution_requirement: Distribution, +} + +impl Default for ParentRequirements { + fn default() -> Self { + Self { + ordering_requirement: None, + fetch: None, + distribution_requirement: Distribution::UnspecifiedDistribution, + } + } } pub type SortPushDown = PlanContext; @@ -66,12 +83,19 @@ pub type SortPushDown = PlanContext; /// Assigns the ordering requirement of the root node to the its children. pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) { let reqs = sort_push_down.plan.required_input_ordering(); - for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) { + let dists = sort_push_down.plan.required_input_distribution(); + for (idx, (child, requirement)) in + sort_push_down.children.iter_mut().zip(reqs).enumerate() + { child.data = ParentRequirements { ordering_requirement: requirement, // If the parent has a fetch value, assign it to the children // Or use the fetch value of the child. fetch: child.plan.fetch(), + distribution_requirement: dists + .get(idx) + .cloned() + .unwrap_or(Distribution::UnspecifiedDistribution), }; } } @@ -97,6 +121,12 @@ fn pushdown_sorts_helper( ) -> Result> { let plan = sort_push_down.plan; let parent_fetch = sort_push_down.data.fetch; + // The distribution required by whatever sits above any new sort we add + // here. When this node is a SortExec we are about to remove or replace, + // the new sort takes the removed sort's slot, so its consumer is the + // grandparent — i.e. the same distribution requirement that flowed into + // this call. + let parent_distribution = sort_push_down.data.distribution_requirement.clone(); let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone() else { @@ -116,6 +146,10 @@ fn pushdown_sorts_helper( sort_push_down.data.fetch = fetch; sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering)); + // The new context now sits where the SortExec was; preserve the + // grandparent's distribution requirement so a subsequent + // `add_sort_above` knows whether to wrap in SortPreservingMergeExec. + sort_push_down.data.distribution_requirement = parent_distribution; // Recursive call to helper, so it doesn't transform_down and miss // the new node (previous child of sort): return pushdown_sorts_helper(sort_push_down); @@ -149,16 +183,21 @@ fn pushdown_sorts_helper( // The sort was imposing a different ordering than the one being // pushed down. Replace it with a sort that matches the pushed-down // ordering, and continue the pushdown. - // Add back the sort: - sort_push_down = add_sort_above( + // Add back the sort. The new sort sits where the old one did, so + // its consumer is the grandparent and we must respect that + // distribution requirement (otherwise a multi-partition input + // produces preserve_partitioning=true with no SPM above). + sort_push_down = add_sort_above_with_distribution( sort_push_down, parent_requirement.into_single(), parent_fetch, + &parent_distribution, ); // Update pushdown requirements: sort_push_down.children[0].data = ParentRequirements { ordering_requirement: Some(OrderingRequirements::from(sort_ordering)), fetch: sort_fetch, + distribution_requirement: Distribution::UnspecifiedDistribution, }; return Ok(Transformed::yes(sort_push_down)); } else { @@ -174,6 +213,10 @@ fn pushdown_sorts_helper( } else { Some(parent_requirement) }; + // The sort was removed; carry the grandparent's distribution + // requirement so any sort we materialise deeper down still + // satisfies it. + sort_push_down.data.distribution_requirement = parent_distribution; // Recursive call to helper, so it doesn't transform_down and miss // the new node (previous child of sort): return pushdown_sorts_helper(sort_push_down); @@ -184,10 +227,17 @@ fn pushdown_sorts_helper( if satisfy_parent { // For non-sort operators which satisfy ordering: let reqs = sort_push_down.plan.required_input_ordering(); + let dists = sort_push_down.plan.required_input_distribution(); - for (child, order) in sort_push_down.children.iter_mut().zip(reqs) { + for (idx, (child, order)) in + sort_push_down.children.iter_mut().zip(reqs).enumerate() + { child.data.ordering_requirement = order; child.data.fetch = min_fetch(parent_fetch, child.data.fetch); + child.data.distribution_requirement = dists + .get(idx) + .cloned() + .unwrap_or(Distribution::UnspecifiedDistribution); } } else if let Some(adjusted) = pushdown_requirement_to_children( &sort_push_down.plan, @@ -197,17 +247,27 @@ fn pushdown_sorts_helper( // For operators that can take a sort pushdown, continue with updated // requirements: let current_fetch = sort_push_down.plan.fetch(); - for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) { + let dists = sort_push_down.plan.required_input_distribution(); + for (idx, (child, order)) in + sort_push_down.children.iter_mut().zip(adjusted).enumerate() + { child.data.ordering_requirement = order; child.data.fetch = min_fetch(current_fetch, parent_fetch); + child.data.distribution_requirement = dists + .get(idx) + .cloned() + .unwrap_or(Distribution::UnspecifiedDistribution); } sort_push_down.data.ordering_requirement = None; } else { - // Can not push down requirements, add new `SortExec`: - sort_push_down = add_sort_above( + // Can not push down requirements, add new `SortExec`. The new sort sits + // between this node and its parent, so its consumer's distribution + // requirement is the one carried in `parent_distribution`. + sort_push_down = add_sort_above_with_distribution( sort_push_down, parent_requirement.into_single(), parent_fetch, + &parent_distribution, ); assign_initial_requirements(&mut sort_push_down); } From 26a80cfdb2b8820455bc493c0611c591229b2010 Mon Sep 17 00:00:00 2001 From: Wyatt Herkamp Date: Thu, 30 Apr 2026 16:28:49 -0400 Subject: [PATCH 2/5] Fixes --- .../src/enforce_sorting/sort_pushdown.rs | 52 ++++++++++++++++--- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 0e0a39324ae7c..257d5863596d0 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -116,6 +116,23 @@ fn min_fetch(f1: Option, f2: Option) -> Option { } } +/// Returns the stricter of two distribution requirements when propagating +/// `parent_distribution` down through pass-through operators. +/// +/// `SinglePartition` is the strictest requirement we care about for the +/// purposes of inserting `SortPreservingMergeExec` above a partition- +/// preserving `SortExec`. If either side requests it, we keep that. +fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution { + match (a, b) { + (Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => { + Distribution::SinglePartition + } + (Distribution::HashPartitioned(_), _) => a.clone(), + (_, Distribution::HashPartitioned(_)) => b.clone(), + _ => Distribution::UnspecifiedDistribution, + } +} + fn pushdown_sorts_helper( mut sort_push_down: SortPushDown, ) -> Result> { @@ -155,6 +172,19 @@ fn pushdown_sorts_helper( return pushdown_sorts_helper(sort_push_down); } sort_push_down.plan = plan; + // No ordering work to do at this node, but we still need to propagate + // the distribution requirement to children before transform_down + // descends. Otherwise, when we eventually reach a node where a sort + // must be added, `parent_distribution` has decayed to + // `UnspecifiedDistribution` and `add_sort_above_with_distribution` + // skips the wrapping `SortPreservingMergeExec`. + let dists = sort_push_down.plan.required_input_distribution(); + for (idx, child) in sort_push_down.children.iter_mut().enumerate() { + child.data.distribution_requirement = stronger_distribution( + &parent_distribution, + dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + ); + } return Ok(Transformed::no(sort_push_down)); }; @@ -234,10 +264,16 @@ fn pushdown_sorts_helper( { child.data.ordering_requirement = order; child.data.fetch = min_fetch(parent_fetch, child.data.fetch); - child.data.distribution_requirement = dists - .get(idx) - .cloned() - .unwrap_or(Distribution::UnspecifiedDistribution); + // Any sort we materialise inside this child subtree must still + // satisfy the strongest distribution requirement we've seen on + // the way down. Pass-through operators (Projection, Filter, etc.) + // don't change partitioning, so a `SinglePartition` requirement + // from a higher consumer must propagate, not get reset to this + // node's own (often `UnspecifiedDistribution`) input requirement. + child.data.distribution_requirement = stronger_distribution( + &parent_distribution, + dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + ); } } else if let Some(adjusted) = pushdown_requirement_to_children( &sort_push_down.plan, @@ -253,10 +289,10 @@ fn pushdown_sorts_helper( { child.data.ordering_requirement = order; child.data.fetch = min_fetch(current_fetch, parent_fetch); - child.data.distribution_requirement = dists - .get(idx) - .cloned() - .unwrap_or(Distribution::UnspecifiedDistribution); + child.data.distribution_requirement = stronger_distribution( + &parent_distribution, + dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + ); } sort_push_down.data.ordering_requirement = None; } else { From 72c87230b086c6a2811eceaff3b1b4c1f5468cd0 Mon Sep 17 00:00:00 2001 From: Wyatt Herkamp Date: Thu, 30 Apr 2026 16:43:32 -0400 Subject: [PATCH 3/5] Formatting Fixes --- .../src/enforce_sorting/sort_pushdown.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 257d5863596d0..5861b82c8f283 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -182,7 +182,9 @@ fn pushdown_sorts_helper( for (idx, child) in sort_push_down.children.iter_mut().enumerate() { child.data.distribution_requirement = stronger_distribution( &parent_distribution, - dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + dists + .get(idx) + .unwrap_or(&Distribution::UnspecifiedDistribution), ); } return Ok(Transformed::no(sort_push_down)); @@ -272,7 +274,9 @@ fn pushdown_sorts_helper( // node's own (often `UnspecifiedDistribution`) input requirement. child.data.distribution_requirement = stronger_distribution( &parent_distribution, - dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + dists + .get(idx) + .unwrap_or(&Distribution::UnspecifiedDistribution), ); } } else if let Some(adjusted) = pushdown_requirement_to_children( @@ -291,7 +295,9 @@ fn pushdown_sorts_helper( child.data.fetch = min_fetch(current_fetch, parent_fetch); child.data.distribution_requirement = stronger_distribution( &parent_distribution, - dists.get(idx).unwrap_or(&Distribution::UnspecifiedDistribution), + dists + .get(idx) + .unwrap_or(&Distribution::UnspecifiedDistribution), ); } sort_push_down.data.ordering_requirement = None; From 1e4706ea2c00e3038d67c046d91be004c1b6f69b Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 30 Apr 2026 23:52:11 -0400 Subject: [PATCH 4/5] Fix tests --- .../src/enforce_sorting/sort_pushdown.rs | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 5861b82c8f283..4e4e04f92daad 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -172,20 +172,18 @@ fn pushdown_sorts_helper( return pushdown_sorts_helper(sort_push_down); } sort_push_down.plan = plan; - // No ordering work to do at this node, but we still need to propagate - // the distribution requirement to children before transform_down - // descends. Otherwise, when we eventually reach a node where a sort - // must be added, `parent_distribution` has decayed to - // `UnspecifiedDistribution` and `add_sort_above_with_distribution` - // skips the wrapping `SortPreservingMergeExec`. + // No ordering is being pushed down here, so only use the node's own + // distribution requirement. Do NOT propagate parent_distribution + // through partition-merging nodes (e.g. SortPreservingMergeExec): + // those nodes already satisfy SinglePartition themselves, so the + // children below them should not be forced to also produce a single + // partition. let dists = sort_push_down.plan.required_input_distribution(); for (idx, child) in sort_push_down.children.iter_mut().enumerate() { - child.data.distribution_requirement = stronger_distribution( - &parent_distribution, - dists - .get(idx) - .unwrap_or(&Distribution::UnspecifiedDistribution), - ); + child.data.distribution_requirement = dists + .get(idx) + .cloned() + .unwrap_or(Distribution::UnspecifiedDistribution); } return Ok(Transformed::no(sort_push_down)); }; @@ -261,6 +259,16 @@ fn pushdown_sorts_helper( let reqs = sort_push_down.plan.required_input_ordering(); let dists = sort_push_down.plan.required_input_distribution(); + // If this node already produces a single partition it has absorbed any + // SinglePartition requirement from the consumer above. Don't push + // that requirement down into children that live below the merge point. + let effective_parent_dist = + if sort_push_down.plan.output_partitioning().partition_count() == 1 { + Distribution::UnspecifiedDistribution + } else { + parent_distribution.clone() + }; + for (idx, (child, order)) in sort_push_down.children.iter_mut().zip(reqs).enumerate() { @@ -273,7 +281,7 @@ fn pushdown_sorts_helper( // from a higher consumer must propagate, not get reset to this // node's own (often `UnspecifiedDistribution`) input requirement. child.data.distribution_requirement = stronger_distribution( - &parent_distribution, + &effective_parent_dist, dists .get(idx) .unwrap_or(&Distribution::UnspecifiedDistribution), From c603941600ec802f47f009694eda7e45271c2857 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 1 May 2026 00:28:27 -0400 Subject: [PATCH 5/5] Tests --- .../physical_optimizer/enforce_sorting.rs | 99 ++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 0cb9bc78f14c0..344b7eb57ca6f 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -24,7 +24,8 @@ use crate::physical_optimizer::test_utils::{ coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec, - repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options, + repartition_exec, simple_projection_exec, sort_exec, sort_exec_with_fetch, + sort_exec_with_preserve_partitioning, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, union_exec, @@ -458,6 +459,102 @@ async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Resu Ok(()) } +/// Regression test: when `OutputRequirementExec(SinglePartition)` wraps a plan +/// that already contains `SortPreservingMergeExec`, sort pushdown must not add +/// a second `SortPreservingMergeExec` below the existing one. +#[test] +fn test_no_extra_spm_from_output_requirement_single_partition() -> Result<()> { + let schema = create_test_schema()?; + let sort_exprs: LexOrdering = [sort_expr("nullable_col", &schema)].into(); + let requirement = [PhysicalSortRequirement::new( + col("nullable_col", &schema)?, + Some(SortOptions::new(false, true)), + )] + .into(); + + // Plan entering pushdown_sorts: + // OutputRequirementExec (dist=SinglePartition) + // SortPreservingMergeExec [nullable_col@0] + // SortExec [nullable_col@0] (preserve_partitioning=true) + // RepartitionExec (10 partitions) + // DataSource + let source = memory_exec(&schema); + let repartitioned = repartition_exec(source); + let sorted = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartitioned); + let merged = sort_preserving_merge_exec(sort_exprs.clone(), sorted); + let plan: Arc = Arc::new(OutputRequirementExec::new( + merged, + Some(OrderingRequirements::new(requirement)), + Distribution::SinglePartition, + None, + )); + + let mut sort_pushdown = SortPushDown::new_default(Arc::clone(&plan)); + assign_initial_requirements(&mut sort_pushdown); + let result = pushdown_sorts(sort_pushdown)?; + + // The plan is already optimal; no extra SortPreservingMergeExec should appear. + assert_snapshot!( + displayable(result.plan.as_ref()).indent(true).to_string(), + @r" + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortPreservingMergeExec: [nullable_col@0 ASC] + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0] + " + ); + Ok(()) +} + +/// Positive test: when `OutputRequirementExec` carries `SinglePartition` and +/// sort pushdown reaches a multi-partition node through a projection, it must +/// insert both `SortExec(preserve_partitioning=true)` AND +/// `SortPreservingMergeExec` — the core behaviour added by commit 45620e982. +#[test] +fn test_sort_pushdown_adds_spm_for_single_partition_requirement() -> Result<()> { + let schema = create_test_schema()?; + let requirement = [PhysicalSortRequirement::new( + col("nullable_col", &schema)?, + Some(SortOptions::new(false, true)), + )] + .into(); + + // Plan entering pushdown_sorts: + // OutputRequirementExec (dist=SinglePartition, order=[nullable_col@0]) + // ProjectionExec (identity) + // RepartitionExec (10 partitions) + // DataSource + let source = memory_exec(&schema); + let repartitioned = repartition_exec(source); + let projected = simple_projection_exec(repartitioned, vec![0, 1]); + let plan: Arc = Arc::new(OutputRequirementExec::new( + projected, + Some(OrderingRequirements::new(requirement)), + Distribution::SinglePartition, + None, + )); + + let mut sort_pushdown = SortPushDown::new_default(Arc::clone(&plan)); + assign_initial_requirements(&mut sort_pushdown); + let result = pushdown_sorts(sort_pushdown)?; + + // Sort is pushed through the projection; because SinglePartition is + // required, add_sort_above_with_distribution wraps it in SPM. + assert_snapshot!( + displayable(result.plan.as_ref()).indent(true).to_string(), + @r" + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortPreservingMergeExec: [nullable_col@0 ASC] + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true] + ProjectionExec: expr=[nullable_col@0 as nullable_col, non_nullable_col@1 as non_nullable_col] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0] + " + ); + Ok(()) +} + async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( repartition_sorts: bool, ) -> Result {