From f602ba17a49da238b99a34ab56c3beeac5d647cc Mon Sep 17 00:00:00 2001 From: Hesham Salman Date: Mon, 23 Mar 2026 10:34:46 -0400 Subject: [PATCH] fix: advance CurrentGroup in pipeline path for sidebar navigation The pipeline/bridge execution path never updated session.CurrentGroup, leaving it at 0 permanently. This caused IsGroupCollapsed to return true for all groups except Group 1, hiding later groups from h/l navigation in the ultraplan sidebar. AssignTaskInstance now advances CurrentGroup forward (monotonic high-water-mark) when a task from a later execution group starts. This differs from the legacy path's sequential advancement via checkAndAdvanceGroup because the pipeline resolves dependencies at the task level, allowing tasks from multiple groups to run concurrently. --- AGENTS.md | 2 +- CHANGELOG.md | 1 + internal/orchestrator/coordinator.go | 21 +++ internal/orchestrator/coordinator_test.go | 165 ++++++++++++++++++++++ internal/orchestrator/ultraplan.go | 15 +- 5 files changed, 198 insertions(+), 6 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 85fec08..8a5c39b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -346,7 +346,7 @@ These are real issues agents have encountered in this codebase. Package-specific - **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write. - **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync. - **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`. -- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. +- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping, `CurrentGroup` advancement) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. `CurrentGroup` advancement is handled in `AssignTaskInstance` (monotonic high-water-mark), not via `checkAndAdvanceGroup()` which is only called in the legacy path. --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 238c02f..5fd137b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Subprocess Mode** - Removed the experimental subprocess execution mode (`experimental.subprocess_mode`), including the `internal/streamjson/` package, `subprocessFactory`, and all related config/TUI/wiring plumbing. Pipeline instances now always use the tmux-based execution backend. ### Fixed +- **Pipeline Group Navigation** - Fixed h/l navigation not reaching instances in later execution groups during pipeline execution. `CurrentGroup` was never advanced in the pipeline/bridge path, causing all groups except Group 1 to remain collapsed and non-navigable - **Pipeline Deadlock on Cross-Team Dependencies** - Fixed `pipeline.Decompose` only grouping tasks by shared files, ignoring `DependsOn` edges. When dependent tasks had disjoint files, they landed in separate teams whose `TaskQueue.isClaimable()` could never resolve the cross-queue dependency, permanently blocking the pipeline. The decomposer now unions tasks along dependency edges in addition to file edges, ensuring all task-level dependencies are resolvable within a single team. - **Pipeline Execution Count Exceeds Total** - Fixed `exec 3/2` display bug where the task done count exceeded the total count. `UpdateTeamCompleted` overwrote `TasksDone`/`TasksFailed` with backend-authoritative values but left `TasksTotal` at the stale incremental count from bridge start events. Now reconciles `TasksTotal` and clears `ActiveTasks` on team completion. - **Ultraplan h/l Navigation Reversed** - Fixed `h` and `l` keybindings navigating in the opposite visual direction in ultraplan mode with groups. Navigation used plan-execution order (`getNavigableInstances`) while the sidebar rendered in group-structure order (`FlattenGroupsForDisplay`), causing the two orderings to diverge. Navigation now follows the visual display order filtered to navigable instances. diff --git a/internal/orchestrator/coordinator.go b/internal/orchestrator/coordinator.go index ac5de7e..ddecfd6 100644 --- a/internal/orchestrator/coordinator.go +++ b/internal/orchestrator/coordinator.go @@ -1045,6 +1045,27 @@ func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) { if !addInstanceToSubgroup(ultraGroup, session, instanceID) { ultraGroup.AddInstance(instanceID) } + + // Advance CurrentGroup if this task belongs to a later execution group. + // In the pipeline path, dependencies are resolved at the task level, so + // tasks from multiple groups can execute concurrently. The TUI uses + // CurrentGroup to auto-expand the active group in the sidebar and to + // determine which groups are navigable via h/l keys. Without this, + // CurrentGroup stays at 0 forever in the pipeline path because + // ExecutionOrchestrator.checkAndAdvanceGroup() is never called. + groupIdx := getTaskGroupIndex(session, instanceID) + if groupIdx < 0 { + c.logger.Debug("task not found in any execution group, CurrentGroup unchanged", + "task_id", taskID, "instance_id", instanceID) + } else if groupIdx > session.CurrentGroup { + c.logger.Info("advancing CurrentGroup for pipeline execution", + "task_id", taskID, + "instance_id", instanceID, + "from_group", session.CurrentGroup, + "to_group", groupIdx, + ) + session.CurrentGroup = groupIdx + } } // GetRunningTasks returns the currently running tasks and their instance IDs diff --git a/internal/orchestrator/coordinator_test.go b/internal/orchestrator/coordinator_test.go index bbd835d..5f05448 100644 --- a/internal/orchestrator/coordinator_test.go +++ b/internal/orchestrator/coordinator_test.go @@ -2690,6 +2690,123 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) { assertGroupContains(t, multiGroup, "inst-1") }) + t.Run("advances CurrentGroup when task belongs to later group", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, // Group 1 (index 0) + {"task-2"}, // Group 2 (index 1) + {"task-3"}, // Group 3 (index 2) + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Assign a Group 1 task — CurrentGroup should stay at 0 + coord.AssignTaskInstance("task-1", "inst-1") + if ultraSession.CurrentGroup != 0 { + t.Errorf("CurrentGroup = %d after Group 1 task, want 0", ultraSession.CurrentGroup) + } + + // Assign a Group 2 task — CurrentGroup should advance to 1 + coord.AssignTaskInstance("task-2", "inst-2") + if ultraSession.CurrentGroup != 1 { + t.Errorf("CurrentGroup = %d after Group 2 task, want 1", ultraSession.CurrentGroup) + } + + // Assign a Group 3 task — CurrentGroup should advance to 2 + coord.AssignTaskInstance("task-3", "inst-3") + if ultraSession.CurrentGroup != 2 { + t.Errorf("CurrentGroup = %d after Group 3 task, want 2", ultraSession.CurrentGroup) + } + }) + + t.Run("does not regress CurrentGroup when earlier group task starts", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, // Group 1 (index 0) + {"task-2"}, // Group 2 (index 1) + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Advance to Group 2 first + coord.AssignTaskInstance("task-2", "inst-2") + if ultraSession.CurrentGroup != 1 { + t.Fatalf("CurrentGroup = %d, want 1", ultraSession.CurrentGroup) + } + + // Assigning a Group 1 task should NOT regress CurrentGroup + coord.AssignTaskInstance("task-1", "inst-1") + if ultraSession.CurrentGroup != 1 { + t.Errorf("CurrentGroup = %d after earlier group task, want 1 (should not regress)", ultraSession.CurrentGroup) + } + }) + + t.Run("leaves CurrentGroup unchanged when task not in any group", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + // Plan has no execution order — getTaskGroupIndex returns -1 + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{{ID: "task-1", Title: "T1"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // CurrentGroup should remain 0 since getTaskGroupIndex returns -1 + if ultraSession.CurrentGroup != 0 { + t.Errorf("CurrentGroup = %d after unresolvable task, want 0", ultraSession.CurrentGroup) + } + }) + t.Run("concurrent calls do not race", func(t *testing.T) { cfg := DefaultUltraPlanConfig() ultraSession := NewUltraPlanSession("Test", cfg) @@ -2739,6 +2856,54 @@ func TestCoordinator_AssignTaskInstance(t *testing.T) { assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i)) } }) + + t.Run("concurrent calls advance CurrentGroup correctly under race", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + {ID: "task-4", Title: "T4"}, + }, + ExecutionOrder: [][]string{ + {"task-1"}, + {"task-2"}, + {"task-3"}, + {"task-4"}, + }, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Assign all 4 tasks concurrently from 4 different groups + var wg sync.WaitGroup + for i := 1; i <= 4; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + coord.AssignTaskInstance(fmt.Sprintf("task-%d", n), fmt.Sprintf("inst-%d", n)) + }(i) + } + wg.Wait() + + // CurrentGroup should be at the highest group index (3) + if ultraSession.CurrentGroup != 3 { + t.Errorf("CurrentGroup = %d after concurrent assigns, want 3", ultraSession.CurrentGroup) + } + }) } // assertGroupContains is a test helper that asserts an instance ID exists diff --git a/internal/orchestrator/ultraplan.go b/internal/orchestrator/ultraplan.go index 5eae32b..194b8f7 100644 --- a/internal/orchestrator/ultraplan.go +++ b/internal/orchestrator/ultraplan.go @@ -394,11 +394,16 @@ type UltraPlanSession struct { TaskToInstance map[string]string `json:"task_to_instance"` // PlannedTask.ID -> Instance.ID CompletedTasks []string `json:"completed_tasks"` FailedTasks []string `json:"failed_tasks"` - CurrentGroup int `json:"current_group"` // Index into ExecutionOrder - Created time.Time `json:"created"` - StartedAt *time.Time `json:"started_at,omitempty"` - CompletedAt *time.Time `json:"completed_at,omitempty"` - Error string `json:"error,omitempty"` // Error message if failed + // CurrentGroup is the highest execution group index with running tasks. + // Monotonic high-water-mark: never decreases. The legacy path increments + // sequentially via AdvanceGroupIfComplete; the pipeline path may jump + // forward when tasks from later groups start (see AssignTaskInstance). + // The TUI uses this to auto-expand the active sidebar group. + CurrentGroup int `json:"current_group"` + Created time.Time `json:"created"` + StartedAt *time.Time `json:"started_at,omitempty"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Error string `json:"error,omitempty"` // Error message if failed // Revision state (persisted for recovery and display) Revision *RevisionState `json:"revision,omitempty"`