Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::physical_optimizer::test_utils::{
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec,
repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options,
repartition_exec, simple_projection_exec, sort_exec, sort_exec_with_fetch,
sort_exec_with_preserve_partitioning, sort_expr, sort_expr_options,
sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec,
Expand Down Expand Up @@ -458,6 +459,102 @@ async fn output_requirement_adds_merge_after_partition_preserving_sort() -> Resu
Ok(())
}

/// Regression test: an `OutputRequirementExec` demanding `SinglePartition`
/// above an existing `SortPreservingMergeExec` must leave the plan untouched —
/// sort pushdown must not insert a duplicate merge below the one already there.
#[test]
fn test_no_extra_spm_from_output_requirement_single_partition() -> Result<()> {
    let schema = create_test_schema()?;
    let ordering: LexOrdering = [sort_expr("nullable_col", &schema)].into();
    let required = [PhysicalSortRequirement::new(
        col("nullable_col", &schema)?,
        Some(SortOptions::new(false, true)),
    )]
    .into();

    // Input plan:
    //   OutputRequirementExec (dist=SinglePartition)
    //     SortPreservingMergeExec [nullable_col@0]
    //       SortExec [nullable_col@0] (preserve_partitioning=true)
    //         RepartitionExec (10 partitions)
    //           DataSource
    let merged = sort_preserving_merge_exec(
        ordering.clone(),
        sort_exec_with_preserve_partitioning(
            ordering.clone(),
            repartition_exec(memory_exec(&schema)),
        ),
    );
    let plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
        merged,
        Some(OrderingRequirements::new(required)),
        Distribution::SinglePartition,
        None,
    ));

    let mut context = SortPushDown::new_default(Arc::clone(&plan));
    assign_initial_requirements(&mut context);
    let optimized = pushdown_sorts(context)?;

    // Already optimal: no additional SortPreservingMergeExec may show up.
    assert_snapshot!(
        displayable(optimized.plan.as_ref()).indent(true).to_string(),
        @r"
    OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
      SortPreservingMergeExec: [nullable_col@0 ASC]
        SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
            DataSourceExec: partitions=1, partition_sizes=[0]
    "
    );
    Ok(())
}

/// Positive test: with an `OutputRequirementExec` demanding `SinglePartition`,
/// pushing a sort down through a projection onto a multi-partition input must
/// produce BOTH a partition-preserving `SortExec` AND a
/// `SortPreservingMergeExec` above it — the core behaviour added by commit
/// 45620e982.
#[test]
fn test_sort_pushdown_adds_spm_for_single_partition_requirement() -> Result<()> {
    let schema = create_test_schema()?;
    let required = [PhysicalSortRequirement::new(
        col("nullable_col", &schema)?,
        Some(SortOptions::new(false, true)),
    )]
    .into();

    // Input plan:
    //   OutputRequirementExec (dist=SinglePartition, order=[nullable_col@0])
    //     ProjectionExec (identity)
    //       RepartitionExec (10 partitions)
    //         DataSource
    let projected =
        simple_projection_exec(repartition_exec(memory_exec(&schema)), vec![0, 1]);
    let plan: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
        projected,
        Some(OrderingRequirements::new(required)),
        Distribution::SinglePartition,
        None,
    ));

    let mut context = SortPushDown::new_default(Arc::clone(&plan));
    assign_initial_requirements(&mut context);
    let optimized = pushdown_sorts(context)?;

    // The sort travels below the projection; because SinglePartition is
    // required, add_sort_above_with_distribution tops it with an SPM.
    assert_snapshot!(
        displayable(optimized.plan.as_ref()).indent(true).to_string(),
        @r"
    OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition
      SortPreservingMergeExec: [nullable_col@0 ASC]
        SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[true]
          ProjectionExec: expr=[nullable_col@0 as nullable_col, non_nullable_col@1 as non_nullable_col]
            RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
              DataSourceExec: partitions=1, partition_sizes=[0]
    "
    );
    Ok(())
}

