From d656834f5fe1379a95524cf7eac30f14ba163efe Mon Sep 17 00:00:00 2001 From: Spencer Marx Date: Sun, 10 May 2026 15:20:04 +0200 Subject: [PATCH 1/4] feat(dashboard): derive command outcome from workflow lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a parent AI process exits 0 mid-workflow (e.g. macOS sleep drops the streaming connection before the AI ever calls `ocr state close-session`), the dashboard previously labelled the command "Success" — its only completion signal was the process exit code. Users had no way to tell from the dashboard that the workflow was actually unfinished. Cross-check the workflow lifecycle when reporting outcome. New pure helper `deriveCommandOutcome(exit_code, workflow_status)` returns: - 'success' — exit 0 AND (no workflow | workflow.status='closed') - 'incomplete' — exit 0 BUT linked workflow still 'active' - 'failed' — non-zero exit code (excluding -2 cancel sentinel) - 'cancelled' — exit code -2 Wire-up: - command-runner.finishExecution queries workflow status via a single LEFT JOIN and includes outcome in the `command:finished` socket event - getCommandHistory LEFT JOINs sessions; the route projects outcome onto each row so historical data is reclassified retroactively at read time (no schema migration, no backfill) - Client provider, history list, and workflow-output badge prefer `outcome` from server, falling back to legacy exit-code mapping for older sockets / unhydrated rows - New 'Incomplete' StatusFilter chip (amber, distinct from green Success and red Failed); badge tooltip explains the likely cause and points to "Resume in terminal" Single source of truth means client + server can never disagree on labelling. 
Co-Authored-By: claude-flow --- .../features/commands/commands-page.tsx | 1 + .../commands/components/command-history.tsx | 33 +++++++-- .../commands/components/workflow-output.tsx | 55 +++++++++++++-- .../features/commands/hooks/use-commands.ts | 7 ++ .../providers/command-state-provider.tsx | 58 +++++++++++----- packages/dashboard/src/server/db.ts | 28 ++++++-- .../dashboard/src/server/routes/commands.ts | 21 ++++-- .../__tests__/command-outcome.test.ts | 41 +++++++++++ .../src/server/services/command-outcome.ts | 69 +++++++++++++++++++ .../src/server/socket/command-runner.ts | 13 ++++ packages/dashboard/src/shared/types.ts | 15 ++++ 11 files changed, 302 insertions(+), 39 deletions(-) create mode 100644 packages/dashboard/src/server/services/__tests__/command-outcome.test.ts create mode 100644 packages/dashboard/src/server/services/command-outcome.ts diff --git a/packages/dashboard/src/client/features/commands/commands-page.tsx b/packages/dashboard/src/client/features/commands/commands-page.tsx index 86b00a6..4d314ca 100644 --- a/packages/dashboard/src/client/features/commands/commands-page.tsx +++ b/packages/dashboard/src/client/features/commands/commands-page.tsx @@ -157,6 +157,7 @@ export function CommandsPage() { events={activeTab.events} isRunning={activeTab.status === 'running'} exitCode={activeTab.exitCode} + status={activeTab.status} commandName={activeTab.command} onCancel={handleCancel} /> diff --git a/packages/dashboard/src/client/features/commands/components/command-history.tsx b/packages/dashboard/src/client/features/commands/components/command-history.tsx index 6560764..f5af3b0 100644 --- a/packages/dashboard/src/client/features/commands/components/command-history.tsx +++ b/packages/dashboard/src/client/features/commands/components/command-history.tsx @@ -24,7 +24,15 @@ import { EventStreamRenderer } from './event-stream/event-stream-renderer' // ── Types ── -type StatusFilter = 'all' | 'success' | 'fail' | 'cancelled' | 'running' | 'stalled' | 
'orphaned' +type StatusFilter = + | 'all' + | 'success' + | 'fail' + | 'cancelled' + | 'incomplete' + | 'running' + | 'stalled' + | 'orphaned' type SortField = 'date' | 'command' | 'duration' | 'status' type SortDir = 'asc' | 'desc' @@ -64,6 +72,13 @@ function getStatus(entry: CommandHistoryEntry): StatusFilter { } return 'running' } + // Prefer the server-derived outcome when present — it accounts for + // the workflow's actual lifecycle, not just process exit code. + if (entry.outcome === 'success') return 'success' + if (entry.outcome === 'incomplete') return 'incomplete' + if (entry.outcome === 'cancelled') return 'cancelled' + if (entry.outcome === 'failed') return 'fail' + // Legacy fallback for rows that predate the outcome field. if (entry.exit_code === 0) return 'success' if (entry.exit_code === -2) return 'cancelled' if (entry.exit_code === -3) return 'orphaned' @@ -75,6 +90,7 @@ function statusLabel(s: StatusFilter): string { case 'success': return 'Success' case 'fail': return 'Fail' case 'cancelled': return 'Cancelled' + case 'incomplete': return 'Incomplete' case 'running': return 'Running' case 'stalled': return 'Stalled' case 'orphaned': return 'Orphaned' @@ -89,6 +105,10 @@ function statusPillClasses(status: StatusFilter): string { return 'border-emerald-500/25 bg-emerald-500/15 text-emerald-700 dark:text-emerald-400' case 'cancelled': return 'border-amber-500/25 bg-amber-500/15 text-amber-700 dark:text-amber-400' + case 'incomplete': + // Same amber family as cancelled/stalled — both are "needs attention, + // not a hard failure". Resume-in-terminal is the natural follow-up. 
+ return 'border-amber-500/25 bg-amber-500/15 text-amber-700 dark:text-amber-400' case 'stalled': return 'border-amber-500/30 bg-amber-500/10 text-amber-700 dark:text-amber-400' case 'orphaned': @@ -120,10 +140,11 @@ function compareEntries(a: CommandHistoryEntry, b: CommandHistoryEntry, field: S const order: Record = { running: 0, stalled: 1, - orphaned: 2, - success: 3, - cancelled: 4, - fail: 5, + incomplete: 2, + orphaned: 3, + success: 4, + cancelled: 5, + fail: 6, all: -1, } cmp = (order[getStatus(a)] ?? 0) - (order[getStatus(b)] ?? 0) @@ -141,6 +162,7 @@ const STATUS_OPTIONS: { value: StatusFilter; label: string }[] = [ { value: 'stalled', label: 'Stalled' }, { value: 'orphaned', label: 'Orphaned' }, { value: 'success', label: 'Success' }, + { value: 'incomplete', label: 'Incomplete' }, { value: 'fail', label: 'Fail' }, { value: 'cancelled', label: 'Cancelled' }, ] @@ -468,6 +490,7 @@ export function CommandHistory({ isRunning, onRerun }: CommandHistoryProps) { success: 0, fail: 0, cancelled: 0, + incomplete: 0, running: 0, stalled: 0, orphaned: 0, diff --git a/packages/dashboard/src/client/features/commands/components/workflow-output.tsx b/packages/dashboard/src/client/features/commands/components/workflow-output.tsx index b50d035..63ca6c8 100644 --- a/packages/dashboard/src/client/features/commands/components/workflow-output.tsx +++ b/packages/dashboard/src/client/features/commands/components/workflow-output.tsx @@ -13,6 +13,41 @@ import { EventStreamRenderer } from './event-stream/event-stream-renderer' * previously dumped into the header — the raw command is always one * click away via the "Raw" toggle. */ +/** + * Map status (preferred) and exit code (fallback) to the badge label. + * Status comes from the server-derived CommandOutcome via the provider; + * exit code is the legacy fallback for unhydrated rows. 
+ */ +function statusBadgeLabel( + status: 'running' | 'complete' | 'incomplete' | 'cancelled' | 'failed' | undefined, + exitCode: number, +): string { + if (status === 'complete') return 'Complete' + if (status === 'incomplete') return 'Incomplete' + if (status === 'cancelled') return 'Cancelled' + if (status === 'failed') return `Exit: ${exitCode}` + // Fallback (no status from server) — exit code only. + if (exitCode === 0) return 'Complete' + if (exitCode === -2) return 'Cancelled' + return `Exit: ${exitCode}` +} + +/** Tailwind color classes for each end-state. Amber for incomplete keeps */ +/** it visually distinct from green (success) and red (failed). */ +function statusBadgeClasses( + status: 'running' | 'complete' | 'incomplete' | 'cancelled' | 'failed' | undefined, + exitCode: number, +): string { + const effective = status ?? (exitCode === 0 ? 'complete' : exitCode === -2 ? 'cancelled' : 'failed') + if (effective === 'complete') + return 'border-emerald-500/25 bg-emerald-500/15 text-emerald-700 dark:text-emerald-400' + if (effective === 'incomplete') + return 'border-amber-500/25 bg-amber-500/15 text-amber-700 dark:text-amber-400' + if (effective === 'cancelled') + return 'border-amber-500/25 bg-amber-500/15 text-amber-700 dark:text-amber-400' + return 'border-red-500/25 bg-red-500/15 text-red-700 dark:text-red-400' +} + function parseCommandSummary(command: string | null): { verb: string reviewerCount: number | null @@ -49,6 +84,12 @@ type WorkflowOutputProps = { events?: StreamEvent[] isRunning: boolean exitCode: number | null + /** + * Canonical end-state from the parent provider, derived server-side + * from (exit_code, linked workflow.status). Falls back to exit-code + * mapping if absent (older sockets / unhydrated state). 
+ */ + status?: 'running' | 'complete' | 'incomplete' | 'cancelled' | 'failed' commandName: string | null onCancel: () => void bare?: boolean @@ -66,6 +107,7 @@ export function WorkflowOutput({ events, isRunning, exitCode, + status, commandName, onCancel, bare, @@ -145,14 +187,15 @@ export function WorkflowOutput({ - {exitCode === 0 ? 'Complete' : exitCode === -2 ? 'Cancelled' : `Exit: ${exitCode}`} + {statusBadgeLabel(status, exitCode)} )} {hasTimeline && ( diff --git a/packages/dashboard/src/client/features/commands/hooks/use-commands.ts b/packages/dashboard/src/client/features/commands/hooks/use-commands.ts index c434b0d..9003bc8 100644 --- a/packages/dashboard/src/client/features/commands/hooks/use-commands.ts +++ b/packages/dashboard/src/client/features/commands/hooks/use-commands.ts @@ -11,6 +11,13 @@ export type CommandHistoryEntry = { duration_ms: number | null exit_code: number | null output: string + /** + * Server-derived from (exit_code, linked workflow.status). Distinguishes + * a workflow that exited cleanly from one that exited 0 mid-flight (e.g. + * macOS sleep dropped the streaming connection before the AI ever called + * `ocr state close-session`). Absent on rows from older server builds. 
+ */ + outcome?: 'success' | 'incomplete' | 'failed' | 'cancelled' | null // ── Agent-session journal fields (added by migration v11) ── workflow_id?: string | null vendor?: string | null diff --git a/packages/dashboard/src/client/providers/command-state-provider.tsx b/packages/dashboard/src/client/providers/command-state-provider.tsx index d679893..624ff13 100644 --- a/packages/dashboard/src/client/providers/command-state-provider.tsx +++ b/packages/dashboard/src/client/providers/command-state-provider.tsx @@ -21,7 +21,18 @@ import { useSocket, useSocketEvent } from './socket-provider' import { fetchApi } from '../lib/utils' import type { CommandEventsResponse, StreamEvent } from '../lib/api-types' -export type TabStatus = 'running' | 'complete' | 'cancelled' | 'failed' +export type TabStatus = 'running' | 'complete' | 'incomplete' | 'cancelled' | 'failed' + +/** Maps the server's CommandOutcome onto the client's TabStatus vocabulary. */ +function outcomeToTabStatus( + outcome: 'success' | 'incomplete' | 'failed' | 'cancelled' | null, +): TabStatus { + if (outcome === 'success') return 'complete' + if (outcome === 'incomplete') return 'incomplete' + if (outcome === 'failed') return 'failed' + if (outcome === 'cancelled') return 'cancelled' + return 'running' +} export type CommandTab = { executionId: number @@ -203,23 +214,38 @@ export function CommandStateProvider({ children }: { children: ReactNode }) { }) }) - useSocketEvent<{ execution_id: number; exitCode: number }>( - 'command:finished', - (data) => { - setTabMap((prev) => { - const existing = prev.get(data.execution_id) - if (!existing) return prev + useSocketEvent<{ + execution_id: number + exitCode: number + /** + * Server-derived from (exit_code, linked workflow.status). Distinguishes + * a cleanly-finished workflow from one whose parent process exited 0 + * mid-flight (macOS-sleep / network-drop). May be absent on rows from + * older server builds — fall back to the exit-code-only mapping. 
+ */ + outcome?: 'success' | 'incomplete' | 'failed' | 'cancelled' | null + }>('command:finished', (data) => { + setTabMap((prev) => { + const existing = prev.get(data.execution_id) + if (!existing) return prev - const next = new Map(prev) - next.set(data.execution_id, { - ...existing, - status: data.exitCode === -2 ? 'cancelled' : data.exitCode === 0 ? 'complete' : 'failed', - exitCode: data.exitCode, - }) - return next + const next = new Map(prev) + const status: TabStatus = data.outcome + ? outcomeToTabStatus(data.outcome) + : data.exitCode === -2 + ? 'cancelled' + : data.exitCode === 0 + ? 'complete' + : 'failed' + + next.set(data.execution_id, { + ...existing, + status, + exitCode: data.exitCode, }) - }, - ) + return next + }) + }) // Actions const dismissTab = useCallback( diff --git a/packages/dashboard/src/server/db.ts b/packages/dashboard/src/server/db.ts index e2ac3d4..6b4c6ae 100644 --- a/packages/dashboard/src/server/db.ts +++ b/packages/dashboard/src/server/db.ts @@ -661,12 +661,30 @@ export function deleteNote(db: Database, noteId: number): void { // ── Command execution queries ── -export function getCommandHistory(db: Database, limit = 50): CommandExecutionRow[] { - return resultToRows( +/** + * Like {@link CommandExecutionRow} but with the linked workflow's status + * projected on as `workflow_status` (via LEFT JOIN sessions). Used by the + * commands history route to derive the {@link CommandOutcome} per row in + * one round-trip rather than N+1 lookups. `null` when the row has no + * `workflow_id` or the workflow row was deleted. 
+ */ +export type CommandExecutionRowWithWorkflowStatus = CommandExecutionRow & { + workflow_status: 'active' | 'closed' | null +} + +export function getCommandHistory( + db: Database, + limit = 50, +): CommandExecutionRowWithWorkflowStatus[] { + return resultToRows( db.exec( - 'SELECT * FROM command_executions ORDER BY started_at DESC LIMIT ?', - [limit] - ) + `SELECT ce.*, s.status AS workflow_status + FROM command_executions ce + LEFT JOIN sessions s ON s.id = ce.workflow_id + ORDER BY ce.started_at DESC + LIMIT ?`, + [limit], + ), ) } diff --git a/packages/dashboard/src/server/routes/commands.ts b/packages/dashboard/src/server/routes/commands.ts index 9e2ed7c..b83b7f3 100644 --- a/packages/dashboard/src/server/routes/commands.ts +++ b/packages/dashboard/src/server/routes/commands.ts @@ -7,6 +7,7 @@ import type { Database } from 'sql.js' import { getCommandHistory } from '../db.js' import { getActiveCommands } from '../socket/command-runner.js' import { readEventJournal } from '../services/event-journal.js' +import { deriveCommandOutcome } from '../services/command-outcome.js' type CommandDefinition = { name: string @@ -67,13 +68,19 @@ export function createCommandsRouter(db: Database, ocrDir: string): Router { router.get('/history', (req, res) => { try { const limit = parseInt(req.query['limit'] as string, 10) || 50 - const history = getCommandHistory(db, limit).map((row) => ({ - ...row, - duration_ms: - row.finished_at && row.started_at - ? new Date(row.finished_at).getTime() - new Date(row.started_at).getTime() - : null, - })) + const history = getCommandHistory(db, limit).map((row) => { + const { workflow_status, ...persisted } = row + return { + ...persisted, + duration_ms: + row.finished_at && row.started_at + ? new Date(row.finished_at).getTime() - new Date(row.started_at).getTime() + : null, + // Derived from (exit_code, workflow_status) — single source of + // truth shared with the live `command:finished` socket event. 
+ outcome: deriveCommandOutcome(row.exit_code, workflow_status), + } + }) res.json(history) } catch (err) { console.error('Failed to fetch command history:', err) diff --git a/packages/dashboard/src/server/services/__tests__/command-outcome.test.ts b/packages/dashboard/src/server/services/__tests__/command-outcome.test.ts new file mode 100644 index 0000000..5a9e2d7 --- /dev/null +++ b/packages/dashboard/src/server/services/__tests__/command-outcome.test.ts @@ -0,0 +1,41 @@ +import { describe, it, expect } from 'vitest' +import { deriveCommandOutcome } from '../command-outcome' + +describe('deriveCommandOutcome', () => { + it('returns null when the command has not finished', () => { + expect(deriveCommandOutcome(null, null)).toBeNull() + expect(deriveCommandOutcome(null, 'active')).toBeNull() + expect(deriveCommandOutcome(null, 'closed')).toBeNull() + }) + + it("returns 'cancelled' for the user-cancel sentinel (-2)", () => { + expect(deriveCommandOutcome(-2, null)).toBe('cancelled') + expect(deriveCommandOutcome(-2, 'active')).toBe('cancelled') + expect(deriveCommandOutcome(-2, 'closed')).toBe('cancelled') + }) + + it("returns 'failed' for any non-zero exit code other than -2", () => { + expect(deriveCommandOutcome(1, null)).toBe('failed') + expect(deriveCommandOutcome(-1, null)).toBe('failed') + expect(deriveCommandOutcome(127, 'active')).toBe('failed') + expect(deriveCommandOutcome(137, 'closed')).toBe('failed') + }) + + describe('exit 0', () => { + it("returns 'success' for non-workflow commands (no linked workflow)", () => { + // Utility commands (sync-reviewers, doctor, etc.) have no workflow_id. + expect(deriveCommandOutcome(0, null)).toBe('success') + }) + + it("returns 'success' when the linked workflow has reached terminal 'closed' status", () => { + // Happy path — AI ran the full workflow and called `state close-session`. 
+ expect(deriveCommandOutcome(0, 'closed')).toBe('success') + }) + + it("returns 'incomplete' when the linked workflow is still 'active' (the macOS-sleep bug)", () => { + // The bug case: parent process exits 0 (e.g. streaming connection + // dropped on Mac sleep) but the AI never reached `state close-session`. + expect(deriveCommandOutcome(0, 'active')).toBe('incomplete') + }) + }) +}) diff --git a/packages/dashboard/src/server/services/command-outcome.ts b/packages/dashboard/src/server/services/command-outcome.ts new file mode 100644 index 0000000..984ff76 --- /dev/null +++ b/packages/dashboard/src/server/services/command-outcome.ts @@ -0,0 +1,69 @@ +/** + * Command outcome derivation — pure function, single source of truth. + * + * Bridges process exit code semantics with workflow lifecycle semantics + * so the dashboard can distinguish "AI parent process exited cleanly and + * the workflow finished" from "AI parent process exited cleanly while the + * workflow was still mid-flight" (the macOS-sleep / network-drop case). + * + * Used both at finish time (command-runner emits outcome on the socket + * event) and at read time (commands history route projects outcome onto + * each row). Single helper means client + server can never disagree. + */ + +import type { Database } from 'sql.js' +import type { CommandOutcome, SessionStatus } from '../../shared/types.js' + +/** Cancel sentinel set by finishExecution when the user clicks Cancel. */ +const CANCEL_EXIT_CODE = -2 + +/** + * Pure derivation. Takes the two facts that determine outcome and + * returns the canonical label. Returns `null` when the command has + * not yet finished. 
+ * + * `workflowStatus = null` means either: + * - the command is not linked to a workflow (utility commands like + * sync-reviewers, doctor) — those are `success` on exit 0 + * - or the lookup failed because the workflow row was deleted — + * treat as `success` to avoid spurious "incomplete" labels on + * historical rows whose sessions were cleaned up + */ +export function deriveCommandOutcome( + exitCode: number | null, + workflowStatus: SessionStatus | null, +): CommandOutcome | null { + if (exitCode === null) return null + if (exitCode === CANCEL_EXIT_CODE) return 'cancelled' + if (exitCode !== 0) return 'failed' + // Exit 0 — cross-check the linked workflow. + if (workflowStatus === null || workflowStatus === 'closed') return 'success' + return 'incomplete' +} + +/** + * Look up the linked workflow's status for a command_executions row. + * Returns `null` when the row has no `workflow_id` or the workflow + * row no longer exists. + * + * Single SQL round-trip — used by finishExecution (for the + * `command:finished` socket event). The history route does not call + * this; it reads the same status via the JOIN in getCommandHistory. 
+ */ +export function getWorkflowStatusForExecution( + db: Database, + executionId: number, +): SessionStatus | null { + const result = db.exec( + `SELECT s.status + FROM command_executions ce + LEFT JOIN sessions s ON s.id = ce.workflow_id + WHERE ce.id = ?`, + [executionId], + ) + const row = result[0]?.values[0] + if (!row) return null + const status = row[0] as string | null + if (status === 'active' || status === 'closed') return status + return null +} diff --git a/packages/dashboard/src/server/socket/command-runner.ts b/packages/dashboard/src/server/socket/command-runner.ts index ce9e7e6..5018333 100644 --- a/packages/dashboard/src/server/socket/command-runner.ts +++ b/packages/dashboard/src/server/socket/command-runner.ts @@ -16,6 +16,10 @@ import { dirname, join } from 'node:path' import type { Server as SocketIOServer, Socket } from 'socket.io' import type { Database } from 'sql.js' import { saveDb } from '../db.js' +import { + deriveCommandOutcome, + getWorkflowStatusForExecution, +} from '../services/command-outcome.js' import type { SessionCaptureService } from '../services/capture/session-capture-service.js' import { AiCliService, @@ -1112,6 +1116,14 @@ function finishExecution( ) saveDb(db, ocrDir) + // Cross-check workflow lifecycle so the UI can distinguish a cleanly + // finished workflow from one whose parent process exited 0 mid-flight + // (the macOS-sleep / network-drop case). Read AFTER the exit_code + // UPDATE above so the lookup sees current data; the JOIN handles the + // common no-workflow case (utility commands). 
+ const workflowStatus = getWorkflowStatusForExecution(db, executionId) + const outcome = deriveCommandOutcome(code, workflowStatus) + // Best-effort JSONL backup if (entry?.uid) { appendCommandLog(ocrDir, { @@ -1133,6 +1145,7 @@ function finishExecution( execution_id: executionId, exitCode: code, finished_at: finishedAt, + outcome, }) activeCommands.delete(executionId) diff --git a/packages/dashboard/src/shared/types.ts b/packages/dashboard/src/shared/types.ts index 1822d7d..e5b8aaf 100644 --- a/packages/dashboard/src/shared/types.ts +++ b/packages/dashboard/src/shared/types.ts @@ -2,6 +2,21 @@ // Socket.IO event types, API response types, etc. export type SessionStatus = 'active' | 'closed' + +/** + * Final state of a command_executions row, derived from + * (exit_code, linked workflow's session.status). Surfaced to the client + * so the UI can distinguish a workflow that exited cleanly from one that + * exited 0 mid-flight (e.g. parent process died on macOS sleep before the + * AI ever called `ocr state close-session`). 
+ * + * - 'success' — exit 0 AND (no linked workflow | workflow.status='closed') + * - 'incomplete' — exit 0 BUT workflow exists and is still 'active' + * - 'failed' — non-zero exit code (excluding cancel sentinel) + * - 'cancelled' — exit code -2 (cancel sentinel from finishExecution) + * - null — not a member of this type; `deriveCommandOutcome` returns it while the command is unfinished + */ +export type CommandOutcome = 'success' | 'incomplete' | 'failed' | 'cancelled' export type WorkflowType = 'review' | 'map' export type FindingTriage = 'unread' | 'read' | 'acknowledged' | 'fixed' | 'wont_fix' export type RoundTriage = 'needs_review' | 'in_progress' | 'changes_made' | 'acknowledged' | 'dismissed' From b1b8204e72b486f659913e05f5e443f4bebb71e5 Mon Sep 17 00:00:00 2001 From: Spencer Marx Date: Sun, 10 May 2026 15:20:16 +0200 Subject: [PATCH 2/4] fix(cli): resolve completion session via dashboard execution UID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ocr state round-complete` and `ocr state close-session` previously used a "latest-active" heuristic when no `--session-id` was given — 'SELECT * FROM sessions WHERE status = active ORDER BY started_at DESC LIMIT 1'. With multiple stale-active rows in the DB (which the incomplete-workflow bug itself produces over time), this picked the wrong session and wrote round-meta.json into an unrelated session directory. The dashboard already sets OCR_DASHBOARD_EXECUTION_UID when it spawns the AI, and the SessionCaptureService binds that uid to command_executions.workflow_id. Teach `resolveSessionForCompletion` to follow that linkage before falling back to latest-active: 1. explicit --session-id (most specific) 2. process.env.OCR_DASHBOARD_EXECUTION_UID → command_executions.workflow_id 3. getLatestActiveSession (fine for direct CLI use) A dashboard-spawned AI now always knows its own workflow regardless of how many other active rows exist. Direct CLI users see no behavior change. 
Co-Authored-By: claude-flow --- .../cli/src/lib/state/__tests__/state.test.ts | 65 +++++++++++++++++++ packages/cli/src/lib/state/index.ts | 35 +++++++++- 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/lib/state/__tests__/state.test.ts b/packages/cli/src/lib/state/__tests__/state.test.ts index a27131a..fab2985 100644 --- a/packages/cli/src/lib/state/__tests__/state.test.ts +++ b/packages/cli/src/lib/state/__tests__/state.test.ts @@ -923,6 +923,71 @@ describe("stateRoundComplete", () => { expect(rcEvent?.round).toBe(3); }); + it("prefers OCR_DASHBOARD_EXECUTION_UID linkage over latest-active fallback", async () => { + // Reproduces the auto-detect ambiguity bug: with multiple active + // sessions, the latest-active fallback would pick the wrong one. + // The dashboard sets OCR_DASHBOARD_EXECUTION_UID for the AI it spawns, + // and the capture service binds that uid -> command_executions.workflow_id. + // round-complete must follow that linkage. + + // Older "right" session — what the dashboard actually spawned the AI for. + const rightDir = sessionDir("right-session"); + await stateInit({ + sessionId: "right-session", + branch: "feat/right", + workflowType: "review", + sessionDir: rightDir, + ocrDir, + }); + + // Newer "wrong" session — would win latest-active auto-detect because + // its started_at is later. Simulates an unrelated session a user kicked + // off in the same project shortly after. + await new Promise((r) => setTimeout(r, 1100)); + const wrongDir = sessionDir("wrong-session"); + await stateInit({ + sessionId: "wrong-session", + branch: "feat/wrong", + workflowType: "review", + sessionDir: wrongDir, + ocrDir, + }); + + // Seed a command_executions row linked to the right session, mirroring + // what the SessionCaptureService produces when the dashboard binds. 
+ const { ensureDatabase: getDb } = await import("../../db/index.js"); + const db = await getDb(ocrDir); + db.run( + `INSERT INTO command_executions + (uid, command, args, started_at, vendor, workflow_id) + VALUES (?, 'review', '[]', datetime('now'), 'claude', ?)`, + ["dash-uid-test", "right-session"], + ); + + const meta = makeRoundMeta({ verdict: "APPROVE", reviewers: [] }); + const filePath = writeRoundMeta(rightDir, meta); + + const prevEnv = process.env["OCR_DASHBOARD_EXECUTION_UID"]; + process.env["OCR_DASHBOARD_EXECUTION_UID"] = "dash-uid-test"; + try { + const result = await stateRoundComplete({ + source: "file", + ocrDir, + filePath, + }); + expect(result.sessionId).toBe("right-session"); + } finally { + if (prevEnv === undefined) delete process.env["OCR_DASHBOARD_EXECUTION_UID"]; + else process.env["OCR_DASHBOARD_EXECUTION_UID"] = prevEnv; + } + + // Wrong session must NOT have a round_completed event. + const wrongShow = await stateShow(ocrDir, "wrong-session"); + expect( + wrongShow?.events.find((e) => e.event_type === "round_completed"), + ).toBeUndefined(); + }); + it("allows targeting a closed session via explicit session-id", async () => { const dir = sessionDir("closed-target"); await stateInit({ diff --git a/packages/cli/src/lib/state/index.ts b/packages/cli/src/lib/state/index.ts index 320a92a..26a9c95 100644 --- a/packages/cli/src/lib/state/index.ts +++ b/packages/cli/src/lib/state/index.ts @@ -364,7 +364,17 @@ function parseRawJson(raw: string, label: string): unknown { /** * Resolve the active session for a completion command. - * Uses explicit ID if provided, otherwise falls back to the latest active session. + * + * Resolution order, most-specific to least: + * 1. Explicit `--session-id` argument — caller knows exactly which row. + * 2. `OCR_DASHBOARD_EXECUTION_UID` env var → `command_executions.workflow_id`. 
+ * Set by the dashboard when it spawns the AI; lets `state round-complete` + * and `state close-session` find their workflow even when several + * sessions are stale-active in the DB. Without this, the latest-active + * fallback can pick a wrong recently-modified session — see the + * "session auto-detect picked the wrong row" failure mode. + * 3. `getLatestActiveSession` — works fine for direct CLI use where there + * is typically only one active session in a project. */ function resolveSessionForCompletion( db: Database, @@ -380,6 +390,29 @@ function resolveSessionForCompletion( current_map_run: existing.current_map_run, }; } + // Path 2 — env var linkage. Skip silently when not running under the + // dashboard (env var absent) or when the linkage hasn't been recorded + // yet (race window before the dashboard binds the workflow). + const dashboardUid = process.env["OCR_DASHBOARD_EXECUTION_UID"]; + if (dashboardUid) { + const result = db.exec( + "SELECT workflow_id FROM command_executions WHERE uid = ?", + [dashboardUid], + ); + const row = result[0]?.values[0]; + const workflowId = row?.[0] as string | null | undefined; + if (workflowId) { + const existing = getSession(db, workflowId); + if (existing) { + return { + id: existing.id, + session_dir: existing.session_dir, + current_round: existing.current_round, + current_map_run: existing.current_map_run, + }; + } + } + } const active = getLatestActiveSession(db); if (!active) throw new Error("No active session found"); return { From b7c3f4e7add247078115898cac26d477cbbc6c37 Mon Sep 17 00:00:00 2001 From: Spencer Marx Date: Mon, 11 May 2026 22:37:02 +0200 Subject: [PATCH 3/4] fix(cli): air-tight workflow state lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audited every state-mutation surface (state init/transition/close/ round-complete/map-complete/sync) for the failure modes that caused the "incomplete session wrongly marked complete" and "wrong session got 
closed" bugs. Closes the structural gaps that PR #31's outcome derivation alone couldn't fix: 1. Single session resolver. Two parallel helpers (resolveActiveSession + resolveSessionForCompletion) diverged — fixing one missed the other. Collapsed into `resolveSession`, used by every CLI subcommand that takes an optional --session-id. Resolution order: explicit --session-id → OCR_DASHBOARD_EXECUTION_UID → latest-active. Refuses with a hard ambiguity error when >1 active sessions exist and no env var is set, rather than silently picking one. Auto-detect decisions are now printed to stderr ("Auto-detected session: X (via latest-active)") so the user sees which session a command will affect. 2. Phase-progression graph. Replaced the flat VALID_PHASES set (12 phases mixing review + map) with two workflow-typed graphs. stateTransition validates source→target legality: a review workflow can no longer transition to map phases, and the AI can no longer skip from `reviews` straight to `complete`. Round/map-run boundaries are treated as a permitted reset to the initial phase. 3. Cascade stateClose + idempotency. Closing a workflow now stamps any in-flight dependent command_executions with exit_code=-4 and a "closed by parent workflow close" note. Idempotent: closing an already-closed session no-ops with a notice rather than writing a duplicate session_closed event. 4. Round derivation from events. stateInit's re-open path used to walk the filesystem (rounds/round-N/final.md presence) to decide the next round. Now derives from MAX(round_completed.round) + 1 — events are authoritative, filesystem is observational. stateRoundComplete also advances sessions.current_round so the column stays in sync. 5. Workflow_type compat on re-open. stateInit refuses to re-open a review session as a map (and vice versa) — disjoint phase graphs would corrupt state immediately. 6. Workflow-typed initial phase. stateInit now sets current_phase='map-context' for map workflows (was always 'context'). 
Without this, every subsequent map transition fails the new phase-graph check. Tests: stateTransition phase-graph (legal/illegal/cross-type/round- boundary), stateClose cascade + idempotency, stateInit round-from- events + type-mismatch, resolveActiveSession ambiguity refusal + env-var disambiguation. Pre-existing progress-sqlite test helper updated to walk legal phase edges. Co-Authored-By: claude-flow --- packages/cli/src/commands/session.ts | 12 +- packages/cli/src/commands/state.ts | 32 +- .../__tests__/progress-sqlite.test.ts | 56 ++- .../cli/src/lib/state/__tests__/state.test.ts | 283 ++++++++++- packages/cli/src/lib/state/index.ts | 455 ++++++++++++++---- 5 files changed, 704 insertions(+), 134 deletions(-) diff --git a/packages/cli/src/commands/session.ts b/packages/cli/src/commands/session.ts index 1cc0bd5..a7f3f0a 100644 --- a/packages/cli/src/commands/session.ts +++ b/packages/cli/src/commands/session.ts @@ -87,8 +87,10 @@ const startInstanceSubcommand = new Command("start-instance") const db = await ensureDatabase(ocrDir); try { - const workflowId = options.workflow - ?? (await resolveActiveSession(ocrDir)).id; + const { id: workflowId } = await resolveActiveSession( + ocrDir, + options.workflow, + ); const id = randomUUID(); const persona = options.persona ?? null; @@ -235,8 +237,10 @@ const listSubcommand = new Command("list") const db = await ensureDatabase(ocrDir); try { - const workflowId = options.workflow - ?? 
(await resolveActiveSession(ocrDir)).id; + const { id: workflowId } = await resolveActiveSession( + ocrDir, + options.workflow, + ); const rows = listAgentSessionsForWorkflow(db, workflowId); if (options.json) { diff --git a/packages/cli/src/commands/state.ts b/packages/cli/src/commands/state.ts index 2c66982..2be48f2 100644 --- a/packages/cli/src/commands/state.ts +++ b/packages/cli/src/commands/state.ts @@ -249,18 +249,20 @@ const transitionSubcommand = new Command("transition") requireOcrSetup(targetDir); const ocrDir = join(targetDir, ".ocr"); - const VALID_PHASES = new Set([ - "context", "change-context", "analysis", "reviews", - "aggregation", "discourse", "synthesis", "complete", - "map-context", "topology", "flow-analysis", "requirements-mapping", - ]); - if (!VALID_PHASES.has(options.phase)) { - throw new Error(`Invalid phase: "${options.phase}". Must be one of: ${[...VALID_PHASES].join(", ")}`); - } - try { - const sessionId = options.sessionId - ?? (await resolveActiveSession(ocrDir)).id; + // Phase validation now lives in stateTransition itself, where it + // can see the session's workflow_type and check legal transitions + // against the workflow-typed phase graph (review vs map). The + // previous flat VALID_PHASES set let a review workflow transition + // to map phases (e.g. "topology") without complaint. + // Single auto-detect path (resolveActiveSession is the back-compat + // shim over resolveSession). Threading the explicit id through + // gives us validation of bad ids AND the stderr announcement when + // we auto-detect. 
+ const { id: sessionId } = await resolveActiveSession( + ocrDir, + options.sessionId, + ); await stateTransition({ sessionId, @@ -289,15 +291,17 @@ const transitionSubcommand = new Command("transition") const closeSubcommand = new Command("close") .description("Close a session") - .option("--session-id ", "Session ID (auto-detects latest active if omitted)") + .option("--session-id ", "Session ID (auto-detects latest active if omitted; refuses on ambiguity)") .action(async (options: { sessionId?: string }) => { const targetDir = process.cwd(); requireOcrSetup(targetDir); const ocrDir = join(targetDir, ".ocr"); try { - const sessionId = options.sessionId - ?? (await resolveActiveSession(ocrDir)).id; + const { id: sessionId } = await resolveActiveSession( + ocrDir, + options.sessionId, + ); await stateClose({ sessionId, diff --git a/packages/cli/src/lib/progress/__tests__/progress-sqlite.test.ts b/packages/cli/src/lib/progress/__tests__/progress-sqlite.test.ts index 775d843..8c43ae2 100644 --- a/packages/cli/src/lib/progress/__tests__/progress-sqlite.test.ts +++ b/packages/cli/src/lib/progress/__tests__/progress-sqlite.test.ts @@ -45,6 +45,31 @@ function createSessionDir(sessionId: string): string { return dir; } +/** + * Ordered phase walks per workflow type. The state machine rejects + * direct jumps from `context` to (e.g.) `analysis`, so tests that want + * to land at a specific phase need to walk the legal edges in between. 
+ */ +const REVIEW_PHASE_ORDER: ReadonlyArray<{ phase: ReviewPhase; n: number }> = [ + { phase: "context", n: 1 }, + { phase: "change-context", n: 2 }, + { phase: "analysis", n: 3 }, + { phase: "reviews", n: 4 }, + { phase: "aggregation", n: 5 }, + { phase: "discourse", n: 6 }, + { phase: "synthesis", n: 7 }, + { phase: "complete", n: 8 }, +]; + +const MAP_PHASE_ORDER: ReadonlyArray<{ phase: MapPhase; n: number }> = [ + { phase: "map-context", n: 1 }, + { phase: "topology", n: 2 }, + { phase: "flow-analysis", n: 3 }, + { phase: "requirements-mapping", n: 4 }, + { phase: "synthesis", n: 5 }, + { phase: "complete", n: 6 }, +]; + async function initDbAndSession( sessionId: string, workflowType: "review" | "map", @@ -60,13 +85,30 @@ async function initDbAndSession( ocrDir, }); - if (phase !== "context" || phaseNumber !== 1) { - await stateTransition({ - sessionId, - phase: phase as ReviewPhase | MapPhase, - phaseNumber, - ocrDir, - }); + // Walk legal phase edges to reach the target phase. Direct jumps are + // rejected by stateTransition's phase-graph validation. + const order = workflowType === "review" ? REVIEW_PHASE_ORDER : MAP_PHASE_ORDER; + const targetIdx = order.findIndex((p) => p.phase === phase); + if (targetIdx > 0) { + for (let i = 1; i <= targetIdx; i++) { + const step = order[i]!; + await stateTransition({ + sessionId, + phase: step.phase, + phaseNumber: step.n, + ocrDir, + }); + } + // If the caller passed a phase_number that differs from the + // canonical position, do a final no-op transition to that number. 
+ if (order[targetIdx]!.n !== phaseNumber) { + await stateTransition({ + sessionId, + phase: order[targetIdx]!.phase, + phaseNumber, + ocrDir, + }); + } } // Set up the progress DB cache diff --git a/packages/cli/src/lib/state/__tests__/state.test.ts b/packages/cli/src/lib/state/__tests__/state.test.ts index fab2985..37a30f5 100644 --- a/packages/cli/src/lib/state/__tests__/state.test.ts +++ b/packages/cli/src/lib/state/__tests__/state.test.ts @@ -152,6 +152,13 @@ describe("stateTransition", () => { ocrDir, }); + // Walk through legal phases (the graph rejects context → analysis). + await stateTransition({ + sessionId: "phase-event", + phase: "change-context", + phaseNumber: 2, + ocrDir, + }); await stateTransition({ sessionId: "phase-event", phase: "analysis", @@ -162,7 +169,7 @@ describe("stateTransition", () => { const result = await stateShow(ocrDir, "phase-event"); const events = result?.events ?? []; const transitionEvent = events.find( - (e) => e.event_type === "phase_transition", + (e) => e.event_type === "phase_transition" && e.phase === "analysis", ); expect(transitionEvent).toBeDefined(); expect(transitionEvent?.phase).toBe("analysis"); @@ -407,10 +414,11 @@ describe("stateShow", () => { ocrDir, }); + // Walk a legal edge (context → change-context). await stateTransition({ sessionId: "events-show", - phase: "analysis", - phaseNumber: 3, + phase: "change-context", + phaseNumber: 2, ocrDir, }); @@ -598,6 +606,275 @@ describe("resolveActiveSession", () => { "No active session found", ); }); + + it("refuses ambiguous auto-detect when multiple active sessions exist", async () => { + // Two active sessions; no env var; no explicit id → must refuse + // rather than silently pick one. This is the structural fix for + // the "wrong session got closed" failure mode. 
+ await stateInit({ + sessionId: "amb-one", + branch: "feat/a", + workflowType: "review", + sessionDir: sessionDir("amb-one"), + ocrDir, + }); + await stateInit({ + sessionId: "amb-two", + branch: "feat/b", + workflowType: "review", + sessionDir: sessionDir("amb-two"), + ocrDir, + }); + + await expect(resolveActiveSession(ocrDir)).rejects.toThrow( + /Ambiguous auto-detect/, + ); + }); + + it("disambiguates via OCR_DASHBOARD_EXECUTION_UID even with multiple active sessions", async () => { + // Same setup as the ambiguity test, but with the env var pointing at + // a command_executions row linked to the right session. The resolver + // must follow that linkage instead of throwing. + await stateInit({ + sessionId: "uid-right", + branch: "feat/r", + workflowType: "review", + sessionDir: sessionDir("uid-right"), + ocrDir, + }); + await stateInit({ + sessionId: "uid-wrong", + branch: "feat/w", + workflowType: "review", + sessionDir: sessionDir("uid-wrong"), + ocrDir, + }); + + const { ensureDatabase: getDb } = await import("../../db/index.js"); + const db = await getDb(ocrDir); + db.run( + `INSERT INTO command_executions (uid, command, args, started_at, workflow_id) + VALUES (?, 'review', '[]', datetime('now'), ?)`, + ["dash-uid-disambig", "uid-right"], + ); + + const prev = process.env["OCR_DASHBOARD_EXECUTION_UID"]; + process.env["OCR_DASHBOARD_EXECUTION_UID"] = "dash-uid-disambig"; + try { + const r = await resolveActiveSession(ocrDir); + expect(r.id).toBe("uid-right"); + expect(r.decision).toBe("dashboard-uid"); + } finally { + if (prev === undefined) delete process.env["OCR_DASHBOARD_EXECUTION_UID"]; + else process.env["OCR_DASHBOARD_EXECUTION_UID"] = prev; + } + }); +}); + +describe("stateTransition — phase graph", () => { + it("rejects illegal phase jumps (reviews → complete) on a review workflow", async () => { + const dir = sessionDir("phase-jump-review"); + await stateInit({ + sessionId: "phase-jump-review", + branch: "feat/pj", + workflowType: "review", + 
sessionDir: dir, + ocrDir, + }); + // Walk legal phases up to `reviews` first. + for (const [phase, n] of [ + ["change-context", 2], + ["analysis", 3], + ["reviews", 4], + ] as const) { + await stateTransition({ + sessionId: "phase-jump-review", + phase, + phaseNumber: n, + ocrDir, + }); + } + await expect( + stateTransition({ + sessionId: "phase-jump-review", + phase: "complete", + phaseNumber: 8, + ocrDir, + }), + ).rejects.toThrow(/Illegal phase transition: reviews → complete/); + }); + + it("rejects cross-workflow-type phases (topology on a review workflow)", async () => { + const dir = sessionDir("cross-type"); + await stateInit({ + sessionId: "cross-type", + branch: "feat/ct", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + await expect( + stateTransition({ + sessionId: "cross-type", + // @ts-expect-error — intentionally passing a map phase + phase: "topology", + phaseNumber: 2, + ocrDir, + }), + ).rejects.toThrow(/Invalid phase "topology" for workflow_type "review"/); + }); + + it("allows round-boundary transitions to reset to context", async () => { + const dir = sessionDir("round-boundary"); + await stateInit({ + sessionId: "round-boundary", + branch: "feat/rb", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + // Walk to complete legitimately. + for (const [phase, n] of [ + ["change-context", 2], + ["analysis", 3], + ["reviews", 4], + ["aggregation", 5], + ["discourse", 6], + ["synthesis", 7], + ["complete", 8], + ] as const) { + await stateTransition({ + sessionId: "round-boundary", + phase, + phaseNumber: n, + ocrDir, + }); + } + // Start round 2 by jumping back to context with explicit round bump. 
+ await expect( + stateTransition({ + sessionId: "round-boundary", + phase: "context", + phaseNumber: 1, + round: 2, + ocrDir, + }), + ).resolves.toBeUndefined(); + }); +}); + +describe("stateClose — cascade + idempotency", () => { + it("is idempotent on already-closed sessions", async () => { + const dir = sessionDir("idemp"); + await stateInit({ + sessionId: "idemp", + branch: "feat/idemp", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + await stateClose({ sessionId: "idemp", ocrDir }); + // Second close: must not throw, must not write a duplicate event. + await expect( + stateClose({ sessionId: "idemp", ocrDir }), + ).resolves.toBeUndefined(); + + const show = await stateShow(ocrDir, "idemp"); + const closeEvents = + show?.events.filter((e) => e.event_type === "session_closed") ?? []; + expect(closeEvents.length).toBe(1); + }); + + it("cascade-closes in-flight dependent command_executions", async () => { + const dir = sessionDir("cascade"); + await stateInit({ + sessionId: "cascade", + branch: "feat/c", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + + const { ensureDatabase: getDb } = await import("../../db/index.js"); + const db = await getDb(ocrDir); + db.run( + `INSERT INTO command_executions (uid, command, args, started_at, workflow_id, last_heartbeat_at) + VALUES (?, 'review', '[]', datetime('now'), ?, datetime('now'))`, + ["dash-cascade-uid", "cascade"], + ); + + await stateClose({ sessionId: "cascade", ocrDir }); + + const result = db.exec( + `SELECT exit_code, finished_at, notes + FROM command_executions + WHERE uid = ?`, + ["dash-cascade-uid"], + ); + const row = result[0]?.values[0]; + expect(row).toBeDefined(); + expect(row?.[0]).toBe(-4); // CASCADE_CLOSE_EXIT_CODE + expect(row?.[1]).toBeTruthy(); // finished_at populated + expect(String(row?.[2] ?? 
"")).toMatch(/closed by parent workflow close/); + }); +}); + +describe("stateInit — re-open derives round from events", () => { + it("derives next round from MAX(round_completed.round) + 1, ignoring filesystem", async () => { + // Setup: complete round 1 via stateRoundComplete, then re-init. + const dir = sessionDir("round-from-events"); + await stateInit({ + sessionId: "round-from-events", + branch: "feat/rfe", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + const meta = makeRoundMeta({ verdict: "APPROVE", reviewers: [] }); + const filePath = writeRoundMeta(dir, meta); + await stateRoundComplete({ + source: "file", + ocrDir, + filePath, + sessionId: "round-from-events", + }); + await stateClose({ sessionId: "round-from-events", ocrDir }); + + // No filesystem round directories — only the round-meta.json we wrote + // is in `dir`, not a `rounds/round-N/` layout. Filesystem inference + // would say round 1. Event-based derivation says round 2. + const sid = await stateInit({ + sessionId: "round-from-events", + branch: "feat/rfe", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + expect(sid).toBe("round-from-events"); + const show = await stateShow(ocrDir, "round-from-events"); + expect(show?.session.current_round).toBe(2); + }); + + it("rejects re-open with mismatched workflow_type", async () => { + const dir = sessionDir("type-mismatch"); + await stateInit({ + sessionId: "type-mismatch", + branch: "feat/tm", + workflowType: "review", + sessionDir: dir, + ocrDir, + }); + await stateClose({ sessionId: "type-mismatch", ocrDir }); + await expect( + stateInit({ + sessionId: "type-mismatch", + branch: "feat/tm", + workflowType: "map", + sessionDir: dir, + ocrDir, + }), + ).rejects.toThrow(/Cannot re-open session/); + }); }); // ── round-meta.json helpers ── diff --git a/packages/cli/src/lib/state/index.ts b/packages/cli/src/lib/state/index.ts index 26a9c95..17bde92 100644 --- a/packages/cli/src/lib/state/index.ts +++ 
b/packages/cli/src/lib/state/index.ts @@ -66,6 +66,30 @@ export type { // ── Helpers ── +/** + * Derive the next round number from `round_completed` events. + * + * Events are authoritative — they record what actually happened. The + * filesystem is observational and may drift. If the highest completed + * round is N, the next round is N+1. If no rounds have completed yet, + * the next round is the session's current_round (i.e. still on the + * current round — caller is resuming, not advancing). + */ +function deriveNextRound( + db: Database, + sessionId: string, + fallbackRound: number, +): number { + const result = db.exec( + `SELECT MAX(round) FROM orchestration_events + WHERE session_id = ? AND event_type = 'round_completed'`, + [sessionId], + ); + const max = result[0]?.values[0]?.[0]; + if (typeof max === "number") return max + 1; + return fallbackRound; +} + /** Returns true if the directory contains at least one .md or .json file (recursively). */ function hasArtifacts(dir: string): boolean { try { @@ -97,29 +121,32 @@ export async function stateInit(params: InitParams): Promise { const existing = getSession(db, sessionId); if (existing) { - // Session exists — determine the correct round from filesystem - const roundsDir = join(sessionDir, "rounds"); - let nextRound = 1; - - if (existsSync(roundsDir)) { - const roundDirs = readdirSync(roundsDir) - .filter((d) => /^round-\d+$/.test(d)) - .map((d) => parseInt(d.replace("round-", ""), 10)) - .sort((a, b) => a - b); - - if (roundDirs.length > 0) { - const highest = roundDirs[roundDirs.length - 1]!; - const hasFinal = existsSync( - join(roundsDir, `round-${highest}`, "final.md"), - ); - nextRound = hasFinal ? highest + 1 : highest; - } + // Workflow type compatibility: re-opening with a different type would + // corrupt phase semantics (review vs map have disjoint phase graphs). 
+ if (existing.workflow_type !== workflowType) { + throw new Error( + `Cannot re-open session ${sessionId} as workflow_type "${workflowType}": ` + + `existing workflow_type is "${existing.workflow_type}". ` + + `Maps and reviews have disjoint phase graphs.`, + ); } + // Session exists — derive next round from DB events (authoritative) + // rather than filesystem (observational). Previously this read + // rounds/round-N/final.md presence on disk, which broke if the disk + // state was missing or out-of-sync with the DB. Events are the + // system of record; filesystem is a side-effect. + const nextRound = deriveNextRound(db, sessionId, existing.current_round); + + // Each workflow type starts at its own initial phase. The phase + // graph treats review and map vocabularies as disjoint — using the + // wrong one here causes every subsequent transition to be rejected. + const initialPhase = workflowType === "map" ? "map-context" : "context"; + // Re-open the session for the next round updateSession(db, sessionId, { status: "active", - current_phase: "context", + current_phase: initialPhase, phase_number: 1, current_round: nextRound, }); @@ -130,7 +157,7 @@ export async function stateInit(params: InitParams): Promise { nextRound > (existing.current_round ?? 1) ? "round_started" : "session_resumed", - phase: "context", + phase: initialPhase, phase_number: 1, round: nextRound, }); @@ -139,12 +166,14 @@ export async function stateInit(params: InitParams): Promise { return sessionId; } + const initialPhase = workflowType === "map" ? 
"map-context" : "context"; + // New session — original path insertSession(db, { id: sessionId, branch, workflow_type: workflowType, - current_phase: "context", + current_phase: initialPhase, phase_number: 1, current_round: 1, current_map_run: 1, @@ -154,7 +183,7 @@ export async function stateInit(params: InitParams): Promise { insertEvent(db, { session_id: sessionId, event_type: "session_created", - phase: "context", + phase: initialPhase, phase_number: 1, round: 1, }); @@ -164,6 +193,82 @@ export async function stateInit(params: InitParams): Promise { return sessionId; } +/** + * Phase-progression graphs. Each entry maps a phase to the set of phases + * legally reachable from it. Self-loops (idempotent re-entry of the same + * phase) are always allowed and don't need to appear in the map. + * + * `complete` loops back to the initial phase to allow a new round/run. + * + * Why enforce this: without a transition graph, the AI could jump from + * `reviews` straight to `complete`, skipping aggregation/discourse/ + * synthesis. The dashboard's outcome derivation (sessions.status) would + * still mark the workflow closed, masking the gap. Treating the phase + * sequence as a state machine makes that class of bug impossible. + */ +const REVIEW_PHASE_GRAPH: Record> = { + context: ["change-context"], + "change-context": ["analysis"], + analysis: ["reviews"], + reviews: ["aggregation"], + aggregation: ["discourse"], + discourse: ["synthesis"], + synthesis: ["complete"], + complete: ["context"], +}; + +const MAP_PHASE_GRAPH: Record> = { + "map-context": ["topology"], + topology: ["flow-analysis"], + "flow-analysis": ["requirements-mapping"], + "requirements-mapping": ["synthesis"], + synthesis: ["complete"], + complete: ["map-context"], +}; + +function graphFor( + workflowType: "review" | "map", +): Record> { + return workflowType === "review" ? 
REVIEW_PHASE_GRAPH : MAP_PHASE_GRAPH; +} + +/** + * Validate that `target` is a legal next phase given `source` and the + * workflow's type. Self-loops are always allowed. Round/mapRun bumps + * are treated as a permitted reset back to the first phase regardless + * of source (a new round legitimately starts over at `context`). + */ +function validatePhaseTransition( + workflowType: "review" | "map", + source: string, + target: string, + isRoundBoundary: boolean, +): void { + const graph = graphFor(workflowType); + // Target must belong to this workflow_type's phase vocabulary. + if (!(target in graph)) { + const validPhases = Object.keys(graph).join(", "); + throw new Error( + `Invalid phase "${target}" for workflow_type "${workflowType}". ` + + `Valid phases: ${validPhases}`, + ); + } + // Same-phase re-entry: always allowed (retries, idempotent calls). + if (source === target) return; + // Round/mapRun boundary: any phase of the same workflow is reachable. + if (isRoundBoundary) return; + const allowed = graph[source]; + if (!allowed || !allowed.includes(target)) { + throw new Error( + `Illegal phase transition: ${source} → ${target}. ` + + `From "${source}", only ${ + allowed && allowed.length > 0 ? allowed.join(", ") : "(no edges)" + } are reachable. ` + + `Pass --current-round to start a new round if the workflow is resetting.`, + ); + } +} + /** * Transition a session to a new phase in SQLite. 
*/ @@ -178,6 +283,17 @@ export async function stateTransition(params: TransitionParams): Promise { } const previousRound = existing.current_round; + const previousMapRun = existing.current_map_run; + const isRoundBoundary = + (round !== undefined && round !== previousRound) || + (mapRun !== undefined && mapRun !== previousMapRun); + + validatePhaseTransition( + existing.workflow_type, + existing.current_phase, + phase, + isRoundBoundary, + ); updateSession(db, sessionId, { current_phase: phase, @@ -208,8 +324,22 @@ export async function stateTransition(params: TransitionParams): Promise { saveDatabase(db, dbPath); } +/** Sentinel exit code stamped on dependent rows cascade-closed by a + * parent stateClose. Distinct from -2 (user cancel) and -3 (orphaned by + * liveness sweep) so triage can tell the cause apart. */ +const CASCADE_CLOSE_EXIT_CODE = -4; + /** * Close a session in SQLite. + * + * Idempotent: if the session is already `closed`, returns without writing + * a second `session_closed` event. + * + * Cascades to dependent `command_executions` rows: any still in flight + * (finished_at IS NULL) for this workflow are stamped terminal with + * exit_code = -4 and a structured note. Without this, closing a workflow + * left stranded child rows whose only cleanup path was the heartbeat + * liveness sweep — and that sweep depends on the dashboard running. */ export async function stateClose(params: CloseParams): Promise { const { sessionId, ocrDir } = params; @@ -221,6 +351,14 @@ export async function stateClose(params: CloseParams): Promise { throw new Error(`Session not found: ${sessionId}`); } + if (existing.status === "closed") { + // Idempotent no-op. Caller still gets a clean exit; the stderr + // notice tells them their action had no effect — useful when the AI + // accidentally retries close after a successful first attempt. 
+ console.error(`[ocr] Session already closed: ${sessionId}`); + return; + } + updateSession(db, sessionId, { status: "closed", current_phase: "complete", @@ -234,6 +372,22 @@ export async function stateClose(params: CloseParams): Promise { round: existing.current_round, }); + // Cascade: terminate any dependent command_executions rows still in + // flight. Without this, a workflow close leaves orphan rows that only + // the heartbeat sweep can recover — and that sweep needs the dashboard + // running. Doing it here makes close authoritative. + const note = "closed by parent workflow close"; + db.run( + `UPDATE command_executions + SET finished_at = datetime('now'), + exit_code = ?, + pid = NULL, + notes = COALESCE(notes || char(10), '') || ? + WHERE workflow_id = ? + AND finished_at IS NULL`, + [CASCADE_CLOSE_EXIT_CODE, note, sessionId], + ); + saveDatabase(db, dbPath); } @@ -315,20 +469,138 @@ export async function stateList( } /** - * Resolves the active session ID from SQLite. - * Throws if no active session is found. + * How the resolver arrived at the chosen session. Surfaced on the + * result so callers (and tests) can verify the decision path. Also + * printed to stderr by {@link announceResolveDecision} so users see + * which session a command will affect when they omit `--session-id`. + */ +export type ResolveDecision = "explicit" | "dashboard-uid" | "latest-active"; + +export type ResolveSessionResult = { + id: string; + session_dir: string; + current_round: number; + current_map_run: number; + workflow_type: "review" | "map"; + decision: ResolveDecision; +}; + +/** + * Single source of truth for "which session does this CLI invocation + * apply to?". Replaces the two parallel helpers that previously diverged + * (resolveActiveSession + resolveSessionForCompletion). Used by every + * `state` and `session` subcommand that accepts an optional `--session-id`. + * + * Resolution order, most-specific to least: + * 1. 
`explicitId` — caller passed `--session-id` + * 2. `OCR_DASHBOARD_EXECUTION_UID` env var → `command_executions.workflow_id`. + * Set by the dashboard when it spawns the AI; the SessionCaptureService + * binds that uid to the workflow_id once the AI calls `state init`. + * 3. latest-active fallback — only when exactly one active session exists. + * With >1 active sessions and no env var, this throws an ambiguity + * error rather than silently picking one. Brittle auto-detect is the + * root cause of the "wrong session got closed" failure mode. + */ +export function resolveSession( + db: Database, + explicitId?: string, +): ResolveSessionResult { + // 1. Explicit + if (explicitId) { + const s = getSession(db, explicitId); + if (!s) throw new Error(`Session not found: ${explicitId}`); + return { + id: s.id, + session_dir: s.session_dir, + current_round: s.current_round, + current_map_run: s.current_map_run, + workflow_type: s.workflow_type, + decision: "explicit", + }; + } + + // 2. Dashboard execution UID + const uid = process.env["OCR_DASHBOARD_EXECUTION_UID"]; + if (uid) { + const result = db.exec( + "SELECT workflow_id FROM command_executions WHERE uid = ?", + [uid], + ); + const workflowId = result[0]?.values[0]?.[0] as string | null | undefined; + if (workflowId) { + const s = getSession(db, workflowId); + if (s) { + return { + id: s.id, + session_dir: s.session_dir, + current_round: s.current_round, + current_map_run: s.current_map_run, + workflow_type: s.workflow_type, + decision: "dashboard-uid", + }; + } + } + // env var present but no linkage yet (race window before the + // capture service binds workflow_id). Fall through to latest-active. + } + + // 3. Latest-active. Refuse if ambiguous. + const activeRows = db.exec( + `SELECT id, session_dir, current_round, current_map_run, workflow_type + FROM sessions + WHERE status = 'active' + ORDER BY started_at DESC`, + ); + const rows = activeRows[0]?.values ?? 
[]; + if (rows.length === 0) throw new Error("No active session found"); + if (rows.length > 1) { + const ids = rows.map((r) => r[0] as string); + throw new Error( + `Ambiguous auto-detect: ${rows.length} active sessions exist. ` + + `Pass --session-id explicitly. Candidates: ${ids.join(", ")}`, + ); + } + const row = rows[0]!; + return { + id: row[0] as string, + session_dir: row[1] as string, + current_round: row[2] as number, + current_map_run: row[3] as number, + workflow_type: row[4] as "review" | "map", + decision: "latest-active", + }; +} + +/** + * Print the auto-detect decision to stderr so a user running a CLI + * subcommand without `--session-id` sees which session they're acting on. + * No-op when the caller passed an explicit id — they already know. + */ +export function announceResolveDecision(r: ResolveSessionResult): void { + if (r.decision === "explicit") return; + const path = + r.decision === "dashboard-uid" + ? "via OCR_DASHBOARD_EXECUTION_UID" + : "via latest-active"; + console.error(`[ocr] Auto-detected session: ${r.id} (${path})`); +} + +/** + * Backward-compat shim for callers that still take `ocrDir` instead of + * a Database handle (CLI subcommands in state.ts / session.ts). New code + * should prefer {@link resolveSession} directly. 
*/ export async function resolveActiveSession( ocrDir: string, -): Promise<{ id: string; sessionDir: string }> { + explicitId?: string, +): Promise<{ id: string; sessionDir: string; decision: ResolveDecision }> { const db = await ensureDatabase(ocrDir); - const session = getLatestActiveSession(db); - if (!session) { - throw new Error("No active session found"); - } + const result = resolveSession(db, explicitId); + announceResolveDecision(result); return { - id: session.id, - sessionDir: session.session_dir, + id: result.id, + sessionDir: result.session_dir, + decision: result.decision, }; } @@ -362,66 +634,6 @@ function parseRawJson(raw: string, label: string): unknown { } } -/** - * Resolve the active session for a completion command. - * - * Resolution order, most-specific to least: - * 1. Explicit `--session-id` argument — caller knows exactly which row. - * 2. `OCR_DASHBOARD_EXECUTION_UID` env var → `command_executions.workflow_id`. - * Set by the dashboard when it spawns the AI; lets `state round-complete` - * and `state close-session` find their workflow even when several - * sessions are stale-active in the DB. Without this, the latest-active - * fallback can pick a wrong recently-modified session — see the - * "session auto-detect picked the wrong row" failure mode. - * 3. `getLatestActiveSession` — works fine for direct CLI use where there - * is typically only one active session in a project. - */ -function resolveSessionForCompletion( - db: Database, - explicitId?: string, -): { id: string; session_dir: string; current_round: number; current_map_run: number } { - if (explicitId) { - const existing = getSession(db, explicitId); - if (!existing) throw new Error(`Session not found: ${explicitId}`); - return { - id: existing.id, - session_dir: existing.session_dir, - current_round: existing.current_round, - current_map_run: existing.current_map_run, - }; - } - // Path 2 — env var linkage. 
Skip silently when not running under the - // dashboard (env var absent) or when the linkage hasn't been recorded - // yet (race window before the dashboard binds the workflow). - const dashboardUid = process.env["OCR_DASHBOARD_EXECUTION_UID"]; - if (dashboardUid) { - const result = db.exec( - "SELECT workflow_id FROM command_executions WHERE uid = ?", - [dashboardUid], - ); - const row = result[0]?.values[0]; - const workflowId = row?.[0] as string | null | undefined; - if (workflowId) { - const existing = getSession(db, workflowId); - if (existing) { - return { - id: existing.id, - session_dir: existing.session_dir, - current_round: existing.current_round, - current_map_run: existing.current_map_run, - }; - } - } - } - const active = getLatestActiveSession(db); - if (!active) throw new Error("No active session found"); - return { - id: active.id, - session_dir: active.session_dir, - current_round: active.current_round, - current_map_run: active.current_map_run, - }; -} // ── Round-meta validation helpers ── @@ -587,7 +799,7 @@ export async function stateRoundComplete( const counts = computeRoundCounts(meta); // ── 3. Resolve session and round ── - const session = resolveSessionForCompletion(db, params.sessionId); + const session = resolveSession(db, params.sessionId); const roundNumber = params.round ?? session.current_round; // ── 4. Write round-meta.json when source is stdin ── @@ -617,6 +829,14 @@ export async function stateRoundComplete( }), }); + // ── 6. Advance current_round on the session row. Without this, the + // sessions table lags the events log — the next stateInit re-open + // would have to re-derive round each time. Keeping the column in + // sync with the event log lets the dashboard read it directly. 
+ if (roundNumber >= session.current_round) { + updateSession(db, session.id, { current_round: roundNumber }); + } + saveDatabase(db, dbPath); return { sessionId: session.id, round: roundNumber, metaPath }; @@ -724,7 +944,7 @@ export async function stateMapComplete( const counts = computeMapCounts(meta); // ── 3. Resolve session and map run ── - const session = resolveSessionForCompletion(db, params.sessionId); + const session = resolveSession(db, params.sessionId); const mapRunNumber = params.mapRun ?? session.current_map_run; // ── 4. Write map-meta.json when source is stdin ── @@ -804,20 +1024,35 @@ export async function stateSync(ocrDir: string): Promise { const branchMatch = dirName.match(/^\d{4}-\d{2}-\d{2}-(.+)$/); const branch = branchMatch?.[1] ?? dirName; - // Determine completion phase from filesystem artifacts. - // Sessions with a final.md (review) or map.md (map) in their latest - // round/run are complete. + // Reconstruct the most-likely terminal state from artifacts. + // Sessions with a final.md (review) / map.md (map) in their latest + // round/run are complete; phase_number tracks the workflow's terminal + // phase index so the dashboard renders the same progress as a session + // that closed cleanly. 
let inferredPhase = "context"; + let inferredPhaseNumber = 1; + let inferredRound = 1; + let inferredMapRun = 1; if (workflowType === "review") { const roundsDir = join(dirPath, "rounds"); if (existsSync(roundsDir)) { const roundDirs = readdirSync(roundsDir) .filter((d) => /^round-\d+$/.test(d)) - .sort((a, b) => parseInt(a.replace(/^\D+-/, ""), 10) - parseInt(b.replace(/^\D+-/, ""), 10)); - const latestRound = roundDirs[roundDirs.length - 1]; - if (latestRound && existsSync(join(roundsDir, latestRound, "final.md"))) { - inferredPhase = "complete"; + .map((d) => parseInt(d.replace("round-", ""), 10)) + .filter((n) => Number.isFinite(n)) + .sort((a, b) => a - b); + const latestRoundNum = roundDirs[roundDirs.length - 1]; + if (latestRoundNum !== undefined) { + inferredRound = latestRoundNum; + if ( + existsSync( + join(roundsDir, `round-${latestRoundNum}`, "final.md"), + ) + ) { + inferredPhase = "complete"; + inferredPhaseNumber = 8; + } } } } else if (workflowType === "map") { @@ -825,10 +1060,18 @@ export async function stateSync(ocrDir: string): Promise { if (existsSync(runsDir)) { const runDirs = readdirSync(runsDir) .filter((d) => /^run-\d+$/.test(d)) - .sort((a, b) => parseInt(a.replace(/^\D+-/, ""), 10) - parseInt(b.replace(/^\D+-/, ""), 10)); - const latestRun = runDirs[runDirs.length - 1]; - if (latestRun && existsSync(join(runsDir, latestRun, "map.md"))) { - inferredPhase = "complete"; + .map((d) => parseInt(d.replace("run-", ""), 10)) + .filter((n) => Number.isFinite(n)) + .sort((a, b) => a - b); + const latestRunNum = runDirs[runDirs.length - 1]; + if (latestRunNum !== undefined) { + inferredMapRun = latestRunNum; + if ( + existsSync(join(runsDir, `run-${latestRunNum}`, "map.md")) + ) { + inferredPhase = "complete"; + inferredPhaseNumber = 6; + } } } } @@ -838,9 +1081,9 @@ export async function stateSync(ocrDir: string): Promise { branch, workflow_type: workflowType, current_phase: inferredPhase, - phase_number: 1, - current_round: 1, - current_map_run: 
1, + phase_number: inferredPhaseNumber, + current_round: inferredRound, + current_map_run: inferredMapRun, session_dir: dirPath, }); From 4bf35963b31cae1428fcce178a0d9168782ad468 Mon Sep 17 00:00:00 2001 From: Spencer Marx Date: Mon, 11 May 2026 22:37:12 +0200 Subject: [PATCH 4/4] feat(dashboard,cli): sweep stale-active sessions + periodic dashboard timer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `sessions.status = 'active'` rows previously had no automated cleanup — sessions that initialised but never reached close-session accumulated forever, poisoning latest-active auto-detect (the root cause of the "wrong session got closed" failure mode). The Wrkbelt DB showed 5 March stragglers from this exact path. Adds `sweepStaleSessions(db, thresholdSeconds)` — closes any status='active' row whose most recent orchestration_event is older than the threshold AND has no in-flight dependent command_executions. Writes a `session_auto_closed_stale` event recording the reason. Wires the new sweep alongside the existing agent-session liveness sweep: - Dashboard startup: both sweeps run before the API routes register - Periodic: 5-minute timer inside the running dashboard runs both sweeps so long-running dashboards don't accumulate stranded rows (the previous design only swept on startup) Threshold for stale sessions: 7 days. Long enough that an in-progress review can sit overnight without triggering, short enough that abandoned/crashed initialisations get cleaned up within a week. Tests cover: closes past-threshold sessions, leaves recent sessions alone, preserves stale-active sessions with in-flight dependents, writes the auto_closed_stale event with the threshold. 
Co-Authored-By: claude-flow --- .../lib/db/__tests__/agent-sessions.test.ts | 103 ++++++++++++++++++ packages/cli/src/lib/db/agent-sessions.ts | 74 +++++++++++++ packages/cli/src/lib/db/index.ts | 1 + packages/cli/src/lib/db/types.ts | 9 ++ packages/dashboard/src/server/index.ts | 53 ++++++++- 5 files changed, 238 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/lib/db/__tests__/agent-sessions.test.ts b/packages/cli/src/lib/db/__tests__/agent-sessions.test.ts index 6f3a68b..39018d3 100644 --- a/packages/cli/src/lib/db/__tests__/agent-sessions.test.ts +++ b/packages/cli/src/lib/db/__tests__/agent-sessions.test.ts @@ -15,6 +15,7 @@ import { bindVendorSessionIdOpportunistically, setAgentSessionStatus, sweepStaleAgentSessions, + sweepStaleSessions, } from "../index.js"; import { runMigrations } from "../migrations.js"; import type { Database } from "sql.js"; @@ -284,6 +285,108 @@ describe("sweepStaleAgentSessions", () => { }); }); +describe("sweepStaleSessions", () => { + // Each test seeds its own session and asserts on that session_id + // alone — the freshDb's WORKFLOW_ID row has no events and would also + // be swept on every run, so we test inclusion rather than strict + // array equality. + + it("closes active sessions whose last event is past the threshold", () => { + insertSession(db, { + id: "stale-old", + branch: "feat/stale", + workflow_type: "review", + session_dir: ".ocr/sessions/stale-old", + }); + // Seed a recent event so this session DOES have history — then + // backdate it to look ancient. 
+ db.run( + `INSERT INTO orchestration_events + (session_id, event_type, phase, phase_number, round, created_at) + VALUES ('stale-old', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`, + ); + + const result = sweepStaleSessions(db, 7 * 24 * 60 * 60); + + expect(result.closedSessionIds).toContain("stale-old"); + const after = db.exec("SELECT status FROM sessions WHERE id = 'stale-old'"); + expect(after[0]?.values[0]?.[0]).toBe("closed"); + }); + + it("leaves recently-active sessions alone", () => { + insertSession(db, { + id: "fresh-session", + branch: "feat/fresh", + workflow_type: "review", + session_dir: ".ocr/sessions/fresh-session", + }); + // Recent event — sweep should leave this session alone. + db.run( + `INSERT INTO orchestration_events + (session_id, event_type, phase, phase_number, round, created_at) + VALUES ('fresh-session', 'session_created', 'context', 1, 1, datetime('now'))`, + ); + + const result = sweepStaleSessions(db, 7 * 24 * 60 * 60); + expect(result.closedSessionIds).not.toContain("fresh-session"); + }); + + it("does NOT close a stale-active session that still has in-flight dependents", () => { + // The invariant: stale sweep only fires when no command_executions + // are still in flight. Protects long-running but quiet workflows + // (e.g. an AI thinking for hours without writing a state event). + insertSession(db, { + id: "stale-with-deps", + branch: "feat/sd", + workflow_type: "review", + session_dir: ".ocr/sessions/stale-with-deps", + }); + db.run( + `INSERT INTO orchestration_events + (session_id, event_type, phase, phase_number, round, created_at) + VALUES ('stale-with-deps', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`, + ); + // In-flight dependent row: finished_at IS NULL. 
+ db.run( + `INSERT INTO command_executions (uid, command, args, started_at, workflow_id) + VALUES ('live-uid', 'review', '[]', datetime('now'), 'stale-with-deps')`, + ); + + const result = sweepStaleSessions(db, 7 * 24 * 60 * 60); + + expect(result.closedSessionIds).not.toContain("stale-with-deps"); + const after = db.exec( + "SELECT status FROM sessions WHERE id = 'stale-with-deps'", + ); + expect(after[0]?.values[0]?.[0]).toBe("active"); + }); + + it("writes a session_auto_closed_stale event with the threshold", () => { + insertSession(db, { + id: "stale-event", + branch: "feat/se", + workflow_type: "review", + session_dir: ".ocr/sessions/stale-event", + }); + db.run( + `INSERT INTO orchestration_events + (session_id, event_type, phase, phase_number, round, created_at) + VALUES ('stale-event', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`, + ); + + sweepStaleSessions(db, 7 * 24 * 60 * 60); + + const events = db.exec( + `SELECT metadata FROM orchestration_events + WHERE session_id = 'stale-event' + AND event_type = 'session_auto_closed_stale'`, + ); + expect(events[0]?.values.length).toBe(1); + const metadata = JSON.parse(events[0]!.values[0]![0] as string); + expect(metadata.threshold_seconds).toBe(7 * 24 * 60 * 60); + }); +}); + describe("bindVendorSessionIdOpportunistically", () => { it("returns null when no candidate row exists", () => { const result = bindVendorSessionIdOpportunistically(db, "vendor-xyz"); diff --git a/packages/cli/src/lib/db/agent-sessions.ts b/packages/cli/src/lib/db/agent-sessions.ts index ad03611..eb20595 100644 --- a/packages/cli/src/lib/db/agent-sessions.ts +++ b/packages/cli/src/lib/db/agent-sessions.ts @@ -487,3 +487,77 @@ export function sweepStaleAgentSessions( orphanedIds: stale.map((row) => row.uid ?? String(row.id)), }; } + +/** + * Sweep stale `sessions.status = 'active'` rows. 
+ * + * A row is considered stale when ALL of the following hold: + * - status is still 'active' + * - no orchestration_event has been recorded for it within + * `thresholdSeconds` (default 7 days at call sites) — rows with no events at all also qualify + * - no dependent command_executions are still in flight + * (every linked row has finished_at NOT NULL) + * + * Stale rows are flipped to 'closed' with a `session_auto_closed_stale` + * event recording the reason and the threshold. This stops them + * from poisoning latest-active auto-detect — the exact failure mode that + * caused the "wrong session closed" bug. + * + * Returns the closed session_ids. + */ +export function sweepStaleSessions( + db: Database, + thresholdSeconds: number, +): import("./types.js").StaleSessionSweepResult { + // Find active sessions whose most recent event is older than the + // threshold AND have no in-flight dependent rows. + const sql = ` + SELECT s.id + FROM sessions s + LEFT JOIN ( + SELECT session_id, MAX(created_at) AS last_event_at + FROM orchestration_events + GROUP BY session_id + ) e ON e.session_id = s.id + WHERE s.status = 'active' + AND ( + e.last_event_at IS NULL + OR (julianday('now') - julianday(e.last_event_at)) * 86400 > ? 
+ ) + AND NOT EXISTS ( + SELECT 1 FROM command_executions ce + WHERE ce.workflow_id = s.id + AND ce.finished_at IS NULL + ) + `; + const rows = resultToRows<{ id: string }>(db.exec(sql, [thresholdSeconds])); + + if (rows.length === 0) { + return { closedSessionIds: [] }; + } + + for (const row of rows) { + db.run( + `UPDATE sessions + SET status = 'closed', + current_phase = 'complete', + updated_at = datetime('now') + WHERE id = ?`, + [row.id], + ); + db.run( + `INSERT INTO orchestration_events + (session_id, event_type, phase, phase_number, round, metadata, created_at) + VALUES (?, 'session_auto_closed_stale', 'complete', NULL, NULL, ?, datetime('now'))`, + [ + row.id, + JSON.stringify({ + reason: "no events past threshold; no in-flight dependents", + threshold_seconds: thresholdSeconds, + }), + ], + ); + } + + return { closedSessionIds: rows.map((r) => r.id) }; +} diff --git a/packages/cli/src/lib/db/index.ts b/packages/cli/src/lib/db/index.ts index b581d82..8f9bd50 100644 --- a/packages/cli/src/lib/db/index.ts +++ b/packages/cli/src/lib/db/index.ts @@ -54,6 +54,7 @@ export { setAgentSessionStatus, updateAgentSession, sweepStaleAgentSessions, + sweepStaleSessions, } from "./agent-sessions.js"; export type { WorkflowType, SessionStatus } from "../state/types.js"; diff --git a/packages/cli/src/lib/db/types.ts b/packages/cli/src/lib/db/types.ts index a7beab7..059835e 100644 --- a/packages/cli/src/lib/db/types.ts +++ b/packages/cli/src/lib/db/types.ts @@ -113,6 +113,15 @@ export type SweepResult = { orphanedIds: string[]; }; +/** + * Result of sweepStaleSessions — the workflow_ids that were + * auto-closed because they'd been idle past the threshold AND had no + * running dependent rows. 
+ */ +export type StaleSessionSweepResult = { + closedSessionIds: string[]; +}; + // ── Migration types ── export type Migration = { diff --git a/packages/dashboard/src/server/index.ts b/packages/dashboard/src/server/index.ts index c94a3a6..e5fec0b 100644 --- a/packages/dashboard/src/server/index.ts +++ b/packages/dashboard/src/server/index.ts @@ -38,7 +38,12 @@ import { registerCommandHandlers } from './socket/command-runner.js' import { registerChatHandlers, cleanupAllChats } from './socket/chat-handler.js' import { registerPostHandlers, cleanupAllPostGenerations } from './socket/post-handler.js' import { flushSave } from './routes/progress.js' -import { replayCommandLog, sweepStaleAgentSessions, walCheckpointTruncate } from '@open-code-review/cli/db' +import { + replayCommandLog, + sweepStaleAgentSessions, + sweepStaleSessions, + walCheckpointTruncate, +} from '@open-code-review/cli/db' import { getAgentHeartbeatSeconds } from '@open-code-review/cli/runtime-config' import { homedir } from 'node:os' @@ -301,7 +306,8 @@ export async function startServer(options: StartServerOptions = {}): Promise 0) { @@ -311,6 +317,49 @@ export async function startServer(options: StartServerOptions = {}): Promise 0) { + saveDb(db, ocrDir) + console.log( + ` Auto-closed ${staleSessionResult.closedSessionIds.length} stale active session(s) (threshold 7 days)` + ) + } + + // ── Periodic sweep timer ── + // Runs every 5 minutes inside the running dashboard so liveness and + // stale-session cleanup keep happening without a restart. Each sweep + // is cheap (single SQL update per sweep type); 5 min keeps the cadence + // responsive without DB pressure. 
+ const SWEEP_INTERVAL_MS = 5 * 60 * 1000 + const sweepTimer = setInterval(() => { + try { + const agentSweep = sweepStaleAgentSessions(db, heartbeatSeconds) + const sessionSweep = sweepStaleSessions( + db, + STALE_SESSION_THRESHOLD_SECONDS, + ) + if ( + agentSweep.orphanedIds.length > 0 || + sessionSweep.closedSessionIds.length > 0 + ) { + saveDb(db, ocrDir) + } + } catch (err) { + console.error('[sweep] periodic sweep failed:', err) + } + }, SWEEP_INTERVAL_MS) + // Don't block process exit on the timer. + sweepTimer.unref() + // ── API Routes ── // GET /api/reviews — all review rounds across sessions