From 7c347ede9053a6cdfcc5a042be8713d5e730834e Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Thu, 9 Apr 2026 13:37:33 +0800
Subject: [PATCH 01/10] Add exact reverse scan with per-RG buffering and tests

When exact_reverse is enabled via with_exact_reverse(true) on ParquetSource:

- try_reverse_output returns Exact (Sort operator removed, fetch pushdown
  enabled)
- ReversedRowGroupStream buffers batches per row group, reverses batch
  order, and reverses rows within each batch. Memory: O(largest_row_group).
- Default (exact_reverse=false) returns Inexact (backward compatible)

Row reversal is done in ReversedRowGroupStream (per-RG buffer), NOT in the
per-batch map closure. This ensures correct ordering across batch
boundaries within a row group.

Tests added:
- test_exact_reverse_scan_per_rg_buffer: multi-RG, small batch_size,
  verifies [6,5,4,3,2,1]
- test_inexact_reverse_scan_preserves_row_order: verifies [4,5,6,1,2,3]
- test_reversed_row_group_stream_standalone: unit test for
  ReversedRowGroupStream
- test_exact_reverse_returns_exact: option returns Exact
- test_default_returns_inexact: default returns Inexact

Based on the approach from apache/datafusion#18817.
---
 datafusion/datasource-parquet/src/opener.rs | 158 +++++++++++++++++-
 datafusion/datasource-parquet/src/source.rs | 127 ++++++++++++--
 .../physical-optimizer/src/pushdown_sort.rs |  42 ++++-
 3 files changed, 313 insertions(+), 14 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 962b78f394354..64ce07ecfe1c1 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -29,7 +29,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -122,6 +122,8 @@ pub(super) struct ParquetOpener {
     /// discard partially-matched row groups because they may contain rows that
     /// sort before fully-matched groups.
     pub preserve_order: bool,
+    /// Whether to reverse rows within each batch (for Exact reverse scan)
+    pub reverse_rows: bool,
 }
 
 /// Represents a prepared access plan with optional row selection
@@ -279,6 +281,7 @@ impl FileOpener for ParquetOpener {
         let max_predicate_cache_size = self.max_predicate_cache_size;
 
         let reverse_row_groups = self.reverse_row_groups;
+        let reverse_rows = self.reverse_rows;
 
         Ok(Box::pin(async move {
             #[cfg(feature = "parquet_encryption")]
             let file_decryption_properties = encryption_context
@@ -563,6 +566,18 @@ impl FileOpener for ParquetOpener {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }
 
+            // Collect per-RG row counts for exact reverse buffering
+            let rg_row_counts: Vec<usize> = if reverse_rows {
+                let rg_metadata = file_metadata.row_groups();
+                prepared_plan
+                    .row_group_indexes
+                    .iter()
+                    .map(|&idx| rg_metadata[idx].num_rows() as usize)
+                    .collect()
+            } else {
+                vec![]
+            };
+
             // Apply the prepared plan to the builder
             builder = prepared_plan.apply_to_builder(builder);
 
@@ -609,6 +624,9 @@ impl FileOpener for ParquetOpener {
                         &predicate_cache_inner_records,
                         &predicate_cache_records,
                     );
+                    // Note: per-batch row reversal is handled by ReversedRowGroupStream
+                    // (wraps the stream below), NOT here. Reversing per-batch here would
+                    // double-reverse when combined with the RG-level buffer+reverse.
                     b = projector.project_batch(&b)?;
                     if replace_schema {
                         // Ensure the output batch has the expected schema.
@@ -685,6 +703,15 @@ impl FileOpener for ParquetOpener {
                 })
             });
 
+            // When exact reverse is enabled, wrap the stream to buffer
+            // and reverse rows per row group. Memory cost: O(largest_RG).
+            let stream: futures::stream::BoxStream<'static, Result<RecordBatch>> =
+                if reverse_rows {
+                    ReversedRowGroupStream::new(stream, rg_row_counts).boxed()
+                } else {
+                    stream.boxed()
+                };
+
             if let Some(file_pruner) = file_pruner {
                 Ok(EarlyStoppingStream::new(
                     stream,
@@ -699,6 +726,127 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+/// Buffers batches per row group, then emits them in reversed order with
+/// reversed rows within each batch. Memory: O(largest row group).
+///
+/// The input stream has row groups already in reversed order (via
+/// `PreparedAccessPlan::reverse`). This stream reverses the row order
+/// *within* each row group so the final output is in exact descending order.
+struct ReversedRowGroupStream<S> {
+    inner: S,
+    /// Number of rows in each row group (in read order, already reversed)
+    rg_row_counts: Vec<usize>,
+    /// Index of the current row group being buffered
+    current_rg: usize,
+    /// Rows remaining in the current row group
+    rows_remaining_in_rg: usize,
+    /// Buffered batches for the current row group
+    buffer: Vec<RecordBatch>,
+    /// Reversed batches ready to emit
+    output_buffer: VecDeque<RecordBatch>,
+    /// Whether the inner stream is exhausted
+    done: bool,
+}
+
+impl<S> ReversedRowGroupStream<S> {
+    fn new(inner: S, rg_row_counts: Vec<usize>) -> Self {
+        let rows_remaining = rg_row_counts.first().copied().unwrap_or(0);
+        Self {
+            inner,
+            rg_row_counts,
+            current_rg: 0,
+            rows_remaining_in_rg: rows_remaining,
+            buffer: Vec::new(),
+            output_buffer: VecDeque::new(),
+            done: false,
+        }
+    }
+
+    /// Reverse the buffered batches: reverse batch order, reverse rows
+    /// within each batch, and move them to output_buffer.
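+    ///
+    /// For example, a row group buffered as two batches `[1, 2]` and `[3]`
+    /// (rows in forward order) is emitted as `[3]` followed by `[2, 1]`.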
+    fn flush_buffer(&mut self) -> Result<()> {
+        let batches = std::mem::take(&mut self.buffer);
+        for batch in batches.into_iter().rev() {
+            if batch.num_rows() <= 1 {
+                self.output_buffer.push_back(batch);
+                continue;
+            }
+            let indices = arrow::array::UInt32Array::from_iter_values(
+                (0..batch.num_rows() as u32).rev(),
+            );
+            let reversed = arrow::compute::take_record_batch(&batch, &indices)?;
+            self.output_buffer.push_back(reversed);
+        }
+        // Advance to next row group
+        self.current_rg += 1;
+        self.rows_remaining_in_rg = self
+            .rg_row_counts
+            .get(self.current_rg)
+            .copied()
+            .unwrap_or(0);
+        Ok(())
+    }
+}
+
+impl<S> Stream for ReversedRowGroupStream<S>
+where
+    S: Stream<Item = Result<RecordBatch>> + Unpin,
+{
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        use Poll;
+
+        // First, emit any already-reversed batches
+        if let Some(batch) = self.output_buffer.pop_front() {
+            return Poll::Ready(Some(Ok(batch)));
+        }
+
+        if self.done {
+            return Poll::Ready(None);
+        }
+
+        // Pull batches from the inner stream until we complete a row group
+        loop {
+            match ready!(self.inner.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    let num_rows = batch.num_rows();
+                    self.buffer.push(batch);
+                    self.rows_remaining_in_rg =
+                        self.rows_remaining_in_rg.saturating_sub(num_rows);
+
+                    if self.rows_remaining_in_rg == 0 {
+                        // Row group complete — flush buffer
+                        if let Err(e) = self.flush_buffer() {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        if let Some(batch) = self.output_buffer.pop_front() {
+                            return Poll::Ready(Some(Ok(batch)));
+                        }
+                    }
+                }
+                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                None => {
+                    self.done = true;
+                    // Flush any remaining buffered batches
+                    if !self.buffer.is_empty()
+                        && let Err(e) = self.flush_buffer()
+                    {
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                    if let Some(batch) = self.output_buffer.pop_front() {
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
+                    return Poll::Ready(None);
+                }
+            }
+        }
+    }
+}
+
 /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
 /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
 fn copy_arrow_reader_metrics(
@@ -1035,6 +1183,7 @@ fn should_enable_page_index(
 
 #[cfg(test)]
 mod test {
+    use std::pin::Pin;
     use std::sync::Arc;
 
     use super::{ConstantColumns, constant_columns_from_stats};
@@ -1085,6 +1234,7 @@ mod test {
         max_predicate_cache_size: Option<usize>,
         reverse_row_groups: bool,
         preserve_order: bool,
+        reverse_rows: bool,
     }
 
     impl ParquetOpenerBuilder {
@@ -1111,6 +1261,7 @@ mod test {
                 max_predicate_cache_size: None,
                 reverse_row_groups: false,
                 preserve_order: false,
+                reverse_rows: false,
             }
         }
 
@@ -1231,6 +1382,7 @@ mod test {
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
                 preserve_order: self.preserve_order,
+                reverse_rows: self.reverse_rows,
             }
         }
     }
@@ -1319,7 +1471,7 @@ mod test {
     }
 
     async fn count_batches_and_rows(
-        mut stream: std::pin::Pin<
+        mut stream: Pin<
             Box<
                 dyn Stream<Item = Result<RecordBatch>> + Send,
@@ -1337,7 +1489,7 @@ mod test {
 
     /// Helper to collect all int32 values from the first column of batches
     async fn collect_int32_values(
-        mut stream: std::pin::Pin<
+        mut stream: Pin<
             Box<
                 dyn Stream<Item = Result<RecordBatch>> + Send,
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 6eee12e5c609d..8b8930fc91395 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -288,11 +288,19 @@ pub struct ParquetSource {
     pub(crate) projection: ProjectionExprs,
     #[cfg(feature = "parquet_encryption")]
     pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
-    /// If true, read files in reverse order and reverse row groups within files.
-    /// But it's not guaranteed that rows within row groups are in reverse order,
-    /// so we still need to sort them after reading, so the reverse scan is inexact.
-    /// Used to optimize ORDER BY ... DESC on sorted data.
+    /// If true, read row groups in reverse order within each file.
+    /// Combined with `reverse_rows`, controls the sort pushdown behavior:
+    /// - `reverse_row_groups=true, reverse_rows=false`: Inexact (RGs reversed, rows within RG not)
+    /// - `reverse_row_groups=true, reverse_rows=true`: Exact (both RGs and rows reversed)
     reverse_row_groups: bool,
+    /// If true, reverse the row order within each batch after reading.
+    /// This gives exact descending order when combined with `reverse_row_groups`.
+    reverse_rows: bool,
+    /// If true, `try_reverse_output` returns `Exact` (reverse_row_groups + reverse_rows),
+    /// allowing the Sort operator to be removed entirely and fetch to be pushed down.
+    /// If false (default), returns `Inexact` (only reverse_row_groups), preserving
+    /// backward-compatible behavior where Sort is kept as TopK.
+    exact_reverse: bool,
 }
 
 impl ParquetSource {
@@ -318,6 +326,8 @@ impl ParquetSource {
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: None,
             reverse_row_groups: false,
+            reverse_rows: false,
+            exact_reverse: false,
         }
     }
 
@@ -481,6 +491,25 @@ impl ParquetSource {
     pub fn reverse_row_groups(&self) -> bool {
         self.reverse_row_groups
     }
+
+    /// Enable or disable exact reverse scanning.
+    ///
+    /// When `true`, `try_reverse_output` returns `Exact` (both row groups and
+    /// rows within each batch are reversed), allowing the Sort operator to be
+    /// removed entirely and fetch/limit to be pushed down to the scan.
+    ///
+    /// When `false` (default), `try_reverse_output` returns `Inexact` (only row
+    /// groups are reversed), preserving backward-compatible behavior where the
+    /// Sort operator is kept as TopK.
+    pub fn with_exact_reverse(mut self, exact_reverse: bool) -> Self {
+        self.exact_reverse = exact_reverse;
+        self
+    }
+
+    /// Returns whether exact reverse scanning is enabled.
+    pub fn exact_reverse(&self) -> bool {
+        self.exact_reverse
+    }
 }
 
 /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
@@ -568,6 +597,7 @@ impl FileSource for ParquetSource {
             max_predicate_cache_size: self.max_predicate_cache_size(),
             reverse_row_groups: self.reverse_row_groups,
             preserve_order: !base_config.output_ordering.is_empty(),
+            reverse_rows: self.reverse_rows,
         });
         Ok(opener)
     }
@@ -804,12 +834,24 @@ impl FileSource for ParquetSource {
             return Ok(SortOrderPushdownResult::Unsupported);
         }
 
-        // Return Inexact because we're only reversing row group order,
-        // not guaranteeing perfect row-level ordering
-        let new_source = self.clone().with_reverse_row_groups(true);
-        Ok(SortOrderPushdownResult::Inexact {
-            inner: Arc::new(new_source) as Arc<dyn FileSource>,
-        })
+        let new_source: Arc<dyn FileSource> = if self.exact_reverse {
+            // Exact: reverse both row groups and rows within each batch,
+            // giving globally sorted output. This allows the Sort operator
+            // to be removed entirely and fetch to be pushed down to the scan.
+            let mut source = self.clone().with_reverse_row_groups(true);
+            source.reverse_rows = true;
+            Arc::new(source)
+        } else {
+            // Inexact: only reverse row groups. The Sort operator stays
+            // (as TopK) but benefits from early termination.
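+            // Reversed row groups deliver the largest values first on
+            // ASC-sorted input, so the TopK threshold tightens quickly.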
+ Arc::new(self.clone().with_reverse_row_groups(true)) + }; + + if self.exact_reverse { + Ok(SortOrderPushdownResult::Exact { inner: new_source }) + } else { + Ok(SortOrderPushdownResult::Inexact { inner: new_source }) + } // TODO Phase 2: Add support for other optimizations: // - File reordering based on min/max statistics @@ -917,4 +959,69 @@ mod tests { assert!(source.reverse_row_groups()); assert!(source.filter().is_some()); } + + #[test] + fn test_exact_reverse_returns_exact() { + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use datafusion_physical_plan::SortOrderPushdownResult; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + + let source = ParquetSource::new(schema.clone()).with_exact_reverse(true); + + // Build equivalence properties with ASC ordering + let sort_expr = PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)), + options: SortOptions::default(), // ASC NULLS LAST + }; + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_orderings(vec![vec![sort_expr.clone()]]); + + // Request DESC ordering (reverse of source) + let desc_expr = sort_expr.reverse(); + + let result = source + .try_reverse_output(&[desc_expr], &eq_properties) + .unwrap(); + + assert!( + matches!(result, SortOrderPushdownResult::Exact { .. }), + "with_exact_reverse(true) should return Exact" + ); + } + + #[test] + fn test_default_returns_inexact() { + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::EquivalenceProperties; + use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + use datafusion_physical_plan::SortOrderPushdownResult; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + + // Default: exact_reverse is false + let source = ParquetSource::new(schema.clone()); + + let sort_expr = PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)), + options: SortOptions::default(), + }; + let mut eq_properties = EquivalenceProperties::new(schema); + eq_properties.add_orderings(vec![vec![sort_expr.clone()]]); + + let desc_expr = sort_expr.reverse(); + + let result = source + .try_reverse_output(&[desc_expr], &eq_properties) + .unwrap(); + + assert!( + matches!(result, SortOrderPushdownResult::Inexact { .. }), + "default (exact_reverse=false) should return Inexact" + ); + } } diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 1fa15492d2a92..00fa95f570e70 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -95,7 +95,16 @@ impl PhysicalOptimizerRule for PushdownSort { // Each node type defines its own pushdown behavior via try_pushdown_sort() match sort_input.try_pushdown_sort(required_ordering)? { SortOrderPushdownResult::Exact { inner } => { - // Data source guarantees perfect ordering - remove the Sort operator + // Data source guarantees perfect ordering - remove the Sort. + // If Sort had a fetch (TopK), push it into the inner plan + // tree for file-level early termination. Traverse + // single-child nodes (Projection, Cooperative) to reach + // the leaf data source that supports with_fetch. 
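+                // For example, `SortExec(fetch=5)` over `ProjectionExec` over
+                // `DataSourceExec` becomes `ProjectionExec` over the same
+                // `DataSourceExec` with `limit=5`.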
+                let inner = if let Some(fetch) = sort_exec.fetch() {
+                    push_fetch_into_plan(inner, fetch)
+                } else {
+                    inner
+                };
                 Ok(Transformed::yes(inner))
             }
             SortOrderPushdownResult::Inexact { inner } => {
@@ -127,3 +136,34 @@ impl PhysicalOptimizerRule for PushdownSort {
         true
     }
 }
+
+/// Push fetch (limit) into a plan tree for Exact sort pushdown.
+///
+/// Traverses single-child nodes (ProjectionExec, CooperativeExec, etc.)
+/// to find the deepest node that supports `with_fetch` (typically
+/// FileScanExec or DataSourceExec) and sets fetch on it.
+///
+/// Falls back to wrapping with GlobalLimitExec if no node supports it.
+fn push_fetch_into_plan(
+    plan: Arc<dyn ExecutionPlan>,
+    fetch: usize,
+) -> Arc<dyn ExecutionPlan> {
+    // Try with_fetch on the current node
+    if let Some(plan_with_fetch) = plan.with_fetch(Some(fetch)) {
+        return plan_with_fetch;
+    }
+
+    // Single-child node: recurse into child, then rebuild parent
+    let children = plan.children();
+    if children.len() == 1 {
+        let child = Arc::clone(children[0]);
+        let new_child = push_fetch_into_plan(child, fetch);
+        if let Ok(rebuilt) = Arc::clone(&plan).with_new_children(vec![new_child]) {
+            return rebuilt;
+        }
+    }
+
+    // Fallback: wrap with GlobalLimitExec
+    use datafusion_physical_plan::limit::GlobalLimitExec;
+    Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch)))
+}

From fdfa2c773ce44621962e863b989a535276f17082 Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Wed, 15 Apr 2026 11:47:03 +0800
Subject: [PATCH 02/10] Add session config, limit-after-reverse fix, SLT tests,
 and plan display for exact reverse scan

- Add `enable_exact_reverse_scan` to ParquetOptions (default false)
- Wire config through `with_table_parquet_options` to set `exact_reverse`
- Fix limit correctness: skip passing limit to parquet reader when
  reverse_rows=true; apply limit in ReversedRowGroupStream after reversal
- Display `scan_direction=Reversed` for exact, `reverse_row_groups=true`
  for inexact
- Add 4 snapshot tests for exact reverse (removes Sort, fetch pushdown,
  through projection)
- Add 8 SLT tests: EXPLAIN plans + result verification, LIMIT,
  OFFSET+LIMIT, ASC unchanged, toggle off
---
 datafusion/common/src/config.rs               |   7 +
 .../common/src/file_options/parquet_writer.rs |   1 +
 .../tests/physical_optimizer/pushdown_sort.rs | 122 ++++++++++++++++-
 .../tests/physical_optimizer/test_utils.rs    |  17 +++
 datafusion/datasource-parquet/src/opener.rs   |  46 ++++++-
 datafusion/datasource-parquet/src/source.rs   |   7 +-
 .../sqllogictest/test_files/sort_pushdown.slt | 127 ++++++++++++++++++
 7 files changed, 315 insertions(+), 12 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 14dacbc7e9536..935de21fc503e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -732,6 +732,13 @@ config_namespace! {
         /// parquet reader setting. 0 means no caching.
         pub max_predicate_cache_size: Option<usize>, default = None
 
+        /// (reading) If true, reverse scans produce exact descending order
+        /// by reversing rows within each row group. This allows the Sort
+        /// operator to be removed entirely and fetch/limit to be pushed
+        /// down to the scan. If false (default), reverse scans only reverse
+        /// row group order (inexact), keeping TopK above for final sorting.
+ pub enable_exact_reverse_scan: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 196cb96f3832d..4c33fa3883b01 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + enable_exact_reverse_scan: _, // reads not used for writer props } = self; let mut builder = WriterProperties::builder() diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index caef0fba052cb..1fc4a6d8a6b68 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -33,9 +33,9 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec, - parquet_exec_with_sort, projection_exec, projection_exec_with_alias, - repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch, - sort_expr, sort_expr_named, test_scan_with_ordering, + parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec, + projection_exec_with_alias, repartition_exec, schema, simple_projection_exec, + sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering, }; #[test] @@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() { " ); } + +// ============================================================================ +// EXACT REVERSE SCAN TESTS +// ============================================================================ +// These tests verify behavior when exact_reverse is enabled on ParquetSource. +// With exact reverse, the Sort operator is removed entirely and fetch is pushed +// down to the scan. 
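+// All of these build the scan through `parquet_exec_with_sort_exact_reverse`
+// (added to test_utils in this patch), i.e. a ParquetSource configured with
+// `with_exact_reverse(true)`.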
+ +#[test] +fn test_exact_reverse_removes_sort() { + // With exact_reverse=true, Sort should be removed entirely + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_with_fetch_pushes_limit() { + // With exact_reverse=true, Sort with fetch should be removed and fetch + // pushed down to the scan + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec_with_fetch(desc_ordering, Some(10), source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_through_projection_with_fetch() { + // Exact reverse with fetch pushes through projection + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let projection = simple_projection_exec(source, vec![0, 1]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - ProjectionExec: expr=[a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - ProjectionExec: expr=[a@0 as a, b@1 as b] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed + " + ); +} + +#[test] +fn test_exact_reverse_without_fetch_no_limit() { + // Exact reverse without fetch: Sort removed, no limit on scan + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = + parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]); + + let desc_ordering = 
        LexOrdering::new(vec![a.reverse()]).unwrap();
+    let plan = sort_exec(desc_ordering, source); // no fetch
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownSort::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
+      output:
+        Ok:
+          - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
+    "
+    );
+}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 5b50181d7fd3e..71e32c16f558a 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort(
     DataSourceExec::from_data_source(config)
 }
 
+/// Create a single parquet file that is sorted with exact_reverse enabled
+pub(crate) fn parquet_exec_with_sort_exact_reverse(
+    schema: SchemaRef,
+    output_ordering: Vec<LexOrdering>,
+) -> Arc<DataSourceExec> {
+    let source = ParquetSource::new(schema).with_exact_reverse(true);
+    let config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test:///").unwrap(),
+        Arc::new(source),
+    )
+    .with_file(PartitionedFile::new("x".to_string(), 100))
+    .with_output_ordering(output_ordering)
+    .build();
+
+    DataSourceExec::from_data_source(config)
+}
+
 fn int64_stats() -> ColumnStatistics {
     ColumnStatistics {
         null_count: Precision::Absent,
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 64ce07ecfe1c1..b88fb4d0689de 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -581,8 +581,14 @@ impl FileOpener for ParquetOpener {
             // Apply the prepared plan to the builder
             builder = prepared_plan.apply_to_builder(builder);
 
+            // When reverse_rows is enabled, limit must be applied AFTER row
+            // reversal (in ReversedRowGroupStream), not at the parquet reader
+            // level. Applying limit here would read the first N rows in forward
+            // order and then reverse them, giving wrong results.
             if let Some(limit) = limit {
-                builder = builder.with_limit(limit)
+                if !reverse_rows {
+                    builder = builder.with_limit(limit)
+                }
             }
 
             if let Some(max_predicate_cache_size) = max_predicate_cache_size {
@@ -705,9 +711,11 @@ impl FileOpener for ParquetOpener {
 
             // When exact reverse is enabled, wrap the stream to buffer
             // and reverse rows per row group. Memory cost: O(largest_RG).
+            // The limit is applied here (after reversal) instead of at the
+            // parquet reader level so that we get the correct reversed rows.
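+            // For example, with rows [1..=9] and limit=4, a reader-level limit
+            // would keep [1, 2, 3, 4] and reverse them to [4, 3, 2, 1]; applied
+            // after reversal it correctly yields [9, 8, 7, 6].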
             let stream: futures::stream::BoxStream<'static, Result<RecordBatch>> =
                 if reverse_rows {
-                    ReversedRowGroupStream::new(stream, rg_row_counts).boxed()
+                    ReversedRowGroupStream::new(stream, rg_row_counts, limit).boxed()
                 } else {
                     stream.boxed()
                 };
@@ -746,10 +754,12 @@ struct ReversedRowGroupStream<S> {
     output_buffer: VecDeque<RecordBatch>,
     /// Whether the inner stream is exhausted
     done: bool,
+    /// Optional row limit (applied after reversal for correct results)
+    remaining_limit: Option<usize>,
 }
 
 impl<S> ReversedRowGroupStream<S> {
-    fn new(inner: S, rg_row_counts: Vec<usize>) -> Self {
+    fn new(inner: S, rg_row_counts: Vec<usize>, limit: Option<usize>) -> Self {
         let rows_remaining = rg_row_counts.first().copied().unwrap_or(0);
         Self {
             inner,
@@ -759,6 +769,25 @@ impl<S> ReversedRowGroupStream<S> {
             buffer: Vec::new(),
             output_buffer: VecDeque::new(),
             done: false,
+            remaining_limit: limit,
+        }
+    }
+
+    /// Truncate batch to remaining limit and update the counter.
+    /// Returns the (possibly truncated) batch.
+    fn apply_limit(&mut self, batch: RecordBatch) -> RecordBatch {
+        if let Some(remaining) = self.remaining_limit.as_mut() {
+            let rows = batch.num_rows();
+            if rows <= *remaining {
+                *remaining -= rows;
+                batch
+            } else {
+                let truncated = batch.slice(0, *remaining);
+                *remaining = 0;
+                truncated
+            }
+        } else {
+            batch
         }
     }
 
@@ -798,11 +827,14 @@ where
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        use Poll;
+        // Check if limit has been reached
+        if self.remaining_limit == Some(0) {
+            return Poll::Ready(None);
+        }
 
         // First, emit any already-reversed batches
         if let Some(batch) = self.output_buffer.pop_front() {
-            return Poll::Ready(Some(Ok(batch)));
+            return Poll::Ready(Some(Ok(self.apply_limit(batch))));
         }
 
         if self.done {
@@ -824,7 +856,7 @@ where
                             return Poll::Ready(Some(Err(e)));
                         }
                         if let Some(batch) = self.output_buffer.pop_front() {
-                            return Poll::Ready(Some(Ok(batch)));
+                            return Poll::Ready(Some(Ok(self.apply_limit(batch))));
                         }
                     }
                 }
@@ -838,7 +870,7 @@ where
                         return Poll::Ready(Some(Err(e)));
                     }
                     if let Some(batch) = self.output_buffer.pop_front() {
-                        return Poll::Ready(Some(Ok(batch)));
+                        return Poll::Ready(Some(Ok(self.apply_limit(batch))));
                    }
                    return Poll::Ready(None);
                }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 8b8930fc91395..918795d6ef365 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -336,6 +336,7 @@ impl ParquetSource {
         mut self,
         table_parquet_options: TableParquetOptions,
     ) -> Self {
+        self.exact_reverse = table_parquet_options.global.enable_exact_reverse_scan;
         self.table_parquet_options = table_parquet_options;
         self
     }
@@ -651,8 +652,10 @@ impl FileSource for ParquetSource {
 
         write!(f, "{predicate_string}")?;
 
-        // Add reverse_scan info if enabled
-        if self.reverse_row_groups {
+        // Add reverse scan info if enabled
+        if self.reverse_row_groups && self.reverse_rows {
+            write!(f, ", scan_direction=Reversed")?;
+        } else if self.reverse_row_groups {
             write!(f, ", reverse_row_groups=true")?;
         }
 
diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt
index 99f26b66d458b..30df6f911cbd1 100644
--- a/datafusion/sqllogictest/test_files/sort_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -1524,6 +1524,133 @@ SELECT * FROM multi_partition_parquet ORDER BY id ASC;
 statement ok
 SET datafusion.execution.target_partitions = 2;
 
+# ============================================================================
+# Test 12: Exact Reverse Scan
+#
============================================================================ +# When enable_exact_reverse_scan=true, the Sort operator is removed entirely +# and fetch is pushed down to the scan. + +# Enable exact reverse scan BEFORE creating table so ParquetSource picks it up +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +CREATE TABLE exact_rev_data(id INT, value INT, name VARCHAR) AS VALUES +(1, 100, 'a'), (2, 200, 'b'), (3, 300, 'c'), (4, 400, 'd'), (5, 500, 'e'), +(6, 600, 'f'), (7, 700, 'g'), (8, 800, 'h'), (9, 900, 'i'), (10, 1000, 'j'); + +query I +COPY (SELECT * FROM exact_rev_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/exact_rev_data.parquet'; +---- +10 + +statement ok +CREATE EXTERNAL TABLE exact_rev_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +# Test 12.1: Sort is removed, scan_direction=Reversed (exact), no SortExec +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC; +---- +logical_plan +01)Sort: exact_rev_parquet.id DESC NULLS FIRST +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.2: Results are correct in DESC order +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC; +---- +10 1000 j +9 900 i +8 800 h +7 700 g +6 600 f +5 500 e +4 400 d +3 300 c +2 200 b +1 100 a + +# Test 12.3: Sort removed, fetch pushed to scan (applied after row reversal in opener) +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.4: Results correct with LIMIT +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Test 12.5: OFFSET + LIMIT with exact reverse +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +logical_plan +01)Limit: skip=2, fetch=3 +02)--Sort: exact_rev_parquet.id DESC NULLS FIRST, fetch=5 +03)----TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan +01)GlobalLimitExec: skip=2, fetch=3 +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=5, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +# Test 12.6: Results correct with OFFSET + LIMIT +query IIT +SELECT * FROM exact_rev_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +8 800 h +7 700 g +6 600 f + +# Test 12.7: ASC order should NOT use reverse scan (same direction as source) +query TT +EXPLAIN SELECT * FROM exact_rev_parquet ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_parquet.id ASC NULLS LAST, fetch=3 +02)--TableScan: exact_rev_parquet projection=[id, value, name] +physical_plan DataSourceExec: 
file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Test 12.8: Verify default (exact_reverse=false) keeps Sort as TopK +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = false; + +statement ok +CREATE EXTERNAL TABLE exact_rev_disabled_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +query TT +EXPLAIN SELECT * FROM exact_rev_disabled_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_disabled_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: exact_rev_disabled_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Cleanup exact reverse test tables +statement ok +DROP TABLE exact_rev_data; + +statement ok +DROP TABLE exact_rev_parquet; + +statement ok +DROP TABLE exact_rev_disabled_parquet; + # Cleanup statement ok DROP TABLE reversed_high; From fd2650a8c561b6010d682cc538b31264cccb0082 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 15 Apr 2026 12:46:00 +0800 Subject: [PATCH 03/10] Fix proto deserialization: add enable_exact_reverse_scan default --- datafusion/common/src/file_options/parquet_writer.rs | 3 +++ datafusion/datasource-parquet/src/opener.rs | 8 ++++---- datafusion/proto-common/src/from_proto/mod.rs | 1 + datafusion/proto/src/logical_plan/file_formats.rs | 1 + datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 6 files changed, 12 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 4c33fa3883b01..4d37e663a0453 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -465,6 +465,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + enable_exact_reverse_scan: defaults.enable_exact_reverse_scan, } } @@ -579,6 +580,8 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + enable_exact_reverse_scan: global_options_defaults + .enable_exact_reverse_scan, }, column_specific_options, key_value_metadata, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b88fb4d0689de..674cb1d1c32a3 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -585,10 +585,10 @@ impl FileOpener for ParquetOpener { // reversal (in ReversedRowGroupStream), not at the parquet reader // level. Applying limit here would read the first N rows in forward // order and then reverse them, giving wrong results. 
- if let Some(limit) = limit { - if !reverse_rows { - builder = builder.with_limit(limit) - } + if let Some(limit) = limit + && !reverse_rows + { + builder = builder.with_limit(limit) } if let Some(max_predicate_cache_size) = max_predicate_cache_size { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index df2865d71bca0..d64dccc13f7cf 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1013,6 +1013,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + enable_exact_reverse_scan: false, }) } } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..85811c433bbf8 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -525,6 +525,7 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + enable_exact_reverse_scan: false, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 18f72cb9f7798..08ac5f0288bb9 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -242,6 +242,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 +datafusion.execution.parquet.enable_exact_reverse_scan false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -377,6 +378,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes +datafusion.execution.parquet.enable_exact_reverse_scan false (reading) If true, reverse scans produce exact descending order by reversing rows within each row group. This allows the Sort operator to be removed entirely and fetch/limit to be pushed down to the scan. If false (default), reverse scans only reverse row group order (inexact), keeping TopK above for final sorting. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. 
If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index ebd9ef728ae7b..22e0c765bbce3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -90,6 +90,7 @@ The following configuration settings are available: | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | +| datafusion.execution.parquet.enable_exact_reverse_scan | false | (reading) If true, reverse scans produce exact descending order by reversing rows within each row group. This allows the Sort operator to be removed entirely and fetch/limit to be pushed down to the scan. If false (default), reverse scans only reverse row group order (inexact), keeping TopK above for final sorting. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From 6ff5bdf30c507bc357aed828cd4df7d9b46f6799 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 15 Apr 2026 23:16:07 +0800 Subject: [PATCH 04/10] Fix row_selection bug in exact reverse scan + add comprehensive tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Copilot review comment on PR #47: when `row_selection` is present (e.g. from page pruning via pushdown_filters), the parquet stream emits only the selected rows, so seeding `rg_row_counts` from `RowGroupMetaData::num_rows()` caused ReversedRowGroupStream to mis-detect row-group boundaries and silently mix batches from multiple row groups, producing wrong ordering. Fix: new `compute_selected_rows_per_rg` helper walks the RowSelection in lock-step with the row groups and computes the actual output row count per RG. 
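
For example, with row groups of [4, 6, 5] rows and a selection of
skip 5 / select 7 / skip 3, the per-RG selected counts are [0, 5, 2]
(exercised by test_compute_selected_rows_per_rg_skip_spanning_rgs).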
Tests added:
- 4 unit tests for compute_selected_rows_per_rg (no skip, spanning skips,
  all-skipped, short selection error)
- test_exact_reverse_scan_multi_rg_produces_global_desc: verifies Inexact
  yields [7,8,9,4,5,6,1,2,3] while Exact yields [9..1] (globally DESC)
- test_exact_reverse_scan_applies_limit_after_reversal: verifies limit=4
  over [1..9] yields [9,8,7,6] (top of forward order, not first N
  pre-reverse)
- test_exact_reverse_scan_with_row_selection_across_rgs: regression test
  for the row_selection bug — 3 RGs with per-RG selections yield the
  expected [10,9,8,7,6,5,4,3]
- test_exact_reverse_scan_with_row_selection_and_limit: combined case
---
 datafusion/datasource-parquet/src/opener.rs | 428 +++++++++++++++++++-
 1 file changed, 421 insertions(+), 7 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 674cb1d1c32a3..5dfce95e29bf0 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -184,6 +184,48 @@ impl PreparedAccessPlan {
     }
 }
 
+/// Compute per-row-group *selected* row counts for exact reverse buffering.
+///
+/// `RowSelection` is a flat sequence of `RowSelector` values (alternating
+/// skip/select) applied to the concatenation of all selected row groups.
+/// To know how many rows each row group will emit, we walk both sequences
+/// in lock-step and accumulate the `select` portions per row group.
+fn compute_selected_rows_per_rg(
+    row_group_indexes: &[usize],
+    rg_metadata: &[RowGroupMetaData],
+    row_selection: &parquet::arrow::arrow_reader::RowSelection,
+) -> Result<Vec<usize>> {
+    let mut selectors = row_selection.iter();
+    let mut current_remaining: usize = 0;
+    let mut current_skip: bool = false;
+
+    let mut result = Vec::with_capacity(row_group_indexes.len());
+    for &rg_idx in row_group_indexes {
+        let mut rows_left_in_rg = rg_metadata[rg_idx].num_rows() as usize;
+        let mut selected = 0usize;
+        while rows_left_in_rg > 0 {
+            if current_remaining == 0 {
+                let Some(sel) = selectors.next() else {
+                    return Err(DataFusionError::Internal(
+                        "RowSelection ended before covering all planned row groups"
+                            .to_string(),
+                    ));
+                };
+                current_remaining = sel.row_count;
+                current_skip = sel.skip;
+            }
+            let consumed = rows_left_in_rg.min(current_remaining);
+            if !current_skip {
+                selected += consumed;
+            }
+            rows_left_in_rg -= consumed;
+            current_remaining -= consumed;
+        }
+        result.push(selected);
+    }
+    Ok(result)
+}
+
 impl FileOpener for ParquetOpener {
     fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let file_range = partitioned_file.range.clone();
@@ -566,14 +608,25 @@ impl FileOpener for ParquetOpener {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }
 
-            // Collect per-RG row counts for exact reverse buffering
+            // Collect per-RG *output* row counts for exact reverse buffering.
+            // When `row_selection` is present (e.g. page pruning via
+            // pushdown_filters), the stream emits only the selected rows, so
+            // `RowGroupMetaData::num_rows()` would over-count and cause
+            // ReversedRowGroupStream to misdetect row-group boundaries.
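+            // For example, an RG with 100 physical rows of which only 10 are
+            // selected would otherwise leave the boundary detector waiting for
+            // 90 rows that never arrive, absorbing the next RG's batches.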
             let rg_row_counts: Vec<usize> = if reverse_rows {
                 let rg_metadata = file_metadata.row_groups();
-                prepared_plan
-                    .row_group_indexes
-                    .iter()
-                    .map(|&idx| rg_metadata[idx].num_rows() as usize)
-                    .collect()
+                match prepared_plan.row_selection.as_ref() {
+                    Some(row_selection) => compute_selected_rows_per_rg(
+                        &prepared_plan.row_group_indexes,
+                        rg_metadata,
+                        row_selection,
+                    )?,
+                    None => prepared_plan
+                        .row_group_indexes
+                        .iter()
+                        .map(|&idx| rg_metadata[idx].num_rows() as usize)
+                        .collect(),
+                }
             } else {
                 vec![]
             };
@@ -1218,7 +1271,9 @@ mod test {
     use std::pin::Pin;
     use std::sync::Arc;
 
-    use super::{ConstantColumns, constant_columns_from_stats};
+    use super::{
+        ConstantColumns, compute_selected_rows_per_rg, constant_columns_from_stats,
+    };
     use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
@@ -1241,6 +1296,7 @@ mod test {
     use futures::{Stream, StreamExt};
     use object_store::{ObjectStore, memory::InMemory, path::Path};
     use parquet::arrow::ArrowWriter;
+    use parquet::file::metadata::RowGroupMetaData;
     use parquet::file::properties::WriterProperties;
 
     /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests.
@@ -1351,6 +1407,12 @@ mod test {
         self
     }
 
+    /// Set reverse_rows flag (Exact reverse scan: per-RG buffer + row reversal).
+    fn with_reverse_rows(mut self, enable: bool) -> Self {
+        self.reverse_rows = enable;
+        self
+    }
+
     /// Set preserve_order flag. When true, prune_by_limit is disabled.
     fn with_preserve_order(mut self, enable: bool) -> Self {
         self.preserve_order = enable;
@@ -2522,4 +2584,356 @@ mod test {
         );
     }
 
+    // ============================================================================
+    // Exact reverse scan tests
+    // ============================================================================
+    //
+    // These cover the `reverse_rows=true` path (per-RG buffer + row reversal) that
+    // is layered on top of `reverse_row_groups`:
+    //
+    //   reverse_row_groups only:           Inexact — RGs reversed, rows within RG still ASC.
+    //   reverse_row_groups + reverse_rows: Exact — globally DESC.
+    //
+    // The helper `compute_selected_rows_per_rg` is also unit-tested below, since a
+    // `RowSelection` produced by page pruning can make the parquet stream emit
+    // fewer rows per RG than `RowGroupMetaData::num_rows()` would suggest.
+
+    /// Build a `RowSelection` from a flat list of `(skip, row_count)` pairs.
+    fn row_selection_from_pairs(
+        pairs: &[(bool, usize)],
+    ) -> parquet::arrow::arrow_reader::RowSelection {
+        use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+        let selectors: Vec<RowSelector> = pairs
+            .iter()
+            .map(|&(skip, n)| {
+                if skip {
+                    RowSelector::skip(n)
+                } else {
+                    RowSelector::select(n)
+                }
+            })
+            .collect();
+        RowSelection::from(selectors)
+    }
+
+    /// Build a stub `RowGroupMetaData` with the given row count.
+    ///
+    /// `compute_selected_rows_per_rg` only reads `num_rows()` from the metadata,
+    /// so we can construct a minimal one with just that field populated.
+    fn stub_rg(num_rows: i64) -> RowGroupMetaData {
+        use parquet::schema::types::{SchemaDescriptor, Type};
+        let schema = Arc::new(SchemaDescriptor::new(Arc::new(
+            Type::group_type_builder("schema").build().unwrap(),
+        )));
+        RowGroupMetaData::builder(schema)
+            .set_num_rows(num_rows)
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_compute_selected_rows_per_rg_no_skip() {
+        // Selection that selects everything → output == raw num_rows per RG.
+        let rgs = vec![stub_rg(4), stub_rg(6), stub_rg(5)];
+        let sel = row_selection_from_pairs(&[(false, 15)]);
+        let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap();
+        assert_eq!(counts, vec![4, 6, 5]);
+    }
+
+    #[test]
+    fn test_compute_selected_rows_per_rg_skip_spanning_rgs() {
+        // RG sizes: [4, 6, 5] = 15 rows total.
+        // Selection: skip 5, select 7, skip 3 → rows [6..=12] chosen.
+        //   RG0 (rows 0..4)  : skip all 4        → 0 selected
+        //   RG1 (rows 4..10) : skip 1, select 5  → 5 selected
+        //   RG2 (rows 10..15): select 2, skip 3  → 2 selected
+        let rgs = vec![stub_rg(4), stub_rg(6), stub_rg(5)];
+        let sel = row_selection_from_pairs(&[(true, 5), (false, 7), (true, 3)]);
+        let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap();
+        assert_eq!(counts, vec![0, 5, 2]);
+    }
+
+    #[test]
+    fn test_compute_selected_rows_per_rg_all_skipped() {
+        // Every row is skipped — each RG emits 0 rows.
+        let rgs = vec![stub_rg(3), stub_rg(3)];
+        let sel = row_selection_from_pairs(&[(true, 6)]);
+        let counts = compute_selected_rows_per_rg(&[0, 1], &rgs, &sel).unwrap();
+        assert_eq!(counts, vec![0, 0]);
+    }
+
+    #[test]
+    fn test_compute_selected_rows_per_rg_short_selection_errors() {
+        // Selection covers only 5 rows but RGs sum to 10 → must error instead of
+        // silently returning garbage counts.
+        let rgs = vec![stub_rg(5), stub_rg(5)];
+        let sel = row_selection_from_pairs(&[(false, 5)]);
+        let err = compute_selected_rows_per_rg(&[0, 1], &rgs, &sel).unwrap_err();
+        assert!(
+            format!("{err}").contains("RowSelection ended before"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exact_reverse_scan_multi_rg_produces_global_desc() {
+        // Three RGs, each with an ascending run. With reverse_row_groups +
+        // reverse_rows, the output must be globally descending.
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(3) // one RG per batch
+            .build();
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "test.parquet",
+            vec![batch1.clone(), batch2, batch3],
+            Some(props),
+        )
+        .await;
+
+        let schema = batch1.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        // Inexact (only RGs reversed; rows within RG still ASC).
+        let inexact = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_reverse_row_groups(true)
+            .build();
+        let stream = inexact.open(file.clone()).unwrap().await.unwrap();
+        let inexact_values = collect_int32_values(stream).await;
+        assert_eq!(
+            inexact_values,
+            vec![7, 8, 9, 4, 5, 6, 1, 2, 3],
+            "Inexact: RGs reversed but rows within RG stay ASC"
+        );
+
+        // Exact (reverse_rows adds per-RG row reversal → globally DESC).
+        let exact = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_reverse_row_groups(true)
+            .with_reverse_rows(true)
+            .build();
+        let stream = exact.open(file.clone()).unwrap().await.unwrap();
+        let exact_values = collect_int32_values(stream).await;
+        assert_eq!(
+            exact_values,
+            vec![9, 8, 7, 6, 5, 4, 3, 2, 1],
+            "Exact: globally sorted DESC"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exact_reverse_scan_applies_limit_after_reversal() {
+        // With exact reverse + limit, the limit must come from the *end* of the
+        // logical forward order, not the first N rows pre-reversal.
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(3)
+            .build();
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "test.parquet",
+            vec![batch1.clone(), batch2, batch3],
+            Some(props),
+        )
+        .await;
+
+        let schema = batch1.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
+        let opener = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_projection_indices(&[0])
+            .with_reverse_row_groups(true)
+            .with_reverse_rows(true)
+            .with_limit(Some(4))
+            .build();
+        let stream = opener.open(file).unwrap().await.unwrap();
+        let values = collect_int32_values(stream).await;
+        assert_eq!(
+            values,
+            vec![9, 8, 7, 6],
+            "Limit must be applied AFTER row reversal; \
+             applying it at the parquet reader layer would produce [1,2,3,4] \
+             reversed to [4,3,2,1] — wrong."
+        );
+    }
+
+    #[tokio::test]
+    async fn test_exact_reverse_scan_with_row_selection_across_rgs() {
+        // Regression test for Copilot review comment #2: when `row_selection`
+        // (e.g. from page pruning / pushdown filters) causes the stream to emit
+        // fewer rows per RG than `num_rows()` suggests, `ReversedRowGroupStream`
+        // must still detect RG boundaries correctly. Before the fix,
+        // `rg_row_counts` was seeded from `RowGroupMetaData::num_rows()` and the
+        // boundary detector drifted, silently mixing batches from multiple RGs.
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        // Three RGs of 4 rows each. Each RG's rows are ASC (and so are the RGs
+        // relative to one another), so forward scan = [1..12] and any correct
+        // reverse scan over the selected rows must be DESC.
+ let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)])) + .unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // Attach a ParquetAccessPlan with a per-RG RowSelection: + // RG0 : skip first 2, select last 2 → selects rows {3, 4} + // RG1 : select all → selects rows {5, 6, 7, 8} + // RG2 : select first 2, skip last 2 → selects rows {9, 10} + // + // Exact reverse over this selection must return [10, 9, 8, 7, 6, 5, 4, 3]. + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut access_plan = ParquetAccessPlan::new_all(3); + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]), + ); + access_plan.scan_selection( + 2, + RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![10, 9, 8, 7, 6, 5, 4, 3], + "Exact reverse must respect row_selection when computing RG boundaries" + ); + } + + #[tokio::test] + async fn test_exact_reverse_scan_with_row_selection_and_limit() { + // Exact reverse + row_selection + limit. Must produce the top-N in DESC + // order taken from the selected rows (not the unselected ones). + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // Select only rows {2, 3, 6, 7}. 
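+        // skip(1), select(2), skip(1) within each 4-row RG keeps the two
+        // middle rows: values {2, 3} from RG0 and {6, 7} from RG1.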
+ use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + let mut access_plan = ParquetAccessPlan::new_all(2); + access_plan.scan_selection( + 0, + RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(1), + ]), + ); + access_plan.scan_selection( + 1, + RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(1), + ]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .with_limit(Some(3)) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + assert_eq!( + values, + vec![7, 6, 3], + "top 3 of {{2, 3, 6, 7}} in DESC order" + ); + } } From dbbdf72ac6dc0d22e2d1acc531d04c6ec7ffe2f9 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 16:56:03 +0800 Subject: [PATCH 05/10] Fix empty RG handling in ReversedRowGroupStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When RowSelection skips all rows in a row group, rg_row_counts has a 0 entry. Without this fix, rows_remaining_in_rg=0 causes the first batch from the next real RG to immediately trigger flush_buffer(), attributing that batch to the wrong (empty) RG and splitting the real RG's batches across two flush cycles — corrupting output order. Fix: skip RGs with 0 selected rows in both new() (for leading empty RGs) and flush_buffer() (for middle/trailing empty RGs). Tests: - test_exact_reverse_scan_with_empty_rg_from_row_selection: 3 RGs where middle RG is fully skipped, verifies [12..9, 4..1] order - test_compute_selected_rows_per_rg_with_fully_skipped_middle_rg: unit test confirming rg_row_counts=[4,0,4] for fully skipped middle RG --- datafusion/datasource-parquet/src/opener.rs | 100 +++++++++++++++++++- 1 file changed, 97 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5dfce95e29bf0..15ceed69986cb 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -813,11 +813,19 @@ struct ReversedRowGroupStream { impl ReversedRowGroupStream { fn new(inner: S, rg_row_counts: Vec, limit: Option) -> Self { - let rows_remaining = rg_row_counts.first().copied().unwrap_or(0); + // Skip leading empty RGs (all rows filtered by RowSelection). + // Without this, rows_remaining_in_rg=0 causes the first batch from + // the next real RG to immediately trigger flush_buffer(), attributing + // it to the wrong (empty) RG. + let mut current_rg = 0; + while current_rg < rg_row_counts.len() && rg_row_counts[current_rg] == 0 { + current_rg += 1; + } + let rows_remaining = rg_row_counts.get(current_rg).copied().unwrap_or(0); Self { inner, rg_row_counts, - current_rg: 0, + current_rg, rows_remaining_in_rg: rows_remaining, buffer: Vec::new(), output_buffer: VecDeque::new(), @@ -859,8 +867,16 @@ impl ReversedRowGroupStream { let reversed = arrow::compute::take_record_batch(&batch, &indices)?; self.output_buffer.push_back(reversed); } - // Advance to next row group + // Advance to next row group, skipping any empty RGs (all rows + // filtered by RowSelection). 
Without this skip, rows_remaining_in_rg=0 + // would cause the next batch to immediately trigger another flush, + // splitting a real RG's batches across two flush cycles. self.current_rg += 1; + while self.current_rg < self.rg_row_counts.len() + && self.rg_row_counts[self.current_rg] == 0 + { + self.current_rg += 1; + } self.rows_remaining_in_rg = self .rg_row_counts .get(self.current_rg) @@ -2936,4 +2952,82 @@ mod test { "top 3 of {{2, 3, 6, 7}} in DESC order" ); } + + /// Regression test: when RowSelection skips ALL rows in an RG (empty RG), + /// `rg_row_counts` has a 0 entry. Without the skip-empty-RG fix, + /// `rows_remaining_in_rg=0` causes the first batch from the next real RG + /// to immediately trigger `flush_buffer()`, attributing that batch to the + /// wrong (empty) RG and corrupting the output order. + #[tokio::test] + async fn test_exact_reverse_scan_with_empty_rg_from_row_selection() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + // Three RGs of 4 rows each. + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)])) + .unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + // RG0: select all 4 rows → {1,2,3,4} + // RG1: skip ALL rows → empty (0 selected) + // RG2: select all 4 rows → {9,10,11,12} + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut access_plan = ParquetAccessPlan::new_all(3); + // RG1: skip all 4 rows + access_plan.scan_selection(1, RowSelection::from(vec![RowSelector::skip(4)])); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let opener = ParquetOpenerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_reverse_row_groups(true) + .with_reverse_rows(true) + .build(); + let stream = opener.open(file).unwrap().await.unwrap(); + let values = collect_int32_values(stream).await; + + // Reversed: RG2 first [12,11,10,9], skip empty RG1, then RG0 [4,3,2,1] + assert_eq!( + values, + vec![12, 11, 10, 9, 4, 3, 2, 1], + "empty RG (all rows skipped) must be handled without corrupting order" + ); + } + + #[test] + fn test_compute_selected_rows_per_rg_with_fully_skipped_middle_rg() { + // RG sizes: [4, 4, 4]. RG1 fully skipped. 
+ // Selection: select 4, skip 4, select 4 + let rgs = vec![stub_rg(4), stub_rg(4), stub_rg(4)]; + let sel = row_selection_from_pairs(&[(false, 4), (true, 4), (false, 4)]); + let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap(); + assert_eq!(counts, vec![4, 0, 4], "middle RG fully skipped → 0 rows"); + } } From 549772b87c283bbdf0186b19eb95b8f072493a76 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 17:02:07 +0800 Subject: [PATCH 06/10] Add public with_reverse_rows/reverse_rows accessors on ParquetSource --- datafusion/datasource-parquet/src/source.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 918795d6ef365..0cfc75a5ee023 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -511,6 +511,21 @@ impl ParquetSource { pub fn exact_reverse(&self) -> bool { self.exact_reverse } + + /// Returns whether rows within each batch are reversed. + pub fn reverse_rows(&self) -> bool { + self.reverse_rows + } + + /// Enable or disable row reversal within each batch. + /// + /// This is normally set internally by `try_reverse_output` when + /// `exact_reverse` is enabled, but must be set explicitly when + /// restoring a plan from proto serialization (RemoteExec path). + pub fn with_reverse_rows(mut self, reverse_rows: bool) -> Self { + self.reverse_rows = reverse_rows; + self + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit From 67e72aff036454851dd1596cfd4adcdb7ba9a532 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 23 Apr 2026 13:57:54 +0800 Subject: [PATCH 07/10] Fix: exact reverse works correctly with pushdown_filters + RowFilter When pushdown_filters is enabled, RowFilter may reduce actual rows per row group below what rg_row_counts predicts. ReversedRowGroupStream handles this correctly: delayed RG boundary detection means multiple RGs may buffer together, but all remaining batches are flushed and reversed when the stream ends. Memory cost is O(all data) instead of O(largest RG), acceptable for LIMIT queries. Added SLT test verifying exact reverse with pushdown_filters=true produces correct DESC results with and without LIMIT. --- datafusion/datasource-parquet/src/source.rs | 8 +++ .../sqllogictest/test_files/sort_pushdown.slt | 51 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 0cfc75a5ee023..b9b824d6736ff 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -856,6 +856,14 @@ impl FileSource for ParquetSource { // Exact: reverse both row groups and rows within each batch, // giving globally sorted output. This allows the Sort operator // to be removed entirely and fetch to be pushed down to the scan. + // + // Note: when pushdown_filters is enabled, RowFilter may reduce + // actual rows below what rg_row_counts predicts. This causes + // ReversedRowGroupStream's RG boundary detection to delay + // (multiple RGs may buffer together), but correctness is preserved + // because all buffered batches are flushed and reversed when the + // stream ends. Memory cost becomes O(all data) instead of + // O(largest RG), which is acceptable for LIMIT queries. 
let mut source = self.clone().with_reverse_row_groups(true); source.reverse_rows = true; Arc::new(source) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 30df6f911cbd1..9b4b6db0c37a1 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1641,6 +1641,57 @@ physical_plan 01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true +# Test 12.9: Exact reverse works correctly with pushdown_filters enabled. +# RowFilter may reduce actual rows below rg_row_counts, but +# ReversedRowGroupStream handles this by flushing all remaining +# buffered batches when the stream ends. + +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE EXTERNAL TABLE exact_rev_pushdown_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_data.parquet' +WITH ORDER (id ASC); + +# Sort removed (Exact), scan_direction=Reversed, with predicate pushed down +query TT +EXPLAIN SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: exact_rev_pushdown_parquet.id DESC NULLS FIRST, fetch=3 +02)--Filter: exact_rev_pushdown_parquet.value > Int32(500) +03)----TableScan: exact_rev_pushdown_parquet projection=[id, value, name], partial_filters=[exact_rev_pushdown_parquet.value > Int32(500)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, predicate=value@1 > 500, scan_direction=Reversed, pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 500, required_guarantees=[] + +# Results correct in DESC order with filter +query IIT +SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Without LIMIT — all filtered rows in DESC order +query IIT +SELECT * FROM exact_rev_pushdown_parquet WHERE value > 500 ORDER BY id DESC; +---- +10 1000 j +9 900 i +8 800 h +7 700 g +6 600 f + +statement ok +DROP TABLE exact_rev_pushdown_parquet; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; + # Cleanup exact reverse test tables statement ok DROP TABLE exact_rev_data; From 381faa1a0164ba6aa714d8477bcc86fab22d090a Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 23 Apr 2026 14:14:32 +0800 Subject: [PATCH 08/10] Refactor: per-RG independent reverse scan (modeled after Atlas ReverseParquetSource) Replace ReversedRowGroupStream's rg_row_counts boundary detection with per-row-group independent reading. Each RG gets its own ParquetRecordBatchStreamBuilder with RowFilter applied independently, then batches are reversed per-RG. This fixes the correctness issue where RowFilter reduces actual rows below rg_row_counts predictions. Memory: O(largest RG), same as Atlas's ReverseParquetSource. Added SLT test for exact reverse + pushdown_filters + predicate. 
--- datafusion/datasource-parquet/src/opener.rs | 452 ++++++++++++-------- 1 file changed, 285 insertions(+), 167 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 15ceed69986cb..9143e813d2b8d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -324,6 +324,9 @@ impl FileOpener for ParquetOpener { let reverse_row_groups = self.reverse_row_groups; let reverse_rows = self.reverse_rows; + let partition_index = self.partition_index; + let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory); + let metrics = self.metrics.clone(); Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context @@ -478,6 +481,13 @@ impl FileOpener for ParquetOpener { metadata_timer.stop(); + // Clone metadata before moving into builder — needed for per-RG + // reverse scan which creates independent builders per row group. + let reader_metadata_for_reverse = if reverse_rows { + Some(Arc::new(reader_metadata.clone())) + } else { + None + }; let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( async_file_reader, reader_metadata, @@ -487,6 +497,9 @@ impl FileOpener for ParquetOpener { let mask = ProjectionMask::roots(builder.parquet_schema(), indices); + // Save the physical predicate for per-RG reverse scan filter pushdown + let pushdown_predicate = predicate.clone(); + // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( @@ -608,180 +621,232 @@ impl FileOpener for ParquetOpener { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } - // Collect per-RG *output* row counts for exact reverse buffering. - // When `row_selection` is present (e.g. page pruning via - // pushdown_filters), the stream emits only the selected rows, so - // `RowGroupMetaData::num_rows()` would over-count and cause - // ReversedRowGroupStream to misdetect row-group boundaries. - let rg_row_counts: Vec = if reverse_rows { - let rg_metadata = file_metadata.row_groups(); - match prepared_plan.row_selection.as_ref() { - Some(row_selection) => compute_selected_rows_per_rg( - &prepared_plan.row_group_indexes, - rg_metadata, - row_selection, - )?, - None => prepared_plan - .row_group_indexes - .iter() - .map(|&idx| rg_metadata[idx].num_rows() as usize) - .collect(), + // When reverse_rows is enabled, read each row group independently + // and reverse rows within it. This avoids the rg_row_counts boundary + // detection issue when RowFilter reduces actual row counts. + // Memory: O(largest RG). Modeled after Atlas's ReverseParquetSource. 
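+            // Flow of the per-RG stream built below: pop the next RG index,
+            // open a fresh reader for just that row group, collect and
+            // reverse its batches, apply any remaining limit, emit them,
+            // then repeat until the RG list (or the limit) is exhausted.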
+ if reverse_rows { + let rg_indexes = prepared_plan.row_group_indexes.clone(); + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let reader_metadata = reader_metadata_for_reverse + .expect("reader_metadata_for_reverse set when reverse_rows=true"); + + let physical_file_schema_for_filter = Arc::clone(&physical_file_schema); + let file_metrics_for_rg = file_metrics.clone(); + + let stream = futures::stream::try_unfold( + (rg_indexes, limit), + move |(mut rg_indexes, mut remaining_limit)| { + let reader_metadata = Arc::clone(&reader_metadata); + let mask = mask.clone(); + let output_schema = Arc::clone(&output_schema); + let projection = projection.clone(); + let partitioned_file = partitioned_file.clone(); + let parquet_file_reader_factory = + Arc::clone(&parquet_file_reader_factory); + let metrics = metrics.clone(); + let pushdown_predicate = pushdown_predicate.clone(); + let physical_file_schema = + Arc::clone(&physical_file_schema_for_filter); + let file_metrics = file_metrics_for_rg.clone(); + + async move { + // Pop from back — RGs are already in reversed order + let rg_idx = match rg_indexes.pop() { + Some(idx) if remaining_limit != Some(0) => idx, + _ => { + return Ok::<_, DataFusionError>(None); + } + }; + + // Create a fresh reader for this single RG + let reader: Box = + parquet_file_reader_factory.create_reader( + partition_index, + partitioned_file.clone(), + metadata_size_hint, + &metrics, + )?; + + let mut rg_builder = + ParquetRecordBatchStreamBuilder::new_with_metadata( + reader, + reader_metadata.as_ref().clone(), + ); + + // Apply predicate pushdown to per-RG builder + if let Some(ref pred) = pushdown_filters + .then_some(pushdown_predicate.as_ref()) + .flatten() + { + let row_filter = row_filter::build_row_filter( + pred, + &physical_file_schema, + rg_builder.metadata(), + reorder_predicates, + &file_metrics, + ); + match row_filter { + Ok(Some(filter)) => { + rg_builder = rg_builder.with_row_filter(filter); + } + Ok(None) => {} + Err(_) => {} + }; + } + + if let Some(max_predicate_cache_size) = max_predicate_cache_size + { + rg_builder = rg_builder + .with_max_predicate_cache_size(max_predicate_cache_size); + } + + let rg_stream = rg_builder + .with_projection(mask.clone()) + .with_batch_size(batch_size) + .with_row_groups(vec![rg_idx]) + .build()?; + + let stream_schema = Arc::clone(rg_stream.schema()); + let replace_schema = !stream_schema.eq(&output_schema); + let projection = projection + .try_map_exprs(|expr| { + reassign_expr_columns(expr, &stream_schema) + })?; + let projector = projection.make_projector(&stream_schema)?; + + // Read all batches for this RG, apply projection + let batches: Vec = rg_stream + .map_err(DataFusionError::from) + .map(|b| { + b.and_then(|b| { + let b = projector.project_batch(&b)?; + if replace_schema { + adapt_batch_schema(&b, &output_schema) + } else { + Ok(b) + } + }) + }) + .try_collect() + .await?; + + // Reverse each batch, then reverse batch order + let mut reversed = Vec::with_capacity(batches.len()); + for batch in batches.into_iter().rev() { + if batch.num_rows() <= 1 { + reversed.push(batch); + continue; + } + let indices = + arrow::array::UInt32Array::from_iter_values( + (0..batch.num_rows() as u32).rev(), + ); + reversed.push( + arrow::compute::take_record_batch(&batch, &indices)?, + ); + } + + // Apply limit across RGs + if let Some(ref mut lim) = remaining_limit { + let mut limited = Vec::new(); + for batch in reversed { + if *lim == 0 { + break; + } + let take = 
batch.num_rows().min(*lim); + limited.push(batch.slice(0, take)); + *lim -= take; + } + reversed = limited; + } + + Ok(Some(( + futures::stream::iter( + reversed.into_iter().map(Ok::<_, DataFusionError>), + ), + (rg_indexes, remaining_limit), + ))) + } + }, + ) + .try_flatten() + .boxed(); + + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream.boxed()) } } else { - vec![] - }; + // Non-reverse path: one stream for all RGs - // Apply the prepared plan to the builder - builder = prepared_plan.apply_to_builder(builder); + // Apply the prepared plan to the builder + builder = prepared_plan.apply_to_builder(builder); - // When reverse_rows is enabled, limit must be applied AFTER row - // reversal (in ReversedRowGroupStream), not at the parquet reader - // level. Applying limit here would read the first N rows in forward - // order and then reverse them, giving wrong results. - if let Some(limit) = limit - && !reverse_rows - { - builder = builder.with_limit(limit) - } + if let Some(limit) = limit { + builder = builder.with_limit(limit) + } - if let Some(max_predicate_cache_size) = max_predicate_cache_size { - builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); - } + if let Some(max_predicate_cache_size) = max_predicate_cache_size { + builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); + } - // metrics from the arrow reader itself - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - - let stream = builder - .with_projection(mask) - .with_batch_size(batch_size) - .with_metrics(arrow_reader_metrics.clone()) - .build()?; - - let files_ranges_pruned_statistics = - file_metrics.files_ranges_pruned_statistics.clone(); - let predicate_cache_inner_records = - file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = file_metrics.predicate_cache_records.clone(); - - let stream_schema = Arc::clone(stream.schema()); - // Check if we need to replace the schema to handle things like differing nullability or metadata. - // See note below about file vs. output schema. - let replace_schema = !stream_schema.eq(&output_schema); - - // Rebase column indices to match the narrowed stream schema. - // The projection expressions have indices based on physical_file_schema, - // but the stream only contains the columns selected by the ProjectionMask. - let projection = projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - - let projector = projection.make_projector(&stream_schema)?; - - let stream = stream.map_err(DataFusionError::from).map(move |b| { - b.and_then(|mut b| { - copy_arrow_reader_metrics( - &arrow_reader_metrics, - &predicate_cache_inner_records, - &predicate_cache_records, - ); - // Note: per-batch row reversal is handled by ReversedRowGroupStream - // (wraps the stream below), NOT here. Reversing per-batch here would - // double-reverse when combined with the RG-level buffer+reverse. - b = projector.project_batch(&b)?; - if replace_schema { - // Ensure the output batch has the expected schema. - // - // In DataFusion 51, SchemaAdapter::map_batch() handled - // schema mismatches by casting each column via - // arrow::compute::cast_with_options(). DF 52 removed - // SchemaAdapter, so we restore that behaviour here. 
- // - // This handles: - // - Schema/field level metadata differences - // - Nullability mismatches (OPTIONAL vs NOT NULL) - // - Type mismatches from schema evolution (e.g. Utf8 → Date32) - // - List/Struct inner field name/nullability differences - // (e.g. List(Field("conditions", Int32, false)) vs - // List(Field("element", Int32, true))) - let (stream_schema, arrays, num_rows) = b.into_parts(); - let adapted_arrays: Vec = arrays - .iter() - .enumerate() - .map(|(i, array)| { - let target_type = output_schema.field(i).data_type(); - if array.data_type() == target_type { - Ok(Arc::clone(array)) - } else { - // Try cast first (handles value-level conversions - // like Utf8 → Date32) - let casted = if arrow::compute::can_cast_types( - array.data_type(), - target_type, - ) { - arrow::compute::cast(array, target_type)? - } else { - Arc::clone(array) - }; - // If types still differ after cast (e.g. List inner - // field name/nullability), rebuild with target type - if casted.data_type() != target_type { - let data = casted - .to_data() - .into_builder() - .data_type(target_type.clone()) - .build() - .map_err(|e| { - DataFusionError::ArrowError(Box::new(e), Some(format!( - "Failed to adapt column '{}' from {} to {}", - stream_schema.field(i).name(), - array.data_type(), - target_type, - ))) - })?; - Ok(arrow::array::make_array(data)) - } else { - Ok(casted) - } - } - }) - .collect::>>()?; - // Note: nullability handling is left to the caller - // (e.g. atlas's adapt_table_schema_for_parquet which - // forces file columns nullable without touching partition - // columns). We only handle type/field-name adaptation here. - let options = - RecordBatchOptions::new().with_row_count(Some(num_rows)); - RecordBatch::try_new_with_options( - Arc::clone(&output_schema), - adapted_arrays, - &options, - ) - .map_err(Into::into) - } else { - Ok(b) - } - }) - }); - - // When exact reverse is enabled, wrap the stream to buffer - // and reverse rows per row group. Memory cost: O(largest_RG). - // The limit is applied here (after reversal) instead of at the - // parquet reader level so that we get the correct reversed rows. 
- let stream: futures::stream::BoxStream<'static, Result> = - if reverse_rows { - ReversedRowGroupStream::new(stream, rg_row_counts, limit).boxed() - } else { - stream.boxed() - }; + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + + let stream = builder + .with_projection(mask) + .with_batch_size(batch_size) + .with_metrics(arrow_reader_metrics.clone()) + .build()?; + + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let predicate_cache_inner_records = + file_metrics.predicate_cache_inner_records.clone(); + let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + + let stream_schema = Arc::clone(stream.schema()); + let replace_schema = !stream_schema.eq(&output_schema); + let projection = projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + let projector = projection.make_projector(&stream_schema)?; + + let stream = stream.map_err(DataFusionError::from).map(move |b| { + b.and_then(|mut b| { + copy_arrow_reader_metrics( + &arrow_reader_metrics, + &predicate_cache_inner_records, + &predicate_cache_records, + ); + b = projector.project_batch(&b)?; + if replace_schema { + adapt_batch_schema(&b, &output_schema) + } else { + Ok(b) + } + }) + }); - if let Some(file_pruner) = file_pruner { - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(stream.boxed()) + let stream: futures::stream::BoxStream<'static, Result> = + stream.boxed(); + + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + files_ranges_pruned_statistics, + ) + .boxed()) + } else { + Ok(stream.boxed()) + } } })) } @@ -950,6 +1015,59 @@ where /// Copies metrics from ArrowReaderMetrics (the metrics collected by the /// arrow-rs parquet reader) to the parquet file metrics for DataFusion +/// Adapt a RecordBatch to match the expected output schema. +/// Handles type mismatches from schema evolution, nullability differences, +/// and List/Struct inner field name differences. +fn adapt_batch_schema( + batch: &RecordBatch, + output_schema: &SchemaRef, +) -> Result { + let (stream_schema, arrays, num_rows) = batch.clone().into_parts(); + let adapted_arrays: Vec = arrays + .iter() + .enumerate() + .map(|(i, array)| { + let target_type = output_schema.field(i).data_type(); + if array.data_type() == target_type { + Ok(Arc::clone(array)) + } else { + let casted = if arrow::compute::can_cast_types( + array.data_type(), + target_type, + ) { + arrow::compute::cast(array, target_type)? 
+ } else { + Arc::clone(array) + }; + if casted.data_type() != target_type { + let data = casted + .to_data() + .into_builder() + .data_type(target_type.clone()) + .build() + .map_err(|e| { + DataFusionError::ArrowError( + Box::new(e), + Some(format!( + "Failed to adapt column '{}' from {} to {}", + stream_schema.field(i).name(), + array.data_type(), + target_type, + )), + ) + })?; + Ok(arrow::array::make_array(data)) + } else { + Ok(casted) + } + } + }) + .collect::>>()?; + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); + RecordBatch::try_new_with_options(Arc::clone(output_schema), adapted_arrays, &options) + .map_err(Into::into) +} + fn copy_arrow_reader_metrics( arrow_reader_metrics: &ArrowReaderMetrics, predicate_cache_inner_records: &Count, From bdac5d60032a02b92b1bde305888b5e86379924a Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 23 Apr 2026 14:21:31 +0800 Subject: [PATCH 09/10] Fix: preserve reverse_row_groups/reverse_rows across proto roundtrip When PushdownSort removes SortExec and sets reverse_row_groups=true / reverse_rows=true on ParquetSource, these runtime flags must survive proto serialization. Without this, remote executors (RemoteExec) would deserialize the plan without reverse scanning, producing wrong order. Added reverse_row_groups and reverse_rows fields to ParquetScanExecNode proto message, serialized on encode and restored on decode. --- datafusion/proto/proto/datafusion.proto | 6 ++++ datafusion/proto/src/generated/pbjson.rs | 36 +++++++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 4 +++ datafusion/proto/src/physical_plan/mod.rs | 10 +++++++ 4 files changed, 56 insertions(+) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bd7dd3a6aff3c..3e22b170466fc 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1058,6 +1058,12 @@ message ParquetScanExecNode { PhysicalExprNode predicate = 3; datafusion_common.TableParquetOptions parquet_options = 4; + + // Runtime reverse-scan flags set by PushdownSort optimizer. + // Must be preserved across proto roundtrips so that remote executors + // produce correctly reversed output after SortExec removal. 
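+  // Both fields default to false (proto3), so plans serialized by
+  // older writers decode with reverse scanning disabled.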
+ bool reverse_row_groups = 5; + bool reverse_rows = 6; } message CsvScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e269606d163a3..ef68c4a3b09bf 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13739,6 +13739,12 @@ impl serde::Serialize for ParquetScanExecNode { if self.parquet_options.is_some() { len += 1; } + if self.reverse_row_groups { + len += 1; + } + if self.reverse_rows { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.ParquetScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -13749,6 +13755,12 @@ impl serde::Serialize for ParquetScanExecNode { if let Some(v) = self.parquet_options.as_ref() { struct_ser.serialize_field("parquetOptions", v)?; } + if self.reverse_row_groups { + struct_ser.serialize_field("reverseRowGroups", &self.reverse_row_groups)?; + } + if self.reverse_rows { + struct_ser.serialize_field("reverseRows", &self.reverse_rows)?; + } struct_ser.end() } } @@ -13764,6 +13776,10 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "predicate", "parquet_options", "parquetOptions", + "reverse_row_groups", + "reverseRowGroups", + "reverse_rows", + "reverseRows", ]; #[allow(clippy::enum_variant_names)] @@ -13771,6 +13787,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { BaseConf, Predicate, ParquetOptions, + ReverseRowGroups, + ReverseRows, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -13795,6 +13813,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { "baseConf" | "base_conf" => Ok(GeneratedField::BaseConf), "predicate" => Ok(GeneratedField::Predicate), "parquetOptions" | "parquet_options" => Ok(GeneratedField::ParquetOptions), + "reverseRowGroups" | "reverse_row_groups" => Ok(GeneratedField::ReverseRowGroups), + "reverseRows" | "reverse_rows" => Ok(GeneratedField::ReverseRows), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -13817,6 +13837,8 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { let mut base_conf__ = None; let mut predicate__ = None; let mut parquet_options__ = None; + let mut reverse_row_groups__ = None; + let mut reverse_rows__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::BaseConf => { @@ -13837,12 +13859,26 @@ impl<'de> serde::Deserialize<'de> for ParquetScanExecNode { } parquet_options__ = map_.next_value()?; } + GeneratedField::ReverseRowGroups => { + if reverse_row_groups__.is_some() { + return Err(serde::de::Error::duplicate_field("reverseRowGroups")); + } + reverse_row_groups__ = Some(map_.next_value()?); + } + GeneratedField::ReverseRows => { + if reverse_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("reverseRows")); + } + reverse_rows__ = Some(map_.next_value()?); + } } } Ok(ParquetScanExecNode { base_conf: base_conf__, predicate: predicate__, parquet_options: parquet_options__, + reverse_row_groups: reverse_row_groups__.unwrap_or_default(), + reverse_rows: reverse_rows__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cf343e0258d0b..da105c9c0059f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1607,6 +1607,10 @@ pub struct ParquetScanExecNode { pub parquet_options: ::core::option::Option< super::datafusion_common::TableParquetOptions, >, + #[prost(bool, tag = "5")] + pub reverse_row_groups: bool, + #[prost(bool, tag = "6")] + pub reverse_rows: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvScanExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4ff90b61eed9c..b0c0db0f13a66 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -724,6 +724,14 @@ impl protobuf::PhysicalPlanNode { if let Some(predicate) = predicate { source = source.with_predicate(predicate); } + + // Restore runtime reverse-scan flags from proto + if scan.reverse_row_groups { + source = source.with_reverse_row_groups(true); + } + if scan.reverse_rows { + source = source.with_reverse_rows(true); + } let base_config = parse_protobuf_file_scan_config( base_conf, ctx, @@ -2672,6 +2680,8 @@ impl protobuf::PhysicalPlanNode { )?), predicate, parquet_options: Some(conf.table_parquet_options().try_into()?), + reverse_row_groups: conf.reverse_row_groups(), + reverse_rows: conf.reverse_rows(), }, )), })); From f0004c4318e6c899bb7ede56e0187c474b91c2f8 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 23 Apr 2026 14:34:36 +0800 Subject: [PATCH 10/10] Add multi-RG tests and fix RG iteration order Fix: per-RG iteration was popping from Vec back, yielding forward order instead of reversed. Changed to VecDeque::pop_front to correctly iterate RGs in reversed order (highest RG first). Added SLT tests: - 12.10: multi-RG exact reverse (max_row_group_size=3, 10 rows = 4 RGs) - Full DESC scan: correct order across all RG boundaries - LIMIT spanning RG boundaries - 12.11: multi-RG + pushdown_filters + predicate + LIMIT --- datafusion/datasource-parquet/src/opener.rs | 409 ++++-------------- datafusion/proto/src/generated/prost.rs | 3 + .../sqllogictest/test_files/sort_pushdown.slt | 99 +++++ 3 files changed, 189 insertions(+), 322 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 9143e813d2b8d..bc97cf5fcce5c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -172,6 +172,54 @@ impl PreparedAccessPlan { Ok(self) } + /// Split the overall row_selection into per-RG selections. + /// Returns a map from RG index → RowSelection for that RG. 
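+    /// e.g. RG sizes [3, 3] with an overall selection [skip 2, select 4]
+    /// split into {0: [skip 2, select 1], 1: [select 3]}.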
+ pub(crate) fn per_rg_selections( + &self, + rg_metadata: &[RowGroupMetaData], + ) -> HashMap { + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut result = HashMap::new(); + let Some(ref overall) = self.row_selection else { + return result; + }; + + let mut selectors = overall.iter().peekable(); + let mut current_remaining: usize = 0; + let mut current_skip: bool = false; + + for &rg_idx in &self.row_group_indexes { + let mut rows_left = rg_metadata[rg_idx].num_rows() as usize; + let mut rg_selectors = Vec::new(); + + while rows_left > 0 { + if current_remaining == 0 { + if let Some(sel) = selectors.next() { + current_remaining = sel.row_count; + current_skip = sel.skip; + } else { + break; + } + } + let consumed = rows_left.min(current_remaining); + if current_skip { + rg_selectors.push(RowSelector::skip(consumed)); + } else { + rg_selectors.push(RowSelector::select(consumed)); + } + rows_left -= consumed; + current_remaining -= consumed; + } + + if !rg_selectors.is_empty() { + result.insert(rg_idx, RowSelection::from(rg_selectors)); + } + } + + result + } + /// Apply this access plan to a ParquetRecordBatchStreamBuilder fn apply_to_builder( self, @@ -187,45 +235,6 @@ impl PreparedAccessPlan { /// Compute per-row-group *selected* row counts for exact reverse buffering. /// /// `RowSelection` is a flat sequence of `RowSelector` values (alternating -/// skip/select) applied to the concatenation of all selected row groups. -/// To know how many rows each row group will emit, we walk both sequences -/// in lock-step and accumulate the `select` portions per row group. -fn compute_selected_rows_per_rg( - row_group_indexes: &[usize], - rg_metadata: &[RowGroupMetaData], - row_selection: &parquet::arrow::arrow_reader::RowSelection, -) -> Result> { - let mut selectors = row_selection.iter(); - let mut current_remaining: usize = 0; - let mut current_skip: bool = false; - - let mut result = Vec::with_capacity(row_group_indexes.len()); - for &rg_idx in row_group_indexes { - let mut rows_left_in_rg = rg_metadata[rg_idx].num_rows() as usize; - let mut selected = 0usize; - while rows_left_in_rg > 0 { - if current_remaining == 0 { - let Some(sel) = selectors.next() else { - return Err(DataFusionError::Internal( - "RowSelection ended before covering all planned row groups" - .to_string(), - )); - }; - current_remaining = sel.row_count; - current_skip = sel.skip; - } - let consumed = rows_left_in_rg.min(current_remaining); - if !current_skip { - selected += consumed; - } - rows_left_in_rg -= consumed; - current_remaining -= consumed; - } - result.push(selected); - } - Ok(result) -} - impl FileOpener for ParquetOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { let file_range = partitioned_file.range.clone(); @@ -627,6 +636,7 @@ impl FileOpener for ParquetOpener { // Memory: O(largest RG). Modeled after Atlas's ReverseParquetSource. if reverse_rows { let rg_indexes = prepared_plan.row_group_indexes.clone(); + let per_rg_sels = prepared_plan.per_rg_selections(rg_metadata); let files_ranges_pruned_statistics = file_metrics.files_ranges_pruned_statistics.clone(); let reader_metadata = reader_metadata_for_reverse @@ -635,8 +645,11 @@ impl FileOpener for ParquetOpener { let physical_file_schema_for_filter = Arc::clone(&physical_file_schema); let file_metrics_for_rg = file_metrics.clone(); + // rg_indexes is already in reversed order [3,2,1,0]. + // Use VecDeque to pop from front (highest RG first). 
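+            // e.g. with 4 RGs the deque is [3, 2, 1, 0]; pop_front reads
+            // RG 3 first, which holds the logically last rows of the file.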
+ let rg_deque: VecDeque = rg_indexes.into_iter().collect(); let stream = futures::stream::try_unfold( - (rg_indexes, limit), + (rg_deque, limit), move |(mut rg_indexes, mut remaining_limit)| { let reader_metadata = Arc::clone(&reader_metadata); let mask = mask.clone(); @@ -650,10 +663,11 @@ impl FileOpener for ParquetOpener { let physical_file_schema = Arc::clone(&physical_file_schema_for_filter); let file_metrics = file_metrics_for_rg.clone(); + let per_rg_sels = per_rg_sels.clone(); async move { - // Pop from back — RGs are already in reversed order - let rg_idx = match rg_indexes.pop() { + // Pop from front — RGs are in reversed order [3,2,1,0] + let rg_idx = match rg_indexes.pop_front() { Some(idx) if remaining_limit != Some(0) => idx, _ => { return Ok::<_, DataFusionError>(None); @@ -676,7 +690,7 @@ impl FileOpener for ParquetOpener { ); // Apply predicate pushdown to per-RG builder - if let Some(ref pred) = pushdown_filters + if let Some(pred) = pushdown_filters .then_some(pushdown_predicate.as_ref()) .flatten() { @@ -696,10 +710,18 @@ impl FileOpener for ParquetOpener { }; } - if let Some(max_predicate_cache_size) = max_predicate_cache_size + if let Some(max_predicate_cache_size) = + max_predicate_cache_size { - rg_builder = rg_builder - .with_max_predicate_cache_size(max_predicate_cache_size); + rg_builder = rg_builder.with_max_predicate_cache_size( + max_predicate_cache_size, + ); + } + + // Apply per-RG row selection (from page pruning) + if let Some(rg_sel) = per_rg_sels.get(&rg_idx) { + rg_builder = + rg_builder.with_row_selection(rg_sel.clone()); } let rg_stream = rg_builder @@ -710,10 +732,9 @@ impl FileOpener for ParquetOpener { let stream_schema = Arc::clone(rg_stream.schema()); let replace_schema = !stream_schema.eq(&output_schema); - let projection = projection - .try_map_exprs(|expr| { - reassign_expr_columns(expr, &stream_schema) - })?; + let projection = projection.try_map_exprs(|expr| { + reassign_expr_columns(expr, &stream_schema) + })?; let projector = projection.make_projector(&stream_schema)?; // Read all batches for this RG, apply projection @@ -739,13 +760,12 @@ impl FileOpener for ParquetOpener { reversed.push(batch); continue; } - let indices = - arrow::array::UInt32Array::from_iter_values( - (0..batch.num_rows() as u32).rev(), - ); - reversed.push( - arrow::compute::take_record_batch(&batch, &indices)?, + let indices = arrow::array::UInt32Array::from_iter_values( + (0..batch.num_rows() as u32).rev(), ); + reversed.push(arrow::compute::take_record_batch( + &batch, &indices, + )?); } // Apply limit across RGs @@ -795,7 +815,8 @@ impl FileOpener for ParquetOpener { } if let Some(max_predicate_cache_size) = max_predicate_cache_size { - builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); + builder = + builder.with_max_predicate_cache_size(max_predicate_cache_size); } let arrow_reader_metrics = ArrowReaderMetrics::enabled(); @@ -810,7 +831,8 @@ impl FileOpener for ParquetOpener { file_metrics.files_ranges_pruned_statistics.clone(); let predicate_cache_inner_records = file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + let predicate_cache_records = + file_metrics.predicate_cache_records.clone(); let stream_schema = Arc::clone(stream.schema()); let replace_schema = !stream_schema.eq(&output_schema); @@ -852,167 +874,6 @@ impl FileOpener for ParquetOpener { } } -/// Buffers batches per row group, then emits them in reversed order with -/// reversed rows within each 
batch. Memory: O(largest row group). -/// -/// The input stream has row groups already in reversed order (via -/// `PreparedAccessPlan::reverse`). This stream reverses the row order -/// *within* each row group so the final output is in exact descending order. -struct ReversedRowGroupStream { - inner: S, - /// Number of rows in each row group (in read order, already reversed) - rg_row_counts: Vec, - /// Index of the current row group being buffered - current_rg: usize, - /// Rows remaining in the current row group - rows_remaining_in_rg: usize, - /// Buffered batches for the current row group - buffer: Vec, - /// Reversed batches ready to emit - output_buffer: VecDeque, - /// Whether the inner stream is exhausted - done: bool, - /// Optional row limit (applied after reversal for correct results) - remaining_limit: Option, -} - -impl ReversedRowGroupStream { - fn new(inner: S, rg_row_counts: Vec, limit: Option) -> Self { - // Skip leading empty RGs (all rows filtered by RowSelection). - // Without this, rows_remaining_in_rg=0 causes the first batch from - // the next real RG to immediately trigger flush_buffer(), attributing - // it to the wrong (empty) RG. - let mut current_rg = 0; - while current_rg < rg_row_counts.len() && rg_row_counts[current_rg] == 0 { - current_rg += 1; - } - let rows_remaining = rg_row_counts.get(current_rg).copied().unwrap_or(0); - Self { - inner, - rg_row_counts, - current_rg, - rows_remaining_in_rg: rows_remaining, - buffer: Vec::new(), - output_buffer: VecDeque::new(), - done: false, - remaining_limit: limit, - } - } - - /// Truncate batch to remaining limit and update the counter. - /// Returns the (possibly truncated) batch. - fn apply_limit(&mut self, batch: RecordBatch) -> RecordBatch { - if let Some(remaining) = self.remaining_limit.as_mut() { - let rows = batch.num_rows(); - if rows <= *remaining { - *remaining -= rows; - batch - } else { - let truncated = batch.slice(0, *remaining); - *remaining = 0; - truncated - } - } else { - batch - } - } - - /// Reverse the buffered batches: reverse batch order, reverse rows - /// within each batch, and move them to output_buffer. - fn flush_buffer(&mut self) -> Result<()> { - let batches = std::mem::take(&mut self.buffer); - for batch in batches.into_iter().rev() { - if batch.num_rows() <= 1 { - self.output_buffer.push_back(batch); - continue; - } - let indices = arrow::array::UInt32Array::from_iter_values( - (0..batch.num_rows() as u32).rev(), - ); - let reversed = arrow::compute::take_record_batch(&batch, &indices)?; - self.output_buffer.push_back(reversed); - } - // Advance to next row group, skipping any empty RGs (all rows - // filtered by RowSelection). Without this skip, rows_remaining_in_rg=0 - // would cause the next batch to immediately trigger another flush, - // splitting a real RG's batches across two flush cycles. 
- self.current_rg += 1; - while self.current_rg < self.rg_row_counts.len() - && self.rg_row_counts[self.current_rg] == 0 - { - self.current_rg += 1; - } - self.rows_remaining_in_rg = self - .rg_row_counts - .get(self.current_rg) - .copied() - .unwrap_or(0); - Ok(()) - } -} - -impl Stream for ReversedRowGroupStream -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Check if limit has been reached - if self.remaining_limit == Some(0) { - return Poll::Ready(None); - } - - // First, emit any already-reversed batches - if let Some(batch) = self.output_buffer.pop_front() { - return Poll::Ready(Some(Ok(self.apply_limit(batch)))); - } - - if self.done { - return Poll::Ready(None); - } - - // Pull batches from the inner stream until we complete a row group - loop { - match ready!(self.inner.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let num_rows = batch.num_rows(); - self.buffer.push(batch); - self.rows_remaining_in_rg = - self.rows_remaining_in_rg.saturating_sub(num_rows); - - if self.rows_remaining_in_rg == 0 { - // Row group complete — flush buffer - if let Err(e) = self.flush_buffer() { - return Poll::Ready(Some(Err(e))); - } - if let Some(batch) = self.output_buffer.pop_front() { - return Poll::Ready(Some(Ok(self.apply_limit(batch)))); - } - } - } - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - None => { - self.done = true; - // Flush any remaining buffered batches - if !self.buffer.is_empty() - && let Err(e) = self.flush_buffer() - { - return Poll::Ready(Some(Err(e))); - } - if let Some(batch) = self.output_buffer.pop_front() { - return Poll::Ready(Some(Ok(self.apply_limit(batch)))); - } - return Poll::Ready(None); - } - } - } - } -} - /// Copies metrics from ArrowReaderMetrics (the metrics collected by the /// arrow-rs parquet reader) to the parquet file metrics for DataFusion /// Adapt a RecordBatch to match the expected output schema. @@ -1031,14 +892,12 @@ fn adapt_batch_schema( if array.data_type() == target_type { Ok(Arc::clone(array)) } else { - let casted = if arrow::compute::can_cast_types( - array.data_type(), - target_type, - ) { - arrow::compute::cast(array, target_type)? - } else { - Arc::clone(array) - }; + let casted = + if arrow::compute::can_cast_types(array.data_type(), target_type) { + arrow::compute::cast(array, target_type)? + } else { + Arc::clone(array) + }; if casted.data_type() != target_type { let data = casted .to_data() @@ -1405,9 +1264,7 @@ mod test { use std::pin::Pin; use std::sync::Arc; - use super::{ - ConstantColumns, compute_selected_rows_per_rg, constant_columns_from_stats, - }; + use super::{ConstantColumns, constant_columns_from_stats}; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; @@ -1430,7 +1287,6 @@ mod test { use futures::{Stream, StreamExt}; use object_store::{ObjectStore, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; - use parquet::file::metadata::RowGroupMetaData; use parquet::file::properties::WriterProperties; /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. @@ -2729,87 +2585,6 @@ mod test { // reverse_row_groups only: Inexact — RGs reversed, rows within RG still ASC. // reverse_row_groups + reverse_rows: Exact — globally DESC. 
// - // The helper `compute_selected_rows_per_rg` is also unit-tested below, since a - // `RowSelection` produced by page pruning can make the parquet stream emit - // fewer rows per RG than `RowGroupMetaData::num_rows()` would suggest. - - /// Build a `RowSelection` from a flat list of `(skip, row_count)` pairs. - fn row_selection_from_pairs( - pairs: &[(bool, usize)], - ) -> parquet::arrow::arrow_reader::RowSelection { - use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; - let selectors: Vec = pairs - .iter() - .map(|&(skip, n)| { - if skip { - RowSelector::skip(n) - } else { - RowSelector::select(n) - } - }) - .collect(); - RowSelection::from(selectors) - } - - /// Build a stub `RowGroupMetaData` with the given row count. - /// - /// `compute_selected_rows_per_rg` only reads `num_rows()` from the metadata, - /// so we can construct a minimal one with just that field populated. - fn stub_rg(num_rows: i64) -> RowGroupMetaData { - use parquet::schema::types::{SchemaDescriptor, Type}; - let schema = Arc::new(SchemaDescriptor::new(Arc::new( - Type::group_type_builder("schema").build().unwrap(), - ))); - RowGroupMetaData::builder(schema) - .set_num_rows(num_rows) - .build() - .unwrap() - } - - #[test] - fn test_compute_selected_rows_per_rg_no_skip() { - // Selection that selects everything → output == raw num_rows per RG. - let rgs = vec![stub_rg(4), stub_rg(6), stub_rg(5)]; - let sel = row_selection_from_pairs(&[(false, 15)]); - let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap(); - assert_eq!(counts, vec![4, 6, 5]); - } - - #[test] - fn test_compute_selected_rows_per_rg_skip_spanning_rgs() { - // RG sizes: [4, 6, 5] = 15 rows total. - // Selection: skip 5, select 7, skip 3 → rows [6..=12] chosen. - // RG0 (rows 0..4) : skip all 4 → 0 selected - // RG1 (rows 4..10) : skip 1, select 5 → 5 selected - // RG2 (rows 10..15): select 2, skip 3 → 2 selected - let rgs = vec![stub_rg(4), stub_rg(6), stub_rg(5)]; - let sel = row_selection_from_pairs(&[(true, 5), (false, 7), (true, 3)]); - let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap(); - assert_eq!(counts, vec![0, 5, 2]); - } - - #[test] - fn test_compute_selected_rows_per_rg_all_skipped() { - // Every row is skipped — each RG emits 0 rows. - let rgs = vec![stub_rg(3), stub_rg(3)]; - let sel = row_selection_from_pairs(&[(true, 6)]); - let counts = compute_selected_rows_per_rg(&[0, 1], &rgs, &sel).unwrap(); - assert_eq!(counts, vec![0, 0]); - } - - #[test] - fn test_compute_selected_rows_per_rg_short_selection_errors() { - // Selection covers only 5 rows but RGs sum to 10 → must error instead of - // silently returning garbage counts. - let rgs = vec![stub_rg(5), stub_rg(5)]; - let sel = row_selection_from_pairs(&[(false, 5)]); - let err = compute_selected_rows_per_rg(&[0, 1], &rgs, &sel).unwrap_err(); - assert!( - format!("{err}").contains("RowSelection ended before"), - "unexpected error: {err}" - ); - } - #[tokio::test] async fn test_exact_reverse_scan_multi_rg_produces_global_desc() { // Three RGs, each with an ascending run. With reverse_row_groups + @@ -3138,14 +2913,4 @@ mod test { "empty RG (all rows skipped) must be handled without corrupting order" ); } - - #[test] - fn test_compute_selected_rows_per_rg_with_fully_skipped_middle_rg() { - // RG sizes: [4, 4, 4]. RG1 fully skipped. 
- // Selection: select 4, skip 4, select 4 - let rgs = vec![stub_rg(4), stub_rg(4), stub_rg(4)]; - let sel = row_selection_from_pairs(&[(false, 4), (true, 4), (false, 4)]); - let counts = compute_selected_rows_per_rg(&[0, 1, 2], &rgs, &sel).unwrap(); - assert_eq!(counts, vec![4, 0, 4], "middle RG fully skipped → 0 rows"); - } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index da105c9c0059f..d178c41865d30 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1607,6 +1607,9 @@ pub struct ParquetScanExecNode { pub parquet_options: ::core::option::Option< super::datafusion_common::TableParquetOptions, >, + /// Runtime reverse-scan flags set by PushdownSort optimizer. + /// Must be preserved across proto roundtrips so that remote executors + /// produce correctly reversed output after SortExec removal. #[prost(bool, tag = "5")] pub reverse_row_groups: bool, #[prost(bool, tag = "6")] diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 9b4b6db0c37a1..ed65db8aa0b3c 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -1692,6 +1692,105 @@ DROP TABLE exact_rev_pushdown_parquet; statement ok SET datafusion.execution.parquet.pushdown_filters = false; +# Test 12.10: Multi-RG exact reverse — create a file with small row groups +# to verify per-RG independent reverse works across RG boundaries. + +statement ok +SET datafusion.execution.parquet.enable_exact_reverse_scan = true; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +statement ok +CREATE TABLE exact_rev_multi_rg_data(id INT, value INT) AS VALUES +(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), +(6, 60), (7, 70), (8, 80), (9, 90), (10, 100); + +query I +COPY (SELECT * FROM exact_rev_multi_rg_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet'; +---- +10 + +statement ok +CREATE EXTERNAL TABLE exact_rev_multi_rg(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet' +WITH ORDER (id ASC); + +# Sort removed (Exact), correct DESC results across multiple row groups +query TT +EXPLAIN SELECT * FROM exact_rev_multi_rg ORDER BY id DESC; +---- +logical_plan +01)Sort: exact_rev_multi_rg.id DESC NULLS FIRST +02)--TableScan: exact_rev_multi_rg projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, scan_direction=Reversed + +query II +SELECT * FROM exact_rev_multi_rg ORDER BY id DESC; +---- +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 +4 40 +3 30 +2 20 +1 10 + +# LIMIT across RG boundaries (limit=4 spans RGs of size 3) +query II +SELECT * FROM exact_rev_multi_rg ORDER BY id DESC LIMIT 4; +---- +10 100 +9 90 +8 80 +7 70 + +# Test 12.11: Multi-RG + pushdown_filters — per-RG RowFilter applied correctly +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +statement ok +CREATE EXTERNAL TABLE exact_rev_multi_rg_filtered(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/exact_rev_multi_rg.parquet' +WITH ORDER (id ASC); + +query II +SELECT * FROM exact_rev_multi_rg_filtered WHERE value > 50 ORDER BY id DESC; +---- +10 100 +9 90 +8 80 +7 70 +6 60 + +query II 
+SELECT * FROM exact_rev_multi_rg_filtered WHERE value > 50 ORDER BY id DESC LIMIT 3; +---- +10 100 +9 90 +8 80 + +statement ok +DROP TABLE exact_rev_multi_rg_filtered; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +DROP TABLE exact_rev_multi_rg; + +statement ok +DROP TABLE exact_rev_multi_rg_data; + # Cleanup exact reverse test tables statement ok DROP TABLE exact_rev_data;