Skip to content
Merged

Poke #12

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions internal/orch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,33 @@ func (s *Scheduler) Wake() {
}
}

// workspaceGCInterval is the period of the ticker that Run uses to trigger
// reclamation of unneeded workspace checkouts. Disk cleanup is not
// latency-sensitive, so it runs far less often than the scheduling tick.
// NOTE(review): declared as a var rather than a const, presumably so tests can
// shorten it — confirm.
var workspaceGCInterval = 5 * time.Minute

// Run is the scheduler's main loop; it returns only once ctx is canceled.
//
// Every iteration performs one scheduling Tick followed by an idle-objective
// supervision pass, then blocks until the next wake-up: the scheduling ticker,
// an explicit wake signal, or the workspace-GC ticker. A GC wake-up kicks off
// a background workspace reclaim and then falls through to the next iteration
// like any other wake-up.
func (s *Scheduler) Run(ctx context.Context) {
	schedTick := time.NewTicker(s.interval)
	defer schedTick.Stop()
	gcTick := time.NewTicker(workspaceGCInterval)
	defer gcTick.Stop()

	// Reclaim runs on its own goroutine: a remote rm must not stall scheduling.
	reclaimAsync := func() {
		go s.o.ReclaimWorkspaces(ctx)
	}

	// Sweep stale checkouts left behind by a previous process right away;
	// later sweeps are driven by gcTick.
	reclaimAsync()

	for {
		_, _ = s.Tick(ctx)

		// Nudge active objectives that have gone idle (no worker making
		// progress) so a paused manager resumes instead of stalling forever.
		// The poke has its own cooldown, so this does not fire on every tick.
		s.o.SuperviseIdleObjectives(ctx)

		select {
		case <-ctx.Done():
			return
		case <-gcTick.C:
			reclaimAsync()
		case <-schedTick.C:
		case <-s.wake:
		}
	}
}
Expand Down
7 changes: 7 additions & 0 deletions internal/orch/orch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package orch
import (
"errors"
"sync"
"time"

"github.com/nathanwhit/orcha/internal/agent"
"github.com/nathanwhit/orcha/internal/forge"
Expand Down Expand Up @@ -76,6 +77,11 @@ type Orchestrator struct {
tunnels map[string]*mcpTunnel // reverse MCP tunnels keyed by target id

adoptMu sync.Mutex // serializes PR adoption so concurrent scans can't double-record a PR

pokeMu sync.Mutex // guards lastPoke
lastPoke map[string]time.Time // per-objective last supervisor re-poke time (cooldown)

gcMu sync.Mutex // held during a workspace-reclaim pass so passes don't overlap
}

// SetNotify installs a hook called whenever schedulable state changes (a
Expand Down Expand Up @@ -116,6 +122,7 @@ func New(st *store.Store, cfg Config) *Orchestrator {
guards: map[string]*guardState{},
runs: map[string]*run{},
tunnels: map[string]*mcpTunnel{},
lastPoke: map[string]time.Time{},
}
}

Expand Down
178 changes: 178 additions & 0 deletions internal/orch/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,184 @@ func TestConflictingPR_SpawnsRebaseFollowup(t *testing.T) {
}
}

// TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers walks a single objective
// through the supervisor's poke decisions: no action while the manager is
// fresh, one poke once it has been idle long enough, suppression during the
// cooldown, a repeat poke after the cooldown, and no action at all while a
// worker session is running.
func TestSupervisor_RepokesIdleManagerOnlyWhenNoWorkers(t *testing.T) {
	o, st := newTestOrch(t)
	addTarget(t, st, "local", model.TargetLocal, 4)
	o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil))

	obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "p"})
	if err != nil {
		t.Fatalf("create objective: %v", err)
	}
	decide := o.superviseDecisions

	// A manager that was active just now (UpdatedAt is current) does not count
	// as stalled yet.
	fresh := decide(st.Now())
	if len(fresh) != 0 {
		t.Fatalf("a just-active manager should not be acted on, got %d", len(fresh))
	}

	// Past the idle threshold with no worker running, the manager gets poked.
	idleAt := st.Now().Add(superviseIdleAfter + time.Minute)
	first := decide(idleAt)
	pokedMgr := len(first) == 1 && first[0].kind == "poke" && first[0].manager.ID == mgr.ID
	if !pokedMgr {
		t.Fatalf("expected idle manager %s to be poked, got %+v", mgr.ID, first)
	}

	// Cooldown: asking again at the same instant yields nothing.
	repeat := decide(idleAt)
	if len(repeat) != 0 {
		t.Fatalf("cooldown should suppress a second poke, got %d", len(repeat))
	}

	// Once the cooldown has elapsed and the objective is still idle, it is
	// poked again.
	afterCooldown := idleAt.Add(supervisePokeCooldown + time.Second)
	second := decide(afterCooldown)
	if !(len(second) == 1 && second[0].kind == "poke") {
		t.Fatalf("after cooldown, a still-idle manager should be poked again, got %+v", second)
	}

	// A running worker means the objective is making progress, so the
	// supervisor leaves it alone no matter how long the manager has been quiet.
	worker := &model.Session{
		ObjectiveID: obj.ID,
		Role:        model.RoleImplementer,
		Agent:       model.AgentClaude,
		Status:      model.SessionRunning,
	}
	if err := st.CreateSession(worker); err != nil {
		t.Fatalf("create worker: %v", err)
	}
	muchLater := afterCooldown.Add(supervisePokeCooldown + time.Second)
	withWorker := decide(muchLater)
	if len(withWorker) != 0 {
		t.Fatalf("an objective with an active worker must not be acted on, got %d", len(withWorker))
	}
}

