From 5aee3c0e0c168fb1802860fbaed0707c2e27a0a4 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Thu, 11 Jun 2026 09:53:44 -0700 Subject: [PATCH] Stop broad objective completion rejection loops Ignore review/evaluator gate worker results when checking for unpublished implementation candidates before finishing a broad objective. Add a repeated backend completion-rejection brake so a broad objective pauses for steering instead of repeatedly invoking the replanner on the same blocked finish decision. Harden the dynamic spawned-worker concurrency test so CI load does not expire the assertion before replanning reaches the spawned workers. --- internal/orchestrator/service.go | 44 ++++++++++ internal/orchestrator/service_test.go | 121 +++++++++++++++++++++++++- 2 files changed, 163 insertions(+), 2 deletions(-) diff --git a/internal/orchestrator/service.go b/internal/orchestrator/service.go index a9c57df..fa11e2e 100644 --- a/internal/orchestrator/service.go +++ b/internal/orchestrator/service.go @@ -9225,6 +9225,8 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini stalledTurns := 0 limitUnproductiveTurns := !taskIsBroadObjective(task) currentWorkPlan := initial.WorkPlan + lastCompletionRejection := "" + repeatedCompletionRejections := 0 for { stateSnapshot, err := s.store.Snapshot(ctx) if err != nil { @@ -9312,6 +9314,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini } recoveryHint = reason stalledTurns++ + lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason) + if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns { + s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason) + return false, "", results + } continue } if reason := broadObjectiveWorkPlanCompletionBlockReason(task, currentWorkPlan); reason != "" { @@ -9321,6 +9328,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini } recoveryHint = reason stalledTurns++ + lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason) + if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns { + s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason) + return false, "", results + } continue } return true, decision.Rationale, results @@ -9332,6 +9344,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini } recoveryHint = reason stalledTurns++ + lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason) + if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns { + s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason) + return false, "", results + } continue } if reason := broadObjectiveWorkPlanCompletionBlockReason(task, currentWorkPlan); reason != "" { @@ -9341,6 +9358,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini } recoveryHint = reason stalledTurns++ + lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason) + if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns { + s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason) + return false, "", results + } continue } if err := s.finishObjectiveFromReplan(ctx, task, decision); err != nil { @@ -9474,6 +9496,13 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini } } +func updateRepeatedReplanCompletionRejection(last string, count int, reason string) (string, int) { + if reason == last { + return last, count + 1 + } + return reason, 1 +} + func nextReplanTurn(snapshot core.Snapshot, taskID string) int { turn := 1 for _, event := range snapshot.Events { @@ -9533,6 +9562,9 @@ func unpublishedCandidateCompletionBlockReasonFromSnapshot(snapshot core.Snapsho if result.Status != core.WorkerSucceeded || !resultHasCandidateChanges(result) { continue } + if isReviewOrEvaluatorFollowUp(result) { + continue + } workerID := strings.TrimSpace(result.WorkerID) if workerID != "" && published[workerID] { continue @@ -9546,6 +9578,18 @@ func unpublishedCandidateCompletionBlockReasonFromSnapshot(snapshot core.Snapsho return "" } +func (s *Service) waitForRepeatedReplanCompletionRejection(ctx context.Context, task core.Task, turn int, reason string) { + replanErr := fmt.Errorf("dynamic replanning repeatedly selected blocked completion: %s", reason) + s.waitForReplanFallback(ctx, task, turn, replanErr, replanFallbackConfig{ + CompleteReasonPrefix: "fallback completion after repeated blocked completion", + CompleteMessage: "Dynamic replanning repeatedly selected a blocked completion, so aged paused for explicit steering.", + WaitRationale: "dynamic replanning repeatedly selected blocked completion", + WaitQuestion: "Dynamic replanning repeatedly selected a blocked completion. Provide steering so aged can continue with explicit work items/actions.", + WaitReason: "dynamic_replan_completion_rejected", + WaitObjective: "Dynamic replanning repeatedly selected a blocked completion and needs user steering before continuing.", + }, reason) +} + func (s *Service) unpublishedCandidateCompletionBlockReason(ctx context.Context, taskID string, results []WorkerTurnResult) string { snapshot, err := s.store.Snapshot(ctx) if err != nil { diff --git a/internal/orchestrator/service_test.go b/internal/orchestrator/service_test.go index c1e8268..12b8811 100644 --- a/internal/orchestrator/service_test.go +++ b/internal/orchestrator/service_test.go @@ -149,6 +149,115 @@ func TestServiceRejectsBroadObjectiveFinishWithUnpublishedCandidate(t *testing.T } } +func TestServiceAllowsBroadObjectiveFinishWithReviewGateCandidateDiff(t *testing.T) { + ctx := context.Background() + store := openTestStore(t) + defer store.Close() + + task := core.Task{ + ID: "task-review-gate-candidate-diff", + Title: "Broad objective", + Prompt: "Modernize the UI.", + Metadata: core.MustJSON(map[string]any{"objectiveMode": "broad"}), + } + if _, err := store.Append(ctx, core.Event{ + Type: core.EventTaskCreated, + TaskID: task.ID, + Payload: core.MustJSON(map[string]any{ + "title": task.Title, + "prompt": task.Prompt, + "metadata": task.Metadata, + }), + }); err != nil { + t.Fatal(err) + } + + brain := &replanningBrain{decisions: []ReplanDecision{{ + Action: "finish_objective", + Rationale: "all objective slices are merged", + Message: "done", + }}} + service := NewServiceWithWorkspaceManager(store, brain, map[string]worker.Runner{}, t.TempDir(), fakeWorkspaceManager{}) + service.replanLoop(ctx, task, Plan{}, []WorkerTurnResult{{ + WorkerID: "review-worker", + Status: core.WorkerSucceeded, + Kind: "codex", + Role: "review", + SpawnID: "code-review-gate", + BaseWorkerID: "worker-ui", + Summary: "No findings.", + Changes: WorkspaceChanges{ + Dirty: true, + ChangedFiles: []WorkspaceChangedFile{{Path: "web/src/styles.css", Status: "modified"}}, + PublishDiff: "diff --git a/web/src/styles.css b/web/src/styles.css\n", + }, + }}) + + snapshot := waitForTaskStatus(t, store, task.ID, core.TaskSucceeded) + if hasTaskAction(snapshot.Events, task.ID, "replan_completion_rejected", "rejected") { + t.Fatalf("review gate candidate diff should not block objective finish:\n%s", taskActionPayloads(snapshot.Events, task.ID)) + } + if !hasTaskAction(snapshot.Events, task.ID, "finish_objective", "completed") { + t.Fatalf("missing finish_objective action:\n%s", taskActionPayloads(snapshot.Events, task.ID)) + } +} + +func TestServicePausesRepeatedBroadObjectiveCompletionRejection(t *testing.T) { + ctx := context.Background() + store := openTestStore(t) + defer store.Close() + + task := core.Task{ + ID: "task-repeated-completion-rejection", + Title: "Broad objective", + Prompt: "Modernize the UI.", + Metadata: core.MustJSON(map[string]any{"objectiveMode": "broad"}), + } + if _, err := store.Append(ctx, core.Event{ + Type: core.EventTaskCreated, + TaskID: task.ID, + Payload: core.MustJSON(map[string]any{ + "title": task.Title, + "prompt": task.Prompt, + "metadata": task.Metadata, + }), + }); err != nil { + t.Fatal(err) + } + + decisions := make([]ReplanDecision, maxConsecutiveUnproductiveReplanTurns+4) + for i := range decisions { + decisions[i] = ReplanDecision{ + Action: "finish_objective", + Rationale: "the objective is done", + Message: "done", + } + } + brain := &replanningBrain{decisions: decisions} + service := NewServiceWithWorkspaceManager(store, brain, map[string]worker.Runner{}, t.TempDir(), fakeWorkspaceManager{}) + service.replanLoop(ctx, task, Plan{}, []WorkerTurnResult{{ + WorkerID: "worker-ui", + Status: core.WorkerSucceeded, + Kind: "codex", + Summary: "implemented UI changes", + Changes: WorkspaceChanges{ + Dirty: true, + ChangedFiles: []WorkspaceChangedFile{{Path: "web/src/styles.css", Status: "modified"}}, + }, + }}) + + snapshot := waitForTaskStatus(t, store, task.ID, core.TaskWaiting) + if len(brain.states) != maxConsecutiveUnproductiveReplanTurns { + t.Fatalf("replan states = %d, want %d", len(brain.states), maxConsecutiveUnproductiveReplanTurns) + } + if countTaskActions(snapshot.Events, task.ID, "replan_completion_rejected", "rejected") != maxConsecutiveUnproductiveReplanTurns { + t.Fatalf("rejected completion count = %d, want %d:\n%s", countTaskActions(snapshot.Events, task.ID, "replan_completion_rejected", "rejected"), maxConsecutiveUnproductiveReplanTurns, taskActionPayloads(snapshot.Events, task.ID)) + } + if !eventPayloadContains(snapshot.Events, core.EventApprovalNeeded, task.ID, "dynamic_replan_completion_rejected") { + t.Fatalf("missing repeated rejection user-action event:\n%s", taskActionPayloads(snapshot.Events, task.ID)) + } +} + func TestServiceRejectsBroadObjectiveCompletionWithIncompleteWorkPlan(t *testing.T) { ctx := context.Background() store := openTestStore(t) @@ -13629,17 +13738,25 @@ func TestServiceRunsSpawnedWorkersFromDynamicReplan(t *testing.T) { t.Fatal(err) } + released := false + defer func() { + if !released { + close(release) + } + }() got := map[string]bool{} - deadline := time.After(500 * time.Millisecond) + deadline := time.After(3 * time.Second) for len(got) < 2 { select { case kind := <-started: got[kind] = true case <-deadline: - t.Fatalf("replanned spawned workers did not start in parallel; started = %+v", got) + snapshot, _ := store.Snapshot(ctx) + t.Fatalf("replanned spawned workers did not start in parallel; started = %+v tasks=%+v workItems=%+v eventCount=%d taskActions=%s", got, snapshot.Tasks, snapshot.WorkItems, len(snapshot.Events), taskActionPayloads(snapshot.Events, task.ID)) } } close(release) + released = true snapshot := waitForTaskStatus(t, store, task.ID, core.TaskSucceeded) if !hasWorkerCreated(snapshot.Events, task.ID, "reviewer") || !hasWorkerCreated(snapshot.Events, task.ID, "tester") {