From a48b85834db723f4b8d43e97ebd8ae3639000794 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 28 Jan 2026 15:45:18 +0530 Subject: [PATCH 1/3] refactor: Rename FileSource::try_reverse_output to FileSource::try_pushdown_sort --- datafusion/datasource-parquet/src/opener.rs | 2 +- datafusion/datasource-parquet/src/source.rs | 2 +- datafusion/datasource/src/file.rs | 16 +- datafusion/datasource/src/file_scan_config.rs | 147 ++++++++++++++++-- 4 files changed, 150 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 50407235e7105..50e5802f3f2b9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -581,7 +581,7 @@ impl FileOpener for ParquetOpener { // ---------------------------------------------------------- // Step: potentially reverse the access plan for performance. - // See `ParquetSource::try_reverse_output` for the rationale. + // See `ParquetSource::try_pushdown_sort` for the rationale. // ---------------------------------------------------------- if reverse_row_groups { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 07f58db185f49..75d87a4cd16fc 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -757,7 +757,7 @@ impl FileSource for ParquetSource { /// # Returns /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order /// - `Unsupported`: Cannot optimize for this ordering - fn try_reverse_output( + fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], eq_properties: &EquivalenceProperties, diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index f5380c27ecc28..a2eb6d4653fe6 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -189,7 +189,21 @@ pub trait FileSource: Send + Sync { /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted /// * `Unsupported` - Cannot optimize for this ordering /// - /// Default implementation returns `Unsupported`. + /// Default implementation delegates to [`Self::try_reverse_output`]. + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> Result>> { + #[allow(deprecated)] + self.try_reverse_output(order, eq_properties) + } + + /// Deprecated: Renamed to [`Self::try_pushdown_sort`]. + #[deprecated( + since = "52.0.0", + note = "Renamed to try_pushdown_sort. This method was never limited to reversing output." + )] fn try_reverse_output( &self, _order: &[PhysicalSortExpr], diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 51b9ba9e06e9b..fe558eaf5dd61 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -851,20 +851,20 @@ impl DataSource for FileScanConfig { &self, order: &[PhysicalSortExpr], ) -> Result>> { - // Delegate to FileSource to check if reverse scanning can satisfy the request. + // Delegate to FileSource to see if it can optimize for the requested ordering. let pushdown_result = self .file_source - .try_reverse_output(order, &self.eq_properties())?; + .try_pushdown_sort(order, &self.eq_properties())?; match pushdown_result { SortOrderPushdownResult::Exact { inner } => { Ok(SortOrderPushdownResult::Exact { - inner: self.rebuild_with_source(inner, true)?, + inner: self.rebuild_with_source(inner, true, order)?, }) } SortOrderPushdownResult::Inexact { inner } => { Ok(SortOrderPushdownResult::Inexact { - inner: self.rebuild_with_source(inner, false)?, + inner: self.rebuild_with_source(inner, false, order)?, }) } SortOrderPushdownResult::Unsupported => { @@ -1157,19 +1157,30 @@ impl FileScanConfig { &self, new_file_source: Arc, is_exact: bool, + order: &[PhysicalSortExpr], ) -> Result> { let mut new_config = self.clone(); - // Reverse file groups (FileScanConfig's responsibility) - new_config.file_groups = new_config - .file_groups - .into_iter() - .map(|group| { - let mut files = group.into_inner(); - files.reverse(); - files.into() - }) - .collect(); + // Reverse file groups (FileScanConfig's responsibility) if doing so helps satisfy the + // requested ordering. + let reverse_file_groups = + LexOrdering::new(order.iter().cloned()).is_some_and(|requested| { + self.output_ordering + .iter() + .any(|ordering| ordering.is_reverse(&requested)) + }); + + if reverse_file_groups { + new_config.file_groups = new_config + .file_groups + .into_iter() + .map(|group| { + let mut files = group.into_inner(); + files.reverse(); + files.into() + }) + .collect(); + } new_config.file_source = new_file_source; @@ -1392,6 +1403,62 @@ mod tests { use datafusion_physical_expr::projection::ProjectionExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + #[derive(Clone)] + struct InexactSortPushdownSource { + metrics: ExecutionPlanMetricsSet, + table_schema: TableSchema, + } + + impl InexactSortPushdownSource { + fn new(table_schema: TableSchema) -> Self { + Self { + metrics: ExecutionPlanMetricsSet::new(), + table_schema, + } + } + } + + impl FileSource for InexactSortPushdownSource { + fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc { + Arc::new(self.clone()) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn file_type(&self) -> &str { + "mock" + } + + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + _eq_properties: &EquivalenceProperties, + ) -> Result>> { + Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(self.clone()) as Arc, + }) + } + } + #[test] fn physical_plan_config_no_projection_tab_cols_as_field() { let file_schema = aggr_test_schema(); @@ -2337,4 +2404,56 @@ mod tests { _ => panic!("Expected Hash partitioning"), } } + + #[test] + fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse() + -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + + let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); + + let file_groups = vec![FileGroup::new(vec![ + PartitionedFile::new("file1", 1), + PartitionedFile::new("file2", 1), + ])]; + + let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(file_groups) + .with_output_ordering(vec![ + LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(), + ]) + .build(); + + let requested_asc = vec![sort_expr_asc.clone()]; + let result = config.try_pushdown_sort(&requested_asc)?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let pushed_files = pushed_config.file_groups[0].files(); + assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1"); + assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2"); + + let requested_desc = vec![sort_expr_asc.reverse()]; + let result = config.try_pushdown_sort(&requested_desc)?; + let SortOrderPushdownResult::Inexact { inner } = result else { + panic!("Expected Inexact result"); + }; + let pushed_config = inner + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let pushed_files = pushed_config.file_groups[0].files(); + assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2"); + assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1"); + + Ok(()) + } } From 7544fe26c8c26be030d74b48d22fcbcf7f7aa288 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 28 Jan 2026 18:45:15 +0530 Subject: [PATCH 2/3] fix clippy --- datafusion/datasource/src/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index a2eb6d4653fe6..31149938253a2 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -195,7 +195,7 @@ pub trait FileSource: Send + Sync { order: &[PhysicalSortExpr], eq_properties: &EquivalenceProperties, ) -> Result>> { - #[allow(deprecated)] + #[expect(deprecated)] self.try_reverse_output(order, eq_properties) } From 7d9eff578249dec640195df9714e99dc8a66f287 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 29 Jan 2026 21:53:02 +0530 Subject: [PATCH 3/3] proper explaination of deprecation --- datafusion/datasource/src/file.rs | 12 ++++++-- datafusion/datasource/src/file_scan_config.rs | 28 ++++++++++++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 31149938253a2..ddcf075fc5b32 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -189,7 +189,15 @@ pub trait FileSource: Send + Sync { /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted /// * `Unsupported` - Cannot optimize for this ordering /// - /// Default implementation delegates to [`Self::try_reverse_output`]. + /// # Deprecation / migration notes + /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `52.0.0`. + /// Per DataFusion's deprecation guidelines, it will be removed in `58.0.0` or later + /// (6 major versions or 6 months, whichever is longer). + /// - New implementations should override [`Self::try_pushdown_sort`] directly. + /// - For backwards compatibility, the default implementation of + /// [`Self::try_pushdown_sort`] delegates to the deprecated + /// [`Self::try_reverse_output`] until it is removed. After that point, the + /// default implementation will return [`SortOrderPushdownResult::Unsupported`]. fn try_pushdown_sort( &self, order: &[PhysicalSortExpr], @@ -202,7 +210,7 @@ pub trait FileSource: Send + Sync { /// Deprecated: Renamed to [`Self::try_pushdown_sort`]. #[deprecated( since = "52.0.0", - note = "Renamed to try_pushdown_sort. This method was never limited to reversing output." + note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 58.0.0 or later." )] fn try_reverse_output( &self, diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fe558eaf5dd61..fe78c0e5262a4 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1161,14 +1161,28 @@ impl FileScanConfig { ) -> Result> { let mut new_config = self.clone(); - // Reverse file groups (FileScanConfig's responsibility) if doing so helps satisfy the + // Reverse file order (within each group) if the caller is requesting a reversal of this + // scan's declared output ordering. + // + // Historically this function always reversed `file_groups` because it was only reached + // via `FileSource::try_reverse_output` (where a reversal was the only supported + // optimization). + // + // Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other + // optimizations may become possible (e.g. already-sorted data, statistics-based file + // reordering). Therefore we only reverse files when it is known to help satisfy the // requested ordering. - let reverse_file_groups = - LexOrdering::new(order.iter().cloned()).is_some_and(|requested| { - self.output_ordering - .iter() - .any(|ordering| ordering.is_reverse(&requested)) - }); + let reverse_file_groups = if self.output_ordering.is_empty() { + false + } else if let Some(requested) = LexOrdering::new(order.iter().cloned()) { + let projected_schema = self.projected_schema()?; + let orderings = project_orderings(&self.output_ordering, &projected_schema); + orderings + .iter() + .any(|ordering| ordering.is_reverse(&requested)) + } else { + false + }; if reverse_file_groups { new_config.file_groups = new_config