Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ def get_variable_system_parameters(
"true" if version >= MzVersion.parse_mz("v26.9.0-dev") else "false",
["true", "false"],
),
VariableSystemParameter(
"enable_frontend_subscribes",
"true" if version >= MzVersion.parse_mz("v26.18.0-dev") else "false",
["true", "false"],
),
VariableSystemParameter(
"default_timestamp_interval",
"1s",
Expand Down
4 changes: 4 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,10 @@ def __init__(
"true",
"false",
]
self.flags_with_values["enable_frontend_subscribes"] = [
"true",
"false",
]
self.flags_with_values["enable_case_literal_transform"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES

Expand Down
8 changes: 8 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub const ENABLE_INTROSPECTION_SUBSCRIBES: Config<bool> = Config::new(
"Enable installation of introspection subscribes.",
);

/// Enable sending subscribes down the new frontend-peek path.
pub const ENABLE_FRONTEND_SUBSCRIBES: Config<bool> = Config::new(
"enable_frontend_subscribes",
true,
"Enable sending subscribes down the new frontend-peek path.",
);

/// The plan insights notice will not investigate fast path clusters if plan optimization took longer than this.
pub const PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION: Config<Duration> = Config::new(
"plan_insights_notice_fast_path_clusters_optimize_duration",
Expand Down Expand Up @@ -211,6 +218,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_0DT_CAUGHT_UP_REPLICA_STATUS_CHECK)
.add(&ENABLE_STATEMENT_LIFECYCLE_LOGGING)
.add(&ENABLE_INTROSPECTION_SUBSCRIBES)
.add(&ENABLE_FRONTEND_SUBSCRIBES)
.add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION)
.add(&ENABLE_CONTINUAL_TASK_BUILTINS)
.add(&ENABLE_EXPRESSION_CACHE)
Expand Down
11 changes: 1 addition & 10 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ impl SessionClient {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteSubscribe { .. }
| Command::CopyToPreflight { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. }
Expand Down Expand Up @@ -1160,16 +1161,6 @@ impl SessionClient {
self.timeouts.recv().await
}

/// Returns a reference to the PeekClient used for frontend peek sequencing.
pub fn peek_client(&self) -> &PeekClient {
&self.peek_client
}

/// Returns a reference to the PeekClient used for frontend peek sequencing.
pub fn peek_client_mut(&mut self) -> &mut PeekClient {
&mut self.peek_client
}

/// Attempt to sequence a peek from the session task.
///
/// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
Expand Down
18 changes: 17 additions & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType
use mz_sql::ast::{FetchDirection, Raw, Statement};
use mz_sql::catalog::ObjectType;
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan;
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
use mz_sql::session::user::User;
use mz_sql::session::vars::{OwnedVarInput, SystemVars};
Expand All @@ -52,11 +53,11 @@ use crate::coord::timestamp_selection::TimestampDetermination;
use crate::coord::{ExecuteContextExtra, ExecuteContextGuard};
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
StatementLoggingFrontend,
};
use crate::statement_logging::{StatementLoggingId, WatchSetCreation};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{
Expand Down Expand Up @@ -264,6 +265,19 @@ pub enum Command {
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

ExecuteSubscribe {
df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
dependency_ids: BTreeSet<GlobalId>,
cluster_id: ComputeInstanceId,
replica_id: Option<ReplicaId>,
conn_id: ConnectionId,
session_uuid: Uuid,
read_holds: ReadHolds<mz_repr::Timestamp>,
plan: plan::SubscribePlan,
statement_logging_id: Option<StatementLoggingId>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

/// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
/// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
/// in a background task to avoid blocking the coordinator.
Expand Down Expand Up @@ -364,6 +378,7 @@ impl Command {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteSubscribe { .. }
| Command::CopyToPreflight { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. }
Expand Down Expand Up @@ -401,6 +416,7 @@ impl Command {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteSubscribe { .. }
| Command::CopyToPreflight { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. }
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl Message {
}
Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
Command::ExecuteSubscribe { .. } => "execute-subscribe",
Command::CopyToPreflight { .. } => "copy-to-preflight",
Command::ExecuteCopyTo { .. } => "execute-copy-to",
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
Expand Down
32 changes: 32 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,38 @@ impl Coordinator {
let _ = tx.send(result);
}

Command::ExecuteSubscribe {
df_desc,
dependency_ids,
cluster_id,
replica_id,
conn_id,
session_uuid,
read_holds,
plan,
statement_logging_id,
tx,
} => {
let mut ctx_extra = ExecuteContextGuard::new(
statement_logging_id,
self.internal_cmd_tx.clone(),
);
let result = self
.implement_subscribe(
&mut ctx_extra,
df_desc,
dependency_ids,
cluster_id,
replica_id,
conn_id,
session_uuid,
read_holds,
plan,
)
.await;
let _ = tx.send(result);
}

Command::CopyToPreflight {
s3_sink_connection,
sink_id,
Expand Down
100 changes: 69 additions & 31 deletions src/adapter/src/coord/sequencer/inner/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_repr::GlobalId;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
use mz_sql::session::metadata::SessionMetadata;
use std::collections::BTreeSet;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::Span;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
use crate::command::ExecuteResponse;
Expand All @@ -31,7 +39,9 @@ use crate::error::AdapterError;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::Optimize;
use crate::session::{Session, TransactionOps};
use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
use crate::{
AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize,
};

impl Staged for SubscribeStage {
type Ctx = ExecuteContext;
Expand Down Expand Up @@ -428,42 +438,76 @@ impl Coordinator {
SubscribeFinish {
validity: _,
cluster_id,
plan:
plan::SubscribePlan {
copy_to,
emit_progress,
output,
..
},
plan,
global_lir_plan,
dependency_ids,
replica_id,
}: SubscribeFinish,
) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
let sink_id = global_lir_plan.sink_id();
let (df_desc, df_meta) = global_lir_plan.unapply();
emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
let conn_id = ctx.session.conn_id().clone();
let session_uuid = ctx.session().uuid();
let txn_read_holds = self
.txn_read_holds
.remove(&conn_id)
.expect("must have previously installed read holds");
let resp = self
.implement_subscribe(
ctx.extra_mut(),
df_desc,
dependency_ids,
cluster_id,
replica_id,
conn_id,
session_uuid,
txn_read_holds,
plan,
)
.await?;
Ok(StageResult::Response(resp))
}

#[instrument]
pub(crate) async fn implement_subscribe(
&mut self,
ctx_extra: &mut ExecuteContextGuard,
df_desc: DataflowDescription<Plan>,
dependency_ids: BTreeSet<GlobalId>,
cluster_id: ComputeInstanceId,
replica_id: Option<ReplicaId>,
conn_id: ConnectionId,
session_uuid: Uuid,
read_holds: ReadHolds<mz_repr::Timestamp>,
plan: plan::SubscribePlan,
) -> Result<ExecuteResponse, AdapterError> {
let sink_id = df_desc.sink_id();

let (tx, rx) = mpsc::unbounded_channel();
let active_subscribe = ActiveSubscribe {
conn_id: ctx.session().conn_id().clone(),
session_uuid: ctx.session().uuid(),
conn_id: conn_id.clone(),
session_uuid,
channel: tx,
emit_progress,
as_of: global_lir_plan
.as_of()
emit_progress: plan.emit_progress,
as_of: df_desc
.as_of
.as_ref()
.and_then(|t| t.as_option())
.copied()
.expect("set to Some in an earlier stage"),
arity: global_lir_plan.sink_desc().from_desc.arity(),
arity: df_desc
.sink_exports
.values()
.into_element()
.from_desc
.arity(),
cluster_id,
depends_on: dependency_ids,
start_time: self.now(),
output,
output: plan.output,
};
active_subscribe.initialize();

let (df_desc, df_meta) = global_lir_plan.unapply();

// Emit notices.
emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);

// Add metadata for the new SUBSCRIBE.
let write_notify_fut = self
.add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
Expand All @@ -475,28 +519,22 @@ impl Coordinator {
// requests to external services, which can take time, so we run them concurrently.
let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;

// Release the pre-optimization read holds because the controller is now handling those.
let txn_read_holds = self
.txn_read_holds
.remove(ctx.session().conn_id())
.expect("must have previously installed read holds");

// Explicitly drop read holds, just to make it obvious what's happening.
drop(txn_read_holds);
drop(read_holds);

let resp = ExecuteResponse::Subscribing {
rx,
ctx_extra: std::mem::take(ctx.extra_mut()),
ctx_extra: std::mem::take(ctx_extra),
instance_id: cluster_id,
};
let resp = match copy_to {
let resp = match plan.copy_to {
None => resp,
Some(format) => ExecuteResponse::CopyTo {
format,
resp: Box::new(resp),
},
};
Ok(StageResult::Response(resp))
Ok(resp)
}

#[instrument]
Expand Down
Loading
Loading