// TestSupervisor_RespawnsManagerThenEscalates covers the supervisor's handling
// of an active objective whose manager session has terminated: it first
// decides to respawn a manager that carries the original prompt, and once the
// manager budget (maxManagerSessions) is used up it decides to escalate, which
// records a question on the objective.
func TestSupervisor_RespawnsManagerThenEscalates(t *testing.T) {
	o, st := newTestOrch(t)
	addTarget(t, st, "local", model.TargetLocal, 4)
	o.RegisterProvider(agent.NewFake(model.AgentClaude, true, nil))

	obj, mgr, err := o.CreateObjective(NewObjectiveSpec{Title: "x", Prompt: "do the thing"})
	if err != nil {
		t.Fatalf("create objective: %v", err)
	}

	// The manager session terminated, the objective is still active, and no worker
	// is running -> nothing can drive it. The supervisor decides to respawn.
	terminateSession(t, st, mgr.ID, model.SessionSucceeded)
	acts := o.superviseDecisions(st.Now())
	if len(acts) != 1 || acts[0].kind != "respawn" {
		t.Fatalf("a manager-less active objective should respawn, got %+v", acts)
	}

	// Executing it brings up a fresh live manager carrying the resume context.
	o.respawnManager(acts[0])
	live := o.activeManagerFor(obj.ID)
	if live == nil {
		t.Fatal("respawn should create a live manager")
	}
	if live.Title != "Manager (resumed)" || !strings.Contains(live.Goal, "do the thing") {
		// Report both checked fields so a Goal mismatch is diagnosable too.
		t.Fatalf("resumed manager missing prompt/title: title=%q goal=%q", live.Title, live.Goal)
	}

	// Burn through the manager budget: terminate the respawn and add managers until
	// the cap is reached, all terminal so none is live.
	terminateSession(t, st, live.ID, model.SessionFailed)
	for o.countManagers(obj.ID) < maxManagerSessions {
		m, mErr := o.CreateSession(SpawnSpec{ObjectiveID: obj.ID, Role: model.RoleManager,
			Agent: model.AgentClaude, Mode: model.ModeInteractive, Title: "Manager"})
		if mErr != nil {
			t.Fatalf("seed manager: %v", mErr)
		}
		terminateSession(t, st, m.ID, model.SessionFailed)
	}

	// Past the cooldown, with the budget exhausted, it escalates instead of
	// respawning forever.
	future := st.Now().Add(supervisePokeCooldown + time.Minute)
	acts = o.superviseDecisions(future)
	if len(acts) != 1 || acts[0].kind != "escalate" {
		t.Fatalf("an exhausted manager budget should escalate, got %+v", acts)
	}
	o.escalateManagerDeaths(obj.ID)

	// Surface a store failure as itself rather than as a missing question.
	qs, err := st.ListQuestionsByObjective(obj.ID)
	if err != nil {
		t.Fatalf("list questions: %v", err)
	}
	if len(qs) == 0 {
		t.Fatal("escalation should record an open question for the objective")
	}
}

// terminateSession moves session id to the terminal status final by applying
// the status updates starting, running, and final in that order. It expects the
// session to begin in the queued state and fails the test on the first update
// that is rejected.
func terminateSession(t *testing.T, st *store.Store, id string, final model.SessionStatus) {
	t.Helper()
	lifecycle := [...]model.SessionStatus{model.SessionStarting, model.SessionRunning, final}
	for _, next := range lifecycle {
		_, err := st.UpdateSessionStatus(id, next)
		if err != nil {
			t.Fatalf("transition %s -> %s: %v", id, next, err)
		}
	}
}

