Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ impl FileOpener for ParquetOpener {

// ----------------------------------------------------------
// Step: potentially reverse the access plan for performance.
// See `ParquetSource::try_reverse_output` for the rationale.
// See `ParquetSource::try_pushdown_sort` for the rationale.
// ----------------------------------------------------------
if reverse_row_groups {
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ impl FileSource for ParquetSource {
/// # Returns
/// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
/// - `Unsupported`: Cannot optimize for this ordering
fn try_reverse_output(
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
Expand Down
24 changes: 23 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,29 @@ pub trait FileSource: Send + Sync {
/// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
/// * `Unsupported` - Cannot optimize for this ordering
///
/// Default implementation returns `Unsupported`.
/// # Deprecation / migration notes
/// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `52.0.0`.
/// Per DataFusion's deprecation guidelines, it will be removed in `58.0.0` or later
/// (6 major versions or 6 months, whichever is longer).
/// - New implementations should override [`Self::try_pushdown_sort`] directly.
/// - For backwards compatibility, the default implementation of
/// [`Self::try_pushdown_sort`] delegates to the deprecated
/// [`Self::try_reverse_output`] until it is removed. After that point, the
/// default implementation will return [`SortOrderPushdownResult::Unsupported`].
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
#[expect(deprecated)]
self.try_reverse_output(order, eq_properties)
}

/// Deprecated: Renamed to [`Self::try_pushdown_sort`].
#[deprecated(
since = "52.0.0",
note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 58.0.0 or later."
)]
fn try_reverse_output(
&self,
_order: &[PhysicalSortExpr],
Expand Down
161 changes: 147 additions & 14 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,20 +851,20 @@ impl DataSource for FileScanConfig {
&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
// Delegate to FileSource to check if reverse scanning can satisfy the request.
// Delegate to FileSource to see if it can optimize for the requested ordering.
let pushdown_result = self
.file_source
.try_reverse_output(order, &self.eq_properties())?;
.try_pushdown_sort(order, &self.eq_properties())?;

match pushdown_result {
SortOrderPushdownResult::Exact { inner } => {
Ok(SortOrderPushdownResult::Exact {
inner: self.rebuild_with_source(inner, true)?,
inner: self.rebuild_with_source(inner, true, order)?,
})
}
SortOrderPushdownResult::Inexact { inner } => {
Ok(SortOrderPushdownResult::Inexact {
inner: self.rebuild_with_source(inner, false)?,
inner: self.rebuild_with_source(inner, false, order)?,
})
}
SortOrderPushdownResult::Unsupported => {
Expand Down Expand Up @@ -1157,19 +1157,44 @@ impl FileScanConfig {
&self,
new_file_source: Arc<dyn FileSource>,
is_exact: bool,
order: &[PhysicalSortExpr],
) -> Result<Arc<dyn DataSource>> {
let mut new_config = self.clone();

// Reverse file groups (FileScanConfig's responsibility)
new_config.file_groups = new_config
.file_groups
.into_iter()
.map(|group| {
let mut files = group.into_inner();
files.reverse();
files.into()
})
.collect();
// Reverse file order (within each group) if the caller is requesting a reversal of this
// scan's declared output ordering.
//
// Historically this function always reversed `file_groups` because it was only reached
// via `FileSource::try_reverse_output` (where a reversal was the only supported
// optimization).
//
// Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other
// optimizations may become possible (e.g. already-sorted data, statistics-based file
// reordering). Therefore we only reverse files when it is known to help satisfy the
// requested ordering.
let reverse_file_groups = if self.output_ordering.is_empty() {
false
} else if let Some(requested) = LexOrdering::new(order.iter().cloned()) {
let projected_schema = self.projected_schema()?;
let orderings = project_orderings(&self.output_ordering, &projected_schema);
orderings
.iter()
.any(|ordering| ordering.is_reverse(&requested))
} else {
false
};

if reverse_file_groups {
new_config.file_groups = new_config
.file_groups
.into_iter()
.map(|group| {
let mut files = group.into_inner();
files.reverse();
files.into()
})
.collect();
}

new_config.file_source = new_file_source;

Expand Down Expand Up @@ -1392,6 +1417,62 @@ mod tests {
use datafusion_physical_expr::projection::ProjectionExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

/// Mock [`FileSource`] for the sort-pushdown tests in this module.
///
/// It carries only the state needed to satisfy the trait accessors: a metrics
/// set and the table schema handed to [`InexactSortPushdownSource::new`].
#[derive(Clone)]
struct InexactSortPushdownSource {
    /// Metrics set returned by `FileSource::metrics`.
    metrics: ExecutionPlanMetricsSet,
    /// Schema returned by `FileSource::table_schema`.
    table_schema: TableSchema,
}

impl InexactSortPushdownSource {
    /// Creates a mock source over `table_schema` with a fresh, empty metrics set.
    fn new(table_schema: TableSchema) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        Self {
            metrics,
            table_schema,
        }
    }
}

impl FileSource for InexactSortPushdownSource {
    /// Not implemented for this mock; calling it panics.
    fn create_file_opener(
        &self,
        _object_store: Arc<dyn object_store::ObjectStore>,
        _base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
        unimplemented!()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    /// The batch size is ignored; an unchanged copy of the source is returned.
    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
        let copy = self.clone();
        Arc::new(copy)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "mock"
    }

    /// Reports `Inexact` for every request, regardless of the requested
    /// ordering or equivalence properties, wrapping a clone of this source.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
        _eq_properties: &EquivalenceProperties,
    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
        let inner: Arc<dyn FileSource> = Arc::new(self.clone());
        Ok(SortOrderPushdownResult::Inexact { inner })
    }
}

#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
let file_schema = aggr_test_schema();
Expand Down Expand Up @@ -2337,4 +2418,56 @@ mod tests {
_ => panic!("Expected Hash partitioning"),
}
}

/// A scan declared as sorted by `a ASC` must keep its file order when `a ASC`
/// is requested, and reverse the files within the group when the reverse of
/// that ordering is requested.
#[test]
fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
-> Result<()> {
    // Pushes `requested` into `config`, expects an `Inexact` result wrapping a
    // `FileScanConfig`, and checks the first two files of its first group.
    fn assert_pushed_file_order(
        config: &FileScanConfig,
        requested: &[PhysicalSortExpr],
        expected: [&str; 2],
    ) -> Result<()> {
        let SortOrderPushdownResult::Inexact { inner } =
            config.try_pushdown_sort(requested)?
        else {
            panic!("Expected Inexact result");
        };
        let pushed_config = inner
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .expect("Expected FileScanConfig");
        let pushed_files = pushed_config.file_groups[0].files();
        assert_eq!(pushed_files[0].object_meta.location.as_ref(), expected[0]);
        assert_eq!(pushed_files[1].object_meta.location.as_ref(), expected[1]);
        Ok(())
    }

    // Single nullable Int32 column `a`, no partition columns.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let source = Arc::new(InexactSortPushdownSource::new(TableSchema::new(
        Arc::clone(&schema),
        vec![],
    )));

    // One file group holding two files, in the order file1, file2.
    let groups = vec![FileGroup::new(vec![
        PartitionedFile::new("file1", 1),
        PartitionedFile::new("file2", 1),
    ])];

    // The scan's declared output ordering is the default sort on column `a`.
    let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
    let declared_ordering = LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap();
    let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
        .with_file_groups(groups)
        .with_output_ordering(vec![declared_ordering])
        .build();

    // Requesting the declared ordering: file order is left untouched.
    let requested_asc = vec![sort_expr_asc.clone()];
    assert_pushed_file_order(&config, &requested_asc, ["file1", "file2"])?;

    // Requesting the reversed ordering: files within the group are reversed.
    let requested_desc = vec![sort_expr_asc.reverse()];
    assert_pushed_file_order(&config, &requested_desc, ["file2", "file1"])?;

    Ok(())
}
}