Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,7 @@ config_namespace! {
/// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
/// "summary" shows common metrics for high-level insights.
/// "dev" provides deep operator-level introspection for developers.
/// "internal" provides low-level kernel debugging metrics.
pub analyze_level: MetricType, default = MetricType::Dev

/// Which metric categories to include in "EXPLAIN ANALYZE" output.
Expand Down
22 changes: 18 additions & 4 deletions datafusion/common/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl ConfigField for ExplainFormat {
///
/// The `datafusion.explain.analyze_level` configuration controls which
/// type is shown:
/// - `"dev"` (the default): all metrics are shown.
/// - `"dev"` (the default): summary and developer-facing metrics are shown.
/// - `"internal"`: all metrics are shown, including kernel debugging metrics.
/// - `"summary"`: only metrics tagged as `Summary` are shown.
///
/// This is orthogonal to [`MetricCategory`], which filters by *what kind*
Expand All @@ -228,19 +229,30 @@ pub enum MetricType {
Summary,
/// For deep operator-level introspection for developers
Dev,
/// For low-level kernel debugging and DataFusion development
Internal,
}

impl MetricType {
/// Returns the set of metric types that should be shown for this level.
///
/// `Dev` is a superset of `Summary`: when the user selects
/// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown.
/// `Dev` is a superset of `Summary`, and `Internal` is a superset of
/// both: when the user selects `analyze_level = 'internal'`, `Summary`,
/// `Dev`, and `Internal` metrics are shown.
pub fn included_types(self) -> Vec<MetricType> {
match self {
MetricType::Summary => vec![MetricType::Summary],
MetricType::Dev => vec![MetricType::Summary, MetricType::Dev],
MetricType::Internal => {
vec![MetricType::Summary, MetricType::Dev, MetricType::Internal]
}
}
}

/// Returns true if `self` includes metrics tagged as `metric_type`.
pub fn includes(self, metric_type: MetricType) -> bool {
self.included_types().contains(&metric_type)
}
}

impl FromStr for MetricType {
Expand All @@ -250,8 +262,9 @@ impl FromStr for MetricType {
match s.trim().to_lowercase().as_str() {
"summary" => Ok(Self::Summary),
"dev" => Ok(Self::Dev),
"internal" => Ok(Self::Internal),
other => Err(DataFusionError::Configuration(format!(
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
"Invalid explain analyze level. Expected 'summary', 'dev', or 'internal'. Got '{other}'"
))),
}
}
Expand All @@ -262,6 +275,7 @@ impl Display for MetricType {
match self {
Self::Summary => write!(f, "summary"),
Self::Dev => write!(f, "dev"),
Self::Internal => write!(f, "internal"),
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ async fn collect_plan_with_categories(
.to_string()
}

async fn collect_plan_after_set(
ctx: &SessionContext,
level: &str,
sql_str: &str,
) -> String {
ctx.sql(&format!("SET datafusion.explain.analyze_level = '{level}'"))
.await
.unwrap()
.collect()
.await
.unwrap();

let dataframe = ctx.sql(sql_str).await.unwrap();
let batches = dataframe.collect().await.unwrap();
arrow::util::pretty::pretty_format_batches(&batches)
.unwrap()
.to_string()
}

async fn collect_plan(sql_str: &str, level: MetricType) -> String {
let ctx = SessionContext::new();
collect_plan_with_context(sql_str, &ctx, level).await
Expand All @@ -257,6 +276,10 @@ async fn explain_analyze_level() {
(MetricType::Dev, "output_rows", true),
(MetricType::Dev, "output_bytes", true),
(MetricType::Dev, "output_batches", true),
(MetricType::Internal, "spill_count", true),
(MetricType::Internal, "output_rows", true),
(MetricType::Internal, "output_bytes", true),
(MetricType::Internal, "output_batches", true),
] {
let plan = collect_plan(sql, level).await;
assert_eq!(
Expand All @@ -267,6 +290,33 @@ async fn explain_analyze_level() {
}
}

#[tokio::test]
async fn explain_analyze_internal_repartition_metrics() {
let ctx = SessionContext::new_with_config(
SessionConfig::new()
.with_target_partitions(4)
.with_batch_size(4096),
);
register_aggregate_csv_by_sql(&ctx).await;

let sql = "EXPLAIN ANALYZE \
SELECT c1, count(*) \
FROM aggregate_test_100 \
GROUP BY c1";

let dev_plan = collect_plan_after_set(&ctx, "dev", sql).await;
assert_contains!(&dev_plan, "RepartitionExec");
assert_not_contains!(&dev_plan, "hash_compute_time");
assert_not_contains!(&dev_plan, "route_time");
assert_not_contains!(&dev_plan, "batch_build_time");

let internal_plan = collect_plan_after_set(&ctx, "internal", sql).await;
assert_contains!(&internal_plan, "RepartitionExec");
assert_contains!(&internal_plan, "hash_compute_time");
assert_contains!(&internal_plan, "route_time");
assert_contains!(&internal_plan, "batch_build_time");
}

#[tokio::test]
async fn explain_analyze_level_datasource_parquet() {
let table_name = "tpch_lineitem_small";
Expand All @@ -284,6 +334,8 @@ async fn explain_analyze_level_datasource_parquet() {
(MetricType::Summary, "page_index_eval_time", false),
(MetricType::Dev, "metadata_load_time", true),
(MetricType::Dev, "page_index_eval_time", true),
(MetricType::Internal, "metadata_load_time", true),
(MetricType::Internal, "page_index_eval_time", true),
] {
let plan = collect_plan_with_context(&sql, &ctx, level).await;

Expand Down
61 changes: 61 additions & 0 deletions datafusion/physical-expr-common/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ impl<'a> MetricBuilder<'a> {
self
}

/// Switches to a conditional builder for metrics controlled by
/// `datafusion.explain.analyze_level`.
///
/// The returned builder preserves this builder's labels, category, and
/// [`MetricType`], but its terminal methods return `Option<T>` and skip
/// registration when `enabled_level` does not include this builder's
/// [`MetricType`].
pub fn if_enabled(self, enabled_level: MetricType) -> ConditionalMetricBuilder<'a> {
ConditionalMetricBuilder {
builder: enabled_level.includes(self.metric_type).then_some(self),
}
}

/// Consume self and create a metric of the specified value
/// registered with the MetricsSet
pub fn build(self, value: MetricValue) {
Expand Down Expand Up @@ -333,3 +346,51 @@ impl<'a> MetricBuilder<'a> {
ratio_metrics
}
}

/// A conditional metric builder used after [`MetricBuilder::if_enabled`].
///
/// This type intentionally mirrors a subset of [`MetricBuilder`], but its
/// terminal methods return `Option<T>`.
/// - `Some(T)` means the metric was registered and can be updated.
/// - `None` means the configured metric level does not include this metric,
/// so the caller should do no metric work.
pub struct ConditionalMetricBuilder<'a> {
builder: Option<MetricBuilder<'a>>,
}

impl<'a> ConditionalMetricBuilder<'a> {
/// Add a label to the metric being constructed, if this builder is enabled.
pub fn with_label(mut self, label: Label) -> Self {
self.builder = self.builder.map(|builder| builder.with_label(label));
self
}

/// Set the semantic category for the metric being constructed, if this
/// builder is enabled.
pub fn with_category(mut self, category: MetricCategory) -> Self {
self.builder = self.builder.map(|builder| builder.with_category(category));
self
}

/// Consume self and conditionally create a new timer for recording some
/// subset of an operator's execution time.
pub fn subset_time(
self,
subset_name: impl Into<Cow<'static, str>>,
partition: usize,
) -> Option<Time> {
self.builder
.map(|builder| builder.subset_time(subset_name, partition))
}

/// Consume self and conditionally create a new counter for recording some
/// arbitrary metric of an operator.
pub fn counter(
self,
counter_name: impl Into<Cow<'static, str>>,
partition: usize,
) -> Option<Count> {
self.builder
.map(|builder| builder.counter(counter_name, partition))
}
}
55 changes: 55 additions & 0 deletions datafusion/physical-expr-common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,61 @@ mod tests {
assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
}

#[test]
fn test_conditional_metric_builder_uses_metric_type_level() {
for (enabled_level, metric_type, should_register) in [
(MetricType::Summary, MetricType::Summary, true),
(MetricType::Summary, MetricType::Dev, false),
(MetricType::Summary, MetricType::Internal, false),
(MetricType::Dev, MetricType::Summary, true),
(MetricType::Dev, MetricType::Dev, true),
(MetricType::Dev, MetricType::Internal, false),
(MetricType::Internal, MetricType::Summary, true),
(MetricType::Internal, MetricType::Dev, true),
(MetricType::Internal, MetricType::Internal, true),
] {
let metrics = ExecutionPlanMetricsSet::new();
let metric = MetricBuilder::new(&metrics)
.with_type(metric_type)
.if_enabled(enabled_level)
.subset_time("conditional_time", 0);

assert_eq!(
metric.is_some(),
should_register,
"enabled_level={enabled_level:?} metric_type={metric_type:?}"
);
assert_eq!(
metrics
.clone_inner()
.sum_by_name("conditional_time")
.is_some(),
should_register,
"enabled_level={enabled_level:?} metric_type={metric_type:?}"
);
}

let metrics = ExecutionPlanMetricsSet::new();
let label = Label::new("label", "value");
let metric = MetricBuilder::new(&metrics)
.with_type(MetricType::Internal)
.with_category(MetricCategory::Timing)
.if_enabled(MetricType::Internal)
.with_label(label.clone())
.subset_time("internal_time", 0);

assert!(metric.is_some());

let metrics = metrics.clone_inner();
let metric = metrics
.iter()
.find(|metric| metric.value().name() == "internal_time")
.expect("registered internal metric");
assert_eq!(metric.metric_type(), MetricType::Internal);
assert_eq!(metric.metric_category(), Some(MetricCategory::Timing));
assert_eq!(metric.labels(), &[label]);
}

#[test]
fn test_elapsed_compute() {
let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading
Loading