Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ These are real issues agents have encountered in this codebase. Package-specific
- **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write.
- **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync.
- **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`.
- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them.
- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping, `CurrentGroup` advancement) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. `CurrentGroup` advancement is handled in `AssignTaskInstance` (monotonic high-water-mark), not via `checkAndAdvanceGroup()` which is only called in the legacy path.

---

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Subprocess Mode** - Removed the experimental subprocess execution mode (`experimental.subprocess_mode`), including the `internal/streamjson/` package, `subprocessFactory`, and all related config/TUI/wiring plumbing. Pipeline instances now always use the tmux-based execution backend.

### Fixed
- **Pipeline Group Navigation** - Fixed h/l navigation not reaching instances in later execution groups during pipeline execution. `CurrentGroup` was never advanced in the pipeline/bridge path, causing all groups except Group 1 to remain collapsed and non-navigable
- **Pipeline Deadlock on Cross-Team Dependencies** - Fixed `pipeline.Decompose` only grouping tasks by shared files, ignoring `DependsOn` edges. When dependent tasks had disjoint files, they landed in separate teams whose `TaskQueue.isClaimable()` could never resolve the cross-queue dependency, permanently blocking the pipeline. The decomposer now unions tasks along dependency edges in addition to file edges, ensuring all task-level dependencies are resolvable within a single team.
- **Pipeline Execution Count Exceeds Total** - Fixed `exec 3/2` display bug where the task done count exceeded the total count. `UpdateTeamCompleted` overwrote `TasksDone`/`TasksFailed` with backend-authoritative values but left `TasksTotal` at the stale incremental count from bridge start events. Now reconciles `TasksTotal` and clears `ActiveTasks` on team completion.
- **Ultraplan h/l Navigation Reversed** - Fixed `h` and `l` keybindings navigating in the opposite visual direction in ultraplan mode with groups. Navigation used plan-execution order (`getNavigableInstances`) while the sidebar rendered in group-structure order (`FlattenGroupsForDisplay`), causing the two orderings to diverge. Navigation now follows the visual display order filtered to navigable instances.
Expand Down
21 changes: 21 additions & 0 deletions internal/orchestrator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,27 @@ func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) {
if !addInstanceToSubgroup(ultraGroup, session, instanceID) {
ultraGroup.AddInstance(instanceID)
}

// Advance CurrentGroup if this task belongs to a later execution group.
// In the pipeline path, dependencies are resolved at the task level, so
// tasks from multiple groups can execute concurrently. The TUI uses
// CurrentGroup to auto-expand the active group in the sidebar and to
// determine which groups are navigable via h/l keys. Without this,
// CurrentGroup stays at 0 forever in the pipeline path because
// ExecutionOrchestrator.checkAndAdvanceGroup() is never called.
groupIdx := getTaskGroupIndex(session, instanceID)
if groupIdx < 0 {
c.logger.Debug("task not found in any execution group, CurrentGroup unchanged",
"task_id", taskID, "instance_id", instanceID)
} else if groupIdx > session.CurrentGroup {
c.logger.Info("advancing CurrentGroup for pipeline execution",
"task_id", taskID,
"instance_id", instanceID,
"from_group", session.CurrentGroup,
"to_group", groupIdx,
)
session.CurrentGroup = groupIdx
}
}

// GetRunningTasks returns the currently running tasks and their instance IDs
Expand Down
165 changes: 165 additions & 0 deletions internal/orchestrator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,123 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) {
assertGroupContains(t, multiGroup, "inst-1")
})

t.Run("advances CurrentGroup when task belongs to later group", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
ultraSession.Plan = &PlanSpec{
Objective: "Test",
Tasks: []PlannedTask{
{ID: "task-1", Title: "T1"},
{ID: "task-2", Title: "T2"},
{ID: "task-3", Title: "T3"},
},
ExecutionOrder: [][]string{
{"task-1"}, // Group 1 (index 0)
{"task-2"}, // Group 2 (index 1)
{"task-3"}, // Group 3 (index 2)
},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{Instances: make([]*Instance, 0)}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

// Assign a Group 1 task — CurrentGroup should stay at 0
coord.AssignTaskInstance("task-1", "inst-1")
if ultraSession.CurrentGroup != 0 {
t.Errorf("CurrentGroup = %d after Group 1 task, want 0", ultraSession.CurrentGroup)
}

// Assign a Group 2 task — CurrentGroup should advance to 1
coord.AssignTaskInstance("task-2", "inst-2")
if ultraSession.CurrentGroup != 1 {
t.Errorf("CurrentGroup = %d after Group 2 task, want 1", ultraSession.CurrentGroup)
}

// Assign a Group 3 task — CurrentGroup should advance to 2
coord.AssignTaskInstance("task-3", "inst-3")
if ultraSession.CurrentGroup != 2 {
t.Errorf("CurrentGroup = %d after Group 3 task, want 2", ultraSession.CurrentGroup)
}
})

t.Run("does not regress CurrentGroup when earlier group task starts", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
ultraSession.Plan = &PlanSpec{
Objective: "Test",
Tasks: []PlannedTask{
{ID: "task-1", Title: "T1"},
{ID: "task-2", Title: "T2"},
},
ExecutionOrder: [][]string{
{"task-1"}, // Group 1 (index 0)
{"task-2"}, // Group 2 (index 1)
},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{Instances: make([]*Instance, 0)}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

// Advance to Group 2 first
coord.AssignTaskInstance("task-2", "inst-2")
if ultraSession.CurrentGroup != 1 {
t.Fatalf("CurrentGroup = %d, want 1", ultraSession.CurrentGroup)
}

// Assigning a Group 1 task should NOT regress CurrentGroup
coord.AssignTaskInstance("task-1", "inst-1")
if ultraSession.CurrentGroup != 1 {
t.Errorf("CurrentGroup = %d after earlier group task, want 1 (should not regress)", ultraSession.CurrentGroup)
}
})

t.Run("leaves CurrentGroup unchanged when task not in any group", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
// Plan has no execution order — getTaskGroupIndex returns -1
ultraSession.Plan = &PlanSpec{
Objective: "Test",
Tasks: []PlannedTask{{ID: "task-1", Title: "T1"}},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{Instances: make([]*Instance, 0)}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

coord.AssignTaskInstance("task-1", "inst-1")

// CurrentGroup should remain 0 since getTaskGroupIndex returns -1
if ultraSession.CurrentGroup != 0 {
t.Errorf("CurrentGroup = %d after unresolvable task, want 0", ultraSession.CurrentGroup)
}
})

t.Run("concurrent calls do not race", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
Expand Down Expand Up @@ -2739,6 +2856,54 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) {
assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i))
}
})

t.Run("concurrent calls advance CurrentGroup correctly under race", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
ultraSession.Plan = &PlanSpec{
Objective: "Test",
Tasks: []PlannedTask{
{ID: "task-1", Title: "T1"},
{ID: "task-2", Title: "T2"},
{ID: "task-3", Title: "T3"},
{ID: "task-4", Title: "T4"},
},
ExecutionOrder: [][]string{
{"task-1"},
{"task-2"},
{"task-3"},
{"task-4"},
},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{Instances: make([]*Instance, 0)}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

// Assign all 4 tasks concurrently from 4 different groups
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
coord.AssignTaskInstance(fmt.Sprintf("task-%d", n), fmt.Sprintf("inst-%d", n))
}(i)
}
wg.Wait()

// CurrentGroup should be at the highest group index (3)
if ultraSession.CurrentGroup != 3 {
t.Errorf("CurrentGroup = %d after concurrent assigns, want 3", ultraSession.CurrentGroup)
}
})
}

// assertGroupContains is a test helper that asserts an instance ID exists
Expand Down
15 changes: 10 additions & 5 deletions internal/orchestrator/ultraplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,16 @@ type UltraPlanSession struct {
TaskToInstance map[string]string `json:"task_to_instance"` // PlannedTask.ID -> Instance.ID
CompletedTasks []string `json:"completed_tasks"`
FailedTasks []string `json:"failed_tasks"`
CurrentGroup int `json:"current_group"` // Index into ExecutionOrder
Created time.Time `json:"created"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Error string `json:"error,omitempty"` // Error message if failed
// CurrentGroup is the highest execution group index with running tasks.
// Monotonic high-water-mark: never decreases. The legacy path increments
// sequentially via AdvanceGroupIfComplete; the pipeline path may jump
// forward when tasks from later groups start (see AssignTaskInstance).
// The TUI uses this to auto-expand the active sidebar group.
CurrentGroup int `json:"current_group"`
Created time.Time `json:"created"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Error string `json:"error,omitempty"` // Error message if failed

// Revision state (persisted for recovery and display)
Revision *RevisionState `json:"revision,omitempty"`
Expand Down
Loading