From 3f7f1e786db219bba4d4658ab65587b779ee4d73 Mon Sep 17 00:00:00 2001
From: Hesham Salman
Date: Fri, 20 Mar 2026 23:39:25 -0400
Subject: [PATCH] fix: resolve pipeline deadlock on cross-team task dependencies

The decomposer (pipeline.Decompose) grouped tasks into execution teams
using only file-affinity via union-find. Tasks with DependsOn
relationships but disjoint files landed in separate teams. Since each
team's TaskQueue.isClaimable() only resolves dependencies within its
own task set, cross-team dependencies were permanently unsatisfiable,
deadlocking the pipeline after the first dependency group completed.

Fix: union tasks along DependsOn edges in addition to shared-file
edges, ensuring all task-level dependencies are resolvable within a
single team's queue. Guard against unknown dep IDs to prevent silent
union-find corruption via phantom root nodes.

Rename groupByFileAffinity -> groupByAffinity to reflect expanded
responsibility.
---
 AGENTS.md                           |   1 +
 CHANGELOG.md                        |   1 +
 internal/pipeline/AGENTS.md         |   3 +-
 internal/pipeline/decompose.go      |  29 +++++-
 internal/pipeline/decompose_test.go | 133 ++++++++++++++++++++++++++++
 5 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/AGENTS.md b/AGENTS.md
index 867d3410..85fec081 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -366,6 +366,7 @@ Patterns and conventions observed in this codebase that aren't covered by the ge
 - **Per-role factory creation in bridgewire** — `PipelineExecutor.attachBridges` creates a *per-team* `instanceFactory` when `RoleOverrides` contains an entry for the team's role. The factory carries `ai.StartOptions` that flow through `Orchestrator.StartInstanceWithOverrides → newInstanceManager → ManagerOptions.StartOverrides → Manager.Start()`. The default shared factory is used for teams without role overrides.
 - **Capture loop recovery pattern** — `Manager.captureLoop()` detects tmux server death at four distinct points (heartbeat check, session status query, unresponsive threshold, capture failure). All four sites call `attemptSessionRecovery()` before `handleSessionEnded()`. Recovery creates a fresh tmux session and resumes the Claude session via `--resume`. The persistent input handler auto-reconnects to the new session (same socket name) without explicit re-initialization.
 - **Navigation must follow visual display order** — The ultraplan sidebar is rendered via `FlattenGroupsForDisplay` (group-structure order), but navigation used to use `getNavigableInstances` (plan-execution order). These orderings diverge because instances are added to groups in creation order, not plan order. Any keyboard navigation that moves between sidebar items must use `getInstanceDisplayOrder()` as its ordering source, filtered to the set of navigable items, to stay consistent with what the user sees.
+- **Decomposer must union on dependency edges** — `pipeline.Decompose` groups tasks into teams via union-find. Each team's `TaskQueue.isClaimable()` resolves `DependsOn` only within its own task set — if a dep ID isn't in the local queue, the task is permanently blocked. The decomposer must union tasks along `DependsOn` edges (not just shared files) so all task-level dependencies are resolvable within one team.
 
 ---
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37579bb8..238c02f6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - **Subprocess Mode** - Removed the experimental subprocess execution mode (`experimental.subprocess_mode`), including the `internal/streamjson/` package, `subprocessFactory`, and all related config/TUI/wiring plumbing. Pipeline instances now always use the tmux-based execution backend.
 
 ### Fixed
