From 94c37f27bae6ace55595f7d3361bd2c16107b973 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Fri, 12 Jun 2026 16:01:27 -0700 Subject: [PATCH 1/3] Re-engage idle objectives so a paused manager doesn't stall forever A manager can pause mid-task ('done X, next I'll do Y') and end its turn without acting, leaving the objective ACTIVE with nothing driving it. The only manager nudges were reactive (worker-finished, PR-merged) and were dropped if the manager session was already terminal. Add a supervisor tick to the scheduler loop. For each active objective it re-pokes the live manager ONLY when no worker is making progress -- a non-terminal worker (even one on a long quiet build) counts as activity, so a progressing objective is left alone. A staleness check skips a manager that's actively streaming output, and a per-objective cooldown keeps it from re-poking every tick. The poke carries an inline state snapshot (worker statuses + PRs) since the manager has no read tools of its own. Live-manager only; respawning a terminal manager is left for a follow-up. --- internal/orch/driver.go | 4 ++ internal/orch/orch.go | 5 ++ internal/orch/orch_test.go | 46 +++++++++++++ internal/orch/supervisor.go | 127 ++++++++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 internal/orch/supervisor.go diff --git a/internal/orch/driver.go b/internal/orch/driver.go index 86c7e44..5e3f876 100644 --- a/internal/orch/driver.go +++ b/internal/orch/driver.go @@ -52,6 +52,10 @@ func (s *Scheduler) Run(ctx context.Context) { defer t.Stop() for { _, _ = s.Tick(ctx) + // Re-engage any active objective that has gone idle (no worker making + // progress) so a paused manager continues instead of stalling forever. + // The poke's own cooldown keeps this from firing every tick. 
+ s.o.SuperviseIdleObjectives(ctx) select { case <-ctx.Done(): return diff --git a/internal/orch/orch.go b/internal/orch/orch.go index 5fd188c..038aa97 100644 --- a/internal/orch/orch.go +++ b/internal/orch/orch.go @@ -7,6 +7,7 @@ package orch import ( "errors" "sync" + "time" "github.com/nathanwhit/orcha/internal/agent" "github.com/nathanwhit/orcha/internal/forge" @@ -76,6 +77,9 @@ type Orchestrator struct { tunnels map[string]*mcpTunnel // reverse MCP tunnels keyed by target id adoptMu sync.Mutex // serializes PR adoption so concurrent scans can't double-record a PR + + pokeMu sync.Mutex // guards lastPoke + lastPoke map[string]time.Time // per-objective last supervisor re-poke time (cooldown) } // SetNotify installs a hook called whenever schedulable state changes (a @@ -116,6 +120,7 @@ func New(st *store.Store, cfg Config) *Orchestrator { guards: map[string]*guardState{}, runs: map[string]*run{}, tunnels: map[string]*mcpTunnel{}, + lastPoke: map[string]time.Time{}, } } diff --git a/internal/orch/orch_test.go b/internal/orch/orch_test.go index d5a02a0..3dcb66d 100644 --- a/internal/orch/orch_test.go +++ b/internal/orch/orch_test.go @@ -1448,6 +1448,52 @@ func TestConflictingPR_SpawnsRebaseFollowup(t *testing.T) { } } +func TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers(t *testing.T) { + o, st := newTestOrch(t) + addTarget(t, st, "local", model.TargetLocal, 4) + o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil)) + + obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "p"}) + if err != nil { + t.Fatalf("create objective: %v", err) + } + + // A freshly-active manager (UpdatedAt just now) is NOT yet considered stalled. + if pokes := o.idleManagersToPoke(st.Now()); len(pokes) != 0 { + t.Fatalf("a just-active manager should not be poked, got %d", len(pokes)) + } + + // Once it has been quiet past the idle threshold and no worker is running, + // the idle manager is poked. 
+ future := st.Now().Add(superviseIdleAfter + time.Minute) + pokes := o.idleManagersToPoke(future) + if len(pokes) != 1 || pokes[0].ID != mgr.ID { + t.Fatalf("expected idle manager %s to be poked, got %v", mgr.ID, pokes) + } + + // Cooldown: an immediate re-check does not poke again. + if again := o.idleManagersToPoke(future); len(again) != 0 { + t.Fatalf("cooldown should suppress a second poke, got %d", len(again)) + } + // After the cooldown elapses, an objective still idle is poked again. + later := future.Add(supervisePokeCooldown + time.Second) + if again := o.idleManagersToPoke(later); len(again) != 1 { + t.Fatalf("after cooldown, a still-idle manager should be poked again, got %d", len(again)) + } + + // With an active worker, the objective is making progress -> never poked, + // even long after the manager went quiet. + w := &model.Session{ObjectiveID: obj.ID, Role: model.RoleImplementer, + Agent: model.AgentClaude, Status: model.SessionRunning} + if err := st.CreateSession(w); err != nil { + t.Fatalf("create worker: %v", err) + } + evenLater := later.Add(supervisePokeCooldown + time.Second) + if again := o.idleManagersToPoke(evenLater); len(again) != 0 { + t.Fatalf("an objective with an active worker must not be poked, got %d", len(again)) + } +} + func TestFailingChecksPR_SpawnsCIFollowup(t *testing.T) { o, st := newTestOrch(t) addTarget(t, st, "local", model.TargetLocal, 4) diff --git a/internal/orch/supervisor.go b/internal/orch/supervisor.go new file mode 100644 index 0000000..4030c6e --- /dev/null +++ b/internal/orch/supervisor.go @@ -0,0 +1,127 @@ +package orch + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/nathanwhit/orcha/internal/model" +) + +// superviseIdleAfter is how long a manager must be quiet (no progress) before the +// supervisor considers the objective stalled. A manager actively producing output +// advances its UpdatedAt via streamed progress, so this only trips on a manager +// that genuinely paused. 
+var superviseIdleAfter = 90 * time.Second + +// supervisePokeCooldown is the minimum gap between re-pokes of the same objective, +// so a manager that ignores a poke is nudged again at most once per cooldown +// rather than every scheduler tick. +var supervisePokeCooldown = 90 * time.Second + +// SuperviseIdleObjectives re-engages the manager of any active objective that has +// gone idle. "Idle" means no worker is making progress: a manager paused +// mid-task (or one that never got a nudge because the triggering event was +// dropped) would otherwise leave the objective ACTIVE forever. The supervisor +// pokes only when NO worker is active — a running worker, even one quietly +// grinding through a long build, is non-terminal and counts as activity, so an +// objective that is genuinely progressing is left alone. It pokes a LIVE manager +// only; re-spawning a manager that already went terminal is handled separately. +func (o *Orchestrator) SuperviseIdleObjectives(ctx context.Context) { + for _, mgr := range o.idleManagersToPoke(o.st.Now()) { + o.audit(mgr.ObjectiveID, mgr.ID, "manager_repoke", + "no active workers; re-engaging idle manager", nil) + _ = o.Steer(ctx, mgr.ID, o.idleManagerPokeMessage(mgr.ObjectiveID)) + } +} + +// idleManagersToPoke returns the live managers of active objectives that should +// be re-poked now, recording the poke time for each so the cooldown holds. It is +// separated from the Steer side effect so the decision logic is testable. +func (o *Orchestrator) idleManagersToPoke(now time.Time) []*model.Session { + objs, err := o.st.ListObjectives() + if err != nil { + return nil + } + var out []*model.Session + for _, obj := range objs { + if obj.Status != model.ObjectiveActive { + continue + } + mgr := o.activeManagerFor(obj.ID) + if mgr == nil { + continue // no live manager to poke (terminal-manager respawn is separate) + } + // The rule: only re-poke when NO worker is active. 
+ if o.objectiveHasActiveWorkers(obj.ID, mgr.ID) { + continue + } + // Don't poke a manager that's actively working right now (mid tool-call, + // streaming output): its UpdatedAt is fresh. Only a manager quiet for + // superviseIdleAfter is treated as stalled. + if now.Sub(mgr.UpdatedAt) < superviseIdleAfter { + continue + } + if !o.markPoked(obj.ID, now) { + continue // poked too recently + } + out = append(out, mgr) + } + return out +} + +// markPoked records a re-poke for an objective and reports whether enough time +// has passed since the last one. It returns false (and does not update the time) +// when still within the cooldown. +func (o *Orchestrator) markPoked(objectiveID string, now time.Time) bool { + o.pokeMu.Lock() + defer o.pokeMu.Unlock() + if o.lastPoke == nil { + o.lastPoke = map[string]time.Time{} + } + if last, ok := o.lastPoke[objectiveID]; ok && now.Sub(last) < supervisePokeCooldown { + return false + } + o.lastPoke[objectiveID] = now + return true +} + +// idleManagerPokeMessage builds the re-poke text. Because the manager has no +// read tools, the poke carries the state snapshot inline: every worker's role, +// status, and latest summary, plus the objective's PRs. This is what lets a blind +// manager decide whether to spawn more work or finish. 
+func (o *Orchestrator) idleManagerPokeMessage(objectiveID string) string { + var b strings.Builder + b.WriteString("No workers are currently running for this objective, so nothing is making progress right now.\n\n") + + sessions, _ := o.st.ListSessionsByObjective(objectiveID) + b.WriteString("Worker status:\n") + wrote := false + for _, s := range sessions { + if s.Role == model.RoleManager { + continue + } + wrote = true + sum := firstNonEmpty(s.LatestSummary, s.CurrentActivity) + if sum == "" { + sum = "(no summary reported)" + } + fmt.Fprintf(&b, "- %q [%s] %s: %s\n", s.Title, s.Role, s.Status, sum) + } + if !wrote { + b.WriteString("- (no workers spawned yet)\n") + } + + if prs, _ := o.st.ListPRsByObjective(objectiveID); len(prs) > 0 { + b.WriteString("\nPull requests:\n") + for _, p := range prs { + fmt.Fprintf(&b, "- PR #%d [%s, checks=%s]: %q\n", p.Number, p.Status, p.ChecksState, p.Title) + } + } + + b.WriteString("\nDecide the next step now. If the objective is complete and every PR has merged, " + + "call mark_objective_done. If work remains, spawn the next worker. If you are blocked on a " + + "decision only the user can make, call ask_user. Do not end your turn without taking one of these actions.") + return b.String() +} From 6bb46f70d1e5c8d1097942476f8bf340a28fc536 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Fri, 12 Jun 2026 16:09:48 -0700 Subject: [PATCH 2/3] Respawn a manager when an active objective loses its own The idle supervisor only re-poked a LIVE manager; if the manager session went terminal (completed/quiesced/failed) while the objective was still active, nothing could react to worker completions or PR events and the objective stalled forever. Extend the supervisor: when an active objective has no live manager and no worker is making progress, respawn one. 
The fresh manager gets the original prompt plus an inline state snapshot (worker statuses + PRs) framed as a resume, so it continues rather than restarting completed work or duplicating workers. Respawns are cooldown-gated like pokes and capped per objective (maxManagerSessions); past the cap the supervisor stops respawning and raises a user-facing question instead of burning tokens on a manager that keeps dying. Decision logic (superviseDecisions) is split from the side effects so poke / respawn / escalate are unit-tested directly. --- internal/orch/orch_test.go | 88 +++++++++++++-- internal/orch/supervisor.go | 214 ++++++++++++++++++++++++++++-------- 2 files changed, 246 insertions(+), 56 deletions(-) diff --git a/internal/orch/orch_test.go b/internal/orch/orch_test.go index 3dcb66d..d954fc3 100644 --- a/internal/orch/orch_test.go +++ b/internal/orch/orch_test.go @@ -1459,29 +1459,29 @@ func TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers(t *testing.T) { } // A freshly-active manager (UpdatedAt just now) is NOT yet considered stalled. - if pokes := o.idleManagersToPoke(st.Now()); len(pokes) != 0 { - t.Fatalf("a just-active manager should not be poked, got %d", len(pokes)) + if acts := o.superviseDecisions(st.Now()); len(acts) != 0 { + t.Fatalf("a just-active manager should not be acted on, got %d", len(acts)) } // Once it has been quiet past the idle threshold and no worker is running, // the idle manager is poked. future := st.Now().Add(superviseIdleAfter + time.Minute) - pokes := o.idleManagersToPoke(future) - if len(pokes) != 1 || pokes[0].ID != mgr.ID { - t.Fatalf("expected idle manager %s to be poked, got %v", mgr.ID, pokes) + acts := o.superviseDecisions(future) + if len(acts) != 1 || acts[0].kind != "poke" || acts[0].manager.ID != mgr.ID { + t.Fatalf("expected idle manager %s to be poked, got %+v", mgr.ID, acts) } - // Cooldown: an immediate re-check does not poke again. 
- if again := o.idleManagersToPoke(future); len(again) != 0 { + // Cooldown: an immediate re-check does not act again. + if again := o.superviseDecisions(future); len(again) != 0 { t.Fatalf("cooldown should suppress a second poke, got %d", len(again)) } // After the cooldown elapses, an objective still idle is poked again. later := future.Add(supervisePokeCooldown + time.Second) - if again := o.idleManagersToPoke(later); len(again) != 1 { - t.Fatalf("after cooldown, a still-idle manager should be poked again, got %d", len(again)) + if again := o.superviseDecisions(later); len(again) != 1 || again[0].kind != "poke" { + t.Fatalf("after cooldown, a still-idle manager should be poked again, got %+v", again) } - // With an active worker, the objective is making progress -> never poked, + // With an active worker, the objective is making progress -> never acted on, // even long after the manager went quiet. w := &model.Session{ObjectiveID: obj.ID, Role: model.RoleImplementer, Agent: model.AgentClaude, Status: model.SessionRunning} @@ -1489,8 +1489,72 @@ func TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers(t *testing.T) { t.Fatalf("create worker: %v", err) } evenLater := later.Add(supervisePokeCooldown + time.Second) - if again := o.idleManagersToPoke(evenLater); len(again) != 0 { - t.Fatalf("an objective with an active worker must not be poked, got %d", len(again)) + if again := o.superviseDecisions(evenLater); len(again) != 0 { + t.Fatalf("an objective with an active worker must not be acted on, got %d", len(again)) + } +} + +func TestSupervisor_RespawnsManagerThenEscalates(t *testing.T) { + o, st := newTestOrch(t) + addTarget(t, st, "local", model.TargetLocal, 4) + o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil)) + + obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "do the thing"}) + if err != nil { + t.Fatalf("create objective: %v", err) + } + + // The manager session terminated, the objective is still active, and no worker + 
// is running -> nothing can drive it. The supervisor decides to respawn. + terminateSession(t, st, mgr.ID, model.SessionSucceeded) + acts := o.superviseDecisions(st.Now()) + if len(acts) != 1 || acts[0].kind != "respawn" { + t.Fatalf("a manager-less active objective should respawn, got %+v", acts) + } + + // Executing it brings up a fresh live manager carrying the resume context. + o.respawnManager(acts[0]) + live := o.activeManagerFor(obj.ID) + if live == nil { + t.Fatal("respawn should create a live manager") + } + if live.Title != "Manager (resumed)" || !strings.Contains(live.Goal, "do the thing") { + t.Fatalf("resumed manager missing prompt/title: title=%q", live.Title) + } + + // Burn through the manager budget: terminate the respawn and add managers until + // the cap is reached, all terminal so none is live. + terminateSession(t, st, live.ID, model.SessionFailed) + for o.countManagers(obj.ID) < maxManagerSessions { + m, mErr := o.CreateSession(SpawnSpec{ObjectiveID: obj.ID, Role: model.RoleManager, + Agent: model.AgentClaude, Mode: model.ModeInteractive, Title: "Manager"}) + if mErr != nil { + t.Fatalf("seed manager: %v", mErr) + } + terminateSession(t, st, m.ID, model.SessionFailed) + } + + // Past the cooldown, with the budget exhausted, it escalates instead of + // respawning forever. + future := st.Now().Add(supervisePokeCooldown + time.Minute) + acts = o.superviseDecisions(future) + if len(acts) != 1 || acts[0].kind != "escalate" { + t.Fatalf("an exhausted manager budget should escalate, got %+v", acts) + } + o.escalateManagerDeaths(obj.ID) + if qs, _ := st.ListQuestionsByObjective(obj.ID); len(qs) == 0 { + t.Fatal("escalation should record an open question for the objective") + } +} + +// terminateSession drives a (queued) session through the legal lifecycle to a +// terminal state: queued -> starting -> running -> final. 
+func terminateSession(t *testing.T, st *store.Store, id string, final model.SessionStatus) { + t.Helper() + for _, s := range []model.SessionStatus{model.SessionStarting, model.SessionRunning, final} { + if _, err := st.UpdateSessionStatus(id, s); err != nil { + t.Fatalf("transition %s -> %s: %v", id, s, err) + } } } diff --git a/internal/orch/supervisor.go b/internal/orch/supervisor.go index 4030c6e..4c104fe 100644 --- a/internal/orch/supervisor.go +++ b/internal/orch/supervisor.go @@ -15,65 +15,107 @@ import ( // that genuinely paused. var superviseIdleAfter = 90 * time.Second -// supervisePokeCooldown is the minimum gap between re-pokes of the same objective, -// so a manager that ignores a poke is nudged again at most once per cooldown -// rather than every scheduler tick. +// supervisePokeCooldown is the minimum gap between supervisor actions on the same +// objective, so a manager that ignores a poke (or a respawn that immediately dies) +// is retried at most once per cooldown rather than every scheduler tick. var supervisePokeCooldown = 90 * time.Second -// SuperviseIdleObjectives re-engages the manager of any active objective that has -// gone idle. "Idle" means no worker is making progress: a manager paused -// mid-task (or one that never got a nudge because the triggering event was -// dropped) would otherwise leave the objective ACTIVE forever. The supervisor -// pokes only when NO worker is active — a running worker, even one quietly -// grinding through a long build, is non-terminal and counts as activity, so an -// objective that is genuinely progressing is left alone. It pokes a LIVE manager -// only; re-spawning a manager that already went terminal is handled separately. +// maxManagerSessions caps how many manager sessions an objective may accumulate +// (the original plus respawns). Past this, the supervisor stops respawning and +// asks the user instead of spinning up managers forever. 
+const maxManagerSessions = 4 + +// supervisorAction is a decision the supervisor made for one objective. kind is +// "poke" (re-engage the live manager), "respawn" (the objective has no live +// manager — start a fresh one), or "escalate" (too many manager deaths — ask the +// user). +type supervisorAction struct { + kind string + objectiveID string + prompt string // objective prompt, for respawn + agent model.AgentKind // manager agent to reuse, for respawn + manager *model.Session // the live manager, for poke +} + +// SuperviseIdleObjectives keeps every active objective driven. A manager can +// pause mid-task and end its turn, or its session can go terminal entirely; in +// both cases the objective would otherwise sit ACTIVE with nothing making +// progress. Each tick it re-pokes a live-but-idle manager, respawns a manager for +// an objective that lost one, and (after too many respawns) asks the user. It +// only acts when NO worker is making progress — a non-terminal worker, even one +// on a long quiet build, counts as activity, so a progressing objective is left +// alone. func (o *Orchestrator) SuperviseIdleObjectives(ctx context.Context) { - for _, mgr := range o.idleManagersToPoke(o.st.Now()) { - o.audit(mgr.ObjectiveID, mgr.ID, "manager_repoke", - "no active workers; re-engaging idle manager", nil) - _ = o.Steer(ctx, mgr.ID, o.idleManagerPokeMessage(mgr.ObjectiveID)) + for _, act := range o.superviseDecisions(o.st.Now()) { + switch act.kind { + case "poke": + o.audit(act.objectiveID, act.manager.ID, "manager_repoke", + "no active workers; re-engaging idle manager", nil) + _ = o.Steer(ctx, act.manager.ID, o.idleManagerPokeMessage(act.objectiveID)) + case "respawn": + o.respawnManager(act) + case "escalate": + o.escalateManagerDeaths(act.objectiveID) + } } } -// idleManagersToPoke returns the live managers of active objectives that should -// be re-poked now, recording the poke time for each so the cooldown holds. 
It is -// separated from the Steer side effect so the decision logic is testable. -func (o *Orchestrator) idleManagersToPoke(now time.Time) []*model.Session { +// superviseDecisions returns the actions to take now, recording the action time +// per objective so the cooldown holds. It is separated from the side effects so +// the decision logic is testable without driving real agents. +func (o *Orchestrator) superviseDecisions(now time.Time) []supervisorAction { objs, err := o.st.ListObjectives() if err != nil { return nil } - var out []*model.Session + var out []supervisorAction for _, obj := range objs { if obj.Status != model.ObjectiveActive { continue } + // The rule, shared by both paths: only act when NO worker is active. A + // running worker means the objective is progressing on its own. + if o.objectiveHasActiveWorkers(obj.ID, "") { + continue + } + mgr := o.activeManagerFor(obj.ID) if mgr == nil { - continue // no live manager to poke (terminal-manager respawn is separate) - } - // The rule: only re-poke when NO worker is active. - if o.objectiveHasActiveWorkers(obj.ID, mgr.ID) { + // No live manager: nothing can react to worker completions or PR events, + // so the objective is stuck. Respawn one — unless this objective has + // already burned through too many managers, in which case ask the user. + if o.countManagers(obj.ID) >= maxManagerSessions { + if o.markPoked(obj.ID, now) { + out = append(out, supervisorAction{kind: "escalate", objectiveID: obj.ID}) + } + continue + } + if !o.markPoked(obj.ID, now) { + continue + } + out = append(out, supervisorAction{ + kind: "respawn", objectiveID: obj.ID, prompt: obj.Prompt, + agent: o.lastManagerAgent(obj.ID), + }) continue } - // Don't poke a manager that's actively working right now (mid tool-call, - // streaming output): its UpdatedAt is fresh. Only a manager quiet for - // superviseIdleAfter is treated as stalled. 
+ + // Live manager: only poke one that has actually been quiet (a manager + // streaming output right now has a fresh UpdatedAt and is left alone). if now.Sub(mgr.UpdatedAt) < superviseIdleAfter { continue } if !o.markPoked(obj.ID, now) { - continue // poked too recently + continue } - out = append(out, mgr) + out = append(out, supervisorAction{kind: "poke", objectiveID: obj.ID, manager: mgr}) } return out } -// markPoked records a re-poke for an objective and reports whether enough time -// has passed since the last one. It returns false (and does not update the time) -// when still within the cooldown. +// markPoked records a supervisor action for an objective and reports whether +// enough time has passed since the last one. It returns false (and leaves the +// time unchanged) while still within the cooldown. func (o *Orchestrator) markPoked(objectiveID string, now time.Time) bool { o.pokeMu.Lock() defer o.pokeMu.Unlock() @@ -87,14 +129,86 @@ func (o *Orchestrator) markPoked(objectiveID string, now time.Time) bool { return true } -// idleManagerPokeMessage builds the re-poke text. Because the manager has no -// read tools, the poke carries the state snapshot inline: every worker's role, -// status, and latest summary, plus the objective's PRs. This is what lets a blind -// manager decide whether to spawn more work or finish. -func (o *Orchestrator) idleManagerPokeMessage(objectiveID string) string { - var b strings.Builder - b.WriteString("No workers are currently running for this objective, so nothing is making progress right now.\n\n") +// countManagers returns how many manager sessions an objective has had (live or +// terminal) — its respawn budget is measured against this. 
+func (o *Orchestrator) countManagers(objectiveID string) int { + sessions, err := o.st.ListSessionsByObjective(objectiveID) + if err != nil { + return 0 + } + n := 0 + for _, s := range sessions { + if s.Role == model.RoleManager { + n++ + } + } + return n +} +// lastManagerAgent returns the agent kind of the objective's most recent manager +// so a respawn reuses the same provider (codex vs claude), falling back to the +// default when there somehow isn't one. +func (o *Orchestrator) lastManagerAgent(objectiveID string) model.AgentKind { + sessions, err := o.st.ListSessionsByObjective(objectiveID) + if err != nil { + return o.defaultAgent() + } + var agent model.AgentKind + var newest time.Time + for _, s := range sessions { + if s.Role == model.RoleManager && !s.CreatedAt.Before(newest) { + agent, newest = s.Agent, s.CreatedAt + } + } + if agent == "" { + return o.defaultAgent() + } + return agent +} + +// respawnManager starts a fresh manager for an objective that lost its previous +// one. The new manager is handed the original prompt plus a snapshot of what has +// already happened, so it continues the objective instead of restarting it. +func (o *Orchestrator) respawnManager(act supervisorAction) { + goal := act.prompt + "\n\n" + o.resumeManagerContext(act.objectiveID) + mgr, err := o.CreateSession(SpawnSpec{ + ObjectiveID: act.objectiveID, + Role: model.RoleManager, + Agent: act.agent, + Mode: model.ModeInteractive, + Title: "Manager (resumed)", + Goal: goal, + }) + if err != nil { + return + } + _ = o.st.SetObjectiveManager(act.objectiveID, mgr.ID) + o.audit(act.objectiveID, mgr.ID, "manager_respawned", + "respawned manager for objective with no live manager", nil) + o.notifyChange() // wake the scheduler to start it promptly +} + +// escalateManagerDeaths records a user-facing question when an objective has +// churned through its manager budget — repeatedly respawning would just burn +// tokens, so a human should look. 
+func (o *Orchestrator) escalateManagerDeaths(objectiveID string) { + _ = o.st.CreateQuestion(&model.Question{ + ObjectiveID: objectiveID, + Priority: 20, + Question: "This objective's manager has terminated repeatedly and is still not done. " + + "It may be stuck. Review the work so far and decide whether to keep going, re-scope, or cancel it.", + Context: o.objectiveStateSnapshot(objectiveID), + }) + o.audit(objectiveID, "", "manager_respawn_capped", + "manager respawn budget exhausted; asked the user", nil) +} + +// objectiveStateSnapshot renders the worker statuses and PRs for an objective. +// Both the idle re-poke and a manager respawn embed it, since the manager has no +// read tools of its own — the snapshot is how a blind (or brand-new) manager +// learns what already happened. +func (o *Orchestrator) objectiveStateSnapshot(objectiveID string) string { + var b strings.Builder sessions, _ := o.st.ListSessionsByObjective(objectiveID) b.WriteString("Worker status:\n") wrote := false @@ -112,16 +226,28 @@ func (o *Orchestrator) idleManagerPokeMessage(objectiveID string) string { if !wrote { b.WriteString("- (no workers spawned yet)\n") } - if prs, _ := o.st.ListPRsByObjective(objectiveID); len(prs) > 0 { b.WriteString("\nPull requests:\n") for _, p := range prs { fmt.Fprintf(&b, "- PR #%d [%s, checks=%s]: %q\n", p.Number, p.Status, p.ChecksState, p.Title) } } - - b.WriteString("\nDecide the next step now. If the objective is complete and every PR has merged, " + - "call mark_objective_done. If work remains, spawn the next worker. If you are blocked on a " + - "decision only the user can make, call ask_user. Do not end your turn without taking one of these actions.") return b.String() } + +// idleManagerPokeMessage is the re-poke text for a live but stalled manager. 
+func (o *Orchestrator) idleManagerPokeMessage(objectiveID string) string { + return "No workers are currently running for this objective, so nothing is making progress right now.\n\n" + + o.objectiveStateSnapshot(objectiveID) + + "\nDecide the next step now. If the objective is complete and every PR has merged, call " + + "mark_objective_done. If work remains, spawn the next worker. If you are blocked on a decision only " + + "the user can make, call ask_user. Do not end your turn without taking one of these actions." +} + +// resumeManagerContext frames the state snapshot for a freshly respawned manager. +func (o *Orchestrator) resumeManagerContext(objectiveID string) string { + return "You are resuming management of an objective that was already in progress; the previous manager " + + "session ended. Below is the current state — continue from here. Do NOT restart work that is already " + + "done or re-spawn workers that already ran; pick up the remaining work and drive it to completion.\n\n" + + o.objectiveStateSnapshot(objectiveID) +} From e318ac6f6f1111b7a04c54bfd786ebd3e20bfc9f Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Fri, 12 Jun 2026 16:26:11 -0700 Subject: [PATCH 3/3] Reclaim workspace checkouts once their objective is done Per-session checkouts (/) were created during prepare and never removed, so long-lived targets accumulated a full repo checkout per worker forever -- nothing in cleanupRun touched disk, and the archived/dirty workspace statuses were defined but never set. Add a periodic GC pass (every 5m, plus a startup sweep) that rm -rf's a workspace's checkout on its target and marks it archived. It is deliberately conservative to avoid the sharing hazard (dependents inherit a checkout, the manager publishes a PR from a worker's checkout, follow-ups reuse a PR-branch checkout): a workspace is reclaimed only once its OBJECTIVE is terminal AND no non-terminal session still references it. The shared bare mirror cache is kept. 
Passes run under a TryLock so they don't overlap and are idempotent (archived workspaces are skipped); a transient remote rm failure is retried next pass, while a vanished target archives to stop retrying. GC runs async so a remote rm never stalls scheduling. --- internal/orch/driver.go | 12 ++++++ internal/orch/orch.go | 2 + internal/orch/orch_test.go | 68 +++++++++++++++++++++++++++++ internal/orch/reclaim.go | 88 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+) create mode 100644 internal/orch/reclaim.go diff --git a/internal/orch/driver.go b/internal/orch/driver.go index 5e3f876..9be9c8e 100644 --- a/internal/orch/driver.go +++ b/internal/orch/driver.go @@ -46,10 +46,19 @@ func (s *Scheduler) Wake() { } } +// workspaceGCInterval is how often the scheduler reclaims unneeded checkouts. +// Disk cleanup is not latency-sensitive, so it runs far less often than the +// scheduling tick. +var workspaceGCInterval = 5 * time.Minute + // Run drives the scheduler until ctx is canceled. func (s *Scheduler) Run(ctx context.Context) { t := time.NewTicker(s.interval) + gc := time.NewTicker(workspaceGCInterval) defer t.Stop() + defer gc.Stop() + // Sweep stale checkouts left by a previous process at startup, then on a tick. + go s.o.ReclaimWorkspaces(ctx) for { _, _ = s.Tick(ctx) // Re-engage any active objective that has gone idle (no worker making @@ -61,6 +70,9 @@ func (s *Scheduler) Run(ctx context.Context) { return case <-t.C: case <-s.wake: + case <-gc.C: + // Reclaim asynchronously: a remote rm must not stall scheduling. 
+ go s.o.ReclaimWorkspaces(ctx) } } } diff --git a/internal/orch/orch.go b/internal/orch/orch.go index 038aa97..04aa265 100644 --- a/internal/orch/orch.go +++ b/internal/orch/orch.go @@ -80,6 +80,8 @@ type Orchestrator struct { pokeMu sync.Mutex // guards lastPoke lastPoke map[string]time.Time // per-objective last supervisor re-poke time (cooldown) + + gcMu sync.Mutex // held during a workspace-reclaim pass so passes don't overlap } // SetNotify installs a hook called whenever schedulable state changes (a diff --git a/internal/orch/orch_test.go b/internal/orch/orch_test.go index d954fc3..bed0f21 100644 --- a/internal/orch/orch_test.go +++ b/internal/orch/orch_test.go @@ -1558,6 +1558,74 @@ func terminateSession(t *testing.T, st *store.Store, id string, final model.Sess } } +func TestReclaimWorkspaces(t *testing.T) { + o, st := newTestOrch(t) + tgt := addTarget(t, st, "local", model.TargetLocal, 4) + + mkdir := func(name string) string { + d := filepath.Join(t.TempDir(), name) + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatal(err) + } + return d + } + newWS := func(objID, path string) *model.Workspace { + ws := &model.Workspace{ObjectiveID: objID, TargetID: tgt.ID, + Kind: model.WorkspaceIsolated, Path: path, Status: model.WorkspaceReady} + if err := st.CreateWorkspace(ws); err != nil { + t.Fatal(err) + } + return ws + } + + // Terminal objective with no live sessions -> its checkout is reclaimed. + doneObj := &model.Objective{Title: "done", Prompt: "p", Status: model.ObjectiveSucceeded} + _ = st.CreateObjective(doneObj) + donePath := mkdir("done") + doneWS := newWS(doneObj.ID, donePath) + + // Active objective -> left alone (it may still publish/inherit the checkout). 
+ activeObj := &model.Objective{Title: "active", Prompt: "p", Status: model.ObjectiveActive} + _ = st.CreateObjective(activeObj) + activePath := mkdir("active") + activeWS := newWS(activeObj.ID, activePath) + + // Terminal objective but a non-terminal session still references the + // workspace -> left alone (safety against the sharing hazard). + busyObj := &model.Objective{Title: "busy", Prompt: "p", Status: model.ObjectiveSucceeded} + _ = st.CreateObjective(busyObj) + busyPath := mkdir("busy") + busyWS := newWS(busyObj.ID, busyPath) + if err := st.CreateSession(&model.Session{ObjectiveID: busyObj.ID, Role: model.RoleImplementer, + Agent: model.AgentClaude, Status: model.SessionRunning, WorkspaceID: busyWS.ID}); err != nil { + t.Fatal(err) + } + + o.ReclaimWorkspaces(context.Background()) + + // Reclaimed: dir gone, status archived. + if _, err := os.Stat(donePath); !os.IsNotExist(err) { + t.Fatalf("terminal objective's checkout should be removed, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(doneWS.ID); ws.Status != model.WorkspaceArchived { + t.Fatalf("reclaimed workspace should be archived, got %s", ws.Status) + } + // Kept: active objective. + if _, err := os.Stat(activePath); err != nil { + t.Fatalf("active objective's checkout must be kept, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(activeWS.ID); ws.Status != model.WorkspaceReady { + t.Fatalf("active workspace must stay ready, got %s", ws.Status) + } + // Kept: in-use by a live session. 
+ if _, err := os.Stat(busyPath); err != nil { + t.Fatalf("in-use checkout must be kept, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(busyWS.ID); ws.Status != model.WorkspaceReady { + t.Fatalf("in-use workspace must stay ready, got %s", ws.Status) + } +} + func TestFailingChecksPR_SpawnsCIFollowup(t *testing.T) { o, st := newTestOrch(t) addTarget(t, st, "local", model.TargetLocal, 4) diff --git a/internal/orch/reclaim.go b/internal/orch/reclaim.go new file mode 100644 index 0000000..be082db --- /dev/null +++ b/internal/orch/reclaim.go @@ -0,0 +1,88 @@ +package orch + +import ( + "context" + + "github.com/nathanwhit/orcha/internal/agent" + "github.com/nathanwhit/orcha/internal/exec" + "github.com/nathanwhit/orcha/internal/model" +) + +// ReclaimWorkspaces removes the on-disk checkout of every workspace that is no +// longer needed and marks it archived, reclaiming disk on long-lived targets +// where per-session checkouts otherwise accumulate forever. It is deliberately +// conservative: a workspace is only reclaimed once its OBJECTIVE is terminal AND +// no non-terminal session still references it. This sidesteps the sharing hazard +// — dependent workers inherit a dependency's checkout, the manager publishes a +// PR from a worker's checkout, follow-ups reuse a PR-branch checkout — by never +// touching a workspace while its objective could still do work. The shared bare +// mirror cache is left in place (it is reused across checkouts and stays small). +// +// It runs under a TryLock so overlapping ticks don't pile up, and is idempotent: +// already-archived workspaces are skipped, so each dir is removed at most once. 
+func (o *Orchestrator) ReclaimWorkspaces(ctx context.Context) {
+	// TryLock, not Lock: when an earlier pass is still running, skip this one
+	// instead of queueing behind it.
+	if !o.gcMu.TryLock() {
+		return
+	}
+	defer o.gcMu.Unlock()
+
+	objs, err := o.st.ListObjectives()
+	if err != nil {
+		return // best-effort: a later pass starts over from scratch
+	}
+	for _, obj := range objs {
+		// The scheduler launches each pass on its own goroutine, so the only
+		// thing that can stop a pass at shutdown is this cancellation check.
+		if ctx.Err() != nil {
+			return
+		}
+		// NOTE(review): this filter only excludes ObjectiveActive. If the model
+		// has other non-terminal objective statuses, their checkouts would be
+		// reclaimed here — confirm against the status enum.
+		if obj.Status == model.ObjectiveActive {
+			continue // active objectives may still publish/inherit these checkouts
+		}
+		sessions, err := o.st.ListSessionsByObjective(obj.ID)
+		if err != nil {
+			// Without the session list we cannot prove a checkout is unused,
+			// so leave this objective's workspaces alone until the next pass.
+			continue
+		}
+		// IDs of workspaces still referenced by a non-terminal session.
+		inUse := map[string]bool{}
+		for _, s := range sessions {
+			if s.WorkspaceID != "" && !s.Status.IsTerminal() {
+				inUse[s.WorkspaceID] = true
+			}
+		}
+		wss, err := o.st.ListWorkspacesByObjective(obj.ID)
+		if err != nil {
+			continue
+		}
+		for _, ws := range wss {
+			if ctx.Err() != nil {
+				return // shutting down: don't issue further removals
+			}
+			if !inUse[ws.ID] {
+				o.reclaimWorkspace(ctx, ws)
+			}
+		}
+	}
+}
+
+// reclaimWorkspace removes one workspace's checkout dir on its target and marks
+// it archived. Only ready or failed workspaces are considered; any other status
+// is skipped. When rm fails on a target that was looked up successfully, the
+// workspace is left untouched so a later pass retries; when the target lookup
+// itself fails the workspace is archived to stop retrying. The
+// "workspace_reclaimed" audit entry is written only after the archived status
+// has been persisted, so a workspace is never audited as reclaimed more than once.
+func (o *Orchestrator) reclaimWorkspace(ctx context.Context, ws *model.Workspace) {
+	if ws.Status != model.WorkspaceReady && ws.Status != model.WorkspaceFailed {
+		return
+	}
+	if ws.Path == "" || ws.TargetID == "" {
+		// No path to remove, or no target to remove it on. The write is
+		// best-effort: on failure the status is unchanged and the next pass
+		// lands here again.
+		_ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived)
+		return
+	}
+	tgt, err := o.st.GetTarget(ws.TargetID)
+	if err != nil {
+		// Target is gone — the dir is unreachable. Archive to stop retrying.
+		// NOTE(review): every lookup error takes this branch, so a transient
+		// store failure would archive the workspace without removing its dir.
+		// Consider archiving only on a not-found error — confirm what GetTarget
+		// returns.
+		_ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived)
+		return
+	}
+	ex := agent.NewExecutor(tgt)
+	if _, err := exec.RunCapture(ctx, ex, exec.Command{
+		Name: "rm", Args: []string{"-rf", ws.Path},
+	}); err != nil {
+		// Reachable target but rm failed; leave it for the next pass to retry.
+		return
+	}
+	if err := o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived); err != nil {
+		// The dir is gone but the store still lists the workspace as
+		// reclaimable. The next pass re-runs rm -rf (which ignores a missing
+		// path) and retries this write; skipping the audit here keeps it from
+		// being recorded again on every retry.
+		return
+	}
+	o.audit(ws.ObjectiveID, "", "workspace_reclaimed",
+		"removed checkout "+ws.Path, model.JSONMap{"workspace_id": ws.ID, "target_id": ws.TargetID})
+}