diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index badb084ef12b7..97ff33ad5f5f9 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -832,6 +832,7 @@ pub struct AlterClusterWaitForHydrated { validity: PlanValidity, plan: plan::AlterClusterPlan, new_config: ClusterVariantManaged, + workload_class: Option, timeout_time: Instant, on_timeout: OnTimeoutAction, } @@ -841,6 +842,7 @@ pub struct AlterClusterFinalize { validity: PlanValidity, plan: plan::AlterClusterPlan, new_config: ClusterVariantManaged, + workload_class: Option, } #[derive(Debug)] diff --git a/src/adapter/src/coord/sequencer/inner/cluster.rs b/src/adapter/src/coord/sequencer/inner/cluster.rs index b51b6aae5a752..d5e88a26a203b 100644 --- a/src/adapter/src/coord/sequencer/inner/cluster.rs +++ b/src/adapter/src/coord/sequencer/inner/cluster.rs @@ -76,6 +76,7 @@ impl Staged for ClusterStage { validity, plan, new_config, + workload_class, timeout_time, on_timeout, } = stage; @@ -84,6 +85,7 @@ impl Staged for ClusterStage { ctx.session(), plan, new_config, + workload_class, timeout_time, on_timeout, validity, @@ -96,6 +98,7 @@ impl Staged for ClusterStage { ctx.session(), stage.plan.clone(), stage.new_config.clone(), + stage.workload_class.clone(), ) .await } @@ -295,6 +298,7 @@ impl Coordinator { let span = Span::current(); let plan = plan.clone(); let duration = duration.clone().to_owned(); + let workload_class = new_config.workload_class.clone(); Ok(StageResult::Handle(mz_ore::task::spawn( || "Finalize Alter Cluster", async move { @@ -303,6 +307,7 @@ impl Coordinator { validity, plan, new_config: new_config_managed, + workload_class, }); Ok(Box::new(stage)) } @@ -317,6 +322,7 @@ impl Coordinator { validity, plan: plan.clone(), new_config: new_config_managed.clone(), + workload_class: new_config.workload_class.clone(), timeout_time: Instant::now() + timeout.to_owned(), on_timeout: on_timeout.to_owned(), }), @@ -362,9 +368,9 @@ impl Coordinator { .. 
}: AlterClusterPlan, new_config: ClusterVariantManaged, + workload_class: Option, ) -> Result>, AdapterError> { let cluster = self.catalog.get_cluster(cluster_id); - let workload_class = cluster.config.workload_class.clone(); let mut ops = vec![]; // Gather the ops to remove the non pending replicas @@ -484,6 +490,7 @@ impl Coordinator { session: &Session, plan: AlterClusterPlan, new_config: ClusterVariantManaged, + workload_class: Option, timeout_time: Instant, on_timeout: OnTimeoutAction, validity: PlanValidity, @@ -529,6 +536,7 @@ impl Coordinator { validity, plan, new_config, + workload_class, }); Ok(Box::new(stage)) } @@ -562,7 +570,8 @@ impl Coordinator { Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize { validity, plan, - new_config, + new_config: new_config.clone(), + workload_class: workload_class.clone(), }))) } else { // Check later @@ -571,6 +580,7 @@ impl Coordinator { validity, plan, new_config, + workload_class, timeout_time, on_timeout, }); diff --git a/test/sqllogictest/managed_cluster.slt b/test/sqllogictest/managed_cluster.slt index 2053eb650143b..2ad2cec301c7f 100644 --- a/test/sqllogictest/managed_cluster.slt +++ b/test/sqllogictest/managed_cluster.slt @@ -416,5 +416,41 @@ ALTER CLUSTER foo set (SIZE 'scale=1,workers=4') WITH (WAIT UNTIL READY (TIMEOUT statement ok ALTER CLUSTER foo set (SIZE 'scale=1,workers=4') WITH (WAIT UNTIL READY (TIMEOUT '10ms', ON TIMEOUT 'ROLLBACK') ) +statement ok +DROP CLUSTER foo + +# Regression test: zero-downtime finalization (PR #28836) used to read workload_class from +# the catalog instead of the planned config, silently dropping a WORKLOAD CLASS change made in the same ALTER CLUSTER. 
+ +simple conn=mz_system,user=mz_system +CREATE CLUSTER wc_test SIZE 'scale=1,workers=1' +---- +COMPLETE 0 + +simple conn=mz_system,user=mz_system +ALTER CLUSTER wc_test SET (SIZE 'scale=1,workers=2', WORKLOAD CLASS 'production') WITH (WAIT FOR '0s') +---- +COMPLETE 0 + +query T +SELECT workload_class FROM mz_internal.mz_cluster_workload_classes WHERE id = (SELECT id FROM mz_clusters WHERE name = 'wc_test') +---- +production + +simple conn=mz_system,user=mz_system +ALTER CLUSTER wc_test SET (SIZE 'scale=1,workers=4', WORKLOAD CLASS NULL) WITH (WAIT UNTIL READY (TIMEOUT '0s', ON TIMEOUT 'COMMIT')) +---- +COMPLETE 0 + +query T +SELECT workload_class FROM mz_internal.mz_cluster_workload_classes WHERE id = (SELECT id FROM mz_clusters WHERE name = 'wc_test') +---- +NULL + +simple conn=mz_system,user=mz_system +DROP CLUSTER wc_test +---- +COMPLETE 0 + # Restore pristine server state reset-server