diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index f03bc405a..4cfcd766f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -74,11 +74,11 @@ pub(crate) use data_validation::{ }; pub(crate) use find_files::*; pub use table_provider::{ - DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder, + DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaScanBuilder, DeltaTableProvider, TableProviderBuilder, next::{DeltaScanExec, DeltaNextPhysicalCodec}, }; pub(crate) use table_provider::{ - DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session, + next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session, }; pub(crate) const PATH_COLUMN: &str = "__delta_rs_path"; diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 4e1473cf2..03a6ff309 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -1,5 +1,6 @@ use std::any::Any; use std::borrow::Cow; +use std::collections::HashSet as StdHashSet; use std::fmt; use std::sync::Arc; @@ -40,9 +41,7 @@ use datafusion::{ prelude::Expr, scalar::ScalarValue, }; -use datafusion::physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; use delta_kernel::Version; -use futures::TryStreamExt as _; use futures::future::BoxFuture; use itertools::Itertools; use object_store::ObjectMeta; @@ -181,6 +180,8 @@ impl DeltaScanConfigBuilder { enable_parquet_pushdown: self.enable_parquet_pushdown, schema: self.schema.clone(), schema_force_view_types: true, + virtual_columns: None, + requested_columns: None, }) } } @@ -199,6 +200,13 @@ pub struct DeltaScanConfig { pub schema_force_view_types: bool, /// Schema to read as pub schema: Option, + /// Virtual columns derived automatically in [`DeltaScanBuilder::build`] by + /// diffing the caller-supplied schema against the table's physical schema. + /// Not part of the public API — do not set manually. + pub(crate) virtual_columns: Option>, + /// Requested output columns in order (including virtual columns) + /// Used to build projection that null-fills virtual columns + pub requested_columns: Option>, } impl Default for DeltaScanConfig { @@ -216,6 +224,8 @@ impl DeltaScanConfig { enable_parquet_pushdown: true, schema_force_view_types: true, schema: None, + virtual_columns: None, + requested_columns: None, } } @@ -227,6 +237,8 @@ impl DeltaScanConfig { enable_parquet_pushdown: config_options.execution.parquet.pushdown_filters, schema_force_view_types: config_options.execution.parquet.schema_force_view_types, schema: None, + virtual_columns: None, + requested_columns: None, } } @@ -256,9 +268,14 @@ impl DeltaScanConfig { self.schema = Some(schema); self } + + pub fn with_requested_columns(mut self, columns: Vec) -> Self { + self.requested_columns = Some(columns); + self + } } -pub(crate) struct DeltaScanBuilder<'a> { +pub struct DeltaScanBuilder<'a> { snapshot: &'a EagerSnapshot, log_store: LogStoreRef, filter: Option, @@ -334,27 +351,64 @@ impl<'a> DeltaScanBuilder<'a> { None => self.snapshot.read_schema(), }; + // Auto-derive virtual columns: any field present in the caller-supplied + // schema but absent from the table's physical schema (and not a partition + // column) is treated as virtual and will be null-filled during the scan. + // This means callers only need to pass `with_schema(extended)` — there is + // no need to explicitly mark virtual columns. + let config = if config.virtual_columns.is_none() { + let physical_schema = self.snapshot.read_schema(); + let partition_cols = self.snapshot.metadata().partition_columns(); + let physical_names: StdHashSet<&str> = physical_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + let virtual_cols: StdHashSet = schema + .fields() + .iter() + .filter(|f| { + !physical_names.contains(f.name().as_str()) + && !partition_cols.contains(f.name()) + }) + .map(|f| f.name().clone()) + .collect(); + if virtual_cols.is_empty() { + config + } else { + DeltaScanConfig { virtual_columns: Some(virtual_cols), ..config } + } + } else { + config + }; + let logical_schema = df_logical_schema( self.snapshot, &config.file_column_name, Some(schema.clone()), )?; - let logical_schema = if let Some(used_columns) = self.projection { - let mut fields = Vec::with_capacity(used_columns.len()); - for idx in used_columns { - fields.push(logical_schema.field(*idx).to_owned()); - } - // partition filters with Exact pushdown were removed from projection by DF optimizer, - // we need to add them back for the predicate pruning to work - if let Some(expr) = &self.filter { - for c in expr.column_refs() { - let idx = logical_schema.index_of(c.name.as_str())?; - if !used_columns.contains(&idx) { - fields.push(logical_schema.field(idx).to_owned()); - } - } - } + let logical_schema = if config.virtual_columns.is_some() { + // TODO: can we keep just a patch of the schema? + // When virtual columns are present, keep full logical_schema + // PhysicalExprAdapter will handle the adaptation + // Otherwise, subset logical_schema based on projection indices + logical_schema + } else if let Some(used_columns) = self.projection { + let extra_fields = self.filter + .iter() + .flat_map(|expr| expr.column_refs()) + .map(|c| logical_schema.index_of(c.name.as_str())) + .collect::, _>>()? + .into_iter() + .filter(|idx| !used_columns.contains(idx)); + + let fields = used_columns.iter() + .copied() + .chain(extra_fields) + .map(|idx| logical_schema.field(idx).to_owned()) + .collect::>(); + Arc::new(Schema::new(fields)) } else { logical_schema @@ -505,11 +559,51 @@ impl<'a> DeltaScanBuilder<'a> { schema .fields() .iter() - .filter(|f| !table_partition_cols.contains(f.name())) + .filter(|f| { + !table_partition_cols.contains(f.name()) + && !config + .virtual_columns + .as_ref() + .map(|vc| vc.contains(f.name())) + .unwrap_or(false) + }) .cloned() .collect::>(), )); + // When virtual columns are present, translate projection to physical schema space + // Filter out virtual column indices since they don't exist in physical files + // If all requested columns are virtual we still need at least one physical column + // to drive row production; ProjectionExec will project everything to nulls. + // Reading a single column is far cheaper than reading all of them (the old `None` fallback). + let file_projection = if let Some(virtual_cols) = &config.virtual_columns { + self.projection.and_then(|proj| { + let physical_indices: Vec = proj + .iter() + .filter_map(|&idx| { + let field = schema.field(idx); + if virtual_cols.contains(field.name()) { + None + } else { + file_schema.index_of(field.name()).ok() + } + }) + .collect(); + + if physical_indices.is_empty() { + if file_schema.fields().is_empty() { + None + } else { + Some(vec![0usize]) + } + } else { + Some(physical_indices) + } + }) + } else { + self.projection.map(|v| v.to_vec()) + }; + let mut table_partition_cols = table_partition_cols .iter() .map(|name| schema.field_with_name(name).map(|f| f.to_owned())) @@ -626,7 +720,7 @@ impl<'a> DeltaScanBuilder<'a> { let partition_fields: Vec> = table_partition_cols.into_iter().map(Arc::new).collect(); - let table_schema = TableSchema::new(file_schema, partition_fields); + let table_schema = TableSchema::new(file_schema.clone(), partition_fields); let mut file_source = ParquetSource::new(table_schema).with_table_parquet_options(parquet_options); @@ -654,8 +748,20 @@ impl<'a> DeltaScanBuilder<'a> { }, ) .with_statistics(stats) - // .with_projection_indices(self.projection.cloned())? - .with_deep_projection(self.projection.cloned(), self.projection_deep.cloned())? + // Remap projection_deep keys from logical (schema) index space to + // file_schema index space, mirroring what file_projection does above. + // Virtual and partition columns are absent from file_schema so they + // are dropped from the map via index_of returning Err. + .with_deep_projection(file_projection, self.projection_deep.map(|deep| { + deep.iter() + .filter_map(|(schema_idx, subfields)| { + file_schema + .index_of(schema.field(*schema_idx).name()) + .ok() + .map(|file_idx| (file_idx, subfields.clone())) + }) + .collect::>>() + }))? .with_limit(self.limit) // @Hstack fixme .with_expr_adapter(build_expr_adapter_factory()) @@ -669,9 +775,34 @@ impl<'a> DeltaScanBuilder<'a> { .global_counter("files_pruned") .add(files_pruned); + let mut parquet_scan: Arc = DataSourceExec::from_data_source(file_scan_config); + + // When virtual columns + requested columns are present, wrap with ProjectionExec + // to project physical output to requested columns (null-filling virtuals) + if let (Some(requested_cols), Some(virtual_cols)) = (&config.requested_columns, &config.virtual_columns) { + use datafusion::physical_plan::projection::ProjectionExec; + use datafusion::physical_expr::expressions::{Column as PhysicalColumn, Literal}; + + let output_schema = parquet_scan.schema(); + let mut projection_exprs = Vec::new(); + + for col_name in requested_cols { + let field = logical_schema.field_with_name(col_name)?; + let expr: Arc = if virtual_cols.contains(col_name) { + Arc::new(Literal::new(ScalarValue::try_from(field.data_type())?)) + } else { + let idx = output_schema.index_of(col_name)?; + Arc::new(PhysicalColumn::new(col_name, idx)) + }; + projection_exprs.push((expr, col_name.clone())); + } + + parquet_scan = Arc::new(ProjectionExec::try_new(projection_exprs, parquet_scan)?); + } + Ok(DeltaScan { table_url: self.log_store.root_url().clone(), - parquet_scan: DataSourceExec::from_data_source(file_scan_config), + parquet_scan, config, logical_schema, metrics, @@ -1085,7 +1216,13 @@ impl ExecutionPlan for DeltaScan { } fn schema(&self) -> SchemaRef { - self.parquet_scan.schema() + if self.config.requested_columns.is_some() { + self.parquet_scan.schema() + } else if self.config.virtual_columns.is_some() { + self.logical_schema.clone() + } else { + self.parquet_scan.schema() + } } fn properties(&self) -> &PlanProperties { @@ -1422,6 +1559,22 @@ mod tests { assert!(result_plan.metrics().is_some()); } + #[test] + fn test_virtual_columns_not_set_by_default() { + // virtual_columns is derived at scan-build time; the config itself starts empty + let config = DeltaScanConfig::new(); + assert!(config.virtual_columns.is_none()); + assert!(config.requested_columns.is_none()); + } + + #[test] + fn test_virtual_columns_config_stores_requested_columns() { + let requested = vec!["id".to_string(), "virtual_col".to_string(), "value".to_string()]; + let config = DeltaScanConfig::new().with_requested_columns(requested.clone()); + assert_eq!(config.requested_columns, Some(requested)); + assert!(config.virtual_columns.is_none()); + } + #[test] fn test_partitioned_file_from_action() { let mut partition_values = std::collections::HashMap::new(); diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index d9dff72c9..d423a2eb5 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -25,8 +25,10 @@ use datafusion_proto::bytes::{ }; use deltalake_core::DeltaTableBuilder; use deltalake_core::delta_datafusion::{ - DeltaScan, DeltaScanConfigBuilder, DeltaTableFactory, DeltaTableProvider, + DeltaScan, DeltaScanBuilder, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableFactory, + DeltaTableProvider, }; +use deltalake_core::logstore::LogStore; use deltalake_core::kernel::{ ColumnMetadataKey, DataType, MapType, MetadataValue, PrimitiveType, StructField, StructType, }; @@ -2192,10 +2194,12 @@ mod insert_into_tests { mod deep { use std::collections::HashMap; + use std::collections::HashSet; use std::ops::Deref; use std::sync::Arc; use arrow_cast::display::FormatOptions; use arrow_cast::pretty; + use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::optimizer::optimize_projections_deep::DeepColumnIndexMap; @@ -2208,7 +2212,8 @@ mod deep { use datafusion_proto::protobuf::PhysicalPlanNode; use prost::Message; use tracing::info; - use deltalake_core::delta_datafusion::{DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanExec}; + use deltalake_core::delta_datafusion::{DataFusionMixins, DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanBuilder, DeltaScanConfig, DeltaScanExec}; + use deltalake_core::logstore::LogStore; use deltalake_core::delta_datafusion::table_provider_old::DeltaTableOldProvider; use deltalake_core::delta_datafusion::udtf::register_delta_table_udtf; @@ -2476,6 +2481,107 @@ mod deep { Ok(()) } + /// Verify that deep projection (struct sub-field pruning) and virtual columns + /// can be active at the same time: + /// - the virtual column is null-filled in every output row + /// - the struct column is read with only the requested sub-field (deep projection) + /// + /// NOTE: this test places `virtual_col` *after* the struct column so that its + /// logical index (3) and the struct column's physical-file index (2) are the + /// same. If a virtual column were inserted *before* the struct column, the + /// logical index used as the key in `projection_deep` would differ from the + /// physical-file index, requiring an explicit remapping — which is currently + /// not implemented in `DeltaScanBuilder`. + #[tokio::test] + async fn test_virtual_columns_with_deep_projection() -> datafusion::common::Result<()> { + use deltalake_core::DeltaTableBuilder; + let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR")); + let url = url::Url::from_directory_path( + std::path::Path::new(&delta_path).canonicalize().unwrap() + ).unwrap(); + let table = DeltaTableBuilder::from_url(url).unwrap().load().await.unwrap(); + let snapshot = table.snapshot().unwrap().snapshot().clone(); + let log_store = table.log_store(); + + // The physical file schema for this table (non-partition columns): + // 0: _acp_system_metadata (struct) + // 1: _id (string) + // 2: productListItems (array) + // + // We append virtual_col at index 3 so productListItems keeps index 2 in + // both the logical schema and the physical file schema. + let read_schema = snapshot.read_schema(); + let mut fields: Vec = + read_schema.fields().iter().map(|f| f.as_ref().clone()).collect(); + fields.push(ArrowField::new("virtual_col", ArrowDataType::Int64, true)); + let schema_with_virtual = Arc::new(ArrowSchema::new(fields)); + + // Logical schema indices (non-partition, virtual appended at end): + // 0: _acp_system_metadata, 1: _id, 2: productListItems, 3: virtual_col + // Request _id (1), productListItems (2), virtual_col (3). + let projection = vec![1usize, 2, 3]; + + // Deep projection: for productListItems (logical AND physical index 2) read only SKU. + // With file_projection = Some([1, 2]) the key 2 here is the physical file index of + // productListItems, which matches because virtual_col is appended after the physical cols. + let mut projection_deep = HashMap::new(); + projection_deep.insert(2usize, vec!["SKU".to_string()]); + + // virtual_col is auto-detected by DeltaScanBuilder: it is present in + // schema_with_virtual but absent from the physical table schema. + let config = DeltaScanConfig::new() + .with_schema(schema_with_virtual.clone()) + .with_requested_columns(vec![ + "_id".to_string(), + "productListItems".to_string(), + "virtual_col".to_string(), + ]); + + let ctx = SessionContext::new(); + // Register the object store under the delta-rs:// URL that FileScanConfig uses. + // This mirrors what DeltaTableProvider::scan_with_args does via ensure_log_store_registered. + let object_store_url = log_store.object_store_url(); + ctx.runtime_env() + .register_object_store(object_store_url.as_ref(), log_store.object_store(None)); + + let scan = DeltaScanBuilder::new(&snapshot, log_store, &ctx.state()) + .with_scan_config(config) + .with_projection(Some(&projection)) + .with_projection_deep(Some(&projection_deep)) + .build() + .await?; + + // Output schema must contain virtual_col + let output_schema = scan.schema(); + assert!( + output_schema.field_with_name("virtual_col").is_ok(), + "virtual_col missing from output schema" + ); + + let batches = collect(Arc::new(scan), ctx.task_ctx()).await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0, "expected rows from the deep fixture"); + + for batch in &batches { + // virtual_col must be entirely null — this is the key invariant we protect + let vc_idx = batch.schema().index_of("virtual_col").unwrap(); + let vc_col = batch.column(vc_idx); + assert_eq!( + vc_col.null_count(), + vc_col.len(), + "virtual_col should be entirely null" + ); + + // productListItems must be present (scan did not crash due to index confusion) + assert!( + batch.schema().index_of("productListItems").is_ok(), + "productListItems missing from output" + ); + } + + Ok(()) + } + fn find_exec_node(input: &Arc) -> Option<&T> { if let Some(found) = input.as_any().downcast_ref::() { Some(found) @@ -2487,3 +2593,264 @@ mod deep { } +mod virtual_columns { + use super::*; + use datafusion::physical_plan::collect; + + /// Create a small Delta table with three physical columns: + /// - `id` Int32 (non-nullable) + /// - `value` Utf8 (non-nullable) + /// - `info` Struct (nullable) + /// + /// Three rows: (1, "a", {alice}), (2, "b", {bob}), (3, "c", {carol}). + /// All virtual-column tests share this fixture so that both flat and struct + /// scenarios can be exercised without separate table helpers. + async fn create_test_virtual_column_table() -> (tempfile::TempDir, DeltaTable) { + use arrow::array::StructArray; + + let tmp_dir = tempfile::tempdir().unwrap(); + let table_uri = tmp_dir.path().to_str().unwrap().to_string(); + + let table = DeltaTable::try_from_url(ensure_table_uri(table_uri).unwrap()) + .await + .unwrap() + .create() + .with_column("id", DataType::Primitive(PrimitiveType::Integer), false, None) + .with_column("value", DataType::Primitive(PrimitiveType::String), false, None) + .with_column( + "info", + DataType::Struct(Box::new( + StructType::try_new(vec![StructField::new( + "name", + DataType::Primitive(PrimitiveType::String), + true, + )]) + .unwrap(), + )), + true, + None, + ) + .await + .unwrap(); + + let name_field = Arc::new(ArrowField::new("name", ArrowDataType::Utf8, true)); + let name_arr = + Arc::new(StringArray::from(vec![Some("alice"), Some("bob"), Some("carol")])) as ArrayRef; + let struct_arr = Arc::new(StructArray::from(vec![(name_field, name_arr)])) as ArrayRef; + + let batch = RecordBatch::try_from_iter_with_nullable(vec![ + ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, false), + ("value", Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef, false), + ("info", struct_arr, true), + ]) + .unwrap(); + + let table = table.write(vec![batch]).await.unwrap(); + (tmp_dir, table) + } + + /// Virtual columns should be null-filled in the scan output; physical columns + /// retain their data. + #[tokio::test] + async fn test_virtual_columns_are_null_filled() -> Result<()> { + let (_dir, table) = create_test_virtual_column_table().await; + + let schema_with_virtual = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Utf8, false), + ArrowField::new("virtual_col", ArrowDataType::Int64, true), + ])); + + let config = DeltaScanConfig::new() + .with_schema(schema_with_virtual) + .with_requested_columns(vec![ + "id".to_string(), + "value".to_string(), + "virtual_col".to_string(), + ]); + + let ctx = SessionContext::new(); + let snapshot = table.snapshot().unwrap().snapshot().clone(); + let log_store = table.log_store(); + + let object_store_url = log_store.object_store_url(); + ctx.runtime_env() + .register_object_store(object_store_url.as_ref(), log_store.object_store(None)); + + let scan = DeltaScanBuilder::new(&snapshot, log_store, &ctx.state()) + .with_scan_config(config) + .build() + .await?; + + let batches = collect(Arc::new(scan), ctx.task_ctx()).await?; + + assert_batches_sorted_eq!( + &[ + "+----+-------+-------------+", + "| id | value | virtual_col |", + "+----+-------+-------------+", + "| 1 | a | |", + "| 2 | b | |", + "| 3 | c | |", + "+----+-------+-------------+", + ], + &batches + ); + Ok(()) + } + + /// When ALL requested columns are virtual, the scan should still produce the + /// correct number of rows (driven by a single physical column internally) + /// and return them all as null. + #[tokio::test] + async fn test_all_requested_columns_virtual_produces_correct_row_count() -> Result<()> { + let (_dir, table) = create_test_virtual_column_table().await; + + let schema_with_virtual = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Utf8, false), + ArrowField::new("virtual_col", ArrowDataType::Int64, true), + ])); + + let config = DeltaScanConfig::new() + .with_schema(schema_with_virtual) + .with_requested_columns(vec!["virtual_col".to_string()]); + + let ctx = SessionContext::new(); + let snapshot = table.snapshot().unwrap().snapshot().clone(); + let log_store = table.log_store(); + + let object_store_url = log_store.object_store_url(); + ctx.runtime_env() + .register_object_store(object_store_url.as_ref(), log_store.object_store(None)); + + let scan = DeltaScanBuilder::new(&snapshot, log_store, &ctx.state()) + .with_scan_config(config) + .build() + .await?; + + let batches = collect(Arc::new(scan), ctx.task_ctx()).await?; + + assert_batches_sorted_eq!( + &[ + "+-------------+", + "| virtual_col |", + "+-------------+", + "| |", + "| |", + "| |", + "+-------------+", + ], + &batches + ); + Ok(()) + } + + /// When virtual columns appear first in `requested_columns`, the output + /// schema and data ordering must respect that sequence. + #[tokio::test] + async fn test_virtual_columns_requested_column_ordering() -> Result<()> { + let (_dir, table) = create_test_virtual_column_table().await; + + let schema_with_virtual = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Utf8, false), + ArrowField::new("virtual_col", ArrowDataType::Int64, true), + ])); + + let config = DeltaScanConfig::new() + .with_schema(schema_with_virtual) + .with_requested_columns(vec![ + "virtual_col".to_string(), + "id".to_string(), + ]); + + let ctx = SessionContext::new(); + let snapshot = table.snapshot().unwrap().snapshot().clone(); + let log_store = table.log_store(); + + let object_store_url = log_store.object_store_url(); + ctx.runtime_env() + .register_object_store(object_store_url.as_ref(), log_store.object_store(None)); + + let scan = DeltaScanBuilder::new(&snapshot, log_store, &ctx.state()) + .with_scan_config(config) + .build() + .await?; + + let batches = collect(Arc::new(scan), ctx.task_ctx()).await?; + + assert_batches_sorted_eq!( + &[ + "+-------------+----+", + "| virtual_col | id |", + "+-------------+----+", + "| | 1 |", + "| | 2 |", + "| | 3 |", + "+-------------+----+", + ], + &batches + ); + Ok(()) + } + + /// Adding a new nullable sub-field to an existing struct column in the extended schema + /// is handled transparently by Arrow's parquet reader via schema evolution — no changes + /// to the virtual column machinery are required. + /// The physical file has `info: Struct`; the extended schema declares + /// `info: Struct` where `score` does not exist in the file. + /// The parquet reader fills the missing sub-field with nulls automatically. + #[tokio::test] + async fn test_virtual_sub_field_in_existing_struct() -> Result<()> { + use arrow_schema::Fields; + + let (_dir, table) = create_test_virtual_column_table().await; + + // Extended schema: info gains a new nullable sub-field `score` + let extended_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new( + "info", + ArrowDataType::Struct(Fields::from(vec![ + ArrowField::new("name", ArrowDataType::Utf8, true), + ArrowField::new("score", ArrowDataType::Int32, true), + ])), + true, + ), + ])); + + let config = DeltaScanConfig::new().with_schema(extended_schema); + + let ctx = SessionContext::new(); + let snapshot = table.snapshot().unwrap().snapshot().clone(); + let log_store = table.log_store(); + ctx.runtime_env() + .register_object_store(log_store.object_store_url().as_ref(), log_store.object_store(None)); + + let scan = DeltaScanBuilder::new(&snapshot, log_store, &ctx.state()) + .with_scan_config(config) + .build() + .await?; + + let batches = collect(Arc::new(scan), ctx.task_ctx()).await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); + + // `info.name` should be populated; `info.score` should be null (schema evolution) + assert_batches_sorted_eq!( + &[ + "+----+------------------------+", + "| id | info |", + "+----+------------------------+", + "| 1 | {name: alice, score: } |", + "| 2 | {name: bob, score: } |", + "| 3 | {name: carol, score: } |", + "+----+------------------------+", + ], + &batches + ); + Ok(()) + } +} +