From 42c874051e08b7a174e6fde06ca8b5d943d8efd2 Mon Sep 17 00:00:00 2001 From: Catalin Dobre Date: Tue, 24 Mar 2026 00:49:58 +0200 Subject: [PATCH] [HSTACK] feat: support custom logical schemas passed through DeltaScanConfig or through DeltaTableOldProvider --- .../src/delta_datafusion/table_provider.rs | 2 +- .../table_provider/next/scan/plan.rs | 3 +++ .../delta_datafusion/table_provider_old.rs | 25 +++++++++++++++---- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 4e1473cf2..05f0dcf49 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -1162,7 +1162,7 @@ impl ExecutionPlan for DeltaScan { /// The logical schema for a Deltatable is different from the protocol level schema since partition /// columns must appear at the end of the schema. This is to align with how partition are handled /// at the physical level -fn df_logical_schema( +pub(crate) fn df_logical_schema( snapshot: &EagerSnapshot, file_column_name: &Option, schema: Option, diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs index 92363e57d..050ffa086 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs @@ -235,6 +235,9 @@ impl DeltaScanConfig { /// such as dictionary encoding of partition columns or /// view types. pub(crate) fn table_schema(&self, table_config: &TableConfiguration) -> Result { + if let Some(schema) = &self.schema { + return Ok(schema.clone()); + } let table_schema: Schema = table_config.schema().as_ref().try_into_arrow()?; self.physical_arrow_schema(table_config, &table_schema) } diff --git a/crates/core/src/delta_datafusion/table_provider_old.rs b/crates/core/src/delta_datafusion/table_provider_old.rs index ae4513f01..5e472b97c 100644 --- a/crates/core/src/delta_datafusion/table_provider_old.rs +++ b/crates/core/src/delta_datafusion/table_provider_old.rs @@ -1,7 +1,7 @@ use std::any::Any; use std::borrow::Cow; use std::sync::Arc; -use arrow_schema::Schema; +use arrow_schema::{Schema, SchemaRef}; use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion::common::{Result, Statistics}; use datafusion::datasource::TableType; @@ -11,7 +11,7 @@ use datafusion::logical_expr::utils::conjunction; use datafusion::physical_plan::ExecutionPlan; use url::Url; use crate::delta_datafusion::{DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder}; -use crate::delta_datafusion::table_provider::get_pushdown_filters; +use crate::delta_datafusion::table_provider::{df_logical_schema, get_pushdown_filters}; use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError}; use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; @@ -38,6 +38,8 @@ pub struct DeltaTableOldProvider { pub config: DeltaTableConfig, /// log store pub(crate) log_store: LogStoreRef, + /// Optional schema override for scanning + pub(crate) schema: Option, } impl DeltaTableOldProvider { @@ -47,6 +49,10 @@ impl DeltaTableOldProvider { pub fn log_store(&self) -> LogStoreRef { self.log_store.clone() } + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } } impl From for DeltaTableOldProvider { @@ -54,7 +60,8 @@ impl From for DeltaTableOldProvider { Self { state: value.state.clone(), config: value.config.clone(), - log_store: value.log_store.clone() + log_store: value.log_store.clone(), + schema: None, } } } @@ -66,7 +73,15 @@ impl TableProvider for DeltaTableOldProvider { } fn schema(&self) -> Arc { - self.snapshot().unwrap().snapshot().read_schema() + match &self.schema { + Some(s) => df_logical_schema( + self.snapshot().unwrap().snapshot(), + &None, + Some(s.clone()), + ) + .unwrap_or_else(|_| s.clone()), + None => self.snapshot().unwrap().snapshot().read_schema(), + } } fn table_type(&self) -> TableType { @@ -101,7 +116,7 @@ impl TableProvider for DeltaTableOldProvider { file_column_name: None, wrap_partition_values: None, enable_parquet_pushdown: true, - schema: None, + schema: self.schema.clone(), }; let config = config