Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ Patterns and conventions observed in this codebase that aren't covered by the ge
- **Per-role factory creation in bridgewire** — `PipelineExecutor.attachBridges` creates a *per-team* `instanceFactory` when `RoleOverrides` contains an entry for the team's role. The factory carries `ai.StartOptions` that flow through `Orchestrator.StartInstanceWithOverrides → newInstanceManager → ManagerOptions.StartOverrides → Manager.Start()`. The default shared factory is used for teams without role overrides.
- **Capture loop recovery pattern** — `Manager.captureLoop()` detects tmux server death at four distinct points (heartbeat check, session status query, unresponsive threshold, capture failure). All four sites call `attemptSessionRecovery()` before `handleSessionEnded()`. Recovery creates a fresh tmux session and resumes the Claude session via `--resume`. The persistent input handler auto-reconnects to the new session (same socket name) without explicit re-initialization.
- **Navigation must follow visual display order** — The ultraplan sidebar is rendered via `FlattenGroupsForDisplay` (group-structure order), but navigation used to use `getNavigableInstances` (plan-execution order). These orderings diverge because instances are added to groups in creation order, not plan order. Any keyboard navigation that moves between sidebar items must use `getInstanceDisplayOrder()` as its ordering source, filtered to the set of navigable items, to stay consistent with what the user sees.
- **Decomposer must union on dependency edges** — `pipeline.Decompose` groups tasks into teams via union-find. Each team's `TaskQueue.isClaimable()` resolves `DependsOn` only within its own task set — if a dep ID isn't in the local queue, the task is permanently blocked. The decomposer must union tasks along `DependsOn` edges (not just shared files) so all task-level dependencies are resolvable within one team.

---

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Subprocess Mode** - Removed the experimental subprocess execution mode (`experimental.subprocess_mode`), including the `internal/streamjson/` package, `subprocessFactory`, and all related config/TUI/wiring plumbing. Pipeline instances now always use the tmux-based execution backend.

### Fixed
- **Pipeline Deadlock on Cross-Team Dependencies** - Fixed `pipeline.Decompose` only grouping tasks by shared files, ignoring `DependsOn` edges. When dependent tasks had disjoint files, they landed in separate teams whose `TaskQueue.isClaimable()` could never resolve the cross-queue dependency, permanently blocking the pipeline. The decomposer now unions tasks along dependency edges in addition to file edges, ensuring all task-level dependencies are resolvable within a single team.
- **Pipeline Execution Count Exceeds Total** - Fixed `exec 3/2` display bug where the task done count exceeded the total count. `UpdateTeamCompleted` overwrote `TasksDone`/`TasksFailed` with backend-authoritative values but left `TasksTotal` at the stale incremental count from bridge start events. Now reconciles `TasksTotal` and clears `ActiveTasks` on team completion.
- **Ultraplan h/l Navigation Reversed** - Fixed `h` and `l` keybindings navigating in the opposite visual direction in ultraplan mode with groups. Navigation used plan-execution order (`getNavigableInstances`) while the sidebar rendered in group-structure order (`FlattenGroupsForDisplay`), causing the two orderings to diverge. Navigation now follows the visual display order filtered to navigable instances.
- **Silent Plan Validation Failure** - Fixed `handlePlanFileCheckResult` silently swallowing `SetPlan` errors, leaving users stuck in a session that would never progress. Now sets an error message and transitions to `PhaseFailed`, matching the identical error handling in `handlePlanParsed`.
Expand Down
3 changes: 2 additions & 1 deletion internal/pipeline/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ See `doc.go` for package overview and API usage.
The pipeline package implements Phase 3 of the Orchestrator of Orchestrators. It decomposes a `PlanSpec` into teams and orchestrates multi-phase execution.

**Core Components:**
- **Decomposer** — Groups tasks by file affinity using union-find, producing `team.Spec` instances for the execution phase plus optional planning, review, and consolidation teams.
- **Decomposer** — Groups tasks by file affinity and dependency edges using union-find, producing `team.Spec` instances for the execution phase plus optional planning, review, and consolidation teams.
- **Pipeline** — Runs a multi-phase session (planning → execution → review → consolidation → done). Each phase creates its own `team.Manager`, registers teams, runs them to completion, and advances to the next phase.

