39 changes: 20 additions & 19 deletions native/core/src/parquet/parquet_exec.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::physical_plan::{
 };
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::physical_expr::expressions::{BinaryExpr, Column};
+use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion::prelude::SessionContext;
@@ -130,22 +130,6 @@ pub(crate) fn init_datasource_exec(
     let mut parquet_source =
         ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
 
-    // Create a conjunctive form of the vector because ParquetExecBuilder takes
-    // a single expression
-    if let Some(data_filters) = data_filters {
-        let cnf_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
-            Arc::new(BinaryExpr::new(
-                left,
-                datafusion::logical_expr::Operator::And,
-                right,
-            ))
-        });
-
-        if let Some(filter) = cnf_data_filters {
-            parquet_source = parquet_source.with_predicate(filter);
-        }
-    }
-
     if encryption_enabled {
         parquet_source = parquet_source.with_encryption_factory(
             session_ctx
@@ -159,12 +143,29 @@
     parquet_source = parquet_source
         .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
 
+    // Route data filters through `try_pushdown_filters` rather than calling
+    // `with_predicate` directly. This is the contract DataFusion's optimizer
+    // uses, and it correctly classifies any filter that ParquetSource cannot
+    // evaluate as a row filter (e.g., references to virtual columns like
+    // Parquet `row_number`) as `PushedDown::No`. Spark's Filter operator above
+    // the scan re-evaluates all dataFilters anyway, so the No-classified ones
+    // remain correct without us needing to add a FilterExec here.
+    let file_source: Arc<dyn FileSource> = match data_filters {
+        Some(filters) if !filters.is_empty() => {
+            let state = session_ctx.state();
+            let propagation =
+                parquet_source.try_pushdown_filters(filters, state.config_options())?;
+            propagation
+                .updated_node
+                .unwrap_or_else(|| Arc::new(parquet_source))
+        }
+        _ => Arc::new(parquet_source),
+    };
+
     let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
         SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
     );
 
-    let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);
-
     let file_groups = file_groups
         .iter()
         .map(|files| FileGroup::new(files.clone()))
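For context on what the removed (`-`) lines were doing, here is a minimal standalone sketch of the conjunctive fold this PR deletes: it combines a list of physical predicates into a single `AND` expression, which was necessary because `with_predicate` accepts only one expression. The `conjoin` helper and the boolean columns `a`/`b` are hypothetical names for illustration, not part of the Comet codebase.

```rust
use std::sync::Arc;

use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{BinaryExpr, Column};
use datafusion::physical_expr::PhysicalExpr;

/// Hypothetical helper: fold predicates into one conjunction, as the removed
/// code did before handing a single expression to `with_predicate`.
/// Returns `None` when the input is empty.
fn conjoin(filters: Vec<Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    filters
        .into_iter()
        .reduce(|left, right| Arc::new(BinaryExpr::new(left, Operator::And, right)))
}

fn main() {
    // Two stand-in boolean predicates over columns `a` (index 0) and `b` (index 1).
    let a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

    let combined = conjoin(vec![a, b]).expect("non-empty input");
    println!("{combined}"); // a@0 AND b@1
}
```

The drawback of this fold, per the comment in the new code, is that it pushes every filter unconditionally, even ones the Parquet source cannot evaluate; routing through `try_pushdown_filters` instead lets DataFusion classify those as not pushed down.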