async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
repartition_sorts: bool,
) -> Result<String> {
Expand Down
130 changes: 120 additions & 10 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::utils::{
add_sort_above, is_sort, is_sort_preserving_merge, is_union, is_window,
add_sort_above_with_distribution, is_sort, is_sort_preserving_merge, is_union,
is_window,
};

use arrow::datatypes::SchemaRef;
Expand All @@ -29,7 +30,7 @@ use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
EquivalenceProperties, add_offset_to_physical_sort_exprs,
Distribution, EquivalenceProperties, add_offset_to_physical_sort_exprs,
};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
Expand All @@ -55,23 +56,46 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// of the parent node as its data.
///
/// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting
#[derive(Default, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct ParentRequirements {
    // Ordering the parent wants this subtree to produce, if any.
    ordering_requirement: Option<OrderingRequirements>,
    // Row limit inherited from (or native to) this node, if any.
    fetch: Option<usize>,
    /// The distribution required by whatever consumer will sit above any
    /// `SortExec` we materialise here. When a sort is added by `add_sort_above`
    /// over a multi-partition input, we use this to decide whether the new
    /// sort needs a `SortPreservingMergeExec` wrapper to produce a single
    /// partition.
    distribution_requirement: Distribution,
}

impl Default for ParentRequirements {
fn default() -> Self {
Self {
ordering_requirement: None,
fetch: None,
distribution_requirement: Distribution::UnspecifiedDistribution,
}
}
}

pub type SortPushDown = PlanContext<ParentRequirements>;

/// Assigns the ordering requirement of the root node to the its children.
pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
let reqs = sort_push_down.plan.required_input_ordering();
for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
let dists = sort_push_down.plan.required_input_distribution();
for (idx, (child, requirement)) in
sort_push_down.children.iter_mut().zip(reqs).enumerate()
{
child.data = ParentRequirements {
ordering_requirement: requirement,
// If the parent has a fetch value, assign it to the children
// Or use the fetch value of the child.
fetch: child.plan.fetch(),
distribution_requirement: dists
.get(idx)
.cloned()
.unwrap_or(Distribution::UnspecifiedDistribution),
};
}
}
Expand All @@ -92,11 +116,34 @@ fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
}
}

/// Returns the stricter of two distribution requirements when propagating
/// `parent_distribution` down through pass-through operators.
///
/// `SinglePartition` is the strictest requirement we care about for the
/// purposes of inserting `SortPreservingMergeExec` above a partition-
/// preserving `SortExec`. If either side requests it, we keep that.
fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution {
    // SinglePartition dominates everything else, regardless of which side
    // asked for it.
    if matches!(a, Distribution::SinglePartition)
        || matches!(b, Distribution::SinglePartition)
    {
        return Distribution::SinglePartition;
    }
    // A hash requirement beats "unspecified"; when both are hash-partitioned,
    // the first argument wins (mirrors the original match-arm ordering).
    if matches!(a, Distribution::HashPartitioned(_)) {
        return a.clone();
    }
    if matches!(b, Distribution::HashPartitioned(_)) {
        return b.clone();
    }
    Distribution::UnspecifiedDistribution
}

