diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7d32f2a88fd9c..1f1159e0c919f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1385,6 +1385,7 @@ config_namespace! { /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev" /// "summary" shows common metrics for high-level insights. /// "dev" provides deep operator-level introspection for developers. + /// "internal" provides low-level kernel debugging metrics. pub analyze_level: MetricType, default = MetricType::Dev /// Which metric categories to include in "EXPLAIN ANALYZE" output. diff --git a/datafusion/common/src/format.rs b/datafusion/common/src/format.rs index a6bd42be691a9..2d5fc5c5f5b0d 100644 --- a/datafusion/common/src/format.rs +++ b/datafusion/common/src/format.rs @@ -210,7 +210,8 @@ impl ConfigField for ExplainFormat { /// /// The `datafusion.explain.analyze_level` configuration controls which /// type is shown: -/// - `"dev"` (the default): all metrics are shown. +/// - `"dev"` (the default): summary and developer-facing metrics are shown. +/// - `"internal"`: all metrics are shown, including kernel debugging metrics. /// - `"summary"`: only metrics tagged as `Summary` are shown. /// /// This is orthogonal to [`MetricCategory`], which filters by *what kind* @@ -228,19 +229,30 @@ pub enum MetricType { Summary, /// For deep operator-level introspection for developers Dev, + /// For low-level kernel debugging and DataFusion development + Internal, } impl MetricType { /// Returns the set of metric types that should be shown for this level. /// - /// `Dev` is a superset of `Summary`: when the user selects - /// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown. + /// `Dev` is a superset of `Summary`, and `Internal` is a superset of + /// both: when the user selects `analyze_level = 'internal'`, `Summary`, + /// `Dev`, and `Internal` metrics are shown. pub fn included_types(self) -> Vec { match self { MetricType::Summary => vec![MetricType::Summary], MetricType::Dev => vec![MetricType::Summary, MetricType::Dev], + MetricType::Internal => { + vec![MetricType::Summary, MetricType::Dev, MetricType::Internal] + } } } + + /// Returns true if `self` includes metrics tagged as `metric_type`. + pub fn includes(self, metric_type: MetricType) -> bool { + self.included_types().contains(&metric_type) + } } impl FromStr for MetricType { @@ -250,8 +262,9 @@ impl FromStr for MetricType { match s.trim().to_lowercase().as_str() { "summary" => Ok(Self::Summary), "dev" => Ok(Self::Dev), + "internal" => Ok(Self::Internal), other => Err(DataFusionError::Configuration(format!( - "Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'" + "Invalid explain analyze level. Expected 'summary', 'dev', or 'internal'. Got '{other}'" ))), } } @@ -262,6 +275,7 @@ impl Display for MetricType { match self { Self::Summary => write!(f, "summary"), Self::Dev => write!(f, "dev"), + Self::Internal => write!(f, "internal"), } } } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index b093563d9adda..a2a0d1847eafe 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -236,6 +236,25 @@ async fn collect_plan_with_categories( .to_string() } +async fn collect_plan_after_set( + ctx: &SessionContext, + level: &str, + sql_str: &str, +) -> String { + ctx.sql(&format!("SET datafusion.explain.analyze_level = '{level}'")) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let dataframe = ctx.sql(sql_str).await.unwrap(); + let batches = dataframe.collect().await.unwrap(); + arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string() +} + async fn collect_plan(sql_str: &str, level: MetricType) -> String { let ctx = SessionContext::new(); collect_plan_with_context(sql_str, &ctx, level).await @@ -257,6 +276,10 @@ async fn explain_analyze_level() { (MetricType::Dev, "output_rows", true), (MetricType::Dev, "output_bytes", true), (MetricType::Dev, "output_batches", true), + (MetricType::Internal, "spill_count", true), + (MetricType::Internal, "output_rows", true), + (MetricType::Internal, "output_bytes", true), + (MetricType::Internal, "output_batches", true), ] { let plan = collect_plan(sql, level).await; assert_eq!( @@ -267,6 +290,33 @@ async fn explain_analyze_level() { } } +#[tokio::test] +async fn explain_analyze_internal_repartition_metrics() { + let ctx = SessionContext::new_with_config( + SessionConfig::new() + .with_target_partitions(4) + .with_batch_size(4096), + ); + register_aggregate_csv_by_sql(&ctx).await; + + let sql = "EXPLAIN ANALYZE \ + SELECT c1, count(*) \ + FROM aggregate_test_100 \ + GROUP BY c1"; + + let dev_plan = collect_plan_after_set(&ctx, "dev", sql).await; + assert_contains!(&dev_plan, "RepartitionExec"); + assert_not_contains!(&dev_plan, "hash_compute_time"); + assert_not_contains!(&dev_plan, "route_time"); + assert_not_contains!(&dev_plan, "batch_build_time"); + + let internal_plan = collect_plan_after_set(&ctx, "internal", sql).await; + assert_contains!(&internal_plan, "RepartitionExec"); + assert_contains!(&internal_plan, "hash_compute_time"); + assert_contains!(&internal_plan, "route_time"); + assert_contains!(&internal_plan, "batch_build_time"); +} + #[tokio::test] async fn explain_analyze_level_datasource_parquet() { let table_name = "tpch_lineitem_small"; @@ -284,6 +334,8 @@ async fn explain_analyze_level_datasource_parquet() { (MetricType::Summary, "page_index_eval_time", false), (MetricType::Dev, "metadata_load_time", true), (MetricType::Dev, "page_index_eval_time", true), + (MetricType::Internal, "metadata_load_time", true), + (MetricType::Internal, "page_index_eval_time", true), ] { let plan = collect_plan_with_context(&sql, &ctx, level).await; diff --git a/datafusion/physical-expr-common/src/metrics/builder.rs b/datafusion/physical-expr-common/src/metrics/builder.rs index e9c0b76af2582..42758dc438491 100644 --- a/datafusion/physical-expr-common/src/metrics/builder.rs +++ b/datafusion/physical-expr-common/src/metrics/builder.rs @@ -117,6 +117,19 @@ impl<'a> MetricBuilder<'a> { self } + /// Switches to a conditional builder for metrics controlled by + /// `datafusion.explain.analyze_level`. + /// + /// The returned builder preserves this builder's labels, category, and + /// [`MetricType`], but its terminal methods return `Option` and skip + /// registration when `enabled_level` does not include this builder's + /// [`MetricType`]. + pub fn if_enabled(self, enabled_level: MetricType) -> ConditionalMetricBuilder<'a> { + ConditionalMetricBuilder { + builder: enabled_level.includes(self.metric_type).then_some(self), + } + } + /// Consume self and create a metric of the specified value /// registered with the MetricsSet pub fn build(self, value: MetricValue) { @@ -333,3 +346,51 @@ impl<'a> MetricBuilder<'a> { ratio_metrics } } + +/// A conditional metric builder used after [`MetricBuilder::if_enabled`]. +/// +/// This type intentionally mirrors a subset of [`MetricBuilder`], but its +/// terminal methods return `Option`. +/// - `Some(T)` means the metric was registered and can be updated. +/// - `None` means the configured metric level does not include this metric, +/// so the caller should do no metric work. +pub struct ConditionalMetricBuilder<'a> { + builder: Option>, +} + +impl<'a> ConditionalMetricBuilder<'a> { + /// Add a label to the metric being constructed, if this builder is enabled. + pub fn with_label(mut self, label: Label) -> Self { + self.builder = self.builder.map(|builder| builder.with_label(label)); + self + } + + /// Set the semantic category for the metric being constructed, if this + /// builder is enabled. + pub fn with_category(mut self, category: MetricCategory) -> Self { + self.builder = self.builder.map(|builder| builder.with_category(category)); + self + } + + /// Consume self and conditionally create a new timer for recording some + /// subset of an operator's execution time. + pub fn subset_time( + self, + subset_name: impl Into>, + partition: usize, + ) -> Option