From 7231de72c1f3f21004d25bf3d580fce1345ad056 Mon Sep 17 00:00:00 2001 From: Hesham Salman Date: Fri, 20 Mar 2026 18:15:55 -0400 Subject: [PATCH] fix: restore task grouping in sidebar for pipeline execution path The pipeline/bridge execution path (default since #659) created instances via InstanceFactory.CreateInstance() but never added them to the ultraplan's InstanceGroup, causing execution tasks to appear ungrouped in the sidebar. Wire SessionRecorderDeps.OnAssign to Coordinator.AssignTaskInstance, which populates TaskToInstance (for correct subgroup routing) then adds the instance to the correct "Group N" subgroup. The method serializes under c.mu to prevent concurrent map read/write between parallel bridge goroutines. Also fix ordering in the legacy ExecutionOrchestrator path where AddInstanceToGroup was called before AssignTaskToInstance, causing determineSubgroupType to return Unknown instead of Execution. --- AGENTS.md | 2 + CHANGELOG.md | 1 + internal/cmd/session/pipeline_wire.go | 4 +- internal/orchestrator/coordinator.go | 46 +++++ internal/orchestrator/coordinator_test.go | 205 ++++++++++++++++++++++ internal/orchestrator/phase/execution.go | 9 +- internal/tui/pipeline_wire.go | 4 +- 7 files changed, 267 insertions(+), 4 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index b2230715..c01b2a5a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -345,6 +345,8 @@ These are real issues agents have encountered in this codebase. Package-specific - **Pause/resume symmetry in TUI update handlers** — When `HandleInstanceStubCreated` pauses the old active instance and switches to a new stub, all subsequent error paths (`HandleInstanceSetupComplete` setup failure, `StartInstance` failure) must call `ctx.ResumeActiveInstance()` to avoid leaving the previously-active instance permanently paused with a frozen display. - **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write. - **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync. +- **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`. +- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e0e80ed..30ff840b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - **Missing Sentinel File in Pipeline Execution** - Fixed task instances not writing `.claudio-task-complete.json` in the Orchestration 2.0 pipeline path. The bridge's `BuildTaskPrompt` relied solely on `--append-system-prompt-file` to inject the completion protocol, which left instances unaware of the sentinel file convention. The completion protocol is now embedded directly in the task prompt as defense-in-depth. +- **Pipeline Task Grouping in Sidebar** - Fixed execution task instances appearing as ungrouped in the sidebar when using the pipeline execution path (Orchestration 2.0, the default since #659). The bridge's `SessionRecorder` was created with nil callbacks, so newly created instances were never added to the ultraplan's `InstanceGroup`. Wired the `OnAssign` callback to `Coordinator.AssignTaskInstance`, which populates `TaskToInstance` before routing the instance to the correct "Group N" subgroup. Also fixed a latent ordering bug in the legacy `ExecutionOrchestrator` path where `AddInstanceToGroup` was called before `AssignTaskToInstance`, causing instances to fall through to `SubgroupTypeUnknown`. - **Stale Display Early in Session** - Fixed capture loop never populating the output buffer when there's no scrollback (screen not yet full). A single `lastOutput` tracking variable was shared between visible-only and full captures; when a visible capture detected new content and set `lastOutput`, the subsequent forced full capture (which returns identical bytes when there's no scrollback) found no change and skipped the buffer write. Split into independent `lastVisibleOutput` and `lastFullOutput` so each capture type compares against its own history. - **Flaky `TestPipelineExecutor_E2E_AllPhases`** - Fixed race condition in `completeAllTeamTasks` test helper where `m.AllStatuses()` returned empty (teams not yet added) causing vacuous `allDone = true` and premature return without completing any tasks. The pipeline publishes `phase_changed` before `AddTeam`, so the test goroutine could race ahead of team registration (#685) - **Terminal Pane Alt+Backspace and Alt+Arrow Keys** - Fixed the terminal pane's key handler dropping the Alt modifier on Backspace and arrow keys, silently sending plain keystrokes instead of Alt-modified ones. The instance path already handled these correctly via `M-` prefix; the terminal path now sends `Escape` + base key to match its existing alt key pattern. diff --git a/internal/cmd/session/pipeline_wire.go b/internal/cmd/session/pipeline_wire.go index b775375d..534acd5b 100644 --- a/internal/cmd/session/pipeline_wire.go +++ b/internal/cmd/session/pipeline_wire.go @@ -12,7 +12,9 @@ import ( // equivalent TUI-side registration. func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) { coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) { - recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{}) + recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{ + OnAssign: coordinator.AssignTaskInstance, + }) return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{ Orch: deps.Orch, Session: deps.Session, diff --git a/internal/orchestrator/coordinator.go b/internal/orchestrator/coordinator.go index e545a528..ac5de7ef 100644 --- a/internal/orchestrator/coordinator.go +++ b/internal/orchestrator/coordinator.go @@ -1001,6 +1001,52 @@ func (c *Coordinator) GetProgress() (completed, total int, phase UltraPlanPhase) return len(session.CompletedTasks), len(session.Plan.Tasks), session.Phase } +// AssignTaskInstance records the task-to-instance mapping and adds the instance +// to the appropriate ultraplan sidebar group. This is used by the pipeline +// execution path where the bridge creates instances outside the coordinator's +// ExecutionOrchestrator. The method first populates TaskToInstance so that +// subgroup routing (determineSubgroupType) can resolve the correct "Group N" +// subgroup, then adds the instance to that subgroup. +func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) { + if c == nil || c.baseSession == nil || c.manager == nil { + return + } + + // Serialize the entire assign-then-route sequence under c.mu. + // addInstanceToSubgroup reads session.TaskToInstance (via determineSubgroupType) + // which is written by AssignTaskToInstance. Without this lock, two concurrent + // bridge goroutines can race: one writing the map while the other iterates it + // — a fatal runtime error in Go. + c.mu.Lock() + defer c.mu.Unlock() + + // Record the mapping so subgroup routing can determine the correct group + c.manager.AssignTaskToInstance(taskID, instanceID) + + // Add instance to the ultraplan group for sidebar display + session := c.manager.Session() + if session == nil { + c.logger.Debug("AssignTaskInstance: nil session, instance added to TaskToInstance but not grouped", + "task_id", taskID, "instance_id", instanceID) + return + } + sessionType := SessionTypeUltraPlan + if session.Config.MultiPass { + sessionType = SessionTypePlanMulti + } + ultraGroup := c.baseSession.GetGroupBySessionType(sessionType) + if ultraGroup == nil { + c.logger.Debug("AssignTaskInstance: no group found for session type, instance not grouped", + "task_id", taskID, "instance_id", instanceID, "session_type", string(sessionType)) + return + } + + // Route to the correct subgroup (e.g., "Group 1", "Group 2") + if !addInstanceToSubgroup(ultraGroup, session, instanceID) { + ultraGroup.AddInstance(instanceID) + } +} + // GetRunningTasks returns the currently running tasks and their instance IDs func (c *Coordinator) GetRunningTasks() map[string]string { c.mu.RLock() diff --git a/internal/orchestrator/coordinator_test.go b/internal/orchestrator/coordinator_test.go index b51e2543..bbd835de 100644 --- a/internal/orchestrator/coordinator_test.go +++ b/internal/orchestrator/coordinator_test.go @@ -3,7 +3,9 @@ package orchestrator import ( "context" "encoding/json" + "fmt" "strings" + "sync" "testing" "github.com/Iron-Ham/claudio/internal/event" @@ -2547,3 +2549,206 @@ func TestCoordinator_UsePipelineConfig(t *testing.T) { t.Error("UsePipeline should be true in config") } } + +func TestCoordinator_AssignTaskInstance(t *testing.T) { + t.Run("assigns task and adds instance to correct subgroup", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test objective", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test objective", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "First task"}, + {ID: "task-2", Title: "Second task"}, + }, + ExecutionOrder: [][]string{{"task-1", "task-2"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test objective") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // Verify TaskToInstance was populated + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + + // Verify instance was added to the group (in a "Group 1" subgroup) + assertGroupContains(t, ultraGroup, "inst-1") + + // Verify it was routed to the correct subgroup + if len(ultraGroup.SubGroups) == 0 { + t.Fatal("expected subgroups to be created") + } + subgroup := ultraGroup.SubGroups[0] + if subgroup.Name != "Group 1" { + t.Errorf("subgroup.Name = %q, want %q", subgroup.Name, "Group 1") + } + if !subgroup.HasInstance("inst-1") { + t.Error("inst-1 not found in Group 1 subgroup") + } + }) + + t.Run("handles nil coordinator gracefully", func(t *testing.T) { + var coord *Coordinator + // Should not panic + coord.AssignTaskInstance("task-1", "inst-1") + }) + + t.Run("handles nil baseSession gracefully", func(t *testing.T) { + coord := &Coordinator{} + // Should not panic — nil baseSession guard fires before any work + coord.AssignTaskInstance("task-1", "inst-1") + }) + + t.Run("handles no matching group gracefully", func(t *testing.T) { + ultraSession := NewUltraPlanSession("Test", DefaultUltraPlanConfig()) + baseSession := &Session{Instances: make([]*Instance, 0)} + // No group added → GetGroupBySessionType returns nil + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + coord.AssignTaskInstance("task-1", "inst-1") + + // TaskToInstance should be populated even though group routing failed + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + }) + + t.Run("falls back to parent group when subgroup routing fails", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + // No plan = no execution order, so subgroup routing will fail for execution type + // But TaskToInstance IS set, so determineSubgroupType returns SubgroupTypeExecution + // and getTaskGroupIndex returns -1, causing addInstanceToSubgroup to return false + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // TaskToInstance should still be populated + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + + // Instance should be in the group (at the parent level since routing fails) + assertGroupContains(t, ultraGroup, "inst-1") + }) + + t.Run("handles multi-pass session type", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + cfg.MultiPass = true + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + ExecutionOrder: [][]string{{"task-1"}}, + Tasks: []PlannedTask{{ID: "task-1", Title: "First"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + multiGroup := NewInstanceGroupWithType("Test", SessionTypePlanMulti, "Test") + baseSession.AddGroup(multiGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + assertGroupContains(t, multiGroup, "inst-1") + }) + + t.Run("concurrent calls do not race", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + {ID: "task-4", Title: "T4"}, + }, + ExecutionOrder: [][]string{{"task-1", "task-2", "task-3", "task-4"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Simulate concurrent bridge goroutines assigning tasks. + // Under -race this will detect any map read/write races. + var wg sync.WaitGroup + for i := 1; i <= 4; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + taskID := fmt.Sprintf("task-%d", n) + instID := fmt.Sprintf("inst-%d", n) + coord.AssignTaskInstance(taskID, instID) + }(i) + } + wg.Wait() + + // All 4 instances should be in the group + allIDs := ultraGroup.AllInstanceIDs() + if len(allIDs) != 4 { + t.Errorf("AllInstanceIDs() has %d entries, want 4", len(allIDs)) + } + for i := 1; i <= 4; i++ { + assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i)) + } + }) +} + +// assertGroupContains is a test helper that asserts an instance ID exists +// in the group's AllInstanceIDs (including subgroups). +func assertGroupContains(t *testing.T, group *InstanceGroup, instanceID string) { + t.Helper() + for _, id := range group.AllInstanceIDs() { + if id == instanceID { + return + } + } + t.Errorf("%s not found in group %q AllInstanceIDs()", instanceID, group.Name) +} diff --git a/internal/orchestrator/phase/execution.go b/internal/orchestrator/phase/execution.go index 0d9dc0a7..ee333b2a 100644 --- a/internal/orchestrator/phase/execution.go +++ b/internal/orchestrator/phase/execution.go @@ -714,6 +714,13 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error { // Get instance ID instanceID := e.getInstanceID(inst) + // Record the task-to-instance mapping BEFORE adding to the group. + // addInstanceToSubgroup calls determineSubgroupType which checks + // TaskToInstance to route execution instances to the correct "Group N" + // subgroup. Without this ordering, the instance falls through to + // SubgroupTypeUnknown and lands at the parent group root. + e.notifyTaskStart(taskID, instanceID) + // Add instance to the ultraplan group for sidebar display if e.execCtx != nil && e.execCtx.Coordinator != nil { isMultiPass := false @@ -734,8 +741,6 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error { e.execCtx.Coordinator.AddRunningTask(taskID, instanceID) } - e.notifyTaskStart(taskID, instanceID) - // Start the instance if err := e.phaseCtx.Orchestrator.StartInstance(inst); err != nil { e.mu.Lock() diff --git a/internal/tui/pipeline_wire.go b/internal/tui/pipeline_wire.go index 29eaaeef..dc3974e2 100644 --- a/internal/tui/pipeline_wire.go +++ b/internal/tui/pipeline_wire.go @@ -13,7 +13,9 @@ import ( // orchestrator and bridgewire without creating an import cycle. func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) { coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) { - recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{}) + recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{ + OnAssign: coordinator.AssignTaskInstance, + }) return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{ Orch: deps.Orch, Session: deps.Session,