// TestReclaimWorkspaces runs one ReclaimWorkspaces pass over three workspaces
// and checks that only the checkout of a finished objective with no live
// session is removed and archived, while the checkout of an active objective
// and the checkout still referenced by a running session are left untouched.
func TestReclaimWorkspaces(t *testing.T) {
	o, st := newTestOrch(t)
	tgt := addTarget(t, st, "local", model.TargetLocal, 4)

	// mkdir creates a real directory to stand in for a checkout.
	mkdir := func(name string) string {
		t.Helper()
		d := filepath.Join(t.TempDir(), name)
		if err := os.MkdirAll(d, 0o755); err != nil {
			t.Fatal(err)
		}
		return d
	}
	// newWS records a ready, isolated workspace at path for the objective.
	newWS := func(objID, path string) *model.Workspace {
		t.Helper()
		ws := &model.Workspace{ObjectiveID: objID, TargetID: tgt.ID,
			Kind: model.WorkspaceIsolated, Path: path, Status: model.WorkspaceReady}
		if err := st.CreateWorkspace(ws); err != nil {
			t.Fatal(err)
		}
		return ws
	}

	// Terminal objective with no live sessions -> its checkout is reclaimed.
	doneObj := &model.Objective{Title: "done", Prompt: "p", Status: model.ObjectiveSucceeded}
	if err := st.CreateObjective(doneObj); err != nil {
		t.Fatalf("create done objective: %v", err)
	}
	donePath := mkdir("done")
	doneWS := newWS(doneObj.ID, donePath)

	// Active objective -> left alone (it may still publish/inherit the checkout).
	activeObj := &model.Objective{Title: "active", Prompt: "p", Status: model.ObjectiveActive}
	if err := st.CreateObjective(activeObj); err != nil {
		t.Fatalf("create active objective: %v", err)
	}
	activePath := mkdir("active")
	activeWS := newWS(activeObj.ID, activePath)

	// Terminal objective but a non-terminal session still references the
	// workspace -> left alone (safety against the sharing hazard).
	busyObj := &model.Objective{Title: "busy", Prompt: "p", Status: model.ObjectiveSucceeded}
	if err := st.CreateObjective(busyObj); err != nil {
		t.Fatalf("create busy objective: %v", err)
	}
	busyPath := mkdir("busy")
	busyWS := newWS(busyObj.ID, busyPath)
	if err := st.CreateSession(&model.Session{ObjectiveID: busyObj.ID, Role: model.RoleImplementer,
		Agent: model.AgentClaude, Status: model.SessionRunning, WorkspaceID: busyWS.ID}); err != nil {
		t.Fatal(err)
	}

	o.ReclaimWorkspaces(context.Background())

	// Reclaimed: dir gone, status archived.
	if _, err := os.Stat(donePath); !os.IsNotExist(err) {
		t.Fatalf("terminal objective's checkout should be removed, stat err=%v", err)
	}
	ws, err := st.GetWorkspace(doneWS.ID)
	if err != nil {
		t.Fatalf("get reclaimed workspace: %v", err)
	}
	if ws.Status != model.WorkspaceArchived {
		t.Fatalf("reclaimed workspace should be archived, got %s", ws.Status)
	}

	// Kept: active objective.
	if _, err := os.Stat(activePath); err != nil {
		t.Fatalf("active objective's checkout must be kept, stat err=%v", err)
	}
	ws, err = st.GetWorkspace(activeWS.ID)
	if err != nil {
		t.Fatalf("get active workspace: %v", err)
	}
	if ws.Status != model.WorkspaceReady {
		t.Fatalf("active workspace must stay ready, got %s", ws.Status)
	}

	// Kept: in-use by a live session.
	if _, err := os.Stat(busyPath); err != nil {
		t.Fatalf("in-use checkout must be kept, stat err=%v", err)
	}
	ws, err = st.GetWorkspace(busyWS.ID)
	if err != nil {
		t.Fatalf("get in-use workspace: %v", err)
	}
	if ws.Status != model.WorkspaceReady {
		t.Fatalf("in-use workspace must stay ready, got %s", ws.Status)
	}
}

