From 95868f123dcc0174ca38d5308e1bc277290878ae Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Thu, 14 May 2026 20:08:33 -0400 Subject: [PATCH] Add merge train controller run-once endpoint --- control_plane/service.py | 699 +++++++++++++++++++++++++++++++++++++ docs/merge-train-policy.md | 12 + docs/service-boundary.md | 12 + tests/test_service.py | 275 +++++++++++++++ 4 files changed, 998 insertions(+) diff --git a/control_plane/service.py b/control_plane/service.py index 7f105a5..fa249f2 100644 --- a/control_plane/service.py +++ b/control_plane/service.py @@ -352,6 +352,7 @@ _MERGE_TRAIN_BATCH_CANDIDATE_RUN_ONCE_ROUTE = "/v1/work-graph/merge-train/batch-candidate/run-once" _MERGE_TRAIN_BATCH_LANDING_RUN_ONCE_ROUTE = "/v1/work-graph/merge-train/batch-landing/run-once" _MERGE_TRAIN_STACK_COLLAPSE_RUN_ONCE_ROUTE = "/v1/work-graph/merge-train/stack-collapse/run-once" +_MERGE_TRAIN_CONTROLLER_RUN_ONCE_ROUTE = "/v1/work-graph/merge-train/controller/run-once" _MERGE_TRAIN_RUN_ONCE_ROUTE = "/v1/work-graph/merge-train/run-once" _EVERY_CODE_GITHUB_WEBHOOK_SECRET_ENV_KEY = "LAUNCHPLANE_EVERY_CODE_GITHUB_WEBHOOK_SECRET" @@ -466,6 +467,29 @@ def _validate_envelope(self) -> "MergeTrainStackCollapseRunOnceEnvelope": return self +class MergeTrainControllerRunOnceEnvelope(BaseModel): + model_config = ConfigDict(extra="forbid") + + schema_version: int = Field(default=1, ge=1) + repository: str + base_branch: str = "main" + mutate: bool = False + github_api_base_url: str = "https://api.github.com" + + @model_validator(mode="after") + def _validate_envelope(self) -> "MergeTrainControllerRunOnceEnvelope": + self.repository = self.repository.strip() + self.base_branch = self.base_branch.strip() + self.github_api_base_url = self.github_api_base_url.strip() or "https://api.github.com" + if not self.repository: + raise ValueError("merge train controller requires repository") + if "/" not in self.repository: + raise ValueError("merge train repository must be owner/name") + if not self.base_branch: + raise ValueError("merge train controller requires base_branch") + return self + + class MergeTrainAdmissionEnvelope(BaseModel): model_config = ConfigDict(extra="forbid") @@ -2292,6 +2316,7 @@ def _build_write_routes() -> frozenset[str]: _EVERY_CODE_GITHUB_WEBHOOK_ROUTE, _MERGE_TRAIN_BATCH_CANDIDATE_RUN_ONCE_ROUTE, _MERGE_TRAIN_BATCH_LANDING_RUN_ONCE_ROUTE, + _MERGE_TRAIN_CONTROLLER_RUN_ONCE_ROUTE, _MERGE_TRAIN_STACK_COLLAPSE_RUN_ONCE_ROUTE, _MERGE_TRAIN_RUN_ONCE_ROUTE, "/v1/agent/write-intents/evaluate", @@ -2626,6 +2651,184 @@ def _read_merge_train_stack_collapse_plan_record( raise ValueError("merge train stack collapse plan record not found") +def _latest_merge_train_batch_candidate_record( + *, + record_store: _MergeTrainBatchCandidateRecordStore, + repository: str, + base_branch: str, +) -> MergeTrainBatchCandidateRecord | None: + records = record_store.list_merge_train_batch_candidate_records( + repository=repository, + base_branch=base_branch, + status="active", + limit=25, + ) + latest_record = _latest_merge_train_batch_candidate_progress_record(records) + if latest_record is None: + return None + terminal_statuses = {"passed", "failed", "stale", "blocked"} + if latest_record.candidate.status in terminal_statuses: + return None + return latest_record + + +def _latest_passed_merge_train_batch_candidate_record( + *, + record_store: _MergeTrainBatchCandidateRecordStore, + repository: str, + base_branch: str, +) -> MergeTrainBatchCandidateRecord | None: + records = record_store.list_merge_train_batch_candidate_records( + repository=repository, + base_branch=base_branch, + status="active", + limit=25, + ) + latest_record = _latest_merge_train_batch_candidate_progress_record(records) + if latest_record is None: + return None + if latest_record.candidate.status != "passed": + return None + return latest_record + + +def _latest_merge_train_batch_landing_plan_record( + *, + record_store: _MergeTrainBatchLandingPlanRecordStore, + repository: str, + base_branch: str, +) -> MergeTrainBatchLandingPlanRecord | None: + records = record_store.list_merge_train_batch_landing_plan_records( + repository=repository, + base_branch=base_branch, + status="active", + limit=25, + ) + latest_record = _latest_merge_train_batch_landing_progress_record(records) + if latest_record is None: + return None + if not any(entry.status == "planned" for entry in latest_record.landing_plan.entries): + return None + return latest_record + + +def _latest_merge_train_stack_collapse_plan_record( + *, + record_store: _MergeTrainStackCollapsePlanRecordStore, + repository: str, + base_branch: str, + plan_status: str, +) -> MergeTrainStackCollapsePlanRecord | None: + records = record_store.list_merge_train_stack_collapse_plan_records( + repository=repository, + base_branch=base_branch, + status="active", + limit=25, + ) + latest_record = _latest_merge_train_stack_collapse_progress_record(records) + if latest_record is None: + return None + if latest_record.plan.status != plan_status: + return None + return latest_record + + +def _validate_merge_train_candidate_record_for_controller( + *, + candidate_record: MergeTrainBatchCandidateRecord, + policy_key: str, + policy_sha256: str, +) -> None: + if candidate_record.candidate.policy_key != policy_key: + raise ValueError("merge train candidate policy key no longer matches") + if candidate_record.candidate.policy_sha256 != policy_sha256: + raise ValueError("merge train candidate policy digest no longer matches") + + +def _validate_merge_train_landing_record_for_controller( + *, + landing_record: MergeTrainBatchLandingPlanRecord, + policy_key: str, + policy_sha256: str, +) -> None: + if landing_record.landing_plan.policy_key != policy_key: + raise ValueError("merge train landing plan policy key no longer matches") + if landing_record.landing_plan.policy_sha256 != policy_sha256: + raise ValueError("merge train landing plan policy digest no longer matches") + + +def _latest_merge_train_batch_candidate_progress_record( + records: tuple[MergeTrainBatchCandidateRecord, ...], +) -> MergeTrainBatchCandidateRecord | None: + if not records: + return None + status_rank = { + "planned": 0, + "building": 1, + "ready_for_checks": 2, + "passed": 3, + "failed": 4, + "stale": 4, + "blocked": 4, + } + return max( + records, + key=lambda record: ( + record.updated_at, + status_rank[record.candidate.status], + record.record_id, + ), + ) + + +def _latest_merge_train_batch_landing_progress_record( + records: tuple[MergeTrainBatchLandingPlanRecord, ...], +) -> MergeTrainBatchLandingPlanRecord | None: + if not records: + return None + return max( + records, + key=lambda record: ( + record.updated_at, + max( + _merge_train_batch_landing_entry_rank(entry.status) + for entry in record.landing_plan.entries + ), + record.record_id, + ), + ) + + +def _latest_merge_train_stack_collapse_progress_record( + records: tuple[MergeTrainStackCollapsePlanRecord, ...], +) -> MergeTrainStackCollapsePlanRecord | None: + if not records: + return None + status_rank = { + "planned": 0, + "collapsing": 1, + "waiting_for_root_checks": 2, + "ready_for_train": 3, + "blocked": 4, + "stale": 4, + } + return max( + records, + key=lambda record: (record.updated_at, status_rank[record.plan.status], record.record_id), + ) + + +def _merge_train_batch_landing_entry_rank(status: str) -> int: + return { + "planned": 0, + "merging": 1, + "merged": 2, + "blocked": 3, + "stale": 3, + "skipped": 3, + }[status] + + def _validate_stack_collapse_record_for_landing( *, collapse_record: MergeTrainStackCollapsePlanRecord, @@ -7559,6 +7762,502 @@ def product_action_allowed( "dry_run_result": dry_run_result.model_dump(mode="json"), "candidate": result["candidate"], } + elif path == _MERGE_TRAIN_CONTROLLER_RUN_ONCE_ROUTE: + controller_request = MergeTrainControllerRunOnceEnvelope.model_validate(payload) + policy_record = resolve_merge_train_policy_record(record_store) + policy = policy_record.policy + repository_policy = policy.find_repository_policy( + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + if not authz_policy.allows( + identity=identity, + action=repository_policy.service_authz.action, + product=repository_policy.service_authz.product, + context=repository_policy.service_authz.context, + ): + return _json_response( + start_response=start_response, + status_code=403, + payload={ + "status": "rejected", + "trace_id": request_trace_id, + "error": { + "code": "authorization_denied", + "message": "Workflow cannot run the requested merge train policy.", + }, + }, + ) + token_env = repository_policy.github_token.env_var + if not token_env: + return _json_response( + start_response=start_response, + status_code=503, + payload={ + "status": "rejected", + "trace_id": request_trace_id, + "error": { + "code": "github_token_not_configured", + "message": "Merge train policy does not define a GitHub token environment variable.", + }, + }, + ) + token = os.environ.get(token_env, "").strip() + if not token: + return _json_response( + start_response=start_response, + status_code=503, + payload={ + "status": "rejected", + "trace_id": request_trace_id, + "error": { + "code": "github_token_not_configured", + "message": "Configured merge train GitHub token is not available.", + }, + }, + ) + recorded_at = _utc_now_timestamp() + candidate_store = _merge_train_batch_candidate_record_store(record_store) + landing_store = _merge_train_batch_landing_plan_record_store(record_store) + collapse_store = _merge_train_stack_collapse_plan_record_store(record_store) + transport = UrllibMergeTrainGitHubTransport( + token=token, + api_base_url=controller_request.github_api_base_url, + ) + github_client = GitHubMergeTrainClient(transport=transport) + + active_landing_record = _latest_merge_train_batch_landing_plan_record( + record_store=landing_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + if active_landing_record is not None: + _validate_merge_train_landing_record_for_controller( + landing_record=active_landing_record, + policy_key=repository_policy.policy_key, + policy_sha256=policy_record.policy_sha256, + ) + collapse_record = _latest_merge_train_stack_collapse_plan_record( + record_store=collapse_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + plan_status="waiting_for_root_checks", + ) + if collapse_record is not None: + _validate_stack_collapse_record_for_landing( + collapse_record=collapse_record, + landing_plan=active_landing_record.landing_plan, + policy_sha256=policy_record.policy_sha256, + ) + if not repository_policy.stack_child_disposition_label: + raise ValueError( + "merge train stack child disposition requires stack_child_disposition_label policy" + ) + if not controller_request.mutate: + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "land_batch", + "merge_train_batch_landing_plan_record_id": active_landing_record.record_id, + } + if collapse_record is not None: + result["merge_train_stack_collapse_plan_record_id"] = ( + collapse_record.record_id + ) + driver_result = result + else: + landed_plan = github_client.land_batch_candidate( + landing_plan=active_landing_record.landing_plan + ) + landed_record = build_merge_train_batch_landing_plan_record( + landing_plan=landed_plan, + source=f"service:controller:land:{request_trace_id}", + updated_at=recorded_at, + ) + landing_store.write_merge_train_batch_landing_plan_record(landed_record) + result = { + "merge_train_batch_landing_plan_record_id": landed_record.record_id, + "repository": landed_plan.repository, + "base_branch": landed_plan.base_branch, + "mode": "land", + "controller_action": "land_batch", + "landing_plan": landed_plan.model_dump(mode="json"), + } + if collapse_record is not None: + root_entry = next( + ( + entry + for entry in landed_plan.entries + if entry.pull_request_number + == collapse_record.plan.root_pull_request_number + ), + None, + ) + if root_entry is None or root_entry.status != "merged": + raise ValueError( + "merge train stack child disposition requires merged root PR" + ) + reconciled_collapse_plan = ( + reconcile_merge_train_stack_children_after_root_landing( + plan=collapse_record.plan, + disposition_client=github_client, + root_merge_commit_sha=root_entry.merge_commit_sha, + label=repository_policy.stack_child_disposition_label, + updated_at=recorded_at, + ) + ) + reconciled_record = build_merge_train_stack_collapse_plan_record( + plan=reconciled_collapse_plan, + source=f"service:controller:child-disposition:{request_trace_id}", + updated_at=recorded_at, + ) + collapse_store.write_merge_train_stack_collapse_plan_record( + reconciled_record + ) + result["merge_train_stack_collapse_plan_record_id"] = ( + reconciled_record.record_id + ) + result["stack_collapse_plan"] = reconciled_collapse_plan.model_dump( + mode="json" + ) + driver_result = result + else: + active_candidate_record = _latest_merge_train_batch_candidate_record( + record_store=candidate_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + passed_candidate_record = None + if active_candidate_record is None: + passed_candidate_record = _latest_passed_merge_train_batch_candidate_record( + record_store=candidate_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + if active_candidate_record is not None: + _validate_merge_train_candidate_record_for_controller( + candidate_record=active_candidate_record, + policy_key=repository_policy.policy_key, + policy_sha256=policy_record.policy_sha256, + ) + if active_candidate_record.candidate.status in {"planned", "building"}: + controller_action = "build_candidate" + if controller_request.mutate: + candidate = github_client.build_batch_candidate( + candidate=active_candidate_record.candidate + ) + else: + candidate = active_candidate_record.candidate + else: + controller_action = "observe_candidate" + if controller_request.mutate: + candidate = github_client.observe_batch_candidate_checks( + candidate=active_candidate_record.candidate + ) + else: + candidate = active_candidate_record.candidate + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run" + if not controller_request.mutate + else controller_action, + "controller_action": controller_action, + "merge_train_batch_candidate_record_id": active_candidate_record.record_id, + } + if controller_request.mutate: + updated_candidate_record = build_merge_train_batch_candidate_record( + candidate=candidate, + source=f"service:controller:{controller_action}:{request_trace_id}", + updated_at=recorded_at, + ) + candidate_store.write_merge_train_batch_candidate_record( + updated_candidate_record + ) + result["merge_train_batch_candidate_record_id"] = ( + updated_candidate_record.record_id + ) + result["candidate"] = candidate.model_dump(mode="json") + driver_result = result + elif passed_candidate_record is not None: + _validate_merge_train_candidate_record_for_controller( + candidate_record=passed_candidate_record, + policy_key=repository_policy.policy_key, + policy_sha256=policy_record.policy_sha256, + ) + if not controller_request.mutate: + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "plan_landing", + "merge_train_batch_candidate_record_id": passed_candidate_record.record_id, + } + driver_result = result + else: + landing_plan = build_merge_train_batch_landing_plan( + candidate=passed_candidate_record.candidate, + merge_method=repository_policy.merge_method, + created_at=recorded_at, + ) + landing_record = build_merge_train_batch_landing_plan_record( + landing_plan=landing_plan, + source=f"service:controller:landing-plan:{request_trace_id}", + updated_at=recorded_at, + ) + landing_store.write_merge_train_batch_landing_plan_record( + landing_record + ) + result = { + "merge_train_batch_landing_plan_record_id": landing_record.record_id, + "repository": landing_plan.repository, + "base_branch": landing_plan.base_branch, + "mode": "plan_landing", + "controller_action": "plan_landing", + "landing_plan": landing_plan.model_dump(mode="json"), + } + driver_result = result + else: + waiting_collapse_record = _latest_merge_train_stack_collapse_plan_record( + record_store=collapse_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + plan_status="waiting_for_root_checks", + ) + if waiting_collapse_record is not None: + snapshot = GitHubMergeTrainSnapshotReader( + transport=transport + ).read_merge_train_snapshot( + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + root_pull_request = next( + ( + pull_request + for pull_request in snapshot.pull_requests + if pull_request.number + == waiting_collapse_record.plan.root_pull_request_number + ), + None, + ) + if root_pull_request is None: + raise ValueError("merge train stack collapse root PR is missing") + if root_pull_request.head_sha != _stack_collapse_expected_root_head_sha( + waiting_collapse_record.plan + ): + raise ValueError( + "merge train stack collapse root PR head no longer matches" + ) + root_snapshot = snapshot.model_copy( + update={"pull_requests": (root_pull_request,)} + ) + dry_run_result = build_merge_train_dry_run_result( + policy=policy, snapshot=root_snapshot + ) + if dry_run_result.intended_next_action != "merge": + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "wait_for_root_checks", + "merge_train_stack_collapse_plan_record_id": waiting_collapse_record.record_id, + "dry_run_result": dry_run_result.model_dump(mode="json"), + } + driver_result = result + elif not controller_request.mutate: + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "admit_collapsed_root", + "merge_train_stack_collapse_plan_record_id": waiting_collapse_record.record_id, + "dry_run_result": dry_run_result.model_dump(mode="json"), + } + driver_result = result + else: + candidate = build_merge_train_batch_candidate( + dry_run_result=dry_run_result, + base_sha=root_snapshot.base_sha, + policy_sha256=policy_record.policy_sha256, + created_at=recorded_at, + ) + candidate_record = build_merge_train_batch_candidate_record( + candidate=candidate, + source=f"service:controller:stack-collapse-admit:{request_trace_id}", + updated_at=recorded_at, + ) + candidate_store.write_merge_train_batch_candidate_record( + candidate_record + ) + result = { + "merge_train_batch_candidate_record_id": candidate_record.record_id, + "merge_train_stack_collapse_plan_record_id": waiting_collapse_record.record_id, + "repository": candidate.repository, + "base_branch": candidate.base_branch, + "mode": "admit_collapsed_root", + "controller_action": "admit_collapsed_root", + "dry_run_result": dry_run_result.model_dump(mode="json"), + "candidate": candidate.model_dump(mode="json"), + } + driver_result = result + else: + planned_collapse_record = ( + _latest_merge_train_stack_collapse_plan_record( + record_store=collapse_store, + repository=controller_request.repository, + base_branch=controller_request.base_branch, + plan_status="planned", + ) + ) + if planned_collapse_record is not None: + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run" + if not controller_request.mutate + else "execute_stack_collapse", + "controller_action": "execute_stack_collapse", + "merge_train_stack_collapse_plan_record_id": planned_collapse_record.record_id, + } + if controller_request.mutate: + executed_plan = execute_merge_train_stack_collapse_plan( + plan=planned_collapse_record.plan, + branch_client=github_client, + updated_at=recorded_at, + ) + executed_record = build_merge_train_stack_collapse_plan_record( + plan=executed_plan, + source=f"service:controller:stack-collapse-execute:{request_trace_id}", + updated_at=recorded_at, + ) + collapse_store.write_merge_train_stack_collapse_plan_record( + executed_record + ) + result["merge_train_stack_collapse_plan_record_id"] = ( + executed_record.record_id + ) + result["stack_collapse_plan"] = executed_plan.model_dump( + mode="json" + ) + else: + result["stack_collapse_plan"] = ( + planned_collapse_record.plan.model_dump(mode="json") + ) + driver_result = result + else: + snapshot = GitHubMergeTrainSnapshotReader( + transport=transport + ).read_merge_train_snapshot( + repository=controller_request.repository, + base_branch=controller_request.base_branch, + ) + dry_run_result = build_merge_train_dry_run_result( + policy=policy, snapshot=snapshot + ) + selected_pr = dry_run_result.selected_pr + if ( + selected_pr is not None + and _merge_train_snapshot_has_stack_topology( + snapshot=snapshot, dry_run_result=dry_run_result + ) + ): + stack_discovery = discover_merge_train_stack( + snapshot=snapshot, + root_pull_request_number=selected_pr.number, + ) + else: + stack_discovery = None + if ( + stack_discovery is not None + and stack_discovery.status == "ready_for_collapse" + ): + controller_action = "plan_stack_collapse" + stack_collapse_plan = build_merge_train_stack_collapse_plan( + discovery_result=stack_discovery, + policy_key=dry_run_result.policy_key, + policy_sha256=policy_record.policy_sha256, + created_at=recorded_at, + ) + result = { + "repository": stack_collapse_plan.repository, + "base_branch": stack_collapse_plan.base_branch, + "mode": "dry-run" + if not controller_request.mutate + else controller_action, + "controller_action": controller_action, + "dry_run_result": dry_run_result.model_dump(mode="json"), + "stack_discovery": stack_discovery.model_dump(mode="json"), + "stack_collapse_plan": stack_collapse_plan.model_dump( + mode="json" + ), + } + if controller_request.mutate: + stack_collapse_record = build_merge_train_stack_collapse_plan_record( + plan=stack_collapse_plan, + source=f"service:controller:stack-collapse-plan:{request_trace_id}", + updated_at=recorded_at, + ) + collapse_store.write_merge_train_stack_collapse_plan_record( + stack_collapse_record + ) + result["merge_train_stack_collapse_plan_record_id"] = ( + stack_collapse_record.record_id + ) + driver_result = result + elif ( + stack_discovery is not None + and stack_discovery.status == "unsupported" + ): + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": "stack_unsupported", + "dry_run_result": dry_run_result.model_dump(mode="json"), + "stack_discovery": stack_discovery.model_dump(mode="json"), + } + driver_result = result + elif dry_run_result.intended_next_action not in {"merge", "idle"}: + result = { + "repository": controller_request.repository, + "base_branch": controller_request.base_branch, + "mode": "dry-run", + "controller_action": dry_run_result.intended_next_action, + "dry_run_result": dry_run_result.model_dump(mode="json"), + } + driver_result = result + else: + controller_action = "plan_candidate" + candidate = build_merge_train_batch_candidate( + dry_run_result=dry_run_result, + base_sha=snapshot.base_sha, + policy_sha256=policy_record.policy_sha256, + created_at=recorded_at, + ) + result = { + "repository": candidate.repository, + "base_branch": candidate.base_branch, + "mode": "dry-run" + if not controller_request.mutate + else controller_action, + "controller_action": controller_action, + "dry_run_result": dry_run_result.model_dump(mode="json"), + "candidate": candidate.model_dump(mode="json"), + } + if controller_request.mutate: + candidate_record = build_merge_train_batch_candidate_record( + candidate=candidate, + source=f"service:controller:candidate-plan:{request_trace_id}", + updated_at=recorded_at, + ) + candidate_store.write_merge_train_batch_candidate_record( + candidate_record + ) + result["merge_train_batch_candidate_record_id"] = ( + candidate_record.record_id + ) + driver_result = result elif path == "/v1/agent/write-intents/evaluate": intent_request = AgentWriteIntentRequest.model_validate(payload) intent_authz_action = authz_action_for_agent_write_intent(intent_request.intent) diff --git a/docs/merge-train-policy.md b/docs/merge-train-policy.md index 718b2d1..5decb0d 100644 --- a/docs/merge-train-policy.md +++ b/docs/merge-train-policy.md @@ -358,6 +358,18 @@ that ref in order. Observe mode records required-check state for the exact candidate SHA. Landing the original PRs remains a later PR-native phase with separate records. +The controller service endpoint +`POST /v1/work-graph/merge-train/controller/run-once` is the preferred operator +entrypoint for the full train. It accepts the same repository/base selector and +`mutate` flag as the Level 1 route, but chooses the next safe batch phase from +the latest DB-backed records. Repeated calls can drive an unstacked train through +candidate plan, build, observe, landing plan, and landing. For a same-repo +linear stack, repeated calls first plan and execute stack collapse, then admit +only the collapsed root PR into the same candidate/build/observe/landing path. +The controller does not introduce new live configuration or repo-specific +conditionals; it fails closed on missing policy, missing token, stale policy +digests, stale root heads, and the existing batch landing guards. + The batch-landing service endpoint `POST /v1/work-graph/merge-train/batch-landing/run-once` owns that PR-native landing phase. It accepts `mode: plan` with a passed candidate record id and diff --git a/docs/service-boundary.md b/docs/service-boundary.md index dc41138..db69cea 100644 --- a/docs/service-boundary.md +++ b/docs/service-boundary.md @@ -230,6 +230,18 @@ performs no GitHub reads and no storage writes. Schedulers use this route to pace calls into `run-once`; execution still re-reads GitHub before any dry-run or mutation. +`POST /v1/work-graph/merge-train/controller/run-once` is the operator-facing +one-action controller for the full batch train. Request payloads name +`repository`, `base_branch`, and optional `mutate`; the route uses the same +policy, authorization, and GitHub token boundary as the lower-level merge-train +routes. Each call advances at most one safe phase from DB-backed records and +fresh GitHub evidence: plan stack collapse, execute stack collapse, admit the +collapsed root PR, plan/build/observe a batch candidate, plan landing, or land +the original PRs. Dry-run calls return the next controller action without +writing records or mutating GitHub. Mutation calls reuse the same persisted +candidate, stack-collapse, and landing-plan records as the phase-specific +routes, and reject stale policy digests before advancing stored records. + `POST /v1/work-graph/merge-train/batch-candidate/run-once` executes one policy-backed batch-candidate phase for a requested repository/base branch. The route accepts `mode: plan`, `mode: build`, or `mode: observe`. Plan mode reads a diff --git a/tests/test_service.py b/tests/test_service.py index e1e95ab..95334b9 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1765,6 +1765,281 @@ def test_merge_train_batch_candidate_service_plans_stack_collapse_first(self) -> self.assertEqual(stack_records[0].plan.mutations[0].parent_pull_request_number, 1) self.assertEqual(candidate_records, ()) + def test_merge_train_controller_advances_unstacked_batch_flow(self) -> None: + with ( + TemporaryDirectory() as temporary_directory_name, + patch.dict("os.environ", {"GH_TOKEN": "token"}, clear=True), + ): + state_dir = Path(temporary_directory_name) / "state" + _seed_merge_train_policy(state_dir) + app = create_launchplane_service_app( + state_dir=state_dir, + verifier=_StubVerifier(_merge_train_service_identity()), + authz_policy=_merge_train_service_policy(), + control_plane_root_path=Path(temporary_directory_name), + ) + with ( + patch( + "control_plane.service.GitHubMergeTrainSnapshotReader", + _FakeMergeTrainSnapshotReader, + ), + patch("control_plane.service.GitHubMergeTrainClient", _FakeMergeTrainGitHubClient), + ): + plan_status, plan_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + build_status, build_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + observe_status, observe_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + landing_plan_status, landing_plan_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + land_status, land_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + + self.assertEqual(plan_status, 202) + self.assertEqual(plan_payload["result"]["controller_action"], "plan_candidate") + self.assertEqual(build_status, 202) + self.assertEqual(build_payload["result"]["controller_action"], "build_candidate") + self.assertEqual(build_payload["result"]["candidate"]["status"], "ready_for_checks") + self.assertEqual(observe_status, 202) + self.assertEqual(observe_payload["result"]["controller_action"], "observe_candidate") + self.assertEqual(observe_payload["result"]["candidate"]["status"], "passed") + self.assertEqual(landing_plan_status, 202) + self.assertEqual(landing_plan_payload["result"]["controller_action"], "plan_landing") + self.assertEqual(land_status, 202) + self.assertEqual(land_payload["result"]["controller_action"], "land_batch") + self.assertEqual(land_payload["result"]["landing_plan"]["entries"][0]["status"], "merged") + + def test_merge_train_controller_advances_stacked_batch_flow(self) -> None: + with ( + TemporaryDirectory() as temporary_directory_name, + patch.dict("os.environ", {"GH_TOKEN": "token"}, clear=True), + ): + state_dir = Path(temporary_directory_name) / "state" + _seed_merge_train_policy(state_dir) + app = create_launchplane_service_app( + state_dir=state_dir, + verifier=_StubVerifier(_merge_train_service_identity()), + authz_policy=_merge_train_service_policy(), + control_plane_root_path=Path(temporary_directory_name), + ) + with ( + patch( + "control_plane.service.GitHubMergeTrainSnapshotReader", + _FakeStackedMergeTrainSnapshotReader, + ), + patch("control_plane.service.GitHubMergeTrainClient", _FakeMergeTrainGitHubClient), + ): + plan_status, plan_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + execute_status, execute_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + with ( + patch( + "control_plane.service.GitHubMergeTrainSnapshotReader", + _FakeCollapsedRootStackedMergeTrainSnapshotReader, + ), + patch("control_plane.service.GitHubMergeTrainClient", _FakeMergeTrainGitHubClient), + ): + admit_status, admit_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + build_status, build_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + observe_status, observe_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + landing_plan_status, landing_plan_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + land_status, land_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + + self.assertEqual(plan_status, 202) + self.assertEqual(plan_payload["result"]["controller_action"], "plan_stack_collapse") + self.assertEqual(execute_status, 202) + self.assertEqual(execute_payload["result"]["controller_action"], "execute_stack_collapse") + self.assertEqual(admit_status, 202) + self.assertEqual(admit_payload["result"]["controller_action"], "admit_collapsed_root") + self.assertEqual( + admit_payload["result"]["candidate"]["entries"][0]["pull_request_number"], 1 + ) + self.assertEqual(build_status, 202) + self.assertEqual(build_payload["result"]["controller_action"], "build_candidate") + self.assertEqual(observe_status, 202) + self.assertEqual(observe_payload["result"]["candidate"]["status"], "passed") + self.assertEqual(landing_plan_status, 202) + self.assertEqual(landing_plan_payload["result"]["controller_action"], "plan_landing") + self.assertEqual(land_status, 202) + self.assertEqual(land_payload["result"]["controller_action"], "land_batch") + self.assertEqual(land_payload["result"]["stack_collapse_plan"]["status"], "ready_for_train") + self.assertEqual( + land_payload["result"]["stack_collapse_plan"]["child_dispositions"][0]["status"], + "closed", + ) + + def test_merge_train_controller_rejects_candidate_after_policy_digest_changes( + self, + ) -> None: + with ( + TemporaryDirectory() as temporary_directory_name, + patch.dict("os.environ", {"GH_TOKEN": "token"}, clear=True), + ): + state_dir = Path(temporary_directory_name) / "state" + _seed_merge_train_policy(state_dir) + app = create_launchplane_service_app( + state_dir=state_dir, + verifier=_StubVerifier(_merge_train_service_identity()), + authz_policy=_merge_train_service_policy(), + control_plane_root_path=Path(temporary_directory_name), + ) + with patch( + "control_plane.service.GitHubMergeTrainSnapshotReader", + _FakeMergeTrainSnapshotReader, + ): + _, plan_payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + _seed_merge_train_policy( + state_dir, + policy=MergeTrainPolicyRecord( + record_id="merge-train-policy-20260513T220000Z-test", + status="active", + source="test", + updated_at="2026-05-13T22:00:00Z", + policy=build_test_merge_train_policy(repository="cbusillo/codex-skills"), + ), + ) + with patch("control_plane.service.GitHubMergeTrainClient", _FakeMergeTrainGitHubClient): + status_code, payload = _invoke_app( + app, + method="POST", + path="/v1/work-graph/merge-train/controller/run-once", + payload={ + "schema_version": 1, + "repository": "cbusillo/sellyouroutboard", + "base_branch": "main", + "mutate": True, + }, + ) + + self.assertIn("merge_train_batch_candidate_record_id", plan_payload["records"]) + self.assertEqual(status_code, 400) + self.assertEqual(payload["error"]["code"], "invalid_request") + def test_merge_train_stack_collapse_service_executes_existing_plan_record(self) -> None: with ( TemporaryDirectory() as temporary_directory_name,