**Phase Flow:**
Expand All @@ -34,6 +34,7 @@ Pipeline.Start(ctx)
- **Store Manager in map BEFORE publishing phase events** — `runPhase` must call `p.managers[phase] = mgr` before publishing `PipelinePhaseChangedEvent`. Event handlers may call `p.Manager(phase)` and get nil if the order is wrong.
- **Pipeline.run() goroutine must be tracked with WaitGroup** — `Stop()` calls `p.wg.Wait()` after cancelling context to guarantee the `run()` goroutine has exited. Without this, tests checking post-Stop state may race with the goroutine.
- **fail() must receive phasesRun from caller** — The `fail()` helper publishes a `PipelineCompletedEvent`. It accepts a `phasesRun int` parameter rather than computing it, because the `run()` function already tracks this counter incrementally and passing it avoids redundant (and possibly wrong) recalculation.
- **Decomposer must union on dependency edges, not just file edges** — Each team's `TaskQueue` resolves `DependsOn` only within its own task set (`isClaimable` does `q.tasks[depID]`). If a task in team B depends on a task in team A (different queues), the dependency is permanently unsatisfiable and the pipeline deadlocks. The decomposer unions tasks along `DependsOn` edges so all dependencies are resolvable within one team.

## Testing

Expand Down
29 changes: 25 additions & 4 deletions internal/pipeline/decompose.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Decompose(plan *ultraplan.PlanSpec, cfg DecomposeConfig) (*DecomposeResult,

cfg = cfg.defaults()

groups := groupByFileAffinity(plan.Tasks)
groups := groupByAffinity(plan.Tasks)

// Apply MaxTeamSize: split oversized groups.
if cfg.MaxTeamSize > 0 {
Expand Down Expand Up @@ -91,16 +91,37 @@ func Decompose(plan *ultraplan.PlanSpec, cfg DecomposeConfig) (*DecomposeResult,
return result, nil
}

// groupByFileAffinity groups tasks by shared files using union-find.
// Returns a slice of groups, each group being a sorted slice of task IDs.
func groupByFileAffinity(tasks []ultraplan.PlannedTask) [][]string {
// groupByAffinity groups tasks by shared files and dependency edges
// using union-find. Tasks that share at least one file, or that have a
// direct dependency relationship, land in the same group.
//
// Dependency unioning is essential because each team's TaskQueue can only
// resolve dependencies within its own task set. If task B depends on task A
// but they share no files, without this union they land in separate teams
// and B's dependency is permanently unsatisfiable (isClaimable returns
// false because the dep ID is absent from the local queue).
func groupByAffinity(tasks []ultraplan.PlannedTask) [][]string {
ids := make([]string, len(tasks))
for i, t := range tasks {
ids[i] = t.ID
}

uf := newUnionFind(ids)

// Union tasks that share a dependency edge. This ensures all
// DependsOn references are resolvable within a single team's queue.
// Unknown dep IDs are skipped — upstream validation catches invalid
// references before Decompose is called; guarding here prevents
// silent union-find corruption via phantom "" root nodes.
for _, t := range tasks {
for _, depID := range t.DependsOn {
if _, ok := uf.parent[depID]; !ok {
continue
}
uf.Union(t.ID, depID)
}
}

// Build file → task ID index.
fileToTasks := make(map[string][]string)
for _, t := range tasks {
Expand Down
133 changes: 133 additions & 0 deletions internal/pipeline/decompose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,139 @@ func TestDecompose_DefaultTeamSizeZeroDefaultsToOne(t *testing.T) {
}
}

func TestDecompose_DependencyGrouping(t *testing.T) {
// t1 and t2 have disjoint files. t3 depends on t1 but shares no files.
// Without dependency-based unioning, t3 would land in a separate team
// and its dependency on t1 would be permanently unsatisfiable.
plan := &ultraplan.PlanSpec{
ID: "p1",
Tasks: []ultraplan.PlannedTask{
{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
{ID: "t2", Title: "Task 2", Files: []string{"b.go"}},
{ID: "t3", Title: "Task 3", Files: []string{"c.go"}, DependsOn: []string{"t1"}},
},
}

result, err := Decompose(plan, DecomposeConfig{})
if err != nil {
t.Fatalf("Decompose: %v", err)
}

// t1 and t3 must be in the same team (dependency edge).
// t2 is independent → separate team.
if len(result.ExecutionTeams) != 2 {
t.Fatalf("ExecutionTeams = %d, want 2", len(result.ExecutionTeams))
}

// Find the team containing t1.
var t1Team *team.Spec
for i := range result.ExecutionTeams {
for _, task := range result.ExecutionTeams[i].Tasks {
if task.ID == "t1" {
t1Team = &result.ExecutionTeams[i]
break
}
}
}
if t1Team == nil {
t.Fatal("could not find team containing t1")
}

ids := make(map[string]bool)
for _, task := range t1Team.Tasks {
ids[task.ID] = true
}
if !ids["t1"] || !ids["t3"] {
t.Errorf("dependency team should contain t1 and t3, got %v", ids)
}
if ids["t2"] {
t.Error("t2 should NOT be in t1's team (no shared files or deps)")
}
}

func TestDecompose_TransitiveDependencyGrouping(t *testing.T) {
// Chain: t3 → t2 → t1. No shared files. All must land in one team.
plan := &ultraplan.PlanSpec{
ID: "p1",
Tasks: []ultraplan.PlannedTask{
{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
{ID: "t2", Title: "Task 2", Files: []string{"b.go"}, DependsOn: []string{"t1"}},
{ID: "t3", Title: "Task 3", Files: []string{"c.go"}, DependsOn: []string{"t2"}},
},
}

result, err := Decompose(plan, DecomposeConfig{})
if err != nil {
t.Fatalf("Decompose: %v", err)
}

if len(result.ExecutionTeams) != 1 {
t.Fatalf("ExecutionTeams = %d, want 1 (transitive dependency chain)", len(result.ExecutionTeams))
}
if len(result.ExecutionTeams[0].Tasks) != 3 {
t.Errorf("tasks = %d, want 3", len(result.ExecutionTeams[0].Tasks))
}
}

func TestDecompose_DisjointDepsAndFiles(t *testing.T) {
// Reproduces the ultraplan stuck scenario: two independent group-1 tasks
// with disjoint files, and a group-2 task depending on both.
plan := &ultraplan.PlanSpec{
ID: "p1",
Tasks: []ultraplan.PlannedTask{
{ID: "t1-swipe", Title: "Swipe Framework", Files: []string{"SwipeSettings.swift"}},
{ID: "t2-graphql", Title: "GraphQL Codegen", Files: []string{"schema.graphql"}},
{ID: "t3-wiring", Title: "Wire It Up", Files: []string{"Wiring.swift"}, DependsOn: []string{"t1-swipe", "t2-graphql"}},
},
}

result, err := Decompose(plan, DecomposeConfig{})
if err != nil {
t.Fatalf("Decompose: %v", err)
}

// All three must be in one team because t3 depends on both t1 and t2.
if len(result.ExecutionTeams) != 1 {
t.Fatalf("ExecutionTeams = %d, want 1 (cross-dep grouping)", len(result.ExecutionTeams))
}
if len(result.ExecutionTeams[0].Tasks) != 3 {
t.Errorf("tasks = %d, want 3", len(result.ExecutionTeams[0].Tasks))
}
}

func TestDecompose_UnknownDependencyIgnored(t *testing.T) {
// DependsOn references a task ID not in the plan. The decomposer
// should skip the unknown dep rather than corrupting the union-find
// with a phantom "" root node.
plan := &ultraplan.PlanSpec{
ID: "p1",
Tasks: []ultraplan.PlannedTask{
{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
{ID: "t2", Title: "Task 2", Files: []string{"b.go"}, DependsOn: []string{"nonexistent"}},
},
}

result, err := Decompose(plan, DecomposeConfig{})
if err != nil {
t.Fatalf("Decompose: %v", err)
}

// t1 and t2 share no files and the unknown dep should be ignored,
// so they land in separate teams.
if len(result.ExecutionTeams) != 2 {
t.Fatalf("ExecutionTeams = %d, want 2 (unknown dep ignored)", len(result.ExecutionTeams))
}

// Verify both tasks are present (no silent drops).
totalTasks := 0
for _, team := range result.ExecutionTeams {
totalTasks += len(team.Tasks)
}
if totalTasks != 2 {
t.Errorf("total tasks = %d, want 2", totalTasks)
}
}

// -- Union-Find unit tests ---------------------------------------------------

func TestUnionFind_BasicOperations(t *testing.T) {
Expand Down
Loading