diff --git a/AGENTS.md b/AGENTS.md index 85fec08..8a5c39b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -346,7 +346,7 @@ These are real issues agents have encountered in this codebase. Package-specific - **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write. - **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync. - **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`. -- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. +- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping, `CurrentGroup` advancement) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. `CurrentGroup` advancement is handled in `AssignTaskInstance` (monotonic high-water-mark), not via `checkAndAdvanceGroup()` which is only called in the legacy path. --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 238c02f..5fd137b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Subprocess Mode** - Removed the experimental subprocess execution mode (`experimental.subprocess_mode`), including the `internal/streamjson/` package, `subprocessFactory`, and all related config/TUI/wiring plumbing. Pipeline instances now always use the tmux-based execution backend. ### Fixed +- **Pipeline Group Navigation** - Fixed h/l navigation not reaching instances in later execution groups during pipeline execution. `CurrentGroup` was never advanced in the pipeline/bridge path, causing all groups except Group 1 to remain collapsed and non-navigable - **Pipeline Deadlock on Cross-Team Dependencies** - Fixed `pipeline.Decompose` only grouping tasks by shared files, ignoring `DependsOn` edges. When dependent tasks had disjoint files, they landed in separate teams whose `TaskQueue.isClaimable()` could never resolve the cross-queue dependency, permanently blocking the pipeline. The decomposer now unions tasks along dependency edges in addition to file edges, ensuring all task-level dependencies are resolvable within a single team. - **Pipeline Execution Count Exceeds Total** - Fixed `exec 3/2` display bug where the task done count exceeded the total count. `UpdateTeamCompleted` overwrote `TasksDone`/`TasksFailed` with backend-authoritative values but left `TasksTotal` at the stale incremental count from bridge start events. Now reconciles `TasksTotal` and clears `ActiveTasks` on team completion. - **Ultraplan h/l Navigation Reversed** - Fixed `h` and `l` keybindings navigating in the opposite visual direction in ultraplan mode with groups. Navigation used plan-execution order (`getNavigableInstances`) while the sidebar rendered in group-structure order (`FlattenGroupsForDisplay`), causing the two orderings to diverge. Navigation now follows the visual display order filtered to navigable instances. diff --git a/internal/orchestrator/coordinator.go b/internal/orchestrator/coordinator.go index ac5de7e..ddecfd6 100644 --- a/internal/orchestrator/coordinator.go +++ b/internal/orchestrator/coordinator.go @@ -1045,6 +1045,27 @@ func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) { if !addInstanceToSubgroup(ultraGroup, session, instanceID) { ultraGroup.AddInstance(instanceID) } + + // Advance CurrentGroup if this task belongs to a later execution group. + // In the pipeline path, dependencies are resolved at the task level, so + // tasks from multiple groups can execute concurrently. The TUI uses + // CurrentGroup to auto-expand the active group in the sidebar and to + // determine which groups are navigable via h/l keys. Without this, + // CurrentGroup stays at 0 forever in the pipeline path because + // ExecutionOrchestrator.checkAndAdvanceGroup() is never called. + groupIdx := getTaskGroupIndex(session, instanceID) + if groupIdx < 0 { + c.logger.Debug("task not found in any execution group, CurrentGroup unchanged", + "task_id", taskID, "instance_id", instanceID) + } else if groupIdx > session.CurrentGroup { + c.logger.Info("advancing CurrentGroup for pipeline execution", + "task_id", taskID, + "instance_id", instanceID, + "from_group", session.CurrentGroup, + "to_group", groupIdx, + ) + session.CurrentGroup = groupIdx + } } // GetRunningTasks returns the currently running tasks and their instance IDs diff --git a/internal/orchestrator/coordinator_test.go b/internal/orchestrator/coordinator_test.go index bbd835d..5f05448 100644 --- a/internal/orchestrator/coordinator_test.go +++ b/internal/orchestrator/coordinator_test.go @@ -2690,6 +2690,123 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) { assertGroupContains(t, multiGroup, "inst-1") }) + t.Run("advances CurrentGroup when task belongs to later group", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, // Group 1 (index 0) + {"task-2"}, // Group 2 (index 1) + {"task-3"}, // Group 3 (index 2) + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Assign a Group 1 task — CurrentGroup should stay at 0 + coord.AssignTaskInstance("task-1", "inst-1") + if ultraSession.CurrentGroup != 0 { + t.Errorf("CurrentGroup = %d after Group 1 task, want 0", ultraSession.CurrentGroup) + } + + // Assign a Group 2 task — CurrentGroup should advance to 1 + coord.AssignTaskInstance("task-2", "inst-2") + if ultraSession.CurrentGroup != 1 { + t.Errorf("CurrentGroup = %d after Group 2 task, want 1", ultraSession.CurrentGroup) + } + + // Assign a Group 3 task — CurrentGroup should advance to 2 + coord.AssignTaskInstance("task-3", "inst-3") + if ultraSession.CurrentGroup != 2 { + t.Errorf("CurrentGroup = %d after Group 3 task, want 2", ultraSession.CurrentGroup) + } + }) + + t.Run("does not regress CurrentGroup when earlier group task starts", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, // Group 1 (index 0) + {"task-2"}, // Group 2 (index 1) + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Advance to Group 2 first + coord.AssignTaskInstance("task-2", "inst-2") + if ultraSession.CurrentGroup != 1 { + t.Fatalf("CurrentGroup = %d, want 1", ultraSession.CurrentGroup) + } + + // Assigning a Group 1 task should NOT regress CurrentGroup + coord.AssignTaskInstance("task-1", "inst-1") + if ultraSession.CurrentGroup != 1 { + t.Errorf("CurrentGroup = %d after earlier group task, want 1 (should not regress)", ultraSession.CurrentGroup) + } + }) + + t.Run("leaves CurrentGroup unchanged when task not in any group", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + // Plan has no execution order — getTaskGroupIndex returns -1 + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{{ID: "task-1", Title: "T1"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // CurrentGroup should remain 0 since getTaskGroupIndex returns -1 + if ultraSession.CurrentGroup != 0 { + t.Errorf("CurrentGroup = %d after unresolvable task, want 0", ultraSession.CurrentGroup) + } + }) + t.Run("concurrent calls do not race", func(t *testing.T) { cfg := DefaultUltraPlanConfig() ultraSession := NewUltraPlanSession("Test", cfg) @@ -2739,6 +2856,54 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) { assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i)) } }) + + t.Run("concurrent calls advance CurrentGroup correctly under race", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + {ID: "task-4", Title: "T4"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, + {"task-2"}, + {"task-3"}, + {"task-4"}, + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Assign all 4 tasks concurrently from 4 different groups + var wg sync.WaitGroup + for i := 1; i <= 4; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + coord.AssignTaskInstance(fmt.Sprintf("task-%d", n), fmt.Sprintf("inst-%d", n)) + }(i) + } + wg.Wait() + + // CurrentGroup should be at the highest group index (3) + if ultraSession.CurrentGroup != 3 { + t.Errorf("CurrentGroup = %d after concurrent assigns, want 3", ultraSession.CurrentGroup) + } + }) } // assertGroupContains is a test helper that asserts an instance ID exists diff --git a/internal/orchestrator/ultraplan.go b/internal/orchestrator/ultraplan.go index 5eae32b..194b8f7 100644 --- a/internal/orchestrator/ultraplan.go +++ b/internal/orchestrator/ultraplan.go @@ -394,11 +394,16 @@ type UltraPlanSession struct { TaskToInstance map[string]string `json:"task_to_instance"` // PlannedTask.ID -> Instance.ID CompletedTasks []string `json:"completed_tasks"` FailedTasks []string `json:"failed_tasks"` - CurrentGroup int `json:"current_group"` // Index into ExecutionOrder - Created time.Time `json:"created"` - StartedAt *time.Time `json:"started_at,omitempty"` - CompletedAt *time.Time `json:"completed_at,omitempty"` - Error string `json:"error,omitempty"` // Error message if failed + // CurrentGroup is the highest execution group index with running tasks. + // Monotonic high-water-mark: never decreases. The legacy path increments + // sequentially via AdvanceGroupIfComplete; the pipeline path may jump + // forward when tasks from later groups start (see AssignTaskInstance). + // The TUI uses this to auto-expand the active sidebar group. + CurrentGroup int `json:"current_group"` + Created time.Time `json:"created"` + StartedAt *time.Time `json:"started_at,omitempty"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Error string `json:"error,omitempty"` // Error message if failed // Revision state (persisted for recovery and display) Revision *RevisionState `json:"revision,omitempty"`