diff --git a/AGENTS.md b/AGENTS.md index b2230715..c01b2a5a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -345,6 +345,8 @@ These are real issues agents have encountered in this codebase. Package-specific - **Pause/resume symmetry in TUI update handlers** — When `HandleInstanceStubCreated` pauses the old active instance and switches to a new stub, all subsequent error paths (`HandleInstanceSetupComplete` setup failure, `StartInstance` failure) must call `ctx.ResumeActiveInstance()` to avoid leaving the previously-active instance permanently paused with a frozen display. - **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write. - **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync. +- **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`. +- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them. --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e0e80ed..30ff840b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - **Missing Sentinel File in Pipeline Execution** - Fixed task instances not writing `.claudio-task-complete.json` in the Orchestration 2.0 pipeline path. The bridge's `BuildTaskPrompt` relied solely on `--append-system-prompt-file` to inject the completion protocol, which left instances unaware of the sentinel file convention. The completion protocol is now embedded directly in the task prompt as defense-in-depth. +- **Pipeline Task Grouping in Sidebar** - Fixed execution task instances appearing as ungrouped in the sidebar when using the pipeline execution path (Orchestration 2.0, the default since #659). The bridge's `SessionRecorder` was created with nil callbacks, so newly created instances were never added to the ultraplan's `InstanceGroup`. Wired the `OnAssign` callback to `Coordinator.AssignTaskInstance`, which populates `TaskToInstance` before routing the instance to the correct "Group N" subgroup. Also fixed a latent ordering bug in the legacy `ExecutionOrchestrator` path where `AddInstanceToGroup` was called before `AssignTaskToInstance`, causing instances to fall through to `SubgroupTypeUnknown`. - **Stale Display Early in Session** - Fixed capture loop never populating the output buffer when there's no scrollback (screen not yet full). A single `lastOutput` tracking variable was shared between visible-only and full captures; when a visible capture detected new content and set `lastOutput`, the subsequent forced full capture (which returns identical bytes when there's no scrollback) found no change and skipped the buffer write. Split into independent `lastVisibleOutput` and `lastFullOutput` so each capture type compares against its own history. - **Flaky `TestPipelineExecutor_E2E_AllPhases`** - Fixed race condition in `completeAllTeamTasks` test helper where `m.AllStatuses()` returned empty (teams not yet added) causing vacuous `allDone = true` and premature return without completing any tasks. The pipeline publishes `phase_changed` before `AddTeam`, so the test goroutine could race ahead of team registration (#685) - **Terminal Pane Alt+Backspace and Alt+Arrow Keys** - Fixed the terminal pane's key handler dropping the Alt modifier on Backspace and arrow keys, silently sending plain keystrokes instead of Alt-modified ones. The instance path already handled these correctly via `M-` prefix; the terminal path now sends `Escape` + base key to match its existing alt key pattern. diff --git a/internal/cmd/session/pipeline_wire.go b/internal/cmd/session/pipeline_wire.go index b775375d..534acd5b 100644 --- a/internal/cmd/session/pipeline_wire.go +++ b/internal/cmd/session/pipeline_wire.go @@ -12,7 +12,9 @@ import ( // equivalent TUI-side registration. func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) { coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) { - recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{}) + recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{ + OnAssign: coordinator.AssignTaskInstance, + }) return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{ Orch: deps.Orch, Session: deps.Session, diff --git a/internal/orchestrator/coordinator.go b/internal/orchestrator/coordinator.go index e545a528..ac5de7ef 100644 --- a/internal/orchestrator/coordinator.go +++ b/internal/orchestrator/coordinator.go @@ -1001,6 +1001,52 @@ func (c *Coordinator) GetProgress() (completed, total int, phase UltraPlanPhase) return len(session.CompletedTasks), len(session.Plan.Tasks), session.Phase } +// AssignTaskInstance records the task-to-instance mapping and adds the instance +// to the appropriate ultraplan sidebar group. This is used by the pipeline +// execution path where the bridge creates instances outside the coordinator's +// ExecutionOrchestrator. The method first populates TaskToInstance so that +// subgroup routing (determineSubgroupType) can resolve the correct "Group N" +// subgroup, then adds the instance to that subgroup. +func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) { + if c == nil || c.baseSession == nil || c.manager == nil { + return + } + + // Serialize the entire assign-then-route sequence under c.mu. + // addInstanceToSubgroup reads session.TaskToInstance (via determineSubgroupType) + // which is written by AssignTaskToInstance. Without this lock, two concurrent + // bridge goroutines can race: one writing the map while the other iterates it + // — a fatal runtime error in Go. + c.mu.Lock() + defer c.mu.Unlock() + + // Record the mapping so subgroup routing can determine the correct group + c.manager.AssignTaskToInstance(taskID, instanceID) + + // Add instance to the ultraplan group for sidebar display + session := c.manager.Session() + if session == nil { + c.logger.Debug("AssignTaskInstance: nil session, instance added to TaskToInstance but not grouped", + "task_id", taskID, "instance_id", instanceID) + return + } + sessionType := SessionTypeUltraPlan + if session.Config.MultiPass { + sessionType = SessionTypePlanMulti + } + ultraGroup := c.baseSession.GetGroupBySessionType(sessionType) + if ultraGroup == nil { + c.logger.Debug("AssignTaskInstance: no group found for session type, instance not grouped", + "task_id", taskID, "instance_id", instanceID, "session_type", string(sessionType)) + return + } + + // Route to the correct subgroup (e.g., "Group 1", "Group 2") + if !addInstanceToSubgroup(ultraGroup, session, instanceID) { + ultraGroup.AddInstance(instanceID) + } +} + // GetRunningTasks returns the currently running tasks and their instance IDs func (c *Coordinator) GetRunningTasks() map[string]string { c.mu.RLock() diff --git a/internal/orchestrator/coordinator_test.go b/internal/orchestrator/coordinator_test.go index b51e2543..bbd835de 100644 --- a/internal/orchestrator/coordinator_test.go +++ b/internal/orchestrator/coordinator_test.go @@ -3,7 +3,9 @@ package orchestrator import ( "context" "encoding/json" + "fmt" "strings" + "sync" "testing" "github.com/Iron-Ham/claudio/internal/event" @@ -2547,3 +2549,206 @@ func TestCoordinator_UsePipelineConfig(t *testing.T) { t.Error("UsePipeline should be true in config") } } + +func TestCoordinator_AssignTaskInstance(t *testing.T) { + t.Run("assigns task and adds instance to correct subgroup", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test objective", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test objective", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "First task"}, + {ID: "task-2", Title: "Second task"}, + }, + ExecutionOrder: [][]string{{"task-1", "task-2"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test objective") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // Verify TaskToInstance was populated + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + + // Verify instance was added to the group (in a "Group 1" subgroup) + assertGroupContains(t, ultraGroup, "inst-1") + + // Verify it was routed to the correct subgroup + if len(ultraGroup.SubGroups) == 0 { + t.Fatal("expected subgroups to be created") + } + subgroup := ultraGroup.SubGroups[0] + if subgroup.Name != "Group 1" { + t.Errorf("subgroup.Name = %q, want %q", subgroup.Name, "Group 1") + } + if !subgroup.HasInstance("inst-1") { + t.Error("inst-1 not found in Group 1 subgroup") + } + }) + + t.Run("handles nil coordinator gracefully", func(t *testing.T) { + var coord *Coordinator + // Should not panic + coord.AssignTaskInstance("task-1", "inst-1") + }) + + t.Run("handles nil baseSession gracefully", func(t *testing.T) { + coord := &Coordinator{} + // Should not panic — nil baseSession guard fires before any work + coord.AssignTaskInstance("task-1", "inst-1") + }) + + t.Run("handles no matching group gracefully", func(t *testing.T) { + ultraSession := NewUltraPlanSession("Test", DefaultUltraPlanConfig()) + baseSession := &Session{Instances: make([]*Instance, 0)} + // No group added → GetGroupBySessionType returns nil + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + coord.AssignTaskInstance("task-1", "inst-1") + + // TaskToInstance should be populated even though group routing failed + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + }) + + t.Run("falls back to parent group when subgroup routing fails", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + // No plan = no execution order, so subgroup routing will fail for execution type + // But TaskToInstance IS set, so determineSubgroupType returns SubgroupTypeExecution + // and getTaskGroupIndex returns -1, causing addInstanceToSubgroup to return false + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + // TaskToInstance should still be populated + if ultraSession.TaskToInstance["task-1"] != "inst-1" { + t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1") + } + + // Instance should be in the group (at the parent level since routing fails) + assertGroupContains(t, ultraGroup, "inst-1") + }) + + t.Run("handles multi-pass session type", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + cfg.MultiPass = true + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + ExecutionOrder: [][]string{{"task-1"}}, + Tasks: []PlannedTask{{ID: "task-1", Title: "First"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{ + Instances: make([]*Instance, 0), + } + multiGroup := NewInstanceGroupWithType("Test", SessionTypePlanMulti, "Test") + baseSession.AddGroup(multiGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + coord.AssignTaskInstance("task-1", "inst-1") + + assertGroupContains(t, multiGroup, "inst-1") + }) + + t.Run("concurrent calls do not race", func(t *testing.T) { + cfg := DefaultUltraPlanConfig() + ultraSession := NewUltraPlanSession("Test", cfg) + ultraSession.Plan = &PlanSpec{ + Objective: "Test", + Tasks: []PlannedTask{ + {ID: "task-1", Title: "T1"}, + {ID: "task-2", Title: "T2"}, + {ID: "task-3", Title: "T3"}, + {ID: "task-4", Title: "T4"}, + }, + ExecutionOrder: [][]string{{"task-1", "task-2", "task-3", "task-4"}}, + } + ultraSession.Phase = PhaseExecuting + + baseSession := &Session{Instances: make([]*Instance, 0)} + ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test") + baseSession.AddGroup(ultraGroup) + + manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil) + coord := &Coordinator{ + manager: manager, + baseSession: baseSession, + logger: logging.NopLogger(), + } + + // Simulate concurrent bridge goroutines assigning tasks. + // Under -race this will detect any map read/write races. + var wg sync.WaitGroup + for i := 1; i <= 4; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + taskID := fmt.Sprintf("task-%d", n) + instID := fmt.Sprintf("inst-%d", n) + coord.AssignTaskInstance(taskID, instID) + }(i) + } + wg.Wait() + + // All 4 instances should be in the group + allIDs := ultraGroup.AllInstanceIDs() + if len(allIDs) != 4 { + t.Errorf("AllInstanceIDs() has %d entries, want 4", len(allIDs)) + } + for i := 1; i <= 4; i++ { + assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i)) + } + }) +} + +// assertGroupContains is a test helper that asserts an instance ID exists +// in the group's AllInstanceIDs (including subgroups). +func assertGroupContains(t *testing.T, group *InstanceGroup, instanceID string) { + t.Helper() + for _, id := range group.AllInstanceIDs() { + if id == instanceID { + return + } + } + t.Errorf("%s not found in group %q AllInstanceIDs()", instanceID, group.Name) +} diff --git a/internal/orchestrator/phase/execution.go b/internal/orchestrator/phase/execution.go index 0d9dc0a7..ee333b2a 100644 --- a/internal/orchestrator/phase/execution.go +++ b/internal/orchestrator/phase/execution.go @@ -714,6 +714,13 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error { // Get instance ID instanceID := e.getInstanceID(inst) + // Record the task-to-instance mapping BEFORE adding to the group. + // addInstanceToSubgroup calls determineSubgroupType which checks + // TaskToInstance to route execution instances to the correct "Group N" + // subgroup. Without this ordering, the instance falls through to + // SubgroupTypeUnknown and lands at the parent group root. + e.notifyTaskStart(taskID, instanceID) + // Add instance to the ultraplan group for sidebar display if e.execCtx != nil && e.execCtx.Coordinator != nil { isMultiPass := false @@ -734,8 +741,6 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error { e.execCtx.Coordinator.AddRunningTask(taskID, instanceID) } - e.notifyTaskStart(taskID, instanceID) - // Start the instance if err := e.phaseCtx.Orchestrator.StartInstance(inst); err != nil { e.mu.Lock() diff --git a/internal/tui/pipeline_wire.go b/internal/tui/pipeline_wire.go index 29eaaeef..dc3974e2 100644 --- a/internal/tui/pipeline_wire.go +++ b/internal/tui/pipeline_wire.go @@ -13,7 +13,9 @@ import ( // orchestrator and bridgewire without creating an import cycle. func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) { coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) { - recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{}) + recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{ + OnAssign: coordinator.AssignTaskInstance, + }) return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{ Orch: deps.Orch, Session: deps.Session,