2 changes: 2 additions & 0 deletions AGENTS.md
@@ -345,6 +345,8 @@ These are real issues agents have encountered in this codebase. Package-specific
- **Pause/resume symmetry in TUI update handlers** — When `HandleInstanceStubCreated` pauses the old active instance and switches to a new stub, all subsequent error paths (`HandleInstanceSetupComplete` setup failure, `StartInstance` failure) must call `ctx.ResumeActiveInstance()` to avoid leaving the previously-active instance permanently paused with a frozen display.
- **Separate tracking for visible vs full captures** — The capture loop alternates between visible-only (cheap, no scrollback) and full (expensive, includes scrollback) tmux captures. Only full captures write to `outputBuf`. The change-detection variables must be independent (`lastVisibleOutput`, `lastFullOutput`) — a single shared variable causes cross-contamination where a visible capture sets the tracker, then the subsequent full capture (returning identical bytes when there's no scrollback) sees no change and skips the buffer write.
- **Completion protocol must be in the user prompt, not just system prompt** — The bridge's `BuildTaskPrompt` must embed the sentinel file instructions directly in the task prompt. The `--append-system-prompt-file` injection in `bridgewire` provides defense-in-depth, but if it fails silently (wrong path, unsupported flag version, etc.), instances have no knowledge of the completion convention and tasks time out. The `completionFileName` constant in the bridge package is duplicated from `orchestrator/types.TaskCompletionFileName` to avoid import cycles — keep them in sync.
- **Populate TaskToInstance before group routing** — `addInstanceToSubgroup` calls `determineSubgroupType` which checks `session.TaskToInstance` to identify execution instances. If `AssignTaskToInstance` hasn't been called yet, the instance falls through to `SubgroupTypeUnknown` and lands at the parent group root instead of the correct "Group N" subgroup. Always call `AssignTaskToInstance` before `AddInstanceToGroup` or `addInstanceToSubgroup`.
- **Pipeline path must mirror ExecutionOrchestrator callbacks** — The pipeline/bridge execution path creates instances through `InstanceFactory.CreateInstance()`, bypassing `ExecutionOrchestrator.startTask()` entirely. Any state management that `startTask()` does (group assignment, TaskToInstance mapping) must be replicated via `SessionRecorderDeps` callbacks (`OnAssign`, `OnComplete`, `OnFailure`). When adding new instance lifecycle hooks to `startTask()`, check whether the pipeline path also needs them.
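
The visible-versus-full capture note above can be illustrated with a minimal sketch. Only the two tracker names (`lastVisibleOutput`, `lastFullOutput`) and `outputBuf` come from the notes; the `captureTracker` type and its methods are hypothetical stand-ins, not the project's capture loop.

```go
package main

import "fmt"

// captureTracker keeps independent change-detection state per capture kind.
// The type and its methods are hypothetical; only the field names mirror
// the identifiers mentioned in the notes.
type captureTracker struct {
	lastVisibleOutput string
	lastFullOutput    string
	outputBuf         []string // stands in for the real output buffer
}

// onVisible records a cheap visible-only capture and reports whether the
// screen changed. It never touches lastFullOutput.
func (c *captureTracker) onVisible(out string) bool {
	if out == c.lastVisibleOutput {
		return false
	}
	c.lastVisibleOutput = out
	return true
}

// onFull records a full capture and writes to the buffer only when the
// output differs from the previous full capture.
func (c *captureTracker) onFull(out string) bool {
	if out == c.lastFullOutput {
		return false
	}
	c.lastFullOutput = out
	c.outputBuf = append(c.outputBuf, out)
	return true
}

func main() {
	c := &captureTracker{}
	// No scrollback yet: visible and full captures return identical bytes.
	changed := c.onVisible("$ make test")
	wrote := c.onFull("$ make test")
	fmt.Println(changed, wrote, len(c.outputBuf)) // true true 1
}
```

With a single shared tracker, the `onVisible` call would have stored the bytes first and the following `onFull` call would have compared equal and skipped the buffer write.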

---

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed
- **Missing Sentinel File in Pipeline Execution** - Fixed task instances not writing `.claudio-task-complete.json` in the Orchestration 2.0 pipeline path. The bridge's `BuildTaskPrompt` relied solely on `--append-system-prompt-file` to inject the completion protocol, which left instances unaware of the sentinel file convention. The completion protocol is now embedded directly in the task prompt as defense-in-depth.
- **Pipeline Task Grouping in Sidebar** - Fixed execution task instances appearing as ungrouped in the sidebar when using the pipeline execution path (Orchestration 2.0, the default since #659). The bridge's `SessionRecorder` was created with nil callbacks, so newly created instances were never added to the ultraplan's `InstanceGroup`. Wired the `OnAssign` callback to `Coordinator.AssignTaskInstance`, which populates `TaskToInstance` before routing the instance to the correct "Group N" subgroup. Also fixed a latent ordering bug in the legacy `ExecutionOrchestrator` path where `AddInstanceToGroup` was called before `AssignTaskToInstance`, causing instances to fall through to `SubgroupTypeUnknown`.
- **Stale Display Early in Session** - Fixed capture loop never populating the output buffer when there's no scrollback (screen not yet full). A single `lastOutput` tracking variable was shared between visible-only and full captures; when a visible capture detected new content and set `lastOutput`, the subsequent forced full capture (which returns identical bytes when there's no scrollback) found no change and skipped the buffer write. Split into independent `lastVisibleOutput` and `lastFullOutput` so each capture type compares against its own history.
- **Flaky `TestPipelineExecutor_E2E_AllPhases`** - Fixed a race condition in the `completeAllTeamTasks` test helper where `m.AllStatuses()` returned an empty result (teams not yet added), which made `allDone = true` vacuously and caused a premature return without completing any tasks. The pipeline publishes `phase_changed` before `AddTeam`, so the test goroutine could race ahead of team registration (#685).
- **Terminal Pane Alt+Backspace and Alt+Arrow Keys** - Fixed the terminal pane's key handler dropping the Alt modifier on Backspace and arrow keys, silently sending plain keystrokes instead of Alt-modified ones. The instance path already handled these correctly via `M-` prefix; the terminal path now sends `Escape` + base key to match its existing alt key pattern.
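
The vacuous `allDone = true` described in the flaky-test entry above is easy to reproduce in isolation. The sketch below is illustrative only: the changelog does not say how the helper was actually fixed, and the `allDone` function and its string statuses are hypothetical. It shows one possible guard, treating an empty status list as "not done".

```go
package main

import "fmt"

// allDone reports whether every status is "completed". The explicit length
// check is the point of the sketch: without it, the loop body never runs
// for an empty slice and the function returns true vacuously.
func allDone(statuses []string) bool {
	if len(statuses) == 0 {
		return false // nothing registered yet, so nothing can be done
	}
	for _, s := range statuses {
		if s != "completed" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allDone(nil))                                // false
	fmt.Println(allDone([]string{"completed", "running"}))   // false
	fmt.Println(allDone([]string{"completed", "completed"})) // true
}
```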
4 changes: 3 additions & 1 deletion internal/cmd/session/pipeline_wire.go
@@ -12,7 +12,9 @@ import (
// equivalent TUI-side registration.
func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) {
coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) {
- recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{})
+ recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{
+ OnAssign: coordinator.AssignTaskInstance,
+ })
return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{
Orch: deps.Orch,
Session: deps.Session,
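
The wiring change in this hunk passes a method value as the `OnAssign` callback. A minimal sketch of that pattern follows, assuming the recorder skips callbacks that are nil; the diff does not show how `SessionRecorder` treats nil callbacks, and every type below (`recorderDeps`, `recorder`, `coordinator`) is a hypothetical stand-in rather than the `bridgewire` API.

```go
package main

import "fmt"

// recorderDeps is a hypothetical shape for the recorder's dependencies.
type recorderDeps struct {
	OnAssign func(taskID, instanceID string)
}

type recorder struct{ deps recorderDeps }

// AssignTask forwards to OnAssign only when a callback was supplied
// (assumed behavior, not confirmed by the diff).
func (r *recorder) AssignTask(taskID, instanceID string) {
	if r.deps.OnAssign != nil {
		r.deps.OnAssign(taskID, instanceID)
	}
}

// coordinator is a stand-in that just remembers assignments.
type coordinator struct{ assigned map[string]string }

func (c *coordinator) AssignTaskInstance(taskID, instanceID string) {
	c.assigned[taskID] = instanceID
}

func main() {
	c := &coordinator{assigned: map[string]string{}}

	// Zero-value deps: the assignment is silently dropped.
	silent := &recorder{}
	silent.AssignTask("task-1", "inst-1")

	// Method value bound to c: the assignment reaches the coordinator.
	wired := &recorder{deps: recorderDeps{OnAssign: c.AssignTaskInstance}}
	wired.AssignTask("task-2", "inst-2")

	fmt.Println(len(c.assigned), c.assigned["task-2"]) // 1 inst-2
}
```

The zero-value case is the failure mode the changelog describes: nothing panics, but the coordinator never learns about the instance.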
46 changes: 46 additions & 0 deletions internal/orchestrator/coordinator.go
@@ -1001,6 +1001,52 @@ func (c *Coordinator) GetProgress() (completed, total int, phase UltraPlanPhase)
return len(session.CompletedTasks), len(session.Plan.Tasks), session.Phase
}

// AssignTaskInstance records the task-to-instance mapping and adds the instance
// to the appropriate ultraplan sidebar group. This is used by the pipeline
// execution path where the bridge creates instances outside the coordinator's
// ExecutionOrchestrator. The method first populates TaskToInstance so that
// subgroup routing (determineSubgroupType) can resolve the correct "Group N"
// subgroup, then adds the instance to that subgroup.
func (c *Coordinator) AssignTaskInstance(taskID, instanceID string) {
if c == nil || c.baseSession == nil || c.manager == nil {
return
}

// Serialize the entire assign-then-route sequence under c.mu.
// addInstanceToSubgroup reads session.TaskToInstance (via determineSubgroupType)
// which is written by AssignTaskToInstance. Without this lock, two concurrent
// bridge goroutines can race: one writing the map while the other iterates it
// — a fatal runtime error in Go.
c.mu.Lock()
defer c.mu.Unlock()

// Record the mapping so subgroup routing can determine the correct group
c.manager.AssignTaskToInstance(taskID, instanceID)

// Add instance to the ultraplan group for sidebar display
session := c.manager.Session()
if session == nil {
c.logger.Debug("AssignTaskInstance: nil session, instance added to TaskToInstance but not grouped",
"task_id", taskID, "instance_id", instanceID)
return
}
sessionType := SessionTypeUltraPlan
if session.Config.MultiPass {
sessionType = SessionTypePlanMulti
}
ultraGroup := c.baseSession.GetGroupBySessionType(sessionType)
if ultraGroup == nil {
c.logger.Debug("AssignTaskInstance: no group found for session type, instance not grouped",
"task_id", taskID, "instance_id", instanceID, "session_type", string(sessionType))
return
}

// Route to the correct subgroup (e.g., "Group 1", "Group 2")
if !addInstanceToSubgroup(ultraGroup, session, instanceID) {
ultraGroup.AddInstance(instanceID)
}
}

// GetRunningTasks returns the currently running tasks and their instance IDs
func (c *Coordinator) GetRunningTasks() map[string]string {
c.mu.RLock()
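
The assign-then-route sequence in `AssignTaskInstance` above can be modeled in a few lines. This is a simplified sketch under stated assumptions: `router`, `subgroupFor`, and the fixed "Group 1" name are hypothetical, and the real `determineSubgroupType` and `addInstanceToSubgroup` logic is not reproduced. It shows only the two properties the comments call out: the mapping is written before routing reads it, and both steps run under one mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// router is a hypothetical stand-in for the coordinator. taskToInstance
// plays the role of session.TaskToInstance.
type router struct {
	mu             sync.Mutex
	taskToInstance map[string]string
	groups         map[string][]string // subgroup name -> instance IDs
}

func newRouter() *router {
	return &router{
		taskToInstance: map[string]string{},
		groups:         map[string][]string{},
	}
}

// subgroupFor iterates the mapping to decide where an instance belongs.
// An instance with no recorded task is treated as unknown.
func (r *router) subgroupFor(instanceID string) string {
	for _, id := range r.taskToInstance {
		if id == instanceID {
			return "Group 1"
		}
	}
	return "unknown"
}

// assign records the mapping first and routes second, holding the mutex
// for the whole sequence so no goroutine iterates the map mid-write.
func (r *router) assign(taskID, instanceID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.taskToInstance[taskID] = instanceID
	name := r.subgroupFor(instanceID)
	r.groups[name] = append(r.groups[name], instanceID)
}

func main() {
	r := newRouter()
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			r.assign(fmt.Sprintf("task-%d", n), fmt.Sprintf("inst-%d", n))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.groups["Group 1"]), len(r.groups["unknown"])) // 4 0
}
```

Swapping the two statements inside `assign` would send every instance to the "unknown" bucket, which is the ordering bug the changelog describes for the legacy path.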
205 changes: 205 additions & 0 deletions internal/orchestrator/coordinator_test.go
@@ -3,7 +3,9 @@ package orchestrator
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"testing"

"github.com/Iron-Ham/claudio/internal/event"
@@ -2547,3 +2549,206 @@ func TestCoordinator_UsePipelineConfig(t *testing.T) {
t.Error("UsePipeline should be true in config")
}
}

func TestCoordinator_AssignTaskInstance(t *testing.T) {
t.Run("assigns task and adds instance to correct subgroup", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test objective", cfg)
ultraSession.Plan = &PlanSpec{
Objective: "Test objective",
Tasks: []PlannedTask{
{ID: "task-1", Title: "First task"},
{ID: "task-2", Title: "Second task"},
},
ExecutionOrder: [][]string{{"task-1", "task-2"}},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{
Instances: make([]*Instance, 0),
}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test objective")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

coord.AssignTaskInstance("task-1", "inst-1")

// Verify TaskToInstance was populated
if ultraSession.TaskToInstance["task-1"] != "inst-1" {
t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1")
}

// Verify instance was added to the group (in a "Group 1" subgroup)
assertGroupContains(t, ultraGroup, "inst-1")

// Verify it was routed to the correct subgroup
if len(ultraGroup.SubGroups) == 0 {
t.Fatal("expected subgroups to be created")
}
subgroup := ultraGroup.SubGroups[0]
if subgroup.Name != "Group 1" {
t.Errorf("subgroup.Name = %q, want %q", subgroup.Name, "Group 1")
}
if !subgroup.HasInstance("inst-1") {
t.Error("inst-1 not found in Group 1 subgroup")
}
})

t.Run("handles nil coordinator gracefully", func(t *testing.T) {
var coord *Coordinator
// Should not panic
coord.AssignTaskInstance("task-1", "inst-1")
})

t.Run("handles nil baseSession gracefully", func(t *testing.T) {
coord := &Coordinator{}
// Should not panic — nil baseSession guard fires before any work
coord.AssignTaskInstance("task-1", "inst-1")
})

t.Run("handles no matching group gracefully", func(t *testing.T) {
ultraSession := NewUltraPlanSession("Test", DefaultUltraPlanConfig())
baseSession := &Session{Instances: make([]*Instance, 0)}
// No group added → GetGroupBySessionType returns nil
manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}
coord.AssignTaskInstance("task-1", "inst-1")

// TaskToInstance should be populated even though group routing failed
if ultraSession.TaskToInstance["task-1"] != "inst-1" {
t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1")
}
})

t.Run("falls back to parent group when subgroup routing fails", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
// No plan = no execution order, so subgroup routing will fail for execution type
// But TaskToInstance IS set, so determineSubgroupType returns SubgroupTypeExecution
// and getTaskGroupIndex returns -1, causing addInstanceToSubgroup to return false
ultraSession.Phase = PhaseExecuting

baseSession := &Session{
Instances: make([]*Instance, 0),
}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

coord.AssignTaskInstance("task-1", "inst-1")

// TaskToInstance should still be populated
if ultraSession.TaskToInstance["task-1"] != "inst-1" {
t.Errorf("TaskToInstance[task-1] = %q, want %q", ultraSession.TaskToInstance["task-1"], "inst-1")
}

// Instance should be in the group (at the parent level since routing fails)
assertGroupContains(t, ultraGroup, "inst-1")
})

t.Run("handles multi-pass session type", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
cfg.MultiPass = true
ultraSession := NewUltraPlanSession("Test", cfg)
ultraSession.Plan = &PlanSpec{
ExecutionOrder: [][]string{{"task-1"}},
Tasks: []PlannedTask{{ID: "task-1", Title: "First"}},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{
Instances: make([]*Instance, 0),
}
multiGroup := NewInstanceGroupWithType("Test", SessionTypePlanMulti, "Test")
baseSession.AddGroup(multiGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

coord.AssignTaskInstance("task-1", "inst-1")

assertGroupContains(t, multiGroup, "inst-1")
})

t.Run("concurrent calls do not race", func(t *testing.T) {
cfg := DefaultUltraPlanConfig()
ultraSession := NewUltraPlanSession("Test", cfg)
ultraSession.Plan = &PlanSpec{
Objective: "Test",
Tasks: []PlannedTask{
{ID: "task-1", Title: "T1"},
{ID: "task-2", Title: "T2"},
{ID: "task-3", Title: "T3"},
{ID: "task-4", Title: "T4"},
},
ExecutionOrder: [][]string{{"task-1", "task-2", "task-3", "task-4"}},
}
ultraSession.Phase = PhaseExecuting

baseSession := &Session{Instances: make([]*Instance, 0)}
ultraGroup := NewInstanceGroupWithType("Test", SessionTypeUltraPlan, "Test")
baseSession.AddGroup(ultraGroup)

manager := NewUltraPlanManager(nil, baseSession, ultraSession, nil)
coord := &Coordinator{
manager: manager,
baseSession: baseSession,
logger: logging.NopLogger(),
}

// Simulate concurrent bridge goroutines assigning tasks.
// Under -race this will detect any map read/write races.
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
taskID := fmt.Sprintf("task-%d", n)
instID := fmt.Sprintf("inst-%d", n)
coord.AssignTaskInstance(taskID, instID)
}(i)
}
wg.Wait()

// All 4 instances should be in the group
allIDs := ultraGroup.AllInstanceIDs()
if len(allIDs) != 4 {
t.Errorf("AllInstanceIDs() has %d entries, want 4", len(allIDs))
}
for i := 1; i <= 4; i++ {
assertGroupContains(t, ultraGroup, fmt.Sprintf("inst-%d", i))
}
})
}

// assertGroupContains is a test helper that asserts an instance ID exists
// in the group's AllInstanceIDs (including subgroups).
func assertGroupContains(t *testing.T, group *InstanceGroup, instanceID string) {
t.Helper()
for _, id := range group.AllInstanceIDs() {
if id == instanceID {
return
}
}
t.Errorf("%s not found in group %q AllInstanceIDs()", instanceID, group.Name)
}
9 changes: 7 additions & 2 deletions internal/orchestrator/phase/execution.go
@@ -714,6 +714,13 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error {
// Get instance ID
instanceID := e.getInstanceID(inst)

+ // Record the task-to-instance mapping BEFORE adding to the group.
+ // addInstanceToSubgroup calls determineSubgroupType which checks
+ // TaskToInstance to route execution instances to the correct "Group N"
+ // subgroup. Without this ordering, the instance falls through to
+ // SubgroupTypeUnknown and lands at the parent group root.
+ e.notifyTaskStart(taskID, instanceID)
+
// Add instance to the ultraplan group for sidebar display
if e.execCtx != nil && e.execCtx.Coordinator != nil {
isMultiPass := false
@@ -734,8 +741,6 @@ func (e *ExecutionOrchestrator) startTask(taskID string) error {
e.execCtx.Coordinator.AddRunningTask(taskID, instanceID)
}

- e.notifyTaskStart(taskID, instanceID)
-
// Start the instance
if err := e.phaseCtx.Orchestrator.StartInstance(inst); err != nil {
e.mu.Lock()
4 changes: 3 additions & 1 deletion internal/tui/pipeline_wire.go
@@ -13,7 +13,9 @@ import (
// orchestrator and bridgewire without creating an import cycle.
func registerPipelineFactory(coordinator *orchestrator.Coordinator, orch *orchestrator.Orchestrator, logger *logging.Logger) {
coordinator.SetPipelineFactory(func(deps orchestrator.PipelineRunnerDeps) (orchestrator.ExecutionRunner, error) {
- recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{})
+ recorder := bridgewire.NewSessionRecorder(bridgewire.SessionRecorderDeps{
+ OnAssign: coordinator.AssignTaskInstance,
+ })
return bridgewire.NewPipelineRunner(bridgewire.PipelineRunnerConfig{
Orch: deps.Orch,
Session: deps.Session,