Skip to content

Add EnsureRequirements: merged EnforceDistribution + EnforceSorting with idempotent pushdown_sorts#21976

Open
zhuqi-lucas wants to merge 9 commits into
apache:mainfrom
zhuqi-lucas:ensure-requirements
Open

Add EnsureRequirements: merged EnforceDistribution + EnforceSorting with idempotent pushdown_sorts#21976
zhuqi-lucas wants to merge 9 commits into
apache:mainfrom
zhuqi-lucas:ensure-requirements

Conversation

@zhuqi-lucas
Copy link
Copy Markdown
Contributor

@zhuqi-lucas zhuqi-lucas commented May 1, 2026

Summary

Replace the separate EnforceDistribution and EnforceSorting optimizer rules with a single EnsureRequirements rule in the default optimizer chain. Fix pushdown_sorts to be distribution-aware and fix the SortPreservingMergeExec / CoalescePartitionsExec fetch preservation issue from #14150, making the composition idempotent.

Epic: #21973

Closes: #14150

Problem

EnforceDistribution and EnforceSorting run as separate rules, but sorting and distribution are coupled through SortExec.preserve_partitioning. This caused:

  1. SanityCheckPlan validation failures on multi-partition sort + limitpushdown_sorts set preserve_partitioning=true on multi-partition input without inserting SortPreservingMergeExec, violating the SinglePartition requirement coming from GlobalLimitExec.
  2. Non-idempotent composition — running the rules multiple times produced different (sometimes invalid) plans.
  3. Lost fetch values (Bug: applying multiple times EnforceDistribution generates invalid plan #14150)EnforceDistribution dropped fetch from SortPreservingMergeExec / CoalescePartitionsExec when stripping and re-adding distribution operators.

DataFusion was the only major query engine with separate rules — Spark (EnsureRequirements) and Presto/Trino (AddExchanges) handle both in a single rule.

Changes

1. EnsureRequirements rule (new, replaces EnforceDistribution + EnforceSorting in the default chain)

  • Single PhysicalOptimizerRule that calls the distribution + sorting helpers in one coordinated bottom-up sequence.
  • Registered in place of Arc::new(EnforceDistribution) + Arc::new(EnforceSorting) in the default optimizer chain.
  • Comprehensive inline tests covering known bug topologies + idempotency verification.

2. Distribution-aware pushdown_sorts (sort_pushdown.rs)

  • Add distribution_requirement: Distribution field to ParentRequirements.
  • New add_sort_above_with_distribution() in utils.rs — inserts SortPreservingMergeExec when the parent requires SinglePartition and the input has multiple partitions.
  • Switch both add_sort_above call sites to the distribution-aware variant.
  • Propagate distribution through recursion with a stronger_distribution() helper.
  • Reset distribution below partition-merging nodes (SPM, single-partition outputs).

3. Fix fetch preservation in distribution enforcement (#14150)

  • remove_dist_changing_operators() now saves fetch from removed SPM / Coalesce nodes.
  • add_merge_on_top() re-applies the saved fetch to re-created operators.

4. Retire the old rule entry points; retarget existing tests

After review feedback from @alamb (#21976 (comment)), the rule structs and their impl PhysicalOptimizerRule blocks have been deleted from enforce_distribution.rs and enforce_sorting/mod.rs. The internal helpers (ensure_distribution, ensure_sorting, the contexts, parallelize_sorts, replace_with_order_preserving_variants, sort_pushdown, …) stay in place — EnsureRequirements calls them directly.

The existing integration tests in core/tests/physical_optimizer/ now exercise EnsureRequirements instead of the deleted rules:

  • enforce_distribution.rsRun::Distribution / Run::Sorting branches both call EnsureRequirements::new(). Legacy run sequences (DISTRIB_DISTRIB_SORT, SORT_DISTRIB_DISTRIB) are preserved verbatim; idempotency makes the previously-different orderings converge to the same plan.
  • enforce_sorting.rsEnforceSortingTest drives EnsureRequirements::new() and pins target_partitions = 10 so snapshots are deterministic across machines. The historical [Dist, Sort] vs [Sort, Dist, Sort] comparison is rewritten as "running EnsureRequirements N times == running it once".
  • enforce_sorting_monotonicity.rs / replace_with_order_preserving_variants.rs — driven through the same test framework; only snapshots updated.

A previously-separate ensure_requirements/new_tests.rs (added in an earlier iteration of this PR) is removed; the same coverage lives in the inline tests in ensure_requirements/mod.rs.

5. Updated SLT

  • explain.slt: EnforceDistribution + EnforceSorting collapse to EnsureRequirements in EXPLAIN VERBOSE output.

Snapshot drift

~78 snapshots in the retargeted tests refreshed. The consistent pattern is SortExec + CoalescePartitionsExec (blocking) → SortPreservingMergeExec (streaming), because EnsureRequirements now runs parallelize_sorts + replace_with_order_preserving_variants on plan shapes that the single-rule path used to miss. These are improvements, not regressions — but worth a careful look in review since they are visible in the diff.

Testing

Suite Result
datafusion-physical-optimizer (lib, inline tests) 59 passed
core_integration physical_optimizer:: 454 passed
cargo clippy --all-targets -- -D warnings clean
cargo fmt --all --check clean

Idempotency / regression coverage in the inline tests

Scenario Covered
Multi-partition sort + limit (1-64 partitions) yes
Union with mixed partition counts yes
Projection over multi-partition yes
HashJoin (Partitioned) yes
SortMergeJoin yes
Window function partitioning + ordering yes
Aggregate (Partial + FinalPartitioned) yes
Nested sort + limit yes
Hash repartition + sort yes
CoalescePartitions + sort (parallelize_sorts) yes
SPM → Sort → multi-partition yes
OutputRequirementExec + SinglePartition over multi-partition source yes
ProjectionExec + multi-partition + SinglePartition requirement yes
#14150 fetch preservation across passes yes
Triple optimization convergence yes
10× consecutive optimization stability yes

Architecture

EnsureRequirements::optimize(plan)
  Phase 1: join key reordering (top-down) — adjust_input_keys_ordering
           or reorder_join_keys_to_inputs depending on config.
  Phase 2: distribution enforcement (bottom-up) — ensure_distribution
           Fetch is preserved across SPM/Coalesce strip/re-add (#14150 fix).
  Phase 3: sort enforcement (bottom-up) — ensure_sorting
  Phase 4: parallelize_sorts (bottom-up, when repartition_sorts is on)
  Phase 5: replace_with_order_preserving_variants (bottom-up)
  Phase 6: pushdown_sorts (top-down, distribution-aware)
  Phase 7: replace_with_partial_sort (bottom-up)

Idempotent because:

  • pushdown_sorts now carries distribution_requirement and uses add_sort_above_with_distribution, so the second pass never re-violates an earlier-established SinglePartition requirement.
  • Distribution enforcement preserves fetch across strip/re-add cycles.
  • Running EnsureRequirements repeatedly converges (verified across the partition-count sweep, hash-join, sort-merge join, window, projection, and Bug: applying multiple times EnforceDistribution generates invalid plan #14150 regression tests).

Next steps (future PRs)

  • Gradually fold pushdown_sorts work into the bottom-up ensure_sorting pass.
  • Eliminate the separate top-down pushdown_sorts traversal.
  • Single-pass architecture (one transform_up for both distribution + sorting, like Spark's EnsureRequirements).

@github-actions github-actions Bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) and removed sqllogictest SQL Logic Tests (.slt) labels May 1, 2026
@zhuqi-lucas zhuqi-lucas force-pushed the ensure-requirements branch 5 times, most recently from dfb1043 to ba7e30e Compare May 2, 2026 07:41
@github-actions github-actions Bot added the core Core DataFusion crate label May 2, 2026
@zhuqi-lucas zhuqi-lucas force-pushed the ensure-requirements branch 2 times, most recently from b99edb9 to 8a4ac4a Compare May 3, 2026 07:14
@zhuqi-lucas zhuqi-lucas marked this pull request as ready for review May 6, 2026 01:34
Copilot AI review requested due to automatic review settings May 6, 2026 01:34
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR replaces the separate physical optimizer rules EnforceDistribution and EnforceSorting with a unified EnsureRequirements rule, and updates sort pushdown to be distribution-aware to prevent invalid plans when SinglePartition is required (e.g., under GlobalLimitExec). It also addresses fetch preservation when EnforceDistribution strips and re-adds partition-changing operators, and updates reference documentation / SLT expectations accordingly.

Changes:

  • Introduces EnsureRequirements and wires it into the default physical optimizer chain in place of EnforceDistribution + EnforceSorting.
  • Makes pushdown_sorts distribution-aware by propagating distribution requirements and inserting SortPreservingMergeExec when needed.
  • Preserves fetch across EnforceDistribution’s removal/reinsertion of SortPreservingMergeExec / CoalescePartitionsExec, and updates SLT + optimizer rule docs.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
datafusion/sqllogictest/test_files/explain.slt Updates EXPLAIN rule-name expectations to EnsureRequirements.
datafusion/physical-optimizer/src/utils.rs Adds add_sort_above_with_distribution to insert SPM when parent requires SinglePartition.
datafusion/physical-optimizer/src/optimizer.rs Swaps default optimizer chain to use EnsureRequirements instead of separate distribution/sorting rules.
datafusion/physical-optimizer/src/lib.rs Exposes new ensure_requirements module.
datafusion/physical-optimizer/src/ensure_requirements/new_tests.rs Adds extensive tests for correctness/idempotency across topologies.
datafusion/physical-optimizer/src/ensure_requirements/mod.rs Implements EnsureRequirements (and an experimental no-pushdown variant) plus additional tests/docs.
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs Propagates distribution requirements through sort pushdown; uses distribution-aware sort insertion.
datafusion/physical-optimizer/src/enforce_sorting/mod.rs Makes replace_with_partial_sort public for reuse.
datafusion/physical-optimizer/src/enforce_distribution.rs Preserves fetch when stripping SPM/Coalesce and reapplies it when re-merging.
datafusion/core/src/optimizer_rule_reference.md Updates docs to reflect EnsureRequirements in the default physical pipeline.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +51 to +56
//! - **Idempotent**: Running the rule twice produces the same plan.
//! - **Distribution before sorting**: For each child, distribution is resolved
//! before ordering, so sorting decisions always have full distribution context.
//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
//! pass only adds `SortExec` where the child doesn't already satisfy the
//! ordering requirement, naturally placing sorts at the deepest valid position.
Comment on lines +172 to 177
// EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
// single idempotent rule with distribution-aware pushdown_sorts.
// See https://github.com/apache/datafusion/issues/21973
Arc::new(EnsureRequirements::new()),
// The CombinePartialFinalAggregate rule should be applied after distribution enforcement
Arc::new(CombinePartialFinalAggregate::new()),

use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@zhuqi-lucas
Copy link
Copy Markdown
Contributor Author

cc @alamb @adriangb @xudong963 @Dandandan @2010YOUY01

We've hit several production bugs caused by EnforceDistribution and EnforceSorting running as separate rules — they're coupled through SortExec.preserve_partitioning, which makes the composition non-idempotent.
Other engines (Spark's EnsureRequirements, Trino's AddExchanges) handle both in a single rule, which sidesteps this entire bug class.

I initially attempted a true single-pass rewrite, but the interactions between distribution and sorting requirements turned out to be deeper than anticipated.

This PR takes a more conservative approach — composing the existing rules into one with the necessary correctness fixes. A clean single-pass design is left as a follow-up once the foundation is in place.

If a default change feels too risky for a first step, I'm happy to gate it behind a config flag (e.g. datafusion.optimizer.use_ensure_requirements) so users can opt in and we can validate equivalence in CI before flipping the default.

Would appreciate any thoughts on the overall direction.

Copy link
Copy Markdown
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I think this is the right direction. The existing two rules both
describe the expected input shape of a plan, so they should be tightly coupled.
I believe this direction can reduce entropy in the long run.

I do not fully understand the details yet, but I will try to spend more time looking
into them and continue the review.

Comment on lines +172 to +174
// EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
// single idempotent rule with distribution-aware pushdown_sorts.
// See https://github.com/apache/datafusion/issues/21973
Copy link
Copy Markdown
Contributor

@2010YOUY01 2010YOUY01 May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
// single idempotent rule with distribution-aware pushdown_sorts.
// See https://github.com/apache/datafusion/issues/21973
// Ensures each input plan satisfies the distribution and ordering requirements
// declared by `ExecutionPlan::required_input_distribution` and
// `ExecutionPlan::required_input_ordering`.
// If the requirements are already satisfied, this rule leaves the plan
// unchanged. For example, it does not add sorting when the input is a file
// scan whose existing order already satisfies the required ordering.
// Otherwise, this rule inserts the necessary repartitioning and sorting
// operators.
// This used to be implemented as two separate rules: `EnforceDistribution`
// and `EnforceSorting`. It is now a single idempotent rule with
// distribution-aware `pushdown_sorts`.
// See https://github.com/apache/datafusion/issues/21973.

Added more comments since this is the entry point.

I also have a question: What is this 'distribution-aware pushdown_sorts'? We cloud link some reference to it.

Copy link
Copy Markdown
Contributor Author

@zhuqi-lucas zhuqi-lucas May 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @2010YOUY01 for review and good suggestion!

Will apply the suggestion in the next push (per alamb's plan I'll land a test-extraction PR first and rebase this on top).

On "distribution-aware pushdown_sorts": the old pushdown_sorts didn't propagate the parent's required_input_distribution, so a SortExec could get pushed below something requiring SinglePartition (e.g. under GlobalLimitExec) — giving an invalid plan because the sort then runs on multiple partitions. The fix propagates the requirement and inserts a SortPreservingMergeExec where needed. Will link to #21973 in the comment block when I update.

Copy link
Copy Markdown
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zhuqi-lucas -- this is exciting and a really nice idea.

Given how important this infrastructure is for all DataFusion plans I think the key to making this change will be to make sure we don't introduce regressions.

My biggest concern is that I can't really evaluate what, if any, tests were changed / added / removed in this PR (though it is impressive that all slt is passing)

Initial thoughts

  1. The existing EnforceDistribtuon and EnforceSorting code should be removed
  2. There are a LOT of existing unit tests for EnforceDistribtuon and EnforceSorting -- it looks like most of these have been ported to the new single pass, but it is really hard for me to verify

Suggested next steps

As there are already several large changes queued up in the 54 release, Maybe we can work to get the tests into shape and plan to merge this PR after we ship 54.

What I suggest we do is:

  1. Prepare the tests
    First, make a separate PR to move all existing tests for enforce sorting and enforce distribution somewhere outside the existing modules. By having the tests in a separate module, it will be much easier to understand what, if anything, changed when you replace EnforceSorting/EnforceDistribtuion with EnforceRequirements

This first PR should be fast/easy to review

  1. Then rebase this PR

Given the tests are separate from the code, it will then be much easier to see / verify that this unified pass both: 1) has the same test coverage, and 2) keeps the same behavior

@@ -0,0 +1,782 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a separate module? If we want tests in a separate module, I think putting it in an integration test might be easier to find (datafusion/physical-optimizer/tests/ensure_requirements.rs perhaps?)

);
}

// ========================================================================
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many of these tests look similar/ the same as what is in enforce distribution, etc but it is hard to tell what changed

@zhuqi-lucas
Copy link
Copy Markdown
Contributor Author

zhuqi-lucas commented May 10, 2026

Thanks @alamb. Reading again I see two separate asks:

  • The inline on new_tests.rs (move it to datafusion/physical-optimizer/tests/ensure_requirements.rs) is something I can do inside this PR right away — will push that next.
  • The bigger one (move the existing core/tests/physical_optimizer/{enforce_distribution.rs, enforce_sorting.rs, ...} integration tests, ~9.8k lines, to live alongside the optimizer crate) — I'll do that as a separate prep PR after 54 ships, then rebase this on top so the porting diff is clean.

Will ping you when the inline fix lands and again when the prep PR is up.

@xudong963
Copy link
Copy Markdown
Member

What I suggest we do is:

  1. Prepare the tests
    First, make a separate PR to move all existing tests for enforce sorting and enforce distribution somewhere outside the existing modules. By having the tests in a separate module, it will be much easier to understand what, if anything, changed when you replace EnforceSorting/EnforceDistribtuion with EnforceRequirements

+1 for this

@zhuqi-lucas
Copy link
Copy Markdown
Contributor Author

What I suggest we do is:

  1. Prepare the tests
    First, make a separate PR to move all existing tests for enforce sorting and enforce distribution somewhere outside the existing modules. By having the tests in a separate module, it will be much easier to understand what, if anything, changed when you replace EnforceSorting/EnforceDistribtuion with EnforceRequirements

+1 for this

Thanks @xudong963 , will go this way.

@alamb
Copy link
Copy Markdown
Contributor

alamb commented May 11, 2026

Thanks @alamb. Reading again I see two separate asks:

  • The inline on new_tests.rs (move it to datafusion/physical-optimizer/tests/ensure_requirements.rs) is something I can do inside this PR right away — will push that next.
  • The bigger one (move the existing core/tests/physical_optimizer/{enforce_distribution.rs, enforce_sorting.rs, ...} integration tests, ~9.8k lines, to live alongside the optimizer crate) — I'll do that as a separate prep PR after 54 ships, then rebase this on top so the porting diff is clean.

Will ping you when the inline fix lands and again when the prep PR is up.

Thanks @zhuqi-lucas

FYI I think we can reorganize the tests before 54 as that has a low chance of introducing bugs (so doesn't need as long a "bake" time)

In terms of this PR, my plan for reviewing it is basically

  1. Review what, if any tests have changed (will spend most effort here)
  2. Run sql planning performance benchmarks to ensure no regressions
  3. Review the code for anything that stands out

So TLDR is I plan to spend most of my time doing verification rather than code review

@zhuqi-lucas
Copy link
Copy Markdown
Contributor Author

zhuqi-lucas commented May 12, 2026

Thanks @alamb. Reading again I see two separate asks:

  • The inline on new_tests.rs (move it to datafusion/physical-optimizer/tests/ensure_requirements.rs) is something I can do inside this PR right away — will push that next.
  • The bigger one (move the existing core/tests/physical_optimizer/{enforce_distribution.rs, enforce_sorting.rs, ...} integration tests, ~9.8k lines, to live alongside the optimizer crate) — I'll do that as a separate prep PR after 54 ships, then rebase this on top so the porting diff is clean.

Will ping you when the inline fix lands and again when the prep PR is up.

Thanks @zhuqi-lucas

FYI I think we can reorganize the tests before 54 as that has a low chance of introducing bugs (so doesn't need as long a "bake" time)

In terms of this PR, my plan for reviewing it is basically

  1. Review what, if any tests have changed (will spend most effort here)
  2. Run sql planning performance benchmarks to ensure no regressions
  3. Review the code for anything that stands out

So TLDR is I plan to spend most of my time doing verification rather than code review

Thanks @alamb !

Test-extraction PR is up: #22117. Hit a snag with the original "move to physical-optimizer/tests/" approach — the tests use SessionContext so they'd need datafusion as a dev-dep of datafusion-physical-optimizer, which dev/depcheck rejects (it'd block crates.io publish through the resulting cycle).

Pivoted to a new workspace crate datafusion-physical-optimizer-tests (publish = false, following the precedent of test-utils and datafusion-wasmtest). The crate is a leaf in the dependency graph — nothing depends on it — so its dev-deps on datafusion and datafusion-physical-optimizer introduce no cycle. cargo metadata confirms zero incoming edges.

Git detects all four moves as 93–98% renames. Once #22117 lands I'll rebase this PR on top, and the resulting diff should be the function-name swap from EnforceDistribution::new() / EnforceSorting::new() to EnsureRequirements::new() on the moved files plus the genuinely new tests — which is the comparison you wanted to be able to do.

Happy to discuss the new-crate choice on #22117 if you'd prefer a different structure.

zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 12, 2026
…ptimizer/tests/

Moves the four physical-optimizer integration test files from
`datafusion/core/tests/physical_optimizer/` to
`datafusion/physical-optimizer/tests/` so they live alongside the rules
they exercise:

  - enforce_distribution.rs              (66 tests)
  - enforce_sorting.rs                   (60 tests)
  - enforce_sorting_monotonicity.rs      (64 tests)
  - replace_with_order_preserving_variants.rs  (46 tests)

This sets up the review pattern apache#21976 asks for: once
EnsureRequirements lands, the rebased PR's diff will show the
function-name swap from `EnforceDistribution::new()` /
`EnforceSorting::new()` to `EnsureRequirements::new()` on these files
plus any net-new tests, instead of the current hard-to-diff mix of
ported and net-new inline tests.

Mechanics
---------
- `git mv` preserves history for all four files.
- `physical-optimizer/Cargo.toml` gains the dev-dependencies the
  integration tests need (`datafusion`, `datafusion-datasource`,
  `arrow`, `object_store`, etc.). `datafusion` itself appears as a
  dev-dependency even though `datafusion` depends on this crate;
  cargo permits this dev-dep cycle because it only affects test
  builds.
- `core/tests/physical_optimizer/test_utils.rs` is *copied* to
  `physical-optimizer/tests/common/test_utils.rs` (cargo skips
  subdirectory files when discovering integration tests). The two
  copies will be reconciled once the remaining physical_optimizer
  integration tests move alongside their rules. The copy carries an
  inner `#![allow(dead_code)]` since each binary uses only a subset
  of the helpers.
- `DummyStreamPartition`, previously imported via
  `crate::memory_limit::DummyStreamPartition`, is inlined into
  `enforce_sorting.rs` (20 lines, isolated from the `memory_limit`
  directory module).
- `EnforceSortingTest` (previously shared via
  `crate::physical_optimizer::enforce_sorting`) is duplicated into
  `enforce_sorting_monotonicity.rs`. Each integration test is now its
  own crate root and cannot reach into a sibling integration test's
  `pub(crate)` items. The duplication is intentional and transient —
  once EnsureRequirements lands the helper goes away with the rule it
  tests.
- Workspace `clippy::needless_pass_by_value` and `dead_code`
  suppressions previously carried by the parent `mod` declarations
  are re-applied at file scope where applicable.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer
    --test enforce_distribution               (66 passed)
    --test enforce_sorting                    (60 passed)
    --test enforce_sorting_monotonicity       (64 passed)
    --test replace_with_order_preserving_variants  (46 passed)
- cargo test -p datafusion --test core_integration physical_optimizer::
    (218 passed — the remaining core/tests/physical_optimizer/ tests)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 12, 2026
…ptimizer/tests/

Moves the four physical-optimizer integration test files from
`datafusion/core/tests/physical_optimizer/` to
`datafusion/physical-optimizer/tests/` so they live alongside the rules
they exercise:

  - enforce_distribution.rs              (66 tests)
  - enforce_sorting.rs                   (60 tests)
  - enforce_sorting_monotonicity.rs      (64 tests)
  - replace_with_order_preserving_variants.rs  (46 tests)

This sets up the review pattern apache#21976 asks for: once
EnsureRequirements lands, the rebased PR's diff will show the
function-name swap from `EnforceDistribution::new()` /
`EnforceSorting::new()` to `EnsureRequirements::new()` on these files
plus any net-new tests, instead of the current hard-to-diff mix of
ported and net-new inline tests.

Mechanics
---------
- `git mv` preserves history for all four files.
- `physical-optimizer/Cargo.toml` gains the dev-dependencies the
  integration tests need (`datafusion`, `datafusion-datasource`,
  `arrow`, `object_store`, etc.). `datafusion` itself appears as a
  dev-dependency even though `datafusion` depends on this crate;
  cargo permits this dev-dep cycle because it only affects test
  builds.
- `core/tests/physical_optimizer/test_utils.rs` is *copied* to
  `physical-optimizer/tests/common/test_utils.rs` (cargo skips
  subdirectory files when discovering integration tests). The two
  copies will be reconciled once the remaining physical_optimizer
  integration tests move alongside their rules. The copy carries an
  inner `#![allow(dead_code)]` since each binary uses only a subset
  of the helpers.
- `DummyStreamPartition`, previously imported via
  `crate::memory_limit::DummyStreamPartition`, is inlined into
  `enforce_sorting.rs` (20 lines, isolated from the `memory_limit`
  directory module).
- `EnforceSortingTest` (previously shared via
  `crate::physical_optimizer::enforce_sorting`) is duplicated into
  `enforce_sorting_monotonicity.rs`. Each integration test is now its
  own crate root and cannot reach into a sibling integration test's
  `pub(crate)` items. The duplication is intentional and transient —
  once EnsureRequirements lands the helper goes away with the rule it
  tests.
- Workspace `clippy::needless_pass_by_value` and `dead_code`
  suppressions previously carried by the parent `mod` declarations
  are re-applied at file scope where applicable.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer
    --test enforce_distribution               (66 passed)
    --test enforce_sorting                    (60 passed)
    --test enforce_sorting_monotonicity       (64 passed)
    --test replace_with_order_preserving_variants  (46 passed)
- cargo test -p datafusion --test core_integration physical_optimizer::
    (218 passed — the remaining core/tests/physical_optimizer/ tests)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 12, 2026
…ptimizer/tests/

Moves the four physical-optimizer integration test files from
`datafusion/core/tests/physical_optimizer/` to
`datafusion/physical-optimizer/tests/` so they live alongside the rules
they exercise:

  - enforce_distribution.rs              (66 tests)
  - enforce_sorting.rs                   (60 tests)
  - enforce_sorting_monotonicity.rs      (64 tests)
  - replace_with_order_preserving_variants.rs  (46 tests)

This sets up the review pattern apache#21976 asks for: once
EnsureRequirements lands, the rebased PR's diff will show the
function-name swap from `EnforceDistribution::new()` /
`EnforceSorting::new()` to `EnsureRequirements::new()` on these files
plus any net-new tests, instead of the current hard-to-diff mix of
ported and net-new inline tests.

Mechanics
---------
- `git mv` preserves history for all four files.
- `physical-optimizer/Cargo.toml` gains the dev-dependencies the
  integration tests need (`datafusion`, `datafusion-datasource`,
  `arrow`, `object_store`, etc.). `datafusion` itself appears as a
  dev-dependency even though `datafusion` depends on this crate;
  cargo permits this dev-dep cycle because it only affects test
  builds.
- `core/tests/physical_optimizer/test_utils.rs` is *copied* to
  `physical-optimizer/tests/common/test_utils.rs` (cargo skips
  subdirectory files when discovering integration tests). The two
  copies will be reconciled once the remaining physical_optimizer
  integration tests move alongside their rules. The copy carries an
  inner `#![allow(dead_code)]` since each binary uses only a subset
  of the helpers.
- `DummyStreamPartition`, previously imported via
  `crate::memory_limit::DummyStreamPartition`, is inlined into
  `enforce_sorting.rs` (20 lines, isolated from the `memory_limit`
  directory module).
- `EnforceSortingTest` (previously shared via
  `crate::physical_optimizer::enforce_sorting`) is duplicated into
  `enforce_sorting_monotonicity.rs`. Each integration test is now its
  own crate root and cannot reach into a sibling integration test's
  `pub(crate)` items. The duplication is intentional and transient —
  once EnsureRequirements lands the helper goes away with the rule it
  tests.
- Workspace `clippy::needless_pass_by_value` and `dead_code`
  suppressions previously carried by the parent `mod` declarations
  are re-applied at file scope where applicable.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer
    --test enforce_distribution               (66 passed)
    --test enforce_sorting                    (60 passed)
    --test enforce_sorting_monotonicity       (64 passed)
    --test replace_with_order_preserving_variants  (46 passed)
- cargo test -p datafusion --test core_integration physical_optimizer::
    (218 passed — the remaining core/tests/physical_optimizer/ tests)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 12, 2026
…icated crate

Moves the four physical-optimizer integration test files from
`datafusion/core/tests/physical_optimizer/` into a new workspace crate
`datafusion-physical-optimizer-tests` (publish = false) so they live
adjacent to the optimizer rule crate they exercise:

  - enforce_distribution.rs              (66 tests)
  - enforce_sorting.rs                   (60 tests)
  - enforce_sorting_monotonicity.rs      (64 tests)
  - replace_with_order_preserving_variants.rs  (46 tests)

This sets up the review pattern apache#21976 asks for: once
EnsureRequirements lands, the rebased PR's diff will show the
function-name swap from `EnforceDistribution::new()` /
`EnforceSorting::new()` to `EnsureRequirements::new()` on these files
plus any net-new tests, instead of the current hard-to-diff mix of
ported and net-new inline tests.

Why a new crate
---------------
The simpler "move to `physical-optimizer/tests/`" arrangement runs
afoul of `dev/depcheck`: the tests use `datafusion::prelude::SessionContext`,
which would require `datafusion = { workspace = true }` as a
dev-dependency of `datafusion-physical-optimizer`. Because `datafusion`
already runtime-depends on `datafusion-physical-optimizer`, that
forms a cycle the project's depcheck (which guards crates.io publish
ability) rejects.

The new crate is a leaf in the dependency graph — nothing depends on
it — so its dev-deps on `datafusion` and `datafusion-physical-optimizer`
introduce no cycle. The crate is `publish = false`, following the
existing precedent of `test-utils` and `datafusion-wasmtest`.

Mechanics
---------
- `git mv` preserves history; git detects the four moves as 93–98%
  renames.
- `Cargo.toml` adds the new crate as a workspace member.
- `physical-optimizer-tests/Cargo.toml` lists the dev-dependencies the
  tests need; the crate has no runtime dependencies. The `src/lib.rs`
  is intentionally empty (only doc comments) — the crate exists purely
  to host integration tests under `tests/`.
- `core/tests/physical_optimizer/mod.rs` drops the four moved `mod`
  declarations and adds `dead_code` suppression for the now-partially-unused
  shared `test_utils`.
- `test_utils` is referenced via `#[path = "../../core/tests/physical_optimizer/test_utils.rs"] mod test_utils;`
  so there is a single source of truth shared with the remaining
  `core/tests/physical_optimizer/` tests. The mod declaration carries
  `#[allow(dead_code, clippy::allow_attributes)]` because each test
  binary uses only a subset of the helpers.
- `DummyStreamPartition`, previously imported via
  `crate::memory_limit::DummyStreamPartition`, is inlined into
  `enforce_sorting.rs` (20 lines).
- `EnforceSortingTest` is duplicated into
  `enforce_sorting_monotonicity.rs` — each integration test is now its
  own crate root and cannot reach into a sibling integration test's
  `pub(crate)` items. The duplication is annotated as transient.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer-tests
    (66 + 60 + 64 + 46 = 236 tests passed)
- cargo test -p datafusion --test core_integration physical_optimizer::
    (remaining core tests still pass)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
- The new crate is a leaf: `cargo metadata` confirms nothing depends on
  `datafusion-physical-optimizer-tests`, so depcheck will not find a
  cycle through it.
zhuqi-lucas added a commit to zhuqi-lucas/arrow-datafusion that referenced this pull request May 12, 2026
…icated crate

Moves the four physical-optimizer integration test files from
`datafusion/core/tests/physical_optimizer/` into a new workspace crate
`datafusion-physical-optimizer-tests` (publish = false) so they live
adjacent to the optimizer rule crate they exercise:

  - enforce_distribution.rs              (66 tests)
  - enforce_sorting.rs                   (60 tests)
  - enforce_sorting_monotonicity.rs      (64 tests)
  - replace_with_order_preserving_variants.rs  (46 tests)

This sets up the review pattern apache#21976 asks for: once
EnsureRequirements lands, the rebased PR's diff will show the
function-name swap from `EnforceDistribution::new()` /
`EnforceSorting::new()` to `EnsureRequirements::new()` on these files
plus any net-new tests, instead of the current hard-to-diff mix of
ported and net-new inline tests.

Why a new crate
---------------
The simpler "move to `physical-optimizer/tests/`" arrangement runs
afoul of `dev/depcheck`: the tests use `datafusion::prelude::SessionContext`,
which would require `datafusion = { workspace = true }` as a
dev-dependency of `datafusion-physical-optimizer`. Because `datafusion`
already runtime-depends on `datafusion-physical-optimizer`, that
forms a cycle the project's depcheck (which guards crates.io publish
ability) rejects.

The new crate is a leaf in the dependency graph — nothing depends on
it — so its dev-deps on `datafusion` and `datafusion-physical-optimizer`
introduce no cycle. The crate is `publish = false`, following the
existing precedent of `test-utils` and `datafusion-wasmtest`.

Mechanics
---------
- `git mv` preserves history; git detects the four moves as 93–98%
  renames.
- `Cargo.toml` adds the new crate as a workspace member.
- `physical-optimizer-tests/Cargo.toml` lists the dev-dependencies the
  tests need; the crate has no runtime dependencies. The `src/lib.rs`
  is intentionally empty (only doc comments) — the crate exists purely
  to host integration tests under `tests/`.
- `core/tests/physical_optimizer/mod.rs` drops the four moved `mod`
  declarations and adds `dead_code` suppression for the now-partially-unused
  shared `test_utils`.
- `test_utils` is referenced via `#[path = "../../core/tests/physical_optimizer/test_utils.rs"] mod test_utils;`
  so there is a single source of truth shared with the remaining
  `core/tests/physical_optimizer/` tests. The mod declaration carries
  `#[allow(dead_code, clippy::allow_attributes)]` because each test
  binary uses only a subset of the helpers.
- `DummyStreamPartition`, previously imported via
  `crate::memory_limit::DummyStreamPartition`, is inlined into
  `enforce_sorting.rs` (20 lines).
- `EnforceSortingTest` is duplicated into
  `enforce_sorting_monotonicity.rs` — each integration test is now its
  own crate root and cannot reach into a sibling integration test's
  `pub(crate)` items. The duplication is annotated as transient.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer-tests
    (66 + 60 + 64 + 46 = 236 tests passed)
- cargo test -p datafusion --test core_integration physical_optimizer::
    (remaining core tests still pass)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
- The new crate is a leaf: `cargo metadata` confirms nothing depends on
  `datafusion-physical-optimizer-tests`, so depcheck will not find a
  cycle through it.
@alamb
Copy link
Copy Markdown
Contributor

alamb commented May 12, 2026

@zhuqi-lucas
Copy link
Copy Markdown
Contributor Author

zhuqi-lucas commented May 13, 2026

Thanks @alamb, that direction worked out really cleanly. All three steps are in now.

The old EnforceDistribution / EnforceSorting rule structs are gone. The existing tests in core/tests/physical_optimizer/ now call EnsureRequirements::new() directly — kept the original test framework shape so the historical [Dist, Sort] / [Sort, Dist, Sort] sequences are still there, they just collapse to running the merged rule a couple of times (and the merged rule is idempotent, which is the whole point).

Snapshots refreshed; the consistent pattern is SortExec + CoalescePartitionsExecSortPreservingMergeExec. EnsureRequirements is now also pulling in parallelize_sorts + replace_with_order_preserving_variants on plan shapes that the bare single-rule path used to miss, so a chunk of the diff is the optimizer producing better plans on the same inputs. Worth a careful look in review.

On new_tests.rs: looking at it more carefully, every test in that file (and every inline test in ensure_requirements/mod.rs) is using either an optimize_and_sanity_check helper (regression for the SanityCheckPlan validation failures that motivated this PR) or assert_idempotent — none of them are smoke tests, they're all exercising the new properties. The real redundancy was that the two files duplicated the helpers and a lot of plan shapes. Dropped new_tests.rs and kept the mod.rs inline ones, since the named #14150 regression lives there and the plan-shape coverage (hash join, aggregate, window, projection, sort+limit, fetch preservation, …) is comprehensive already. If you'd rather I cull harder, easy follow-up.

Hit one CI failure on the hash-collisions job after the first push: EnforceSortingTest used ConfigOptions::new() (defaulted target_partitions to CPU count) and the snapshots had my local 12 baked in, so the 4-core runner blew up. Pinned target_partitions = 10 to match the convention in enforce_distribution.rs and re-accepted. Should be green now.

…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules
with a single `EnsureRequirements` rule in the default optimizer chain. This makes
the composition idempotent by fixing distribution-awareness in `pushdown_sorts`
and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through
`SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true`
   without `SortPreservingMergeExec`, violating `SinglePartition` requirements
   from `GlobalLimitExec` → `SanityCheckPlan` failure.

2. **Non-idempotent composition**: Running the rules multiple times produced
   different (sometimes invalid) plans.

3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from
   `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules — Spark (`EnsureRequirements`)
and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)
- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)
- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec`
  when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)
- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:
- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
…et existing tests at EnsureRequirements

Step 1 — Removed the now-unregistered `EnforceDistribution` and
`EnforceSorting` rule structs (and their `impl PhysicalOptimizerRule`).
`EnsureRequirements` already calls the underlying helpers
(`ensure_distribution`, `ensure_sorting`, the contexts, sort_pushdown,
parallelize_sorts, replace_with_order_preserving_variants, …) directly,
so the helper modules stay in place — only the public rule entry points
are gone.

Step 2 — Retargeted the existing integration tests in
`core/tests/physical_optimizer/` so they exercise EnsureRequirements
instead of the removed rules:

  * `enforce_distribution.rs` — `Run::Distribution` / `Run::Sorting`
    branches now both call `EnsureRequirements::new()`. The legacy run
    sequences are preserved verbatim so the historical test assertions
    keep their shape; idempotency makes the previously-different run
    orders converge.
  * `enforce_sorting.rs` — `EnforceSortingTest` now drives
    `EnsureRequirements::new()`. The idempotency comparison test
    (was "\[Dist, Sort\] vs \[Sort, Dist, Sort\]") is rewritten as
    "running EnsureRequirements N times == running it once", which is
    the merged rule's actual guarantee.
  * `enforce_sorting_monotonicity.rs` and
    `replace_with_order_preserving_variants.rs` needed no source
    changes — only snapshots updated via the shared test driver.

