Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **StatusFinishing Sidebar State** - Added a `finishing` status for pipeline instances between sentinel file detection and verification completion, providing accurate sidebar feedback instead of showing "working" during the verification phase
- **Spec-Driven Planning (`--spec`)** - New `--spec` flag for ultraplan that converts an existing product spec (Notion page, GitHub issue, markdown file, etc.) into an ultraplan instead of open-ended codebase exploration. The planning agent fetches the spec, preserves its task structure faithfully, and enriches it with codebase-specific file paths.
- **Remove All Instances Command** - Added `:D!` / `:remove!` command to remove all instances from the session at once, complementing the existing `:D` single-instance removal
- **Automatic Tmux Session Recovery** - When a tmux server dies during a live session (macOS `/tmp` cleanup, crash, or kill), the capture loop now automatically detects the death and resumes the Claude session in a fresh tmux session using `--resume`. Recovery attempts are limited (default 3) and only triggered when a backend session ID exists. Includes `OnRecovery` callback for orchestrator state synchronization.
5 changes: 4 additions & 1 deletion internal/bridge/AGENTS.md
@@ -15,7 +15,9 @@ Gate.ClaimNext() → FileLockRegistry.ClaimMultiple() → ContextPropagation
→ InstanceFactory.CreateInstance() → StartInstance()
→ Gate.MarkRunning() → (auto-approve if gated)
→ monitor loop (poll CompletionChecker)
- → Gate.Complete/Fail() + SessionRecorder
+ → SessionRecorder.RecordSentinelDetected() (on sentinel detection, before verify)
+ → CompletionChecker.VerifyWork()
+ → Gate.Complete/Fail() + SessionRecorder.RecordCompletion/RecordFailure
→ FileLockRegistry.ReleaseAll() + ContextPropagation.ShareDiscovery()
```

@@ -48,6 +50,7 @@ These interfaces are implemented by adapters in `internal/orchestrator/bridgewire
- **TaskQueue retry interacts with bridge claim loop** — `TaskQueue.Fail()` has retry logic (`defaultMaxRetries=2`). When the bridge monitor calls `gate.Fail()`, the task may return to `TaskPending` (not permanently failed), and the claim loop re-claims it. Tests that assert on `Running()` after failure must either disable retries via `SetMaxRetries(taskID, 0)` or account for the re-claim cycle.
- **Always log gate.Fail errors** — `gate.Fail()` can fail if the task has already transitioned. Always check and log the return error rather than discarding with `_ =`.
- **File lock conflicts use Release, not Fail** — When `ClaimMultiple` returns `ErrAlreadyClaimed`, use `gate.Release` to return the task to pending without burning retries. Using `gate.Fail` would consume retry attempts, and with scaling enabled (semaphore > 1), multiple tasks competing for the same file lock would exhaust retries and permanently fail. After releasing, call `waitForWake` to avoid a hot retry loop.
- **RecordSentinelDetected before VerifyWork** — When the sentinel file is detected (`done == true`), `recorder.RecordSentinelDetected` is called *before* `checker.VerifyWork`. This lets the production wiring set `inst.Status = StatusFinishing` so the TUI shows an accurate state while verification runs. The ordering is: sentinel detected → RecordSentinelDetected → VerifyWork → RecordCompletion/RecordFailure.
- **Record completion/failure before file lock release** — `recorder.RecordCompletion`/`RecordFailure` must be called immediately after `gate.Complete`/`gate.Fail`, before `reg.ReleaseAll` and `shareCompletion`. The gate transition triggers a synchronous event cascade that can complete the pipeline before the monitor goroutine reaches subsequent lines. If the recorder call comes after file lock I/O, tests (and observers) see the pipeline complete before the recorder fires.
- **Scaling monitor increases semaphore concurrency** — The hub's `ScalingMonitor` reacts to `QueueDepthChangedEvent` and may increase the bridge's semaphore limit via the `OnDecision` callback. Code that assumes semaphore=1 (sequential task execution) is incorrect when scaling is active. File lock claims are the safety net for concurrent access to the same files.
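
The ordering described in the pitfalls above (sentinel detected → RecordSentinelDetected → VerifyWork → RecordCompletion/RecordFailure) can be sketched in isolation. This is a minimal illustration, not the bridge's actual monitor loop: `traceRecorder`, `verifyFunc`, and `finishTask` are hypothetical names, the `Recorder` interface is a trimmed stand-in for `SessionRecorder`, and the gate transitions and file lock release are left out on purpose.

```go
package main

import (
	"errors"
	"fmt"
)

// Recorder is a trimmed stand-in for the SessionRecorder interface
// (AssignTask is omitted because it plays no part in the ordering).
type Recorder interface {
	RecordSentinelDetected(taskID, instanceID string)
	RecordCompletion(taskID string, commitCount int)
	RecordFailure(taskID, reason string)
}

// traceRecorder is a hypothetical recorder that appends every call to a
// slice so the call order can be inspected afterwards.
type traceRecorder struct{ calls []string }

func (r *traceRecorder) RecordSentinelDetected(taskID, _ string) {
	r.calls = append(r.calls, "sentinel:"+taskID)
}

func (r *traceRecorder) RecordCompletion(taskID string, _ int) {
	r.calls = append(r.calls, "completion:"+taskID)
}

func (r *traceRecorder) RecordFailure(taskID, _ string) {
	r.calls = append(r.calls, "failure:"+taskID)
}

// verifyFunc stands in for CompletionChecker.VerifyWork (assumed shape).
type verifyFunc func(taskID string) (success bool, commits int, err error)

// finishTask is a hypothetical helper that applies the documented order
// once the sentinel file has been seen: notify, verify, record outcome.
func finishTask(rec Recorder, verify verifyFunc, taskID, instanceID string) {
	rec.RecordSentinelDetected(taskID, instanceID) // UI can show "finishing"

	success, commits, err := verify(taskID)
	if err != nil {
		rec.RecordFailure(taskID, err.Error())
		return
	}
	if !success {
		rec.RecordFailure(taskID, "verification failed")
		return
	}
	rec.RecordCompletion(taskID, commits)
}

func main() {
	ok := &traceRecorder{}
	finishTask(ok, func(string) (bool, int, error) { return true, 2, nil }, "t1", "inst-1")
	fmt.Println(ok.calls)

	bad := &traceRecorder{}
	finishTask(bad, func(string) (bool, int, error) {
		return false, 0, errors.New("no commits")
	}, "t2", "inst-2")
	fmt.Println(bad.calls)
}
```

In both the success and failure paths the sentinel notification is the first recorded call, which is the property the new ordering test in this PR asserts for the success case.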

6 changes: 5 additions & 1 deletion internal/bridge/bridge.go
@@ -371,7 +371,11 @@ func (b *Bridge) monitorInstance(taskID string, inst Instance) {
continue
}

- // Instance wrote its sentinel file. Verify the work.
+ // Instance wrote its sentinel file — notify recorder so the UI
+ // can transition to "finishing" while verification runs.
+ b.recorder.RecordSentinelDetected(taskID, inst.ID())
+
+ // Verify the work.
success, commitCount, verifyErr := b.checker.VerifyWork(
taskID, inst.ID(), inst.WorktreePath(), inst.Branch(),
)
126 changes: 126 additions & 0 deletions internal/bridge/bridge_test.go
@@ -130,6 +130,8 @@ func (r *mockRecorder) AssignTask(taskID, instanceID string) {
r.assigned[taskID] = instanceID
}

func (r *mockRecorder) RecordSentinelDetected(_, _ string) {}

func (r *mockRecorder) RecordCompletion(taskID string, commitCount int) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -843,6 +845,10 @@ func (r *signalingRecorder) AssignTask(taskID, instanceID string) {
r.inner.AssignTask(taskID, instanceID)
}

func (r *signalingRecorder) RecordSentinelDetected(taskID, instanceID string) {
r.inner.RecordSentinelDetected(taskID, instanceID)
}

func (r *signalingRecorder) RecordCompletion(taskID string, commitCount int) {
r.inner.RecordCompletion(taskID, commitCount)
}
@@ -855,6 +861,126 @@ func (r *signalingRecorder) RecordFailure(taskID, reason string) {
}
}

// orderRecorder tracks the order of RecordSentinelDetected and RecordCompletion calls
// to verify that sentinel detection is reported before completion.
type orderRecorder struct {
inner *mockRecorder
callOrder []string
mu sync.Mutex
completeCh chan string
}

func newOrderRecorder() *orderRecorder {
return &orderRecorder{
inner: newMockRecorder(),
completeCh: make(chan string, 1),
}
}

func (r *orderRecorder) AssignTask(taskID, instanceID string) {
r.inner.AssignTask(taskID, instanceID)
}

func (r *orderRecorder) RecordSentinelDetected(taskID, instanceID string) {
r.mu.Lock()
r.callOrder = append(r.callOrder, "sentinel:"+taskID)
r.mu.Unlock()
}

func (r *orderRecorder) RecordCompletion(taskID string, commitCount int) {
r.mu.Lock()
r.callOrder = append(r.callOrder, "completion:"+taskID)
r.mu.Unlock()
r.inner.RecordCompletion(taskID, commitCount)
select {
case r.completeCh <- taskID:
default:
}
}

func (r *orderRecorder) RecordFailure(taskID, reason string) {
r.inner.RecordFailure(taskID, reason)
}

func (r *orderRecorder) CallOrder() []string {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]string, len(r.callOrder))
copy(out, r.callOrder)
return out
}

func TestBridge_SentinelDetectedBeforeCompletion(t *testing.T) {
bus := event.NewBus()
tasks := []ultraplan.PlannedTask{
{ID: "t1", Title: "Task 1", Description: "Do thing 1"},
}
tt := newTestTeam(t, bus, tasks)

factory := newMockFactory()
checker := newMockChecker()
recorder := newOrderRecorder()

b := bridge.New(tt, factory, checker, recorder, bus,
bridge.WithPollInterval(10*time.Millisecond),
)

// Subscribe before Start to avoid missing the event.
startedCh := make(chan event.Event, 1)
subID := bus.Subscribe("bridge.task_started", func(e event.Event) {
select {
case startedCh <- e:
default:
}
})
defer bus.Unsubscribe(subID)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := b.Start(ctx); err != nil {
t.Fatalf("Start: %v", err)
}
defer b.Stop()

// Wait for the bridge to claim the task.
select {
case <-startedCh:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for bridge.task_started event")
}

// Signal completion.
factory.mu.Lock()
var worktreePath string
for _, inst := range factory.instances {
worktreePath = inst.worktreePath
break
}
factory.mu.Unlock()

checker.MarkComplete(worktreePath)

// Wait for completion to be recorded.
select {
case <-recorder.completeCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for RecordCompletion")
}

// Verify that sentinel was recorded before completion.
order := recorder.CallOrder()
if len(order) < 2 {
t.Fatalf("expected at least 2 calls, got %d: %v", len(order), order)
}
if order[0] != "sentinel:t1" {
t.Errorf("first call = %q, want %q", order[0], "sentinel:t1")
}
if order[1] != "completion:t1" {
t.Errorf("second call = %q, want %q", order[1], "completion:t1")
}
}

func TestBuildTaskPrompt(t *testing.T) {
prompt := bridge.BuildTaskPrompt(
"task-1",
4 changes: 4 additions & 0 deletions internal/bridge/types.go
@@ -36,6 +36,10 @@ type SessionRecorder interface {
// AssignTask records that a task has been assigned to an instance.
AssignTask(taskID, instanceID string)

// RecordSentinelDetected signals that the instance wrote its sentinel
// file and is entering the finishing phase (verification in progress).
RecordSentinelDetected(taskID, instanceID string)

// RecordCompletion records successful task completion.
RecordCompletion(taskID string, commitCount int)

3 changes: 3 additions & 0 deletions internal/cmd/session/pipeline_wire.go
@@ -14,6 +14,9 @@ func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orches
coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) {
recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{
OnAssign: coordinator.AssignTaskInstance,
OnSentinelDetected: func(_, instanceID string) {
orch.SetInstanceStatus(instanceID, orchestrator.StatusFinishing)
},
})
return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{
Orch: deps.Orch,
10 changes: 10 additions & 0 deletions internal/orchestrator/bridgewire/adapters.go
@@ -96,6 +96,10 @@ type SessionRecorderDeps struct {
// OnAssign is called when a task is assigned to an instance.
OnAssign func(taskID, instanceID string)

// OnSentinelDetected is called when the instance writes its sentinel file
// (entering the finishing phase before verification completes).
OnSentinelDetected func(taskID, instanceID string)

// OnComplete is called when a task completes successfully.
OnComplete func(taskID string, commitCount int)

@@ -119,6 +123,12 @@ func (r *sessionRecorder) AssignTask(taskID, instanceID string) {
}
}

func (r *sessionRecorder) RecordSentinelDetected(taskID, instanceID string) {
if r.deps.OnSentinelDetected != nil {
r.deps.OnSentinelDetected(taskID, instanceID)
}
}

func (r *sessionRecorder) RecordCompletion(taskID string, commitCount int) {
if r.deps.OnComplete != nil {
r.deps.OnComplete(taskID, commitCount)
11 changes: 11 additions & 0 deletions internal/orchestrator/bridgewire/adapters_test.go
@@ -85,6 +85,7 @@ func TestNewCompletionChecker_VerifyWorkFailure(t *testing.T) {

func TestNewSessionRecorder(t *testing.T) {
var assignedTask, assignedInst string
var sentinelTask, sentinelInst string
var completedTask string
var completedCommits int
var failedTask, failedReason string
@@ -94,6 +95,10 @@ func TestNewSessionRecorder(t *testing.T) {
assignedTask = taskID
assignedInst = instanceID
},
OnSentinelDetected: func(taskID, instanceID string) {
sentinelTask = taskID
sentinelInst = instanceID
},
OnComplete: func(taskID string, commitCount int) {
completedTask = taskID
completedCommits = commitCount
@@ -109,6 +114,11 @@ func TestNewSessionRecorder(t *testing.T) {
t.Errorf("AssignTask: got (%q, %q), want (%q, %q)", assignedTask, assignedInst, "t1", "inst-1")
}

recorder.RecordSentinelDetected("t1", "inst-1")
if sentinelTask != "t1" || sentinelInst != "inst-1" {
t.Errorf("RecordSentinelDetected: got (%q, %q), want (%q, %q)", sentinelTask, sentinelInst, "t1", "inst-1")
}

recorder.RecordCompletion("t2", 5)
if completedTask != "t2" || completedCommits != 5 {
t.Errorf("RecordCompletion: got (%q, %d), want (%q, %d)", completedTask, completedCommits, "t2", 5)
@@ -125,6 +135,7 @@ func TestNewSessionRecorder_NilCallbacks(t *testing.T) {

// Should not panic with nil callbacks.
recorder.AssignTask("t1", "inst-1")
recorder.RecordSentinelDetected("t1", "inst-1")
recorder.RecordCompletion("t1", 1)
recorder.RecordFailure("t1", "reason")
}
2 changes: 2 additions & 0 deletions internal/orchestrator/bridgewire/executor_test.go
@@ -167,6 +167,8 @@ func (r *trackingRecorder) AssignTask(taskID, instanceID string) {
}
}

func (r *trackingRecorder) RecordSentinelDetected(_, _ string) {}

func (r *trackingRecorder) RecordCompletion(taskID string, commitCount int) {
r.mu.Lock()
r.completed[taskID] = commitCount
19 changes: 19 additions & 0 deletions internal/orchestrator/orchestrator.go
@@ -2375,6 +2375,25 @@ func (o *Orchestrator) GetInstance(id string) *Instance {
return nil
}

// SetInstanceStatus sets the status of an instance by ID while holding o.mu.
// Returns false if there is no active session or the instance was not found.
func (o *Orchestrator) SetInstanceStatus(id string, status InstanceStatus) bool {
o.mu.Lock()
defer o.mu.Unlock()

if o.session == nil {
return false
}

for _, inst := range o.session.Instances {
if inst.ID == id {
inst.Status = status
return true
}
}
return false
}
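
The lookup-and-set pattern above can be exercised on its own. The following is a minimal sketch under stated assumptions: `store` is a hypothetical, trimmed stand-in for the `Orchestrator` (no session, no other fields), and the `onSentinelDetected` closure only approximates the production wiring. Unlike that wiring, which discards the result, the sketch checks the boolean return to show what a missing instance looks like.

```go
package main

import (
	"fmt"
	"sync"
)

// InstanceStatus mirrors the string-typed status from the diff.
type InstanceStatus string

const (
	StatusWorking   InstanceStatus = "working"
	StatusFinishing InstanceStatus = "finishing"
)

// Instance is a trimmed stand-in with only the fields the sketch needs.
type Instance struct {
	ID     string
	Status InstanceStatus
}

// store is a hypothetical stand-in for the Orchestrator: a mutex plus
// the list of instances it guards.
type store struct {
	mu        sync.Mutex
	instances []*Instance
}

// SetInstanceStatus updates the matching instance under the lock and
// reports whether an instance with that ID was found.
func (s *store) SetInstanceStatus(id string, status InstanceStatus) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, inst := range s.instances {
		if inst.ID == id {
			inst.Status = status
			return true
		}
	}
	return false
}

func main() {
	s := &store{instances: []*Instance{{ID: "inst-1", Status: StatusWorking}}}

	// Callback shaped like OnSentinelDetected; the task ID is unused here.
	onSentinelDetected := func(_, instanceID string) {
		if !s.SetInstanceStatus(instanceID, StatusFinishing) {
			fmt.Println("instance not found:", instanceID)
		}
	}

	onSentinelDetected("t1", "inst-1")
	onSentinelDetected("t2", "missing")
	fmt.Println(s.instances[0].Status)
}
```

Whether the real callback should log a `false` return is a design choice the PR leaves open; the sketch only shows that the information is available.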

// GetInstanceDiff returns the git diff for an instance against main
func (o *Orchestrator) GetInstanceDiff(worktreePath string) (string, error) {
return o.wt.GetDiffAgainstMain(worktreePath)
9 changes: 5 additions & 4 deletions internal/orchestrator/session.go
@@ -19,6 +19,7 @@ const (
StatusPending InstanceStatus = "pending"
StatusPreparing InstanceStatus = "preparing" // Worktree creation in progress (async setup)
StatusWorking InstanceStatus = "working"
StatusFinishing InstanceStatus = "finishing" // Sentinel file detected, verification in progress
StatusWaitingInput InstanceStatus = "waiting_input"
StatusPaused InstanceStatus = "paused"
StatusCompleted InstanceStatus = "completed"
@@ -255,7 +256,7 @@ func (s *Session) NeedsRecovery() bool {

// Check if any instances were in a working state
for _, inst := range s.Instances {
- if inst.Status == StatusWorking || inst.Status == StatusWaitingInput {
+ if inst.Status == StatusWorking || inst.Status == StatusFinishing || inst.Status == StatusWaitingInput {
return true
}
}
@@ -267,7 +268,7 @@ func (s *Session) NeedsRecovery() bool {
func (s *Session) GetInterruptedInstances() []*Instance {
var interrupted []*Instance
for _, inst := range s.Instances {
- if inst.Status == StatusWorking || inst.Status == StatusWaitingInput {
+ if inst.Status == StatusWorking || inst.Status == StatusFinishing || inst.Status == StatusWaitingInput {
// Mark as interrupted if we're detecting a recovery scenario
interrupted = append(interrupted, inst)
}
@@ -280,7 +281,7 @@ func (s *Session) GetResumableInstances() []*Instance {
var resumable []*Instance
for _, inst := range s.Instances {
// Instances with backend session ID can be resumed if they were running or interrupted
- if inst.ClaudeSessionID != "" && (inst.Status == StatusWorking || inst.Status == StatusWaitingInput || inst.Status == StatusPaused || inst.Status == StatusInterrupted) {
+ if inst.ClaudeSessionID != "" && (inst.Status == StatusWorking || inst.Status == StatusFinishing || inst.Status == StatusWaitingInput || inst.Status == StatusPaused || inst.Status == StatusInterrupted) {
resumable = append(resumable, inst)
}
}
@@ -292,7 +293,7 @@ func (s *Session) GetResumableInstances() []*Instance {
func (s *Session) MarkInstancesInterrupted() {
now := time.Now()
for _, inst := range s.Instances {
- if inst.Status == StatusWorking || inst.Status == StatusWaitingInput {
+ if inst.Status == StatusWorking || inst.Status == StatusFinishing || inst.Status == StatusWaitingInput {
inst.Status = StatusInterrupted
inst.InterruptedAt = &now
}
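
The same three-status condition now appears in `NeedsRecovery`, `GetInterruptedInstances`, and `MarkInstancesInterrupted`. One possible way to keep those call sites in sync is a single predicate. This is a sketch only: `isInFlight` is a hypothetical helper that does not exist in the PR, and the constants below are a trimmed copy of the ones in `session.go`.

```go
package main

import "fmt"

// InstanceStatus mirrors the string-typed status from session.go.
type InstanceStatus string

const (
	StatusPending      InstanceStatus = "pending"
	StatusWorking      InstanceStatus = "working"
	StatusFinishing    InstanceStatus = "finishing"
	StatusWaitingInput InstanceStatus = "waiting_input"
	StatusCompleted    InstanceStatus = "completed"
)

// isInFlight is a hypothetical helper: it reports whether a status
// counts as "was running when the session stopped", which is the
// condition the recovery functions in the diff repeat inline.
func isInFlight(s InstanceStatus) bool {
	switch s {
	case StatusWorking, StatusFinishing, StatusWaitingInput:
		return true
	default:
		return false
	}
}

func main() {
	statuses := []InstanceStatus{
		StatusPending, StatusWorking, StatusFinishing,
		StatusWaitingInput, StatusCompleted,
	}
	for _, s := range statuses {
		fmt.Printf("%s in-flight: %t\n", s, isInFlight(s))
	}
}
```

With a helper like this, adding a future status would touch one `case` line instead of several conditions; `GetResumableInstances` would still need its own wider check because it also accepts paused and interrupted instances.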
13 changes: 10 additions & 3 deletions internal/orchestrator/workflows/tripleshot/teamwire/adapters.go
@@ -102,9 +102,10 @@ func (c *judgeCompletionChecker) VerifyWork(_, _, worktreePath, _ string) (bool,

// sessionRecorderDeps defines the callbacks for the session recorder.
type sessionRecorderDeps struct {
- OnAssign func(taskID, instanceID string)
- OnComplete func(taskID string, commitCount int)
- OnFailure func(taskID, reason string)
+ OnAssign func(taskID, instanceID string)
+ OnSentinelDetected func(taskID, instanceID string)
+ OnComplete func(taskID string, commitCount int)
+ OnFailure func(taskID, reason string)
}

// sessionRecorder delegates to caller-provided callbacks.
@@ -122,6 +123,12 @@ func (r *sessionRecorder) AssignTask(taskID, instanceID string) {
}
}

func (r *sessionRecorder) RecordSentinelDetected(taskID, instanceID string) {
if r.deps.OnSentinelDetected != nil {
r.deps.OnSentinelDetected(taskID, instanceID)
}
}

func (r *sessionRecorder) RecordCompletion(taskID string, commitCount int) {
if r.deps.OnComplete != nil {
r.deps.OnComplete(taskID, commitCount)