From 7b0e0373af4496eff75836f4981b440d6cff2cd0 Mon Sep 17 00:00:00 2001 From: mailmindlin Date: Tue, 12 May 2026 16:55:32 -0400 Subject: [PATCH 1/9] Add metrics to `FFI_ExecutionPlan` --- datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/metrics.rs | 743 ++++++++++++++++++++++++++++++++++ 2 files changed, 744 insertions(+) create mode 100644 datafusion/ffi/src/metrics.rs diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index de3caf8c17260..e9b703ddb4554 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -34,6 +34,7 @@ pub mod execution_plan; pub mod expr; pub mod ffi_option; pub mod insert_op; +pub mod metrics; pub mod physical_expr; pub mod physical_optimizer; pub mod plan_properties; diff --git a/datafusion/ffi/src/metrics.rs b/datafusion/ffi/src/metrics.rs new file mode 100644 index 0000000000000..889065fe6a1db --- /dev/null +++ b/datafusion/ffi/src/metrics.rs @@ -0,0 +1,743 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! FFI-stable mirrors of [`MetricsSet`] and related metric types. +//! +//! Metrics are passed across the FFI boundary as a **snapshot**: all +//! atomic-backed counters/gauges/timers are read into plain integer fields +//! at conversion time. Callers re-invoke [`ExecutionPlan::metrics()`] across +//! the boundary to observe newer values. This matches the documented contract +//! ("Once `self.execute()` has returned... metrics should be complete") and +//! all in-tree consumers (`AnalyzeExec`, `DisplayableExecutionPlan`). +//! +//! The variant *order* of [`FFI_MetricValue`] is part of the stable ABI and +//! must not be reordered. New variants must be appended at the end. +//! +//! [`ExecutionPlan::metrics()`]: datafusion_physical_plan::ExecutionPlan::metrics + +use std::any::Any; +use std::borrow::Cow; +use std::fmt::{self, Debug, Display}; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use datafusion_common::format::{MetricCategory, MetricType}; +use datafusion_physical_expr_common::metrics::{ + Count, CustomMetricValue, Gauge, MetricValue, MetricsSet, PruningMetrics, + RatioMergeStrategy, RatioMetrics, Time, Timestamp, +}; +use datafusion_physical_expr_common::metrics::{Label, Metric}; +use stabby::string::String as SString; +use stabby::vec::Vec as SVec; + +use crate::ffi_option::FFI_Option; + +/// FFI-stable mirror of [`MetricsSet`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_MetricsSet { + pub metrics: SVec, +} + +/// FFI-stable mirror of [`Metric`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Metric { + pub value: FFI_MetricValue, + pub labels: SVec, + pub partition: FFI_Option, + pub metric_type: FFI_MetricType, + pub metric_category: FFI_Option, +} + +/// FFI-stable mirror of [`Label`]. +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_Label { + pub name: SString, + pub value: SString, +} + +/// FFI-stable mirror of [`MetricType`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricType { + Summary, + Dev, +} + +/// FFI-stable mirror of [`MetricCategory`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_MetricCategory { + Rows, + Bytes, + Timing, + Uncategorized, +} + +/// FFI-stable mirror of [`PruningMetrics`]. All counts are snapshotted at +/// conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_PruningMetrics { + pub pruned: u64, + pub matched: u64, + pub fully_matched: u64, +} + +/// FFI-stable mirror of [`RatioMergeStrategy`]. +#[expect(non_camel_case_types)] +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FFI_RatioMergeStrategy { + AddPartAddTotal, + AddPartSetTotal, + SetPartAddTotal, +} + +/// FFI-stable mirror of [`RatioMetrics`]. Numerator/denominator are +/// snapshotted at conversion time. +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FFI_RatioMetrics { + pub part: u64, + pub total: u64, + pub merge_strategy: FFI_RatioMergeStrategy, + pub display_raw_values: bool, +} + +/// FFI-stable mirror of [`MetricValue`]. The variant order is load-bearing +/// across the FFI boundary; new variants must only be appended. +#[repr(C, u8)] +#[derive(Debug, Clone)] +pub enum FFI_MetricValue { + OutputRows(u64), + /// Nanoseconds. + ElapsedCompute(u64), + SpillCount(u64), + SpilledBytes(u64), + OutputBytes(u64), + OutputBatches(u64), + SpilledRows(u64), + CurrentMemoryUsage(u64), + Count { + name: SString, + count: u64, + }, + Gauge { + name: SString, + gauge: u64, + }, + /// Nanoseconds. + Time { + name: SString, + time: u64, + }, + /// Unix nanoseconds (UTC). + StartTimestamp(FFI_Option), + /// Unix nanoseconds (UTC). + EndTimestamp(FFI_Option), + PruningMetrics { + name: SString, + pruning_metrics: FFI_PruningMetrics, + }, + Ratio { + name: SString, + ratio_metrics: FFI_RatioMetrics, + }, + /// Custom metrics are marshalled as their `Display` output plus the + /// `as_usize()` fallback. The underlying `dyn CustomMetricValue` type is + /// not preserved across the boundary, so `aggregate`/`as_any` downcasting + /// are lost; the reconstructed value uses [`FfiCustomMetricValue`]. + Custom { + name: SString, + display: SString, + as_usize_value: u64, + }, +} + +// ----------------------------------------------------------------------------- +// MetricsSet <-> FFI_MetricsSet +// ----------------------------------------------------------------------------- + +impl From<&MetricsSet> for FFI_MetricsSet { + fn from(set: &MetricsSet) -> Self { + Self { + metrics: set.iter().map(|m| FFI_Metric::from(m.as_ref())).collect(), + } + } +} + +impl From for MetricsSet { + fn from(set: FFI_MetricsSet) -> Self { + let mut out = MetricsSet::new(); + for ffi_metric in set.metrics { + out.push(Arc::new(Metric::from(ffi_metric))); + } + out + } +} + +// ----------------------------------------------------------------------------- +// Metric <-> FFI_Metric +// ----------------------------------------------------------------------------- + +impl From<&Metric> for FFI_Metric { + fn from(m: &Metric) -> Self { + Self { + value: FFI_MetricValue::from(m.value()), + labels: m.labels().iter().map(FFI_Label::from).collect(), + partition: m.partition().map(|p| p as u64).into(), + metric_type: m.metric_type().into(), + metric_category: m.metric_category().map(FFI_MetricCategory::from).into(), + } + } +} + +impl From for Metric { + fn from(m: FFI_Metric) -> Self { + let labels: Vec