Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
08e9cc8
[HSTACK] Datafusion main Cargo.toml patch
adragomir Feb 6, 2026
f9b972d
[HSTACK] Expose DeltaScan struct properties
adragomir Feb 9, 2026
3072c92
[HSTACK] REEVALUATE - bring back old table provider
adragomir Feb 6, 2026
c04fb84
[HSTACK] Add support for deep projections
adragomir Feb 6, 2026
fd8dd1e
[HSTACK] Add support for deep projections TEST
adragomir Feb 6, 2026
88bf075
[HSTACK] - propagate creds for object store in distributed exec
adragomir Feb 6, 2026
c01f9b3
[HSTACK][DF] Fix schema mapping for record batches with an empty sche…
adragomir Feb 6, 2026
93669c8
[HSTACK] - fix crash on missing statistics
aditanase Feb 6, 2026
fdbafe6
[HSTACK] Log segment size limits
ccciudatu Feb 6, 2026
4737bea
[HSTACK] - expose log metadata and total size to support weighted cac…
aditanase Feb 6, 2026
63b3800
[HSTACK] - add support for log_store reconfigure
aditanase Feb 6, 2026
43b0425
[HSTACK] FIX table constructor that does not
adragomir Feb 6, 2026
4ae4110
[HSTACK] Add delta table udtf and register function
adragomir Feb 6, 2026
a06ca45
[HSTACK] Expose metrics field so we can create a DeltaScan externally…
adragomir Feb 6, 2026
069e10d
[HSTACK] Implement physical codec for "next" version of ExecutionPlan…
ccciudatu Feb 16, 2026
b80cbca
[HSTACK] - revert lazy use of file_views in scan builder
aditanase Feb 15, 2026
f2145d9
[HSTACK] - optimize push down limit (don't collect all files)
aditanase Feb 15, 2026
d669b75
[HSTACK] - optimize scan builder, avoid parsing stats (double json se…
aditanase Feb 17, 2026
d47563c
[HSTACK] Register log store on next::DeltaScan::scan_with_args
ccciudatu Feb 17, 2026
8d4773a
[HSTACK] Add option to modify the low-level Delta schema to make some…
adragomir Feb 19, 2026
6ee6756
feat: expose snapshot load metrics in order to detect if log truncati…
cdobre Feb 16, 2026
cc5585c
chore: reformat files with `cargo fmt --all`
cdobre Feb 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ datafusion-proto = { version = "52.1.0" }
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1"
strum = { version = "0.27" }
prost = { version = "0.14.3" }

# "stdlib"
bytes = { version = "1" }
Expand Down Expand Up @@ -155,3 +156,39 @@ inherits = "release"
opt-level = 3
codegen-units = 1
lto = "thin"

[patch.crates-io]
datafusion = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-catalog = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-catalog-listing = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-common-runtime = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-datasource = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-datasource-avro = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-datasource-csv = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-datasource-json = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-datasource-parquet = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-execution = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-expr = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-expr-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-ffi = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-aggregate = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-nested = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-table = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-window = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-functions-window-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-optimizer = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-physical-expr = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-physical-expr-adapter = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-physical-expr-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-physical-optimizer = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-pruning = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-physical-plan = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-proto = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-proto-common = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-session = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-spark = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-sql = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
datafusion-substrait = { git = 'https://github.com/hstack/datafusion.git', branch = 'main' }
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ datafusion-proto = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true }
prost = { workspace = true }

# "stdlib"
bytes = { workspace = true }
Expand Down Expand Up @@ -87,6 +88,7 @@ rand = "0.8"
sqlparser = { version = "0.59.0" }
humantime = { version = "2.1.0", optional = true }
validator = { version = "0.19", features = ["derive"] }
ctor = "0.6"

[dev-dependencies]
criterion = "0.5"
Expand Down
38 changes: 24 additions & 14 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
use std::fmt::Debug;
use std::sync::Arc;

use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::table_provider::DeltaScanWire;
use crate::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options};
use arrow::array::types::UInt16Type;
use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
use arrow_cast::{CastOptions, cast_with_options};
use arrow_schema::{
DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
TimeUnit,
DataType as ArrowDataType, DataType, Field, Schema as ArrowSchema, SchemaRef,
SchemaRef as ArrowSchemaRef, TimeUnit,
};
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::common::scalar::ScalarValue;
Expand All @@ -51,14 +58,7 @@ use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use either::Either;

use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::table_provider::DeltaScanWire;
use crate::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options};
use tracing::info;

pub(crate) use self::session::DeltaSessionExt;
pub use self::session::{
Expand All @@ -74,7 +74,7 @@ pub(crate) use data_validation::{
pub(crate) use find_files::*;
pub use table_provider::{
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
next::DeltaScanExec,
next::{DeltaNextPhysicalCodec, DeltaScanExec},
};
pub(crate) use table_provider::{
DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
Expand All @@ -92,9 +92,14 @@ pub mod logical;
pub mod physical;
pub mod planner;
mod session;
use crate::delta_datafusion::schema_null::rewrite_schema_with_nullable_fields;
pub use session::SessionFallbackPolicy;
pub(crate) use session::{SessionResolveContext, resolve_session_state};

mod schema_null;
mod table_provider;
pub mod table_provider_old;
pub mod udtf;
pub(crate) mod utils;

impl From<DeltaTableError> for DataFusionError {
Expand Down Expand Up @@ -226,7 +231,7 @@ fn _arrow_schema(
partition_columns: &[String],
wrap_partitions: bool,
) -> ArrowSchemaRef {
let fields = schema
let mut fields = schema
.fields()
.into_iter()
.filter(|f| !partition_columns.contains(&f.name().to_string()))
Expand Down Expand Up @@ -255,7 +260,11 @@ fn _arrow_schema(
}),
)
.collect::<Vec<_>>();
Arc::new(ArrowSchema::new(fields))

let mut schema = Arc::new(ArrowSchema::new(fields));
// @Hstack - add the option to have an env var that can nullify fields in the delta schema
schema = rewrite_schema_with_nullable_fields(schema);
schema
}

pub(crate) fn files_matching_predicate<'a>(
Expand Down Expand Up @@ -1383,9 +1392,10 @@ mod tests {
assert_eq!("a", small.iter().next().unwrap().unwrap());

let expected = vec![
ObjectStoreOperation::Get(LocationType::Commit),
ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
#[expect(clippy::single_range_in_vec_init)]
ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..46]),
];
let mut actual = Vec::new();
operations.recv_many(&mut actual, 3).await;
Expand Down
Loading
Loading