78 insta snapshots were updated and reviewed: the consistent diff is
`SortExec + CoalescePartitionsExec` (blocking) →
`SortPreservingMergeExec` (streaming), i.e. EnsureRequirements now
applies parallelize_sorts + replace_with_order_preserving_variants on
inputs the single-rule path didn't reach. These are improvements, not
regressions.

Also dropped one inline test inside `ensure_requirements/mod.rs`
(`test_issue_14150_enforce_distribution_idempotent`) that asserted
idempotency of the standalone EnforceDistribution pass — its successor
`test_issue_14150_fetch_survives_multiple_passes` exercises the same
property end-to-end on EnsureRequirements.

Step 3 (drop the remaining inline / new_tests.rs duplicates added in
apache#21976) is intentionally left for a follow-up commit, pending review
on which genuinely-new-behavior cases to keep.

Test results:
  - core_integration physical_optimizer:: 454 passed
  - datafusion-physical-optimizer (lib)   79 passed
  - clippy --all-targets -- -D warnings   clean
  - cargo fmt --all --check               clean
Step 3 of alamb's review redirection on apache#21976: drop the
`new_tests.rs` file added alongside the EnsureRequirements rule. The
file was an organizational anomaly — a separate test module with
duplicated test helpers (`optimize_and_sanity_check` /
`assert_idempotent` are also defined in the inline tests module in
`ensure_requirements/mod.rs`) and substantial plan-shape overlap with
the inline tests:

  * Hash joins, aggregates, window functions, projections, fetch
    preservation, filters, repartitioning, skip+fetch, and nested-plan
    cases all have idempotency + SanityCheckPlan-validity coverage in
    `mod.rs` already.
  * The specific production-bug regressions (apache#14150, PR53, PR54) all
    live in `mod.rs`.

The inline tests in `mod.rs` remain — every one of them exercises
either `optimize_and_sanity_check` (production-502 regression
property) or `assert_idempotent` (the merged-rule's core property),
neither of which is covered by the existing tests in
`core/tests/physical_optimizer/`. They are not duplicates of the
existing rule tests, just of `new_tests.rs`.

Test results after the deletion:
  - datafusion-physical-optimizer (lib)   59 passed (was 79; 20 from
                                          new_tests.rs removed)
  - core_integration physical_optimizer:: 454 passed (unchanged)
  - clippy -D warnings                    clean
@zhuqi-lucas zhuqi-lucas force-pushed the ensure-requirements branch from d368ad5 to 47cd383 Compare May 13, 2026 07:08
…pendent

After retargeting the test framework to EnsureRequirements, the partition
count from ConfigOptions::new() (defaulting to CPU count) appears in
Hash(\[…\], N) operators in the optimized plan. Snapshots taken on one
machine fail on a machine with a different CPU count, which is why the
hash-collisions CI job (4-core runner) was failing after the local
acceptance (12-core dev machine).

Fix: pin target_partitions = 10 in EnforceSortingTest::run(), matching
the convention already used in enforce_distribution.rs. Re-accept the
affected snapshots so they reflect the pinned value.
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 13, 2026

Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion v53.1.0 (current)
       Built [  82.374s] (current)
     Parsing datafusion v53.1.0 (current)
      Parsed [   0.034s] (current)
    Building datafusion v53.1.0 (baseline)
       Built [  81.888s] (baseline)
     Parsing datafusion v53.1.0 (baseline)
      Parsed [   0.034s] (baseline)
    Checking datafusion v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.677s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [ 166.450s] datafusion
    Building datafusion-physical-optimizer v53.1.0 (current)
       Built [  36.841s] (current)
     Parsing datafusion-physical-optimizer v53.1.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-physical-optimizer v53.1.0 (baseline)
       Built [  37.171s] (baseline)
     Parsing datafusion-physical-optimizer v53.1.0 (baseline)
      Parsed [   0.022s] (baseline)
    Checking datafusion-physical-optimizer v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.104s] 222 checks: 221 pass, 1 fail, 0 warn, 30 skip

--- failure struct_missing: pub struct removed or renamed ---

Description:
A publicly-visible struct cannot be imported by its prior path. A `pub use` may have been removed, or the struct itself may have been renamed or removed entirely.
        ref: https://doc.rust-lang.org/cargo/reference/semver.html#item-remove
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.47.0/src/lints/struct_missing.ron

Failed in:
  struct datafusion_physical_optimizer::enforce_sorting::EnforceSorting, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/da1bc44dbfe2ea5ef369389ef86956738e6352d0/datafusion/physical-optimizer/src/enforce_sorting/mod.rs:79
  struct datafusion_physical_optimizer::enforce_distribution::EnforceDistribution, previously in file /home/runner/work/datafusion/datafusion/target/semver-checks/git-apache_main/da1bc44dbfe2ea5ef369389ef86956738e6352d0/datafusion/physical-optimizer/src/enforce_distribution.rs:184

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  75.704s] datafusion-physical-optimizer
    Building datafusion-sqllogictest v53.1.0 (current)
       Built [ 134.912s] (current)
     Parsing datafusion-sqllogictest v53.1.0 (current)
      Parsed [   0.022s] (current)
    Building datafusion-sqllogictest v53.1.0 (baseline)
       Built [ 134.337s] (baseline)
     Parsing datafusion-sqllogictest v53.1.0 (baseline)
      Parsed [   0.023s] (baseline)
    Checking datafusion-sqllogictest v53.1.0 -> v53.1.0 (no change; assume patch)
     Checked [   0.089s] 222 checks: 222 pass, 30 skip
     Summary no semver update required
    Finished [ 272.864s] datafusion-sqllogictest

@github-actions github-actions Bot added the auto detected api change Auto detected API change label May 13, 2026
Three small follow-ups to the retirement of `EnforceDistribution` /
`EnforceSorting`:

- enforce_distribution.rs: the ~120-line introductory rustdoc that
  previously documented the `EnforceDistribution` struct ended up
  silently rebound to the unrelated internal `JoinKeyPairs` struct
  after the struct deletion. Dropped the orphaned block — the brief
  `//` retirement note at the top of the file is enough; the original
  content lives in git history.
- enforce_sorting/mod.rs: the module-level doc still introduced
  "the EnforceSorting optimizer rule". Rewrote to describe the
  module's current role (helpers used by `EnsureRequirements`) while
  keeping the worked sort-removal example.
- enforce_sorting/mod.rs: `PlanWithCorrespondingSort` / `PlanWith
  CorrespondingCoalescePartitions` docs said the type "was originally
  used within the `EnforceSorting` rule". They are still used, just by
  `EnsureRequirements` now — reworded.
- ensure_requirements/mod.rs: `test_enforce_distribution_idempotent_hash_join`
  had stale `expect("first EnforceDistribution failed")` strings and
  a stale assertion message. Now says `EnsureRequirements pass failed`
  / `EnsureRequirements not idempotent`.
Caught a few more stale references on a closer re-read:

- enforce_distribution.rs: module-level doc still introduced "the
  EnforceDistribution optimizer rule". Rewrote to describe the
  helpers used by EnsureRequirements (mirrors the enforce_sorting
  module doc).
- enforce_sorting/sort_pushdown.rs: the doc on ParentRequirements
  referenced the [`EnforceSorting`] rule and had a now-broken
  intra-doc link to crate::enforce_sorting::EnforceSorting. Rewrote
  and removed the dangling link.
- optimizer.rs: three pipeline-position comments still mentioned
  EnforceDistribution / EnforceSorting as separate rules. Updated to
  point at EnsureRequirements instead.
- ensure_requirements/mod.rs: a couple of internal-reference comments
  ("PR apache#53", "EnforceSorting's ensure_sorting must insert …",
  "EnforceDistribution claims to be idempotent") rephrased to neutral
  language and to refer to the current rule structure.
- enforce_sorting.rs (tests): the EnforceSortingTest::run comment
  about "this file has 4 rules that use tree node, apply these rules
  as in the EnforceSorting::optimize implementation" updated to refer
  to EnsureRequirements.
Caught a few more on a third re-read:

- ensure_requirements/mod.rs (tests): the internal PR numbers
  `PR apache#53` / `PR apache#54` and the internal product/team acronyms
  `UXX0/HRC 502s` were in the test docstrings and function names.
  Renamed and rewrote so the test names and docs describe the
  actual scenario (`OutputRequirementExec(SinglePartition)` over a
  multi-partition source, `pushdown_sorts` through `ProjectionExec`)
  instead of internal ticket IDs.
- combine_partial_final_agg.rs: rule-ordering doc still said
  "should be applied after the EnforceDistribution and EnforceSorting
  rules". Updated to refer to `EnsureRequirements`.
… multi-partition

The existing `test_issue_14150_fetch_survives_multiple_passes` exercises
the apache#14150 code path indirectly — the input plan has no
`SortPreservingMergeExec` and lets the optimizer insert one. The
original apache#14150 bug, however, bit when the *input* plan already
contained an `SPM(fetch=N)`: `remove_dist_changing_operators()` would
strip it, then `add_merge_on_top()` re-create an SPM without copying
the saved `fetch`.

This test rebuilds the exact input shape from the previously-deleted
`test_issue_14150_enforce_distribution_idempotent` and runs the
plan through `EnsureRequirements::new()` twice. The apache#14150 property
holds: `fetch=5` is preserved across both passes.

The test only asserts the fetch-preservation property, not byte-
identical idempotency on this shape, because the merged rule may
legitimately deduplicate the `fetch` field between adjacent operators
(e.g. consolidate it onto the surrounding `GlobalLimitExec` and drop
it from the SPM). Byte-identical idempotency for realistic input
shapes is covered separately by the existing test.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto detected api change Auto detected API change core Core DataFusion crate optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bug: applying multiple times EnforceDistribution generates invalid plan

5 participants