Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion control_plane/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2712,6 +2712,28 @@ def _latest_merge_train_batch_landing_plan_record(
return latest_record


def _latest_completed_merge_train_batch_landing_plan_record(
*,
record_store: _MergeTrainBatchLandingPlanRecordStore,
repository: str,
base_branch: str,
) -> MergeTrainBatchLandingPlanRecord | None:
records = record_store.list_merge_train_batch_landing_plan_records(
repository=repository,
base_branch=base_branch,
status="active",
limit=25,
)
latest_record = _latest_merge_train_batch_landing_progress_record(records)
if latest_record is None:
return None
if not latest_record.landing_plan.entries:
return None
if any(entry.status != "merged" for entry in latest_record.landing_plan.entries):
return None
return latest_record


def _latest_merge_train_stack_collapse_plan_record(
*,
record_store: _MergeTrainStackCollapsePlanRecordStore,
Expand Down Expand Up @@ -7843,7 +7865,33 @@ def product_action_allowed(
repository=controller_request.repository,
base_branch=controller_request.base_branch,
)
if active_landing_record is not None:
completed_landing_record = None
if active_landing_record is None:
completed_landing_record = (
_latest_completed_merge_train_batch_landing_plan_record(
record_store=landing_store,
repository=controller_request.repository,
base_branch=controller_request.base_branch,
)
)
if completed_landing_record is not None:
_validate_merge_train_landing_record_for_controller(
landing_record=completed_landing_record,
policy_key=repository_policy.policy_key,
policy_sha256=policy_record.policy_sha256,
)
result = {
"repository": controller_request.repository,
"base_branch": controller_request.base_branch,
"mode": "dry-run",
"controller_action": "batch_landed",
"merge_train_batch_landing_plan_record_id": completed_landing_record.record_id,
"landing_plan": completed_landing_record.landing_plan.model_dump(
mode="json"
),
}
driver_result = result
elif active_landing_record is not None:
_validate_merge_train_landing_record_for_controller(
landing_record=active_landing_record,
policy_key=repository_policy.policy_key,
Expand Down
32 changes: 32 additions & 0 deletions tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,27 @@ def test_merge_train_controller_advances_unstacked_batch_flow(self) -> None:
"mutate": True,
},
)
landing_records_before_retire = FilesystemRecordStore(
state_dir
).list_merge_train_batch_landing_plan_records(
repository="cbusillo/sellyouroutboard", base_branch="main"
)
retire_status, retire_payload = _invoke_app(
app,
method="POST",
path="/v1/work-graph/merge-train/controller/run-once",
payload={
"schema_version": 1,
"repository": "cbusillo/sellyouroutboard",
"base_branch": "main",
"mutate": True,
},
)
landing_records_after_retire = FilesystemRecordStore(
state_dir
).list_merge_train_batch_landing_plan_records(
repository="cbusillo/sellyouroutboard", base_branch="main"
)

self.assertEqual(plan_status, 202)
self.assertEqual(plan_payload["result"]["controller_action"], "plan_candidate")
Expand All @@ -1861,6 +1882,17 @@ def test_merge_train_controller_advances_unstacked_batch_flow(self) -> None:
self.assertEqual(land_status, 202)
self.assertEqual(land_payload["result"]["controller_action"], "land_batch")
self.assertEqual(land_payload["result"]["landing_plan"]["entries"][0]["status"], "merged")
self.assertEqual(retire_status, 202)
self.assertEqual(retire_payload["result"]["controller_action"], "batch_landed")
self.assertEqual(retire_payload["result"]["mode"], "dry-run")
self.assertEqual(
retire_payload["result"]["merge_train_batch_landing_plan_record_id"],
land_payload["records"]["merge_train_batch_landing_plan_record_id"],
)
self.assertEqual(
tuple(record.record_id for record in landing_records_after_retire),
tuple(record.record_id for record in landing_records_before_retire),
)

def test_merge_train_controller_stops_after_candidate_failure(self) -> None:
with (
Expand Down