Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ impl ExecutionPlan for DeltaScan {
/// The logical schema for a Deltatable is different from the protocol level schema since partition
/// columns must appear at the end of the schema. This is to align with how partition are handled
/// at the physical level
fn df_logical_schema(
pub(crate) fn df_logical_schema(
snapshot: &EagerSnapshot,
file_column_name: &Option<String>,
schema: Option<SchemaRef>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ impl DeltaScanConfig {
/// such as dictionary encoding of partition columns or
/// view types.
pub(crate) fn table_schema(&self, table_config: &TableConfiguration) -> Result<SchemaRef> {
if let Some(schema) = &self.schema {
return Ok(schema.clone());
Comment thread
cdobre marked this conversation as resolved.
}
let table_schema: Schema = table_config.schema().as_ref().try_into_arrow()?;
self.physical_arrow_schema(table_config, &table_schema)
}
Expand Down
25 changes: 20 additions & 5 deletions crates/core/src/delta_datafusion/table_provider_old.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;
use arrow_schema::Schema;
use arrow_schema::{Schema, SchemaRef};
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion::common::{Result, Statistics};
use datafusion::datasource::TableType;
Expand All @@ -11,7 +11,7 @@ use datafusion::logical_expr::utils::conjunction;
use datafusion::physical_plan::ExecutionPlan;
use url::Url;
use crate::delta_datafusion::{DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder};
use crate::delta_datafusion::table_provider::get_pushdown_filters;
use crate::delta_datafusion::table_provider::{df_logical_schema, get_pushdown_filters};
use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError};
use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
Expand All @@ -38,6 +38,8 @@ pub struct DeltaTableOldProvider {
pub config: DeltaTableConfig,
/// log store
pub(crate) log_store: LogStoreRef,
/// Optional schema override for scanning
pub(crate) schema: Option<SchemaRef>,
}

impl DeltaTableOldProvider {
Expand All @@ -47,14 +49,19 @@ impl DeltaTableOldProvider {
pub fn log_store(&self) -> LogStoreRef {
self.log_store.clone()
}
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}
}

impl From<DeltaTable> for DeltaTableOldProvider {
fn from(value: DeltaTable) -> Self {
Self {
state: value.state.clone(),
config: value.config.clone(),
log_store: value.log_store.clone()
log_store: value.log_store.clone(),
schema: None,
}
}
}
Expand All @@ -66,7 +73,15 @@ impl TableProvider for DeltaTableOldProvider {
}

fn schema(&self) -> Arc<Schema> {
self.snapshot().unwrap().snapshot().read_schema()
match &self.schema {
Some(s) => df_logical_schema(
self.snapshot().unwrap().snapshot(),
&None,
Some(s.clone()),
)
.unwrap_or_else(|_| s.clone()),
None => self.snapshot().unwrap().snapshot().read_schema(),
}
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -101,7 +116,7 @@ impl TableProvider for DeltaTableOldProvider {
file_column_name: None,
wrap_partition_values: None,
enable_parquet_pushdown: true,
schema: None,
schema: self.schema.clone(),
};

let config = config
Expand Down
Loading