Add EnsureRequirements: merged EnforceDistribution + EnforceSorting with idempotent pushdown_sorts #21976
zhuqi-lucas wants to merge 9 commits
Pull request overview
This PR replaces the separate physical optimizer rules EnforceDistribution and EnforceSorting with a unified EnsureRequirements rule, and updates sort pushdown to be distribution-aware to prevent invalid plans when SinglePartition is required (e.g., under GlobalLimitExec). It also addresses fetch preservation when EnforceDistribution strips and re-adds partition-changing operators, and updates reference documentation / SLT expectations accordingly.
Changes:
- Introduces `EnsureRequirements` and wires it into the default physical optimizer chain in place of `EnforceDistribution` + `EnforceSorting`.
- Makes `pushdown_sorts` distribution-aware by propagating distribution requirements and inserting `SortPreservingMergeExec` when needed.
- Preserves `fetch` across `EnforceDistribution`'s removal/reinsertion of `SortPreservingMergeExec`/`CoalescePartitionsExec`, and updates SLT + optimizer rule docs.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/explain.slt | Updates EXPLAIN rule-name expectations to EnsureRequirements. |
| datafusion/physical-optimizer/src/utils.rs | Adds add_sort_above_with_distribution to insert SPM when parent requires SinglePartition. |
| datafusion/physical-optimizer/src/optimizer.rs | Swaps default optimizer chain to use EnsureRequirements instead of separate distribution/sorting rules. |
| datafusion/physical-optimizer/src/lib.rs | Exposes new ensure_requirements module. |
| datafusion/physical-optimizer/src/ensure_requirements/new_tests.rs | Adds extensive tests for correctness/idempotency across topologies. |
| datafusion/physical-optimizer/src/ensure_requirements/mod.rs | Implements EnsureRequirements (and an experimental no-pushdown variant) plus additional tests/docs. |
| datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs | Propagates distribution requirements through sort pushdown; uses distribution-aware sort insertion. |
| datafusion/physical-optimizer/src/enforce_sorting/mod.rs | Makes replace_with_partial_sort public for reuse. |
| datafusion/physical-optimizer/src/enforce_distribution.rs | Preserves fetch when stripping SPM/Coalesce and reapplies it when re-merging. |
| datafusion/core/src/optimizer_rule_reference.md | Updates docs to reflect EnsureRequirements in the default physical pipeline. |
//! - **Idempotent**: Running the rule twice produces the same plan.
//! - **Distribution before sorting**: For each child, distribution is resolved
//!   before ordering, so sorting decisions always have full distribution context.
//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
//!   pass only adds `SortExec` where the child doesn't already satisfy the
//!   ordering requirement, naturally placing sorts at the deepest valid position.

// EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
// single idempotent rule with distribution-aware pushdown_sorts.
// See https://github.com/apache/datafusion/issues/21973
Arc::new(EnsureRequirements::new()),
// The CombinePartialFinalAggregate rule should be applied after distribution enforcement
Arc::new(CombinePartialFinalAggregate::new()),

use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

cc @alamb @adriangb @xudong963 @Dandandan @2010YOUY01

We've hit several production bugs caused by EnforceDistribution and EnforceSorting running as separate rules: they're coupled through SortExec.preserve_partitioning, which makes the composition non-idempotent.

I initially attempted a true single-pass rewrite, but the interactions between distribution and sorting requirements turned out to be deeper than anticipated. This PR takes a more conservative approach: composing the existing rules into one with the necessary correctness fixes. A clean single-pass design is left as a follow-up once the foundation is in place.

If a default change feels too risky for a first step, I'm happy to gate it behind a config flag.

Would appreciate any thoughts on the overall direction.
2010YOUY01 left a comment
Thank you! I think this is the right direction. The existing two rules both
describe the expected input shape of a plan, so they should be tightly coupled.
I believe this direction can reduce entropy in the long run.
I do not fully understand the details yet, but I will try to spend more time looking
into them and continue the review.
// EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
// single idempotent rule with distribution-aware pushdown_sorts.
// See https://github.com/apache/datafusion/issues/21973
Suggested change:
// Ensures each input plan satisfies the distribution and ordering requirements
// declared by `ExecutionPlan::required_input_distribution` and
// `ExecutionPlan::required_input_ordering`.
// If the requirements are already satisfied, this rule leaves the plan
// unchanged. For example, it does not add sorting when the input is a file
// scan whose existing order already satisfies the required ordering.
// Otherwise, this rule inserts the necessary repartitioning and sorting
// operators.
// This used to be implemented as two separate rules: `EnforceDistribution`
// and `EnforceSorting`. It is now a single idempotent rule with
// distribution-aware `pushdown_sorts`.
// See https://github.com/apache/datafusion/issues/21973.
Added more comments since this is the entry point.
I also have a question: what is this 'distribution-aware pushdown_sorts'? We could link a reference to it.
Thanks @2010YOUY01 for the review and the good suggestion!
Will apply the suggestion in the next push (per alamb's plan I'll land a test-extraction PR first and rebase this on top).
On "distribution-aware pushdown_sorts": the old pushdown_sorts didn't propagate the parent's required_input_distribution, so a SortExec could get pushed below something requiring SinglePartition (e.g. under GlobalLimitExec) — giving an invalid plan because the sort then runs on multiple partitions. The fix propagates the requirement and inserts a SortPreservingMergeExec where needed. Will link to #21973 in the comment block when I update.
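The fix described above can be sketched on a toy plan model. This is only an illustration under stated assumptions: `Plan`, `Distribution`, `output_partitions`, and `add_sort_with_distribution` are hypothetical stand-ins invented for this sketch, not DataFusion's types or the PR's actual `add_sort_above_with_distribution` helper.

```rust
// Toy model only: every name here is hypothetical, not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf producing `partitions` unsorted output partitions.
    Scan { partitions: usize },
    /// Sorts each input partition independently (partitioning is preserved).
    Sort { input: Box<Plan> },
    /// Merges already-sorted partitions into a single sorted partition.
    SortPreservingMerge { input: Box<Plan> },
}

/// What the parent operator requires from its input.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
}

/// Number of output partitions a toy plan produces.
fn output_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        Plan::Sort { input } => output_partitions(input),
        Plan::SortPreservingMerge { .. } => 1,
    }
}

/// Adds a sort above `input`. If the parent needs a single partition and the
/// per-partition sort would still emit several, a merge is placed on top so
/// the parent's requirement stays satisfied.
fn add_sort_with_distribution(input: Plan, required: Distribution) -> Plan {
    let sorted = Plan::Sort { input: Box::new(input) };
    if required == Distribution::SinglePartition && output_partitions(&sorted) > 1 {
        Plan::SortPreservingMerge { input: Box::new(sorted) }
    } else {
        sorted
    }
}
```

With a four-partition scan and a `SinglePartition` requirement, the sketch wraps the sort in a merge; with a single-partition scan or no requirement, it returns the plain sort.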
alamb left a comment
Thanks @zhuqi-lucas -- this is exciting and a really nice idea.
Given how important this infrastructure is for all DataFusion plans I think the key to making this change will be to make sure we don't introduce regressions.
My biggest concern is that I can't really evaluate what, if any, tests were changed / added / removed in this PR (though it is impressive that all slt is passing)
Initial thoughts
- The existing EnforceDistribution and EnforceSorting code should be removed
- There are a LOT of existing unit tests for EnforceDistribution and EnforceSorting -- it looks like most of these have been ported to the new single pass, but it is really hard for me to verify
Suggested next steps
As there are already several large changes queued up in the 54 release, maybe we can work to get the tests into shape and plan to merge this PR after we ship 54.
What I suggest we do is:
- Prepare the tests
First, make a separate PR to move all existing tests for enforce sorting and enforce distribution somewhere outside the existing modules. By having the tests in a separate module, it will be much easier to understand what, if anything, changed when you replace EnforceSorting/EnforceDistribution with EnsureRequirements.
This first PR should be fast/easy to review.
- Then rebase this PR
Given the tests are separate from the code, it will then be much easier to see / verify that this unified pass both: 1) has the same test coverage, and 2) keeps the same behavior.
@@ -0,0 +1,782 @@
// Licensed to the Apache Software Foundation (ASF) under one
Why is this a separate module? If we want tests in a separate module, I think putting it in an integration test might be easier to find (datafusion/physical-optimizer/tests/ensure_requirements.rs perhaps?)
);
}

// ========================================================================
Many of these tests look similar to, or the same as, what is in enforce distribution etc., but it is hard to tell what changed.
Thanks @alamb. Reading again I see two separate asks:
Will ping you when the inline fix lands and again when the prep PR is up.
+1 for this
Thanks @xudong963, will go this way.
Thanks @zhuqi-lucas. FYI, I think we can reorganize the tests before 54, as that has a low chance of introducing bugs (so doesn't need as long a "bake" time).
In terms of this PR, my plan for reviewing it is basically
So TLDR is I plan to spend most of my time doing verification rather than code review.
Thanks @alamb! Test-extraction PR is up: #22117.
Hit a snag with the original "move to Pivoted to a new workspace crate Git detects all four moves as 93–98% renames. Once #22117 lands I'll rebase this PR on top, and the resulting diff should be the function-name swap from Happy to discuss the new-crate choice on #22117 if you'd prefer a different structure.
…ptimizer/tests/

Moves the four physical-optimizer integration test files from `datafusion/core/tests/physical_optimizer/` to `datafusion/physical-optimizer/tests/` so they live alongside the rules they exercise:

- enforce_distribution.rs (66 tests)
- enforce_sorting.rs (60 tests)
- enforce_sorting_monotonicity.rs (64 tests)
- replace_with_order_preserving_variants.rs (46 tests)

This sets up the review pattern apache#21976 asks for: once EnsureRequirements lands, the rebased PR's diff will show the function-name swap from `EnforceDistribution::new()` / `EnforceSorting::new()` to `EnsureRequirements::new()` on these files plus any net-new tests, instead of the current hard-to-diff mix of ported and net-new inline tests.

Mechanics
---------
- `git mv` preserves history for all four files.
- `physical-optimizer/Cargo.toml` gains the dev-dependencies the integration tests need (`datafusion`, `datafusion-datasource`, `arrow`, `object_store`, etc.). `datafusion` itself appears as a dev-dependency even though `datafusion` depends on this crate; cargo permits this dev-dep cycle because it only affects test builds.
- `core/tests/physical_optimizer/test_utils.rs` is *copied* to `physical-optimizer/tests/common/test_utils.rs` (cargo skips subdirectory files when discovering integration tests). The two copies will be reconciled once the remaining physical_optimizer integration tests move alongside their rules. The copy carries an inner `#![allow(dead_code)]` since each binary uses only a subset of the helpers.
- `DummyStreamPartition`, previously imported via `crate::memory_limit::DummyStreamPartition`, is inlined into `enforce_sorting.rs` (20 lines, isolated from the `memory_limit` directory module).
- `EnforceSortingTest` (previously shared via `crate::physical_optimizer::enforce_sorting`) is duplicated into `enforce_sorting_monotonicity.rs`. Each integration test is now its own crate root and cannot reach into a sibling integration test's `pub(crate)` items. The duplication is intentional and transient: once EnsureRequirements lands the helper goes away with the rule it tests.
- Workspace `clippy::needless_pass_by_value` and `dead_code` suppressions previously carried by the parent `mod` declarations are re-applied at file scope where applicable.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer
  --test enforce_distribution (66 passed)
  --test enforce_sorting (60 passed)
  --test enforce_sorting_monotonicity (64 passed)
  --test replace_with_order_preserving_variants (46 passed)
- cargo test -p datafusion --test core_integration physical_optimizer:: (218 passed; the remaining core/tests/physical_optimizer/ tests)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
…icated crate

Moves the four physical-optimizer integration test files from `datafusion/core/tests/physical_optimizer/` into a new workspace crate `datafusion-physical-optimizer-tests` (publish = false) so they live adjacent to the optimizer rule crate they exercise:

- enforce_distribution.rs (66 tests)
- enforce_sorting.rs (60 tests)
- enforce_sorting_monotonicity.rs (64 tests)
- replace_with_order_preserving_variants.rs (46 tests)

This sets up the review pattern apache#21976 asks for: once EnsureRequirements lands, the rebased PR's diff will show the function-name swap from `EnforceDistribution::new()` / `EnforceSorting::new()` to `EnsureRequirements::new()` on these files plus any net-new tests, instead of the current hard-to-diff mix of ported and net-new inline tests.

Why a new crate
---------------
The simpler "move to `physical-optimizer/tests/`" arrangement runs afoul of `dev/depcheck`: the tests use `datafusion::prelude::SessionContext`, which would require `datafusion = { workspace = true }` as a dev-dependency of `datafusion-physical-optimizer`. Because `datafusion` already runtime-depends on `datafusion-physical-optimizer`, that forms a cycle the project's depcheck (which guards crates.io publish ability) rejects.

The new crate is a leaf in the dependency graph (nothing depends on it), so its dev-deps on `datafusion` and `datafusion-physical-optimizer` introduce no cycle. The crate is `publish = false`, following the existing precedent of `test-utils` and `datafusion-wasmtest`.

Mechanics
---------
- `git mv` preserves history; git detects the four moves as 93–98% renames.
- `Cargo.toml` adds the new crate as a workspace member.
- `physical-optimizer-tests/Cargo.toml` lists the dev-dependencies the tests need; the crate has no runtime dependencies. The `src/lib.rs` is intentionally empty (only doc comments): the crate exists purely to host integration tests under `tests/`.
- `core/tests/physical_optimizer/mod.rs` drops the four moved `mod` declarations and adds `dead_code` suppression for the now-partially-unused shared `test_utils`.
- `test_utils` is referenced via `#[path = "../../core/tests/physical_optimizer/test_utils.rs"] mod test_utils;` so there is a single source of truth shared with the remaining `core/tests/physical_optimizer/` tests. The mod declaration carries `#[allow(dead_code, clippy::allow_attributes)]` because each test binary uses only a subset of the helpers.
- `DummyStreamPartition`, previously imported via `crate::memory_limit::DummyStreamPartition`, is inlined into `enforce_sorting.rs` (20 lines).
- `EnforceSortingTest` is duplicated into `enforce_sorting_monotonicity.rs`: each integration test is now its own crate root and cannot reach into a sibling integration test's `pub(crate)` items. The duplication is annotated as transient.

Verification
------------
- cargo build --workspace --tests
- cargo test -p datafusion-physical-optimizer-tests (66 + 60 + 64 + 46 = 236 tests passed)
- cargo test -p datafusion --test core_integration physical_optimizer:: (remaining core tests still pass)
- cargo clippy --workspace --tests -- -D warnings
- cargo fmt --all --check
- The new crate is a leaf: `cargo metadata` confirms nothing depends on `datafusion-physical-optimizer-tests`, so depcheck will not find a cycle through it.
Thanks @alamb, that direction worked out really cleanly. All three steps are in now. The old Snapshots refreshed; the consistent pattern is On Hit one CI failure on the hash-collisions job after the first push:
…ceSorting

## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without `SortPreservingMergeExec`, violating `SinglePartition` requirements from `GlobalLimitExec` → `SanityCheckPlan` failure.
2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans.
3. **Lost fetch values** (apache#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules: Spark (`EnsureRequirements`) and Presto (`AddExchanges`) use a single rule.

## Changes

### `EnsureRequirements` rule (new)

- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)

- Add `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts `SortPreservingMergeExec` when parent requires `SinglePartition` and input has multiple partitions
- Propagate distribution through recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (apache#14150)

- `remove_dist_changing_operators()` now saves fetch from removed SPM/Coalesce
- `add_merge_on_top()` re-applies saved fetch to new operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified:

- All partition counts 1-64
- Triple + 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, Aggregate topologies
- PR apache#53/apache#54 regression scenarios
- apache#14150 fetch preservation across passes

Closes: apache#14150
Part of: apache#21973
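
The fetch-preservation fix described above (save `fetch` while stripping merge operators, then re-apply it when a merge is added back) can be illustrated with a small toy model. This is a sketch under assumptions: `Plan`, `strip_merges`, and `re_add_merge` are hypothetical names made up here, and taking the minimum across nested fetches is this sketch's own choice, not a claim about what `remove_dist_changing_operators()` does.

```rust
// Toy model only: names are hypothetical and do not mirror DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf producing `partitions` output partitions.
    Scan { partitions: usize },
    /// Merges all partitions into one, optionally keeping only `fetch` rows.
    Merge { input: Box<Plan>, fetch: Option<usize> },
}

/// Strips every merge operator from the top of the plan and returns the
/// remaining plan together with the tightest `fetch` seen along the way.
fn strip_merges(plan: Plan) -> (Plan, Option<usize>) {
    let mut current = plan;
    let mut saved: Option<usize> = None;
    loop {
        match current {
            Plan::Merge { input, fetch } => {
                // Keep the smaller limit when two nested merges both carry one.
                saved = match (saved, fetch) {
                    (Some(a), Some(b)) => Some(a.min(b)),
                    (a, b) => a.or(b),
                };
                current = *input;
            }
            other => return (other, saved),
        }
    }
}

/// Re-adds a merge on top, carrying over the fetch saved by `strip_merges`.
fn re_add_merge(input: Plan, fetch: Option<usize>) -> Plan {
    Plan::Merge { input: Box::new(input), fetch }
}
```

Stripping and re-adding then round-trips a plan such as `Merge { fetch: Some(5) }` over a scan without losing the limit, which is the property the bug report was about.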
…et existing tests at EnsureRequirements
Step 1 — Removed the now-unregistered `EnforceDistribution` and
`EnforceSorting` rule structs (and their `impl PhysicalOptimizerRule`).
`EnsureRequirements` already calls the underlying helpers
(`ensure_distribution`, `ensure_sorting`, the contexts, sort_pushdown,
parallelize_sorts, replace_with_order_preserving_variants, …) directly,
so the helper modules stay in place — only the public rule entry points
are gone.
Step 2 — Retargeted the existing integration tests in
`core/tests/physical_optimizer/` so they exercise EnsureRequirements
instead of the removed rules:
* `enforce_distribution.rs` — `Run::Distribution` / `Run::Sorting`
branches now both call `EnsureRequirements::new()`. The legacy run
sequences are preserved verbatim so the historical test assertions
keep their shape; idempotency makes the previously-different run
orders converge.
* `enforce_sorting.rs` — `EnforceSortingTest` now drives
`EnsureRequirements::new()`. The idempotency comparison test
(was "[Dist, Sort] vs [Sort, Dist, Sort]") is rewritten as
"running EnsureRequirements N times == running it once", which is
the merged rule's actual guarantee.
* `enforce_sorting_monotonicity.rs` and
`replace_with_order_preserving_variants.rs` needed no source
changes — only snapshots updated via the shared test driver.
78 insta snapshots were updated and reviewed: the consistent diff is
`SortExec + CoalescePartitionsExec` (blocking) →
`SortPreservingMergeExec` (streaming), i.e. EnsureRequirements now
applies parallelize_sorts + replace_with_order_preserving_variants on
inputs the single-rule path didn't reach. These are improvements, not
regressions.
Also dropped one inline test inside `ensure_requirements/mod.rs`
(`test_issue_14150_enforce_distribution_idempotent`) that asserted
idempotency of the standalone EnforceDistribution pass — its successor
`test_issue_14150_fetch_survives_multiple_passes` exercises the same
property end-to-end on EnsureRequirements.
Step 3 (drop the remaining inline / new_tests.rs duplicates added in
apache#21976) is intentionally left for a follow-up commit, pending review
on which genuinely-new-behavior cases to keep.
Test results:
- core_integration physical_optimizer:: 454 passed
- datafusion-physical-optimizer (lib) 79 passed
- clippy --all-targets -- -D warnings clean
- cargo fmt --all --check clean
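
The "running the rule N times == running it once" property mentioned above can be checked with a small generic helper. This is a minimal sketch, assuming plans can be compared with `PartialEq`; `is_idempotent` is a hypothetical name for illustration and is not the `assert_idempotent` helper from the PR.

```rust
/// Returns true if applying `rule` `passes` times yields the same value as
/// applying it once. `rule` stands in for an optimizer pass over a plan.
fn is_idempotent<P, F>(rule: F, plan: P, passes: usize) -> bool
where
    P: Clone + PartialEq,
    F: Fn(P) -> P,
{
    // The result of the first pass is the reference point.
    let once = rule(plan);
    let mut current = once.clone();
    // Passes 2..=N must not change the result of the first pass.
    for _ in 1..passes {
        current = rule(current);
        if current != once {
            return false;
        }
    }
    true
}
```

For example, a toy "rule" that sorts a vector passes the check, while one that appends an element on every call fails it on the second pass.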
Step 3 of alamb's review redirection on apache#21976: drop the `new_tests.rs` file added alongside the EnsureRequirements rule.

The file was an organizational anomaly: a separate test module with duplicated test helpers (`optimize_and_sanity_check` / `assert_idempotent` are also defined in the inline tests module in `ensure_requirements/mod.rs`) and substantial plan-shape overlap with the inline tests:

* Hash joins, aggregates, window functions, projections, fetch preservation, filters, repartitioning, skip+fetch, and nested-plan cases all have idempotency + SanityCheckPlan-validity coverage in `mod.rs` already.
* The specific production-bug regressions (apache#14150, PR53, PR54) all live in `mod.rs`.

The inline tests in `mod.rs` remain: every one of them exercises either `optimize_and_sanity_check` (production-502 regression property) or `assert_idempotent` (the merged-rule's core property), neither of which is covered by the existing tests in `core/tests/physical_optimizer/`. They are not duplicates of the existing rule tests, just of `new_tests.rs`.

Test results after the deletion:
- datafusion-physical-optimizer (lib) 59 passed (was 79; 20 from new_tests.rs removed)
- core_integration physical_optimizer:: 454 passed (unchanged)
- clippy -D warnings clean
…pendent

After retargeting the test framework to EnsureRequirements, the partition count from ConfigOptions::new() (defaulting to CPU count) appears in Hash([…], N) operators in the optimized plan. Snapshots taken on one machine fail on a machine with a different CPU count, which is why the hash-collisions CI job (4-core runner) was failing after the local acceptance (12-core dev machine).

Fix: pin target_partitions = 10 in EnforceSortingTest::run(), matching the convention already used in enforce_distribution.rs. Re-accept the affected snapshots so they reflect the pinned value.
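
Why a CPU-derived default makes snapshots machine-dependent, and how pinning removes that, can be shown with a toy config. This is a sketch under assumptions: `ToyConfig`, `with_target_partitions`, and `render_repartition` are hypothetical and only imitate the shape of the problem; the rendered string is illustrative, not DataFusion's exact EXPLAIN output.

```rust
use std::thread;

/// Hypothetical stand-in for an optimizer config; not DataFusion's ConfigOptions.
struct ToyConfig {
    target_partitions: usize,
}

impl ToyConfig {
    /// Defaults the partition count to the machine's available parallelism,
    /// so two machines can disagree on the value.
    fn new() -> Self {
        let cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        Self { target_partitions: cpus }
    }

    /// Pins the partition count so rendered plans stop depending on the host.
    fn with_target_partitions(mut self, n: usize) -> Self {
        self.target_partitions = n;
        self
    }
}

/// Renders a plan line that embeds the partition count, the way a snapshot would.
fn render_repartition(config: &ToyConfig) -> String {
    format!("RepartitionExec: partitioning=Hash([a], {})", config.target_partitions)
}
```

With the default config the rendered line differs between a 4-core and a 12-core machine; after `with_target_partitions(10)` it is the same everywhere, which is what makes a stored snapshot stable.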
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported that the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
Three small follow-ups to the retirement of `EnforceDistribution` /
`EnforceSorting`:
- enforce_distribution.rs: the ~120-line introductory rustdoc that
previously documented the `EnforceDistribution` struct ended up
silently rebound to the unrelated internal `JoinKeyPairs` struct
after the struct deletion. Dropped the orphaned block — the brief
`//` retirement note at the top of the file is enough; the original
content lives in git history.
- enforce_sorting/mod.rs: the module-level doc still introduced
"the EnforceSorting optimizer rule". Rewrote to describe the
module's current role (helpers used by `EnsureRequirements`) while
keeping the worked sort-removal example.
- enforce_sorting/mod.rs: `PlanWithCorrespondingSort` /
`PlanWithCorrespondingCoalescePartitions` docs said the type "was
originally used within the `EnforceSorting` rule". They are still
used, just by `EnsureRequirements` now; reworded.
- ensure_requirements/mod.rs: `test_enforce_distribution_idempotent_hash_join`
had stale `expect("first EnforceDistribution failed")` strings and
a stale assertion message. Now says `EnsureRequirements pass failed`
/ `EnsureRequirements not idempotent`.
Caught a few more stale references on a closer re-read:
- enforce_distribution.rs: module-level doc still introduced "the
EnforceDistribution optimizer rule". Rewrote to describe the
helpers used by EnsureRequirements (mirrors the enforce_sorting
module doc).
- enforce_sorting/sort_pushdown.rs: the doc on ParentRequirements
referenced the [`EnforceSorting`] rule and had a now-broken
intra-doc link to crate::enforce_sorting::EnforceSorting. Rewrote
and removed the dangling link.
- optimizer.rs: three pipeline-position comments still mentioned
EnforceDistribution / EnforceSorting as separate rules. Updated to
point at EnsureRequirements instead.
- ensure_requirements/mod.rs: a couple of internal-reference comments
("PR apache#53", "EnforceSorting's ensure_sorting must insert …",
"EnforceDistribution claims to be idempotent") rephrased to neutral
language and to refer to the current rule structure.
- enforce_sorting.rs (tests): the EnforceSortingTest::run comment
about "this file has 4 rules that use tree node, apply these rules
as in the EnforceSorting::optimize implementation" updated to refer
to EnsureRequirements.
Caught a few more on a third re-read:
- ensure_requirements/mod.rs (tests): the internal PR numbers `PR apache#53` / `PR apache#54` and the internal product/team acronyms `UXX0/HRC 502s` were in the test docstrings and function names. Renamed and rewrote so the test names and docs describe the actual scenario (`OutputRequirementExec(SinglePartition)` over a multi-partition source, `pushdown_sorts` through `ProjectionExec`) instead of internal ticket IDs.
- combine_partial_final_agg.rs: rule-ordering doc still said "should be applied after the EnforceDistribution and EnforceSorting rules". Updated to refer to `EnsureRequirements`.
… multi-partition

The existing `test_issue_14150_fetch_survives_multiple_passes` exercises the apache#14150 code path only indirectly: the input plan has no `SortPreservingMergeExec` and lets the optimizer insert one. The original apache#14150 bug, however, bit when the *input* plan already contained an `SPM(fetch=N)`: `remove_dist_changing_operators()` would strip it, and `add_merge_on_top()` would then re-create an SPM without copying the saved `fetch`.

This test rebuilds the exact input shape from the previously-deleted `test_issue_14150_enforce_distribution_idempotent` and runs the plan through `EnsureRequirements::new()` twice. The apache#14150 property holds: `fetch=5` is preserved across both passes.

The test only asserts the fetch-preservation property, not byte-identical idempotency on this shape, because the merged rule may legitimately deduplicate the `fetch` field between adjacent operators (e.g. consolidate it onto the surrounding `GlobalLimitExec` and drop it from the SPM). Byte-identical idempotency for realistic input shapes is covered separately by the existing test.
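The strip/re-add behaviour behind apache#14150 can be sketched with a toy plan type. This is a simplified model under stated assumptions, not the DataFusion code: `Plan`, `strip_merges`, `readd_merge`, and `enforce_single_partition` are hypothetical names, and combining several stripped limits by taking the smallest one is a choice made for this example only.

```rust
/// Toy plan node; a simplified model, not DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Multi-partition leaf.
    Source { partitions: usize },
    /// Stand-in for `SortPreservingMergeExec`, with an optional row limit.
    Merge { fetch: Option<usize>, input: Box<Plan> },
}

/// Combine two optional limits, keeping the tighter one (assumption for this sketch).
fn tightest(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (x, y) => x.or(y),
    }
}

/// Strip merge nodes from the top of the plan and return the fetch they carried.
/// Forgetting to return the fetch here is the shape of the original bug.
fn strip_merges(plan: Plan) -> (Plan, Option<usize>) {
    match plan {
        Plan::Merge { fetch, input } => {
            let (inner, inner_fetch) = strip_merges(*input);
            (inner, tightest(fetch, inner_fetch))
        }
        other => (other, None),
    }
}

/// Re-create a merge on top of the input, re-applying the saved fetch.
fn readd_merge(input: Plan, fetch: Option<usize>) -> Plan {
    Plan::Merge { fetch, input: Box::new(input) }
}

/// One optimizer "pass" in this model: strip, then re-add.
fn enforce_single_partition(plan: Plan) -> Plan {
    let (stripped, fetch) = strip_merges(plan);
    readd_merge(stripped, fetch)
}
```

Running `enforce_single_partition` twice on `Merge { fetch: Some(5), .. }` over a four-partition source leaves the limit in place after both passes, which is the property the new test asserts.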
Summary
Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. Fix `pushdown_sorts` to be distribution-aware and fix the `SortPreservingMergeExec` / `CoalescePartitionsExec` fetch preservation issue from #14150, making the composition idempotent.

Epic: #21973
Closes: #14150

Problem
`EnforceDistribution` and `EnforceSorting` run as separate rules, but sorting and distribution are coupled through `SortExec.preserve_partitioning`. This caused:
- `SanityCheckPlan` validation failures on multi-partition sort + limit: `pushdown_sorts` set `preserve_partitioning=true` on multi-partition input without inserting `SortPreservingMergeExec`, violating the `SinglePartition` requirement coming from `GlobalLimitExec`.
- Invalid plans when `EnforceDistribution` is applied multiple times (#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` / `CoalescePartitionsExec` when stripping and re-adding distribution operators.

DataFusion was the only major query engine with separate rules; Spark (`EnsureRequirements`) and Presto/Trino (`AddExchanges`) handle both in a single rule.

Changes
1. `EnsureRequirements` rule (new, replaces `EnforceDistribution` + `EnforceSorting` in the default chain)
- A `PhysicalOptimizerRule` that calls the distribution + sorting helpers in one coordinated bottom-up sequence.
- Replaces `Arc::new(EnforceDistribution) + Arc::new(EnforceSorting)` in the default optimizer chain.

2. Distribution-aware `pushdown_sorts` (`sort_pushdown.rs`)
- Adds a `distribution_requirement: Distribution` field to `ParentRequirements`.
- Adds `add_sort_above_with_distribution()` in `utils.rs`, which inserts `SortPreservingMergeExec` when the parent requires `SinglePartition` and the input has multiple partitions.
- Switches the `add_sort_above` call sites to the distribution-aware variant.
- Adds a `stronger_distribution()` helper.

3. Fix fetch preservation in distribution enforcement (#14150)
- `remove_dist_changing_operators()` now saves `fetch` from removed SPM / Coalesce nodes.
- `add_merge_on_top()` re-applies the saved `fetch` to re-created operators.

4. Retire the old rule entry points; retarget existing tests
After review feedback from @alamb (#21976 (comment)), the rule structs and their `impl PhysicalOptimizerRule` blocks have been deleted from `enforce_distribution.rs` and `enforce_sorting/mod.rs`. The internal helpers (`ensure_distribution`, `ensure_sorting`, the contexts, `parallelize_sorts`, `replace_with_order_preserving_variants`, `sort_pushdown`, …) stay in place; `EnsureRequirements` calls them directly.

The existing integration tests in `core/tests/physical_optimizer/` now exercise `EnsureRequirements` instead of the deleted rules:
- `enforce_distribution.rs`: the `Run::Distribution` / `Run::Sorting` branches both call `EnsureRequirements::new()`. Legacy run sequences (`DISTRIB_DISTRIB_SORT`, `SORT_DISTRIB_DISTRIB`) are preserved verbatim; idempotency makes the previously-different orderings converge to the same plan.
- `enforce_sorting.rs`: `EnforceSortingTest` drives `EnsureRequirements::new()` and pins `target_partitions = 10` so snapshots are deterministic across machines. The historical `[Dist, Sort]` vs `[Sort, Dist, Sort]` comparison is rewritten as "running `EnsureRequirements` N times == running it once".
- `enforce_sorting_monotonicity.rs` / `replace_with_order_preserving_variants.rs`: driven through the same test framework; only snapshots updated.

A previously-separate `ensure_requirements/new_tests.rs` (added in an earlier iteration of this PR) is removed; the same coverage lives in the inline tests in `ensure_requirements/mod.rs`.

5. Updated SLT
- `explain.slt`: `EnforceDistribution` + `EnforceSorting` collapse to `EnsureRequirements` in `EXPLAIN VERBOSE` output.

Snapshot drift
~78 snapshots in the retargeted tests refreshed. The consistent pattern is `SortExec + CoalescePartitionsExec` (blocking) → `SortPreservingMergeExec` (streaming), because `EnsureRequirements` now runs `parallelize_sorts` + `replace_with_order_preserving_variants` on plan shapes that the single-rule path used to miss. These are improvements, not regressions, but worth a careful look in review since they are visible in the diff.

Testing
- `datafusion-physical-optimizer` (lib, inline tests)
- `core_integration physical_optimizer::`
- `cargo clippy --all-targets -- -D warnings`
- `cargo fmt --all --check`

Idempotency / regression coverage in the inline tests
- … `parallelize_sorts`)
- `OutputRequirementExec` + `SinglePartition` over multi-partition source
- `ProjectionExec` + multi-partition + `SinglePartition` requirement

Architecture
Idempotent because:
- `pushdown_sorts` now carries `distribution_requirement` and uses `add_sort_above_with_distribution`, so the second pass never re-violates an earlier-established `SinglePartition` requirement.
- Distribution enforcement preserves `fetch` across strip/re-add cycles.
- Applying `EnsureRequirements` repeatedly converges (verified across the partition-count sweep, hash-join, sort-merge join, window, projection, and #14150 regression tests).

Next steps (future PRs)
- Fold the `pushdown_sorts` work into the bottom-up `ensure_sorting` pass.
- … `pushdown_sorts` traversal.
- … `transform_up` for both distribution + sorting, like Spark's `EnsureRequirements`).
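
For readers who want the distribution-aware insertion from change 2 in concrete form, here is a minimal sketch. It is a toy model and not the actual `add_sort_above_with_distribution()` implementation: the `Plan` and `Distribution` enums, `output_partitions`, and `add_sort_with_distribution` are hypothetical, simplified names, and the real helper's signature may differ.

```rust
/// Toy distribution requirement (simplified; not DataFusion's `Distribution`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Distribution {
    Unspecified,
    SinglePartition,
}

/// Toy plan node (simplified; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Source { partitions: usize },
    Sort { preserve_partitioning: bool, input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
}

/// Number of output partitions a node produces in this model.
fn output_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Source { partitions } => *partitions,
        Plan::Sort { preserve_partitioning, input } => {
            if *preserve_partitioning { output_partitions(input) } else { 1 }
        }
        Plan::SortPreservingMerge { .. } => 1,
    }
}

/// Add a sort above `input`. If the input is multi-partition and the parent
/// requires a single partition, also add a merge on top so that the parent's
/// requirement still holds after the sort is inserted.
fn add_sort_with_distribution(input: Plan, required: Distribution) -> Plan {
    let multi = output_partitions(&input) > 1;
    let sort = Plan::Sort { preserve_partitioning: multi, input: Box::new(input) };
    if multi && required == Distribution::SinglePartition {
        Plan::SortPreservingMerge { input: Box::new(sort) }
    } else {
        sort
    }
}
```

In this model, a four-partition source under a `SinglePartition` requirement comes back as a merge over a partition-preserving sort, so the result has exactly one output partition. Without the requirement, the sort keeps all four partitions.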