+- **Pipeline Deadlock on Cross-Team Dependencies** - Fixed `pipeline.Decompose` only grouping tasks by shared files, ignoring `DependsOn` edges. When dependent tasks had disjoint files, they landed in separate teams whose `TaskQueue.isClaimable()` could never resolve the cross-queue dependency, permanently blocking the pipeline. The decomposer now unions tasks along dependency edges in addition to file edges, ensuring all task-level dependencies are resolvable within a single team.
 - **Pipeline Execution Count Exceeds Total** - Fixed `exec 3/2` display bug where the task done count exceeded the total count. `UpdateTeamCompleted` overwrote `TasksDone`/`TasksFailed` with backend-authoritative values but left `TasksTotal` at the stale incremental count from bridge start events. Now reconciles `TasksTotal` and clears `ActiveTasks` on team completion.
 - **Ultraplan h/l Navigation Reversed** - Fixed `h` and `l` keybindings navigating in the opposite visual direction in ultraplan mode with groups. Navigation used plan-execution order (`getNavigableInstances`) while the sidebar rendered in group-structure order (`FlattenGroupsForDisplay`), causing the two orderings to diverge. Navigation now follows the visual display order filtered to navigable instances.
 - **Silent Plan Validation Failure** - Fixed `handlePlanFileCheckResult` silently swallowing `SetPlan` errors, leaving users stuck in a session that would never progress. Now sets an error message and transitions to `PhaseFailed`, matching the identical error handling in `handlePlanParsed`.
diff --git a/internal/pipeline/AGENTS.md b/internal/pipeline/AGENTS.md
index e5e6d04c..a610e02e 100644
--- a/internal/pipeline/AGENTS.md
+++ b/internal/pipeline/AGENTS.md
@@ -10,7 +10,7 @@ See `doc.go` for package overview and API usage.
 The pipeline package implements Phase 3 of the Orchestrator of Orchestrators. It decomposes a `PlanSpec` into teams and orchestrates multi-phase execution.
 
 **Core Components:**
-- **Decomposer** — Groups tasks by file affinity using union-find, producing `team.Spec` instances for the execution phase plus optional planning, review, and consolidation teams.
+- **Decomposer** — Groups tasks by file affinity and dependency edges using union-find, producing `team.Spec` instances for the execution phase plus optional planning, review, and consolidation teams.
 - **Pipeline** — Runs a multi-phase session (planning → execution → review → consolidation → done). Each phase creates its own `team.Manager`, registers teams, runs them to completion, and advances to the next phase.
 
 **Phase Flow:**
@@ -34,6 +34,7 @@ Pipeline.Start(ctx)
 - **Store Manager in map BEFORE publishing phase events** — `runPhase` must call `p.managers[phase] = mgr` before publishing `PipelinePhaseChangedEvent`. Event handlers may call `p.Manager(phase)` and get nil if the order is wrong.
 - **Pipeline.run() goroutine must be tracked with WaitGroup** — `Stop()` calls `p.wg.Wait()` after cancelling context to guarantee the `run()` goroutine has exited. Without this, tests checking post-Stop state may race with the goroutine.
 - **fail() must receive phasesRun from caller** — The `fail()` helper publishes a `PipelineCompletedEvent`. It accepts a `phasesRun int` parameter rather than computing it, because the `run()` function already tracks this counter incrementally and passing it avoids redundant (and possibly wrong) recalculation.
+- **Decomposer must union on dependency edges, not just file edges** — Each team's `TaskQueue` resolves `DependsOn` only within its own task set (`isClaimable` does `q.tasks[depID]`). If a task in team B depends on a task in team A (different queues), the dependency is permanently unsatisfiable and the pipeline deadlocks. The decomposer unions tasks along `DependsOn` edges so all dependencies are resolvable within one team.
 
 ## Testing
 
