From aff2821f12ba3c7e43f52b78e113ccc10aa90b72 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 16:30:06 -0400 Subject: [PATCH 01/13] Minor simplifications in frontend peeks --- src/adapter/src/client.rs | 10 ---------- src/adapter/src/frontend_peek.rs | 25 ++++++++++++++----------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 4fb7e177a94ec..e3b9a64ac4643 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -1160,16 +1160,6 @@ impl SessionClient { self.timeouts.recv().await } - /// Returns a reference to the PeekClient used for frontend peek sequencing. - pub fn peek_client(&self) -> &PeekClient { - &self.peek_client - } - - /// Returns a reference to the PeekClient used for frontend peek sequencing. - pub fn peek_client_mut(&mut self) -> &mut PeekClient { - &mut self.peek_client - } - /// Attempt to sequence a peek from the session task. /// /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 4cdf96201e42f..30f7338c39534 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -428,15 +428,17 @@ impl PeekClient { // We have checked the plan kind above. assert!(plan.allowed_in_read_only()); - let target_cluster = match session.transaction().cluster() { - // Use the current transaction's cluster. - Some(cluster_id) => TargetCluster::Transaction(cluster_id), - // If there isn't a current cluster set for a transaction, then try to auto route. - None => { - coord::catalog_serving::auto_run_on_catalog_server(&conn_catalog, session, &plan) - } - }; let (cluster, target_cluster_id, target_cluster_name) = { + let target_cluster = match session.transaction().cluster() { + // Use the current transaction's cluster. + Some(cluster_id) => TargetCluster::Transaction(cluster_id), + // If there isn't a current cluster set for a transaction, then try to auto route. + None => coord::catalog_serving::auto_run_on_catalog_server( + &conn_catalog, + session, + &plan, + ), + }; let cluster = catalog.resolve_target_cluster(target_cluster, session)?; (cluster, cluster.id, &cluster.name) }; @@ -1012,12 +1014,13 @@ impl PeekClient { }) } Err(err) => { - if optimizer.is_right() { + let optimizer = if let Either::Left(optimizer) = optimizer { + optimizer + } else { // COPY TO has no EXPLAIN BROKEN support return Err(err); - } + }; // SELECT/EXPLAIN error handling - let optimizer = optimizer.expect_left("checked above"); if let ExplainContext::Plan(explain_ctx) = explain_ctx { if explain_ctx.broken { // EXPLAIN BROKEN: log error and continue with defaults From 453a5ff267c7d776946d3913f1371586861c2fcb Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 17:08:32 -0400 Subject: [PATCH 02/13] Less branching in the peek path The copy-to and regular paths were interleaved with shared logic, which complicated the control flow and required a bunch of Either wrapping. This shifts a bunch of code around to get to a more linear flow with less branching. --- src/adapter/src/frontend_peek.rs | 349 +++++++++++++++---------------- 1 file changed, 172 insertions(+), 177 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 30f7338c39534..2ba57aeb20da2 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -11,13 +11,14 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use std::sync::Arc; -use itertools::{Either, Itertools}; +use itertools::Itertools; use mz_compute_types::ComputeInstanceId; use mz_controller_types::ClusterId; use mz_expr::{CollectionPlan, ResultSpec}; use mz_ore::cast::{CastFrom, CastLossy}; use mz_ore::collections::CollectionExt; use mz_ore::now::EpochMillis; +use mz_ore::task::JoinHandle; use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log}; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_repr::role_id::RoleId; @@ -48,8 +49,8 @@ use crate::coord::{ }; use crate::explain::insights::PlanInsightsContext; use crate::explain::optimizer_trace::OptimizerTrace; +use crate::optimize::Optimize; use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder}; -use crate::optimize::{Optimize, OptimizerError}; use crate::session::{Session, TransactionOps, TransactionStatus}; use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent}; @@ -494,44 +495,6 @@ impl PeekClient { let (_, view_id) = self.transient_id_gen.allocate_id(); let (_, index_id) = self.transient_id_gen.allocate_id(); - let mut optimizer = if let Some(mut copy_to_ctx) = copy_to_ctx { - // COPY TO path: calculate output_batch_count and create copy_to optimizer - let worker_counts = cluster.replicas().map(|r| { - let loc = &r.config.location; - loc.workers().unwrap_or_else(|| loc.num_processes()) - }); - let max_worker_count = match worker_counts.max() { - Some(count) => u64::cast_from(count), - None => { - return Err(AdapterError::NoClusterReplicasAvailable { - name: cluster.name.clone(), - is_managed: cluster.is_managed(), - }); - } - }; - copy_to_ctx.output_batch_count = Some(max_worker_count); - - Either::Right(optimize::copy_to::Optimizer::new( - Arc::clone(&catalog), - compute_instance_snapshot.clone(), - view_id, - copy_to_ctx, - optimizer_config, - self.optimizer_metrics.clone(), - )) - } else { - // SELECT/EXPLAIN path: create peek optimizer - Either::Left(optimize::peek::Optimizer::new( - Arc::clone(&catalog), - compute_instance_snapshot.clone(), - select_plan.finishing.clone(), - view_id, - index_id, - optimizer_config, - self.optimizer_metrics.clone(), - )) - }; - let target_replica_name = session.vars().cluster_replica(); let mut target_replica = target_replica_name .map(|name| { @@ -603,7 +566,8 @@ impl PeekClient { // # From peek_timestamp_read_hold - let dataflow_builder = DataflowBuilder::new(catalog.state(), compute_instance_snapshot); + let dataflow_builder = + DataflowBuilder::new(catalog.state(), compute_instance_snapshot.clone()); let input_id_bundle = dataflow_builder.sufficient_collections(source_ids.clone()); // ## From sequence_peek_timestamp @@ -848,44 +812,87 @@ impl PeekClient { } let source_ids_for_closure = source_ids.clone(); - let optimization_future = mz_ore::task::spawn_blocking( - || "optimize peek", - move || { - span.in_scope(|| { - let _dispatch_guard = explain_ctx.dispatch_guard(); - - let raw_expr = select_plan.source.clone(); - - // The purpose of wrapping the following in a closure is to control where the - // `?`s return from, so that even when a `catch_unwind_optimize` call fails, - // we can still handle `EXPLAIN BROKEN`. - let pipeline = || -> Result< - Either< - optimize::peek::GlobalLirPlan, - optimize::copy_to::GlobalLirPlan, - >, - OptimizerError, - > { - match optimizer.as_mut() { - Either::Left(optimizer) => { - // SELECT/EXPLAIN path - // HIR ⇒ MIR lowering and MIR optimization (local) - let local_mir_plan = - optimizer.catch_unwind_optimize(raw_expr.clone())?; - // Attach resolved context required to continue the pipeline. - let local_mir_plan = local_mir_plan.resolve( - timestamp_context.clone(), - &session_meta, - stats, - ); - // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global) - let global_lir_plan = - optimizer.catch_unwind_optimize(local_mir_plan)?; - Ok(Either::Left(global_lir_plan)) - } - Either::Right(optimizer) => { - // COPY TO path - // HIR ⇒ MIR lowering and MIR optimization (local) + let raw_expr = select_plan.source.clone(); + + let optimization_future: JoinHandle> = match copy_to_ctx { + Some(mut copy_to_ctx) => { + // COPY TO path: calculate output_batch_count and create copy_to optimizer + let worker_counts = cluster.replicas().map(|r| { + let loc = &r.config.location; + loc.workers().unwrap_or_else(|| loc.num_processes()) + }); + let max_worker_count = match worker_counts.max() { + Some(count) => u64::cast_from(count), + None => { + return Err(AdapterError::NoClusterReplicasAvailable { + name: cluster.name.clone(), + is_managed: cluster.is_managed(), + }); + } + }; + copy_to_ctx.output_batch_count = Some(max_worker_count); + + let mut optimizer = optimize::copy_to::Optimizer::new( + Arc::clone(&catalog), + compute_instance_snapshot, + view_id, + copy_to_ctx, + optimizer_config, + self.optimizer_metrics.clone(), + ); + + mz_ore::task::spawn_blocking( + || "optimize peek", + move || { + span.in_scope(|| { + let _dispatch_guard = explain_ctx.dispatch_guard(); + + // COPY TO path + // HIR ⇒ MIR lowering and MIR optimization (local) + let local_mir_plan = + optimizer.catch_unwind_optimize(raw_expr.clone())?; + // Attach resolved context required to continue the pipeline. + let local_mir_plan = local_mir_plan.resolve( + timestamp_context.clone(), + &session_meta, + stats, + ); + // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global) + let global_lir_plan = + optimizer.catch_unwind_optimize(local_mir_plan)?; + Ok(Execution::CopyToS3 { + global_lir_plan, + source_ids: source_ids_for_closure, + }) + }) + }, + ) + } + None => { + // SELECT/EXPLAIN path: create peek optimizer + let mut optimizer = optimize::peek::Optimizer::new( + Arc::clone(&catalog), + compute_instance_snapshot, + select_plan.finishing.clone(), + view_id, + index_id, + optimizer_config, + self.optimizer_metrics.clone(), + ); + + mz_ore::task::spawn_blocking( + || "optimize peek", + move || { + span.in_scope(|| { + let _dispatch_guard = explain_ctx.dispatch_guard(); + + // SELECT/EXPLAIN path + // HIR ⇒ MIR lowering and MIR optimization (local) + + // The purpose of wrapping the following in a closure is to control where the + // `?`s return from, so that even when a `catch_unwind_optimize` call fails, + // we can still handle `EXPLAIN BROKEN`. + let pipeline = || { let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr.clone())?; // Attach resolved context required to continue the pipeline. @@ -897,66 +904,85 @@ impl PeekClient { // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global) let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?; - Ok(Either::Right(global_lir_plan)) - } - } - }; - - let global_lir_plan_result = pipeline(); - let optimization_finished_at = now(); + Ok::<_, AdapterError>(global_lir_plan) + }; - let create_insights_ctx = - |optimizer: &optimize::peek::Optimizer, - is_notice: bool| - -> Option> { - if !needs_plan_insights { - return None; - } + let global_lir_plan_result = pipeline(); + let optimization_finished_at = now(); + + let create_insights_ctx = + |optimizer: &optimize::peek::Optimizer, + is_notice: bool| + -> Option> { + if !needs_plan_insights { + return None; + } + + let catalog = catalog_for_insights.as_ref()?; + + let enable_re_optimize = if needs_plan_insights { + // Disable any plan insights that use the optimizer if we only want the + // notice and plan optimization took longer than the threshold. This is + // to prevent a situation where optimizing takes a while and there are + // lots of clusters, which would delay peek execution by the product of + // those. + // + // (This heuristic doesn't work well, see #9492.) + let dyncfgs = catalog.system_config().dyncfgs(); + let opt_limit = mz_adapter_types::dyncfgs + ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION + .get(dyncfgs); + !(is_notice && optimizer.duration() > opt_limit) + } else { + false + }; - let catalog = catalog_for_insights.as_ref()?; - - let enable_re_optimize = if needs_plan_insights { - // Disable any plan insights that use the optimizer if we only want the - // notice and plan optimization took longer than the threshold. This is - // to prevent a situation where optimizing takes a while and there are - // lots of clusters, which would delay peek execution by the product of - // those. - // - // (This heuristic doesn't work well, see #9492.) - let dyncfgs = catalog.system_config().dyncfgs(); - let opt_limit = mz_adapter_types::dyncfgs - ::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION - .get(dyncfgs); - !(is_notice && optimizer.duration() > opt_limit) - } else { - false + Some(Box::new(PlanInsightsContext { + stmt: select_plan + .select + .as_deref() + .map(Clone::clone) + .map(Statement::Select), + raw_expr: raw_expr.clone(), + catalog: Arc::clone(catalog), + compute_instances, + target_instance: target_cluster_name, + metrics: optimizer.metrics().clone(), + finishing: optimizer.finishing().clone(), + optimizer_config: optimizer.config().clone(), + session: session_meta, + timestamp_context, + view_id: optimizer.select_id(), + index_id: optimizer.index_id(), + enable_re_optimize, + })) + }; + + let global_lir_plan = match global_lir_plan_result { + Ok(plan) => plan, + Err(err) => { + let result = if let ExplainContext::Plan(explain_ctx) = + explain_ctx + && explain_ctx.broken + { + // EXPLAIN BROKEN: log error and continue with defaults + tracing::error!( + "error while handling EXPLAIN statement: {}", + err + ); + Ok(Execution::ExplainPlan { + df_meta: Default::default(), + explain_ctx, + optimizer, + insights_ctx: None, + }) + } else { + Err(err) + }; + return result; + } }; - Some(Box::new(PlanInsightsContext { - stmt: select_plan - .select - .as_deref() - .map(Clone::clone) - .map(Statement::Select), - raw_expr: raw_expr.clone(), - catalog: Arc::clone(catalog), - compute_instances, - target_instance: target_cluster_name, - metrics: optimizer.metrics().clone(), - finishing: optimizer.finishing().clone(), - optimizer_config: optimizer.config().clone(), - session: session_meta, - timestamp_context, - view_id: optimizer.select_id(), - index_id: optimizer.index_id(), - enable_re_optimize, - })) - }; - - match global_lir_plan_result { - Ok(Either::Left(global_lir_plan)) => { - // SELECT/EXPLAIN path - let optimizer = optimizer.unwrap_left(); match explain_ctx { ExplainContext::Plan(explain_ctx) => { let (_, df_meta, _) = global_lir_plan.unapply(); @@ -1005,46 +1031,12 @@ impl PeekClient { }) } } - } - Ok(Either::Right(global_lir_plan)) => { - // COPY TO S3 path - Ok(Execution::CopyToS3 { - global_lir_plan, - source_ids: source_ids_for_closure, - }) - } - Err(err) => { - let optimizer = if let Either::Left(optimizer) = optimizer { - optimizer - } else { - // COPY TO has no EXPLAIN BROKEN support - return Err(err); - }; - // SELECT/EXPLAIN error handling - if let ExplainContext::Plan(explain_ctx) = explain_ctx { - if explain_ctx.broken { - // EXPLAIN BROKEN: log error and continue with defaults - tracing::error!( - "error while handling EXPLAIN statement: {}", - err - ); - Ok(Execution::ExplainPlan { - df_meta: Default::default(), - explain_ctx, - optimizer, - insights_ctx: None, - }) - } else { - Err(err) - } - } else { - Err(err) - } - } - } - }) - }, - ); + }) + }, + ) + } + }; + let optimization_timeout = *session.vars().statement_timeout(); let optimization_result = // Note: spawn_blocking tasks cannot be cancelled, so on timeout we stop waiting but the @@ -1053,12 +1045,15 @@ impl PeekClient { // optimizer runs. match tokio::time::timeout(optimization_timeout, optimization_future).await { Ok(Ok(result)) => result, - Ok(Err(optimizer_error)) => { + Ok(Err(AdapterError::Optimizer(err))) => { return Err(AdapterError::Internal(format!( "internal error in optimizer: {}", - optimizer_error + err ))); } + Ok(Err(err)) => { + return Err(err); + } Err(_elapsed) => { warn!("optimize peek timed out after {:?}", optimization_timeout); return Err(AdapterError::StatementTimeout); From fd69bae8a58d71a5af140af80e38e623cf84588d Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 17:31:58 -0400 Subject: [PATCH 03/13] Output enum in the peek path --- src/adapter/src/frontend_peek.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 2ba57aeb20da2..5354bd4ba1f65 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -313,7 +313,13 @@ impl PeekClient { let pcx = session.pcx(); let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?; - let (select_plan, explain_ctx, copy_to_ctx) = match &plan { + /// What do we do with the result of the select? + enum OutputContext { + Default, + CopyTo(CopyToContext), + } + + let (select_plan, explain_ctx, output_ctx) = match &plan { Plan::Select(select_plan) => { let explain_ctx = if session.vars().emit_plan_insights_notice() { let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths()); @@ -321,11 +327,15 @@ impl PeekClient { } else { ExplainContext::None }; - (select_plan, explain_ctx, None) + (select_plan, explain_ctx, OutputContext::Default) } Plan::ShowColumns(show_columns_plan) => { // ShowColumns wraps a SelectPlan, extract it and proceed as normal. - (&show_columns_plan.select_plan, ExplainContext::None, None) + ( + &show_columns_plan.select_plan, + ExplainContext::None, + OutputContext::Default, + ) } Plan::ExplainPlan(plan::ExplainPlanPlan { stage, @@ -344,7 +354,7 @@ impl PeekClient { desc: Some(desc.clone()), optimizer_trace, }); - (plan, explain_ctx, None) + (plan, explain_ctx, OutputContext::Default) } // COPY TO S3 Plan::CopyTo(plan::CopyToPlan { @@ -369,7 +379,11 @@ impl PeekClient { output_batch_count: None, }; - (select_plan, ExplainContext::None, Some(copy_to_ctx)) + ( + select_plan, + ExplainContext::None, + OutputContext::CopyTo(copy_to_ctx), + ) } Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => { // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements @@ -380,7 +394,7 @@ impl PeekClient { desc: _, }) => { let explain_ctx = ExplainContext::Pushdown; - (plan, explain_ctx, None) + (plan, explain_ctx, OutputContext::Default) } _ => { // This shouldn't happen because we already checked for this at the AST @@ -814,8 +828,8 @@ impl PeekClient { let source_ids_for_closure = source_ids.clone(); let raw_expr = select_plan.source.clone(); - let optimization_future: JoinHandle> = match copy_to_ctx { - Some(mut copy_to_ctx) => { + let optimization_future: JoinHandle> = match output_ctx { + OutputContext::CopyTo(mut copy_to_ctx) => { // COPY TO path: calculate output_batch_count and create copy_to optimizer let worker_counts = cluster.replicas().map(|r| { let loc = &r.config.location; @@ -868,7 +882,7 @@ impl PeekClient { }, ) } - None => { + OutputContext::Default => { // SELECT/EXPLAIN path: create peek optimizer let mut optimizer = optimize::peek::Optimizer::new( Arc::clone(&catalog), From 3da73f3c75db3dde48528f25ff3437d752325f7d Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 18:00:15 -0400 Subject: [PATCH 04/13] Inline a single-use function --- src/sql/src/plan/statement/dml.rs | 40 ++++++++++--------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index c064a952d07bc..4fb3d26a69645 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -1510,32 +1510,6 @@ pub fn plan_explain_timestamp( })) } -/// Plans and decorrelates a [`Query`]. Like [`query::plan_root_query`], but -/// returns an [`MirRelationExpr`], which cannot include correlated expressions. -#[deprecated = "Use `query::plan_root_query` and use `HirRelationExpr` in `~Plan` structs."] -pub fn plan_query( - scx: &StatementContext, - query: Query, - params: &Params, - lifetime: QueryLifetime, -) -> Result, PlanError> { - let query::PlannedRootQuery { - mut expr, - desc, - finishing, - scope, - } = query::plan_root_query(scx, query, lifetime)?; - expr.bind_parameters(scx, lifetime, params)?; - - Ok(query::PlannedRootQuery { - // No metrics passed! One more reason not to use this deprecated function. - expr: expr.lower(scx.catalog.system_vars(), None)?, - desc, - finishing, - scope, - }) -} - generate_extracted_config!(SubscribeOption, (Snapshot, bool), (Progress, bool)); pub fn describe_subscribe( @@ -1669,7 +1643,19 @@ pub fn plan_subscribe( } SubscribeRelation::Query(query) => { #[allow(deprecated)] // TODO(aalexandrov): Use HirRelationExpr in Subscribe - let query = plan_query(scx, query, params, QueryLifetime::Subscribe)?; + let query::PlannedRootQuery { + mut expr, + desc, + finishing, + scope, + } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?; + expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?; + let query = query::PlannedRootQuery { + expr: expr.lower(scx.catalog.system_vars(), None)?, + desc, + finishing, + scope, + }; // There's no way to apply finishing operations to a `SUBSCRIBE` directly, so the // finishing should have already been turned into a `TopK` by // `plan_query` / `plan_root_query`, upon seeing the `QueryLifetime::Subscribe`. From ac5ec9a812c85458908a3b2139b63d4b07231fc5 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 18:01:25 -0400 Subject: [PATCH 05/13] Subscribe threads HIR now Subscribe statements capture the full select now --- src/adapter/src/coord/catalog_serving.rs | 15 +++++++-------- src/adapter/src/optimize/subscribe.rs | 7 +++++-- src/sql/src/plan.rs | 13 +++++++------ src/sql/src/plan/statement/dml.rs | 23 ++++++++++++++++++----- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index aa3a92b5c4b2a..0fb8ae2ceb50f 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -27,6 +27,7 @@ use mz_sql::plan::{ SubscribePlan, }; use smallvec::SmallVec; +use std::collections::BTreeSet; use crate::AdapterError; use crate::catalog::ConnCatalog; @@ -47,7 +48,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( plan.from.depends_on(), match &plan.from { SubscribeFrom::Id(_) => false, - SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(), + SubscribeFrom::Query { select, .. } => select.source.could_run_expensive_function(), }, ) }; @@ -234,18 +235,16 @@ pub fn check_cluster_restrictions( // // Note: Creating other objects like Materialized Views is prevented elsewhere. We define the // 'mz_catalog_server' cluster to be "read-only", which restricts these actions. - let depends_on: Box> = match plan { - Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()), - Plan::Subscribe(plan) => match plan.from { - SubscribeFrom::Id(id) => Box::new(std::iter::once(id)), - SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()), - }, - Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()), + let depends_on: BTreeSet = match plan { + Plan::ReadThenWrite(plan) => plan.selection.depends_on(), + Plan::Subscribe(plan) => plan.from.depends_on(), + Plan::Select(plan) => plan.source.depends_on(), _ => return Ok(()), }; // Collect any items that are not allowed to be run on the catalog server cluster. let unallowed_dependents: SmallVec<[String; 2]> = depends_on + .into_iter() .filter_map(|id| { let item = catalog.get_item_by_global_id(&id); let full_name = catalog.resolve_full_name(item.name()); diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 6947fa40c25cf..0d83362114737 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -21,7 +21,7 @@ use mz_ore::collections::CollectionExt; use mz_ore::soft_assert_or_log; use mz_repr::{GlobalId, Timestamp}; use mz_sql::optimizer_metrics::OptimizerMetrics; -use mz_sql::plan::SubscribeFrom; +use mz_sql::plan::{HirToMirConfig, SubscribeFrom, SubscribePlan}; use mz_transform::TransformCtx; use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot}; use mz_transform::normalize_lets::normalize_lets; @@ -222,7 +222,7 @@ impl Optimize for Optimizer { }; df_desc.export_sink(self.sink_id, sink_description); } - SubscribeFrom::Query { expr, desc } => { + SubscribeFrom::Query { select, desc } => { // TODO: Change the `expr` type to be `HirRelationExpr` and run // HIR ⇒ MIR lowering and decorrelation here. This would allow // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.` @@ -238,6 +238,9 @@ impl Optimize for Optimizer { Some(&mut self.metrics), Some(self.view_id), ); + let expr = select + .source + .lower(HirToMirConfig::from(&self.config), None)?; let expr = optimize_mir_local(expr, &mut transform_ctx)?; df_builder.import_view_into_dataflow( diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index a44770aa455e6..e1458c2deaf6d 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -37,9 +37,7 @@ use ipnet::IpNet; use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::{ - CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing, -}; +use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing}; use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; @@ -927,7 +925,7 @@ pub enum SubscribeFrom { Id(GlobalId), /// Query to subscribe to. Query { - expr: MirRelationExpr, + select: SelectPlan, desc: RelationDesc, }, } @@ -936,14 +934,17 @@ impl SubscribeFrom { pub fn depends_on(&self) -> BTreeSet { match self { SubscribeFrom::Id(id) => BTreeSet::from([*id]), - SubscribeFrom::Query { expr, .. } => expr.depends_on(), + SubscribeFrom::Query { select, .. } => select.source.depends_on(), } } pub fn contains_temporal(&self) -> bool { match self { SubscribeFrom::Id(_) => false, - SubscribeFrom::Query { expr, .. } => expr.contains_temporal(), + SubscribeFrom::Query { select, .. } => select + .source + .contains_temporal() + .expect("Unexpected error in `visit_scalars` call"), } } } diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 4fb3d26a69645..53ad5574642d8 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -18,8 +18,8 @@ use std::collections::{BTreeMap, BTreeSet}; use itertools::Itertools; use mz_arrow_util::builder::ArrowBuilder; +use mz_expr::RowSetFinishing; use mz_expr::visit::Visit; -use mz_expr::{MirRelationExpr, RowSetFinishing}; use mz_ore::num::NonNeg; use mz_ore::soft_panic_or_log; use mz_ore::str::separated; @@ -1620,6 +1620,8 @@ pub fn plan_subscribe( params: &Params, copy_to: Option, ) -> Result { + let when = query::plan_as_of(scx, as_of)?; + let (from, desc, scope) = match relation { SubscribeRelation::Name(name) => { let item = scx.get_item_by_resolved_name(&name)?; @@ -1651,7 +1653,7 @@ pub fn plan_subscribe( } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?; expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?; let query = query::PlannedRootQuery { - expr: expr.lower(scx.catalog.system_vars(), None)?, + expr, desc, finishing, scope, @@ -1663,11 +1665,23 @@ pub fn plan_subscribe( &query.finishing, query.desc.arity() )); + let finishing = RowSetFinishing { + order_by: vec![], + limit: None, + offset: 0, + project: query.finishing.project, + }; let desc = query.desc.clone(); ( SubscribeFrom::Query { - expr: query.expr, - desc: query.desc, + select: SelectPlan { + select: None, + source: query.expr, + when: when.clone(), + finishing, + copy_to: None, + }, + desc: desc.clone(), }, desc, query.scope, @@ -1675,7 +1689,6 @@ pub fn plan_subscribe( } }; - let when = query::plan_as_of(scx, as_of)?; let up_to = up_to .map(|up_to| plan_as_of_or_up_to(scx, up_to)) .transpose()?; From 61d16841175e974b5672910ceccfc49d1df8db90 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 25 Mar 2026 18:01:25 -0400 Subject: [PATCH 06/13] Support subscribe on the frontend-peek path --- src/adapter/src/client.rs | 1 + src/adapter/src/command.rs | 18 +- src/adapter/src/coord.rs | 1 + src/adapter/src/coord/command_handler.rs | 32 ++++ .../src/coord/sequencer/inner/subscribe.rs | 100 +++++++---- src/adapter/src/frontend_peek.rs | 162 +++++++++++++++++- src/adapter/src/optimize/subscribe.rs | 20 +-- 7 files changed, 275 insertions(+), 59 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index e3b9a64ac4643..232542fe3a589 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -1080,6 +1080,7 @@ impl SessionClient { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } + | Command::ExecuteSubscribe { .. } | Command::CopyToPreflight { .. } | Command::ExecuteCopyTo { .. } | Command::ExecuteSideEffectingFunc { .. } diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 4e6a80cc43cad..55d4ddd526891 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -35,6 +35,7 @@ use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType use mz_sql::ast::{FetchDirection, Raw, Statement}; use mz_sql::catalog::ObjectType; use mz_sql::optimizer_metrics::OptimizerMetrics; +use mz_sql::plan; use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc}; use mz_sql::session::user::User; use mz_sql::session::vars::{OwnedVarInput, SystemVars}; @@ -52,11 +53,11 @@ use crate::coord::timestamp_selection::TimestampDetermination; use crate::coord::{ExecuteContextExtra, ExecuteContextGuard}; use crate::error::AdapterError; use crate::session::{EndTransactionAction, RowBatchStream, Session}; -use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{ FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy, StatementLoggingFrontend, }; +use crate::statement_logging::{StatementLoggingId, WatchSetCreation}; use crate::util::Transmittable; use crate::webhook::AppendWebhookResponse; use crate::{ @@ -264,6 +265,19 @@ pub enum Command { tx: oneshot::Sender>, }, + ExecuteSubscribe { + df_desc: DataflowDescription, + dependency_ids: BTreeSet, + cluster_id: ComputeInstanceId, + replica_id: Option, + conn_id: ConnectionId, + session_uuid: Uuid, + read_holds: ReadHolds, + plan: plan::SubscribePlan, + statement_logging_id: Option, + tx: oneshot::Sender>, + }, + /// Preflight check for COPY TO S3 operation. This runs the slow S3 operations /// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel) /// in a background task to avoid blocking the coordinator. @@ -364,6 +378,7 @@ impl Command { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } + | Command::ExecuteSubscribe { .. } | Command::CopyToPreflight { .. } | Command::ExecuteCopyTo { .. } | Command::ExecuteSideEffectingFunc { .. } @@ -401,6 +416,7 @@ impl Command { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } + | Command::ExecuteSubscribe { .. } | Command::CopyToPreflight { .. } | Command::ExecuteCopyTo { .. } | Command::ExecuteSideEffectingFunc { .. } diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 1e7a95dc87cf2..a4370bfffa5d8 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -445,6 +445,7 @@ impl Message { } Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds", Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek", + Command::ExecuteSubscribe { .. } => "execute-subscribe", Command::CopyToPreflight { .. } => "copy-to-preflight", Command::ExecuteCopyTo { .. } => "execute-copy-to", Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func", diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 90fe5404dc91e..d58c6b4c095aa 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -431,6 +431,38 @@ impl Coordinator { let _ = tx.send(result); } + Command::ExecuteSubscribe { + df_desc, + dependency_ids, + cluster_id, + replica_id, + conn_id, + session_uuid, + read_holds, + plan, + statement_logging_id, + tx, + } => { + let mut ctx_extra = ExecuteContextGuard::new( + statement_logging_id, + self.internal_cmd_tx.clone(), + ); + let result = self + .implement_subscribe( + &mut ctx_extra, + df_desc, + dependency_ids, + cluster_id, + replica_id, + conn_id, + session_uuid, + read_holds, + plan, + ) + .await; + let _ = tx.send(result); + } + Command::CopyToPreflight { s3_sink_connection, sink_id, diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index 429ee32b14067..32c5ed5c95297 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -8,15 +8,23 @@ // by the Apache License, Version 2.0. use maplit::btreemap; +use mz_adapter_types::connection::ConnectionId; +use mz_cluster_client::ReplicaId; +use mz_compute_types::ComputeInstanceId; +use mz_compute_types::dataflows::DataflowDescription; +use mz_compute_types::plan::Plan; +use mz_ore::collections::CollectionExt; use mz_ore::instrument; use mz_repr::GlobalId; use mz_repr::explain::{ExprHumanizerExt, TransientItem}; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_sql::plan::{self, QueryWhen, SubscribeFrom}; use mz_sql::session::metadata::SessionMetadata; +use std::collections::BTreeSet; use timely::progress::Antichain; use tokio::sync::mpsc; use tracing::Span; +use uuid::Uuid; use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe}; use crate::command::ExecuteResponse; @@ -31,7 +39,9 @@ use crate::error::AdapterError; use crate::explain::optimizer_trace::OptimizerTrace; use crate::optimize::Optimize; use crate::session::{Session, TransactionOps}; -use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize}; +use crate::{ + AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize, +}; impl Staged for SubscribeStage { type Ctx = ExecuteContext; @@ -428,42 +438,76 @@ impl Coordinator { SubscribeFinish { validity: _, cluster_id, - plan: - plan::SubscribePlan { - copy_to, - emit_progress, - output, - .. - }, + plan, global_lir_plan, dependency_ids, replica_id, }: SubscribeFinish, ) -> Result>, AdapterError> { - let sink_id = global_lir_plan.sink_id(); + let (df_desc, df_meta) = global_lir_plan.unapply(); + emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices); + let conn_id = ctx.session.conn_id().clone(); + let session_uuid = ctx.session().uuid(); + let txn_read_holds = self + .txn_read_holds + .remove(&conn_id) + .expect("must have previously installed read holds"); + let resp = self + .implement_subscribe( + ctx.extra_mut(), + df_desc, + dependency_ids, + cluster_id, + replica_id, + conn_id, + session_uuid, + txn_read_holds, + plan, + ) + .await?; + Ok(StageResult::Response(resp)) + } + + #[instrument] + pub(crate) async fn implement_subscribe( + &mut self, + ctx_extra: &mut ExecuteContextGuard, + df_desc: DataflowDescription, + dependency_ids: BTreeSet, + cluster_id: ComputeInstanceId, + replica_id: Option, + conn_id: ConnectionId, + session_uuid: Uuid, + read_holds: ReadHolds, + plan: plan::SubscribePlan, + ) -> Result { + let sink_id = df_desc.sink_id(); let (tx, rx) = mpsc::unbounded_channel(); let active_subscribe = ActiveSubscribe { - conn_id: ctx.session().conn_id().clone(), - session_uuid: ctx.session().uuid(), + conn_id: conn_id.clone(), + session_uuid, channel: tx, - emit_progress, - as_of: global_lir_plan - .as_of() + emit_progress: plan.emit_progress, + as_of: df_desc + .as_of + .as_ref() + .and_then(|t| t.as_option()) + .copied() .expect("set to Some in an earlier stage"), - arity: global_lir_plan.sink_desc().from_desc.arity(), + arity: df_desc + .sink_exports + .values() + .into_element() + .from_desc + .arity(), cluster_id, depends_on: dependency_ids, start_time: self.now(), - output, + output: plan.output, }; active_subscribe.initialize(); - let (df_desc, df_meta) = global_lir_plan.unapply(); - - // Emit notices. - emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices); - // Add metadata for the new SUBSCRIBE. let write_notify_fut = self .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe)) @@ -475,28 +519,22 @@ impl Coordinator { // requests to external services, which can take time, so we run them concurrently. let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await; - // Release the pre-optimization read holds because the controller is now handling those. - let txn_read_holds = self - .txn_read_holds - .remove(ctx.session().conn_id()) - .expect("must have previously installed read holds"); - // Explicitly drop read holds, just to make it obvious what's happening. - drop(txn_read_holds); + drop(read_holds); let resp = ExecuteResponse::Subscribing { rx, - ctx_extra: std::mem::take(ctx.extra_mut()), + ctx_extra: std::mem::take(ctx_extra), instance_id: cluster_id, }; - let resp = match copy_to { + let resp = match plan.copy_to { None => resp, Some(format) => ExecuteResponse::CopyTo { format, resp: Box::new(resp), }, }; - Ok(StageResult::Response(resp)) + Ok(resp) } #[instrument] diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 5354bd4ba1f65..e6456ee6a6268 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use itertools::Itertools; use mz_compute_types::ComputeInstanceId; +use mz_compute_types::dataflows::DataflowDescription; use mz_controller_types::ClusterId; use mz_expr::{CollectionPlan, ResultSpec}; use mz_ore::cast::{CastFrom, CastLossy}; @@ -25,15 +26,18 @@ use mz_repr::role_id::RoleId; use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp}; use mz_sql::ast::Raw; use mz_sql::catalog::CatalogCluster; -use mz_sql::plan::Params; -use mz_sql::plan::{self, Explainee, ExplaineeStatement, Plan, QueryWhen}; +use mz_sql::plan::{self, Explainee, ExplaineeStatement, Plan, QueryWhen, SubscribePlan}; +use mz_sql::plan::{Params, SubscribeFrom}; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; use mz_sql::session::vars::IsolationLevel; -use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, ShowStatement, Statement}; +use mz_sql_parser::ast::{ + CopyDirection, ExplainStage, ShowStatement, Statement, SubscribeRelation, +}; use mz_transform::EmptyStatisticsOracle; use mz_transform::dataflow::DataflowMetainfo; use opentelemetry::trace::TraceContextExt; +use timely::progress::Antichain; use tracing::{Span, debug, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -180,6 +184,13 @@ impl PeekClient { } } } + Statement::Subscribe(subscribe) => { + if let SubscribeRelation::Query(_) = &subscribe.relation { + // We have a select statement to process; continue. + } else { + return Ok(None); + } + } _ => { debug!( "Bailing out from try_frontend_peek, because statement type is not supported" @@ -228,16 +239,20 @@ impl PeekClient { if let Some(logging_id) = statement_logging_id { let reason = match &result { // Streaming results are handled asynchronously by the coordinator - Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => { - // Don't log here - the peek is still executing. + Ok(Some( + ExecuteResponse::SendingRowsStreaming { .. } + | ExecuteResponse::Subscribing { .. }, + )) => { + // Don't log here - the peek or subscribe is still executing. // It will be logged when handle_peek_notification is called. return result; } // COPY TO needs to check its inner response Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => { match inner.as_ref() { - ExecuteResponse::SendingRowsStreaming { .. } => { - // Don't log here - the peek is still executing. + ExecuteResponse::SendingRowsStreaming { .. } + | ExecuteResponse::Subscribing { .. } => { + // Don't log here - the peek or subscribe is still executing. // It will be logged when handle_peek_notification is called. return result; } @@ -317,6 +332,7 @@ impl PeekClient { enum OutputContext { Default, CopyTo(CopyToContext), + Subscribe { plan: SubscribePlan }, } let (select_plan, explain_ctx, output_ctx) = match &plan { @@ -424,6 +440,27 @@ impl PeekClient { .await?; return Ok(Some(response)); } + Plan::Subscribe(subscribe) => { + let select = match &subscribe.from { + SubscribeFrom::Id(_) => { + // This shouldn't happen because we already checked for this at the AST + // level before calling `try_frontend_peek_inner`. + soft_panic_or_log!( + "Unexpected plan kind in frontend peek sequencing: {:?}", + subscribe + ); + return Ok(None); + } + SubscribeFrom::Query { select, .. } => select, + }; + ( + select, + ExplainContext::None, + OutputContext::Subscribe { + plan: subscribe.clone(), + }, + ) + } _ => { // This shouldn't happen because we already checked for this at the AST // level before calling `try_frontend_peek_inner`. @@ -598,7 +635,8 @@ impl PeekClient { // it would be the cleanest to just simply disallow AS OF queries inside transactions. let in_immediate_multi_stmt_txn = session .transaction() - .in_immediate_multi_stmt_txn(&select_plan.when); + .in_immediate_multi_stmt_txn(&select_plan.when) + && !matches!(output_ctx, OutputContext::Subscribe { .. }); // Fetch or generate a timestamp for this query and fetch or acquire read holds. let (determination, read_holds) = match session.get_transaction_timestamp_determination() { @@ -764,7 +802,9 @@ impl PeekClient { // depend on whether or not reads have occurred in the txn. let requires_linearization = (&explain_ctx).into(); let mut transaction_determination = determination.clone(); - if select_plan.when.is_transactional() { + if matches!(output_ctx, OutputContext::Subscribe { .. }) { + session.add_transaction_ops(TransactionOps::Subscribe)?; + } else if select_plan.when.is_transactional() { session.add_transaction_ops(TransactionOps::Peeks { determination: transaction_determination, cluster_id: target_cluster_id, @@ -1049,6 +1089,58 @@ impl PeekClient { }, ) } + OutputContext::Subscribe { plan } => { + let catalog: Arc = Arc::clone(&catalog); + let mut optimizer = optimize::subscribe::Optimizer::new( + catalog, + compute_instance_snapshot.clone(), + view_id, + index_id, + plan.with_snapshot, + plan.up_to, + "TODO".to_string(), + optimizer_config, + self.optimizer_metrics.clone(), + ); + mz_ore::task::spawn_blocking( + || "optimize subscribe", + move || { + span.in_scope(|| { + let _dispatch_guard = explain_ctx.dispatch_guard(); + + // SELECT/EXPLAIN path + // HIR ⇒ MIR lowering and MIR optimization (local) + let global_mir_plan = + optimizer.catch_unwind_optimize(plan.from.clone())?; + let as_of = timestamp_context.timestamp_or_default(); + + // Attach resolved context required to continue the pipeline. + if let Some(up_to) = optimizer.up_to() { + if as_of > up_to { + return Err(AdapterError::AbsurdSubscribeBounds { + as_of, + up_to, + }); + } + } + let local_mir_plan = + global_mir_plan.resolve(Antichain::from_elem(as_of)); + + let global_lir_plan = + optimizer.catch_unwind_optimize(local_mir_plan)?; + let optimization_finished_at = now(); + + let (df_desc, df_meta) = global_lir_plan.unapply(); + Ok(Execution::Subscribe { + subscribe_plan: plan, + df_desc, + df_meta, + optimization_finished_at, + }) + }) + }, + ) + } }; let optimization_timeout = *session.vars().statement_timeout(); @@ -1293,6 +1385,39 @@ impl PeekClient { }, })) } + Execution::Subscribe { + subscribe_plan, + df_desc, + df_meta, + optimization_finished_at: _optimization_finished_at, + } => { + if df_desc.as_of.as_ref().expect("as of set") == &df_desc.until { + session.add_notice(AdapterNotice::EqualSubscribeBounds { + bound: *df_desc.until.as_option().expect("as of set"), + }); + } + coord::sequencer::emit_optimizer_notices( + &*catalog, + session, + &df_meta.optimizer_notices, + ); + + let response = self + .call_coordinator(|tx| Command::ExecuteSubscribe { + df_desc, + dependency_ids: subscribe_plan.from.depends_on(), + cluster_id: target_cluster_id, + replica_id: target_replica, + conn_id: session.conn_id().clone(), + session_uuid: session.uuid(), + read_holds, + plan: subscribe_plan, + statement_logging_id, + tx, + }) + .await?; + Ok(Some(response)) + } Execution::CopyToS3 { global_lir_plan, source_ids, @@ -1514,6 +1639,19 @@ impl PeekClient { // No read holds assertions needed for EXPLAIN variants return; } + Execution::Subscribe { df_desc, .. } => { + let as_of = df_desc + .as_of + .clone() + .expect("dataflow has an as_of") + .into_element(); + ( + df_desc.source_imports.keys().cloned().collect(), + df_desc.index_imports.keys().cloned().collect(), + as_of, + "Subscribe", + ) + } }; // Assert that we have some read holds for all the imports of the dataflow. @@ -1583,6 +1721,12 @@ enum Execution { plan_insights_optimizer_trace: Option, insights_ctx: Option>, }, + Subscribe { + subscribe_plan: SubscribePlan, + df_desc: DataflowDescription, + df_meta: DataflowMetainfo, + optimization_finished_at: EpochMillis, + }, CopyToS3 { global_lir_plan: optimize::copy_to::GlobalLirPlan, source_ids: BTreeSet, diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 0d83362114737..69b0117356de7 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -17,11 +17,10 @@ use differential_dataflow::lattice::Lattice; use mz_compute_types::ComputeInstanceId; use mz_compute_types::plan::Plan; use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection}; -use mz_ore::collections::CollectionExt; use mz_ore::soft_assert_or_log; use mz_repr::{GlobalId, Timestamp}; use mz_sql::optimizer_metrics::OptimizerMetrics; -use mz_sql::plan::{HirToMirConfig, SubscribeFrom, SubscribePlan}; +use mz_sql::plan::SubscribeFrom; use mz_transform::TransformCtx; use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot}; use mz_transform::normalize_lets::normalize_lets; @@ -124,7 +123,7 @@ impl Optimizer { /// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`]. #[derive(Clone, Debug)] pub struct GlobalMirPlan { - df_desc: MirDataflowDescription, + pub df_desc: MirDataflowDescription, df_meta: DataflowMetainfo, phantom: PhantomData, } @@ -153,21 +152,6 @@ impl GlobalLirPlan { pub fn sink_id(&self) -> GlobalId { self.df_desc.sink_id() } - - pub fn as_of(&self) -> Option { - self.df_desc.as_of.clone().map(|as_of| as_of.into_element()) - } - - /// Returns the description of the dataflow's sink export. - /// - /// # Panics - /// - /// Panics if the dataflow has no sink exports or has more than one. - pub fn sink_desc(&self) -> &ComputeSinkDesc { - let sink_exports = &self.df_desc.sink_exports; - let sink_desc = sink_exports.values().into_element(); - sink_desc - } } /// Marker type for [`GlobalMirPlan`] structs representing an optimization From 63395cae2f0dc956ae6c71631126beb54adff1b5 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 27 Mar 2026 17:33:01 -0400 Subject: [PATCH 07/13] Revert "Subscribe threads HIR now" This reverts commit ac5ec9a812c85458908a3b2139b63d4b07231fc5. --- src/adapter/src/coord/catalog_serving.rs | 15 ++++++++------- src/adapter/src/optimize/subscribe.rs | 5 +---- src/sql/src/plan.rs | 13 ++++++------- src/sql/src/plan/statement/dml.rs | 22 ++++------------------ 4 files changed, 19 insertions(+), 36 deletions(-) diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index 0fb8ae2ceb50f..aa3a92b5c4b2a 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -27,7 +27,6 @@ use mz_sql::plan::{ SubscribePlan, }; use smallvec::SmallVec; -use std::collections::BTreeSet; use crate::AdapterError; use crate::catalog::ConnCatalog; @@ -48,7 +47,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( plan.from.depends_on(), match &plan.from { SubscribeFrom::Id(_) => false, - SubscribeFrom::Query { select, .. } => select.source.could_run_expensive_function(), + SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(), }, ) }; @@ -235,16 +234,18 @@ pub fn check_cluster_restrictions( // // Note: Creating other objects like Materialized Views is prevented elsewhere. We define the // 'mz_catalog_server' cluster to be "read-only", which restricts these actions. - let depends_on: BTreeSet = match plan { - Plan::ReadThenWrite(plan) => plan.selection.depends_on(), - Plan::Subscribe(plan) => plan.from.depends_on(), - Plan::Select(plan) => plan.source.depends_on(), + let depends_on: Box> = match plan { + Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()), + Plan::Subscribe(plan) => match plan.from { + SubscribeFrom::Id(id) => Box::new(std::iter::once(id)), + SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()), + }, + Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()), _ => return Ok(()), }; // Collect any items that are not allowed to be run on the catalog server cluster. let unallowed_dependents: SmallVec<[String; 2]> = depends_on - .into_iter() .filter_map(|id| { let item = catalog.get_item_by_global_id(&id); let full_name = catalog.resolve_full_name(item.name()); diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 69b0117356de7..ac220041e663c 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -206,7 +206,7 @@ impl Optimize for Optimizer { }; df_desc.export_sink(self.sink_id, sink_description); } - SubscribeFrom::Query { select, desc } => { + SubscribeFrom::Query { expr, desc } => { // TODO: Change the `expr` type to be `HirRelationExpr` and run // HIR ⇒ MIR lowering and decorrelation here. This would allow // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.` @@ -222,9 +222,6 @@ impl Optimize for Optimizer { Some(&mut self.metrics), Some(self.view_id), ); - let expr = select - .source - .lower(HirToMirConfig::from(&self.config), None)?; let expr = optimize_mir_local(expr, &mut transform_ctx)?; df_builder.import_view_into_dataflow( diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index e1458c2deaf6d..a44770aa455e6 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -37,7 +37,9 @@ use ipnet::IpNet; use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing}; +use mz_expr::{ + CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing, +}; use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; @@ -925,7 +927,7 @@ pub enum SubscribeFrom { Id(GlobalId), /// Query to subscribe to. Query { - select: SelectPlan, + expr: MirRelationExpr, desc: RelationDesc, }, } @@ -934,17 +936,14 @@ impl SubscribeFrom { pub fn depends_on(&self) -> BTreeSet { match self { SubscribeFrom::Id(id) => BTreeSet::from([*id]), - SubscribeFrom::Query { select, .. } => select.source.depends_on(), + SubscribeFrom::Query { expr, .. } => expr.depends_on(), } } pub fn contains_temporal(&self) -> bool { match self { SubscribeFrom::Id(_) => false, - SubscribeFrom::Query { select, .. } => select - .source - .contains_temporal() - .expect("Unexpected error in `visit_scalars` call"), + SubscribeFrom::Query { expr, .. } => expr.contains_temporal(), } } } diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 53ad5574642d8..7bebe89216638 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -16,7 +16,6 @@ use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet}; use itertools::Itertools; - use mz_arrow_util::builder::ArrowBuilder; use mz_expr::RowSetFinishing; use mz_expr::visit::Visit; @@ -1620,8 +1619,6 @@ pub fn plan_subscribe( params: &Params, copy_to: Option, ) -> Result { - let when = query::plan_as_of(scx, as_of)?; - let (from, desc, scope) = match relation { SubscribeRelation::Name(name) => { let item = scx.get_item_by_resolved_name(&name)?; @@ -1653,7 +1650,7 @@ pub fn plan_subscribe( } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?; expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?; let query = query::PlannedRootQuery { - expr, + expr: expr.lower(scx.catalog.system_vars(), None)?, desc, finishing, scope, @@ -1665,23 +1662,11 @@ pub fn plan_subscribe( &query.finishing, query.desc.arity() )); - let finishing = RowSetFinishing { - order_by: vec![], - limit: None, - offset: 0, - project: query.finishing.project, - }; let desc = query.desc.clone(); ( SubscribeFrom::Query { - select: SelectPlan { - select: None, - source: query.expr, - when: when.clone(), - finishing, - copy_to: None, - }, - desc: desc.clone(), + expr: query.expr, + desc: query.desc, }, desc, query.scope, @@ -1689,6 +1674,7 @@ pub fn plan_subscribe( } }; + let when = query::plan_as_of(scx, as_of)?; let up_to = up_to .map(|up_to| plan_as_of_or_up_to(scx, up_to)) .transpose()?; From a85d99206b14684b0ce78d73ac2b30f7be248a35 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 27 Mar 2026 17:30:43 -0400 Subject: [PATCH 08/13] Don't assume a select in the new frontend query path --- src/adapter/src/frontend_peek.rs | 128 ++++++++++++++++--------------- 1 file changed, 65 insertions(+), 63 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index e6456ee6a6268..d5d3c763e193c 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; use mz_controller_types::ClusterId; -use mz_expr::{CollectionPlan, ResultSpec}; +use mz_expr::{CollectionPlan, ResultSpec, RowSetFinishing}; use mz_ore::cast::{CastFrom, CastLossy}; use mz_ore::collections::CollectionExt; use mz_ore::now::EpochMillis; @@ -26,8 +26,10 @@ use mz_repr::role_id::RoleId; use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp}; use mz_sql::ast::Raw; use mz_sql::catalog::CatalogCluster; -use mz_sql::plan::{self, Explainee, ExplaineeStatement, Plan, QueryWhen, SubscribePlan}; -use mz_sql::plan::{Params, SubscribeFrom}; +use mz_sql::plan::Params; +use mz_sql::plan::{ + self, Explainee, ExplaineeStatement, Plan, QueryWhen, SelectPlan, SubscribePlan, +}; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; use mz_sql::session::vars::IsolationLevel; @@ -185,11 +187,7 @@ impl PeekClient { } } Statement::Subscribe(subscribe) => { - if let SubscribeRelation::Query(_) = &subscribe.relation { - // We have a select statement to process; continue. - } else { - return Ok(None); - } + // We have a subscribe statement to process; continue. } _ => { debug!( @@ -329,13 +327,13 @@ impl PeekClient { let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?; /// What do we do with the result of the select? - enum OutputContext { - Default, - CopyTo(CopyToContext), - Subscribe { plan: SubscribePlan }, + enum QueryPlan<'a> { + Select(&'a SelectPlan), + CopyTo(&'a SelectPlan, CopyToContext), + Subscribe(&'a SubscribePlan), } - let (select_plan, explain_ctx, output_ctx) = match &plan { + let (query_plan, explain_ctx) = match &plan { Plan::Select(select_plan) => { let explain_ctx = if session.vars().emit_plan_insights_notice() { let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths()); @@ -343,14 +341,13 @@ impl PeekClient { } else { ExplainContext::None }; - (select_plan, explain_ctx, OutputContext::Default) + (QueryPlan::Select(select_plan), explain_ctx) } Plan::ShowColumns(show_columns_plan) => { // ShowColumns wraps a SelectPlan, extract it and proceed as normal. ( - &show_columns_plan.select_plan, + QueryPlan::Select(&show_columns_plan.select_plan), ExplainContext::None, - OutputContext::Default, ) } Plan::ExplainPlan(plan::ExplainPlanPlan { @@ -370,7 +367,7 @@ impl PeekClient { desc: Some(desc.clone()), optimizer_trace, }); - (plan, explain_ctx, OutputContext::Default) + (QueryPlan::Select(plan), explain_ctx) } // COPY TO S3 Plan::CopyTo(plan::CopyToPlan { @@ -396,9 +393,8 @@ impl PeekClient { }; ( - select_plan, + QueryPlan::CopyTo(select_plan, copy_to_ctx), ExplainContext::None, - OutputContext::CopyTo(copy_to_ctx), ) } Plan::ExplainPushdown(plan::ExplainPushdownPlan { explainee }) => { @@ -410,7 +406,7 @@ impl PeekClient { desc: _, }) => { let explain_ctx = ExplainContext::Pushdown; - (plan, explain_ctx, OutputContext::Default) + (QueryPlan::Select(plan), explain_ctx) } _ => { // This shouldn't happen because we already checked for this at the AST @@ -440,27 +436,7 @@ impl PeekClient { .await?; return Ok(Some(response)); } - Plan::Subscribe(subscribe) => { - let select = match &subscribe.from { - SubscribeFrom::Id(_) => { - // This shouldn't happen because we already checked for this at the AST - // level before calling `try_frontend_peek_inner`. - soft_panic_or_log!( - "Unexpected plan kind in frontend peek sequencing: {:?}", - subscribe - ); - return Ok(None); - } - SubscribeFrom::Query { select, .. } => select, - }; - ( - select, - ExplainContext::None, - OutputContext::Subscribe { - plan: subscribe.clone(), - }, - ) - } + Plan::Subscribe(subscribe) => (QueryPlan::Subscribe(subscribe), ExplainContext::None), _ => { // This shouldn't happen because we already checked for this at the AST // level before calling `try_frontend_peek_inner`. @@ -475,6 +451,24 @@ impl PeekClient { } }; + let when = match query_plan { + QueryPlan::Select(s) => &s.when, + QueryPlan::CopyTo(s, _) => &s.when, + QueryPlan::Subscribe(s) => &s.when, + }; + + let depends_on = match query_plan { + QueryPlan::Select(s) => s.source.depends_on(), + QueryPlan::CopyTo(s, _) => s.source.depends_on(), + QueryPlan::Subscribe(s) => s.from.depends_on(), + }; + + let contains_temporal = match query_plan { + QueryPlan::Select(s) => s.source.contains_temporal(), + QueryPlan::CopyTo(s, _) => s.source.contains_temporal(), + QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()), + }; + // # From sequence_plan // We have checked the plan kind above. @@ -558,14 +552,12 @@ impl PeekClient { }) .transpose()?; - let source_ids = select_plan.source.depends_on(); + let source_ids = depends_on; // TODO(peek-seq): validate_timeline_context can be expensive in real scenarios (not in // simple benchmarks), because it traverses transitive dependencies even of indexed views and // materialized views (also traversing their MIR plans). let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?; - if matches!(timeline_context, TimelineContext::TimestampIndependent) - && select_plan.source.contains_temporal()? - { + if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? { // If the source IDs are timestamp independent but the query contains temporal functions, // then the timeline context needs to be upgraded to timestamp dependent. This is // required because `source_ids` doesn't contain functions. @@ -586,7 +578,7 @@ impl PeekClient { let isolation_level = session.vars().transaction_isolation().clone(); let timeline = Coordinator::get_timeline(&timeline_context); let needs_linearized_read_ts = - Coordinator::needs_linearized_read_ts(&isolation_level, &select_plan.when); + Coordinator::needs_linearized_read_ts(&isolation_level, when); let oracle_read_ts = match timeline { Some(timeline) if needs_linearized_read_ts => { @@ -633,10 +625,8 @@ impl PeekClient { // peek sequencing still does a timedomain validation. The new peek sequencing does not do // timedomain validation for AS OF queries, which seems more natural. But I'm thinking that // it would be the cleanest to just simply disallow AS OF queries inside transactions. - let in_immediate_multi_stmt_txn = session - .transaction() - .in_immediate_multi_stmt_txn(&select_plan.when) - && !matches!(output_ctx, OutputContext::Subscribe { .. }); + let in_immediate_multi_stmt_txn = session.transaction().in_immediate_multi_stmt_txn(when) + && !matches!(query_plan, QueryPlan::Subscribe { .. }); // Fetch or generate a timestamp for this query and fetch or acquire read holds. let (determination, read_holds) = match session.get_transaction_timestamp_determination() { @@ -721,7 +711,7 @@ impl PeekClient { .frontend_determine_timestamp( session, determine_bundle, - &select_plan.when, + when, target_cluster_id, &timeline_context, oracle_read_ts, @@ -802,9 +792,9 @@ impl PeekClient { // depend on whether or not reads have occurred in the txn. let requires_linearization = (&explain_ctx).into(); let mut transaction_determination = determination.clone(); - if matches!(output_ctx, OutputContext::Subscribe { .. }) { + if matches!(query_plan, QueryPlan::Subscribe { .. }) { session.add_transaction_ops(TransactionOps::Subscribe)?; - } else if select_plan.when.is_transactional() { + } else if when.is_transactional() { session.add_transaction_ops(TransactionOps::Peeks { determination: transaction_determination, cluster_id: target_cluster_id, @@ -838,7 +828,6 @@ impl PeekClient { let timestamp_context = determination.timestamp_context.clone(); let session_meta = session.meta(); let now = catalog.config().now.clone(); - let select_plan = select_plan.clone(); let target_cluster_name = target_cluster_name.clone(); let needs_plan_insights = explain_ctx.needs_plan_insights(); let determination_for_pushdown = if matches!(explain_ctx, ExplainContext::Pushdown) { @@ -866,10 +855,11 @@ impl PeekClient { } let source_ids_for_closure = source_ids.clone(); - let raw_expr = select_plan.source.clone(); - let optimization_future: JoinHandle> = match output_ctx { - OutputContext::CopyTo(mut copy_to_ctx) => { + let optimization_future: JoinHandle> = match query_plan { + QueryPlan::CopyTo(select_plan, mut copy_to_ctx) => { + let raw_expr = select_plan.source.clone(); + // COPY TO path: calculate output_batch_count and create copy_to optimizer let worker_counts = cluster.replicas().map(|r| { let loc = &r.config.location; @@ -922,7 +912,10 @@ impl PeekClient { }, ) } - OutputContext::Default => { + QueryPlan::Select(select_plan) => { + let select_plan = select_plan.clone(); + let raw_expr = select_plan.source.clone(); + // SELECT/EXPLAIN path: create peek optimizer let mut optimizer = optimize::peek::Optimizer::new( Arc::clone(&catalog), @@ -1052,6 +1045,8 @@ impl PeekClient { global_lir_plan, optimization_finished_at, plan_insights_optimizer_trace: None, + finishing: select_plan.finishing, + copy_to: select_plan.copy_to, insights_ctx: None, }), ExplainContext::PlanInsightsNotice(optimizer_trace) => { @@ -1060,6 +1055,8 @@ impl PeekClient { global_lir_plan, optimization_finished_at, plan_insights_optimizer_trace: Some(optimizer_trace), + finishing: select_plan.finishing, + copy_to: select_plan.copy_to, insights_ctx, }) } @@ -1089,7 +1086,8 @@ impl PeekClient { }, ) } - OutputContext::Subscribe { plan } => { + QueryPlan::Subscribe(plan) => { + let plan = plan.clone(); let catalog: Arc = Arc::clone(&catalog); let mut optimizer = optimize::subscribe::Optimizer::new( catalog, @@ -1232,6 +1230,8 @@ impl PeekClient { global_lir_plan, optimization_finished_at: _optimization_finished_at, plan_insights_optimizer_trace, + finishing, + copy_to, insights_ctx, } => { // Continue with normal execution @@ -1255,7 +1255,7 @@ impl PeekClient { .into_plan_insights( &features, &catalog.for_session(session), - Some(select_plan.finishing.clone()), + Some(finishing.clone()), Some(target_cluster), df_meta.clone(), insights_ctx, @@ -1322,7 +1322,7 @@ impl PeekClient { self.implement_fast_path_peek_plan( fast_path_plan, determination.timestamp_context.timestamp_or_default(), - select_plan.finishing, + finishing, target_cluster_id, target_replica, typ, @@ -1346,7 +1346,7 @@ impl PeekClient { self.call_coordinator(|tx| Command::ExecuteSlowPathPeek { dataflow_plan: Box::new(dataflow_plan), determination, - finishing: select_plan.finishing, + finishing, compute_instance: target_cluster_id, target_replica, intermediate_result_type: typ, @@ -1376,7 +1376,7 @@ impl PeekClient { session.add_notice(AdapterNotice::QueryTimestamp { explanation }); } - Ok(Some(match select_plan.copy_to { + Ok(Some(match copy_to { None => response, // COPY TO STDOUT Some(format) => ExecuteResponse::CopyTo { @@ -1719,6 +1719,8 @@ enum Execution { global_lir_plan: optimize::peek::GlobalLirPlan, optimization_finished_at: EpochMillis, plan_insights_optimizer_trace: Option, + finishing: RowSetFinishing, + copy_to: Option, insights_ctx: Option>, }, Subscribe { From 0f9a579add6845ffe5951d464d9ba93776595bc7 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 30 Mar 2026 10:55:34 -0400 Subject: [PATCH 09/13] Move lowering into optimization for subscribe This moves us more in line with how selects work without actually inlining a select plan. --- src/adapter/src/optimize/subscribe.rs | 4 +++- src/sql/src/plan.rs | 10 +++++----- src/sql/src/plan/statement/dml.rs | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index ac220041e663c..e3de269c9eb79 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -20,7 +20,7 @@ use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeS use mz_ore::soft_assert_or_log; use mz_repr::{GlobalId, Timestamp}; use mz_sql::optimizer_metrics::OptimizerMetrics; -use mz_sql::plan::SubscribeFrom; +use mz_sql::plan::{HirToMirConfig, SubscribeFrom}; use mz_transform::TransformCtx; use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot}; use mz_transform::normalize_lets::normalize_lets; @@ -222,6 +222,8 @@ impl Optimize for Optimizer { Some(&mut self.metrics), Some(self.view_id), ); + + let expr = expr.lower(HirToMirConfig::from(&self.config), None)?; let expr = optimize_mir_local(expr, &mut transform_ctx)?; df_builder.import_view_into_dataflow( diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index a44770aa455e6..daca6ddee91d0 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -37,9 +37,7 @@ use ipnet::IpNet; use maplit::btreeset; use mz_adapter_types::compaction::CompactionWindow; use mz_controller_types::{ClusterId, ReplicaId}; -use mz_expr::{ - CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing, -}; +use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing}; use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; @@ -927,7 +925,7 @@ pub enum SubscribeFrom { Id(GlobalId), /// Query to subscribe to. Query { - expr: MirRelationExpr, + expr: HirRelationExpr, desc: RelationDesc, }, } @@ -943,7 +941,9 @@ impl SubscribeFrom { pub fn contains_temporal(&self) -> bool { match self { SubscribeFrom::Id(_) => false, - SubscribeFrom::Query { expr, .. } => expr.contains_temporal(), + SubscribeFrom::Query { expr, .. } => expr + .contains_temporal() + .expect("Unexpected error in `visit_scalars` call"), } } } diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index 7bebe89216638..eb0e8289468c8 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -1650,7 +1650,7 @@ pub fn plan_subscribe( } = query::plan_root_query(scx, query, QueryLifetime::Subscribe)?; expr.bind_parameters(scx, QueryLifetime::Subscribe, params)?; let query = query::PlannedRootQuery { - expr: expr.lower(scx.catalog.system_vars(), None)?, + expr, desc, finishing, scope, From f1d1e9efd92f308cf147c60c9a60375b2b5920aa Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 30 Mar 2026 11:09:33 -0400 Subject: [PATCH 10/13] Restore a is_transactional guard for subscribes --- src/adapter/src/frontend_peek.rs | 41 +++++++++++++++++++------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index d5d3c763e193c..930dca424a578 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -792,23 +792,30 @@ impl PeekClient { // depend on whether or not reads have occurred in the txn. let requires_linearization = (&explain_ctx).into(); let mut transaction_determination = determination.clone(); - if matches!(query_plan, QueryPlan::Subscribe { .. }) { - session.add_transaction_ops(TransactionOps::Subscribe)?; - } else if when.is_transactional() { - session.add_transaction_ops(TransactionOps::Peeks { - determination: transaction_determination, - cluster_id: target_cluster_id, - requires_linearization, - })?; - } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) { - // If the query uses AS OF, then ignore the timestamp. - transaction_determination.timestamp_context = TimestampContext::NoTimestamp; - session.add_transaction_ops(TransactionOps::Peeks { - determination: transaction_determination, - cluster_id: target_cluster_id, - requires_linearization, - })?; - }; + match query_plan { + QueryPlan::Subscribe { .. } => { + if when.is_transactional() { + session.add_transaction_ops(TransactionOps::Subscribe)?; + } + } + QueryPlan::Select(..) | QueryPlan::CopyTo(..) => { + if when.is_transactional() { + session.add_transaction_ops(TransactionOps::Peeks { + determination: transaction_determination, + cluster_id: target_cluster_id, + requires_linearization, + })?; + } else if matches!(session.transaction(), &TransactionStatus::InTransaction(_)) { + // If the query uses AS OF, then ignore the timestamp. + transaction_determination.timestamp_context = TimestampContext::NoTimestamp; + session.add_transaction_ops(TransactionOps::Peeks { + determination: transaction_determination, + cluster_id: target_cluster_id, + requires_linearization, + })?; + } + } + } // # From peek_optimize From e2fb5507f8746e4e823ac778a13086da8c7166d4 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 30 Mar 2026 11:09:33 -0400 Subject: [PATCH 11/13] Allow copy-to subscribes in the frontend path --- src/adapter/src/frontend_peek.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 930dca424a578..9c6a760d1d261 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -169,14 +169,7 @@ impl PeekClient { Statement::Copy(copy_stmt) => { match ©_stmt.direction { CopyDirection::To => { - // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe - if matches!(©_stmt.relation, CopyRelation::Subscribe(_)) { - debug!( - "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) TO is not supported" - ); - return Ok(None); - } - // This is COPY TO (SELECT), continue + // This is COPY TO (...), continue } CopyDirection::From => { debug!( From 986141a1a46094810f7db03a7399d4059cc5515c Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 30 Mar 2026 13:01:03 -0400 Subject: [PATCH 12/13] Fix typos and other minor issues --- src/adapter/src/frontend_peek.rs | 8 +++----- src/adapter/src/optimize/subscribe.rs | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 9c6a760d1d261..f777ea73b7bf3 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -886,7 +886,7 @@ impl PeekClient { ); mz_ore::task::spawn_blocking( - || "optimize peek", + || "optimize copy-to", move || { span.in_scope(|| { let _dispatch_guard = explain_ctx.dispatch_guard(); @@ -1089,6 +1089,7 @@ impl PeekClient { QueryPlan::Subscribe(plan) => { let plan = plan.clone(); let catalog: Arc = Arc::clone(&catalog); + let debug_name = format!("subscribe-{}", index_id); let mut optimizer = optimize::subscribe::Optimizer::new( catalog, compute_instance_snapshot.clone(), @@ -1096,7 +1097,7 @@ impl PeekClient { index_id, plan.with_snapshot, plan.up_to, - "TODO".to_string(), + debug_name, optimizer_config, self.optimizer_metrics.clone(), ); @@ -1106,13 +1107,10 @@ impl PeekClient { span.in_scope(|| { let _dispatch_guard = explain_ctx.dispatch_guard(); - // SELECT/EXPLAIN path - // HIR ⇒ MIR lowering and MIR optimization (local) let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?; let as_of = timestamp_context.timestamp_or_default(); - // Attach resolved context required to continue the pipeline. if let Some(up_to) = optimizer.up_to() { if as_of > up_to { return Err(AdapterError::AbsurdSubscribeBounds { diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index e3de269c9eb79..996f7f8955ad5 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -123,7 +123,7 @@ impl Optimizer { /// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`]. #[derive(Clone, Debug)] pub struct GlobalMirPlan { - pub df_desc: MirDataflowDescription, + df_desc: MirDataflowDescription, df_meta: DataflowMetainfo, phantom: PhantomData, } From 155d3213e04567d1f8e44e454b76fe388e5ddf8a Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 30 Mar 2026 13:08:26 -0400 Subject: [PATCH 13/13] Guard frontend subscribes behind a flag --- misc/python/materialize/mzcompose/__init__.py | 5 +++++ misc/python/materialize/parallel_workload/action.py | 4 ++++ src/adapter-types/src/dyncfgs.rs | 8 ++++++++ src/adapter/src/frontend_peek.rs | 10 ++++++---- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 69133b985f1fa..b608d89af346b 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -207,6 +207,11 @@ def get_variable_system_parameters( "true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false", ["true", "false"], ), + VariableSystemParameter( + "enable_frontend_subscribes", + "true" if version >= MzVersion.parse_mz("v26.18.0-dev") else "false", + ["true", "false"], + ), VariableSystemParameter( "default_timestamp_interval", "1s", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index f6ada163aa824..8eff452ab0736 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1566,6 +1566,10 @@ def __init__( "true", "false", ] + self.flags_with_values["enable_frontend_subscribes"] = [ + "true", + "false", + ] self.flags_with_values["enable_case_literal_transform"] = BOOLEAN_FLAG_VALUES self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index 1f67aa57290b5..d36c3e55e8607 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -77,6 +77,13 @@ pub const ENABLE_INTROSPECTION_SUBSCRIBES: Config = Config::new( "Enable installation of introspection subscribes.", ); +/// Enable sending subscribes down the new frontend-peek path. +pub const ENABLE_FRONTEND_SUBSCRIBES: Config = Config::new( + "enable_frontend_subscribes", + true, + "Enable sending subscribes down the new frontend-peek path.", +); + /// The plan insights notice will not investigate fast path clusters if plan optimization took longer than this. pub const PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION: Config = Config::new( "plan_insights_notice_fast_path_clusters_optimize_duration", @@ -211,6 +218,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK) .add(&ENABLE_STATEMENT_LIFECYCLE_LOGGING) .add(&ENABLE_INTROSPECTION_SUBSCRIBES) + .add(&ENABLE_FRONTEND_SUBSCRIBES) .add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION) .add(&ENABLE_CONTINUAL_TASK_BUILTINS) .add(&ENABLE_EXPRESSION_CACHE) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index f777ea73b7bf3..da26a83b4ba85 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -12,6 +12,7 @@ use std::collections::BTreeSet; use std::sync::Arc; use itertools::Itertools; +use mz_adapter_types::dyncfgs::ENABLE_FRONTEND_SUBSCRIBES; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; use mz_controller_types::ClusterId; @@ -33,9 +34,7 @@ use mz_sql::plan::{ use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; use mz_sql::session::vars::IsolationLevel; -use mz_sql_parser::ast::{ - CopyDirection, ExplainStage, ShowStatement, Statement, SubscribeRelation, -}; +use mz_sql_parser::ast::{CopyDirection, ExplainStage, ShowStatement, Statement}; use mz_transform::EmptyStatisticsOracle; use mz_transform::dataflow::DataflowMetainfo; use opentelemetry::trace::TraceContextExt; @@ -179,7 +178,10 @@ impl PeekClient { } } } - Statement::Subscribe(subscribe) => { + + Statement::Subscribe(_) + if ENABLE_FRONTEND_SUBSCRIBES.get(catalog.system_config().dyncfgs()) => + { // We have a subscribe statement to process; continue. } _ => {