diff --git a/Cargo.toml b/Cargo.toml index c188e5e3a..8a0f89284 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ datafusion-proto = { version = "52.1.0" } serde = { version = "1.0.194", features = ["derive"] } serde_json = "1" strum = { version = "0.27" } +prost = { version = "0.14.3" } # "stdlib" bytes = { version = "1" } @@ -155,3 +156,39 @@ inherits = "release" opt-level = 3 codegen-units = 1 lto = "thin" + +[patch.crates-io] +datafusion = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-catalog = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-catalog-listing = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-common-runtime = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-datasource = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-datasource-avro = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-datasource-csv = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-datasource-json = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-datasource-parquet = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-execution = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-expr = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-expr-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-ffi = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions-aggregate = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } 
+datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions-nested = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions-table = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions-window = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-functions-window-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-optimizer = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-physical-expr = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-physical-expr-adapter = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-physical-expr-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-physical-optimizer = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-pruning = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-physical-plan = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-proto = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-proto-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-session = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-spark = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-sql = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } +datafusion-substrait = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6f8c42bd0..79659a4c6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -43,6 +43,7 @@ datafusion-proto = { workspace = true, optional = true } 
serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } strum = { workspace = true } +prost = { workspace = true } # "stdlib" bytes = { workspace = true } @@ -87,6 +88,7 @@ rand = "0.8" sqlparser = { version = "0.59.0" } humantime = { version = "2.1.0", optional = true } validator = { version = "0.19", features = ["derive"] } +ctor = "0.6" [dev-dependencies] criterion = "0.5" diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index ddeb37e8e..edced766a 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -26,12 +26,19 @@ use std::fmt::Debug; use std::sync::Arc; +use crate::delta_datafusion::expr::parse_predicate_expression; +use crate::delta_datafusion::table_provider::DeltaScanWire; +use crate::ensure_table_uri; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot}; +use crate::table::state::DeltaTableState; +use crate::{open_table, open_table_with_storage_options}; use arrow::array::types::UInt16Type; use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray}; use arrow_cast::{CastOptions, cast_with_options}; use arrow_schema::{ - DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef, - TimeUnit, + DataType as ArrowDataType, DataType, Field, Schema as ArrowSchema, SchemaRef, + SchemaRef as ArrowSchemaRef, TimeUnit, }; use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::common::scalar::ScalarValue; @@ -51,14 +58,7 @@ use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; use either::Either; - -use crate::delta_datafusion::expr::parse_predicate_expression; -use crate::delta_datafusion::table_provider::DeltaScanWire; -use crate::ensure_table_uri; -use 
crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot}; -use crate::table::state::DeltaTableState; -use crate::{open_table, open_table_with_storage_options}; +use tracing::info; pub(crate) use self::session::DeltaSessionExt; pub use self::session::{ @@ -74,7 +74,7 @@ pub(crate) use data_validation::{ pub(crate) use find_files::*; pub use table_provider::{ DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder, - next::DeltaScanExec, + next::{DeltaNextPhysicalCodec, DeltaScanExec}, }; pub(crate) use table_provider::{ DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session, @@ -92,9 +92,14 @@ pub mod logical; pub mod physical; pub mod planner; mod session; +use crate::delta_datafusion::schema_null::rewrite_schema_with_nullable_fields; pub use session::SessionFallbackPolicy; pub(crate) use session::{SessionResolveContext, resolve_session_state}; + +mod schema_null; mod table_provider; +pub mod table_provider_old; +pub mod udtf; pub(crate) mod utils; impl From for DataFusionError { @@ -226,7 +231,7 @@ fn _arrow_schema( partition_columns: &[String], wrap_partitions: bool, ) -> ArrowSchemaRef { - let fields = schema + let mut fields = schema .fields() .into_iter() .filter(|f| !partition_columns.contains(&f.name().to_string())) @@ -255,7 +260,11 @@ fn _arrow_schema( }), ) .collect::>(); - Arc::new(ArrowSchema::new(fields)) + + let mut schema = Arc::new(ArrowSchema::new(fields)); + // @Hstack - add the option to have an env var that can nullify fields in the delta schema + schema = rewrite_schema_with_nullable_fields(schema); + schema } pub(crate) fn files_matching_predicate<'a>( @@ -1383,9 +1392,10 @@ mod tests { assert_eq!("a", small.iter().next().unwrap().unwrap()); let expected = vec![ - ObjectStoreOperation::Get(LocationType::Commit), ObjectStoreOperation::GetRange(LocationType::Data, 957..965), ObjectStoreOperation::GetRange(LocationType::Data, 
326..957), + #[expect(clippy::single_range_in_vec_init)] + ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..46]), ]; let mut actual = Vec::new(); operations.recv_many(&mut actual, 3).await; diff --git a/crates/core/src/delta_datafusion/schema_null.rs b/crates/core/src/delta_datafusion/schema_null.rs new file mode 100644 index 000000000..81b3b720a --- /dev/null +++ b/crates/core/src/delta_datafusion/schema_null.rs @@ -0,0 +1,383 @@ +use arrow_schema::DataType::{ + Dictionary, FixedSizeList, LargeList, LargeListView, List, ListView, Map, RunEndEncoded, + Struct, Union, +}; +use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef, UnionFields}; +use std::sync::{Arc, Mutex}; + +static DELTA_FIELD_PATHS_TO_MAKE_NULLABLE: Mutex> = Mutex::new(Vec::new()); + +#[ctor::ctor] +fn init() { + if let Ok(var) = std::env::var("DELTA_FIELD_PATHS_TO_MAKE_NULLABLE") { + let splits = var.split(",").map(|s| s.to_string()).collect::>(); + *DELTA_FIELD_PATHS_TO_MAKE_NULLABLE.lock().unwrap() = splits; + } +} + +pub fn rewrite_schema_with_nullable_fields(input: SchemaRef) -> SchemaRef { + let paths = DELTA_FIELD_PATHS_TO_MAKE_NULLABLE.lock().unwrap(); + rewrite_schema_with_nullable_fields_inner(input, &paths) +} + +fn rewrite_schema_with_nullable_fields_inner(input: SchemaRef, paths: &[String]) -> SchemaRef { + if paths.is_empty() { + return input; + } + + let new_fields = rewrite_fields(input.fields(), paths, ""); + Arc::new(arrow_schema::Schema::new_with_metadata( + new_fields, + input.metadata().clone(), + )) +} + +fn rewrite_fields(fields: &Fields, paths: &[String], parent_path: &str) -> Vec { + fields + .iter() + .map(|field| { + let current_path = if parent_path.is_empty() { + field.name().to_string() + } else { + format!("{}.{}", parent_path, field.name()) + }; + + let make_nullable = !field.is_nullable() && paths.iter().any(|p| *p == current_path); + let new_field = rewrite_field_type(field, paths, ¤t_path); + + if make_nullable { + 
Arc::new(new_field.with_nullable(true)) + } else { + Arc::new(new_field) + } + }) + .collect() +} + +fn rewrite_field_type(field: &Field, paths: &[String], current_path: &str) -> Field { + let new_dt = rewrite_data_type(field.data_type(), paths, current_path); + Field::new(field.name(), new_dt, field.is_nullable()).with_metadata(field.metadata().clone()) +} + +fn rewrite_data_type(dt: &DataType, paths: &[String], current_path: &str) -> DataType { + match dt { + Struct(fields) => Struct(Fields::from(rewrite_fields(fields, paths, current_path))), + List(inner) => List(Arc::new(rewrite_field_type(inner, paths, current_path))), + LargeList(inner) => LargeList(Arc::new(rewrite_field_type(inner, paths, current_path))), + FixedSizeList(inner, n) => { + FixedSizeList(Arc::new(rewrite_field_type(inner, paths, current_path)), *n) + } + ListView(inner) => ListView(Arc::new(rewrite_field_type(inner, paths, current_path))), + LargeListView(inner) => { + LargeListView(Arc::new(rewrite_field_type(inner, paths, current_path))) + } + Map(inner, sorted) => Map( + Arc::new(rewrite_field_type(inner, paths, current_path)), + *sorted, + ), + Dictionary(key, value) => Dictionary( + key.clone(), + Box::new(rewrite_data_type(value, paths, current_path)), + ), + RunEndEncoded(run_ends, values) => RunEndEncoded( + Arc::clone(run_ends), + Arc::new(rewrite_field_type(values, paths, current_path)), + ), + Union(union_fields, mode) => { + let type_ids: Vec = union_fields.iter().map(|(id, _)| id).collect(); + let fields: Vec = union_fields + .iter() + .map(|(_, field)| { + let field_path = if current_path.is_empty() { + field.name().to_string() + } else { + format!("{}.{}", current_path, field.name()) + }; + let make_nullable = + !field.is_nullable() && paths.iter().any(|p| *p == field_path); + let mut new_field = rewrite_field_type(field, paths, &field_path); + if make_nullable { + new_field = new_field.with_nullable(true); + } + Arc::new(new_field) + }) + .collect(); + 
Union(UnionFields::new(type_ids, fields), *mode) + } + _ => dt.clone(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + fn make_schema(fields: Vec) -> SchemaRef { + Arc::new(Schema::new(fields)) + } + + fn paths(strs: &[&str]) -> Vec { + strs.iter().map(|s| s.to_string()).collect() + } + + #[test] + fn test_schema_null_empty_paths_returns_input() { + let schema = make_schema(vec![Field::new("a", DataType::Int32, false)]); + let result = rewrite_schema_with_nullable_fields_inner(schema.clone(), &[]); + assert_eq!(schema, result); + } + + #[test] + fn test_schema_null_top_level_field_made_nullable() { + let schema = make_schema(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a"])); + assert!(result.field_with_name("a").unwrap().is_nullable()); + assert!(!result.field_with_name("b").unwrap().is_nullable()); + } + + #[test] + fn test_schema_null_already_nullable_unchanged() { + let schema = make_schema(vec![Field::new("a", DataType::Int32, true)]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a"])); + assert!(result.field_with_name("a").unwrap().is_nullable()); + } + + #[test] + fn test_schema_null_path_not_in_schema_unchanged() { + let schema = make_schema(vec![Field::new("a", DataType::Int32, false)]); + let result = rewrite_schema_with_nullable_fields_inner(schema.clone(), &paths(&["z"])); + assert_eq!(schema, result); + } + + #[test] + fn test_schema_null_nested_struct_field() { + let schema = make_schema(vec![Field::new( + "a", + Struct(Fields::from(vec![ + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Utf8, false), + ])), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a.b"])); + let a = result.field_with_name("a").unwrap(); + assert!(!a.is_nullable()); + if let Struct(fields) = a.data_type() 
{ + assert!(fields.find("b").unwrap().1.is_nullable()); + assert!(!fields.find("c").unwrap().1.is_nullable()); + } else { + assert_eq!(true, false, "expected struct"); + } + } + + #[test] + fn test_schema_null_deeply_nested_struct() { + let schema = make_schema(vec![Field::new( + "a", + Struct(Fields::from(vec![Field::new( + "b", + Struct(Fields::from(vec![Field::new( + "c", + Struct(Fields::from(vec![Field::new("d", DataType::Int32, false)])), + false, + )])), + false, + )])), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a.b.c.d"])); + let a = result.field_with_name("a").unwrap(); + assert!(!a.is_nullable()); + let b = match a.data_type() { + Struct(f) => f.find("b").unwrap().1, + _ => panic!(), + }; + assert!(!b.is_nullable()); + let c = match b.data_type() { + Struct(f) => f.find("c").unwrap().1, + _ => panic!(), + }; + assert!(!c.is_nullable()); + let d = match c.data_type() { + Struct(f) => f.find("d").unwrap().1, + _ => panic!(), + }; + assert!(d.is_nullable()); + } + + #[test] + fn test_schema_null_struct_itself_made_nullable() { + let schema = make_schema(vec![Field::new( + "a", + Struct(Fields::from(vec![Field::new("b", DataType::Int32, false)])), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a"])); + let a = result.field_with_name("a").unwrap(); + assert!(a.is_nullable()); + if let Struct(fields) = a.data_type() { + assert!(!fields.find("b").unwrap().1.is_nullable()); + } else { + panic!("expected struct"); + } + } + + #[test] + fn test_schema_null_struct_and_child_both_made_nullable() { + let schema = make_schema(vec![Field::new( + "a", + Struct(Fields::from(vec![Field::new("b", DataType::Int32, false)])), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a", "a.b"])); + let a = result.field_with_name("a").unwrap(); + assert!(a.is_nullable()); + if let Struct(fields) = a.data_type() { + 
assert!(fields.find("b").unwrap().1.is_nullable()); + } else { + panic!("expected struct"); + } + } + + #[test] + fn test_schema_null_list_containing_struct() { + let schema = make_schema(vec![Field::new( + "a", + List(Arc::new(Field::new( + "item", + Struct(Fields::from(vec![Field::new("b", DataType::Int32, false)])), + false, + ))), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a.b"])); + let a = result.field_with_name("a").unwrap(); + assert!(!a.is_nullable()); + if let List(inner) = a.data_type() { + if let Struct(fields) = inner.data_type() { + assert!(fields.find("b").unwrap().1.is_nullable()); + } else { + panic!("expected struct inside list"); + } + } else { + panic!("expected list"); + } + } + + #[test] + fn test_schema_null_list_itself_made_nullable() { + let schema = make_schema(vec![Field::new( + "a", + List(Arc::new(Field::new("item", DataType::Int32, false))), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a"])); + assert!(result.field_with_name("a").unwrap().is_nullable()); + } + + #[test] + fn test_schema_null_dictionary_with_nested_value() { + let schema = make_schema(vec![Field::new( + "a", + Dictionary( + Box::new(DataType::Int32), + Box::new(Struct(Fields::from(vec![Field::new( + "b", + DataType::Utf8, + false, + )]))), + ), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a.b"])); + let a = result.field_with_name("a").unwrap(); + if let Dictionary(_, value) = a.data_type() { + if let Struct(fields) = value.as_ref() { + assert!(fields.find("b").unwrap().1.is_nullable()); + } else { + panic!("expected struct in dict value"); + } + } else { + panic!("expected dictionary"); + } + } + + #[test] + fn test_schema_null_map_with_nested_value() { + let schema = make_schema(vec![Field::new( + "a", + Map( + Arc::new(Field::new( + "entries", + Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + 
"value", + Struct(Fields::from(vec![Field::new("b", DataType::Int32, false)])), + false, + ), + ])), + false, + )), + false, + ), + false, + )]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a.value.b"])); + let a = result.field_with_name("a").unwrap(); + if let Map(inner, _) = a.data_type() { + if let Struct(entries) = inner.data_type() { + let value_field = entries.find("value").unwrap().1; + if let Struct(vf) = value_field.data_type() { + assert!(vf.find("b").unwrap().1.is_nullable()); + } else { + panic!("expected struct in value"); + } + } else { + panic!("expected struct in map entries"); + } + } else { + panic!("expected map"); + } + } + + #[test] + fn test_schema_null_multiple_paths_across_tree() { + let schema = make_schema(vec![ + Field::new("x", DataType::Int64, false), + Field::new( + "y", + Struct(Fields::from(vec![ + Field::new("z", DataType::Float64, false), + Field::new("w", DataType::Boolean, false), + ])), + false, + ), + ]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["x", "y.z"])); + assert!(result.field_with_name("x").unwrap().is_nullable()); + let y = result.field_with_name("y").unwrap(); + assert!(!y.is_nullable()); + if let Struct(fields) = y.data_type() { + assert!(fields.find("z").unwrap().1.is_nullable()); + assert!(!fields.find("w").unwrap().1.is_nullable()); + } else { + panic!("expected struct"); + } + } + + #[test] + fn test_schema_null_metadata_preserved() { + let mut field = Field::new("a", DataType::Int32, false); + field.set_metadata([("key".to_string(), "val".to_string())].into()); + let schema = make_schema(vec![field]); + let result = rewrite_schema_with_nullable_fields_inner(schema, &paths(&["a"])); + let a = result.field_with_name("a").unwrap(); + assert!(a.is_nullable()); + assert_eq!(a.metadata().get("key").unwrap(), "val"); + } +} diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 
0f0fb4f17..24e8c13ce 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -8,8 +8,8 @@ use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::error::ArrowError; use chrono::{DateTime, TimeZone, Utc}; -use datafusion::catalog::TableProvider; use datafusion::catalog::memory::DataSourceExec; +use datafusion::catalog::{ScanArgs, ScanResult, TableProvider}; use datafusion::common::pruning::PruningStatistics; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::common::{Column, ColumnStatistics, DFSchemaRef, Result, Statistics, ToDFSchema}; @@ -43,6 +43,7 @@ use datafusion::{ }; use delta_kernel::Version; use futures::TryStreamExt as _; use futures::future::BoxFuture; +use itertools::Itertools; use object_store::ObjectMeta; use serde::{Deserialize, Serialize}; use url::Url; @@ -82,6 +83,8 @@ pub struct DeltaScanConfigBuilder { pub(super) enable_parquet_pushdown: bool, /// Schema to scan table with pub(super) schema: Option, + /// options passed down to the store + pub(super) options: std::collections::HashMap, } impl Default for DeltaScanConfigBuilder { @@ -92,6 +95,7 @@ impl Default for DeltaScanConfigBuilder { wrap_partition_values: None, enable_parquet_pushdown: true, schema: None, + options: std::collections::HashMap::new(), } } } @@ -136,6 +140,12 @@ impl DeltaScanConfigBuilder { self } + /// Use the provided options map (passed down to the store) for the [DeltaScan] + pub fn with_options(mut self, options: std::collections::HashMap) -> Self { + self.options = options; + self + } + /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult { let file_column_name = if self.include_file_column { @@ -178,6 +188,7 @@ impl DeltaScanConfigBuilder { enable_parquet_pushdown: self.enable_parquet_pushdown, schema: self.schema.clone(),
schema_force_view_types: true, + options: self.options.clone(), }) } } @@ -196,6 +207,8 @@ pub struct DeltaScanConfig { pub schema_force_view_types: bool, /// Schema to read as pub schema: Option, + /// options to pass down to store + pub options: std::collections::HashMap, } impl Default for DeltaScanConfig { @@ -213,6 +226,7 @@ impl DeltaScanConfig { enable_parquet_pushdown: true, schema_force_view_types: true, schema: None, + options: std::collections::HashMap::new(), } } @@ -224,6 +238,7 @@ impl DeltaScanConfig { enable_parquet_pushdown: config_options.execution.parquet.pushdown_filters, schema_force_view_types: config_options.execution.parquet.schema_force_view_types, schema: None, + options: std::collections::HashMap::new(), } } @@ -261,6 +276,7 @@ pub(crate) struct DeltaScanBuilder<'a> { filter: Option, session: &'a dyn Session, projection: Option<&'a Vec>, + projection_deep: Option<&'a std::collections::HashMap>>, limit: Option, files: Option<&'a [Add]>, config: Option, @@ -278,6 +294,7 @@ impl<'a> DeltaScanBuilder<'a> { filter: None, session, projection: None, + projection_deep: None, limit: None, files: None, config: None, @@ -299,6 +316,14 @@ impl<'a> DeltaScanBuilder<'a> { self } + pub fn with_projection_deep( + mut self, + projection_deep: Option<&'a std::collections::HashMap>>, + ) -> Self { + self.projection_deep = projection_deep; + self + } + pub fn with_limit(mut self, limit: Option) -> Self { self.limit = limit; self @@ -390,10 +415,10 @@ impl<'a> DeltaScanBuilder<'a> { if logical_filter.is_none() && self.limit.is_none() { let files = self .snapshot - .file_views(&self.log_store, None) - .map_ok(|f| f.add_action()) - .try_collect::>() - .await?; + .log_data() + .iter() + .map(|f| f.add_action_no_stats()) + .collect_vec(); let files_scanned = files.len(); (files, files_scanned, 0, None) } else { @@ -413,23 +438,21 @@ impl<'a> DeltaScanBuilder<'a> { let mut rows_collected = 0; let mut files = Vec::with_capacity(num_containers); - let file_actions: 
Vec<_> = self + for (file_view, keep) in self .snapshot - .file_views(&self.log_store, None) - .map_ok(|f| f.add_action()) - .try_collect::>() - .await?; - - for (action, keep) in - file_actions.into_iter().zip(files_to_prune.iter().cloned()) + .log_data() + .into_iter() + .zip(files_to_prune.iter().cloned()) { // prune file based on predicate pushdown + let action = file_view.add_action_no_stats(); + let num_records = file_view.num_records(); if keep { // prune file based on limit pushdown if let Some(limit) = self.limit { - if let Some(stats) = action.get_stats()? { + if let Some(num_records) = num_records { if rows_collected <= limit as i64 { - rows_collected += stats.num_records; + rows_collected += num_records as i64; files.push(action.to_owned()); } else { break; @@ -636,7 +659,8 @@ impl<'a> DeltaScanBuilder<'a> { }, ) .with_statistics(stats) - .with_projection_indices(self.projection.cloned())? + // .with_projection_indices(self.projection.cloned())? + .with_deep_projection(self.projection.cloned(), self.projection_deep.cloned())? 
.with_limit(self.limit) .build(); @@ -929,6 +953,29 @@ impl TableProvider for DeltaTableProvider { Ok(Arc::new(scan.build().await?)) } + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + state.ensure_log_store_registered(self.log_store.as_ref())?; + let filters = args.filters().unwrap_or(&[]); + let filter_expr = conjunction(filters.iter().cloned()); + + let projection = args.projection().map(|p| p.to_vec()); + let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), state) + .with_projection(projection.as_ref()) + .with_projection_deep(args.projection_deep()) + .with_limit(args.limit()) + .with_filter(filter_expr) + .with_scan_config(self.config.clone()); + + if let Some(files) = &self.files { + scan = scan.with_files(files); + } + Ok(ScanResult::new(Arc::new(scan.build().await?))) + } + fn supports_filters_pushdown( &self, filter: &[&Expr], @@ -980,15 +1027,16 @@ impl TableProvider for DeltaTableProvider { #[derive(Debug)] pub struct DeltaScan { /// The normalized [Url] of the ObjectStore root - table_url: Url, + pub table_url: Url, /// Column that contains an index that maps to the original metadata Add - pub(crate) config: DeltaScanConfig, + pub config: DeltaScanConfig, /// The parquet scan to wrap - pub(crate) parquet_scan: Arc, + pub parquet_scan: Arc, /// The schema of the table to be used when evaluating expressions - pub(crate) logical_schema: Arc, + pub logical_schema: Arc, /// Metrics for scan reported via DataFusion - metrics: ExecutionPlanMetricsSet, + /// @HStack - exposed this builder so we can recreate DeltaScan + pub metrics: ExecutionPlanMetricsSet, } impl DeltaScan { @@ -1164,7 +1212,7 @@ pub(crate) fn simplify_expr( session.create_physical_expr(simplifier.simplify(expr)?, df_schema.as_ref()) } -fn get_pushdown_filters( +pub(crate) fn get_pushdown_filters( filter: &[&Expr], partition_cols: &[String], ) -> Vec { diff --git 
a/crates/core/src/delta_datafusion/table_provider/next/mod.rs b/crates/core/src/delta_datafusion/table_provider/next/mod.rs index 4739fed35..3101b4601 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/mod.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/mod.rs @@ -29,9 +29,18 @@ use std::any::Any; use std::{borrow::Cow, sync::Arc}; +pub(crate) use self::scan::KernelScanPlan; +pub use self::scan::{DeltaNextPhysicalCodec, DeltaScanExec}; +use crate::delta_datafusion::engine::AsObjectStoreUrl; +use crate::delta_datafusion::engine::DataFusionEngine; +use crate::delta_datafusion::table_provider::TableProviderBuilder; +use crate::delta_datafusion::{DeltaScanConfig, DeltaSessionExt}; +use crate::kernel::{EagerSnapshot, Snapshot}; use arrow::datatypes::{Schema, SchemaRef}; +use datafusion::catalog::{ScanArgs, ScanResult}; use datafusion::common::Result; use datafusion::datasource::TableType; +use datafusion::error::DataFusionError; use datafusion::logical_expr::TableProviderFilterPushDown; use datafusion::prelude::Expr; use datafusion::{ @@ -41,18 +50,13 @@ use datafusion::{ }; use delta_kernel::table_configuration::TableConfiguration; use serde::{Deserialize, Serialize}; - -pub use self::scan::DeltaScanExec; -pub(crate) use self::scan::KernelScanPlan; -use crate::delta_datafusion::DeltaScanConfig; -use crate::delta_datafusion::engine::DataFusionEngine; -use crate::delta_datafusion::table_provider::TableProviderBuilder; -use crate::kernel::{EagerSnapshot, Snapshot}; +use url::Url; mod scan; /// Default column name for the file id column we add to files read from disk. 
pub(crate) use crate::delta_datafusion::file_id::FILE_ID_COLUMN_DEFAULT; +use crate::logstore::{StorageConfig, object_store_factories}; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum SnapshotWrapper { @@ -216,6 +220,72 @@ impl TableProvider for DeltaScan { scan::execution_plan(&self.config, session, scan_plan, stream, engine, limit).await } + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + let engine = DataFusionEngine::new_from_session(state); + let table_uri = self.snapshot.table_configuration().table_root(); + if state + .runtime_env() + .object_store(table_uri.as_object_store_url()) + .is_err() + { + let url_key = Url::parse(&format!("{}://", table_uri.scheme())) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + if let Some(entry) = object_store_factories().get(&url_key) { + let storage_config = + StorageConfig::parse_options(&self.snapshot.snapshot().load_config().options)?; + let (store, _) = entry.value().parse_url_opts(table_uri, &storage_config)?; + state.runtime_env().register_object_store(table_uri, store); + } + } + + // Filter out file_id column from projection if present + let file_id_idx = self + .config + .file_column_name + .as_ref() + .map(|_| self.scan_schema.fields().len()); + let kernel_projection = args.projection().map(|proj| { + proj.iter() + .filter(|&&idx| Some(idx) != file_id_idx) + .copied() + .collect::>() + }); + + let filters = args.filters().unwrap_or(&[]); + let mut scan_plan = KernelScanPlan::try_new( + self.snapshot.snapshot(), + kernel_projection.as_ref(), + filters, + &self.config, + self.file_skipping_predicate.clone(), + )?; + scan_plan.result_projection_deep = args.projection_deep().cloned(); + + let stream = match &self.snapshot { + SnapshotWrapper::Snapshot(_) => scan_plan.scan.scan_metadata(engine.clone()), + SnapshotWrapper::EagerSnapshot(esn) => { + if let Ok(files) = esn.files() { + scan_plan.scan.scan_metadata_from( + engine.clone(), + 
esn.snapshot().version() as u64, + Box::new(files.to_vec().into_iter()), + None, + ) + } else { + scan_plan.scan.scan_metadata(engine.clone()) + } + } + }; + + scan::execution_plan(&self.config, state, scan_plan, stream, engine, args.limit()) + .await + .map(ScanResult::new) + } + fn supports_filters_pushdown( &self, filter: &[&Expr], diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs new file mode 100644 index 000000000..29ab28de3 --- /dev/null +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/codec.rs @@ -0,0 +1,680 @@ +//! Codec for serializing and deserializing [`DeltaScanExec`] physical plans. +//! +//! Provides a [`PhysicalExtensionCodec`] implementation for distributed execution. +//! Expressions are serialized via DataFusion protobuf; kernel `Transform` expressions +//! use a custom wire format since they have no DataFusion equivalent. + +use std::sync::Arc; + +use arrow::datatypes::{Schema, SchemaRef}; +use dashmap::DashMap; +use datafusion::common::HashMap; +use datafusion::error::DataFusionError; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::Expr; +use datafusion_proto::bytes::Serializeable; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use delta_kernel::engine::arrow_conversion::TryIntoKernel; +use delta_kernel::expressions::{ColumnName, Expression, FieldTransform, Transform}; +use delta_kernel::schema::DataType as KernelDataType; +use serde::{Deserialize, Serialize}; + +use super::DeltaScanExec; +use super::plan::KernelScanPlan; +use crate::delta_datafusion::DeltaScanConfig; +use crate::delta_datafusion::engine::{to_datafusion_expr, to_delta_expression}; +use crate::kernel::Snapshot; + +/// Codec for serializing/deserializing [`DeltaScanExec`] physical plans. 
+/// +/// This codec enables distributed execution by serializing the inputs needed +/// to reconstruct the execution plan rather than the plan itself. This approach +/// avoids the need for serde support in delta-kernel types. +#[derive(Debug, Clone, Default)] +pub struct DeltaNextPhysicalCodec; + +impl PhysicalExtensionCodec for DeltaNextPhysicalCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + ctx: &TaskContext, + ) -> datafusion::common::Result> { + let wire: DeltaScanExecWire = serde_json::from_slice(buf).map_err(|e| { + DataFusionError::Internal(format!("Failed to decode DeltaScanExec: {e}")) + })?; + + wire.into_exec(inputs, ctx) + } + + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + ) -> datafusion::common::Result<()> { + let delta_scan = node + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal("Expected DeltaScanExec for encoding".to_string()) + })?; + + let wire = DeltaScanExecWire::try_from(delta_scan)?; + serde_json::to_writer(buf, &wire).map_err(|e| { + DataFusionError::Internal(format!("Failed to encode DeltaScanExec: {e}")) + })?; + Ok(()) + } +} + +/// Wire format for a kernel FieldTransform. +#[derive(Debug, Serialize, Deserialize)] +struct FieldTransformWire { + exprs: Vec>, + is_replace: bool, +} + +/// Wire format for a kernel Transform expression. +/// +/// Transform is a sparse schema modification: specifies which fields to modify, +/// with unmentioned fields passing through unchanged. +#[derive(Debug, Serialize, Deserialize)] +struct TransformWire { + input_path: Option>, + field_transforms: std::collections::HashMap, + prepended_fields: Vec>, +} + +/// Wire format for serializing [`DeltaScanExec`]. +/// +/// Uses `std::collections::HashMap` instead of `DashMap` for serde compatibility. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct DeltaScanExecWire { + snapshot: Snapshot, + /// The kernel scan's logical schema, used to rebuild the Scan via `ScanBuilder::with_schema()`. 
+ scan_schema: SchemaRef, + result_projection: Option>, + parquet_predicate: Option>, + file_id_column: String, + retain_file_ids: bool, + result_schema: SchemaRef, + transforms: std::collections::HashMap, + selection_vectors: std::collections::HashMap>, +} + +impl TryFrom<&DeltaScanExec> for DeltaScanExecWire { + type Error = DataFusionError; + + fn try_from(exec: &DeltaScanExec) -> Result { + use delta_kernel::engine::arrow_conversion::TryIntoArrow; + + let scan_plan = &exec.scan_plan; + let snapshot = scan_plan.snapshot.clone(); + let scan_schema: Schema = scan_plan.scan.logical_schema().as_ref().try_into_arrow()?; + + let parquet_predicate = scan_plan + .parquet_predicate + .as_ref() + .map(|p| p.to_bytes().map(|b| b.to_vec())) + .transpose()?; + + let transforms: std::collections::HashMap = exec + .transforms + .iter() + .map(|(file_url, kernel_expr)| { + serialize_transform(kernel_expr.as_ref()).map(|wire| (file_url.clone(), wire)) + }) + .collect::>()?; + + let selection_vectors: std::collections::HashMap> = exec + .selection_vectors + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + + Ok(Self { + snapshot, + scan_schema: Arc::new(scan_schema), + result_projection: scan_plan.result_projection.clone(), + parquet_predicate, + file_id_column: exec.file_id_column.to_string(), + retain_file_ids: exec.retain_file_ids, + result_schema: scan_plan.result_schema.clone(), + transforms, + selection_vectors, + }) + } +} + +/// Converts kernel Expression -> DataFusion Expr -> protobuf bytes. +/// Does not support Transform expressions - use `serialize_transform` instead. +fn serialize_kernel_expression(expr: &Expression) -> Result, DataFusionError> { + let placeholder_type = KernelDataType::STRING; + let df_expr = to_datafusion_expr(expr, &placeholder_type)?; + let bytes = df_expr.to_bytes()?; + Ok(bytes.to_vec()) +} + +/// Converts protobuf bytes -> DataFusion Expr -> kernel Expression. 
+fn deserialize_kernel_expression(bytes: &[u8]) -> Result { + let df_expr = Expr::from_bytes(bytes)?; + to_delta_expression(&df_expr) + .map_err(|e| DataFusionError::Internal(format!("Failed to convert to kernel expr: {e}"))) +} + +/// Converts a kernel Transform expression to wire format. +fn serialize_transform(expr: &Expression) -> Result { + match expr { + Expression::Transform(transform) => { + let input_path = transform + .input_path + .as_ref() + .map(|p| p.iter().map(|s| s.to_string()).collect()); + + let field_transforms = transform + .field_transforms + .iter() + .map(|(name, ft)| { + let exprs = ft + .exprs + .iter() + .map(|e| serialize_kernel_expression(e)) + .collect::, _>>()?; + Ok(( + name.clone(), + FieldTransformWire { + exprs, + is_replace: ft.is_replace, + }, + )) + }) + .collect::>()?; + + let prepended_fields = transform + .prepended_fields + .iter() + .map(|e| serialize_kernel_expression(e)) + .collect::, _>>()?; + + Ok(TransformWire { + input_path, + field_transforms, + prepended_fields, + }) + } + _ => Err(DataFusionError::Internal(format!( + "Expected Transform expression, got {:?}", + expr + ))), + } +} + +/// Converts wire format to a kernel Transform expression. 
+fn deserialize_transform(wire: TransformWire) -> Result { + let input_path = wire.input_path.map(ColumnName::new); + + let field_transforms = wire + .field_transforms + .into_iter() + .map(|(name, ft_wire)| { + let exprs = ft_wire + .exprs + .iter() + .map(|bytes| deserialize_kernel_expression(bytes).map(Arc::new)) + .collect::, _>>()?; + Ok(( + name, + FieldTransform { + exprs, + is_replace: ft_wire.is_replace, + }, + )) + }) + .collect::, DataFusionError>>()?; + + let prepended_fields = wire + .prepended_fields + .iter() + .map(|bytes| deserialize_kernel_expression(bytes).map(Arc::new)) + .collect::, _>>()?; + + Ok(Expression::Transform(Transform { + input_path, + field_transforms, + prepended_fields, + })) +} + +impl DeltaScanExecWire { + /// Reconstruct a [`DeltaScanExec`] from the wire format. + fn into_exec( + self, + inputs: &[Arc], + _task: &TaskContext, + ) -> datafusion::common::Result> { + if inputs.len() != 1 { + return Err(DataFusionError::Internal(format!( + "DeltaScanExec expects exactly 1 input, got {}", + inputs.len() + ))); + } + let input = inputs[0].clone(); + + let parquet_predicate = self + .parquet_predicate + .map(|bytes| Expr::from_bytes(&bytes)) + .transpose()?; + + let kernel_scan_schema = Arc::new(self.scan_schema.as_ref().try_into_kernel()?); + + // Build the scan with the exact schema we had before serialization + let scan = Arc::new( + self.snapshot + .scan_builder() + .with_schema(kernel_scan_schema) + .build()?, + ); + let mut config = DeltaScanConfig::new(); + if self.retain_file_ids { + config = config.with_file_column_name(self.file_id_column.clone()); + } + let scan_plan = KernelScanPlan::try_new_with_scan( + scan, + &self.snapshot, + &config, + self.result_schema, + self.result_projection, + parquet_predicate, + )?; + + let transforms: HashMap> = self + .transforms + .into_iter() + .map(|(file_url, wire)| { + deserialize_transform(wire).map(|expr| (file_url, Arc::new(expr))) + }) + .collect::>()?; + + let selection_vectors: 
DashMap> = + self.selection_vectors.into_iter().collect(); + + let exec = DeltaScanExec::new( + Arc::new(scan_plan), + input, + Arc::new(transforms), + Arc::new(selection_vectors), + Default::default(), + self.file_id_column, + self.retain_file_ids, + Default::default(), + ); + + Ok(Arc::new(exec)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::physical_plan::ExecutionPlan; + use datafusion::prelude::{col, lit}; + use datafusion_proto::physical_plan::PhysicalExtensionCodec; + + use crate::delta_datafusion::session::create_session; + use crate::delta_datafusion::table_provider::next::DeltaScan; + use crate::kernel::Snapshot; + use crate::test_utils::{TestResult, TestTables}; + + use super::*; + + async fn create_delta_scan_exec( + filters: &[Expr], + projection: Option<&Vec>, + ) -> TestResult> { + create_delta_scan_exec_from_table(TestTables::Simple, filters, projection).await + } + + async fn create_delta_scan_exec_from_table( + table: TestTables, + filters: &[Expr], + projection: Option<&Vec>, + ) -> TestResult> { + let log_store = table.table_builder()?.build_storage()?; + let snapshot = Snapshot::try_new(&log_store, Default::default(), None).await?; + let provider = DeltaScan::builder().with_snapshot(snapshot).await?; + + let session = Arc::new(create_session().into_inner()); + let state = session.state_ref().read().clone(); + + let plan = provider.scan(&state, projection, filters, None).await?; + Ok(plan) + } + + fn extract_delta_scan_exec(plan: &Arc) -> Option<&DeltaScanExec> { + plan.as_any().downcast_ref::() + } + + #[tokio::test] + async fn test_codec_roundtrip_basic() -> TestResult { + let plan = create_delta_scan_exec(&[], None).await?; + + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + assert!(!buf.is_empty(), "Encoded buffer should not be empty"); + + let session = 
create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.scan_plan.result_schema, decoded_delta_scan.scan_plan.result_schema, + "Result schemas should match" + ); + assert_eq!( + delta_scan.file_id_column, decoded_delta_scan.file_id_column, + "File ID columns should match" + ); + assert_eq!( + delta_scan.retain_file_ids, decoded_delta_scan.retain_file_ids, + "Retain file IDs should match" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_codec_roundtrip_with_projection() -> TestResult { + let projection = vec![0usize]; + let plan = create_delta_scan_exec(&[], Some(&projection)).await?; + + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.scan_plan.result_schema.fields().len(), + decoded_delta_scan.scan_plan.result_schema.fields().len(), + "Projected schema field count should match" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_codec_roundtrip_with_filter() -> TestResult { + let filters = vec![col("id").gt(lit(5i64))]; + let plan = create_delta_scan_exec(&filters, None).await?; + + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = 
create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.scan_plan.result_schema, decoded_delta_scan.scan_plan.result_schema, + "Result schemas should match with filter" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_wire_format_serialization() -> TestResult { + let plan = create_delta_scan_exec(&[], None).await?; + + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let wire = DeltaScanExecWire::try_from(delta_scan)?; + + let json = serde_json::to_string(&wire)?; + assert!(!json.is_empty(), "JSON should not be empty"); + + let deserialized: DeltaScanExecWire = serde_json::from_str(&json)?; + + assert_eq!( + wire.file_id_column, deserialized.file_id_column, + "File ID column should roundtrip" + ); + assert_eq!( + wire.retain_file_ids, deserialized.retain_file_ids, + "Retain file IDs should roundtrip" + ); + assert_eq!( + wire.result_schema, deserialized.result_schema, + "Result schema should roundtrip" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_codec_decode_wrong_input_count() -> TestResult { + let plan = create_delta_scan_exec(&[], None).await?; + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let result = codec.try_decode(&buf, &[], &task_ctx); + assert!(result.is_err(), "Should fail with 0 inputs"); + + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + let input = delta_scan.children()[0].clone(); + let result = codec.try_decode(&buf, &[input.clone(), input], &task_ctx); + assert!(result.is_err(), "Should fail with 2 inputs"); + + Ok(()) + } + + #[test] + fn 
test_kernel_expression_serialization_roundtrip() { + use delta_kernel::expressions::{ColumnName, Expression as KernelExpression, Scalar}; + + let column_expr = KernelExpression::Column(ColumnName::new(["test_column"])); + let serialized = serialize_kernel_expression(&column_expr).unwrap(); + let deserialized = deserialize_kernel_expression(&serialized).unwrap(); + assert_eq!( + column_expr, deserialized, + "Column expression should roundtrip" + ); + + let literal_expr = KernelExpression::Literal(Scalar::Integer(42)); + let serialized = serialize_kernel_expression(&literal_expr).unwrap(); + let deserialized = deserialize_kernel_expression(&serialized).unwrap(); + assert_eq!( + literal_expr, deserialized, + "Literal expression should roundtrip" + ); + + let string_literal = KernelExpression::Literal(Scalar::String("hello".to_string())); + let serialized = serialize_kernel_expression(&string_literal).unwrap(); + let deserialized = deserialize_kernel_expression(&serialized).unwrap(); + assert_eq!( + string_literal, deserialized, + "String literal should roundtrip" + ); + } + + #[tokio::test] + async fn test_wire_format_with_selection_vectors() -> TestResult { + let plan = create_delta_scan_exec(&[], None).await?; + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let wire = DeltaScanExecWire::try_from(delta_scan)?; + + assert!( + wire.selection_vectors.is_empty() || !wire.selection_vectors.is_empty(), + "Selection vectors should serialize (empty or not)" + ); + + let json = serde_json::to_string(&wire)?; + let deserialized: DeltaScanExecWire = serde_json::from_str(&json)?; + + assert_eq!( + wire.selection_vectors.len(), + deserialized.selection_vectors.len(), + "Selection vectors count should match" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_codec_roundtrip_preserves_transforms_and_selection_vectors() -> TestResult { + let plan = create_delta_scan_exec(&[], None).await?; + let delta_scan = 
extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.transforms.len(), + decoded_delta_scan.transforms.len(), + "Transforms count should match" + ); + + assert_eq!( + delta_scan.selection_vectors.len(), + decoded_delta_scan.selection_vectors.len(), + "Selection vectors count should match" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_codec_roundtrip_with_deletion_vectors() -> TestResult { + let plan = create_delta_scan_exec_from_table(TestTables::WithDvSmall, &[], None).await?; + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + assert!( + !delta_scan.selection_vectors.is_empty(), + "Table with deletion vectors should have non-empty selection_vectors" + ); + + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.selection_vectors.len(), + decoded_delta_scan.selection_vectors.len(), + "Selection vectors count should match" + ); + + for entry in delta_scan.selection_vectors.iter() { + let key = entry.key(); + let original_vec = entry.value(); + let decoded_vec = decoded_delta_scan + .selection_vectors + .get(key) + .expect("Decoded should have same keys"); + assert_eq!( + 
original_vec.as_slice(), + decoded_vec.value().as_slice(), + "Selection vector values should match for key {key}" + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_codec_roundtrip_with_column_mapping() -> TestResult { + let plan = + create_delta_scan_exec_from_table(TestTables::WithColumnMapping, &[], None).await?; + let delta_scan = extract_delta_scan_exec(&plan).expect("Expected DeltaScanExec"); + + // Column mapping tables have transforms that inject partition values. + // This test verifies full roundtrip serialization of Transform expressions. + let codec = DeltaNextPhysicalCodec; + + let mut buf = Vec::new(); + codec.try_encode(plan.clone(), &mut buf)?; + + let session = create_session().into_inner(); + let task_ctx = session.task_ctx(); + + let input = delta_scan.children()[0].clone(); + let decoded = codec.try_decode(&buf, &[input], &task_ctx)?; + + let decoded_delta_scan = + extract_delta_scan_exec(&decoded).expect("Expected DeltaScanExec after decode"); + + assert_eq!( + delta_scan.transforms.len(), + decoded_delta_scan.transforms.len(), + "Transforms count should match" + ); + + // Verify each transform was correctly serialized and deserialized + for key in delta_scan.transforms.keys() { + assert!( + decoded_delta_scan.transforms.contains_key(key), + "Decoded should have transform for key {key}" + ); + } + + Ok(()) + } +} diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs index 0bbc473c1..d95dab183 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs @@ -9,6 +9,9 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use super::plan::KernelScanPlan; +use crate::kernel::ARROW_HANDLER; +use crate::kernel::arrow::engine_ext::ExpressionEvaluatorExt; use arrow::array::{RecordBatch, StringArray}; use arrow::compute::filter_record_batch; use 
arrow::datatypes::{SchemaRef, UInt16Type}; @@ -33,10 +36,6 @@ use delta_kernel::table_features::TableFeature; use delta_kernel::{EvaluationHandler, ExpressionRef}; use futures::stream::{Stream, StreamExt}; -use super::plan::KernelScanPlan; -use crate::kernel::ARROW_HANDLER; -use crate::kernel::arrow::engine_ext::ExpressionEvaluatorExt; - #[derive(Debug, PartialEq)] pub(crate) struct DvMaskResult { pub selection: Option>, @@ -89,21 +88,21 @@ pub(crate) fn consume_dv_mask( /// 4. Result is cast to [`result_schema`](KernelScanPlan::result_schema) #[derive(Clone, Debug)] pub struct DeltaScanExec { - scan_plan: Arc, + pub(crate) scan_plan: Arc, /// Execution plan yielding the raw data read from data files. input: Arc, /// Transforms to be applied to data eminating from individual files - transforms: Arc>, + pub(crate) transforms: Arc>, /// Selection vectors to be applied to data read from individual files - selection_vectors: Arc>>, + pub(crate) selection_vectors: Arc>>, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Column name for the file id - file_id_column: String, + pub(crate) file_id_column: String, /// plan properties properties: PlanProperties, /// Denotes if file ids should be returned as part of the output - retain_file_ids: bool, + pub(crate) retain_file_ids: bool, /// Aggregated partition column statistics partition_stats: HashMap, } @@ -151,6 +150,10 @@ impl DeltaScanExec { } } + pub fn options(&self) -> &std::collections::HashMap { + &self.scan_plan.snapshot.load_config().options + } + /// Transform the statistics from the inner physical parquet read plan to the logical /// schema we expose via the table provider. We do not attempt to provide meaningful /// statistics for metadata columns as we do not expect these to be useful in planning. 
diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs index 8f79e8eac..672683c3b 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs @@ -22,6 +22,8 @@ use arrow_cast::{CastOptions, cast_with_options}; use arrow_schema::{DataType, FieldRef, Schema, SchemaBuilder, SchemaRef}; use chrono::{TimeZone as _, Utc}; use dashmap::DashMap; +use datafusion::common::deep::has_deep_projection; +use datafusion::common::internal_err; use datafusion::{ catalog::Session, common::{ @@ -39,6 +41,7 @@ use datafusion::{ }, prelude::Expr, }; +use datafusion_datasource::file::FileSource; use datafusion_datasource::{ PartitionedFile, TableSchema, compute_all_files_statistics, file_groups::FileGroup, file_scan_config::FileScanConfigBuilder, source::DataSourceExec, @@ -50,6 +53,7 @@ use futures::{Stream, TryStreamExt as _, future::ready}; use itertools::Itertools as _; use object_store::{ObjectMeta, path::Path}; +pub use self::codec::DeltaNextPhysicalCodec; pub use self::exec::DeltaScanExec; use self::exec_meta::DeltaScanMetaExec; pub(crate) use self::plan::{KernelScanPlan, supports_filters_pushdown}; @@ -63,6 +67,7 @@ use crate::{ }, }; +mod codec; mod exec; mod exec_meta; mod plan; @@ -121,7 +126,7 @@ pub(super) async fn execution_plan( metrics, limit, file_id_field, - config.retain_file_id(), + config, ) .await } @@ -175,7 +180,7 @@ async fn get_data_scan_plan( metrics: ExecutionPlanMetricsSet, limit: Option, file_id_field: FieldRef, - retain_file_ids: bool, + config: &DeltaScanConfig, ) -> Result> { let mut partition_stats = HashMap::new(); @@ -227,15 +232,33 @@ async fn get_data_scan_plan( scan_plan.parquet_predicate.as_ref() }; let file_id_column = file_id_field.name().clone(); - let pq_plan = get_read_plan( - session, - files_by_store, - &scan_plan.parquet_read_schema, - limit, - &file_id_field, - 
predicate, - ) - .await?; + // @HStack @DeepProjection integration - rest is handled by DF, + // we just need to set the deep projections in the ParquetOpener via ProjectionExprs + // let pq_plan = if false { + let pq_plan = if let Some(result_projection_deep) = scan_plan.result_projection_deep.clone() + && has_deep_projection(&result_projection_deep) + { + get_read_plan_deep( + session, + files_by_store, + &scan_plan.parquet_read_schema, + limit, + &file_id_field, + predicate, + result_projection_deep.clone(), + ) + .await? + } else { + get_read_plan( + session, + files_by_store, + &scan_plan.parquet_read_schema, + limit, + &file_id_field, + predicate, + ) + .await? + }; let exec = DeltaScanExec::new( Arc::new(scan_plan), @@ -244,7 +267,7 @@ async fn get_data_scan_plan( Arc::new(dvs), partition_stats, file_id_column, - retain_file_ids, + config.retain_file_id(), metrics, ); @@ -382,6 +405,104 @@ async fn get_read_plan( }) } +async fn get_read_plan_deep( + state: &dyn Session, + files_by_store: impl IntoIterator, + // Schema of physical file columns to read from Parquet (no Delta partitions, no file-id). + // + // This is also the schema used for Parquet pruning/pushdown. It may include view types + // (e.g. Utf8View/BinaryView) depending on `DeltaScanConfig`. 
+ parquet_read_schema: &SchemaRef, + limit: Option, + file_id_field: &FieldRef, + predicate: Option<&Expr>, + projection_deep: std::collections::HashMap>, +) -> Result> { + let mut plans = Vec::new(); + + let pq_options = TableParquetOptions { + global: state.config().options().execution.parquet.clone(), + ..Default::default() + }; + + // info!("get_read_plan parquet_read_schema: {:?}", parquet_read_schema); + + let mut full_read_schema = SchemaBuilder::from(parquet_read_schema.as_ref().clone()); + full_read_schema.push(file_id_field.as_ref().clone().with_nullable(true)); + let full_read_schema = Arc::new(full_read_schema.finish()); + // info!("get_read_plan_deep full_read_schema: {:?}", parquet_read_schema); + let full_read_df_schema = full_read_schema.clone().to_dfschema()?; + + for (store_url, files) in files_by_store.into_iter() { + let reader_factory = Arc::new(CachedParquetFileReaderFactory::new( + state.runtime_env().object_store(&store_url)?, + state.runtime_env().cache_manager.get_file_metadata_cache(), + )); + + // NOTE: In the "next" provider, DataFusion's Parquet scan partition fields are file-id + // only. Delta partition columns/values are injected via kernel transforms and handled + // above Parquet, so they are not part of the Parquet partition schema here. 
+ let table_schema = + TableSchema::new(parquet_read_schema.clone(), vec![file_id_field.clone()]); + // info!("get_read_plan_deep table_schema: {:?}", parquet_read_schema); + let full_table_schema = table_schema.table_schema().clone(); + // info!("get_read_plan_deep full_table_schema: {:?}", full_table_schema); + let mut file_source = ParquetSource::new(table_schema) + .with_table_parquet_options(pq_options.clone()) + .with_parquet_file_reader_factory(reader_factory); + + if has_deep_projection(&projection_deep) { + // SAFETY - ParquetSource::new fills projection_exprs inside the ParquetSource + let mut projection_exprs = file_source.projection().unwrap().clone(); + projection_exprs.projection_deep = Some(projection_deep.clone()); + let new_file_source = file_source.try_pushdown_projection(&projection_exprs)?; + if let Some(new_file_source) = new_file_source { + file_source = new_file_source + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + } else { + return internal_err!( + "get_read_plan_deep, error pushing projections in pushdown with deep: {:?}", + &projection_deep + ); + } + } + + // TODO(roeap); we might be able to also push selection vectors into the read plan + // by creating parquet access plans. However we need to make sure this does not + // interfere with other delta features like row ids. + let has_selection_vectors = files.iter().any(|(_, sv)| sv.is_some()); + if !has_selection_vectors && let Some(pred) = predicate { + // Predicate pushdown can reference the synthetic file-id partition column. + // Use the full read schema (data columns + file-id) when planning. 
+ let physical = state.create_physical_expr(pred.clone(), &full_read_df_schema)?; + file_source = file_source + .with_predicate(physical) + .with_pushdown_filters(true); + } + + let file_groups = partitioned_files_to_file_groups(files.into_iter().map(|file| file.0)); + let (file_groups, statistics) = + compute_all_files_statistics(file_groups, full_table_schema, true, false)?; + + let config = FileScanConfigBuilder::new(store_url, Arc::new(file_source)) + .with_file_groups(file_groups) + .with_statistics(statistics) + .with_limit(limit) + .build(); + + plans.push(DataSourceExec::from_data_source(config) as Arc); + } + + Ok(match plans.len() { + 0 => Arc::new(EmptyExec::new(full_read_schema.clone())), + 1 => plans.remove(0), + _ => UnionExec::try_new(plans)?, + }) +} + // Small helper to reuse some code between exec and exec_meta fn finalize_transformed_batch( batch: RecordBatch, diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs index beebb7d45..3ccf9fe11 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs @@ -61,6 +61,8 @@ use crate::kernel::{Scan, Snapshot}; pub(crate) struct KernelScanPlan { /// Wrapped kernel scan to produce logical file stream pub(crate) scan: Arc, + /// Original snapshot used to create the scan (stored for codec serialization) + pub(crate) snapshot: Snapshot, /// The resulting schema exposed to the caller (used for expression evaluation) pub(crate) result_schema: SchemaRef, /// The final output schema (includes file_id column if configured) @@ -68,6 +70,9 @@ pub(crate) struct KernelScanPlan { /// If set, indicates a projection to apply to the /// scan output to obtain the result schema pub(crate) result_projection: Option>, + /// If set, indicates a projection to apply to the + /// scan output to obtain the result schema + pub(crate) 
result_projection_deep: Option>>, /// Physical schema used for Parquet reads and predicate evaluation. pub(crate) parquet_read_schema: SchemaRef, /// If set, indicates a predicate to apply at the Parquet scan level @@ -106,7 +111,14 @@ impl KernelScanPlan { let Some(projection) = projection else { let scan = Arc::new(scan_builder.build()?); - return Self::try_new_with_scan(scan, config, table_schema, None, parquet_predicate); + return Self::try_new_with_scan( + scan, + snapshot, + config, + table_schema, + None, + parquet_predicate, + ); }; // The table projection may not include all columns referenced in filters, @@ -168,6 +180,7 @@ impl KernelScanPlan { Self::try_new_with_scan( scan, + snapshot, config, result_schema, result_projection, @@ -175,8 +188,9 @@ impl KernelScanPlan { ) } - fn try_new_with_scan( + pub(crate) fn try_new_with_scan( scan: Arc, + snapshot: &Snapshot, config: &DeltaScanConfig, result_schema: SchemaRef, result_projection: Option>, @@ -195,9 +209,11 @@ impl KernelScanPlan { )?; Ok(Self { scan, + snapshot: snapshot.clone(), result_schema, output_schema, result_projection, + result_projection_deep: None, parquet_read_schema, parquet_predicate, }) diff --git a/crates/core/src/delta_datafusion/table_provider_old.rs b/crates/core/src/delta_datafusion/table_provider_old.rs new file mode 100644 index 000000000..9ec3b2726 --- /dev/null +++ b/crates/core/src/delta_datafusion/table_provider_old.rs @@ -0,0 +1,140 @@ +use crate::delta_datafusion::table_provider::get_pushdown_filters; +use crate::delta_datafusion::{DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder}; +use crate::logstore::LogStoreRef; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError}; +use arrow_schema::Schema; +use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider}; +use datafusion::common::{Result, Statistics}; +use datafusion::datasource::TableType; +use 
datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::logical_expr::utils::conjunction; +use datafusion::logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown}; +use datafusion::physical_plan::ExecutionPlan; +use std::any::Any; +use std::borrow::Cow; +use std::sync::Arc; +use url::Url; + +impl DeltaTable { + pub fn table_provider_old(&self) -> DeltaTableOldProvider { + self.clone().into() + } +} + +// each delta table must register a specific object store, since paths are internally +// handled relative to the table root. +pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) { + let object_store_url = store.object_store_url(); + let url: &Url = object_store_url.as_ref(); + env.register_object_store(url, store.object_store(None)); +} + +#[derive(Debug, Clone)] +pub struct DeltaTableOldProvider { + /// The state of the table as of the most recent loaded Delta log entry. + pub state: Option, + /// the load options used during load + pub config: DeltaTableConfig, + /// log store + pub(crate) log_store: LogStoreRef, +} + +impl DeltaTableOldProvider { + pub fn snapshot(&self) -> DeltaResult<&DeltaTableState> { + self.state.as_ref().ok_or(DeltaTableError::NotInitialized) + } + pub fn log_store(&self) -> LogStoreRef { + self.log_store.clone() + } +} + +impl From for DeltaTableOldProvider { + fn from(value: DeltaTable) -> Self { + Self { + state: value.state.clone(), + config: value.config.clone(), + log_store: value.log_store.clone(), + } + } +} + +#[async_trait::async_trait] +impl TableProvider for DeltaTableOldProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> Arc { + self.snapshot().unwrap().snapshot().read_schema() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn get_table_definition(&self) -> Option<&str> { + None + } + + fn get_logical_plan(&self) -> Option> { + None + } + + async fn scan( + &self, + _session: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: 
Option, + ) -> Result> { + unimplemented!("scan is not available for this table provider; use scan_with_args") + } + + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result { + register_store(self.log_store(), state.runtime_env().as_ref()); + let filters = args.filters().unwrap_or(&[]); + let filter_expr = conjunction(filters.iter().cloned()); + + let config = DeltaScanConfigBuilder { + include_file_column: false, + file_column_name: None, + wrap_partition_values: None, + enable_parquet_pushdown: true, + schema: None, + options: std::collections::HashMap::new(), + }; + + let config = config + .with_options(self.config.options.clone()) + .build(self.snapshot()?.snapshot())?; + + let projection = args.projection().map(|p| p.to_vec()); + let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), state) + .with_projection(projection.as_ref()) + .with_projection_deep(args.projection_deep()) + .with_limit(args.limit()) + .with_filter(filter_expr) + .with_scan_config(config) + .build() + .await?; + + Ok(ScanResult::new(Arc::new(scan))) + } + + fn supports_filters_pushdown( + &self, + filter: &[&Expr], + ) -> Result> { + let partition_cols = self.snapshot()?.metadata().partition_columns().as_slice(); + Ok(get_pushdown_filters(filter, partition_cols)) + } + + fn statistics(&self) -> Option { + self.snapshot().ok()?.datafusion_table_statistics() + } +} diff --git a/crates/core/src/delta_datafusion/udtf.rs b/crates/core/src/delta_datafusion/udtf.rs new file mode 100644 index 000000000..9783f53cb --- /dev/null +++ b/crates/core/src/delta_datafusion/udtf.rs @@ -0,0 +1,121 @@ +use crate::open_table_with_storage_options; +use async_trait::async_trait; +use datafusion::catalog::{TableFunctionImpl, TableProvider}; +use datafusion::common::{DataFusionError, Result, ScalarValue, internal_datafusion_err}; +use datafusion::logical_expr::Expr; +use datafusion::prelude::SessionContext; +use std::collections::{HashMap, 
VecDeque}; +use std::sync::Arc; +use tokio::runtime::Runtime; +use url::Url; + +pub fn register_delta_table_udtf( + ctx: &SessionContext, + name: Option<&str>, + settings: Option<&HashMap>, +) { + let prefix = name.or_else(|| Some("delta_table")).unwrap(); + + ctx.register_udtf( + prefix, + Arc::new(DeltaTableUdtf { + flavor: DeltaTableUdtfFlavor::Old, + settings: settings.cloned(), + }), + ); + ctx.register_udtf( + format!("{prefix}_next").as_str(), + Arc::new(DeltaTableUdtf { + flavor: DeltaTableUdtfFlavor::Next, + settings: settings.cloned(), + }), + ); +} + +#[derive(Debug, Clone)] +pub enum DeltaTableUdtfFlavor { + Old, + Next, +} + +#[derive(Debug)] +pub struct DeltaTableUdtf { + flavor: DeltaTableUdtfFlavor, + settings: Option>, +} + +#[async_trait] +impl TableFunctionImpl for DeltaTableUdtf { + fn call(&self, args: &[Expr]) -> Result> { + if args.len() < 1 { + return Err(DataFusionError::Execution( + "Delta table function expects at least one argument".to_string(), + )); + } + + let mut args_string = args + .iter() + .map(|arg| match arg.clone() { + Expr::Literal(ScalarValue::Utf8(Some(path)), _) + | Expr::Literal(ScalarValue::LargeUtf8(Some(path)), _) + | Expr::Literal(ScalarValue::Utf8View(Some(path)), _) => Ok(path), + _ => Err(DataFusionError::Execution(format!( + "Unexpected argument type: {:?}", + arg + ))), + }) + .collect::>>()?; + + let path = args_string + .pop_front() + .expect("DeltaTableUdtf missing path"); + assert_eq!( + args_string.len() % 2, + 0, + "DeltaTableUdtf: Can't build hashmap out of odd-sized args" + ); + let mut settings = args_string + .iter() + .collect::>() + .as_slice() + .chunks(2) + .map(|l| (l[0].clone(), l[1].clone())) + .collect::>(); + if let Some(global_settings) = &self.settings { + settings.extend(global_settings.clone()); + } + + let flavor = self.flavor.clone(); + let table = std::thread::spawn(move || { + let rt = Runtime::new().unwrap(); + let table_uri = Url::parse(&path).expect(&format!("Invalid table uri: {}", 
path)); + rt.block_on(async { + let delta_table = open_table_with_storage_options(table_uri, settings) + .await + .map_err(|e| { + internal_datafusion_err!( + "DeltaTableUdtf could not open table at {}: {}", + &path, + e.to_string() + ) + }) + .unwrap(); + + match flavor { + DeltaTableUdtfFlavor::Old => { + let provider = delta_table.table_provider_old(); + Arc::new(provider) as Arc + } + DeltaTableUdtfFlavor::Next => { + let provider = delta_table.table_provider().build().await.unwrap(); + Arc::new(provider) as Arc + } + } + }) + }) + .join() + .map_err(|e| internal_datafusion_err!("DeltaTableFunc error opening table"))?; + + Ok(Arc::clone(&table)) + } +} diff --git a/crates/core/src/kernel/mod.rs b/crates/core/src/kernel/mod.rs index 2acfbec67..c4b498130 100644 --- a/crates/core/src/kernel/mod.rs +++ b/crates/core/src/kernel/mod.rs @@ -13,7 +13,7 @@ pub mod error; pub mod models; pub mod scalars; pub mod schema; -mod snapshot; +pub mod snapshot; pub mod transaction; pub use arrow::engine_ext::StructDataExt; diff --git a/crates/core/src/kernel/schema/cast/mod.rs b/crates/core/src/kernel/schema/cast/mod.rs index e196132e0..718a1396a 100644 --- a/crates/core/src/kernel/schema/cast/mod.rs +++ b/crates/core/src/kernel/schema/cast/mod.rs @@ -199,15 +199,28 @@ pub fn cast_record_batch( ..Default::default() }; + // @HStack Fix schema mapping for record batches with an empty schema // Can be simplified with StructArray::try_new_with_length in arrow 55.1 - let col_arrays = batch.columns().to_owned(); - let s = if col_arrays.is_empty() { - StructArray::new_empty_fields(batch.num_rows(), None) - } else { - StructArray::new(batch.schema().as_ref().to_owned().fields, col_arrays, None) - }; - - let struct_array = cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?; + // let col_arrays = batch.columns().to_owned(); + // let s = if col_arrays.is_empty() { + // StructArray::new_empty_fields(batch.num_rows(), None) + // } else { + // 
StructArray::new(batch.schema().as_ref().to_owned().fields, col_arrays, None) + // }; + // + // let struct_array = cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?; + let mut struct_array = StructArray::try_new_with_length( + batch.schema().as_ref().to_owned().fields, + batch.columns().to_owned(), + None, + batch.num_rows(), + )?; + struct_array = cast_struct( + &struct_array, + target_schema.fields(), + &cast_options, + add_missing, + )?; Ok(RecordBatch::try_new_with_options( target_schema, diff --git a/crates/core/src/kernel/snapshot/iterators.rs b/crates/core/src/kernel/snapshot/iterators.rs index 0cfcf40d9..2cf2b6aec 100644 --- a/crates/core/src/kernel/snapshot/iterators.rs +++ b/crates/core/src/kernel/snapshot/iterators.rs @@ -299,6 +299,24 @@ impl LogicalFileView { } } + /// Converts this file view into an Add action for log operations. + /// FIXME: Double json > stats conversion TOO EXPENSIVE + pub(crate) fn add_action_no_stats(&self) -> Add { + Add { + path: self.path().to_string(), + partition_values: self.partition_values_map(), + size: self.size(), + modification_time: self.modification_time(), + data_change: true, + stats: None, + tags: None, + deletion_vector: None, + base_row_id: None, + default_row_commit_version: None, + clustering_provider: None, + } + } + /// Converts this file view into a Remove action for log operations. 
pub fn remove_action(&self, data_change: bool) -> Remove { Remove { diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index aec02ba48..6f1bd0d39 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -242,8 +242,17 @@ mod datafusion { .map(|sv| sv.to_array()) .collect::, DataFusionError>>() .unwrap(); - let sa = StructArray::new(fields.clone(), arrays, None); - Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + // @HStack - fix crash on missing statistics + // let sa = StructArray::new(fields.clone(), arrays, None); + // Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + // let sa = StructArray::new(fields.clone(), arrays, None); + // Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + if arrays.is_empty() { + Precision::Absent + } else { + let sa = StructArray::new(fields.clone(), arrays, None); + Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + } }) .unwrap_or(Precision::Absent), _ => Precision::Absent, diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 82be67f61..a7e6773ed 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -20,6 +20,8 @@ use std::sync::{Arc, LazyLock}; use arrow::array::RecordBatch; use arrow::compute::{filter_record_batch, is_not_null}; use arrow::datatypes::SchemaRef; +use arrow_arith::aggregate::sum_array_checked; +use arrow_array::{Int64Array, StructArray}; use delta_kernel::actions::{Remove, Sidecar}; use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; use delta_kernel::engine::arrow_data::ArrowEngineData; @@ -41,6 +43,8 @@ use object_store::path::Path; use serde_json::Deserializer; use url::Url; +use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; + use super::{Action, CommitInfo, Metadata, Protocol}; use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, rb_from_scan_meta}; use 
crate::kernel::{ARROW_HANDLER, StructType, spawn_blocking_with_span}; @@ -48,6 +52,7 @@ use crate::logstore::{LogStore, LogStoreExt}; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter, to_kernel_predicate}; pub use self::log_data::*; +use crate::kernel::size_limits::SnapshotLoadMetrics; pub use iterators::*; pub use scan::*; pub use stream::*; @@ -56,6 +61,7 @@ mod iterators; mod log_data; mod scan; mod serde; +pub mod size_limits; mod stream; pub(crate) static SCAN_ROW_ARROW_SCHEMA: LazyLock = @@ -70,10 +76,13 @@ pub struct Snapshot { config: DeltaTableConfig, /// Logical table schema schema: SchemaRef, + /// Metrics captured during snapshot loading + load_metrics: SnapshotLoadMetrics, } impl Snapshot { pub async fn try_new_with_engine( + log_store: &dyn LogStore, engine: Arc, table_root: Url, config: DeltaTableConfig, @@ -100,6 +109,47 @@ impl Snapshot { } }; + let current_version = snapshot.version() as i64; + + let (snapshot, load_metrics) = if let Some(limiter) = &config.log_size_limiter { + let original_segment = snapshot.log_segment().clone(); + let original_size: u64 = original_segment + .checkpoint_parts + .iter() + .chain(original_segment.ascending_commit_files.iter()) + .map(|p| p.location.size) + .sum(); + + let (truncated_segment, truncation_info) = + limiter.truncate(original_segment, log_store).await?; + let table_configuration = snapshot.table_configuration().clone(); + + let oldest_version = truncated_segment + .ascending_commit_files + .first() + .map(|p| p.version as i64); + + let metrics = if let Some(info) = truncation_info { + SnapshotLoadMetrics::with_truncation( + current_version, + oldest_version, + info.truncated_size, + info.original_size, + info.commits_discarded, + ) + } else { + SnapshotLoadMetrics::no_truncation(current_version, oldest_version, original_size) + }; + + ( + Arc::new(KernelSnapshot::new(truncated_segment, table_configuration)), + metrics, + ) + } else { + let metrics = 
SnapshotLoadMetrics::from_snapshot(&snapshot); + (snapshot, metrics) + }; + let schema = Arc::new( snapshot .table_configuration() @@ -112,6 +162,7 @@ impl Snapshot { inner: snapshot, config, schema, + load_metrics, }) } @@ -132,7 +183,14 @@ impl Snapshot { table_root.set_path(&format!("{}/", table_root.path())); } - Self::try_new_with_engine(engine, table_root, config, version.map(|v| v as u64)).await + Self::try_new_with_engine( + log_store, + engine, + table_root, + config, + version.map(|v| v as u64), + ) + .await } pub fn scan_builder(&self) -> ScanBuilder { @@ -177,10 +235,28 @@ impl Snapshot { .try_into_arrow()?, ); + // For updates, we don't track truncation metrics since we're building from existing snapshot + let log_segment = snapshot.log_segment(); + let log_size: u64 = log_segment + .checkpoint_parts + .iter() + .chain(log_segment.ascending_commit_files.iter()) + .map(|p| p.location.size) + .sum(); + + let oldest_version = log_segment + .ascending_commit_files + .first() + .map(|p| p.version as i64); + + let load_metrics = + SnapshotLoadMetrics::no_truncation(snapshot.version() as i64, oldest_version, log_size); + Ok(Arc::new(Self { inner: snapshot, schema, config: self.config.clone(), + load_metrics, })) } @@ -213,6 +289,11 @@ impl Snapshot { &self.config } + /// Get the metrics captured during snapshot loading + pub fn load_metrics(&self) -> &SnapshotLoadMetrics { + &self.load_metrics + } + /// Get the table root of the snapshot pub(crate) fn table_root_path(&self) -> DeltaResult { Ok(Path::from_url_path(self.inner.table_root().path())?) 
@@ -529,6 +610,16 @@ pub(crate) async fn resolve_snapshot( } } +fn read_adds_size(array: &dyn ProvidesColumnByName) -> DeltaResult { + if let Some(arr) = ex::extract_and_cast_opt::(array, "add") { + let size = ex::extract_and_cast::(arr, "size")?; + let sum = sum_array_checked::(size)?.unwrap_or_default(); + Ok(sum as usize) + } else { + Ok(0) + } +} + impl EagerSnapshot { /// Create a new [`EagerSnapshot`] instance pub async fn try_new( @@ -653,6 +744,11 @@ impl EagerSnapshot { self.snapshot.load_config() } + /// Get the metrics captured during snapshot loading + pub fn load_metrics(&self) -> &SnapshotLoadMetrics { + self.snapshot.load_metrics() + } + /// Well known table configuration pub fn table_properties(&self) -> &TableProperties { self.snapshot.table_properties() @@ -667,6 +763,22 @@ impl EagerSnapshot { LogDataHandler::new(&self.files, self.snapshot.table_configuration()) } + /// Get the metadata size in the snapshot + pub fn files_metadata_size(&self) -> usize { + self.files + .iter() + .map(|frb| frb.get_array_memory_size()) + .sum() + } + + /// Get the total size of files in the snapshot + pub fn files_total_size(&self) -> usize { + self.files + .iter() + .map(|frb| read_adds_size(frb).unwrap_or_default()) + .sum() + } + /// Stream the active files in the snapshot /// /// This function returns a stream of [`LogicalFileView`] objects, @@ -789,11 +901,14 @@ mod tests { .as_ref() .try_into_arrow()?; + let load_metrics = SnapshotLoadMetrics::from_snapshot(&snapshot); + Ok(( Self { inner: snapshot, config: Default::default(), schema: Arc::new(schema), + load_metrics, }, log_store, )) diff --git a/crates/core/src/kernel/snapshot/serde.rs b/crates/core/src/kernel/snapshot/serde.rs index 7188b8a11..5d5a14c27 100644 --- a/crates/core/src/kernel/snapshot/serde.rs +++ b/crates/core/src/kernel/snapshot/serde.rs @@ -219,10 +219,13 @@ impl<'de> Visitor<'de> for SnapshotVisitor { .try_into_arrow() .map_err(de::Error::custom)?; + let load_metrics = 
super::SnapshotLoadMetrics::from_snapshot(&snapshot); + Ok(Snapshot { inner: Arc::new(snapshot), schema: Arc::new(schema), config, + load_metrics, }) } } diff --git a/crates/core/src/kernel/snapshot/size_limits.rs b/crates/core/src/kernel/snapshot/size_limits.rs new file mode 100644 index 000000000..210b81ec7 --- /dev/null +++ b/crates/core/src/kernel/snapshot/size_limits.rs @@ -0,0 +1,1008 @@ +use crate::logstore::{LogStore, LogStoreExt}; +use crate::{DeltaResult, DeltaTableError}; +use delta_kernel::Snapshot; +use delta_kernel::log_segment::LogSegment; +use delta_kernel::path::{LogPathFileType, ParsedLogPath}; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::identity; +use std::num::{NonZeroU64, NonZeroUsize}; +use std::ops::RangeInclusive; +use strum::Display; +use tracing::{debug, info, trace, warn}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SnapshotLoadMetrics { + /// Current version of the snapshot + pub current_version: i64, + + /// Oldest commit version included in the loaded snapshot + /// None if only checkpoint was loaded (no commits) + pub oldest_version_loaded: Option, + + /// Final log segment size in bytes (after truncation if applied) + pub loaded_log_segment_size: u64, + + /// Whether log size limiting was applied during load + pub log_size_limiter_applied: bool, + + /// Original log segment size in bytes (before any truncation) + /// None if no limiter was configured + pub original_log_segment_size: Option, + + /// Number of commits discarded due to truncation + /// None if no truncation occurred + pub num_commits_discarded: Option, +} + +impl Default for SnapshotLoadMetrics { + fn default() -> Self { + Self { + current_version: 0, + oldest_version_loaded: None, + loaded_log_segment_size: 0, + log_size_limiter_applied: false, + original_log_segment_size: None, + num_commits_discarded: None, + } + } +} + +impl 
SnapshotLoadMetrics { + pub fn from_snapshot(snapshot: &Snapshot) -> Self { + let log_segment = snapshot.log_segment(); + + let log_size: u64 = log_segment + .checkpoint_parts + .iter() + .chain(log_segment.ascending_commit_files.iter()) + .map(|p| p.location.size) + .sum(); + + let oldest_version = log_segment + .ascending_commit_files + .first() + .map(|p| p.version as i64); + + SnapshotLoadMetrics::no_truncation(snapshot.version() as i64, oldest_version, log_size) + } + + pub fn no_truncation(version: i64, oldest_version: Option, log_segment_size: u64) -> Self { + Self { + current_version: version, + oldest_version_loaded: oldest_version, + loaded_log_segment_size: log_segment_size, + log_size_limiter_applied: false, + original_log_segment_size: None, + num_commits_discarded: None, + } + } + + pub fn with_truncation( + version: i64, + oldest_version: Option, + truncated_size: u64, + original_size: u64, + commits_discarded: usize, + ) -> Self { + Self { + current_version: version, + oldest_version_loaded: oldest_version, + loaded_log_segment_size: truncated_size, + log_size_limiter_applied: true, + original_log_segment_size: Some(original_size), + num_commits_discarded: Some(commits_discarded), + } + } + + pub fn was_truncated(&self) -> bool { + self.log_size_limiter_applied && self.num_commits_discarded.is_some() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TruncationInfo { + /// Original log segment size in bytes (before truncation) + pub original_size: u64, + /// Final log segment size in bytes (after truncation) + pub truncated_size: u64, + /// Number of commits discarded + pub commits_discarded: usize, +} + +#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum OversizePolicy { + Reject, + /// Skip checkpoints and only load JSON commits. 
+ UseTruncatedCommitLog(NonZeroUsize), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LogSizeLimiter { + /// Maximum allowed size in bytes for the total log segment (checkpoint + commit files). + size_limit: NonZeroU64, + oversize_policy: OversizePolicy, +} + +impl LogSizeLimiter { + pub fn new(size_limit: NonZeroU64, oversize_policy: OversizePolicy) -> Self { + Self { + size_limit, + oversize_policy, + } + } + + pub fn try_new(size_limit: u64, truncated_commit_log_size: Option) -> DeltaResult { + let size_limit = NonZeroU64::new(size_limit) + .ok_or_else(|| DeltaTableError::Generic("max_log_bytes must be nonzero".into()))?; + let oversize_policy = if let Some(num_commits) = truncated_commit_log_size { + let num_commits = NonZeroUsize::new(num_commits).ok_or_else(|| { + DeltaTableError::Generic("pseudo_cdf_lookback_count must be nonzero".into()) + })?; + OversizePolicy::UseTruncatedCommitLog(num_commits) + } else { + OversizePolicy::Reject + }; + Ok(Self { + size_limit, + oversize_policy, + }) + } + + pub fn from_storage_options(opts: &mut HashMap) -> DeltaResult> { + let prefix = "log_size_limiter"; + let size_limit_key = &format!("{prefix}.size_limit"); + let use_commit_log_key = &format!("{prefix}.use_truncated_commit_log"); + let num_commits_key = &format!("{prefix}.truncated_commit_log_size"); + + let size_limit: Option = opts.remove(size_limit_key).map(|opt| { + opt.parse().expect(&format!( + "{size_limit_key} must be a positive int; got {opt}" + )) + }); + let use_commit_log: bool = opts + .remove(use_commit_log_key) + .map(|opt| { + opt.parse().expect(&format!( + "{use_commit_log_key} must be a boolean; got {opt}" + )) + }) + .unwrap_or(false); + let num_commits: usize = opts + .remove(num_commits_key) + .map(|opt| { + opt.parse().expect(&format!( + "{num_commits_key} must be a positive int; got {opt}" + )) + }) + .unwrap_or(24); // default number of commits to use when commit log is enabled with no size specified + size_limit 
+ .map(|limit| LogSizeLimiter::try_new(limit, use_commit_log.then_some(num_commits))) + .transpose() + } + + pub(super) async fn truncate( + &self, + log_segment: LogSegment, + log_store: &dyn LogStore, + ) -> DeltaResult<(LogSegment, Option)> { + let total_size: u64 = log_segment + .checkpoint_parts + .iter() + .chain(log_segment.ascending_commit_files.iter()) + .map(|parsed_path| parsed_path.location.size) + .sum(); + let total_size = total_size; + let size_limit = self.size_limit.get(); + let original_commit_count = log_segment.ascending_commit_files.len(); + + if total_size > size_limit { + warn!( + "Log segment size in bytes: {} > {}. Applying policy: {:?}", + total_size, size_limit, self.oversize_policy + ); + trace!("Oversized log segment: {:?}", log_segment); + match &self.oversize_policy { + OversizePolicy::Reject => Err(DeltaTableError::Generic(format!( + r#" + Table log segment size ({} bytes) exceeds maximum allowed size ({} bytes). + Consider increasing the size limit or using an oversize policy other than {}. 
+ "#, + total_size, self.size_limit, self.oversize_policy + ))), + OversizePolicy::UseTruncatedCommitLog(num_commits) => { + let (truncated_segment, truncated_size) = + truncated_commit_log(log_segment, log_store, num_commits, size_limit) + .await?; + let final_commit_count = truncated_segment.ascending_commit_files.len(); + let commits_discarded = + original_commit_count.saturating_sub(final_commit_count); + + let truncation_info = TruncationInfo { + original_size: total_size, + truncated_size, + commits_discarded, + }; + Ok((truncated_segment, Some(truncation_info))) + } + } + } else { + debug!( + "Log segment size ({} bytes) is within the limit of {} bytes", + total_size, size_limit + ); + Ok((log_segment, None)) + } + } +} + +async fn truncated_commit_log( + log_segment: LogSegment, + log_store: &dyn LogStore, + num_commits: &NonZeroUsize, + size_limit: u64, +) -> DeltaResult<(LogSegment, u64)> { + let num_commits = num_commits.get(); + let truncated_log: Vec = if log_segment.ascending_commit_files.len() + < num_commits + { + let segment_version = log_segment.end_version as usize; + let first_missing_version = segment_version.saturating_sub(num_commits - 1); // start from zero if num_commits > segment_version + let last_missing_version = segment_version - log_segment.ascending_commit_files.len(); // cannot overflow + info!( + "Extending the segment commit log with versions {}-{}", + first_missing_version, last_missing_version + ); + let missing_versions = first_missing_version..=last_missing_version; + let previous_commits = list_commit_files(log_store, missing_versions).await?; + previous_commits + .into_iter() + .chain(log_segment.ascending_commit_files) + .collect() + } else { + info!( + "Discarding the last {} entries from the segment commit log", + log_segment.ascending_commit_files.len() - num_commits + ); + log_segment.ascending_commit_files[log_segment.ascending_commit_files.len() - num_commits..] 
+ .to_vec() + }; + let mut truncated_log_size = 0_u64; // keep track of the total size to cut it shorter if needed + let latest_commit_file = truncated_log.last().cloned(); + let final_commits: Vec = truncated_log + .into_iter() + .rev() + .take_while(|file_meta| { + truncated_log_size += file_meta.location.size; + truncated_log_size <= size_limit + }) + .collect::>() + .into_iter() + .rev() + .collect(); + + // Calculate the actual final size + let final_size: u64 = final_commits.iter().map(|f| f.location.size).sum(); + + let segment = LogSegment { + end_version: log_segment.end_version, + ascending_commit_files: final_commits, + checkpoint_parts: vec![], + ascending_compaction_files: vec![], + log_root: log_store.log_root_url(), + checkpoint_version: None, + latest_crc_file: None, + latest_commit_file, + }; + + Ok((segment, final_size)) +} + +async fn list_commit_files( + log_store: &dyn LogStore, + version_range: RangeInclusive, +) -> DeltaResult> { + let log_path = log_store.log_root_url(); + let lower_bound = log_path + .join(&format!("{:020}", version_range.start())) + .map_err(|_| DeltaTableError::InvalidTableLocation(log_path.to_string()))?; + let upper_bound = log_path + .join(&format!("{:020}", version_range.end() + 1)) + .map_err(|_| DeltaTableError::InvalidTableLocation(log_path.to_string()))?; + let commit_files = log_store + .engine(None) + .storage_handler() + .list_from(&lower_bound)? + .map(|meta| ParsedLogPath::try_from(meta?)) + // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files? 
+ .filter_map_ok(identity) + .take_while(move |path_res| match path_res { + Ok(path) => version_range.end() >= &(path.version as usize), + Err(_) => true, + }) + .filter_ok(|log_file| matches!(log_file.file_type, LogPathFileType::Commit)) + .try_collect()?; + Ok(commit_files) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::DeltaTableBuilder; + use delta_kernel::Version; + use test_doubles::*; + + async fn create_log_segment( + log_store: &TestLogStore, + version: Option, + ) -> DeltaResult { + let storage = log_store.engine(None).storage_handler(); + let log_root = log_store.log_root_url(); + Ok(LogSegment::for_snapshot( + storage.as_ref(), + log_root, + Vec::new(), + version, + None, + None, + )?) + } + + #[test] + fn test_serde() -> DeltaResult<()> { + let json = r#"{ + "size_limit": 10055, + "oversize_policy": "reject" + }"#; + assert_eq!( + serde_json::from_str::(json)?, + LogSizeLimiter::new(NonZeroU64::new(10055).unwrap(), OversizePolicy::Reject,) + ); + + let json = r#"{ + "size_limit": 10055, + "oversize_policy": { + "use_truncated_commit_log": 100 + } + }"#; + assert_eq!( + serde_json::from_str::(json)?, + LogSizeLimiter::new( + NonZeroU64::new(10055).unwrap(), + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(100).unwrap()), + ) + ); + + Ok(()) + } + + #[test] + fn test_from_storage_opts() -> DeltaResult<()> { + assert_eq!( + LogSizeLimiter::from_storage_options(&mut HashMap::new())?, + None + ); + let mut opts = HashMap::from([ + ("log_size_limiter.size_limit".into(), "10".into()), + ( + "log_size_limiter.use_truncated_commit_log".into(), + "false".into(), + ), + ( + "log_size_limiter.truncated_commit_log_size".into(), + "5".into(), + ), // should be ignored + ("test".into(), "1".into()), + ]); + assert_eq!( + LogSizeLimiter::from_storage_options(&mut opts)?, + Some(LogSizeLimiter::new( + NonZeroU64::new(10).unwrap(), + OversizePolicy::Reject + )) + ); + assert_eq!(opts.len(), 1); + assert!(opts.contains_key("test")); + Ok(()) + } + 
+ #[test] + fn test_storage_opts_propagation() -> DeltaResult<()> { + let url = "memory:///" + .parse() + .map_err(|e: url::ParseError| DeltaTableError::Generic(e.to_string()))?; + let table = DeltaTableBuilder::from_url(url)? + .with_storage_options(HashMap::from([ + ("log_size_limiter.size_limit".into(), "10".into()), + ( + "log_size_limiter.use_truncated_commit_log".into(), + "true".into(), + ), + ( + "log_size_limiter.truncated_commit_log_size".into(), + "5".into(), + ), + ])) + .build()?; + assert_eq!( + table + .config + .log_size_limiter + .expect("LogSizeLimiter should be set"), + LogSizeLimiter::new( + NonZeroU64::new(10).unwrap(), + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(5).unwrap()) + ) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_noop_within_limits() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(0..=100), + CheckpointCadence(10), + CommitFsize(100), + CheckpointFsize(3000), + ); + let limiter = LogSizeLimiter::new(NonZeroU64::new(5000).unwrap(), OversizePolicy::Reject); + let segment = create_log_segment(&log_store, None).await?; + assert_segment_with_checkpoint(&segment, 90, 10); + // total size < size limit + let (truncated_segment, truncation_info) = + limiter.truncate(segment.clone(), &log_store).await?; + assert_eq!(truncated_segment, segment); + assert_eq!(truncation_info, None); + + Ok(()) + } + + #[tokio::test] + async fn test_reject_policy() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(0..=100), + CheckpointCadence(10), + CommitFsize(100), + CheckpointFsize(3000), + ); + let limiter = LogSizeLimiter::new(NonZeroU64::new(2500).unwrap(), OversizePolicy::Reject); + let segment = create_log_segment(&log_store, None).await?; + assert_segment_with_checkpoint(&segment, 90, 10); + let result = limiter.truncate(segment, &log_store).await; + + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("exceeds maximum allowed 
size")); + assert!( + error_msg.contains("4000 bytes"), + "`{}` does not contain '4000 bytes'", + error_msg + ); + assert!( + error_msg.contains("2500 bytes"), + "`{}` does not contain '2500 bytes'", + error_msg + ); + + Ok(()) + } + + #[tokio::test] + async fn test_commit_log_truncation_with_regular_delta_log() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(0..=100), + CheckpointCadence(5), + CommitFsize(10), + CheckpointFsize(1000), + ); + let limiter = LogSizeLimiter::new( + NonZeroU64::new(500).unwrap(), // smaller than the checkpoint size, can fit 50 commits + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(10).unwrap()), + ); + + let segment = create_log_segment(&log_store, Some(25)).await?; + assert_segment_with_checkpoint(&segment, 25, 0); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 16..=25); + + let segment = create_log_segment(&log_store, Some(7)).await?; + assert_segment_with_checkpoint(&segment, 5, 2); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 0..=7); + + let segment = create_log_segment(&log_store, Some(19)).await?; + assert_segment_with_checkpoint(&segment, 15, 4); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 10..=19); + + Ok(()) + } + + #[tokio::test] + async fn test_commit_log_truncation_with_no_checkpoints_in_log() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(0..=100), + CheckpointCadence(200), + CommitFsize(10), + CheckpointFsize(1000), + ); + let limiter = LogSizeLimiter::new( + NonZeroU64::new(500).unwrap(), // smaller than the checkpoint size, can fit 50 commits + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(10).unwrap()), + ); + + let segment = 
create_log_segment(&log_store, Some(30)).await?; + assert_segment_with_commits_only(&segment, 0..=30); + // size limit not exceeded: 31 commits * 10 bytes < 500 bytes, segment not truncated + let (truncated_segment, truncation_info) = + limiter.truncate(segment.clone(), &log_store).await?; + assert_eq!(truncated_segment, segment); + + let segment = create_log_segment(&log_store, Some(75)).await?; + assert_segment_with_commits_only(&segment, 0..=75); + // size limit exceeded: 75 commits * 10 bytes > 500 bytes; keeps the last 10 commits + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 66..=75); + + Ok(()) + } + + #[tokio::test] + async fn test_commit_log_truncation_with_vacuumed_log() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(30..=150), + CheckpointCadence(25), + CommitFsize(10), + CheckpointFsize(1000), + ); + let limiter = LogSizeLimiter::new( + NonZeroU64::new(500).unwrap(), // smaller than the checkpoint size, can fit 50 commits + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(50).unwrap()), + ); + + let segment = create_log_segment(&log_store, Some(70)).await?; + assert_segment_with_checkpoint(&segment, 50, 20); + // less than 50 commits available in the vacuumed store + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 30..=70); + + Ok(()) + } + + #[tokio::test] + async fn test_truncated_log_gets_cut_off_to_enforce_size_limit() -> DeltaResult<()> { + let log_store = TestLogStore::new( + CommitRange(30..=150), + CheckpointCadence(25), + CommitFsize(10), + CheckpointFsize(1000), + ); + let limiter = LogSizeLimiter::new( + NonZeroU64::new(500).unwrap(), // smaller than the checkpoint size, can fit 50 commits + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(100).unwrap()), // go back 100 commits + ); + + let segment = 
create_log_segment(&log_store, None).await?; + assert_segment_with_checkpoint(&segment, 125, 25); + // only loads 50 commits instead of the configured 100 to stay within the size limit + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 101..=150); + + Ok(()) + } + + #[tokio::test] + async fn test_compacted_json_files_are_ignored() -> DeltaResult<()> { + let extra_files = vec![format!("{:020}.{:020}.compacted.json", 15, 19)]; + let log_store = TestLogStore::new( + CommitRange(0..=100), + CheckpointCadence(10), + CommitFsize(10), + CheckpointFsize(1000), + ) + .with_additional_files(extra_files, 200); + let limiter = LogSizeLimiter::new( + NonZeroU64::new(500).unwrap(), // smaller than the checkpoint size, can fit 50 commits + OversizePolicy::UseTruncatedCommitLog(NonZeroUsize::new(20).unwrap()), // go back 100 commits + ); + + let segment = create_log_segment(&log_store, Some(23)).await?; + assert_segment_with_checkpoint(&segment, 20, 3); + let (truncated_segment, truncation_info) = limiter.truncate(segment, &log_store).await?; + assert_segment_with_commits_only(&truncated_segment, 4..=23); + Ok(()) + } + + fn commit_file_name(version: Version) -> String { + format!("{:020}.json", version) + } + + fn checkpoint_file_name(version: Version) -> String { + format!("{:020}.checkpoint.parquet", version) + } + + fn extract_file_names<'a>( + stored_objects: impl IntoIterator, + ) -> Vec { + stored_objects + .into_iter() + .filter_map(|parsed_path| { + let path_str = parsed_path.location.location.path(); + path_str.split('/').last().map(ToString::to_string) + }) + .collect() + } + + fn assert_segment_with_checkpoint( + segment: &LogSegment, + checkpoint_version: Version, + num_subsequent_commits: u64, + ) { + assert_eq!( + segment.end_version, + checkpoint_version + num_subsequent_commits + ); + assert_eq!( + extract_file_names(&segment.checkpoint_parts), + 
vec![checkpoint_file_name(checkpoint_version)], + ); + assert_eq!( + extract_file_names(&segment.ascending_commit_files), + (checkpoint_version + 1..=checkpoint_version + num_subsequent_commits) + .map(|v| commit_file_name(v as u64)) + .collect::>(), + ); + } + + fn assert_segment_with_commits_only(log_segment: &LogSegment, versions: RangeInclusive) { + assert_eq!(log_segment.end_version, *versions.end() as u64); + assert_eq!(log_segment.checkpoint_parts, vec![]); + assert_eq!( + extract_file_names(&log_segment.ascending_commit_files), + versions + .map(|v| commit_file_name(v as u64)) + .collect::>(), + ); + } + + mod test_doubles { + use super::*; + use crate::DeltaResult; + use crate::kernel::transaction::TransactionError; + use crate::logstore::{ + CommitOrBytes, LogStore, LogStoreConfig, LogStoreExt, object_store_path, + }; + use async_trait::async_trait; + use bytes::Bytes; + use futures::stream; + use futures::stream::BoxStream; + use object_store::path::Path; + use object_store::{ + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, + }; + use std::ops::RangeInclusive; + use std::sync::Arc; + use url::Url; + use uuid::Uuid; + + // substitute for named arguments to make the test code self documenting + pub(super) struct CommitRange(pub(super) RangeInclusive); + pub(super) struct CheckpointCadence(pub(super) usize); + pub(super) struct CommitFsize(pub(super) u64); + pub(super) struct CheckpointFsize(pub(super) u64); + + #[derive(Debug, Clone)] + pub(super) struct TestLogStore { + config: LogStoreConfig, + files: Vec, + } + + impl TestLogStore { + /// Commit files are generated to span the entire `CommitRange`, and checkpoints are + /// created according to the configured `CheckpointCadence`, starting from 0 (exclusive) + /// up to the last version in the store (also exclusive) and only for versions + /// that are also in the 
`CommitRange`. + /// E.g. commits: 15 up to 100, cadence: 10 => checkpoints at versions 20, 30, ..., 90 + pub(super) fn new( + commit_range: CommitRange, + checkpoint_cadence: CheckpointCadence, + commit_fsize: CommitFsize, + checkpoint_fsize: CheckpointFsize, + ) -> Self { + // get rid of the self-documenting superfluous types + let commit_range = commit_range.0; + let checkpoint_cadence = checkpoint_cadence.0; + let commit_fsize = commit_fsize.0; + let checkpoint_fsize = checkpoint_fsize.0; + + let mut store = TestLogStore { + config: LogStoreConfig::new( + &Url::parse("memory://test/delta_table").unwrap(), + Default::default(), + ), + files: vec![], + }; + let path = object_store_path(&store.log_root_url()).unwrap(); + let commit_files = commit_range + .clone() + .map(|v| commit_file_name(v as u64)) + .map(|f| obj_meta(path.child(f), commit_fsize)); + let checkpoint_files = (0..*commit_range.end()) + .skip(checkpoint_cadence) + .step_by(checkpoint_cadence) + .filter(|version| commit_range.contains(version)) + .map(|v| checkpoint_file_name(v as u64)) + .map(|f| obj_meta(path.child(f), checkpoint_fsize)); + let mut files = commit_files.chain(checkpoint_files).collect::>(); + files.sort_unstable_by(|a, b| a.location.cmp(&b.location)); + // files.shuffle(&mut thread_rng()); // no order guarantees for store listing + store.files = files; + store + } + + pub(super) fn with_additional_files(mut self, fnames: Vec, fsize: u64) -> Self { + let log_path = object_store_path(&self.log_root_url()).unwrap(); + let mut files: Vec = fnames + .into_iter() + .map(|fname| obj_meta(log_path.child(fname), fsize)) + .collect(); + self.files.append(&mut files); + self.files + .sort_unstable_by(|a, b| a.location.cmp(&b.location)); + self + } + } + + #[async_trait] + impl LogStore for TestLogStore { + fn name(&self) -> String { + "TestLogStore".to_string() + } + + async fn read_commit_entry(&self, _version: i64) -> DeltaResult> { + unimplemented!("TestLogStore::read_commit_entry not 
implemented for tests") + } + + async fn write_commit_entry( + &self, + _version: i64, + _commit_or_bytes: CommitOrBytes, + _operation_id: Uuid, + ) -> Result<(), TransactionError> { + unimplemented!("TestLogStore::write_commit_entry not implemented for tests") + } + + async fn abort_commit_entry( + &self, + _version: i64, + _commit_or_bytes: CommitOrBytes, + _operation_id: Uuid, + ) -> Result<(), TransactionError> { + unimplemented!("TestLogStore::abort_commit_entry not implemented for tests") + } + + async fn get_latest_version(&self, _start_version: i64) -> DeltaResult { + unimplemented!("TestLogStore::get_latest_version not implemented for tests") + } + + fn object_store(&self, operation_id: Option) -> Arc { + self.root_object_store(operation_id) + } + + fn root_object_store(&self, _operation_id: Option) -> Arc { + Arc::new(self.clone()) + } + + fn config(&self) -> &LogStoreConfig { + &self.config + } + } + + impl std::fmt::Display for TestLogStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } + } + + #[async_trait] + impl ObjectStore for TestLogStore { + async fn put_opts( + &self, + _location: &Path, + _bytes: PutPayload, + _options: PutOptions, + ) -> ObjectStoreResult { + unimplemented!("TestLogStore::put_opts not implemented for tests") + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> ObjectStoreResult> { + unimplemented!("TestLogStore::put_multipart_opts not implemented for tests") + } + + async fn get_opts( + &self, + location: &Path, + _options: GetOptions, + ) -> ObjectStoreResult { + self.files + .iter() + .find(|obj_meta| obj_meta.location == *location) + .map(|obj_meta| GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::once(async { + Ok(Bytes::new()) + }))), + meta: obj_meta.clone(), + range: 0..obj_meta.size, + attributes: Default::default(), + }) + .ok_or_else(|| object_store::Error::NotFound { + path: 
location.to_string(), + source: Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Not found", + )), + }) + } + + async fn delete(&self, _location: &Path) -> ObjectStoreResult<()> { + unimplemented!("TestLogStore::delete not implemented for tests") + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, ObjectStoreResult> { + let log_path = object_store_path(&self.log_root_url()); + // Be more permissive - return files if prefix is None or matches the log path + if prefix.is_none() || prefix == log_path.ok().as_ref() { + Box::pin(stream::iter(self.files.clone().into_iter().map(Ok))) + } else { + Box::pin(stream::empty()) + } + } + + async fn list_with_delimiter( + &self, + _prefix: Option<&Path>, + ) -> ObjectStoreResult { + unimplemented!("TestLogStore::list_with_delimiter not implemented for tests") + } + + async fn copy(&self, _from: &Path, _to: &Path) -> ObjectStoreResult<()> { + unimplemented!("TestLogStore::copy not implemented for tests") + } + + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> ObjectStoreResult<()> { + unimplemented!("TestLogStore::copy_if_not_exists not implemented for tests") + } + } + + fn obj_meta(path: impl Into, size: u64) -> ObjectMeta { + ObjectMeta { + location: path.into(), + size, + last_modified: "2025-07-18T15:30:00Z".parse().unwrap(), + e_tag: None, + version: None, + } + } + + fn parsed_log_path(path: impl Into, size: u64) -> ParsedLogPath { + let path = path.into(); + let dummy_url = Url::parse("memory://test/").unwrap(); + let file_url = dummy_url.join(path.as_ref()).unwrap(); + let parsed_url_path = ParsedLogPath::try_from(file_url).unwrap().unwrap(); + // Convert to FileMeta-based ParsedLogPath + let file_meta = delta_kernel::FileMeta { + location: parsed_url_path.location, + last_modified: 1752852600000, + size, + }; + ParsedLogPath::try_from(file_meta).unwrap().unwrap() + } + + #[tokio::test] + async fn test_fake_log_store() -> DeltaResult<()> { + let log_store = 
TestLogStore::new( + CommitRange(2..=97), + CheckpointCadence(10), + CommitFsize(128), + CheckpointFsize(1024), + ); + + // before the first checkpoint + let segment = create_log_segment(&log_store, Some(5)).await?; + assert_segment_with_commits_only(&segment, 2..=5); + assert_eq!( + segment, + LogSegment { + end_version: 5, + ascending_commit_files: vec![ + parsed_log_path("delta_table/_delta_log/00000000000000000002.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000003.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000004.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000005.json", 128), + ], + checkpoint_parts: vec![], + ascending_compaction_files: vec![], + log_root: log_store.log_root_url(), + checkpoint_version: None, + latest_crc_file: None, + latest_commit_file: Some(parsed_log_path( + "delta_table/_delta_log/00000000000000000005.json", + 128 + )), + } + ); + + // with checkpoint + let segment = create_log_segment(&log_store, Some(32)).await?; + assert_segment_with_checkpoint(&segment, 30, 2); + assert_eq!( + segment, + LogSegment { + end_version: 32, + ascending_commit_files: vec![ + parsed_log_path("delta_table/_delta_log/00000000000000000031.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000032.json", 128), + ], + checkpoint_parts: vec![parsed_log_path( + "delta_table/_delta_log/00000000000000000030.checkpoint.parquet", + 1024 + ),], + ascending_compaction_files: vec![], + log_root: log_store.log_root_url(), + checkpoint_version: Some(30), + latest_crc_file: None, + latest_commit_file: Some(parsed_log_path( + "delta_table/_delta_log/00000000000000000032.json", + 128 + )), + } + ); + + // latest version + let segment = create_log_segment(&log_store, None).await?; + assert_segment_with_checkpoint(&segment, 90, 7); + assert_eq!( + segment, + LogSegment { + end_version: 97, + ascending_commit_files: vec![ + 
parsed_log_path("delta_table/_delta_log/00000000000000000091.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000092.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000093.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000094.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000095.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000096.json", 128), + parsed_log_path("delta_table/_delta_log/00000000000000000097.json", 128), + ], + checkpoint_parts: vec![parsed_log_path( + "delta_table/_delta_log/00000000000000000090.checkpoint.parquet", + 1024 + ),], + ascending_compaction_files: vec![], + log_root: log_store.log_root_url(), + checkpoint_version: Some(90), + latest_crc_file: None, + latest_commit_file: Some(parsed_log_path( + "delta_table/_delta_log/00000000000000000097.json", + 128 + ),), + } + ); + + Ok(()) + } + } +} diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 7f13f7fab..f0abc0cf7 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -498,7 +498,7 @@ pub(crate) fn get_engine(store: Arc) -> Arc { } #[cfg(feature = "datafusion")] -fn object_store_url(location: &Url) -> ObjectStoreUrl { +pub fn object_store_url(location: &Url) -> ObjectStoreUrl { use object_store::path::DELIMITER; // azure storage urls encode the container as user in the url diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 0b281df80..0535ea8c2 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -12,6 +12,7 @@ use tracing::debug; use url::Url; use super::normalize_table_url; +use crate::kernel::size_limits::LogSizeLimiter; use crate::logstore::storage::IORuntime; use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories}; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -56,6 +57,13 @@ pub struct DeltaTableConfig { #[delta(skip)] 
/// When a runtime handler is provided, all IO tasks are spawn in that handle pub io_runtime: Option, + + #[delta(skip)] + /// options to pass down to store + pub options: HashMap, + + #[delta(skip)] + pub log_size_limiter: Option, } impl Default for DeltaTableConfig { @@ -65,6 +73,8 @@ impl Default for DeltaTableConfig { log_buffer_size: num_cpus::get() * 4, log_batch_size: 1024, io_runtime: None, + options: HashMap::new(), + log_size_limiter: None, } } } @@ -74,6 +84,7 @@ impl PartialEq for DeltaTableConfig { self.require_files == other.require_files && self.log_buffer_size == other.log_buffer_size && self.log_batch_size == other.log_batch_size + && self.log_size_limiter == other.log_size_limiter } } @@ -118,9 +129,10 @@ impl DeltaTableBuilder { } debug!("creating table builder with {table_url}"); + let actual_table_url = parse_table_uri(&table_url)?; Ok(Self { - table_url, + table_url: actual_table_url, storage_backend: None, version: DeltaVersion::default(), storage_options: None, @@ -135,6 +147,12 @@ impl DeltaTableBuilder { self } + /// Sets `log_size_limiter` to the builder + pub fn with_log_size_limiter(mut self, limiter: LogSizeLimiter) -> Self { + self.table_config.log_size_limiter = Some(limiter); + self + } + /// Sets `version` to the builder pub fn with_version(mut self, version: i64) -> Self { self.version = DeltaVersion::Version(version); @@ -201,6 +219,14 @@ impl DeltaTableBuilder { storage_options .clone() .into_iter() + .map(|(k, v)| { + ( + k.strip_prefix("deltalake.") + .map(ToString::to_string) + .unwrap_or(k), + v, + ) + }) .map(|(k, v)| { let needs_trim = v.starts_with("http://") || v.starts_with("https://") @@ -213,6 +239,12 @@ impl DeltaTableBuilder { }) .collect(), ); + let mut opts = self.storage_options.unwrap().clone(); + self.table_config.log_size_limiter = LogSizeLimiter::from_storage_options(&mut opts) + .expect("Invalid log_size_limiter options"); + + self.storage_options = Some(opts); + self } @@ -245,6 +277,7 @@ impl 
DeltaTableBuilder { /// Build a delta storage backend for the given config pub fn build_storage(&self) -> DeltaResult { debug!("build_storage() with {}", self.table_url); + let location = self.table_url.clone(); let mut storage_config = StorageConfig::parse_options(self.storage_options())?; if let Some(io_runtime) = self.table_config.io_runtime.clone() { @@ -253,14 +286,16 @@ impl DeltaTableBuilder { if let Some((store, _url)) = self.storage_backend.as_ref() { debug!("Loading a logstore with a custom store: {store:?}"); - crate::logstore::logstore_with(store.clone(), &self.table_url, storage_config) + crate::logstore::logstore_with(store.clone(), &location, storage_config) } else { // If there has been no backend defined just default to the normal logstore look up - debug!( - "Loading a logstore based off the location: {:?}", - self.table_url - ); - crate::logstore::logstore_for(&self.table_url, storage_config) + // debug!( + // "Loading a logstore based off the location: {:?}", + // self.table_url + // ); + // crate::logstore::logstore_for(&self.table_url, storage_config) + debug!("Loading a logstore based off the location: {location:?}"); + crate::logstore::logstore_for(&location, storage_config) } } @@ -269,7 +304,11 @@ impl DeltaTableBuilder { /// This will not load the log, i.e. the table is not initialized. 
To get an initialized /// table use the `load` function pub fn build(self) -> DeltaResult { - Ok(DeltaTable::new(self.build_storage()?, self.table_config)) + let log_store = self.build_storage()?; + let mut config = (self).table_config.clone(); + config.options = self.storage_options.clone().unwrap_or_default(); + + Ok(DeltaTable::new(self.build_storage()?, config)) } /// Build the [`DeltaTable`] and load its state @@ -571,6 +610,13 @@ mod tests { assert_eq!(expected.as_str(), url.as_str()); } + #[test] + fn test_invalid_uri() { + // Urls should round trips as-is + DeltaTableBuilder::from_url(Url::parse("this://is.nonsense").unwrap()) + .expect_err("this should be an error"); + } + #[test] fn test_writer_storage_opts_url_trim() { let cases = [ diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index d32a422f6..cc2518a43 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -32,6 +32,7 @@ pub mod state; mod columns; // Re-exposing for backwards compatibility +use crate::kernel::size_limits::SnapshotLoadMetrics; pub use columns::*; /// In memory representation of a Delta Table @@ -150,6 +151,19 @@ impl DeltaTable { } } + /// Update the current [`DeltaTable`] with an updated [`LogStore`] + /// + /// NOTE: This is for advanced users and allows swapping settings like AZURE_PROXY_URL + /// before passing the table to Datafusion, allowing for example to conditionally change or + /// remove proxy usage for metadata and data + pub fn with_new_store(&self, log_store: LogStoreRef) -> Self { + Self { + state: self.state.clone(), + log_store, + config: self.config.clone(), + } + } + /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { self.log_store.object_store(None) @@ -345,6 +359,18 @@ impl DeltaTable { self.state.as_ref().ok_or(DeltaTableError::NotInitialized) } + /// Returns the metrics captured during snapshot loading. 
+ /// + /// This method provides access to information about how the snapshot was loaded, + /// including whether log size limiting was applied and if truncation occurred. + /// + /// ## Returns + /// + /// A reference to the snapshot load metrics if the table has been loaded, `None` otherwise. + pub fn snapshot_load_metrics(&self) -> Option<&SnapshotLoadMetrics> { + self.state.as_ref().map(|s| s.snapshot().load_metrics()) + } + /// Time travel Delta table to the latest version that's created at or before provided /// `datetime` argument. /// diff --git a/crates/core/src/test_utils/mod.rs b/crates/core/src/test_utils/mod.rs index a30fc850a..91af0918c 100644 --- a/crates/core/src/test_utils/mod.rs +++ b/crates/core/src/test_utils/mod.rs @@ -56,6 +56,7 @@ pub enum TestTables { Checkpoints, LatestNotCheckpointed, WithDvSmall, + WithColumnMapping, Custom(String), } @@ -72,6 +73,7 @@ impl TestTables { Self::Checkpoints => data_path.join("checkpoints"), Self::LatestNotCheckpointed => data_path.join("latest_not_checkpointed"), Self::WithDvSmall => data_path.join("table-with-dv-small"), + Self::WithColumnMapping => data_path.join("table_with_column_mapping"), // the data path for upload does not apply to custom tables. 
Self::Custom(_) => todo!(), } @@ -88,6 +90,7 @@ impl TestTables { Self::Checkpoints => "checkpoints".into(), Self::LatestNotCheckpointed => "latest_not_checkpointed".into(), Self::WithDvSmall => "table-with-dv-small".into(), + Self::WithColumnMapping => "table_with_column_mapping".into(), Self::Custom(name) => name.to_owned(), } } diff --git a/crates/core/tests/data/deep/_ACP_DATE=2026-01-01/_ACP_BATCHID=b1/part-00000-dd885b7b-9bff-4a7d-b866-abb97c953ac9-c000.snappy.parquet b/crates/core/tests/data/deep/_ACP_DATE=2026-01-01/_ACP_BATCHID=b1/part-00000-dd885b7b-9bff-4a7d-b866-abb97c953ac9-c000.snappy.parquet new file mode 100644 index 000000000..13737d9a9 Binary files /dev/null and b/crates/core/tests/data/deep/_ACP_DATE=2026-01-01/_ACP_BATCHID=b1/part-00000-dd885b7b-9bff-4a7d-b866-abb97c953ac9-c000.snappy.parquet differ diff --git a/crates/core/tests/data/deep/_delta_log/00000000000000000000.json b/crates/core/tests/data/deep/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..78b334973 --- /dev/null +++ b/crates/core/tests/data/deep/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} 
+{"metaData":{"id":"3c5a399a-65d7-47ac-a792-e9525322f73e","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"_acp_system_metadata\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"acp_sourceBatchId\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"commitBatchId\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"productListItems\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"SKU\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"quantity\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"priceTotal\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_experience\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"analytics\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"customDimensions\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"eVars\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"evar1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"evar2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"events\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"event1\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"event2\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"_ACP_DATE\",\"type\":\"date\",\"n
ullable\":true,\"metadata\":{}},{\"name\":\"_ACP_BATCHID\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["_ACP_DATE","_ACP_BATCHID"],"createdTime":1770389266405,"configuration":{}}} +{"add":{"path":"_ACP_DATE=2026-01-01/_ACP_BATCHID=b1/part-00000-dd885b7b-9bff-4a7d-b866-abb97c953ac9-c000.snappy.parquet","partitionValues":{"_ACP_BATCHID":"b1","_ACP_DATE":"2026-01-01"},"size":4305,"modificationTime":1770389266454,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"_acp_system_metadata\":{\"acp_sourceBatchId\":\"b1\",\"commitBatchId\":\"b1\"},\"_id\":\"id1\"},\"maxValues\":{\"_acp_system_metadata\":{\"acp_sourceBatchId\":\"b1\",\"commitBatchId\":\"b1\"},\"_id\":\"id1\"},\"nullCount\":{\"_acp_system_metadata\":{\"acp_sourceBatchId\":0,\"commitBatchId\":0},\"_id\":0}}","tags":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1770389266455,"operation":"WRITE","operationParameters":{"partitionBy":"[\"_ACP_DATE\",\"_ACP_BATCHID\"]","mode":"Overwrite"},"engineInfo":"delta-rs:py-1.3.1","operationMetrics":{"execution_time_ms":52,"num_added_files":1,"num_added_rows":1,"num_partitions":0,"num_removed_files":0},"clientVersion":"delta-rs.py-1.3.1"}} \ No newline at end of file diff --git a/crates/core/tests/data/hstack_nullable_difference/_ACP_DATE=2026-02-16/_ACP_BATCH_ID=b1/part-00000-26d1988d-7e8d-4cbb-8575-1008884b3df5-c000.snappy.parquet b/crates/core/tests/data/hstack_nullable_difference/_ACP_DATE=2026-02-16/_ACP_BATCH_ID=b1/part-00000-26d1988d-7e8d-4cbb-8575-1008884b3df5-c000.snappy.parquet new file mode 100644 index 000000000..386d831cc Binary files /dev/null and b/crates/core/tests/data/hstack_nullable_difference/_ACP_DATE=2026-02-16/_ACP_BATCH_ID=b1/part-00000-26d1988d-7e8d-4cbb-8575-1008884b3df5-c000.snappy.parquet differ diff --git a/crates/core/tests/data/hstack_nullable_difference/_delta_log/00000000000000000000.json 
b/crates/core/tests/data/hstack_nullable_difference/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..52c979fa6 --- /dev/null +++ b/crates/core/tests/data/hstack_nullable_difference/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}} +{"metaData":{"id":"898c9da5-4a43-48cb-ac42-3946be8a86cc","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"_acp_system_metadata\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"acp_sourceBatchId\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"commitBatchId\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ingestTime\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rowId\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rowVersion\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trackingId\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"orderData\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"productList\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"SKU\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"quantity\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"priceTotal\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_ACP_BATCHID\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_ACP_DATE
\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_ACP_BATCH_ID\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["_ACP_DATE","_ACP_BATCH_ID"],"createdTime":1771242914810,"configuration":{}}} +{"add":{"path":"_ACP_DATE=2026-02-16/_ACP_BATCH_ID=b1/part-00000-26d1988d-7e8d-4cbb-8575-1008884b3df5-c000.snappy.parquet","partitionValues":{"_ACP_DATE":"2026-02-16","_ACP_BATCH_ID":"b1"},"size":4564,"modificationTime":1771242914852,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"orderData\":{\"date\":\"2025-01-01T13:00:00Z\"},\"_ACP_BATCHID\":\"batch1\",\"_acp_system_metadata\":{\"acp_sourceBatchId\":\"batch1\",\"commitBatchId\":\"batch1\",\"isDeleted\":false,\"ingestTime\":1,\"rowId\":\"row1\",\"rowVersion\":1,\"trackingId\":\"t1\"},\"_id\":\"id1\"},\"maxValues\":{\"_ACP_BATCHID\":\"batch1\",\"_acp_system_metadata\":{\"trackingId\":\"t1\",\"isDeleted\":false,\"commitBatchId\":\"batch1\",\"ingestTime\":1,\"rowId\":\"row1\",\"rowVersion\":1,\"acp_sourceBatchId\":\"batch1\"},\"orderData\":{\"date\":\"2025-01-01T13:00:00Z\"},\"_id\":\"id1\"},\"nullCount\":{\"_id\":0,\"_acp_system_metadata\":{\"acp_sourceBatchId\":0,\"rowId\":0,\"rowVersion\":0,\"trackingId\":0,\"isDeleted\":0,\"ingestTime\":0,\"commitBatchId\":0},\"_ACP_BATCHID\":0,\"orderData\":{\"date\":0}}}","tags":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1771242914853,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"_ACP_DATE\",\"_ACP_BATCH_ID\"]"},"engineInfo":"delta-rs:py-1.3.1","operationMetrics":{"execution_time_ms":44,"num_added_files":1,"num_added_rows":1,"num_partitions":0,"num_removed_files":0},"clientVersion":"delta-rs.py-1.3.1"}} diff --git a/crates/core/tests/datafusion_table_provider.rs b/crates/core/tests/datafusion_table_provider.rs index 281c46c8a..3535db3a1 100644 --- a/crates/core/tests/datafusion_table_provider.rs +++ 
b/crates/core/tests/datafusion_table_provider.rs @@ -9,8 +9,11 @@ use deltalake_core::delta_datafusion::DeltaScanNext; use deltalake_core::delta_datafusion::create_session; use deltalake_core::delta_datafusion::engine::DataFusionEngine; use deltalake_core::kernel::Snapshot; +use deltalake_core::logstore::default_logstore; use deltalake_test::TestResult; use deltalake_test::acceptance::read_dat_case; +use object_store::local::LocalFileSystem; +use url::Url; async fn scan_dat(case: &str) -> TestResult<(Snapshot, SessionContext)> { let root_dir = format!( @@ -24,9 +27,22 @@ async fn scan_dat(case: &str) -> TestResult<(Snapshot, SessionContext)> { let session = create_session().into_inner(); let engine = DataFusionEngine::new_from_session(&session.state()); - let snapshot = - Snapshot::try_new_with_engine(engine.clone(), case.table_root()?, Default::default(), None) - .await?; + let file_store = LocalFileSystem::new_with_prefix(case.root_dir()).unwrap(); + let log_store = default_logstore( + Arc::new(file_store), + Arc::new(LocalFileSystem::new()), + &Url::from_file_path(case.root_dir()).unwrap(), + &Default::default(), + ); + + let snapshot = Snapshot::try_new_with_engine( + log_store.as_ref(), + engine.clone(), + case.table_root()?, + Default::default(), + None, + ) + .await?; Ok((snapshot, session)) } diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index ce36650bf..37c4a0671 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -2189,3 +2189,316 @@ mod insert_into_tests { Ok(()) } } + +mod deep { + use arrow_cast::display::FormatOptions; + use arrow_cast::pretty; + use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; + use datafusion::datasource::physical_plan::ParquetSource; + use datafusion::optimizer::optimize_projections_deep::DeepColumnIndexMap; + use datafusion::physical_plan::{ExecutionPlan, collect, displayable}; + use 
datafusion::prelude::{SessionConfig, SessionContext}; + use datafusion_datasource::file::FileSource; + use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_datasource::source::DataSourceExec; + use datafusion_proto::physical_plan::{ + AsExecutionPlan, ComposedPhysicalExtensionCodec, DefaultPhysicalExtensionCodec, + }; + use datafusion_proto::protobuf::PhysicalPlanNode; + use deltalake_core::delta_datafusion::table_provider_old::DeltaTableOldProvider; + use deltalake_core::delta_datafusion::udtf::register_delta_table_udtf; + use deltalake_core::delta_datafusion::{ + DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanExec, + }; + use prost::Message; + use std::collections::HashMap; + use std::ops::Deref; + use std::sync::Arc; + use tracing::info; + + #[allow(clippy::collapsible_if)] + fn extract_projection_deep_from_plan( + plan: Arc, + ) -> Vec> { + let mut deep_projections: Vec> = vec![]; + let _ = plan.apply(|pp| { + if let Some(dse) = pp.as_any().downcast_ref::() { + if let Some(data_source_file_scan_config) = + dse.data_source().as_any().downcast_ref::() + { + if let Some(pqs) = data_source_file_scan_config + .file_source + .as_any() + .downcast_ref::() + { + if let Some(projection) = pqs.projection() { + deep_projections.push(projection.projection_deep.clone()); + } + } + } + } + Ok(TreeNodeRecursion::Continue) + }); + deep_projections + } + + #[tokio::test] + async fn test_hstack_deep_column_pruning() -> datafusion::common::Result<()> { + let filter = tracing_subscriber::EnvFilter::from_default_env(); + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber).ok(); + pretty_env_logger::try_init().ok(); + + let config = SessionConfig::new() + .set_bool("datafusion.sql_parser.enable_ident_normalization", false); + + let ctx = SessionContext::new_with_config(config); + + register_delta_table_udtf(&ctx, None, None); + + let delta_path = 
format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR")); + + let query = format!( + r#" + select + t1._id, t1.productListItems['SKU'], _ACP_DATE + from + delta_table('file://{}') as t1 + "#, + delta_path + ); + + let plan = ctx + .state() + .create_logical_plan(&query) + .await + .expect("Error creating logical plan"); + let optimized_plan = ctx.state().optimize(&plan).expect("Error optimizing plan"); + let state = ctx.state(); + let query_planner = state.query_planner().clone(); + let physical_plan = query_planner + .create_physical_plan(&optimized_plan, &state) + .await + .expect("Error creating physical plan"); + info!( + "Physical plan: {}", + displayable(physical_plan.deref()) + .set_show_schema(true) + .indent(true) + ); + let proj1 = extract_projection_deep_from_plan(physical_plan.clone()); + let batches1 = collect(physical_plan.clone(), ctx.state().task_ctx()).await?; + let results1 = + pretty::pretty_format_batches_with_options(&batches1, &FormatOptions::default())? + .to_string(); + println!("{}", results1); + + // codec + let codec = ComposedPhysicalExtensionCodec::new(vec![ + Arc::new(DefaultPhysicalExtensionCodec {}), + Arc::new(DeltaPhysicalCodec {}), + ]); + let proto = + PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec).unwrap(); + let bytes = proto.encode_to_vec(); + let plan_after_serde = PhysicalPlanNode::try_decode(&bytes) + .expect("Error try_decode") + .try_into_physical_plan(&ctx.task_ctx(), &codec) + .expect("try_into_physical_plan"); + info!( + "Physical plan after serde: {}", + displayable(plan_after_serde.deref()) + .set_show_schema(true) + .indent(true) + ); + + let proj2 = extract_projection_deep_from_plan(plan_after_serde.clone()); + let batches2 = collect(plan_after_serde.clone(), ctx.state().task_ctx()).await?; + let results2 = + pretty::pretty_format_batches_with_options(&batches2, &FormatOptions::default())? 
+ .to_string(); + println!("{}", results2); + + assert_eq!(results1, results2, "Batches not equal !"); + println!("proj1: {:?}", proj1); + println!("proj2: {:?}", proj2); + + assert_eq!(proj1, proj2, "Deep Projection not equal !"); + + Ok(()) + } + + #[tokio::test] + async fn test_hstack_nullable_new() -> datafusion::common::Result<()> { + let filter = tracing_subscriber::EnvFilter::from_default_env(); + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber).ok(); + pretty_env_logger::try_init().ok(); + + let config = SessionConfig::new() + .set_bool("datafusion.sql_parser.enable_ident_normalization", false) + .set_bool( + "datafusion.execution.parquet.schema_force_view_types", + false, + ); + + let ctx = SessionContext::new_with_config(config); + + register_delta_table_udtf(&ctx, None, None); + + let delta_path = format!( + "{}/tests/data/hstack_nullable_difference", + env!("CARGO_MANIFEST_DIR") + ); + + let query = format!( + r#" + select + * + from + delta_table('file://{}') as t1 + "#, + delta_path + ); + + let plan = ctx + .state() + .create_logical_plan(&query) + .await + .expect("Error creating logical plan"); + let optimized_plan = ctx.state().optimize(&plan).expect("Error optimizing plan"); + let state = ctx.state(); + let query_planner = state.query_planner().clone(); + let physical_plan = query_planner + .create_physical_plan(&optimized_plan, &state) + .await + .expect("Error creating physical plan"); + info!( + "Physical plan: {}", + displayable(physical_plan.deref()) + .set_show_schema(true) + .indent(true) + ); + let proj1 = extract_projection_deep_from_plan(physical_plan.clone()); + let batches1 = collect(physical_plan.clone(), ctx.state().task_ctx()).await?; + let results1 = + pretty::pretty_format_batches_with_options(&batches1, &FormatOptions::default())? 
+ .to_string(); + println!("{}", results1); + + Ok(()) + } + + #[tokio::test] + async fn test_hstack_deep_column_pruning_next_codec() -> datafusion::common::Result<()> { + let filter = tracing_subscriber::EnvFilter::from_default_env(); + let subscriber = tracing_subscriber::fmt() + .pretty() + .with_env_filter(filter) + .finish(); + tracing::subscriber::set_global_default(subscriber).ok(); + pretty_env_logger::try_init().ok(); + + let config = SessionConfig::new() + .set_bool("datafusion.sql_parser.enable_ident_normalization", false); + + let ctx = SessionContext::new_with_config(config); + + register_delta_table_udtf(&ctx, None, None); + + let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR")); + + let query = format!( + r#" + select + t1._id, t1.productListItems['SKU'], _ACP_DATE + from + delta_table_next('file://{}', 'key1', 'val1') as t1 + "#, + delta_path + ); + + let plan = ctx + .state() + .create_logical_plan(&query) + .await + .expect("Error creating logical plan"); + let optimized_plan = ctx.state().optimize(&plan).expect("Error optimizing plan"); + let state = ctx.state(); + let query_planner = state.query_planner().clone(); + let physical_plan = query_planner + .create_physical_plan(&optimized_plan, &state) + .await + .expect("Error creating physical plan"); + info!( + "Physical plan: {}", + displayable(physical_plan.deref()) + .set_show_schema(true) + .indent(true) + ); + let proj1 = extract_projection_deep_from_plan(physical_plan.clone()); + let batches1 = collect(physical_plan.clone(), ctx.state().task_ctx()).await?; + let results1 = + pretty::pretty_format_batches_with_options(&batches1, &FormatOptions::default())? 
+ .to_string(); + println!("{}", results1); + + // codec + let codec = ComposedPhysicalExtensionCodec::new(vec![ + Arc::new(DefaultPhysicalExtensionCodec {}), + Arc::new(DeltaNextPhysicalCodec {}), + ]); + + let proto = + PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec).unwrap(); + let bytes = proto.encode_to_vec(); + let plan_after_serde = PhysicalPlanNode::try_decode(&bytes) + .expect("Error try_decode") + .try_into_physical_plan(&ctx.task_ctx(), &codec) + .expect("try_into_physical_plan"); + info!( + "Physical plan after serde: {}", + displayable(plan_after_serde.deref()) + .set_show_schema(true) + .indent(true) + ); + let delta_scan = + find_exec_node::<DeltaScanExec>(&plan_after_serde).expect("Error finding exec"); + assert_eq!( + delta_scan.options(), + &HashMap::from([("key1".to_string(), "val1".to_string())]) + ); + + let proj2 = extract_projection_deep_from_plan(plan_after_serde.clone()); + let batches2 = collect(plan_after_serde.clone(), ctx.state().task_ctx()).await?; + let results2 = + pretty::pretty_format_batches_with_options(&batches2, &FormatOptions::default())? + .to_string(); + println!("{}", results2); + + assert_eq!(results1, results2, "Batches not equal !"); + println!("proj1: {:?}", proj1); + println!("proj2: {:?}", proj2); + + assert_eq!(proj1, proj2, "Deep Projection not equal !"); + + Ok(()) + } + + fn find_exec_node<T: 'static>(input: &Arc<dyn ExecutionPlan>) -> Option<&T> { + if let Some(found) = input.as_any().downcast_ref::<T>() { + Some(found) + } else { + input + .children() + .iter() + .find_map(|child| find_exec_node(child)) + } + } +}