diff --git a/internal/pipeline/decompose.go b/internal/pipeline/decompose.go
index 30b0c4fd..ae5acde5 100644
--- a/internal/pipeline/decompose.go
+++ b/internal/pipeline/decompose.go
@@ -24,7 +24,7 @@ func Decompose(plan *ultraplan.PlanSpec, cfg DecomposeConfig) (*DecomposeResult,
 
 	cfg = cfg.defaults()
 
-	groups := groupByFileAffinity(plan.Tasks)
+	groups := groupByAffinity(plan.Tasks)
 
 	// Apply MaxTeamSize: split oversized groups.
 	if cfg.MaxTeamSize > 0 {
@@ -91,9 +91,16 @@ func Decompose(plan *ultraplan.PlanSpec, cfg DecomposeConfig) (*DecomposeResult,
 	return result, nil
 }
 
-// groupByFileAffinity groups tasks by shared files using union-find.
-// Returns a slice of groups, each group being a sorted slice of task IDs.
-func groupByFileAffinity(tasks []ultraplan.PlannedTask) [][]string {
+// groupByAffinity groups tasks by shared files and dependency edges
+// using union-find. Tasks that share at least one file, or that have a
+// direct dependency relationship, land in the same group.
+//
+// Dependency unioning is essential because each team's TaskQueue can only
+// resolve dependencies within its own task set. If task B depends on task A
+// but they share no files, without this union they land in separate teams
+// and B's dependency is permanently unsatisfiable (isClaimable returns
+// false because the dep ID is absent from the local queue).
+func groupByAffinity(tasks []ultraplan.PlannedTask) [][]string {
 	ids := make([]string, len(tasks))
 	for i, t := range tasks {
 		ids[i] = t.ID
@@ -101,6 +108,20 @@ func groupByFileAffinity(tasks []ultraplan.PlannedTask) [][]string {
 
 	uf := newUnionFind(ids)
 
+	// Union tasks that share a dependency edge. This ensures all
+	// DependsOn references are resolvable within a single team's queue.
+	// Unknown dep IDs are skipped — upstream validation catches invalid
+	// references before Decompose is called; guarding here prevents
+	// silent union-find corruption via phantom "" root nodes.
+	for _, t := range tasks {
+		for _, depID := range t.DependsOn {
+			if _, ok := uf.parent[depID]; !ok {
+				continue
+			}
+			uf.Union(t.ID, depID)
+		}
+	}
+
 	// Build file → task ID index.
 	fileToTasks := make(map[string][]string)
 	for _, t := range tasks {
diff --git a/internal/pipeline/decompose_test.go b/internal/pipeline/decompose_test.go
index a1e9a865..36dcfe5d 100644
--- a/internal/pipeline/decompose_test.go
+++ b/internal/pipeline/decompose_test.go
@@ -500,6 +500,139 @@ func TestDecompose_DefaultTeamSizeZeroDefaultsToOne(t *testing.T) {
 	}
 }
 
+func TestDecompose_DependencyGrouping(t *testing.T) {
+	// t1 and t2 have disjoint files. t3 depends on t1 but shares no files.
+	// Without dependency-based unioning, t3 would land in a separate team
+	// and its dependency on t1 would be permanently unsatisfiable.
+	plan := &ultraplan.PlanSpec{
+		ID: "p1",
+		Tasks: []ultraplan.PlannedTask{
+			{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
+			{ID: "t2", Title: "Task 2", Files: []string{"b.go"}},
+			{ID: "t3", Title: "Task 3", Files: []string{"c.go"}, DependsOn: []string{"t1"}},
+		},
+	}
+
+	result, err := Decompose(plan, DecomposeConfig{})
+	if err != nil {
+		t.Fatalf("Decompose: %v", err)
+	}
+
+	// t1 and t3 must be in the same team (dependency edge).
+	// t2 is independent → separate team.
+	if len(result.ExecutionTeams) != 2 {
+		t.Fatalf("ExecutionTeams = %d, want 2", len(result.ExecutionTeams))
+	}
+
+	// Find the team containing t1.
+	var t1Team *team.Spec
+	for i := range result.ExecutionTeams {
+		for _, task := range result.ExecutionTeams[i].Tasks {
+			if task.ID == "t1" {
+				t1Team = &result.ExecutionTeams[i]
+				break
+			}
+		}
+	}
+	if t1Team == nil {
+		t.Fatal("could not find team containing t1")
+	}
+
+	ids := make(map[string]bool)
+	for _, task := range t1Team.Tasks {
+		ids[task.ID] = true
+	}
+	if !ids["t1"] || !ids["t3"] {
+		t.Errorf("dependency team should contain t1 and t3, got %v", ids)
+	}
+	if ids["t2"] {
+		t.Error("t2 should NOT be in t1's team (no shared files or deps)")
+	}
+}
+
+func TestDecompose_TransitiveDependencyGrouping(t *testing.T) {
+	// Chain: t3 → t2 → t1. No shared files. All must land in one team.
+	plan := &ultraplan.PlanSpec{
+		ID: "p1",
+		Tasks: []ultraplan.PlannedTask{
+			{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
+			{ID: "t2", Title: "Task 2", Files: []string{"b.go"}, DependsOn: []string{"t1"}},
+			{ID: "t3", Title: "Task 3", Files: []string{"c.go"}, DependsOn: []string{"t2"}},
+		},
+	}
+
+	result, err := Decompose(plan, DecomposeConfig{})
+	if err != nil {
+		t.Fatalf("Decompose: %v", err)
+	}
+
+	if len(result.ExecutionTeams) != 1 {
+		t.Fatalf("ExecutionTeams = %d, want 1 (transitive dependency chain)", len(result.ExecutionTeams))
+	}
+	if len(result.ExecutionTeams[0].Tasks) != 3 {
+		t.Errorf("tasks = %d, want 3", len(result.ExecutionTeams[0].Tasks))
+	}
+}
+
+func TestDecompose_DisjointDepsAndFiles(t *testing.T) {
+	// Reproduces the ultraplan stuck scenario: two independent group-1 tasks
+	// with disjoint files, and a group-2 task depending on both.
+	plan := &ultraplan.PlanSpec{
+		ID: "p1",
+		Tasks: []ultraplan.PlannedTask{
+			{ID: "t1-swipe", Title: "Swipe Framework", Files: []string{"SwipeSettings.swift"}},
+			{ID: "t2-graphql", Title: "GraphQL Codegen", Files: []string{"schema.graphql"}},
+			{ID: "t3-wiring", Title: "Wire It Up", Files: []string{"Wiring.swift"}, DependsOn: []string{"t1-swipe", "t2-graphql"}},
+		},
+	}
+
+	result, err := Decompose(plan, DecomposeConfig{})
+	if err != nil {
+		t.Fatalf("Decompose: %v", err)
+	}
+
+	// All three must be in one team because t3 depends on both t1 and t2.
+	if len(result.ExecutionTeams) != 1 {
+		t.Fatalf("ExecutionTeams = %d, want 1 (cross-dep grouping)", len(result.ExecutionTeams))
+	}
+	if len(result.ExecutionTeams[0].Tasks) != 3 {
+		t.Errorf("tasks = %d, want 3", len(result.ExecutionTeams[0].Tasks))
+	}
+}
+
+func TestDecompose_UnknownDependencyIgnored(t *testing.T) {
+	// DependsOn references a task ID not in the plan. The decomposer
+	// should skip the unknown dep rather than corrupting the union-find
+	// with a phantom "" root node.
+	plan := &ultraplan.PlanSpec{
+		ID: "p1",
+		Tasks: []ultraplan.PlannedTask{
+			{ID: "t1", Title: "Task 1", Files: []string{"a.go"}},
+			{ID: "t2", Title: "Task 2", Files: []string{"b.go"}, DependsOn: []string{"nonexistent"}},
+		},
+	}
+
+	result, err := Decompose(plan, DecomposeConfig{})
+	if err != nil {
+		t.Fatalf("Decompose: %v", err)
+	}
+
+	// t1 and t2 share no files and the unknown dep should be ignored,
+	// so they land in separate teams.
+	if len(result.ExecutionTeams) != 2 {
+		t.Fatalf("ExecutionTeams = %d, want 2 (unknown dep ignored)", len(result.ExecutionTeams))
+	}
+
+	// Verify both tasks are present (no silent drops).
+	totalTasks := 0
+	for _, team := range result.ExecutionTeams {
+		totalTasks += len(team.Tasks)
+	}
+	if totalTasks != 2 {
+		t.Errorf("total tasks = %d, want 2", totalTasks)
+	}
+}
+
 // -- Union-Find unit tests ---------------------------------------------------
 
 func TestUnionFind_BasicOperations(t *testing.T) {