diff --git a/internal/orch/driver.go b/internal/orch/driver.go index 86c7e44..9be9c8e 100644 --- a/internal/orch/driver.go +++ b/internal/orch/driver.go @@ -46,17 +46,33 @@ func (s *Scheduler) Wake() { } } +// workspaceGCInterval is how often Run starts a workspace-reclaim pass. +// Disk cleanup is not latency-sensitive, so it runs far less often than the +// scheduling tick. +var workspaceGCInterval = 5 * time.Minute + // Run drives the scheduler until ctx is canceled. func (s *Scheduler) Run(ctx context.Context) { t := time.NewTicker(s.interval) + gc := time.NewTicker(workspaceGCInterval) defer t.Stop() + defer gc.Stop() + // Sweep stale checkouts left by a previous process once at startup (in its own goroutine, so the first Tick is not delayed); later sweeps are started by the gc ticker below. + go s.o.ReclaimWorkspaces(ctx) for { _, _ = s.Tick(ctx) + // Re-engage any active objective that has gone idle (no worker making + // progress) so a paused manager continues instead of stalling forever. + // The poke's own cooldown keeps this from firing every tick. + s.o.SuperviseIdleObjectives(ctx) select { case <-ctx.Done(): return case <-t.C: case <-s.wake: + case <-gc.C: + // Reclaim asynchronously: a remote rm must not stall scheduling. A pass that finds another one still running returns immediately, so these goroutines do not pile up. 
+ go s.o.ReclaimWorkspaces(ctx) } } } diff --git a/internal/orch/orch.go b/internal/orch/orch.go index 5fd188c..04aa265 100644 --- a/internal/orch/orch.go +++ b/internal/orch/orch.go @@ -7,6 +7,7 @@ package orch import ( "errors" "sync" + "time" "github.com/nathanwhit/orcha/internal/agent" "github.com/nathanwhit/orcha/internal/forge" @@ -76,6 +77,11 @@ type Orchestrator struct { tunnels map[string]*mcpTunnel // reverse MCP tunnels keyed by target id adoptMu sync.Mutex // serializes PR adoption so concurrent scans can't double-record a PR + + pokeMu sync.Mutex // guards lastPoke + lastPoke map[string]time.Time // objective ID -> time of the last supervisor action (poke, respawn, or escalate); enforces the cooldown + + gcMu sync.Mutex // held (via TryLock) for the whole of a workspace-reclaim pass so passes don't overlap } // SetNotify installs a hook called whenever schedulable state changes (a @@ -116,6 +122,7 @@ func New(st *store.Store, cfg Config) *Orchestrator { guards: map[string]*guardState{}, runs: map[string]*run{}, tunnels: map[string]*mcpTunnel{}, + lastPoke: map[string]time.Time{}, } } diff --git a/internal/orch/orch_test.go b/internal/orch/orch_test.go index d5a02a0..bed0f21 100644 --- a/internal/orch/orch_test.go +++ b/internal/orch/orch_test.go @@ -1448,6 +1448,184 @@ func TestConflictingPR_SpawnsRebaseFollowup(t *testing.T) { } } +func TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers(t *testing.T) { + o, st := newTestOrch(t) + addTarget(t, st, "local", model.TargetLocal, 4) + o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil)) + + obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "p"}) + if err != nil { + t.Fatalf("create objective: %v", err) + } + + // A freshly-active manager (UpdatedAt just now) is NOT yet considered stalled. + if acts := o.superviseDecisions(st.Now()); len(acts) != 0 { + t.Fatalf("a just-active manager should not be acted on, got %d", len(acts)) + } + + // Once it has been quiet past the idle threshold and no worker is running, + // the idle manager is poked. 
+ future := st.Now().Add(superviseIdleAfter + time.Minute) + acts := o.superviseDecisions(future) + if len(acts) != 1 || acts[0].kind != "poke" || acts[0].manager.ID != mgr.ID { + t.Fatalf("expected idle manager %s to be poked, got %+v", mgr.ID, acts) + } + + // Cooldown: re-checking at the same instant does not act again, because the poke above already recorded its time. + if again := o.superviseDecisions(future); len(again) != 0 { + t.Fatalf("cooldown should suppress a second poke, got %d", len(again)) + } + // After the cooldown elapses, an objective still idle is poked again. + later := future.Add(supervisePokeCooldown + time.Second) + if again := o.superviseDecisions(later); len(again) != 1 || again[0].kind != "poke" { + t.Fatalf("after cooldown, a still-idle manager should be poked again, got %+v", again) + } + + // With an active worker, the objective is making progress -> never acted on, + // even long after the manager went quiet and the cooldown has elapsed again. + w := &model.Session{ObjectiveID: obj.ID, Role: model.RoleImplementer, + Agent: model.AgentClaude, Status: model.SessionRunning} + if err := st.CreateSession(w); err != nil { + t.Fatalf("create worker: %v", err) + } + evenLater := later.Add(supervisePokeCooldown + time.Second) + if again := o.superviseDecisions(evenLater); len(again) != 0 { + t.Fatalf("an objective with an active worker must not be acted on, got %d", len(again)) + } +} + +func TestSupervisor_RespawnsManagerThenEscalates(t *testing.T) { + o, st := newTestOrch(t) + addTarget(t, st, "local", model.TargetLocal, 4) + o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil)) + + obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "do the thing"}) + if err != nil { + t.Fatalf("create objective: %v", err) + } + + // The manager session terminated, the objective is still active, and no worker + // is running -> nothing can drive it. The supervisor decides to respawn. 
+ terminateSession(t, st, mgr.ID, model.SessionSucceeded) + acts := o.superviseDecisions(st.Now()) + if len(acts) != 1 || acts[0].kind != "respawn" { + t.Fatalf("a manager-less active objective should respawn, got %+v", acts) + } + + // Executing it brings up a fresh live manager titled "Manager (resumed)" whose goal still carries the original prompt. + o.respawnManager(acts[0]) + live := o.activeManagerFor(obj.ID) + if live == nil { + t.Fatal("respawn should create a live manager") + } + if live.Title != "Manager (resumed)" || !strings.Contains(live.Goal, "do the thing") { + t.Fatalf("resumed manager missing prompt/title: title=%q", live.Title) + } + + // Burn through the manager budget: terminate the respawn and add managers until + // the cap is reached, all terminal so none is live. + terminateSession(t, st, live.ID, model.SessionFailed) + for o.countManagers(obj.ID) < maxManagerSessions { + m, mErr := o.CreateSession(SpawnSpec{ObjectiveID: obj.ID, Role: model.RoleManager, + Agent: model.AgentClaude, Mode: model.ModeInteractive, Title: "Manager"}) + if mErr != nil { + t.Fatalf("seed manager: %v", mErr) + } + terminateSession(t, st, m.ID, model.SessionFailed) + } + + // Past the cooldown, with the budget exhausted, it escalates instead of + // respawning forever. + future := st.Now().Add(supervisePokeCooldown + time.Minute) + acts = o.superviseDecisions(future) + if len(acts) != 1 || acts[0].kind != "escalate" { + t.Fatalf("an exhausted manager budget should escalate, got %+v", acts) + } + o.escalateManagerDeaths(obj.ID) + if qs, _ := st.ListQuestionsByObjective(obj.ID); len(qs) == 0 { + t.Fatal("escalation should record an open question for the objective") + } +} + +// terminateSession drives a (queued) session through the legal lifecycle to a +// terminal state: queued -> starting -> running -> final. Any rejected transition fails the test immediately. 
+func terminateSession(t *testing.T, st *store.Store, id string, final model.SessionStatus) { + t.Helper() + for _, s := range []model.SessionStatus{model.SessionStarting, model.SessionRunning, final} { + if _, err := st.UpdateSessionStatus(id, s); err != nil { + t.Fatalf("transition %s -> %s: %v", id, s, err) + } + } +} + +func TestReclaimWorkspaces(t *testing.T) { + o, st := newTestOrch(t) + tgt := addTarget(t, st, "local", model.TargetLocal, 4) + + mkdir := func(name string) string { + d := filepath.Join(t.TempDir(), name) + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatal(err) + } + return d + } + newWS := func(objID, path string) *model.Workspace { + ws := &model.Workspace{ObjectiveID: objID, TargetID: tgt.ID, + Kind: model.WorkspaceIsolated, Path: path, Status: model.WorkspaceReady} + if err := st.CreateWorkspace(ws); err != nil { + t.Fatal(err) + } + return ws + } + + // Terminal (succeeded) objective with no sessions at all -> its checkout is reclaimed. + doneObj := &model.Objective{Title: "done", Prompt: "p", Status: model.ObjectiveSucceeded} + _ = st.CreateObjective(doneObj) + donePath := mkdir("done") + doneWS := newWS(doneObj.ID, donePath) + + // Active objective -> left alone (it may still publish/inherit the checkout). + activeObj := &model.Objective{Title: "active", Prompt: "p", Status: model.ObjectiveActive} + _ = st.CreateObjective(activeObj) + activePath := mkdir("active") + activeWS := newWS(activeObj.ID, activePath) + + // Terminal objective but a non-terminal session still references the + // workspace -> left alone (safety against the sharing hazard). 
+ busyObj := &model.Objective{Title: "busy", Prompt: "p", Status: model.ObjectiveSucceeded} + _ = st.CreateObjective(busyObj) + busyPath := mkdir("busy") + busyWS := newWS(busyObj.ID, busyPath) + if err := st.CreateSession(&model.Session{ObjectiveID: busyObj.ID, Role: model.RoleImplementer, + Agent: model.AgentClaude, Status: model.SessionRunning, WorkspaceID: busyWS.ID}); err != nil { + t.Fatal(err) + } + + o.ReclaimWorkspaces(context.Background()) + + // Reclaimed: the directory is gone from disk and the workspace is marked archived. + if _, err := os.Stat(donePath); !os.IsNotExist(err) { + t.Fatalf("terminal objective's checkout should be removed, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(doneWS.ID); ws.Status != model.WorkspaceArchived { + t.Fatalf("reclaimed workspace should be archived, got %s", ws.Status) + } + // Kept: the active objective's directory still exists and its workspace is still ready. + if _, err := os.Stat(activePath); err != nil { + t.Fatalf("active objective's checkout must be kept, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(activeWS.ID); ws.Status != model.WorkspaceReady { + t.Fatalf("active workspace must stay ready, got %s", ws.Status) + } + // Kept: in-use by a live session. 
+ if _, err := os.Stat(busyPath); err != nil { + t.Fatalf("in-use checkout must be kept, stat err=%v", err) + } + if ws, _ := st.GetWorkspace(busyWS.ID); ws.Status != model.WorkspaceReady { + t.Fatalf("in-use workspace must stay ready, got %s", ws.Status) + } +} + func TestFailingChecksPR_SpawnsCIFollowup(t *testing.T) { o, st := newTestOrch(t) addTarget(t, st, "local", model.TargetLocal, 4) diff --git a/internal/orch/reclaim.go b/internal/orch/reclaim.go new file mode 100644 index 0000000..be082db --- /dev/null +++ b/internal/orch/reclaim.go @@ -0,0 +1,88 @@ +package orch + +import ( + "context" + + "github.com/nathanwhit/orcha/internal/agent" + "github.com/nathanwhit/orcha/internal/exec" + "github.com/nathanwhit/orcha/internal/model" +) + +// ReclaimWorkspaces removes the on-disk checkout of every workspace that is no +// longer needed and marks it archived, reclaiming disk on long-lived targets +// where per-session checkouts otherwise accumulate forever. It is deliberately +// conservative: a workspace is only reclaimed once its OBJECTIVE is no longer active AND +// no non-terminal session of that objective still references it. This sidesteps the sharing hazard +// — dependent workers inherit a dependency's checkout, the manager publishes a +// PR from a worker's checkout, follow-ups reuse a PR-branch checkout — by never +// touching a workspace while its objective could still do work. The shared bare +// mirror cache is left in place (it is reused across checkouts and stays small). +// +// It runs under a TryLock so overlapping ticks don't pile up, and is idempotent: +// already-archived workspaces are skipped, so a dir is not removed again once its workspace is archived. Store errors are swallowed: the pass (or just that objective) is skipped and picked up by a later pass. NOTE(review): "no longer active" is implemented as status != ObjectiveActive — confirm no other non-terminal objective status exists. 
+func (o *Orchestrator) ReclaimWorkspaces(ctx context.Context) { + if !o.gcMu.TryLock() { + return // a reclaim pass is already running + } + defer o.gcMu.Unlock() + + objs, err := o.st.ListObjectives() + if err != nil { + return + } + for _, obj := range objs { + if ctx.Err() != nil || obj.Status == model.ObjectiveActive { + continue // ctx canceled (shutting down): skip without issuing removals; active objective: it may still publish/inherit these checkouts + } + sessions, err := o.st.ListSessionsByObjective(obj.ID) + if err != nil { + continue + } + inUse := map[string]bool{} + for _, s := range sessions { + if s.WorkspaceID != "" && !s.Status.IsTerminal() { + inUse[s.WorkspaceID] = true + } + } + wss, err := o.st.ListWorkspacesByObjective(obj.ID) + if err != nil { + continue + } + for _, ws := range wss { + if !inUse[ws.ID] && ctx.Err() == nil { + o.reclaimWorkspace(ctx, ws) + } + } + } +} + +// reclaimWorkspace removes one workspace's checkout dir on its target and marks +// it archived. Only ready or failed workspaces are touched; already-archived/preparing workspaces are skipped. On a reachable +// target whose rm fails (a transient SSH hiccup) the workspace is left for a +// later pass to retry rather than marked archived; when the target itself is gone +// there is nothing to remove, so it is archived to stop retrying. +func (o *Orchestrator) reclaimWorkspace(ctx context.Context, ws *model.Workspace) { + if ws.Status != model.WorkspaceReady && ws.Status != model.WorkspaceFailed { + return + } + if ws.Path == "" || ws.TargetID == "" { + _ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived) + return + } + tgt, err := o.st.GetTarget(ws.TargetID) + if err != nil { + // Target lookup failed — treated as "target gone", so the dir is unreachable. Archive to stop retrying. NOTE(review): a transient store error takes this path too and would archive without removing the checkout — confirm GetTarget only fails for a missing target. + _ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived) + return + } + ex := agent.NewExecutor(tgt) + if _, err := exec.RunCapture(ctx, ex, exec.Command{ + Name: "rm", Args: []string{"-rf", "--", ws.Path}, // "--" ends option parsing so a path starting with "-" is never read as a flag + }); err != nil { + // Reachable target but rm failed; leave it for the next pass to retry. 
+ return + } + _ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived) + o.audit(ws.ObjectiveID, "", "workspace_reclaimed", + "removed checkout "+ws.Path, model.JSONMap{"workspace_id": ws.ID, "target_id": ws.TargetID}) +} diff --git a/internal/orch/supervisor.go b/internal/orch/supervisor.go new file mode 100644 index 0000000..4c104fe --- /dev/null +++ b/internal/orch/supervisor.go @@ -0,0 +1,253 @@ +package orch + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/nathanwhit/orcha/internal/model" +) + +// superviseIdleAfter is how long a manager must be quiet (no progress) before the +// supervisor considers the objective stalled. A manager actively producing output +// advances its UpdatedAt via streamed progress, so this only trips on a manager +// that genuinely paused. +var superviseIdleAfter = 90 * time.Second + +// supervisePokeCooldown is the minimum gap between supervisor actions on the same +// objective, so a manager that ignores a poke (or a respawn that immediately dies) +// is retried at most once per cooldown rather than every scheduler tick. +var supervisePokeCooldown = 90 * time.Second + +// maxManagerSessions caps how many manager sessions an objective may accumulate +// (the original plus respawns). Past this, the supervisor stops respawning and +// asks the user instead of spinning up managers forever. +const maxManagerSessions = 4 + +// supervisorAction is a decision the supervisor made for one objective. kind is +// "poke" (re-engage the live manager), "respawn" (the objective has no live +// manager — start a fresh one), or "escalate" (too many manager deaths — ask the +// user). +type supervisorAction struct { + kind string + objectiveID string + prompt string // objective prompt, for respawn + agent model.AgentKind // manager agent to reuse, for respawn + manager *model.Session // the live manager, for poke +} + +// SuperviseIdleObjectives keeps every active objective driven. 
A manager can +// pause mid-task and end its turn, or its session can go terminal entirely; in +// both cases the objective would otherwise sit ACTIVE with nothing making +// progress. Each tick it re-pokes a live-but-idle manager, respawns a manager for +// an objective that lost one, and (after too many respawns) asks the user. It +// only acts when NO worker is making progress — a non-terminal worker, even one +// on a long quiet build, counts as activity, so a progressing objective is left +// alone. +func (o *Orchestrator) SuperviseIdleObjectives(ctx context.Context) { + for _, act := range o.superviseDecisions(o.st.Now()) { + switch act.kind { + case "poke": + o.audit(act.objectiveID, act.manager.ID, "manager_repoke", + "no active workers; re-engaging idle manager", nil) + _ = o.Steer(ctx, act.manager.ID, o.idleManagerPokeMessage(act.objectiveID)) + case "respawn": + o.respawnManager(act) + case "escalate": + o.escalateManagerDeaths(act.objectiveID) + } + } +} + +// superviseDecisions returns the actions to take now, recording the action time +// per objective so the cooldown holds. It is separated from the side effects so +// the decision logic is testable without driving real agents. Because the time is recorded here, a returned action consumes the cooldown even if the caller never executes it. +func (o *Orchestrator) superviseDecisions(now time.Time) []supervisorAction { + objs, err := o.st.ListObjectives() + if err != nil { + return nil + } + var out []supervisorAction + for _, obj := range objs { + if obj.Status != model.ObjectiveActive { + continue + } + // The rule, shared by both paths: only act when NO worker is active. A + // running worker means the objective is progressing on its own. + if o.objectiveHasActiveWorkers(obj.ID, "") { + continue + } + + mgr := o.activeManagerFor(obj.ID) + if mgr == nil { + // No live manager: nothing can react to worker completions or PR events, + // so the objective is stuck. Respawn one — unless this objective has + // already burned through too many managers, in which case ask the user. 
+ if o.countManagers(obj.ID) >= maxManagerSessions { + if o.markPoked(obj.ID, now) { + out = append(out, supervisorAction{kind: "escalate", objectiveID: obj.ID}) + } + continue + } + if !o.markPoked(obj.ID, now) { + continue + } + out = append(out, supervisorAction{ + kind: "respawn", objectiveID: obj.ID, prompt: obj.Prompt, + agent: o.lastManagerAgent(obj.ID), + }) + continue + } + + // Live manager: only poke one that has actually been quiet (a manager + // streaming output right now has a fresh UpdatedAt and is left alone). + if now.Sub(mgr.UpdatedAt) < superviseIdleAfter { + continue + } + if !o.markPoked(obj.ID, now) { + continue + } + out = append(out, supervisorAction{kind: "poke", objectiveID: obj.ID, manager: mgr}) + } + return out +} + +// markPoked records a supervisor action for an objective and reports whether +// enough time has passed since the last one. It returns false (and leaves the +// time unchanged) while still within the cooldown. NOTE(review): nothing in the code shown here deletes entries, so lastPoke appears to keep one entry per objective ever acted on — confirm whether pruning is needed. +func (o *Orchestrator) markPoked(objectiveID string, now time.Time) bool { + o.pokeMu.Lock() + defer o.pokeMu.Unlock() + if o.lastPoke == nil { + o.lastPoke = map[string]time.Time{} + } + if last, ok := o.lastPoke[objectiveID]; ok && now.Sub(last) < supervisePokeCooldown { + return false + } + o.lastPoke[objectiveID] = now + return true +} + +// countManagers returns how many manager sessions an objective has had (live or +// terminal) — its respawn budget is measured against this. A store error reports 0, which makes the budget look unspent for that call. +func (o *Orchestrator) countManagers(objectiveID string) int { + sessions, err := o.st.ListSessionsByObjective(objectiveID) + if err != nil { + return 0 + } + n := 0 + for _, s := range sessions { + if s.Role == model.RoleManager { + n++ + } + } + return n +} + +// lastManagerAgent returns the agent kind of the objective's most recent manager +// so a respawn reuses the same provider (codex vs claude), falling back to the +// default when there somehow isn't one. 
+func (o *Orchestrator) lastManagerAgent(objectiveID string) model.AgentKind { + sessions, err := o.st.ListSessionsByObjective(objectiveID) + if err != nil { + return o.defaultAgent() + } + var agent model.AgentKind + var newest time.Time + for _, s := range sessions { + if s.Role == model.RoleManager && !s.CreatedAt.Before(newest) { + agent, newest = s.Agent, s.CreatedAt + } + } + if agent == "" { + return o.defaultAgent() + } + return agent +} + +// respawnManager starts a fresh manager for an objective that lost its previous +// one. The new manager is handed the original prompt plus a snapshot of what has +// already happened, so it continues the objective instead of restarting it. If CreateSession fails it returns without recording anything; superviseDecisions offers the objective again once the cooldown has elapsed. +func (o *Orchestrator) respawnManager(act supervisorAction) { + goal := act.prompt + "\n\n" + o.resumeManagerContext(act.objectiveID) + mgr, err := o.CreateSession(SpawnSpec{ + ObjectiveID: act.objectiveID, + Role: model.RoleManager, + Agent: act.agent, + Mode: model.ModeInteractive, + Title: "Manager (resumed)", + Goal: goal, + }) + if err != nil { + return + } + _ = o.st.SetObjectiveManager(act.objectiveID, mgr.ID) + o.audit(act.objectiveID, mgr.ID, "manager_respawned", + "respawned manager for objective with no live manager", nil) + o.notifyChange() // wake the scheduler to start it promptly +} + +// escalateManagerDeaths records a user-facing question when an objective has +// churned through its manager budget — repeatedly respawning would just burn +// tokens, so a human should look. The audit entry is written even if CreateQuestion fails, because that error is ignored. NOTE(review): superviseDecisions emits "escalate" again after every cooldown while the objective stays active, so unless CreateQuestion deduplicates, this records a new question each time — confirm. +func (o *Orchestrator) escalateManagerDeaths(objectiveID string) { + _ = o.st.CreateQuestion(&model.Question{ + ObjectiveID: objectiveID, + Priority: 20, + Question: "This objective's manager has terminated repeatedly and is still not done. " + + "It may be stuck. 
Review the work so far and decide whether to keep going, re-scope, or cancel it.", + Context: o.objectiveStateSnapshot(objectiveID), + }) + o.audit(objectiveID, "", "manager_respawn_capped", + "manager respawn budget exhausted; asked the user", nil) +} + +// objectiveStateSnapshot renders the worker statuses and PRs for an objective. +// The idle re-poke, a manager respawn, and the escalation question all embed it, since the manager has no +// read tools of its own — the snapshot is how a blind (or brand-new) manager +// learns what already happened. Store errors are ignored: a failed session lookup renders as "(no workers spawned yet)" and a failed PR lookup omits the PR section. +func (o *Orchestrator) objectiveStateSnapshot(objectiveID string) string { + var b strings.Builder + sessions, _ := o.st.ListSessionsByObjective(objectiveID) + b.WriteString("Worker status:\n") + wrote := false + for _, s := range sessions { + if s.Role == model.RoleManager { + continue + } + wrote = true + sum := firstNonEmpty(s.LatestSummary, s.CurrentActivity) + if sum == "" { + sum = "(no summary reported)" + } + fmt.Fprintf(&b, "- %q [%s] %s: %s\n", s.Title, s.Role, s.Status, sum) + } + if !wrote { + b.WriteString("- (no workers spawned yet)\n") + } + if prs, _ := o.st.ListPRsByObjective(objectiveID); len(prs) > 0 { + b.WriteString("\nPull requests:\n") + for _, p := range prs { + fmt.Fprintf(&b, "- PR #%d [%s, checks=%s]: %q\n", p.Number, p.Status, p.ChecksState, p.Title) + } + } + return b.String() +} + +// idleManagerPokeMessage is the re-poke text for a live but stalled manager: a statement that no worker is running, the state snapshot, and an instruction to finish, spawn the next worker, or ask the user. +func (o *Orchestrator) idleManagerPokeMessage(objectiveID string) string { + return "No workers are currently running for this objective, so nothing is making progress right now.\n\n" + + o.objectiveStateSnapshot(objectiveID) + + "\nDecide the next step now. If the objective is complete and every PR has merged, call " + + "mark_objective_done. If work remains, spawn the next worker. If you are blocked on a decision only " + + "the user can make, call ask_user. Do not end your turn without taking one of these actions." 
+} + +// resumeManagerContext frames the state snapshot for a freshly respawned manager: it tells the manager it is resuming existing work and must not redo it, then appends the snapshot. +func (o *Orchestrator) resumeManagerContext(objectiveID string) string { + return "You are resuming management of an objective that was already in progress; the previous manager " + + "session ended. Below is the current state — continue from here. Do NOT restart work that is already " + + "done or re-spawn workers that already ran; pick up the remaining work and drive it to completion.\n\n" + + o.objectiveStateSnapshot(objectiveID) +}