From f68ff096c9742a8e8ac63ff47846e5f2fb8555d5 Mon Sep 17 00:00:00 2001 From: Justin Bradfield Date: Wed, 6 May 2026 23:22:24 -0500 Subject: [PATCH] sql: Remove `enable_multi_replica_sources` feature flag This flag was already defaulting to `true` and is no longer needed. Removing it entirely eliminates the bug where `ALTER CLUSTER ... SET (SIZE, REPLICATION FACTOR 1) WITH (WAIT FOR ...)` could bypass the single-replica guard when the flag was set to `false`. Fixes SQL-212. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../expressionBuilders.test.sql.ts | 1 - .../source/sourceStatistics.test.sql.ts | 2 - misc/python/materialize/mzcompose/__init__.py | 1 - .../materialize/parallel_workload/action.py | 1 - src/adapter-types/src/dyncfgs.rs | 8 --- src/adapter/src/coord/sequencer/inner.rs | 47 +------------ src/sql/src/plan/error.rs | 29 -------- src/sql/src/plan/statement/ddl.rs | 68 +------------------ test/cloudtest/test_managed_cluster.py | 1 - 9 files changed, 2 insertions(+), 156 deletions(-) diff --git a/console/src/api/materialize/expressionBuilders.test.sql.ts b/console/src/api/materialize/expressionBuilders.test.sql.ts index bfee0002ead4b..fb2c917203eb3 100644 --- a/console/src/api/materialize/expressionBuilders.test.sql.ts +++ b/console/src/api/materialize/expressionBuilders.test.sql.ts @@ -86,7 +86,6 @@ describe("buildFirstReplicaSourceStatisticsTable", () => { await testdrive( ` $ postgres-execute connection=postgres://mz_system:materialize@\${testdrive.materialize-internal-sql-addr} - ALTER SYSTEM SET enable_multi_replica_sources = true ALTER SYSTEM SET default_timestamp_interval = 100 ALTER SYSTEM SET storage_statistics_collection_interval = 100 ALTER SYSTEM SET storage_statistics_interval = 200 diff --git a/console/src/api/materialize/source/sourceStatistics.test.sql.ts b/console/src/api/materialize/source/sourceStatistics.test.sql.ts index 3b69384575b93..2e8552a716cf3 100644 --- a/console/src/api/materialize/source/sourceStatistics.test.sql.ts +++ b/console/src/api/materialize/source/sourceStatistics.test.sql.ts @@ -79,8 +79,6 @@ describe("buildSourceStatisticsQuery", () => { await testdrive( ` $ postgres-execute connection=postgres://mz_system:materialize@\${testdrive.materialize-internal-sql-addr} - # Enable the feature flag to allow sources on multi-replica clusters - ALTER SYSTEM SET enable_multi_replica_sources = true ALTER SYSTEM SET default_timestamp_interval = 100 ALTER SYSTEM SET storage_statistics_collection_interval = 100 ALTER SYSTEM SET storage_statistics_interval = 200 diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 8cf37ccbd71c5..148610b97809d 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -92,7 +92,6 @@ def get_minimal_system_parameters( "enable_load_generator_counter": "true", "enable_logical_compaction_window": "true", "enable_multi_worker_storage_persist_sink": "true", - "enable_multi_replica_sources": "true", "enable_rbac_checks": "true", "enable_reduce_mfp_fusion": "true", "enable_refresh_every_mvs": "true", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 5709dae568719..4932eecb5f361 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1787,7 +1787,6 @@ def __init__( "enable_introspection_subscribes", "plan_insights_notice_fast_path_clusters_optimize_duration", "enable_expression_cache", - "enable_multi_replica_sources", "enable_password_auth", "persist_fast_path_order", "enable_mcp_agent", diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index 6a15b943fe7d3..40abb760b140c 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -103,13 +103,6 @@ pub const ENABLE_EXPRESSION_CACHE: Config = Config::new( "Use a cache to store optimized expressions to help speed up start times.", ); -/// Whether we allow sources in multi-replica clusters. -pub const ENABLE_MULTI_REPLICA_SOURCES: Config = Config::new( - "enable_multi_replica_sources", - true, - "Enable multi-replica sources.", -); - /// Whether to enable password authentication. pub const ENABLE_PASSWORD_AUTH: Config = Config::new( "enable_password_auth", @@ -261,7 +254,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&ENABLE_FRONTEND_SUBSCRIBES) .add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION) .add(&ENABLE_EXPRESSION_CACHE) - .add(&ENABLE_MULTI_REPLICA_SOURCES) .add(&ENABLE_PASSWORD_AUTH) .add(&OIDC_ISSUER) .add(&OIDC_AUDIENCE) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 74edfa3266338..39609ffa84bb6 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -20,7 +20,7 @@ use futures::{Future, StreamExt, future}; use itertools::Itertools; use mz_adapter_types::compaction::CompactionWindow; use mz_adapter_types::connection::ConnectionId; -use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH}; +use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH; use mz_catalog::memory::error::ErrorKind; use mz_catalog::memory::objects::{ CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type, @@ -64,7 +64,6 @@ use mz_sql::plan::{ }; use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements}; use mz_storage_types::sinks::StorageSinkDesc; -use mz_storage_types::sources::GenericSourceConnection; // Import `plan` module, but only import select elements to avoid merge conflicts on use statements. use mz_sql::plan::{ AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan, @@ -300,50 +299,6 @@ impl Coordinator { { let name = plan.name.clone(); - match plan.source.data_source { - plan::DataSourceDesc::Ingestion(ref desc) - | plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => { - let cluster_id = plan - .in_cluster - .expect("ingestion plans must specify cluster"); - match desc.connection { - GenericSourceConnection::Postgres(_) - | GenericSourceConnection::MySql(_) - | GenericSourceConnection::SqlServer(_) - | GenericSourceConnection::Kafka(_) - | GenericSourceConnection::LoadGenerator(_) => { - if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) { - let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES - .get(self.catalog().system_config().dyncfgs()); - - if !enable_multi_replica_sources && cluster.replica_ids().len() > 1 - { - return Err(AdapterError::Unsupported( - "sources in clusters with >1 replicas", - )); - } - } - } - } - } - plan::DataSourceDesc::Webhook { .. } => { - let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster"); - if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) { - let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES - .get(self.catalog().system_config().dyncfgs()); - - if !enable_multi_replica_sources { - if cluster.replica_ids().len() > 1 { - return Err(AdapterError::Unsupported( - "webhook sources in clusters with >1 replicas", - )); - } - } - } - } - plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {} - } - // Attempt to reduce the `CHECK` expression, we timeout if this takes too long. if let mz_sql::plan::DataSourceDesc::Webhook { validate_using: Some(validate), diff --git a/src/sql/src/plan/error.rs b/src/sql/src/plan/error.rs index 1630ed2155d1c..c5a8efecf7143 100644 --- a/src/sql/src/plan/error.rs +++ b/src/sql/src/plan/error.rs @@ -279,15 +279,6 @@ pub enum PlanError { MissingName(CatalogItemType), InvalidRefreshAt, InvalidRefreshEveryAlignedTo, - CreateReplicaFailStorageObjects { - /// The current number of replicas on the cluster - current_replica_count: usize, - /// THe number of internal replicas on the cluster - internal_replica_count: usize, - /// The number of replicas that executing this command would have - /// created - hypothetical_replica_count: usize, - }, MismatchedObjectType { name: PartialItemName, is_type: ObjectType, @@ -379,23 +370,6 @@ impl PlanError { Self::CsrPurification(e) => e.detail(), Self::KafkaSinkPurification(e) => e.detail(), Self::IcebergSinkPurification(e) => e.detail(), - Self::CreateReplicaFailStorageObjects { - current_replica_count: current, - internal_replica_count: internal, - hypothetical_replica_count: target, - } => { - Some(format!( - "Currently have {} replica{}{}; command would result in {}", - current, - if *current != 1 { "s" } else { "" }, - if *internal > 0 { - format!(" ({} internal)", internal) - } else { - "".to_string() - }, - target - )) - }, Self::SubsourceNameConflict { name: _, upstream_references, @@ -833,9 +807,6 @@ impl fmt::Display for PlanError { write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \ and/or cast to a constant whose type is mz_timestamp") } - Self::CreateReplicaFailStorageObjects {..} => { - write!(f, "cannot create more than one replica of a cluster containing sources or sinks") - }, Self::MismatchedObjectType { name, is_type, diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index e33c482907639..9ece3881ca027 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -20,7 +20,6 @@ use std::time::Duration; use itertools::Itertools; use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION}; -use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES; use mz_arrow_util::builder::ArrowBuilder; use mz_auth::password::Password; use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId}; @@ -564,13 +563,6 @@ pub fn plan_create_webhook_source( // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value // we plan to normalize when we canonicalize the create statement. let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?; - let enable_multi_replica_sources = - ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()); - if !enable_multi_replica_sources { - if in_cluster.replica_ids().len() > 1 { - sql_bail!("cannot create webhook source in cluster with more than one replica") - } - } let create_sql = normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?; @@ -5176,15 +5168,6 @@ pub fn plan_create_cluster_replica( let cluster = scx .catalog .resolve_cluster(Some(&normalize::ident(of_cluster)))?; - let current_replica_count = cluster.replica_ids().iter().count(); - if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 { - let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count(); - return Err(PlanError::CreateReplicaFailStorageObjects { - current_replica_count, - internal_replica_count, - hypothetical_replica_count: current_replica_count + 1, - }); - } let config = plan_replica_config(scx, options)?; @@ -5510,24 +5493,6 @@ fn plan_drop_network_policy( } } -/// Returns `true` if the cluster has any object that requires a single replica. -/// Returns `false` if the cluster has no objects. -fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool { - // If this feature is enabled then all objects support multiple-replicas - if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) { - false - } else { - // Othewise we check for the existence of sources or sinks - cluster.bound_objects().iter().any(|id| { - let item = scx.catalog.get_item(id); - matches!( - item.item_type(), - CatalogItemType::Sink | CatalogItemType::Source - ) - }) - } -} - fn plan_drop_cluster_replica( scx: &StatementContext, if_exists: bool, @@ -6131,7 +6096,7 @@ pub fn plan_alter_cluster( scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?; } - if let Some(replication_factor) = replication_factor { + if replication_factor.is_some() { if schedule.is_some() && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual)) { @@ -6146,37 +6111,6 @@ pub fn plan_alter_cluster( ); } } - - let internal_replica_count = - cluster.replicas().iter().filter(|r| r.internal()).count(); - let hypothetical_replica_count = - internal_replica_count + usize::cast_from(replication_factor); - - // Total number of replicas running is internal replicas - // + replication factor. - if contains_single_replica_objects(scx, cluster) - && hypothetical_replica_count > 1 - { - return Err(PlanError::CreateReplicaFailStorageObjects { - current_replica_count: cluster.replica_ids().iter().count(), - internal_replica_count, - hypothetical_replica_count, - }); - } - } else if alter_strategy.is_some() { - // AlterClusterPlanStrategies that are not None will standup pending replicas of the new configuration - // and violate the single replica for sources constraint. If there are any storage objects (sources or sinks) we should - // just fail. - let internal_replica_count = - cluster.replicas().iter().filter(|r| r.internal()).count(); - let hypothetical_replica_count = internal_replica_count * 2; - if contains_single_replica_objects(scx, cluster) { - return Err(PlanError::CreateReplicaFailStorageObjects { - current_replica_count: cluster.replica_ids().iter().count(), - internal_replica_count, - hypothetical_replica_count, - }); - } } } false => { diff --git a/test/cloudtest/test_managed_cluster.py b/test/cloudtest/test_managed_cluster.py index 0aae3a1e3e1ce..abae065597259 100644 --- a/test/cloudtest/test_managed_cluster.py +++ b/test/cloudtest/test_managed_cluster.py @@ -117,7 +117,6 @@ def test_zero_downtime_reconfiguration(mz: MaterializeApplication) -> None: mz.environmentd.sql( """ ALTER SYSTEM SET enable_zero_downtime_cluster_reconfiguration = true; - ALTER SYSTEM SET enable_multi_replica_sources = true; """, port="internal", user="mz_system",