diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 1a8c9767fbed..1df5f0d410b2 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_plan::{ @@ -36,6 +36,7 @@ use crate::execution::FFI_TaskContext; use crate::physical_expr::metrics::FFI_MetricsSet; use crate::plan_properties::FFI_PlanProperties; use crate::record_batch_stream::FFI_RecordBatchStream; +use crate::statistics::{deserialize_statistics, serialize_statistics}; use crate::util::{FFI_Option, FFI_Result}; use crate::{df_result, sresult, sresult_return}; @@ -74,6 +75,15 @@ pub struct FFI_ExecutionPlan { /// underlying [`ExecutionPlan::metrics`] returned `None`. pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option, + /// Snapshot partition statistics. `partition == None` corresponds to + /// statistics over all partitions; `Some(idx)` corresponds to a specific + /// partition. The returned bytes are a prost-encoded + /// `datafusion_proto_common::Statistics`. + pub partition_statistics: unsafe extern "C" fn( + plan: &Self, + partition: FFI_Option, + ) -> FFI_Result>, + /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. pub clone: unsafe extern "C" fn(plan: &Self) -> Self, @@ -195,6 +205,17 @@ unsafe extern "C" fn metrics_fn_wrapper( .into() } +unsafe extern "C" fn partition_statistics_fn_wrapper( + plan: &FFI_ExecutionPlan, + partition: FFI_Option, +) -> FFI_Result> { + let partition: Option = Option::::from(partition).map(|p| p as usize); + plan.inner() + .partition_statistics(partition) + .map(|stats| serialize_statistics(stats.as_ref()).into_iter().collect()) + .into() +} + unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { unsafe { debug_assert!(!plan.private_data.is_null()); @@ -287,6 +308,7 @@ impl FFI_ExecutionPlan { execute: execute_fn_wrapper, repartitioned: repartitioned_fn_wrapper, metrics: metrics_fn_wrapper, + partition_statistics: partition_statistics_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -454,10 +476,24 @@ impl ExecutionPlan for ForeignExecutionPlan { unsafe { (self.plan.metrics)(&self.plan) }.into(); ffi.map(MetricsSet::from) } + + fn partition_statistics(&self, partition: Option) -> Result> { + let bytes = df_result!(unsafe { + (self.plan.partition_statistics)( + &self.plan, + partition.map(|p| p as u64).into(), + ) + })?; + Ok(Arc::new(deserialize_statistics(bytes.as_slice())?)) + } } #[cfg(any(test, feature = "integration-tests"))] pub mod tests { + #[cfg(test)] + use datafusion_common::stats::Precision; + #[cfg(test)] + use datafusion_common::{ColumnStatistics, ScalarValue}; use datafusion_physical_plan::Partitioning; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -468,6 +504,7 @@ pub mod tests { props: Arc, children: Vec>, metrics: Option, + statistics: Option, } impl EmptyExec { @@ -481,6 +518,7 @@ pub mod tests { )), children: Vec::default(), metrics: None, + statistics: None, } } @@ -488,6 +526,11 @@ pub mod tests { self.metrics = Some(metrics); self } + + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } } impl DisplayAs for EmptyExec { @@ -521,6 +564,7 @@ pub mod tests { props: Arc::clone(&self.props), children, metrics: self.metrics.clone(), + statistics: self.statistics.clone(), })) } @@ -536,6 +580,15 @@ pub mod tests { self.metrics.clone() } + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { + Ok(Arc::new(self.statistics.clone().unwrap_or_else(|| { + Statistics::new_unknown(self.props.eq_properties.schema()) + }))) + } + fn apply_expressions( &self, f: &mut dyn FnMut( @@ -659,6 +712,51 @@ pub mod tests { Ok(()) } + #[test] + fn test_ffi_execution_plan_partition_statistics_round_trip() -> Result<()> { + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Int32, true), + ])); + + // Plans without explicit statistics return Statistics::new_unknown across + // the boundary. + let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None); + bare_local.library_marker_id = crate::mock_foreign_marker_id; + let bare_foreign: Arc = (&bare_local).try_into()?; + let bare_stats = bare_foreign.partition_statistics(None)?; + assert_eq!(bare_stats.as_ref(), &Statistics::new_unknown(&schema)); + + // Plans with statistics round-trip them faithfully, including + // ScalarValue-typed min/max. + let original_stats = Statistics { + num_rows: Precision::Exact(7), + total_byte_size: Precision::Inexact(128), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(1), + max_value: Precision::Exact(ScalarValue::Int32(Some(10))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-3))), + sum_value: Precision::Absent, + distinct_count: Precision::Inexact(6), + byte_size: Precision::Exact(28), + }], + }; + let stats_plan = Arc::new( + EmptyExec::new(Arc::clone(&schema)).with_statistics(original_stats.clone()), + ); + let mut stats_local = FFI_ExecutionPlan::new(stats_plan, None); + stats_local.library_marker_id = crate::mock_foreign_marker_id; + let stats_foreign: Arc = (&stats_local).try_into()?; + + let observed = stats_foreign.partition_statistics(None)?; + assert_eq!(observed.as_ref(), &original_stats); + + let observed_partition = stats_foreign.partition_statistics(Some(1))?; + assert_eq!(observed_partition.as_ref(), &original_stats); + + Ok(()) + } + #[test] fn test_ffi_execution_plan_local_bypass() { let schema = Arc::new(arrow::datatypes::Schema::new(vec![ diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index de3caf8c1726..4df6c4b570f3 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -41,6 +41,7 @@ pub mod proto; pub mod record_batch_stream; pub mod schema_provider; pub mod session; +pub mod statistics; pub mod table_provider; pub mod table_provider_factory; pub mod table_source; diff --git a/datafusion/ffi/src/statistics.rs b/datafusion/ffi/src/statistics.rs new file mode 100644 index 000000000000..019a8b2f22fa --- /dev/null +++ b/datafusion/ffi/src/statistics.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Helpers for moving [`Statistics`] across the FFI boundary as prost-encoded +//! `datafusion_proto_common::Statistics` bytes. +//! +//! [`Statistics`] contains [`Precision`] for column min/max/sum, +//! and `ScalarValue` is a large enum that's impractical to mirror in +//! `#[repr(C)]`. The proto round-trip already exists in `datafusion-proto-common` +//! and is the same pattern used to ship filter expressions across the FFI +//! boundary, so we reuse it here. +//! +//! [`Precision`]: datafusion_common::stats::Precision + +use datafusion_common::{DataFusionError, Result, Statistics}; +use prost::Message; + +/// Serialize [`Statistics`] to prost-encoded +/// `datafusion_proto_common::Statistics` bytes. +pub(crate) fn serialize_statistics(stats: &Statistics) -> Vec { + datafusion_proto_common::Statistics::from(stats).encode_to_vec() +} + +/// Decode prost-encoded `datafusion_proto_common::Statistics` bytes back into +/// [`Statistics`]. +pub(crate) fn deserialize_statistics(bytes: &[u8]) -> Result { + let proto = datafusion_proto_common::Statistics::decode(bytes).map_err(|e| { + DataFusionError::Plan(format!("failed to decode Statistics: {e}")) + })?; + Statistics::try_from(&proto) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::ScalarValue; + use datafusion_common::stats::Precision; + use datafusion_common::{ColumnStatistics, Statistics}; + + use super::*; + + #[test] + fn round_trip_unknown_statistics() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let original = Statistics::new_unknown(&Arc::new(schema)); + + let bytes = serialize_statistics(&original); + let observed = deserialize_statistics(&bytes).expect("decode"); + + assert_eq!(observed, original); + } + + #[test] + fn round_trip_exact_statistics_with_scalar_values() { + let original = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(4096), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(2), + max_value: Precision::Exact(ScalarValue::Int32(Some(50))), + min_value: Precision::Exact(ScalarValue::Int32(Some(-10))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(1234))), + distinct_count: Precision::Exact(40), + byte_size: Precision::Exact(800), + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Utf8(Some( + "zebra".to_string(), + ))), + min_value: Precision::Exact(ScalarValue::Utf8(Some( + "ant".to_string(), + ))), + sum_value: Precision::Absent, + distinct_count: Precision::Inexact(95), + byte_size: Precision::Inexact(2048), + }, + ], + }; + + let bytes = serialize_statistics(&original); + let observed = deserialize_statistics(&bytes).expect("decode"); + + assert_eq!(observed, original); + } + + #[test] + fn round_trip_mixed_precision() { + let original = Statistics { + num_rows: Precision::Inexact(42), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Inexact(ScalarValue::Float64(Some(1.5))), + min_value: Precision::Absent, + sum_value: Precision::Inexact(ScalarValue::Float64(Some(63.0))), + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + + let bytes = serialize_statistics(&original); + let observed = deserialize_statistics(&bytes).expect("decode"); + + assert_eq!(observed, original); + } +} diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index b6c077526c5f..b23f22f13dcb 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -22,6 +22,7 @@ use arrow::datatypes::SchemaRef; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; use datafusion_catalog::{Session, TableProvider}; +use datafusion_common::Statistics; use datafusion_common::error::{DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_expr::dml::InsertOp; @@ -44,6 +45,7 @@ use crate::arrow_wrappers::WrappedSchema; use crate::execution::FFI_TaskContextProvider; use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use crate::session::{FFI_SessionRef, ForeignSession}; +use crate::statistics::{deserialize_statistics, serialize_statistics}; use crate::table_source::{FFI_TableProviderFilterPushDown, FFI_TableType}; use crate::util::{FFI_Option, FFI_Result}; use crate::{df_result, sresult_return}; @@ -133,6 +135,13 @@ pub struct FFI_TableProvider { insert_op: FFI_InsertOp, ) -> FfiFuture>, + /// Snapshot the provider's table-level statistics. The inner + /// [`FFI_Option::None`] corresponds to [`TableProvider::statistics`] + /// returning `None`; `Some(bytes)` is a prost-encoded + /// `datafusion_proto_common::Statistics`. + pub statistics: + unsafe extern "C" fn(provider: &Self) -> FFI_Result>>, + pub logical_codec: FFI_LogicalExtensionCodec, /// Used to create a clone on the provider of the execution plan. This should @@ -179,6 +188,16 @@ unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedS provider.inner().schema().into() } +unsafe extern "C" fn statistics_fn_wrapper( + provider: &FFI_TableProvider, +) -> FFI_Result>> { + let serialized: Option> = provider + .inner() + .statistics() + .map(|s| serialize_statistics(&s).into_iter().collect()); + FFI_Result::Ok(serialized.into()) +} + unsafe extern "C" fn table_type_fn_wrapper( provider: &FFI_TableProvider, ) -> FFI_TableType { @@ -344,6 +363,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table table_type: table_type_fn_wrapper, supports_filters_pushdown: provider.supports_filters_pushdown, insert_into: provider.insert_into, + statistics: statistics_fn_wrapper, logical_codec: provider.logical_codec.clone(), clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -404,6 +424,7 @@ impl FFI_TableProvider { false => None, }, insert_into: insert_into_fn_wrapper, + statistics: statistics_fn_wrapper, logical_codec, clone: clone_fn_wrapper, release: release_fn_wrapper, @@ -451,6 +472,28 @@ impl TableProvider for ForeignTableProvider { unsafe { (self.0.table_type)(&self.0).into() } } + fn statistics(&self) -> Option { + let ffi_result = df_result!(unsafe { (self.0.statistics)(&self.0) }); + let ffi_opt = match ffi_result { + Ok(v) => v, + Err(e) => { + log::warn!("FFI TableProvider::statistics failed: {e}"); + debug_assert!(false, "FFI TableProvider::statistics failed: {e}"); + return None; + } + }; + let bytes: Option> = ffi_opt.into(); + let bytes = bytes?; + match deserialize_statistics(bytes.as_slice()) { + Ok(stats) => Some(stats), + Err(e) => { + log::warn!("Failed to deserialize FFI statistics: {e}"); + debug_assert!(false, "Failed to deserialize FFI statistics: {e}"); + None + } + } + } + async fn scan( &self, session: &dyn Session, @@ -772,4 +815,101 @@ mod tests { Ok(()) } + + #[test] + fn test_ffi_table_provider_statistics_round_trip() -> Result<()> { + use arrow::datatypes::{DataType, Field}; + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::datasource::MemTable; + use datafusion_common::stats::Precision; + use datafusion_common::{ColumnStatistics, ScalarValue}; + + // A thin wrapper that lets us inject statistics onto any TableProvider. + #[derive(Debug)] + struct TableWithStats { + inner: Arc, + stats: Option, + } + + #[async_trait] + impl TableProvider for TableWithStats { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + fn table_type(&self) -> TableType { + self.inner.table_type() + } + fn statistics(&self) -> Option { + self.stats.clone() + } + async fn scan( + &self, + session: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + self.inner.scan(session, projection, filters, limit).await + } + } + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + )?; + + let ctx = Arc::new(SessionContext::new()); + let task_ctx_provider = Arc::clone(&ctx) as Arc; + let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider); + + // Provider without statistics should cross the boundary as None. + let no_stats_inner = Arc::new(MemTable::try_new( + Arc::clone(&schema), + vec![vec![batch.clone()]], + )?); + let no_stats_provider = Arc::new(TableWithStats { + inner: no_stats_inner, + stats: None, + }); + let mut ffi_provider = FFI_TableProvider::new( + no_stats_provider, + true, + None, + task_ctx_provider.clone(), + None, + ); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; + let foreign: Arc = (&ffi_provider).into(); + assert!(foreign.statistics().is_none()); + + // Provider with statistics should round-trip faithfully. + let original_stats = Statistics { + num_rows: Precision::Exact(3), + total_byte_size: Precision::Inexact(12), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(3))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Exact(ScalarValue::Int64(Some(6))), + distinct_count: Precision::Exact(3), + byte_size: Precision::Exact(12), + }], + }; + let stats_inner = + Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?); + let stats_provider = Arc::new(TableWithStats { + inner: stats_inner, + stats: Some(original_stats.clone()), + }); + let mut ffi_provider = + FFI_TableProvider::new(stats_provider, true, None, task_ctx_provider, None); + ffi_provider.library_marker_id = crate::mock_foreign_marker_id; + let foreign: Arc = (&ffi_provider).into(); + assert_eq!(foreign.statistics().as_ref(), Some(&original_stats)); + + Ok(()) + } }