Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ pub(crate) use data_validation::{
};
pub(crate) use find_files::*;
pub use table_provider::{
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaScanBuilder, DeltaTableProvider, TableProviderBuilder,
next::{DeltaScanExec, DeltaNextPhysicalCodec},
};
pub(crate) use table_provider::{
DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
};

pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down
201 changes: 177 additions & 24 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashSet as StdHashSet;
use std::fmt;
use std::sync::Arc;

Expand Down Expand Up @@ -40,9 +41,7 @@ use datafusion::{
prelude::Expr,
scalar::ScalarValue,
};
use datafusion::physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
use delta_kernel::Version;
use futures::TryStreamExt as _;
use futures::future::BoxFuture;
use itertools::Itertools;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -181,6 +180,8 @@ impl DeltaScanConfigBuilder {
enable_parquet_pushdown: self.enable_parquet_pushdown,
schema: self.schema.clone(),
schema_force_view_types: true,
virtual_columns: None,
requested_columns: None,
})
}
}
Expand All @@ -199,6 +200,13 @@ pub struct DeltaScanConfig {
pub schema_force_view_types: bool,
/// Schema to read as
pub schema: Option<SchemaRef>,
/// Virtual columns derived automatically in [`DeltaScanBuilder::build`] by
/// diffing the caller-supplied schema against the table's physical schema.
/// Not part of the public API — do not set manually.
pub(crate) virtual_columns: Option<StdHashSet<String>>,
/// Requested output columns in order (including virtual columns)
/// Used to build projection that null-fills virtual columns
pub requested_columns: Option<Vec<String>>,
}

impl Default for DeltaScanConfig {
Expand All @@ -216,6 +224,8 @@ impl DeltaScanConfig {
enable_parquet_pushdown: true,
schema_force_view_types: true,
schema: None,
virtual_columns: None,
requested_columns: None,
}
}

Expand All @@ -227,6 +237,8 @@ impl DeltaScanConfig {
enable_parquet_pushdown: config_options.execution.parquet.pushdown_filters,
schema_force_view_types: config_options.execution.parquet.schema_force_view_types,
schema: None,
virtual_columns: None,
requested_columns: None,
}
}

Expand Down Expand Up @@ -256,9 +268,14 @@ impl DeltaScanConfig {
self.schema = Some(schema);
self
}

pub fn with_requested_columns(mut self, columns: Vec<String>) -> Self {
self.requested_columns = Some(columns);
self
}
}

pub(crate) struct DeltaScanBuilder<'a> {
pub struct DeltaScanBuilder<'a> {
snapshot: &'a EagerSnapshot,
log_store: LogStoreRef,
filter: Option<Expr>,
Expand Down Expand Up @@ -334,27 +351,64 @@ impl<'a> DeltaScanBuilder<'a> {
None => self.snapshot.read_schema(),
};

// Auto-derive virtual columns: any field present in the caller-supplied
// schema but absent from the table's physical schema (and not a partition
// column) is treated as virtual and will be null-filled during the scan.
// This means callers only need to pass `with_schema(extended)` — there is
// no need to explicitly mark virtual columns.
let config = if config.virtual_columns.is_none() {
let physical_schema = self.snapshot.read_schema();
let partition_cols = self.snapshot.metadata().partition_columns();
let physical_names: StdHashSet<&str> = physical_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
let virtual_cols: StdHashSet<String> = schema
.fields()
.iter()
.filter(|f| {
!physical_names.contains(f.name().as_str())
&& !partition_cols.contains(f.name())
})
.map(|f| f.name().clone())
.collect();
if virtual_cols.is_empty() {
config
} else {
DeltaScanConfig { virtual_columns: Some(virtual_cols), ..config }
}
} else {
config
};

let logical_schema = df_logical_schema(
self.snapshot,
&config.file_column_name,
Some(schema.clone()),
)?;