fn pushdown_sorts_helper(
mut sort_push_down: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = sort_push_down.plan;
let parent_fetch = sort_push_down.data.fetch;
// The distribution required by whatever sits above any new sort we add
// here. When this node is a SortExec we are about to remove or replace,
// the new sort takes the removed sort's slot, so its consumer is the
// grandparent — i.e. the same distribution requirement that flowed into
// this call.
let parent_distribution = sort_push_down.data.distribution_requirement.clone();

let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone()
else {
Expand All @@ -116,11 +163,28 @@ fn pushdown_sorts_helper(
sort_push_down.data.fetch = fetch;
sort_push_down.data.ordering_requirement =
Some(OrderingRequirements::from(sort_ordering));
// The new context now sits where the SortExec was; preserve the
// grandparent's distribution requirement so a subsequent
// `add_sort_above` knows whether to wrap in SortPreservingMergeExec.
sort_push_down.data.distribution_requirement = parent_distribution;
// Recursive call to helper, so it doesn't transform_down and miss
// the new node (previous child of sort):
return pushdown_sorts_helper(sort_push_down);
}
sort_push_down.plan = plan;
// No ordering is being pushed down here, so only use the node's own
// distribution requirement. Do NOT propagate parent_distribution
// through partition-merging nodes (e.g. SortPreservingMergeExec):
// those nodes already satisfy SinglePartition themselves, so the
// children below them should not be forced to also produce a single
// partition.
let dists = sort_push_down.plan.required_input_distribution();
for (idx, child) in sort_push_down.children.iter_mut().enumerate() {
child.data.distribution_requirement = dists
.get(idx)
.cloned()
.unwrap_or(Distribution::UnspecifiedDistribution);
}
return Ok(Transformed::no(sort_push_down));
};

Expand Down Expand Up @@ -149,16 +213,21 @@ fn pushdown_sorts_helper(
// The sort was imposing a different ordering than the one being
// pushed down. Replace it with a sort that matches the pushed-down
// ordering, and continue the pushdown.
// Add back the sort:
sort_push_down = add_sort_above(
// Add back the sort. The new sort sits where the old one did, so
// its consumer is the grandparent and we must respect that
// distribution requirement (otherwise a multi-partition input
// produces preserve_partitioning=true with no SPM above).
sort_push_down = add_sort_above_with_distribution(
sort_push_down,
parent_requirement.into_single(),
parent_fetch,
&parent_distribution,
);
// Update pushdown requirements:
sort_push_down.children[0].data = ParentRequirements {
ordering_requirement: Some(OrderingRequirements::from(sort_ordering)),
fetch: sort_fetch,
distribution_requirement: Distribution::UnspecifiedDistribution,
};
return Ok(Transformed::yes(sort_push_down));
} else {
Expand All @@ -174,6 +243,10 @@ fn pushdown_sorts_helper(
} else {
Some(parent_requirement)
};
// The sort was removed; carry the grandparent's distribution
// requirement so any sort we materialise deeper down still
// satisfies it.
sort_push_down.data.distribution_requirement = parent_distribution;
// Recursive call to helper, so it doesn't transform_down and miss
// the new node (previous child of sort):
return pushdown_sorts_helper(sort_push_down);
Expand All @@ -184,10 +257,35 @@ fn pushdown_sorts_helper(
if satisfy_parent {
// For non-sort operators which satisfy ordering:
let reqs = sort_push_down.plan.required_input_ordering();
let dists = sort_push_down.plan.required_input_distribution();

// If this node already produces a single partition it has absorbed any
// SinglePartition requirement from the consumer above. Don't push
// that requirement down into children that live below the merge point.
let effective_parent_dist =
if sort_push_down.plan.output_partitioning().partition_count() == 1 {
Distribution::UnspecifiedDistribution
} else {
parent_distribution.clone()
};

for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
for (idx, (child, order)) in
sort_push_down.children.iter_mut().zip(reqs).enumerate()
{
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(parent_fetch, child.data.fetch);
// Any sort we materialise inside this child subtree must still
// satisfy the strongest distribution requirement we've seen on
// the way down. Pass-through operators (Projection, Filter, etc.)
// don't change partitioning, so a `SinglePartition` requirement
// from a higher consumer must propagate, not get reset to this
// node's own (often `UnspecifiedDistribution`) input requirement.
child.data.distribution_requirement = stronger_distribution(
&effective_parent_dist,
dists
.get(idx)
.unwrap_or(&Distribution::UnspecifiedDistribution),
);
}
} else if let Some(adjusted) = pushdown_requirement_to_children(
&sort_push_down.plan,
Expand All @@ -197,17 +295,29 @@ fn pushdown_sorts_helper(
// For operators that can take a sort pushdown, continue with updated
// requirements:
let current_fetch = sort_push_down.plan.fetch();
for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
let dists = sort_push_down.plan.required_input_distribution();
for (idx, (child, order)) in
sort_push_down.children.iter_mut().zip(adjusted).enumerate()
{
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(current_fetch, parent_fetch);
child.data.distribution_requirement = stronger_distribution(
&parent_distribution,
dists
.get(idx)
.unwrap_or(&Distribution::UnspecifiedDistribution),
);
}
sort_push_down.data.ordering_requirement = None;
} else {
// Can not push down requirements, add new `SortExec`:
sort_push_down = add_sort_above(
// Can not push down requirements, add new `SortExec`. The new sort sits
// between this node and its parent, so its consumer's distribution
// requirement is the one carried in `parent_distribution`.
sort_push_down = add_sort_above_with_distribution(
sort_push_down,
parent_requirement.into_single(),
parent_fetch,
&parent_distribution,
);
assign_initial_requirements(&mut sort_push_down);
}
Expand Down
Loading