Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions internal/orchestrator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9225,6 +9225,8 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
stalledTurns := 0
limitUnproductiveTurns := !taskIsBroadObjective(task)
currentWorkPlan := initial.WorkPlan
lastCompletionRejection := ""
repeatedCompletionRejections := 0
for {
stateSnapshot, err := s.store.Snapshot(ctx)
if err != nil {
Expand Down Expand Up @@ -9312,6 +9314,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
}
recoveryHint = reason
stalledTurns++
lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason)
if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns {
s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason)
return false, "", results
}
continue
}
if reason := broadObjectiveWorkPlanCompletionBlockReason(task, currentWorkPlan); reason != "" {
Expand All @@ -9321,6 +9328,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
}
recoveryHint = reason
stalledTurns++
lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason)
if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns {
s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason)
return false, "", results
}
continue
}
return true, decision.Rationale, results
Expand All @@ -9332,6 +9344,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
}
recoveryHint = reason
stalledTurns++
lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason)
if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns {
s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason)
return false, "", results
}
continue
}
if reason := broadObjectiveWorkPlanCompletionBlockReason(task, currentWorkPlan); reason != "" {
Expand All @@ -9341,6 +9358,11 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
}
recoveryHint = reason
stalledTurns++
lastCompletionRejection, repeatedCompletionRejections = updateRepeatedReplanCompletionRejection(lastCompletionRejection, repeatedCompletionRejections, reason)
if repeatedCompletionRejections >= maxConsecutiveUnproductiveReplanTurns {
s.waitForRepeatedReplanCompletionRejection(ctx, task, turn, reason)
return false, "", results
}
continue
}
if err := s.finishObjectiveFromReplan(ctx, task, decision); err != nil {
Expand Down Expand Up @@ -9474,6 +9496,13 @@ func (s *Service) replanLoopWithOptions(ctx context.Context, task core.Task, ini
}
}

func updateRepeatedReplanCompletionRejection(last string, count int, reason string) (string, int) {
if reason == last {
return last, count + 1
}
return reason, 1
}

