From afc69411586d02906f4600a5e11c2d7d6b14ffde Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 14 May 2026 21:23:01 -0400 Subject: [PATCH] Retire landed merge train batches --- control_plane/service.py | 50 +++++++++++++++++++++++++++++++++++++++- tests/test_service.py | 32 +++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/control_plane/service.py b/control_plane/service.py index f7225d5..877298b 100644 --- a/control_plane/service.py +++ b/control_plane/service.py @@ -2712,6 +2712,28 @@ def _latest_merge_train_batch_landing_plan_record( return latest_record +def _latest_completed_merge_train_batch_landing_plan_record( + *, + record_store: _MergeTrainBatchLandingPlanRecordStore, + repository: str, + base_branch: str, +) -> MergeTrainBatchLandingPlanRecord | None: + records = record_store.list_merge_train_batch_landing_plan_records( + repository=repository, + base_branch=base_branch, + status="active", + limit=25, + ) + latest_record = _latest_merge_train_batch_landing_progress_record(records) + if latest_record is None: + return None + if not latest_record.landing_plan.entries: + return None + if any(entry.status != "merged" for entry in latest_record.landing_plan.entries): + return None + return latest_record + + def _latest_merge_train_stack_collapse_plan_record( *, record_store: _MergeTrainStackCollapsePlanRecordStore, @@ -7843,7 +7865,33 @@ def product_action_allowed( repository=controller_request.repository, base_branch=controller_request.base_branch, ) - if active_landing_record is not None: + completed_landing_record = None + if active_landing_record is None: + completed_landing_record = ( + _latest_completed_merge_train_batch_landing_plan_record( + record_store=landing_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + ) + if completed_landing_record is not None: + _validate_merge_train_landing_record_for_controller( + landing_record=completed_landing_record, + policy_key=repository_policy.policy_key, + policy_sha256=policy_record.policy_sha256, + ) + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "batch_landed", + "merge_train_batch_landing_plan_record_id": completed_landing_record.record_id, + "landing_plan": completed_landing_record.landing_plan.model_dump( + mode="json" + ), + } + driver_result = result + elif active_landing_record is not None: _validate_merge_train_landing_record_for_controller( landing_record=active_landing_record, policy_key=repository_policy.policy_key, diff --git a/tests/test_service.py b/tests/test_service.py index 5044fb6..2b79d1d 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1847,6 +1847,27 @@ def test_merge_train_controller_advances_unstacked_batch_flow(self) -> None: "mutate": True, }, ) + landing_records_before_retire = FilesystemRecordStore( + state_dir + ).list_merge_train_batch_landing_plan_records( + repository="cbusillo/sellyouroutboard", base_branch="main" + ) + retire_status, retire_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + landing_records_after_retire = FilesystemRecordStore( + state_dir + ).list_merge_train_batch_landing_plan_records( + repository="cbusillo/sellyouroutboard", base_branch="main" + ) self.assertEqual(plan_status, 202) self.assertEqual(plan_payload["result"]["controller_action"], "plan_candidate") @@ -1861,6 +1882,17 @@ def test_merge_train_controller_advances_unstacked_batch_flow(self) -> None: self.assertEqual(land_status, 202) self.assertEqual(land_payload["result"]["controller_action"], "land_batch") self.assertEqual(land_payload["result"]["landing_plan"]["entries"][0]["status"], "merged") + self.assertEqual(retire_status, 202) + self.assertEqual(retire_payload["result"]["controller_action"], "batch_landed") + self.assertEqual(retire_payload["result"]["mode"], "dry-run") + self.assertEqual( + retire_payload["result"]["merge_train_batch_landing_plan_record_id"], + land_payload["records"]["merge_train_batch_landing_plan_record_id"], + ) + self.assertEqual( + tuple(record.record_id for record in landing_records_after_retire), + tuple(record.record_id for record in landing_records_before_retire), + ) def test_merge_train_controller_stops_after_candidate_failure(self) -> None: with (