func TestFailingChecksPR_SpawnsCIFollowup(t *testing.T) {
o, st := newTestOrch(t)
addTarget(t, st, "local", model.TargetLocal, 4)
Expand Down
88 changes: 88 additions & 0 deletions internal/orch/reclaim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package orch

import (
"context"

"github.com/nathanwhit/orcha/internal/agent"
"github.com/nathanwhit/orcha/internal/exec"
"github.com/nathanwhit/orcha/internal/model"
)

// ReclaimWorkspaces removes the on-disk checkout of every workspace that is no
// longer needed and marks it archived, reclaiming disk on long-lived targets
// where per-session checkouts otherwise accumulate forever. It is deliberately
// conservative: a workspace is only reclaimed once its objective is no longer
// active AND no non-terminal session of that objective still references it.
// This sidesteps the sharing hazard — dependent workers inherit a dependency's
// checkout, the manager publishes a PR from a worker's checkout, follow-ups
// reuse a PR-branch checkout — by never touching a workspace while its
// objective could still do work. The shared bare mirror cache is left in place
// (it is reused across checkouts and stays small).
//
// It runs under a TryLock so overlapping ticks don't pile up: a call made while
// another pass is in progress returns immediately. Workspaces that are already
// archived are skipped by reclaimWorkspace, so repeated passes are harmless.
// The pass stops early once ctx is canceled so shutdown is not held up walking
// the remaining objectives. Store errors are best-effort: a failed listing
// skips that objective (or the whole pass) and the next pass retries.
func (o *Orchestrator) ReclaimWorkspaces(ctx context.Context) {
	if !o.gcMu.TryLock() {
		return // a reclaim pass is already running
	}
	defer o.gcMu.Unlock()

	objs, err := o.st.ListObjectives()
	if err != nil {
		return
	}
	for _, obj := range objs {
		if ctx.Err() != nil {
			return // canceled: leave the rest for a later pass
		}
		// NOTE(review): every status other than ObjectiveActive is treated as
		// reclaimable — confirm there is no non-terminal, non-active status
		// (e.g. paused) whose checkouts must be kept.
		if obj.Status == model.ObjectiveActive {
			continue // active objectives may still publish/inherit these checkouts
		}
		sessions, err := o.st.ListSessionsByObjective(obj.ID)
		if err != nil {
			continue
		}
		// Workspaces still referenced by a session that has not finished.
		inUse := map[string]bool{}
		for _, s := range sessions {
			if s.WorkspaceID != "" && !s.Status.IsTerminal() {
				inUse[s.WorkspaceID] = true
			}
		}
		wss, err := o.st.ListWorkspacesByObjective(obj.ID)
		if err != nil {
			continue
		}
		for _, ws := range wss {
			if ctx.Err() != nil {
				return // canceled mid-objective: stop issuing removals
			}
			if !inUse[ws.ID] {
				o.reclaimWorkspace(ctx, ws)
			}
		}
	}
}

// reclaimWorkspace removes one workspace's checkout dir on its target and marks
// it archived. Only workspaces in the ready or failed state are considered;
// any other status (already archived, still preparing, ...) is skipped.
//
// Outcomes:
//   - no path or no target recorded: nothing to remove, archived immediately;
//   - target lookup fails: treated as "target gone", archived to stop retrying;
//   - rm fails on a reachable target (e.g. a transient SSH hiccup): left
//     untouched so a later pass retries;
//   - rm succeeds: archived and a "workspace_reclaimed" audit entry is written.
//
// Status-update errors are deliberately ignored: the pass is best-effort and a
// workspace that stays ready/failed is simply picked up again next time.
func (o *Orchestrator) reclaimWorkspace(ctx context.Context, ws *model.Workspace) {
	if ws.Status != model.WorkspaceReady && ws.Status != model.WorkspaceFailed {
		return
	}
	if ws.Path == "" || ws.TargetID == "" {
		_ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived)
		return
	}
	tgt, err := o.st.GetTarget(ws.TargetID)
	if err != nil {
		// Target is gone — the dir is unreachable. Archive to stop retrying.
		// NOTE(review): any GetTarget error lands here, including a transient
		// store failure; confirm whether only "not found" should archive.
		_ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived)
		return
	}
	ex := agent.NewExecutor(tgt)
	if _, err := exec.RunCapture(ctx, ex, exec.Command{
		// "--" ends option parsing so a path beginning with '-' can never be
		// interpreted as an rm flag.
		Name: "rm", Args: []string{"-rf", "--", ws.Path},
	}); err != nil {
		// Reachable target but rm failed; leave it for the next pass to retry.
		return
	}
	_ = o.st.SetWorkspaceStatus(ws.ID, model.WorkspaceArchived)
	o.audit(ws.ObjectiveID, "", "workspace_reclaimed",
		"removed checkout "+ws.Path, model.JSONMap{"workspace_id": ws.ID, "target_id": ws.TargetID})
}
Loading
Loading