From 5652d4d45606f280511aea51bb0e6132fe35d42a Mon Sep 17 00:00:00 2001 From: mailmindlin Date: Tue, 12 May 2026 16:55:32 -0400 Subject: [PATCH 1/9] Add metrics to `FFI_ExecutionPlan` --- Cargo.lock | 1 + datafusion/ffi/Cargo.toml | 1 + datafusion/ffi/src/execution_plan.rs | 72 ++ datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/metrics.rs | 743 ++++++++++++++++++ .../physical-expr-common/src/metrics/value.rs | 11 + 6 files changed, 829 insertions(+) create mode 100644 datafusion/ffi/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index d3a6442596390..5fe85b0b81ab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,7 @@ dependencies = [ "arrow-schema", "async-ffi", "async-trait", + "chrono", "datafusion", "datafusion-catalog", "datafusion-common", diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index ea9a12665ad4c..7eed11c0c69e8 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -48,6 +48,7 @@ arrow = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } async-ffi = { version = "0.5.0" } async-trait = { workspace = true } +chrono = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-datasource = { workspace = true } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 7541880a233c8..7c18bbdf195e0 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -23,6 +23,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr_common::metrics::MetricsSet; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; @@ -32,6 +33,7 @@ use tokio::runtime::Handle; use crate::config::FFI_ConfigOptions; use crate::execution::FFI_TaskContext; +use crate::metrics::FFI_MetricsSet; use crate::plan_properties::FFI_PlanProperties; use crate::record_batch_stream::FFI_RecordBatchStream; use crate::util::{FFI_Option, FFI_Result}; @@ -68,6 +70,10 @@ pub struct FFI_ExecutionPlan { ) -> FFI_Result>, + /// Snapshot the plan's execution metrics. Returns `None` when the + /// underlying [`ExecutionPlan::metrics`] returned `None`. + pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option, + /// Used to create a clone on the provider of the execution plan. This should /// only need to be called by the receiver of the plan. pub clone: unsafe extern "C" fn(plan: &Self) -> Self, @@ -179,6 +185,16 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> SString { plan.inner().name().into() } +unsafe extern "C" fn metrics_fn_wrapper( + plan: &FFI_ExecutionPlan, +) -> FFI_Option { + plan.inner() + .metrics() + .as_ref() + .map(FFI_MetricsSet::from) + .into() +} + unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { unsafe { debug_assert!(!plan.private_data.is_null()); @@ -270,6 +286,7 @@ impl FFI_ExecutionPlan { name: name_fn_wrapper, execute: execute_fn_wrapper, repartitioned: repartitioned_fn_wrapper, + metrics: metrics_fn_wrapper, clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, @@ -431,6 +448,12 @@ impl ExecutionPlan for ForeignExecutionPlan { .map(|plan| >::try_from(&plan)) .transpose() } + + fn metrics(&self) -> Option { + let ffi: Option = + unsafe { (self.plan.metrics)(&self.plan) }.into(); + ffi.map(MetricsSet::from) + } } #[cfg(any(test, feature = "integration-tests"))] @@ -444,6 +467,7 @@ pub mod tests { pub struct EmptyExec { props: Arc, children: Vec>, + metrics: Option, } impl EmptyExec { @@ -456,8 +480,14 @@ pub mod tests { Boundedness::Bounded, )), children: Vec::default(), + metrics: None, } } + + pub fn with_metrics(mut self, metrics: MetricsSet) -> Self { + self.metrics = Some(metrics); + self + } } impl DisplayAs for EmptyExec { @@ -490,6 +520,7 @@ pub mod tests { Ok(Arc::new(EmptyExec { props: Arc::clone(&self.props), children, + metrics: self.metrics.clone(), })) } @@ -501,6 +532,10 @@ pub mod tests { unimplemented!() } + fn metrics(&self) -> Option { + self.metrics.clone() + } + fn apply_expressions( &self, f: &mut dyn FnMut( @@ -587,6 +622,43 @@ pub mod tests { Ok(()) } + #[test] + fn test_ffi_execution_plan_metrics_round_trip() -> Result<()> { + use datafusion_physical_expr_common::metrics::{Count, Metric, MetricValue}; + + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false), + ])); + + // Plans without metrics still return None across the boundary. + let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None); + bare_local.library_marker_id = crate::mock_foreign_marker_id; + let bare_foreign: Arc = (&bare_local).try_into()?; + assert!(bare_foreign.metrics().is_none()); + + // Plans with metrics produce equivalent MetricsSets after a round trip. + let mut original_metrics = MetricsSet::new(); + let c0 = Count::new(); + c0.add(11); + original_metrics + .push(Arc::new(Metric::new(MetricValue::OutputRows(c0), Some(0)))); + let c1 = Count::new(); + c1.add(31); + original_metrics + .push(Arc::new(Metric::new(MetricValue::OutputRows(c1), Some(1)))); + + let metric_plan = Arc::new(EmptyExec::new(schema).with_metrics(original_metrics)); + let mut metric_local = FFI_ExecutionPlan::new(metric_plan, None); + metric_local.library_marker_id = crate::mock_foreign_marker_id; + let metric_foreign: Arc = (&metric_local).try_into()?; + + let observed = metric_foreign.metrics().expect("metrics should be present"); + assert_eq!(observed.output_rows(), Some(42)); + + Ok(()) + } + #[test] fn test_ffi_execution_plan_local_bypass() { let schema = Arc::new(arrow::datatypes::Schema::new(vec![ diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index de3caf8c17260..e9b703ddb4554 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -34,6 +34,7 @@ pub mod execution_plan; pub mod expr; pub mod ffi_option; pub mod insert_op; +pub mod metrics; pub mod physical_expr; pub mod physical_optimizer; pub mod plan_properties; diff --git a/datafusion/ffi/src/metrics.rs b/datafusion/ffi/src/metrics.rs new file mode 100644 index 0000000000000..889065fe6a1db --- /dev/null +++ b/datafusion/ffi/src/metrics.rs @@ -0,0 +1,743 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! FFI-stable mirrors of [`MetricsSet`] and related metric types. +//! +//! Metrics are passed across the FFI boundary as a **snapshot**: all +//! atomic-backed counters/gauges/timers are read into plain integer fields +//! at conversion time. Callers re-invoke [`ExecutionPlan::metrics()`] across +//! the boundary to observe newer values. This matches the documented contract +//! ("Once `self.execute()` has returned... metrics should be complete") and +//! all in-tree consumers (`AnalyzeExec`, `DisplayableExecutionPlan`). +//! +//! The variant *order* of [`FFI_MetricValue`] is part of the stable ABI and +//! must not be reordered. New variants must be appended at the end. +//! +//! [`ExecutionPlan::metrics()`]: datafusion_physical_plan::ExecutionPlan::metrics + +use std::any::Any; +use std::borrow::Cow; +use std::fmt::{self, Debug, Display}; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use datafusion_common::format::{MetricCategory, MetricType}; +use datafusion_physical_expr_common::metrics::{ + Count, CustomMetricValue, Gauge, MetricValue, MetricsSet, PruningMetrics, + RatioMergeStrategy, RatioMetrics, Time, Timestamp, +}; +use datafusion_physical_expr_common::metrics::{Label, Metric}; +use stabby::string::String as SString; +use stabby::vec::Vec as SVec; + +use crate::ffi_option::FFI_Option; + +/// FFI-stable mirror of [`MetricsSet`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_MetricsSet { + pub metrics: SVec, +} + +/// FFI-stable mirror of [`Metric`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Metric { + pub value: FFI_MetricValue, + pub labels: SVec, + pub partition: FFI_Option, + pub metric_type: FFI_MetricType, + pub metric_category: FFI_Option, +} + +/// FFI-stable mirror of [`Label`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Label { + pub name: SString, + pub value: SString, +} + +/// FFI-stable mirror of [`MetricType`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricType { + Summary, + Dev, +} + +/// FFI-stable mirror of [`MetricCategory`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricCategory { + Rows, + Bytes, + Timing, + Uncategorized, +} + +/// FFI-stable mirror of [`PruningMetrics`]. All counts are snapshotted at +/// conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_PruningMetrics { + pub pruned: u64, + pub matched: u64, + pub fully_matched: u64, +} + +/// FFI-stable mirror of [`RatioMergeStrategy`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_RatioMergeStrategy { + AddPartAddTotal, + AddPartSetTotal, + SetPartAddTotal, +} + +/// FFI-stable mirror of [`RatioMetrics`]. Numerator/denominator are +/// snapshotted at conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_RatioMetrics { + pub part: u64, + pub total: u64, + pub merge_strategy: FFI_RatioMergeStrategy, + pub display_raw_values: bool, +} + +/// FFI-stable mirror of [`MetricValue`]. The variant order is load-bearing +/// across the FFI boundary; new variants must only be appended. +#[repr(C, u8)] +#[derive(Debug, Clone)] +pub enum FFI_MetricValue { + OutputRows(u64), + /// Nanoseconds. + ElapsedCompute(u64), + SpillCount(u64), + SpilledBytes(u64), + OutputBytes(u64), + OutputBatches(u64), + SpilledRows(u64), + CurrentMemoryUsage(u64), + Count { + name: SString, + count: u64, + }, + Gauge { + name: SString, + gauge: u64, + }, + /// Nanoseconds. + Time { + name: SString, + time: u64, + }, + /// Unix nanoseconds (UTC). + StartTimestamp(FFI_Option), + /// Unix nanoseconds (UTC). + EndTimestamp(FFI_Option), + PruningMetrics { + name: SString, + pruning_metrics: FFI_PruningMetrics, + }, + Ratio { + name: SString, + ratio_metrics: FFI_RatioMetrics, + }, + /// Custom metrics are marshalled as their `Display` output plus the + /// `as_usize()` fallback. The underlying `dyn CustomMetricValue` type is + /// not preserved across the boundary, so `aggregate`/`as_any` downcasting + /// are lost; the reconstructed value uses [`FfiCustomMetricValue`]. + Custom { + name: SString, + display: SString, + as_usize_value: u64, + }, +} + +// ----------------------------------------------------------------------------- +// MetricsSet <-> FFI_MetricsSet +// ----------------------------------------------------------------------------- + +impl From<&MetricsSet> for FFI_MetricsSet { + fn from(set: &MetricsSet) -> Self { + Self { + metrics: set.iter().map(|m| FFI_Metric::from(m.as_ref())).collect(), + } + } +} + +impl From for MetricsSet { + fn from(set: FFI_MetricsSet) -> Self { + let mut out = MetricsSet::new(); + for ffi_metric in set.metrics { + out.push(Arc::new(Metric::from(ffi_metric))); + } + out + } +} + +// ----------------------------------------------------------------------------- +// Metric <-> FFI_Metric +// ----------------------------------------------------------------------------- + +impl From<&Metric> for FFI_Metric { + fn from(m: &Metric) -> Self { + Self { + value: FFI_MetricValue::from(m.value()), + labels: m.labels().iter().map(FFI_Label::from).collect(), + partition: m.partition().map(|p| p as u64).into(), + metric_type: m.metric_type().into(), + metric_category: m.metric_category().map(FFI_MetricCategory::from).into(), + } + } +} + +impl From for Metric { + fn from(m: FFI_Metric) -> Self { + let labels: Vec