func nextReplanTurn(snapshot core.Snapshot, taskID string) int {
turn := 1
for _, event := range snapshot.Events {
Expand Down Expand Up @@ -9533,6 +9562,9 @@ func unpublishedCandidateCompletionBlockReasonFromSnapshot(snapshot core.Snapsho
if result.Status != core.WorkerSucceeded || !resultHasCandidateChanges(result) {
continue
}
if isReviewOrEvaluatorFollowUp(result) {
continue
}
workerID := strings.TrimSpace(result.WorkerID)
if workerID != "" && published[workerID] {
continue
Expand All @@ -9546,6 +9578,18 @@ func unpublishedCandidateCompletionBlockReasonFromSnapshot(snapshot core.Snapsho
return ""
}

func (s *Service) waitForRepeatedReplanCompletionRejection(ctx context.Context, task core.Task, turn int, reason string) {
replanErr := fmt.Errorf("dynamic replanning repeatedly selected blocked completion: %s", reason)
s.waitForReplanFallback(ctx, task, turn, replanErr, replanFallbackConfig{
CompleteReasonPrefix: "fallback completion after repeated blocked completion",
CompleteMessage: "Dynamic replanning repeatedly selected a blocked completion, so aged paused for explicit steering.",
WaitRationale: "dynamic replanning repeatedly selected blocked completion",
WaitQuestion: "Dynamic replanning repeatedly selected a blocked completion. Provide steering so aged can continue with explicit work items/actions.",
WaitReason: "dynamic_replan_completion_rejected",
WaitObjective: "Dynamic replanning repeatedly selected a blocked completion and needs user steering before continuing.",
}, reason)
}

func (s *Service) unpublishedCandidateCompletionBlockReason(ctx context.Context, taskID string, results []WorkerTurnResult) string {
snapshot, err := s.store.Snapshot(ctx)
if err != nil {
Expand Down
121 changes: 119 additions & 2 deletions internal/orchestrator/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,115 @@ func TestServiceRejectsBroadObjectiveFinishWithUnpublishedCandidate(t *testing.T
}
}

func TestServiceAllowsBroadObjectiveFinishWithReviewGateCandidateDiff(t *testing.T) {
ctx := context.Background()
store := openTestStore(t)
defer store.Close()

task := core.Task{
ID: "task-review-gate-candidate-diff",
Title: "Broad objective",
Prompt: "Modernize the UI.",
Metadata: core.MustJSON(map[string]any{"objectiveMode": "broad"}),
}
if _, err := store.Append(ctx, core.Event{
Type: core.EventTaskCreated,
TaskID: task.ID,
Payload: core.MustJSON(map[string]any{
"title": task.Title,
"prompt": task.Prompt,
"metadata": task.Metadata,
}),
}); err != nil {
t.Fatal(err)
}

brain := &replanningBrain{decisions: []ReplanDecision{{
Action: "finish_objective",
Rationale: "all objective slices are merged",
Message: "done",
}}}
service := NewServiceWithWorkspaceManager(store, brain, map[string]worker.Runner{}, t.TempDir(), fakeWorkspaceManager{})
service.replanLoop(ctx, task, Plan{}, []WorkerTurnResult{{
WorkerID: "review-worker",
Status: core.WorkerSucceeded,
Kind: "codex",
Role: "review",
SpawnID: "code-review-gate",
BaseWorkerID: "worker-ui",
Summary: "No findings.",
Changes: WorkspaceChanges{
Dirty: true,
ChangedFiles: []WorkspaceChangedFile{{Path: "web/src/styles.css", Status: "modified"}},
PublishDiff: "diff --git a/web/src/styles.css b/web/src/styles.css\n",
},
}})

snapshot := waitForTaskStatus(t, store, task.ID, core.TaskSucceeded)
if hasTaskAction(snapshot.Events, task.ID, "replan_completion_rejected", "rejected") {
t.Fatalf("review gate candidate diff should not block objective finish:\n%s", taskActionPayloads(snapshot.Events, task.ID))
}
if !hasTaskAction(snapshot.Events, task.ID, "finish_objective", "completed") {
t.Fatalf("missing finish_objective action:\n%s", taskActionPayloads(snapshot.Events, task.ID))
}
}

func TestServicePausesRepeatedBroadObjectiveCompletionRejection(t *testing.T) {
ctx := context.Background()
store := openTestStore(t)
defer store.Close()

task := core.Task{
ID: "task-repeated-completion-rejection",
Title: "Broad objective",
Prompt: "Modernize the UI.",
Metadata: core.MustJSON(map[string]any{"objectiveMode": "broad"}),
}
if _, err := store.Append(ctx, core.Event{
Type: core.EventTaskCreated,
TaskID: task.ID,
Payload: core.MustJSON(map[string]any{
"title": task.Title,
"prompt": task.Prompt,
"metadata": task.Metadata,
}),
}); err != nil {
t.Fatal(err)
}

decisions := make([]ReplanDecision, maxConsecutiveUnproductiveReplanTurns+4)
for i := range decisions {
decisions[i] = ReplanDecision{
Action: "finish_objective",
Rationale: "the objective is done",
Message: "done",
}
}
brain := &replanningBrain{decisions: decisions}
service := NewServiceWithWorkspaceManager(store, brain, map[string]worker.Runner{}, t.TempDir(), fakeWorkspaceManager{})
service.replanLoop(ctx, task, Plan{}, []WorkerTurnResult{{
WorkerID: "worker-ui",
Status: core.WorkerSucceeded,
Kind: "codex",
Summary: "implemented UI changes",
Changes: WorkspaceChanges{
Dirty: true,
ChangedFiles: []WorkspaceChangedFile{{Path: "web/src/styles.css", Status: "modified"}},
},
}})

snapshot := waitForTaskStatus(t, store, task.ID, core.TaskWaiting)
if len(brain.states) != maxConsecutiveUnproductiveReplanTurns {
t.Fatalf("replan states = %d, want %d", len(brain.states), maxConsecutiveUnproductiveReplanTurns)
}
if countTaskActions(snapshot.Events, task.ID, "replan_completion_rejected", "rejected") != maxConsecutiveUnproductiveReplanTurns {
t.Fatalf("rejected completion count = %d, want %d:\n%s", countTaskActions(snapshot.Events, task.ID, "replan_completion_rejected", "rejected"), maxConsecutiveUnproductiveReplanTurns, taskActionPayloads(snapshot.Events, task.ID))
}
if !eventPayloadContains(snapshot.Events, core.EventApprovalNeeded, task.ID, "dynamic_replan_completion_rejected") {
t.Fatalf("missing repeated rejection user-action event:\n%s", taskActionPayloads(snapshot.Events, task.ID))
}
}

func TestServiceRejectsBroadObjectiveCompletionWithIncompleteWorkPlan(t *testing.T) {
ctx := context.Background()
store := openTestStore(t)
Expand Down Expand Up @@ -13629,17 +13738,25 @@ func TestServiceRunsSpawnedWorkersFromDynamicReplan(t *testing.T) {
t.Fatal(err)
}

released := false
defer func() {
if !released {
close(release)
}
}()
got := map[string]bool{}
deadline := time.After(500 * time.Millisecond)
deadline := time.After(3 * time.Second)
for len(got) < 2 {
select {
case kind := <-started:
got[kind] = true
case <-deadline:
t.Fatalf("replanned spawned workers did not start in parallel; started = %+v", got)
snapshot, _ := store.Snapshot(ctx)
t.Fatalf("replanned spawned workers did not start in parallel; started = %+v tasks=%+v workItems=%+v eventCount=%d taskActions=%s", got, snapshot.Tasks, snapshot.WorkItems, len(snapshot.Events), taskActionPayloads(snapshot.Events, task.ID))
}
}
close(release)
released = true

snapshot := waitForTaskStatus(t, store, task.ID, core.TaskSucceeded)
if !hasWorkerCreated(snapshot.Events, task.ID, "reviewer") || !hasWorkerCreated(snapshot.Events, task.ID, "tester") {
Expand Down
Loading