Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ describe("buildFirstReplicaSourceStatisticsTable", () => {
await testdrive(
`
$ postgres-execute connection=postgres://mz_system:materialize@\${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_multi_replica_sources = true
ALTER SYSTEM SET default_timestamp_interval = 100
ALTER SYSTEM SET storage_statistics_collection_interval = 100
ALTER SYSTEM SET storage_statistics_interval = 200
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ describe("buildSourceStatisticsQuery", () => {
await testdrive(
`
$ postgres-execute connection=postgres://mz_system:materialize@\${testdrive.materialize-internal-sql-addr}
# Enable the feature flag to allow sources on multi-replica clusters
ALTER SYSTEM SET enable_multi_replica_sources = true
ALTER SYSTEM SET default_timestamp_interval = 100
ALTER SYSTEM SET storage_statistics_collection_interval = 100
ALTER SYSTEM SET storage_statistics_interval = 200
Expand Down
1 change: 0 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def get_minimal_system_parameters(
"enable_load_generator_counter": "true",
"enable_logical_compaction_window": "true",
"enable_multi_worker_storage_persist_sink": "true",
"enable_multi_replica_sources": "true",
"enable_rbac_checks": "true",
"enable_reduce_mfp_fusion": "true",
"enable_refresh_every_mvs": "true",
Expand Down
1 change: 0 additions & 1 deletion misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,6 @@ def __init__(
"enable_introspection_subscribes",
"plan_insights_notice_fast_path_clusters_optimize_duration",
"enable_expression_cache",
"enable_multi_replica_sources",
"enable_password_auth",
"persist_fast_path_order",
"enable_mcp_agent",
Expand Down
8 changes: 0 additions & 8 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,6 @@ pub const ENABLE_EXPRESSION_CACHE: Config<bool> = Config::new(
"Use a cache to store optimized expressions to help speed up start times.",
);

/// Whether we allow sources in multi-replica clusters.
pub const ENABLE_MULTI_REPLICA_SOURCES: Config<bool> = Config::new(
"enable_multi_replica_sources",
true,
"Enable multi-replica sources.",
);

/// Whether to enable password authentication.
pub const ENABLE_PASSWORD_AUTH: Config<bool> = Config::new(
"enable_password_auth",
Expand Down Expand Up @@ -261,7 +254,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_FRONTEND_SUBSCRIBES)
.add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION)
.add(&ENABLE_EXPRESSION_CACHE)
.add(&ENABLE_MULTI_REPLICA_SOURCES)
.add(&ENABLE_PASSWORD_AUTH)
.add(&OIDC_ISSUER)
.add(&OIDC_AUDIENCE)
Expand Down
47 changes: 1 addition & 46 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{Future, StreamExt, future};
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::{ENABLE_MULTI_REPLICA_SOURCES, ENABLE_PASSWORD_AUTH};
use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_catalog::memory::error::ErrorKind;
use mz_catalog::memory::objects::{
CatalogItem, Connection, DataSourceDesc, Sink, Source, Table, TableDataSource, Type,
Expand Down Expand Up @@ -64,7 +64,6 @@ use mz_sql::plan::{
};
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::GenericSourceConnection;
// Import `plan` module, but only import select elements to avoid merge conflicts on use statements.
use mz_sql::plan::{
AlterConnectionAction, AlterConnectionPlan, CreateSourcePlanBundle, ExplainSinkSchemaPlan,
Expand Down Expand Up @@ -300,50 +299,6 @@ impl Coordinator {
{
let name = plan.name.clone();

match plan.source.data_source {
plan::DataSourceDesc::Ingestion(ref desc)
| plan::DataSourceDesc::OldSyntaxIngestion { ref desc, .. } => {
let cluster_id = plan
.in_cluster
.expect("ingestion plans must specify cluster");
match desc.connection {
GenericSourceConnection::Postgres(_)
| GenericSourceConnection::MySql(_)
| GenericSourceConnection::SqlServer(_)
| GenericSourceConnection::Kafka(_)
| GenericSourceConnection::LoadGenerator(_) => {
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
.get(self.catalog().system_config().dyncfgs());

if !enable_multi_replica_sources && cluster.replica_ids().len() > 1
{
return Err(AdapterError::Unsupported(
"sources in clusters with >1 replicas",
));
}
}
}
}
}
plan::DataSourceDesc::Webhook { .. } => {
let cluster_id = plan.in_cluster.expect("webhook plans must specify cluster");
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES
.get(self.catalog().system_config().dyncfgs());

if !enable_multi_replica_sources {
if cluster.replica_ids().len() > 1 {
return Err(AdapterError::Unsupported(
"webhook sources in clusters with >1 replicas",
));
}
}
}
}
plan::DataSourceDesc::IngestionExport { .. } | plan::DataSourceDesc::Progress => {}
}

// Attempt to reduce the `CHECK` expression, we timeout if this takes too long.
if let mz_sql::plan::DataSourceDesc::Webhook {
validate_using: Some(validate),
Expand Down
29 changes: 0 additions & 29 deletions src/sql/src/plan/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,6 @@ pub enum PlanError {
MissingName(CatalogItemType),
InvalidRefreshAt,
InvalidRefreshEveryAlignedTo,
CreateReplicaFailStorageObjects {
/// The current number of replicas on the cluster
current_replica_count: usize,
/// THe number of internal replicas on the cluster
internal_replica_count: usize,
/// The number of replicas that executing this command would have
/// created
hypothetical_replica_count: usize,
},
MismatchedObjectType {
name: PartialItemName,
is_type: ObjectType,
Expand Down Expand Up @@ -379,23 +370,6 @@ impl PlanError {
Self::CsrPurification(e) => e.detail(),
Self::KafkaSinkPurification(e) => e.detail(),
Self::IcebergSinkPurification(e) => e.detail(),
Self::CreateReplicaFailStorageObjects {
current_replica_count: current,
internal_replica_count: internal,
hypothetical_replica_count: target,
} => {
Some(format!(
"Currently have {} replica{}{}; command would result in {}",
current,
if *current != 1 { "s" } else { "" },
if *internal > 0 {
format!(" ({} internal)", internal)
} else {
"".to_string()
},
target
))
},
Self::SubsourceNameConflict {
name: _,
upstream_references,
Expand Down Expand Up @@ -833,9 +807,6 @@ impl fmt::Display for PlanError {
write!(f, "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified \
and/or cast to a constant whose type is mz_timestamp")
}
Self::CreateReplicaFailStorageObjects {..} => {
write!(f, "cannot create more than one replica of a cluster containing sources or sinks")
},
Self::MismatchedObjectType {
name,
is_type,
Expand Down
68 changes: 1 addition & 67 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Duration;

use itertools::Itertools;
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_arrow_util::builder::ArrowBuilder;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
Expand Down Expand Up @@ -564,13 +563,6 @@ pub fn plan_create_webhook_source(
// We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
// we plan to normalize when we canonicalize the create statement.
let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
let enable_multi_replica_sources =
ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
if !enable_multi_replica_sources {
if in_cluster.replica_ids().len() > 1 {
sql_bail!("cannot create webhook source in cluster with more than one replica")
}
}
let create_sql =
normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

Expand Down Expand Up @@ -5176,15 +5168,6 @@ pub fn plan_create_cluster_replica(
let cluster = scx
.catalog
.resolve_cluster(Some(&normalize::ident(of_cluster)))?;
let current_replica_count = cluster.replica_ids().iter().count();
if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
return Err(PlanError::CreateReplicaFailStorageObjects {
current_replica_count,
internal_replica_count,
hypothetical_replica_count: current_replica_count + 1,
});
}

let config = plan_replica_config(scx, options)?;

Expand Down Expand Up @@ -5510,24 +5493,6 @@ fn plan_drop_network_policy(
}
}

/// Returns `true` if the cluster has any object that requires a single replica.
/// Returns `false` if the cluster has no objects.
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
// If this feature is enabled then all objects support multiple-replicas
if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
false
} else {
// Othewise we check for the existence of sources or sinks
cluster.bound_objects().iter().any(|id| {
let item = scx.catalog.get_item(id);
matches!(
item.item_type(),
CatalogItemType::Sink | CatalogItemType::Source
)
})
}
}

fn plan_drop_cluster_replica(
scx: &StatementContext,
if_exists: bool,
Expand Down Expand Up @@ -6131,7 +6096,7 @@ pub fn plan_alter_cluster(
scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
}

if let Some(replication_factor) = replication_factor {
if replication_factor.is_some() {
if schedule.is_some()
&& !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
{
Expand All @@ -6146,37 +6111,6 @@ pub fn plan_alter_cluster(
);
}
}

let internal_replica_count =
cluster.replicas().iter().filter(|r| r.internal()).count();
let hypothetical_replica_count =
internal_replica_count + usize::cast_from(replication_factor);

// Total number of replicas running is internal replicas
// + replication factor.
if contains_single_replica_objects(scx, cluster)
&& hypothetical_replica_count > 1
{
return Err(PlanError::CreateReplicaFailStorageObjects {
current_replica_count: cluster.replica_ids().iter().count(),
internal_replica_count,
hypothetical_replica_count,
});
}
} else if alter_strategy.is_some() {
// AlterClusterPlanStrategies that are not None will standup pending replicas of the new configuration
// and violate the single replica for sources constraint. If there are any storage objects (sources or sinks) we should
// just fail.
let internal_replica_count =
cluster.replicas().iter().filter(|r| r.internal()).count();
let hypothetical_replica_count = internal_replica_count * 2;
if contains_single_replica_objects(scx, cluster) {
return Err(PlanError::CreateReplicaFailStorageObjects {
current_replica_count: cluster.replica_ids().iter().count(),
internal_replica_count,
hypothetical_replica_count,
});
}
}
}
false => {
Expand Down
1 change: 0 additions & 1 deletion test/cloudtest/test_managed_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def test_zero_downtime_reconfiguration(mz: MaterializeApplication) -> None:
mz.environmentd.sql(
"""
ALTER SYSTEM SET enable_zero_downtime_cluster_reconfiguration = true;
ALTER SYSTEM SET enable_multi_replica_sources = true;
""",
port="internal",
user="mz_system",
Expand Down
Loading