let logical_schema = if let Some(used_columns) = self.projection {
let mut fields = Vec::with_capacity(used_columns.len());
for idx in used_columns {
fields.push(logical_schema.field(*idx).to_owned());
}
// partition filters with Exact pushdown were removed from projection by DF optimizer,
// we need to add them back for the predicate pruning to work
if let Some(expr) = &self.filter {
for c in expr.column_refs() {
let idx = logical_schema.index_of(c.name.as_str())?;
if !used_columns.contains(&idx) {
fields.push(logical_schema.field(idx).to_owned());
}
}
}
let logical_schema = if config.virtual_columns.is_some() {
// TODO: can we keep just a patch of the schema?
// When virtual columns are present, keep full logical_schema
// PhysicalExprAdapter will handle the adaptation
// Otherwise, subset logical_schema based on projection indices
logical_schema
} else if let Some(used_columns) = self.projection {
let extra_fields = self.filter
.iter()
.flat_map(|expr| expr.column_refs())
.map(|c| logical_schema.index_of(c.name.as_str()))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.filter(|idx| !used_columns.contains(idx));

let fields = used_columns.iter()
.copied()
.chain(extra_fields)
.map(|idx| logical_schema.field(idx).to_owned())
.collect::<Vec<_>>();

Arc::new(Schema::new(fields))
} else {
logical_schema
Expand Down Expand Up @@ -505,11 +559,51 @@ impl<'a> DeltaScanBuilder<'a> {
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.filter(|f| {
!table_partition_cols.contains(f.name())
&& !config
.virtual_columns
.as_ref()
.map(|vc| vc.contains(f.name()))
.unwrap_or(false)
})
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));

// When virtual columns are present, translate projection to physical schema space
// Filter out virtual column indices since they don't exist in physical files
// If all requested columns are virtual we still need at least one physical column
// to drive row production; ProjectionExec will project everything to nulls.
// Reading a single column is far cheaper than reading all of them (the old `None` fallback).
let file_projection = if let Some(virtual_cols) = &config.virtual_columns {
self.projection.and_then(|proj| {
let physical_indices: Vec<usize> = proj
.iter()
.filter_map(|&idx| {
let field = schema.field(idx);
if virtual_cols.contains(field.name()) {
None
} else {
file_schema.index_of(field.name()).ok()
}
})
.collect();

if physical_indices.is_empty() {
if file_schema.fields().is_empty() {
None
} else {
Some(vec![0usize])
}
} else {
Some(physical_indices)
}
})
} else {
self.projection.map(|v| v.to_vec())
};

let mut table_partition_cols = table_partition_cols
.iter()
.map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
Expand Down Expand Up @@ -626,7 +720,7 @@ impl<'a> DeltaScanBuilder<'a> {

let partition_fields: Vec<Arc<Field>> =
table_partition_cols.into_iter().map(Arc::new).collect();
let table_schema = TableSchema::new(file_schema, partition_fields);
let table_schema = TableSchema::new(file_schema.clone(), partition_fields);

let mut file_source =
ParquetSource::new(table_schema).with_table_parquet_options(parquet_options);
Expand Down Expand Up @@ -654,8 +748,20 @@ impl<'a> DeltaScanBuilder<'a> {
},
)
.with_statistics(stats)
// .with_projection_indices(self.projection.cloned())?
.with_deep_projection(self.projection.cloned(), self.projection_deep.cloned())?
// Remap projection_deep keys from logical (schema) index space to
// file_schema index space, mirroring what file_projection does above.
// Virtual and partition columns are absent from file_schema so they
// are dropped from the map via index_of returning Err.
.with_deep_projection(file_projection, self.projection_deep.map(|deep| {
deep.iter()
.filter_map(|(schema_idx, subfields)| {
file_schema
.index_of(schema.field(*schema_idx).name())
.ok()
.map(|file_idx| (file_idx, subfields.clone()))
})
.collect::<std::collections::HashMap<usize, Vec<String>>>()
}))?
.with_limit(self.limit)
// @Hstack fixme
.with_expr_adapter(build_expr_adapter_factory())
Expand All @@ -669,9 +775,34 @@ impl<'a> DeltaScanBuilder<'a> {
.global_counter("files_pruned")
.add(files_pruned);

let mut parquet_scan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(file_scan_config);

// When virtual columns + requested columns are present, wrap with ProjectionExec
// to project physical output to requested columns (null-filling virtuals)
if let (Some(requested_cols), Some(virtual_cols)) = (&config.requested_columns, &config.virtual_columns) {
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_expr::expressions::{Column as PhysicalColumn, Literal};

let output_schema = parquet_scan.schema();
let mut projection_exprs = Vec::new();

for col_name in requested_cols {
let field = logical_schema.field_with_name(col_name)?;
let expr: Arc<dyn PhysicalExpr> = if virtual_cols.contains(col_name) {
Arc::new(Literal::new(ScalarValue::try_from(field.data_type())?))
} else {
let idx = output_schema.index_of(col_name)?;
Arc::new(PhysicalColumn::new(col_name, idx))
};
projection_exprs.push((expr, col_name.clone()));
}

parquet_scan = Arc::new(ProjectionExec::try_new(projection_exprs, parquet_scan)?);
}

Ok(DeltaScan {
table_url: self.log_store.root_url().clone(),
parquet_scan: DataSourceExec::from_data_source(file_scan_config),
parquet_scan,
config,
logical_schema,
metrics,
Expand Down Expand Up @@ -1085,7 +1216,13 @@ impl ExecutionPlan for DeltaScan {
}

/// Reported output schema of this scan node.
///
/// When virtual columns were derived but no `requested_columns` were
/// supplied, the inner `parquet_scan` has no wrapping projection to
/// null-fill the virtual fields, so the full logical schema is reported
/// instead. In every other case the inner plan's schema is authoritative
/// (with `requested_columns` set, the ProjectionExec built during scan
/// construction is already part of `parquet_scan`).
fn schema(&self) -> SchemaRef {
    // NOTE: collapsed from a three-way branch whose first and last arms
    // were identical (`self.parquet_scan.schema()`); only the
    // virtual-columns-without-requested-columns case differs.
    if self.config.virtual_columns.is_some() && self.config.requested_columns.is_none() {
        self.logical_schema.clone()
    } else {
        self.parquet_scan.schema()
    }
}

fn properties(&self) -> &PlanProperties {
Expand Down Expand Up @@ -1422,6 +1559,22 @@ mod tests {
assert!(result_plan.metrics().is_some());
}

#[test]
fn test_virtual_columns_not_set_by_default() {
    // A freshly constructed config starts with both fields unset;
    // virtual_columns is only derived later, at scan-build time.
    let cfg = DeltaScanConfig::new();
    assert!(cfg.virtual_columns.is_none());
    assert!(cfg.requested_columns.is_none());
}

#[test]
fn test_virtual_columns_config_stores_requested_columns() {
    // with_requested_columns stores the names verbatim and in order,
    // and must not touch the (derived-only) virtual_columns field.
    let requested: Vec<String> = ["id", "virtual_col", "value"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let config = DeltaScanConfig::new().with_requested_columns(requested.clone());
    assert_eq!(config.requested_columns, Some(requested));
    assert!(config.virtual_columns.is_none());
}

#[test]
fn test_partitioned_file_from_action() {
let mut partition_values = std::collections::HashMap::new();
Expand Down
Loading
Loading