From 5f7f97646d17817bf654e6ab7276ad2814776554 Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Thu, 14 May 2026 15:11:53 +0800
Subject: [PATCH 1/3] Add blog: Sort Pushdown in DataFusion: Skip Sorts, Skip
I/O
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
A walkthrough of the sort pushdown work, landed and in flight, in
Apache DataFusion. Covers:
- Why SortExec is expensive and what `Exact` / `Inexact` ordering mean
at runtime (static `fetch` vs `TopK` dynamic filter).
- Phase 1 (#19064): the `PushdownSort` rule + reverse row-group case.
- Phase 2 (#21182): statistics-based file sort that upgrades
`Unsupported` to `Exact`, eliminating the `SortExec` on
non-overlapping ASC scans. Includes the BufferExec compensation
(#21426) so the SPM above doesn't lose its implicit memory buffer.
- Reverse scans: today's row-group reverse (Inexact, #18817) and the
community decision to wait for arrow-rs page-level reverse (#9937)
before pursuing Exact reverse, after memory-profile pushback on the
original row-group-level proposal.
- Benchmarks: 2.1×–49× on the ASC-LIMIT sort_pushdown suite.
- What's next: the dynamic / TopK-driven path (#21351 merged, #21733,
#21712, #21956, #21580) including the precise RG-pruning vs
mid-stream-early-return distinction, and the EnsureRequirements
unification (#21976).
- Links into the prior dynamic filters and limit pruning posts so the
series reads as a coherent thread.
---
content/blog/2026-05-11-sort-pushdown.md | 592 ++++++++++++++++++
content/images/sort-pushdown/benchmark.svg | 75 +++
.../sort-pushdown/phase1-file-reorder.svg | 88 +++
.../sort-pushdown/phase2-stats-overlap.svg | 79 +++
content/images/sort-pushdown/plan-diff.svg | 70 +++
content/images/sort-pushdown/reverse-scan.svg | 100 +++
6 files changed, 1004 insertions(+)
create mode 100644 content/blog/2026-05-11-sort-pushdown.md
create mode 100644 content/images/sort-pushdown/benchmark.svg
create mode 100644 content/images/sort-pushdown/phase1-file-reorder.svg
create mode 100644 content/images/sort-pushdown/phase2-stats-overlap.svg
create mode 100644 content/images/sort-pushdown/plan-diff.svg
create mode 100644 content/images/sort-pushdown/reverse-scan.svg
diff --git a/content/blog/2026-05-11-sort-pushdown.md b/content/blog/2026-05-11-sort-pushdown.md
new file mode 100644
index 00000000..d8726038
--- /dev/null
+++ b/content/blog/2026-05-11-sort-pushdown.md
@@ -0,0 +1,592 @@
+---
+layout: post
+title: "Sort Pushdown in DataFusion: Skip Sorts, Skip I/O"
+date: 2026-05-11
+author: Qi Zhu
+categories: [performance]
+---
+
+
+
+[TOC]
+
+*Qi Zhu, [Massive](https://www.massive.com/)*
+
+Many [Apache Parquet] datasets are already sorted on disk. Time-series
+files are usually written in ingestion-time order. Event logs are sharded
+and sorted by event id. Partitioned tables come with a natural ordering
+implied by the partition key. The information about that ordering is
+sitting right there in the file metadata.
+
+[Apache Parquet]: https://parquet.apache.org/
+
+Until recently, [Apache DataFusion] would still re-sort those files on
+every `ORDER BY` query. Every `SELECT ... ORDER BY ts LIMIT 100` did a
+full external sort across the entire scan, even though the data was
+already in that order. CPU wasted. Memory wasted. Streaming defeated.
+
+[Apache DataFusion]: https://datafusion.apache.org/
+
+This post walks through the **sort pushdown** work that closed that gap.
+It is structured as a series of phases — file rearrangement first, then
+a statistics-based proof of non-overlap — and lands real benchmark
+speedups of **2.1×–49× on common queries**. The same machinery extends
+to `ORDER BY ... DESC`, and the page-level reverse primitive we are
+adding upstream in [arrow-rs] will push the gains further still.
+
+[arrow-rs]: https://github.com/apache/arrow-rs
+
+## TL;DR
+
+* DataFusion can now **skip `SortExec` entirely** when input files are
+ already in the requested order.
+* Two phases:
+ * **Phase 1** — establish the `PushdownSort` rule and the
+ `Exact` / `Inexact` / `Unsupported` protocol; ship the reverse
+ row-group case for `ORDER BY ... DESC` (reports `Inexact`).
+ * **Phase 2** — sort files within each partition by Parquet
+ `min/max` statistics and *prove* non-overlap, upgrading
+ `Unsupported` to `Exact` so `PushdownSort` removes the `SortExec`
+ that `EnforceSorting` inserted earlier.
+* Real-world benchmarks on the `sort_pushdown` suite:
+ `ORDER BY ... LIMIT` queries get **27× and 49× faster**; full
+ `ORDER BY` scans get **~2×** faster.
+* Reverse scans (`ORDER BY ... DESC`) ride the same machinery: a
+ merged row-group-level reverse returns `Inexact` (Sort stays, but
+ `TopK` terminates early); the page-level reverse primitive needed
+ for `Exact` reverse — and so for full `SortExec` removal on `DESC`
+ queries — is in flight in arrow-rs.
+
+## Why Sort Pushdown Matters
+
+`SortExec` is one of the most expensive operators in a query plan.
+It is blocking by construction — no row can leave until every input
+row has been seen and compared — so it tends to dominate both latency
+and peak memory. The cost gets paid even when:
+
+* the file is already ordered by the sort key (very common for
+ timestamp columns);
+* the query only needs the top *N* rows (`ORDER BY ts LIMIT 100`), in
+ which case full sort + truncate is wildly wasteful;
+* the next operator (`SortPreservingMergeExec`, `SortMergeJoinExec`,
+ a window function) was going to consume ordered input anyway.
+
+The data DataFusion needs to avoid this work is **already in the file
+metadata**. Parquet writers can record per-column statistics (`min`,
+`max`) at the row-group level. Files written by Spark, DuckDB,
+arrow-rs, and others routinely include them. And explicit `WITH ORDER`
+clauses in DataFusion's SQL `CREATE EXTERNAL TABLE` give the optimizer
+a direct ordering hint. The job of sort pushdown is to **use that
+information**.
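+
+For example, declaring the on-disk order at table-creation time looks
+like this (hypothetical table; `WITH ORDER` is the clause mentioned
+above):
+
+```sql
+CREATE EXTERNAL TABLE events (ts TIMESTAMP, user_id BIGINT)
+STORED AS PARQUET
+WITH ORDER (ts ASC)
+LOCATION '/data/events/';
+```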
+
+## How DataFusion Tracks Ordering
+
+![Plan before and after sort pushdown: the SortExec node is removed](/blog/images/sort-pushdown/plan-diff.svg)
+
+Each `FileScanConfig` carries an `output_ordering` — the ordering
+that the optimizer is willing to claim for the scan's output. There
+are two flavours:
+
+* **`Exact`** — the optimizer is *certain* the output is in this order.
+ Sort-handling rules treat an `Exact` ordering as a proof and **remove
+ the surrounding `SortExec`**. ([`EnforceSorting`] does this when the
+ scan declares `Exact` from the start; the sort pushdown rule covered
+ in this post does the same upgrade later in the pipeline.)
+* **`Inexact`** — the optimizer *believes* the output is probably
+ ordered, but cannot prove it. Downstream operators like
+ `SortPreservingMergeExec` can still benefit from this hint, but the
+ explicit `SortExec` stays for safety.
+
+[`EnforceSorting`]: https://docs.rs/datafusion-physical-optimizer/latest/datafusion_physical_optimizer/enforce_sorting/struct.EnforceSorting.html
+
+A helper called `validated_output_ordering()` is the gatekeeper. It
+walks the list of files inside a partition, checks whether the
+declared per-file ordering is consistent with the file order on disk,
+and either confirms the ordering or **strips it entirely** if it
+sees something ambiguous (e.g. file `b` comes before file `a` in the
+file list but file `a`'s range comes first).
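+
+A minimal sketch of that gatekeeping idea, with generic stand-in
+types (the real helper works on `FileScanConfig` and its per-file
+statistics):
+
+```rust
+/// Keep the declared ordering only if the files appear in the file
+/// list in ascending min-value order; otherwise strip the claim.
+fn validate_ordering<T>(file_mins: &[i64], declared: T) -> Option<T> {
+    if file_mins.windows(2).all(|w| w[0] <= w[1]) {
+        Some(declared) // consistent with the on-disk file order
+    } else {
+        None // ambiguous: the scan claims no ordering at all
+    }
+}
+```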
+
+### `Exact` and `Inexact` at runtime
+
+`Exact` and `Inexact` lead to different runtime behaviour, and
+distinguishing them up front makes the rest of this post easier to
+follow:
+
+* With **`Exact`**, the `SortExec` is removed and the LIMIT becomes
+ a **static fetch** on the source. The reader stops the moment the
+ requested number of rows has been emitted — early termination
+ at batch granularity, no dynamic state needed.
+* With **`Inexact`**, the `SortExec` stays in place. The LIMIT
+ materialises inside the sort as a `TopK` heap of size K. `TopK`
+ exposes a [**dynamic filter**][dyn-filters-blog] — a runtime
+ expression of the form *"only rows that could still beat the
+ current K-th-best value are worth considering"* — and pushes it
+ back to the parquet scanner. As more data is processed and the
+ heap tightens, the filter's threshold tightens with it, and entire
+ row groups can be skipped by checking the live threshold against
+ the row group's min/max statistics. (See the earlier
+ [dynamic filters][dyn-filters-blog] and [limit pruning][limit-pruning-blog]
+ posts for the full background on this mechanism.)
+
+Both paths use the same underlying min/max statistics, but for
+different purposes: `Exact` uses them at plan time to prove
+non-overlap and justify removing the sort; `Inexact` uses them at
+runtime to skip row groups that can no longer improve the heap.
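+
+As a concrete sketch of the `Inexact` path: for
+`ORDER BY ts ASC LIMIT 100`, once the heap holds 100 rows, the largest
+`ts` in the heap is the threshold, and the effective predicate pushed
+to the scan looks like this (illustrative values, not literal
+DataFusion output):
+
+```sql
+-- current 100th-best (largest) ts in the full heap: '2026-05-01T00:00:42'
+-- effective dynamic filter at the scan:
+ts < '2026-05-01T00:00:42'
+-- a row group whose min(ts) is at or above the threshold can never
+-- improve the heap, so it is skipped without being decoded
+```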
+
+[dyn-filters-blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/
+[limit-pruning-blog]: https://datafusion.apache.org/blog/2026/03/20/limit-pruning/
+
+The diagram above shows the result we want: the plan after sort
+pushdown loses the `SortExec` node. Everything downstream — the
+`SortPreservingMergeExec`, the `RepartitionExec`, the
+`DataSourceExec` — was already in the plan. We just need the
+optimizer to convince itself that the bottom of the plan is
+producing the order requested.
+
+## Phase 1: The Pushdown API and Reverse Scans
+
+Phase 1 ([#19064]) introduced the **`PushdownSort`** physical
+optimizer rule and a uniform API for asking each `ExecutionPlan` two
+questions:
+
+[#19064]: https://github.com/apache/datafusion/pull/19064
+
+1. "Can you produce output in *this* ordering?"
+2. "If yes, please rearrange yourself so that it actually does."
+
+The protocol uses three results — `Exact`, `Inexact`, `Unsupported` —
+that downstream operators can interpret uniformly. The Parquet
+`FileSource` answers by comparing the requested ordering against the
+per-file declared ordering: if the natural ordering satisfies the
+request, it returns `Exact`; if the *reverse* of the declared ordering
+does, it returns `Inexact` and flips on `reverse_row_groups=true` so the
+scan reads row groups from last to first (the row-group-level reverse
+covered later in this post); otherwise it returns `Unsupported`.
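+
+In sketch form, the protocol's result type looks like this (simplified
+names; see [#19064] for the real API surface):
+
+```rust
+/// How a node answers "can you produce this ordering?"
+enum SortPushdownResult {
+    /// Provably in the requested order: the SortExec can be removed.
+    Exact,
+    /// Probably ordered (e.g. reversed row groups): the SortExec
+    /// stays, but TopK's dynamic filter converges much faster.
+    Inexact,
+    /// The node cannot help; the plan is left unchanged.
+    Unsupported,
+}
+```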
+
+Phase 1's scope was deliberately narrow. It set up the API and
+delivered the reverse-scan case end-to-end, but it did **not** add
+any statistics-based file rearrangement — that came later in Phase 2.
+A finer-grained extension that reorders row groups *within* each file
+by min/max statistics — so the row group with the best sort-key value
+is read first and TopK can tighten its threshold faster — is also
+in progress in [#21956].
+
+Phase 1 also produced a useful side improvement:
+
+* **Reverse-output redesign** ([#19446], [#19557]) extended the same
+ rule to `DESC` queries — picked up again in the reverse-scan
+ section below.
+
+[#19446]: https://github.com/apache/datafusion/pull/19446
+[#19557]: https://github.com/apache/datafusion/pull/19557
+
+## Phase 2: Use Statistics to Prove Non-Overlap
+
+![Phase 2 step 1: files re-sorted within each group by min/max statistics on the sort column](/blog/images/sort-pushdown/phase1-file-reorder.svg)
+
+Phase 1 left a sharp edge that motivated Phase 2 ([#21182]). Consider
+this realistic scenario:
+
+[#21182]: https://github.com/apache/datafusion/pull/21182
+
+* Three files: `a.parquet`, `b.parquet`, `c.parquet`.
+* Each declares `WITH ORDER (ts ASC)`.
+* Internally each file *is* sorted by `ts`.
+* But they were written by different ingestion jobs and end up listed
+ in the **wrong order** on disk (e.g. alphabetical by name, not by
+ time).
+
+`validated_output_ordering()` looks at this, sees that the
+file-internal ordering disagrees with the file-list order, and
+**strips the ordering entirely**. From the optimizer's point of view
+the scan now has no declared ordering, so `EnforceSorting` (which runs
+earlier in the pipeline) inserts a `SortExec`. The data is sorted on
+disk; the optimizer just can't tell.
+
+Phase 2 fixes this in `PushdownSort`, which runs late — after
+`EnforceDistribution` and `EnforceSorting` have already shaped the
+plan. When `PushdownSort` finds a `SortExec` above a file scan whose
+ordering was stripped (a `FileSource` `Unsupported` result), it does
+three things inside `FileScanConfig::try_pushdown_sort`:
+
+1. **Sort the file list by per-file statistics on the sort
+ column(s)** within each file group (the diagram above). The
+ pre-existing [`MinMaxStatistics`] helper (introduced in [#9593])
+ reads each file's `column_statistics[c].min_value` /
+ `.max_value` for each sort column `c`, then sorts the file list by
+ the min row. Phase 2 wires this helper into the optimizer's
+ `Unsupported` branch — `sort_files_within_groups_by_statistics`
+ does the per-group orchestration and decides whether any group is
+ non-overlapping after the sort.
+2. **Check adjacency within each group** (sketched after this list):
+   walk each sorted file group
+ independently and ask whether `file[i].max ≤ file[i+1].min` for
+ every adjacent pair (touching at the boundary is fine — value `v`
+ showing up as the last row of one file and the first row of the
+ next still produces a sorted stream). The check is **per file
+ group**, not across groups; cross-group ordering is the job of
+ `SortPreservingMergeExec` at runtime (more on this below).
+3. **Upgrade `Unsupported` to `Exact`** when adjacency holds, the
+ table has a declared `output_ordering` (from `WITH ORDER` or
+ parquet `sorting_columns`), and the sort columns are null-free —
+ the last condition preserves `NULLS LAST`/`NULLS FIRST` semantics
+ across file boundaries. `PushdownSort` then removes the `SortExec`
+ itself and the plan becomes streamable.
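+
+Step 2 reduces to a one-line pass over the sorted ranges. A minimal
+sketch, assuming the per-file stats have already been extracted as
+integer `(min, max)` pairs:
+
+```rust
+/// True if adjacent files never overlap; touching boundaries
+/// (max == next min) are allowed, as described above.
+fn group_is_non_overlapping(ranges: &[(i64, i64)]) -> bool {
+    // ranges: (min, max) of the sort column per file, sorted by min
+    ranges.windows(2).all(|w| w[0].1 <= w[1].0)
+}
+```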
+
+[`MinMaxStatistics`]: https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs
+[#9593]: https://github.com/apache/datafusion/pull/9593
+
+One caveat that comes straight from `MinMaxStatistics`: the stats
+sort only fires when every `ORDER BY` expression is a plain column.
+`ORDER BY date_trunc('hour', ts)` silently skips the upgrade — there
+is no per-file min/max for the function output to compare against.
+Extending sort pushdown across monotonic function wrappers is one of
+the open follow-ups.
+
+![Non-overlapping ranges (upgrade to Exact) versus overlapping ranges (SortExec stays)](/blog/images/sort-pushdown/phase2-stats-overlap.svg)
+
+The diagram above contrasts the two cases. On the left, ranges are
+non-overlapping after sort, so we can guarantee that emitting the
+files in min-order produces a globally sorted stream. On the right,
+the ranges overlap, so even after sorting the files by `min(ts)` we
+cannot guarantee global ordering — Phase 2 correctly bails out and
+keeps `SortExec` in place.
+
+The implementation handles a few edge cases worth calling out:
+
+* **Buffering the eliminated `SortExec`.** When the `SortExec` was
+ sitting under a `SortPreservingMergeExec` with
+ `preserve_partitioning=true`, it wasn't just sorting — it was also
+ acting as an *implicit in-memory buffer* for the SPM above it. The
+ SPM picks rows from each partition stream one at a time; without
+ the upstream `SortExec` holding batches in memory, the SPM would
+ read directly from I/O-bound sources and stall on every pick. Phase
+ 2 compensates by inserting a [`BufferExec`] in the `SortExec`'s
+ place — bounded streaming buffer, same throughput shape, no
+ blocking sort. Capacity is configurable via
+ [`sort_pushdown_buffer_capacity`] ([#21426]).
+* **`fetch` preservation** through `EnforceDistribution`. The
+ distribution rule sometimes strips a `SortExec`'s `fetch` field and
+ re-adds the node later. Phase 2 plumbs `fetch` through so a
+ surviving `LIMIT` is not lost.
+* **Per-group, not global, non-overlap.** Phase 2's adjacency check is
+ scoped to each file group. Two file groups can have *overlapping*
+ ranges and the upgrade still fires, as long as each group is
+ internally non-overlapping. That works because each group already
+ produces an independently ordered stream at runtime, and
+ `SortPreservingMergeExec` then picks rows across streams in value
+ order to produce the final globally sorted output. Phase 2 only has
+ to prove the per-stream property.
+* **Single-partition vs multi-partition execution**. With the default
+ multi-partition setup, `EnforceDistribution` byte-range-splits files
+ into single-file groups, after which `validated_output_ordering()`
+ works correctly on its own. Phase 2 only triggers when files have
+ not been split — typically `--partitions 1` runs, or files small
+  enough that the splitter leaves them alone. In the `--partitions 1`
+  case the "per-group" distinction collapses (one group equals the
+ whole table), which is why the example earlier in this section is
+ drawn that way.
+
+[`BufferExec`]: https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/buffer.rs
+[`sort_pushdown_buffer_capacity`]: https://github.com/apache/datafusion/pull/21426
+[#21426]: https://github.com/apache/datafusion/pull/21426
+
+## Benchmarks
+
+![Benchmark results: 2.1x to 49x speedups on the sort_pushdown suite](/blog/images/sort-pushdown/benchmark.svg)
+
+The [`sort_pushdown`] benchmark suite reproduces the
+"wrong-order file list" scenario by generating Parquet files whose
+names are intentionally reversed against their sort-key ranges. Numbers
+below are `--partitions 1`, release build, on the merged Phase 2
+branch versus `main`:
+
+[`sort_pushdown`]: https://github.com/apache/datafusion/tree/main/benchmarks/queries/sort_pushdown
+
+| Query | Before | After | Speedup |
+| ------------------------------------------- | -------:| -------:| -------:|
+| Q1 — `ORDER BY key` (full scan) | 259 ms | 122 ms | **2.1×** |
+| Q2 — `ORDER BY key LIMIT 100` | 80 ms | 3 ms | **27×** |
+| Q3 — `SELECT * ORDER BY key` | 700 ms | 313 ms | **2.2×** |
+| Q4 — `SELECT * ORDER BY key LIMIT 100` | 342 ms | 7 ms | **49×** |
+
+The shape of the speedup is what you would expect once `SortExec` is
+removed:
+
+* **Full-scan queries (Q1, Q3)** still have to push every row through
+ the pipeline, so the gain is "just" the cost of the sort itself —
+ roughly half the original time. This matches the rule of thumb that
+ a blocking sort doubles end-to-end latency on data that fits in
+ memory.
+* **`LIMIT` queries (Q2, Q4)** benefit much more because removing
+ `SortExec` converts the LIMIT into a static `fetch` on the data
+ source — the reader stops the moment K rows have been emitted,
+ instead of reading the full file, sorting, and truncating.
+ This is the "early termination at batch granularity" case from
+ the runtime-difference section above. A 342 ms full-file scan
+  collapses into a 7 ms K-row read (plan shape sketched below).
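+
+In plan terms, the Q4 difference looks roughly like this (shape only,
+not verbatim `EXPLAIN` output):
+
+```text
+before: SortExec(fetch=100)        <- blocking: reads and sorts everything
+          DataSourceExec(...)
+after:  DataSourceExec(..., fetch=100, output_ordering=[key ASC])
+```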
+
+It is worth saying explicitly what this change does **not** affect.
+The default multi-partition execution path is unchanged: those plans
+already produced correct orderings via byte-range splitting, so
+Phase 2 simply does not trigger. There is no regression and no behavior
+change for the typical multi-threaded query.
+
+## Reverse Scans for `ORDER BY ... DESC`
+
+![Reverse scan: row groups read last-to-first while rows within each group decode forward](/blog/images/sort-pushdown/reverse-scan.svg)
+
+`ORDER BY ts DESC` is the same problem in reverse. If a file is sorted
+ascending and the query wants descending, we should be able to skip
+the sort — we just need to read the data in the opposite order.
+
+The first iteration of this was built in [#18817] and operates at the
+**row group** level: it reverses the *iteration order of row groups*
+so the last RG is opened first, but rows within each RG are still
+decoded forward. The resulting stream is "RGs descending × rows
+ascending" — close to the requested order, but not strictly DESC. The
+optimizer therefore reports this as `Inexact` and leaves the
+`SortExec` in place; the win is that `TopK`'s dynamic filter tightens
+much faster, because the very first row groups read already contain
+values near the final answer. A tight threshold means subsequent row
+groups can be skipped via min/max statistics. This ships today and
+is what powers fast `ORDER BY ts DESC LIMIT N` on ASC-sorted files.
+
+[#18817]: https://github.com/apache/datafusion/pull/18817
+
+To turn this into `Exact` reverse — so the `SortExec` can be removed
+outright — each emitted batch itself has to be in DESC order. The
+straightforward row-group-level approach (decode an entire RG forward,
+materialize all rows, reverse the buffer, then emit) is correct and
+was actually proposed first, in an earlier iteration of this work
+([#18817], later closed and split into smaller pieces). Review
+feedback there — primarily from [@2010YOUY01] — flagged the memory
+profile as too aggressive: caching an entire row group's worth of
+decoded rows before any batch can be emitted is roughly:
+
+* **Peak buffer of one whole row group** (~128 MB by default), versus
+ the few-MB-per-batch streaming profile readers normally have.
+* **First-batch latency = full last-row-group decode**. For
+ `ORDER BY ts DESC LIMIT 10` that means decoding ~1 million rows to
+ return 10 — defeating the point of the `LIMIT`.
+
+The agreed direction coming out of that discussion was to ship the
+narrower `Inexact` row-group-reverse first (which became Phase 1 in
+[#19064]), and to build `Exact` reverse on a finer-grained primitive
+once `arrow-rs` exposed one.
+
+That primitive is the **page-level** reverse traversal. Parquet's
+`OffsetIndex` already gives us byte-precise locations for every data
+page in a column chunk, so we can `seek` directly to the last page,
+decode it forward, reverse the resulting batch, and emit. Peak buffer
+drops to one page (~1 MB) and first-batch latency drops to the cost
+of one page decode — the row-group-level memory cliff disappears.
+
+We are landing this primitive upstream in arrow-rs as
+[#9937], with the discussion in [#9934]. Early numbers on a 100k-row,
+98-page column chunk show **~50× faster time-to-first-N** for `n ≤ 1
+page` and **~9× faster** for `n` spanning 10 pages, compared with the
+row-group-level Exact reverse described above. The DataFusion-side
+integration that turns this primitive into an `Exact` result is a
+follow-up to #9937 and is gated on its merge.
+
+[@2010YOUY01]: https://github.com/2010YOUY01
+
+[#9937]: https://github.com/apache/arrow-rs/pull/9937
+[#9934]: https://github.com/apache/arrow-rs/issues/9934
+
+One natural question: why not reverse the rows *within* a page
+directly? Because we can't. Parquet's page encodings (RLE, dictionary,
+delta, bit-packing) are all forward streams — you cannot decode the
+last value without decoding every value that came before it. The
+design therefore is: **reverse the page traversal, forward-decode
+each page, reverse the resulting RecordBatch**. This is the algorithm
+shape that the DataFusion-side `RecordBatchReader` integration (the
+follow-up to #9937) will use once arrow-rs ships the primitive.
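+
+The final step, reversing a forward-decoded batch, is cheap in arrow.
+A sketch using `take` with back-to-front indices (just that last step;
+the backward page traversal itself is the arrow-rs side of the work):
+
+```rust
+use arrow::array::UInt64Array;
+use arrow::compute::take;
+use arrow::record_batch::RecordBatch;
+
+/// Reverse every column of a batch by gathering rows back-to-front.
+fn reverse_batch(batch: &RecordBatch) -> arrow::error::Result<RecordBatch> {
+    let n = batch.num_rows() as u64;
+    let indices = UInt64Array::from_iter_values((0..n).rev());
+    let columns = batch
+        .columns()
+        .iter()
+        .map(|c| take(c.as_ref(), &indices, None))
+        .collect::<arrow::error::Result<Vec<_>>>()?;
+    RecordBatch::try_new(batch.schema(), columns)
+}
+```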
+
+The killer use case is **filtered reverse TopK**:
+
+```sql
+SELECT * FROM events
+WHERE user_id = 42
+ORDER BY ts DESC
+LIMIT 10
+```
+
+Here `RowSelection::with_limit` cannot help — you don't know in
+advance which rows match `user_id = 42`, so you can't pre-compute a
+selection of the "last 10 matching rows". The only correct strategy
+is to stream pages backward, evaluate the filter on each, and stop
+when 10 matches are collected. Row-group reverse stops at a
+~128 MB granularity. Page reverse stops at ~1 MB granularity. For a
+selective filter, the saving compounds.
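+
+The stopping rule is simple enough to sketch in a few lines
+(plain-`Vec` stand-ins for pages and rows; the real reader works on
+decoded `RecordBatch`es):
+
+```rust
+/// Collect the last `n` matching values, reading batches newest-first
+/// (each batch already reversed into DESC order).
+fn last_n_matching(
+    batches_desc: impl Iterator<Item = Vec<i64>>,
+    pred: impl Fn(i64) -> bool,
+    n: usize,
+) -> Vec<i64> {
+    let mut out = Vec::with_capacity(n);
+    for batch in batches_desc {
+        for v in batch {
+            if pred(v) {
+                out.push(v);
+                if out.len() == n {
+                    return out; // stop: no earlier pages are ever read
+                }
+            }
+        }
+    }
+    out // fewer than n matches in the whole file
+}
+```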
+
+## What's Next
+
+Sort pushdown is a long-running line of work and there is more to do.
+Beyond the `Exact` path described above, there is a complementary
+**dynamic / TopK-driven path** that helps when `Exact` cannot apply —
+e.g. when file ranges genuinely overlap, or when the sort is on a
+function output rather than a plain column. The two directions are
+not alternatives; they compose:
+
+* **`Exact` reverse for `ORDER BY ... DESC`.** Today's row-group
+ reverse returns `Inexact` and the `SortExec` stays on top; the
+ arrow-rs page-level reverse primitive ([#9937]) is what unlocks
+ `Exact` reverse on `DESC` queries (and therefore full `SortExec`
+ elimination on `DESC`). Memory + first-batch latency rule out doing
+ the same thing at the row-group level. Gated on #9937.
+* **Dynamic / TopK-driven path.** When `Exact` cannot fire, `TopK`'s
+ [dynamic filter][dyn-filters-blog] still benefits enormously from
+ reading the *best* data first. This thread also builds on the
+ [limit pruning][limit-pruning-blog] work that turned `LIMIT` into
+ an I/O optimization across the pruning pipeline. The
+ recently-merged morsel-style work scheduling in `FileStream`
+ ([#21351]) gives sibling partitions a *shared work queue* with
+ file-level work-stealing — no CPU sits idle when one partition
+ runs out of files. The proposed [#21733] sorts files in
+ that shared queue by per-file statistics *before* any partition
+ picks, so the first file read is globally optimal and tightens the
+ dynamic filter immediately. Combined with **TopK threshold init from
+ parquet statistics** ([#21712]) and **row-group reorder within each
+ file** ([#21956]), the threshold can be set before reading a single
+ byte. The combined statistics-driven `TopK` pipeline is in flight
+ as [#21580].
+
+ The mechanism here is **RG-level pruning, not mid-stream early
+ return**. With the threshold known up front, the parquet
+ `PruningPredicate` rejects entire row groups against their min/max
+ statistics before any I/O — those row groups are never decoded.
+ The row group(s) the reader *does* open still have their sort
+ column decoded in full to feed the dynamic filter. On the #21580
+ microbenchmark (single file, 61 sorted row groups, `--partitions 1`),
+ **60 of the 61 row groups are skipped** and only one is decoded:
+
+ | Query | Baseline | With pipeline | Speedup |
+ | ------------------------------ | -------: | ------------: | ------: |
+ | `ORDER BY col DESC LIMIT 100` | 28.5 ms | 1.64 ms | **17×** |
+ | `ORDER BY col DESC LIMIT 1000` | 22.2 ms | 0.37 ms | **60×** |
+ | `SELECT * ORDER BY ... LIMIT 100` | 22.5 ms | 0.66 ms | **34×** |
+ | `SELECT * ORDER BY ... LIMIT 1000` | 22.4 ms | 0.61 ms | **37×** |
+
+ The stack reports `Inexact` — the `SortExec` stays on top to
+ enforce correctness across overlapping ranges — so this path
+ cannot do *true* mid-stream early return. Once the parquet reader
+ opens a row group, the sort column has to be decoded all the way
+ through; once a `FileStream` picks up a file from the shared work
+ queue, it has to finish that file. Today's dynamic work scheduling
+ ([#21351]) is **file-granular**: idle partitions stop pulling
+ new files from the queue once a global limit is satisfied, but
+ the partition that's currently inside a file decodes that file's
+ remaining row groups regardless. Mid-file RG-level early return
+ on `TopK` convergence is **not implemented yet** — the work
+ queue holds `PartitionedFile`, not row-group descriptors.
+
+ Closing the tap the moment `TopK` has K confirmed winners therefore
+ needs either:
+
+ * the **`Exact` path**, where the `SortExec` is gone entirely and
+ the data source's own `fetch` becomes a static limit that the
+ reader can honour at batch granularity; or
+ * **finer-grained dynamic scheduling** — having the shared queue
+ hold row-group descriptors instead of whole files, so a partition
+ can release its current file's remaining row groups back to the
+ pool once a global signal says enough TopK winners have been
+ found. This is a natural extension of [#21351] and [#21733] but
+ is not yet on a PR.
+
+ The three mechanisms compose. Stats pruning saves the row groups
+ that *can't* matter (skipped without I/O). The dynamic filter
+ narrows what's decoded inside the row groups the reader does
+ open. `Exact` or finer-grained scheduling is what eventually
+ closes the tap once `TopK` is satisfied.
+* **Phase 3 — filter + sort early termination.** `WHERE filter ORDER
+ BY ts DESC LIMIT N` is the dominant observability query shape and
+ the one where the arrow-rs page-reverse primitive matters most:
+ `RowSelection::with_limit` cannot pre-compute the last `N` matching
+ rows when the filter is selective, so the only correct strategy is
+ to stream pages backward, evaluate the filter, and stop when `N`
+ matches are collected. The DataFusion-side integration is the
+ follow-up to #9937.
+* **Unifying `EnforceDistribution` and `EnforceSorting`** into a
+ single `EnsureRequirements` rule ([#21976]). The two existing rules
+ are coupled through `SortExec.preserve_partitioning`, which makes
+ their composition non-idempotent and has caused a class of
+ production bugs. Other engines (Spark's `EnsureRequirements`,
+ Trino's `AddExchanges`) handle both in a single rule. Merging them
+ also gives future sort-related optimizations a single coherent place
+ to live. In progress.
+* **OFFSET pushdown to parquet** ([#21828]) so `ORDER BY ts LIMIT K
+ OFFSET N` queries can skip the first `N` rows at the row-group level
+ instead of decoding and discarding them. In progress.
+
+[#21976]: https://github.com/apache/datafusion/pull/21976
+[#21956]: https://github.com/apache/datafusion/pull/21956
+[#21712]: https://github.com/apache/datafusion/pull/21712
+[#21580]: https://github.com/apache/datafusion/pull/21580
+[#21828]: https://github.com/apache/datafusion/pull/21828
+[#21351]: https://github.com/apache/datafusion/pull/21351
+[#21733]: https://github.com/apache/datafusion/issues/21733
+
+Concretely useful issues for new contributors:
+
+* [#17348] — the umbrella issue for sort pushdown.
+* [#21317] — sort pushdown: reorder row groups by statistics within
+ each file.
+* [#19394] — add more `ExecutionPlan` impls to support sort pushdown.
+
+[#17348]: https://github.com/apache/datafusion/issues/17348
+[#21317]: https://github.com/apache/datafusion/issues/21317
+[#19394]: https://github.com/apache/datafusion/issues/19394
+
+## Acknowledgements
+
+Thank you to [@alamb], [@adriangb], [@xudong963], [@2010YOUY01], and
+[@Dandandan] for reviewing the design and the patches across many
+iterations. The DataFusion community's willingness to engage deeply
+with optimizer changes — including the ones that touch foundational
+invariants — is what made this work possible.
+
+[@alamb]: https://github.com/alamb
+[@adriangb]: https://github.com/adriangb
+[@xudong963]: https://github.com/xudong963
+[@2010YOUY01]: https://github.com/2010YOUY01
+[@Dandandan]: https://github.com/Dandandan
+
+## References
+
+Prior posts this work builds on:
+
+* [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries][dyn-filters-blog] — the dynamic filter primitive `TopK` uses.
+* [Turning LIMIT into an I/O Optimization: Inside DataFusion's Multi-Layer Pruning Stack][limit-pruning-blog] — the pruning pipeline this work plugs into.
+
+Issues and PRs:
+
+* Umbrella issue: [apache/datafusion#17348][#17348]
+* `MinMaxStatistics` foundation: [apache/datafusion#9593][#9593]
+* Phase 1: [apache/datafusion#19064][#19064]
+* Phase 2: [apache/datafusion#21182][#21182]
+* `BufferExec` capacity for sort elimination: [apache/datafusion#21426][#21426]
+* Dynamic / TopK-driven path: [apache/datafusion#21351][#21351] (morsel-style work scheduling),
+ [apache/datafusion#21733][#21733] (global file reorder in shared queue)
+* Benchmark suite: [`sort_pushdown`]
+* Row-group reverse scan: [apache/datafusion#18817][#18817]
+* Page-level reverse (arrow-rs): [apache/arrow-rs#9934][#9934],
+ [apache/arrow-rs#9937][#9937]
+* `EnsureRequirements`: [apache/datafusion#21976][#21976]
diff --git a/content/images/sort-pushdown/benchmark.svg b/content/images/sort-pushdown/benchmark.svg
new file mode 100644
index 00000000..30afb7b2
--- /dev/null
+++ b/content/images/sort-pushdown/benchmark.svg
@@ -0,0 +1,75 @@
+
diff --git a/content/images/sort-pushdown/phase1-file-reorder.svg b/content/images/sort-pushdown/phase1-file-reorder.svg
new file mode 100644
index 00000000..9ae798ba
--- /dev/null
+++ b/content/images/sort-pushdown/phase1-file-reorder.svg
@@ -0,0 +1,88 @@
+
diff --git a/content/images/sort-pushdown/phase2-stats-overlap.svg b/content/images/sort-pushdown/phase2-stats-overlap.svg
new file mode 100644
index 00000000..027860ef
--- /dev/null
+++ b/content/images/sort-pushdown/phase2-stats-overlap.svg
@@ -0,0 +1,79 @@
+
diff --git a/content/images/sort-pushdown/plan-diff.svg b/content/images/sort-pushdown/plan-diff.svg
new file mode 100644
index 00000000..a4d08673
--- /dev/null
+++ b/content/images/sort-pushdown/plan-diff.svg
@@ -0,0 +1,70 @@
+
diff --git a/content/images/sort-pushdown/reverse-scan.svg b/content/images/sort-pushdown/reverse-scan.svg
new file mode 100644
index 00000000..443a0a1c
--- /dev/null
+++ b/content/images/sort-pushdown/reverse-scan.svg
@@ -0,0 +1,100 @@
+
From 47e45dd28d7b2d2f07b2ce1e344ce25f82391de9 Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Thu, 14 May 2026 15:34:17 +0800
Subject: [PATCH 2/3] Push draft date to 2026-05-25
---
...{2026-05-11-sort-pushdown.md => 2026-05-25-sort-pushdown.md} | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
rename content/blog/{2026-05-11-sort-pushdown.md => 2026-05-25-sort-pushdown.md} (99%)
diff --git a/content/blog/2026-05-11-sort-pushdown.md b/content/blog/2026-05-25-sort-pushdown.md
similarity index 99%
rename from content/blog/2026-05-11-sort-pushdown.md
rename to content/blog/2026-05-25-sort-pushdown.md
index d8726038..8dd9d8b8 100644
--- a/content/blog/2026-05-11-sort-pushdown.md
+++ b/content/blog/2026-05-25-sort-pushdown.md
@@ -1,7 +1,7 @@
---
layout: post
title: "Sort Pushdown in DataFusion: Skip Sorts, Skip I/O"
-date: 2026-05-11
+date: 2026-05-25
author: Qi Zhu
categories: [performance]
---
From dab94fd74037eb6c1f57861574caa0497fcea9ae Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Fri, 15 May 2026 17:40:51 +0800
Subject: [PATCH 3/3] Add Phase 3 section covering #21956 runtime reorder
pipeline
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The post previously only mentioned #21956 in passing. The PR is
landing the full mechanism — `try_pushdown_sort` decision tree,
two flags on `ParquetSource`, three composable runtime steps
(file reorder + RG reorder + reverse), and a `sort_prefix`-
preserving short-circuit — so cover it as a dedicated phase
between Phase 2 and the existing reverse-scan section.
- TL;DR: add a Phase 3 bullet alongside Phase 1 and Phase 2.
- Phase 1: replace the in-flight `#21956` aside with a cross-link
to the new section.
- Phase 2: keep the caveat about function-wrapped sorts but note
that #21956's `Inexact` path now covers them via monotonicity
inference.
- New `## Phase 3` section with two SVG diagrams: a decision tree
for the three protocol outcomes, and a three-step pipeline for
the `Inexact` runtime. Covers the two-flag design, the nested
file/RG layers, when EXPLAIN surfaces each flag, and four
scenarios where Phase 3 does not fire (aggregations, multi-
column secondary keys, function-wrapped sorts without a
declared ordering, source declares a forward prefix of the
request).
- "What's Next": rename the old "Phase 3 — filter + sort" bullet
to "Filtered reverse TopK end-to-end" so the label doesn't
clash with the new section, and add a follow-up bullet
referencing #22198 for multi-column / function-wrapped reorder.
---
content/blog/2026-05-25-sort-pushdown.md | 178 ++++++++++++++++--
.../images/sort-pushdown/pr21956-decision.svg | 66 +++++++
.../pr21956-runtime-pipeline.svg | 69 +++++++
3 files changed, 298 insertions(+), 15 deletions(-)
create mode 100644 content/images/sort-pushdown/pr21956-decision.svg
create mode 100644 content/images/sort-pushdown/pr21956-runtime-pipeline.svg
diff --git a/content/blog/2026-05-25-sort-pushdown.md b/content/blog/2026-05-25-sort-pushdown.md
index 8dd9d8b8..32366246 100644
--- a/content/blog/2026-05-25-sort-pushdown.md
+++ b/content/blog/2026-05-25-sort-pushdown.md
@@ -56,8 +56,10 @@ adding upstream in [arrow-rs] will push the gains further still.
## TL;DR
* DataFusion can now **skip `SortExec` entirely** when input files are
- already in the requested order.
-* Two phases:
+ already in the requested order, and **read the most-promising data
+ first** when they aren't — so `TopK` converges fast and the rest
+ gets pruned by statistics.
+* Three phases:
* **Phase 1** — establish the `PushdownSort` rule and the
`Exact` / `Inexact` / `Unsupported` protocol; ship the reverse
row-group case for `ORDER BY ... DESC` (reports `Inexact`).
@@ -65,9 +67,16 @@ adding upstream in [arrow-rs] will push the gains further still.
`min/max` statistics and *prove* non-overlap, upgrading
`Unsupported` to `Exact` so `PushdownSort` removes the `SortExec`
that `EnforceSorting` inserted earlier.
-* Real-world benchmarks on the `sort_pushdown` suite:
- `ORDER BY ... LIMIT` queries get **27× and 49× faster**; full
- `ORDER BY` scans get **~2×** faster.
+ * **Phase 3** ([#21956]) — generalise `Inexact`: whenever the
+ leading sort key is a plain column in the file schema (or the
+ source's reversed declared ordering satisfies the request),
+ `try_pushdown_sort` stamps two flags on the source and the
+ opener runs a three-step runtime pipeline — file-level reorder
+ in the shared morsel queue, row-group reorder by min/max stats,
+ then optional iteration reverse for `DESC` requests.
+* Real-world benchmarks on the `sort_pushdown` suite (Phase 2's
+ `Exact` upgrade): `ORDER BY ... LIMIT` queries get **27× and 49×
+ faster**; full `ORDER BY` scans get **~2×** faster.
* Reverse scans (`ORDER BY ... DESC`) ride the same machinery: a
merged row-group-level reverse returns `Inexact` (Sort stays, but
`TopK` terminates early); the page-level reverse primitive needed
@@ -183,10 +192,9 @@ covered later in this post); otherwise it returns `Unsupported`.
Phase 1's scope was deliberately narrow. It set up the API and
delivered the reverse-scan case end-to-end, but it did **not** add
any statistics-based file rearrangement — that came later in Phase 2.
-A finer-grained extension that reorders row groups *within* each file
-by min/max statistics — so the row group with the best sort-key value
-is read first and TopK can tighten its threshold faster — is also
-in progress in [#21956].
+A finer-grained extension that broadens this `Inexact` path with a
+three-step runtime reorder pipeline lands in [#21956] — covered in
+[Phase 3](#phase-3-runtime-reorder-for-inexact-pushdown) below.
Phase 1 also produced a useful side improvement:
@@ -259,6 +267,12 @@ is no per-file min/max for the function output to compare against.
Extending sort pushdown across monotonic function wrappers is one of
the open follow-ups.
+*(Within #21956's `Inexact` path, `EquivalenceProperties`'s
+monotonicity inference does let function-wrapped sorts benefit from
+row-group iteration reverse when the source declares a compatible
+natural ordering — but stats-based reorder still needs a plain
+column.)*
+
The diagram above contrasts the two cases. On the left, ranges are
@@ -348,6 +362,130 @@ already produced correct orderings via byte-range splitting, so
Phase 2 simply does not trigger. There is no regression and no behavior
change for the typical multi-threaded query.
+## Phase 3: Runtime Reorder for Inexact Pushdown
+
+Phase 2 handles the `Exact` upgrade — strong correctness, sort
+elimination — but only when the table has a declared
+`output_ordering` *and* the files are provably non-overlapping after
+sorting by min. Two large classes of queries fall outside that
+window:
+
+* **Unsorted data** — no `WITH ORDER`, no parquet `sorting_columns`.
+ Phase 2 cannot fire because there is no ordering claim to upgrade.
+* **Overlapping ranges** — files written by different ingestion
+ jobs share time windows. Phase 2 keeps the `SortExec` because the
+ global ordering can't be proven, even though the files often do
+ contain large stretches of in-order data.
+
+For both, a full external `SortExec` is overkill. The parquet
+metadata is right there, and reading the *most-promising* data
+first lets `TopK`'s dynamic filter threshold tighten quickly so the
+rest gets pruned. Phase 3 ([#21956]) wires that up by generalising
+the `Inexact` path Phase 1 introduced.
+
+### `try_pushdown_sort` — one decision, three outcomes
+
+![try_pushdown_sort decision tree: Exact, Inexact, or Unsupported](/blog/images/sort-pushdown/pr21956-decision.svg)
+
+The `Exact` / `Inexact` / `Unsupported` protocol from Phase 1 stays.
+Phase 3 broadens the **conditions** that route a query into
+`Inexact`:
+
+| Condition | Outcome |
+| --- | --- |
+| `eq_properties.ordering_satisfy(request)` | `Exact` — Phase 1 / 2 sort elimination |
+| Leading sort key is a plain `Column` in the file schema, **or** the source's reversed declared ordering satisfies the request | `Inexact` — Phase 3 runtime pipeline |
+| Neither | `Unsupported` — `SortExec` stays, no source-side optimisation |
+
+The "reversed satisfies" branch is what handles function-wrapped
+sorts (`date_trunc('day', ts) DESC`, `ceil(value) DESC`,
+`CAST(x AS Date) DESC`) — `EquivalenceProperties`'s monotonicity
+reasoning recognises that `f(col) DESC` is satisfied by `col ASC`
+reversed, even though parquet has no stats keyed by `f(col)`
+itself.
+
+### Two flags on `ParquetSource`, three runtime steps
+
+![Three-step runtime pipeline: file reorder, row-group reorder, optional reverse](/blog/images/sort-pushdown/pr21956-runtime-pipeline.svg)
+
+When `try_pushdown_sort` returns `Inexact`, it stamps two fields on
+the `ParquetSource`:
+
+```rust
+struct ParquetSource {
+    sort_order_for_reorder: Option<LexOrdering>, // what to reorder by
+    reverse_row_groups: bool,                    // whether to flip iteration
+ // ...
+}
+```
+
+The opener reads them at scan time to drive three composable steps:
+
+1. **File-level reorder.** `FileSource::reorder_files` sits in the
+ shared morsel queue (the [#21351] work-stealing primitive) and
+ sorts the partitioned-file list by `min(col)`. The first file
+ picked across all partitions is globally the most-promising one.
+2. **Row-group-level reorder.** Once a file is opened,
+ `PreparedAccessPlan::reorder_by_statistics` sorts that file's
+   `row_group_indexes` by `min(col)` ASC, so the row group most
+   likely to contribute to `TopK` is decoded first (sketched after
+   this list).
+3. **Reverse.** For `DESC` requests,
+ `PreparedAccessPlan::reverse` flips the iteration after the
+ stats reorder normalises everything to ASC-by-min. Same
+ primitive Phase 1 introduced for declared reverse scans — Phase
+ 3 just routes more queries through it.
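+
+Step 2 in sketch form (assumed shape; the real code lives in
+`PreparedAccessPlan::reorder_by_statistics`):
+
+```rust
+/// Sort one file's row-group indexes so the group with the smallest
+/// min(sort column) is decoded first; `mins[i]` comes straight from
+/// the parquet row-group statistics.
+fn reorder_row_groups(rg_indexes: &mut [usize], mins: &[i64]) {
+    rg_indexes.sort_by_key(|&i| mins[i]);
+}
+```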
+
+The two layers **nest by construction**: file `i`'s `min(col)` is
+a lower bound on every row group inside it, so the file queue's
+order is a natural prefix of the within-file row-group order.
+Choosing the same key (`min`) in both layers keeps the strategies
+consistent.
+
+`reverse_row_groups`'s meaning depends on which way `Inexact` was
+reached. When the column-in-schema condition fires, the stats
+reorder produces ASC-by-min, so `reverse_row_groups` simply mirrors
+the request direction. When only the reversed-equivalence
+condition fires (function-wrapped case with a declared source
+ordering), `reverse_row_groups` is `true` unconditionally — there
+is no stats reorder to compose with, just a flip of the file's
+natural order.
+
+Both flags surface on the `DataSourceExec` line in `EXPLAIN` so
+plan inspection and snapshot tests can confirm the pushdown fired:
+
+```text
+DataSourceExec: file_groups=..., file_type=parquet,
+ sort_order_for_reorder=[a@0 ASC], reverse_row_groups=true
+```
+
+Absence of either flag means the corresponding runtime step is a
+no-op.
+
+### When Phase 3 does *not* fire
+
+* **Aggregations on top of the sort key.** `SELECT URL, COUNT(*) AS c
+ FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10` (the ClickBench
+ TopK shape) — the leading sort key (`c`) is an aggregation result
+ and has no per-RG stats in the parquet file, so the
+ column-in-schema check fails. Pushing sort metadata through
+ `AggregateExec` is a separate problem: the aggregated value
+ doesn't exist before aggregation, so even if the metadata reached
+ the scan there'd be nothing actionable to do with it.
+* **Multi-column sort secondary keys.** The reorder currently only
+ uses the leading sort expression — secondary keys are ignored.
+ Tracked as a follow-up in [#22198].
+* **Function-wrapped sort without a source-declared ordering.**
+ Without a declared ordering to invert, the reversed-equivalence
+ branch has nothing to satisfy. Tracked in the same follow-up.
+* **Source declares a forward prefix of the request.** When the
+ source's declared `output_ordering` is a non-empty proper prefix
+ of the request (e.g. source `[a DESC, b ASC]`, request
+ `[a DESC, b ASC, c DESC]`), `try_pushdown_sort` returns
+ `Unsupported` so the surrounding `SortExec` can keep its
+ `sort_prefix` annotation — prefix-aware early termination in
+ `TopK` is strictly better than the Phase 3 reorder on data that
+ is already in prefix order on disk.
+
## Reverse Scans for `ORDER BY ... DESC`
@@ -464,9 +602,11 @@ not alternatives; they compose:
that shared queue by per-file statistics *before* any partition
picks, so the first file read is globally optimal and tightens the
dynamic filter immediately. Combined with **TopK threshold init from
- parquet statistics** ([#21712]) and **row-group reorder within each
- file** ([#21956]), the threshold can be set before reading a single
- byte. The combined statistics-driven `TopK` pipeline is in flight
+ parquet statistics** ([#21712]) and **`try_pushdown_sort` driving
+ runtime row-group / file reorder + reverse** ([#21956], landed),
+ the threshold can be set before reading a single byte. The reorder
+  mechanism applies to any `ORDER BY [LIMIT N]` on parquet, not just
+  TopK queries with a dynamic filter. The combined statistics-driven
+  `TopK` pipeline is in flight
as [#21580].
The mechanism here is **RG-level pruning, not mid-stream early
@@ -516,9 +656,9 @@ not alternatives; they compose:
narrows what's decoded inside the row groups the reader does
open. `Exact` or finer-grained scheduling is what eventually
closes the tap once `TopK` is satisfied.
-* **Phase 3 — filter + sort early termination.** `WHERE filter ORDER
- BY ts DESC LIMIT N` is the dominant observability query shape and
- the one where the arrow-rs page-reverse primitive matters most:
+* **Filtered reverse TopK end-to-end.** `WHERE filter ORDER BY ts
+ DESC LIMIT N` is the dominant observability query shape and the
+ one where the arrow-rs page-reverse primitive matters most:
`RowSelection::with_limit` cannot pre-compute the last `N` matching
rows when the filter is selective, so the only correct strategy is
to stream pages backward, evaluate the filter, and stop when `N`
@@ -535,9 +675,17 @@ not alternatives; they compose:
* **OFFSET pushdown to parquet** ([#21828]) so `ORDER BY ts LIMIT K
OFFSET N` queries can skip the first `N` rows at the row-group level
instead of decoding and discarding them. In progress.
+* **Multi-column and function-wrapped reorder follow-ups** ([#22198]).
+ The reorder mechanism in #21956 currently only uses the leading
+ sort key and only fires on plain columns. Lexicographic multi-key
+ reorder via `arrow::compute::lexsort_to_indices` is low-hanging
+ fruit; extending to monotonic function wrappers via leaf-column
+ extraction (e.g. `date_trunc('day', ts)` → use `min(ts)`) needs a
+ bit more `EquivalenceProperties` integration but is doable.
[#21976]: https://github.com/apache/datafusion/pull/21976
[#21956]: https://github.com/apache/datafusion/pull/21956
+[#22198]: https://github.com/apache/datafusion/issues/22198
[#21712]: https://github.com/apache/datafusion/pull/21712
[#21580]: https://github.com/apache/datafusion/pull/21580
[#21828]: https://github.com/apache/datafusion/pull/21828
diff --git a/content/images/sort-pushdown/pr21956-decision.svg b/content/images/sort-pushdown/pr21956-decision.svg
new file mode 100644
index 00000000..a8203241
--- /dev/null
+++ b/content/images/sort-pushdown/pr21956-decision.svg
@@ -0,0 +1,66 @@
+
diff --git a/content/images/sort-pushdown/pr21956-runtime-pipeline.svg b/content/images/sort-pushdown/pr21956-runtime-pipeline.svg
new file mode 100644
index 00000000..5bb8d678
--- /dev/null
+++ b/content/images/sort-pushdown/pr21956-runtime-pipeline.svg
@@ -0,0 +1,69 @@
+