Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions packages/cli/src/commands/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ const startInstanceSubcommand = new Command("start-instance")
const db = await ensureDatabase(ocrDir);

try {
const workflowId = options.workflow
?? (await resolveActiveSession(ocrDir)).id;
const { id: workflowId } = await resolveActiveSession(
ocrDir,
options.workflow,
);

const id = randomUUID();
const persona = options.persona ?? null;
Expand Down Expand Up @@ -235,8 +237,10 @@ const listSubcommand = new Command("list")
const db = await ensureDatabase(ocrDir);

try {
const workflowId = options.workflow
?? (await resolveActiveSession(ocrDir)).id;
const { id: workflowId } = await resolveActiveSession(
ocrDir,
options.workflow,
);
const rows = listAgentSessionsForWorkflow(db, workflowId);

if (options.json) {
Expand Down
32 changes: 18 additions & 14 deletions packages/cli/src/commands/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,18 +249,20 @@ const transitionSubcommand = new Command("transition")
requireOcrSetup(targetDir);
const ocrDir = join(targetDir, ".ocr");

const VALID_PHASES = new Set<string>([
"context", "change-context", "analysis", "reviews",
"aggregation", "discourse", "synthesis", "complete",
"map-context", "topology", "flow-analysis", "requirements-mapping",
]);
if (!VALID_PHASES.has(options.phase)) {
throw new Error(`Invalid phase: "${options.phase}". Must be one of: ${[...VALID_PHASES].join(", ")}`);
}

try {
const sessionId = options.sessionId
?? (await resolveActiveSession(ocrDir)).id;
// Phase validation now lives in stateTransition itself, where it
// can see the session's workflow_type and check legal transitions
// against the workflow-typed phase graph (review vs map). The
// previous flat VALID_PHASES set let a review workflow transition
// to map phases (e.g. "topology") without complaint.
// Single auto-detect path (resolveActiveSession is the back-compat
// shim over resolveSession). Threading the explicit id through
// gives us validation of bad ids AND the stderr announcement when
// we auto-detect.
const { id: sessionId } = await resolveActiveSession(
ocrDir,
options.sessionId,
);

await stateTransition({
sessionId,
Expand Down Expand Up @@ -289,15 +291,17 @@ const transitionSubcommand = new Command("transition")

const closeSubcommand = new Command("close")
.description("Close a session")
.option("--session-id <id>", "Session ID (auto-detects latest active if omitted)")
.option("--session-id <id>", "Session ID (auto-detects latest active if omitted; refuses on ambiguity)")
.action(async (options: { sessionId?: string }) => {
const targetDir = process.cwd();
requireOcrSetup(targetDir);
const ocrDir = join(targetDir, ".ocr");

try {
const sessionId = options.sessionId
?? (await resolveActiveSession(ocrDir)).id;
const { id: sessionId } = await resolveActiveSession(
ocrDir,
options.sessionId,
);

await stateClose({
sessionId,
Expand Down
103 changes: 103 additions & 0 deletions packages/cli/src/lib/db/__tests__/agent-sessions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
bindVendorSessionIdOpportunistically,
setAgentSessionStatus,
sweepStaleAgentSessions,
sweepStaleSessions,
} from "../index.js";
import { runMigrations } from "../migrations.js";
import type { Database } from "sql.js";
Expand Down Expand Up @@ -284,6 +285,108 @@ describe("sweepStaleAgentSessions", () => {
});
});

describe("sweepStaleSessions", () => {
// Each test seeds its own session and asserts on that session_id
// alone — the freshDb's WORKFLOW_ID row has no events and would also
// be swept on every run, so we test inclusion rather than strict
// array equality.

it("closes active sessions whose last event is past the threshold", () => {
insertSession(db, {
id: "stale-old",
branch: "feat/stale",
workflow_type: "review",
session_dir: ".ocr/sessions/stale-old",
});
// Seed a recent event so this session DOES have history — then
// backdate it to look ancient.
db.run(
`INSERT INTO orchestration_events
(session_id, event_type, phase, phase_number, round, created_at)
VALUES ('stale-old', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`,
);

const result = sweepStaleSessions(db, 7 * 24 * 60 * 60);

expect(result.closedSessionIds).toContain("stale-old");
const after = db.exec("SELECT status FROM sessions WHERE id = 'stale-old'");
expect(after[0]?.values[0]?.[0]).toBe("closed");
});

it("leaves recently-active sessions alone", () => {
insertSession(db, {
id: "fresh-session",
branch: "feat/fresh",
workflow_type: "review",
session_dir: ".ocr/sessions/fresh-session",
});
// Recent event — sweep should leave this session alone.
db.run(
`INSERT INTO orchestration_events
(session_id, event_type, phase, phase_number, round, created_at)
VALUES ('fresh-session', 'session_created', 'context', 1, 1, datetime('now'))`,
);

const result = sweepStaleSessions(db, 7 * 24 * 60 * 60);
expect(result.closedSessionIds).not.toContain("fresh-session");
});

it("does NOT close a stale-active session that still has in-flight dependents", () => {
// The invariant: stale sweep only fires when no command_executions
// are still in flight. Protects long-running but quiet workflows
// (e.g. an AI thinking for hours without writing a state event).
insertSession(db, {
id: "stale-with-deps",
branch: "feat/sd",
workflow_type: "review",
session_dir: ".ocr/sessions/stale-with-deps",
});
db.run(
`INSERT INTO orchestration_events
(session_id, event_type, phase, phase_number, round, created_at)
VALUES ('stale-with-deps', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`,
);
// In-flight dependent row: finished_at IS NULL.
db.run(
`INSERT INTO command_executions (uid, command, args, started_at, workflow_id)
VALUES ('live-uid', 'review', '[]', datetime('now'), 'stale-with-deps')`,
);

const result = sweepStaleSessions(db, 7 * 24 * 60 * 60);

expect(result.closedSessionIds).not.toContain("stale-with-deps");
const after = db.exec(
"SELECT status FROM sessions WHERE id = 'stale-with-deps'",
);
expect(after[0]?.values[0]?.[0]).toBe("active");
});

it("writes a session_auto_closed_stale event with the threshold", () => {
insertSession(db, {
id: "stale-event",
branch: "feat/se",
workflow_type: "review",
session_dir: ".ocr/sessions/stale-event",
});
db.run(
`INSERT INTO orchestration_events
(session_id, event_type, phase, phase_number, round, created_at)
VALUES ('stale-event', 'session_created', 'context', 1, 1, datetime('now', '-30 days'))`,
);

sweepStaleSessions(db, 7 * 24 * 60 * 60);

const events = db.exec(
`SELECT metadata FROM orchestration_events
WHERE session_id = 'stale-event'
AND event_type = 'session_auto_closed_stale'`,
);
expect(events[0]?.values.length).toBe(1);
const metadata = JSON.parse(events[0]!.values[0]![0] as string);
expect(metadata.threshold_seconds).toBe(7 * 24 * 60 * 60);
});
});

describe("bindVendorSessionIdOpportunistically", () => {
it("returns null when no candidate row exists", () => {
const result = bindVendorSessionIdOpportunistically(db, "vendor-xyz");
Expand Down
74 changes: 74 additions & 0 deletions packages/cli/src/lib/db/agent-sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,77 @@ export function sweepStaleAgentSessions(
orphanedIds: stale.map((row) => row.uid ?? String(row.id)),
};
}

/**
* Sweep stale `sessions.status = 'active'` rows.
*
* A row is considered stale when ALL of the following hold:
* - status is still 'active'
* - no orchestration_event has been recorded for it within
* `thresholdSeconds` (default 7 days at call sites)
* - no dependent command_executions are still in flight
* (every linked row has finished_at NOT NULL)
*
* Stale rows are flipped to 'closed' with a `session_auto_closed_stale`
* event recording the threshold and the last-event-age. This stops them
* from poisoning latest-active auto-detect — the exact failure mode that
* caused the "wrong session closed" bug.
*
* Returns the closed session_ids.
*/
export function sweepStaleSessions(
db: Database,
thresholdSeconds: number,
): import("./types.js").StaleSessionSweepResult {
// Find active sessions whose most recent event is older than the
// threshold AND have no in-flight dependent rows.
const sql = `
SELECT s.id
FROM sessions s
LEFT JOIN (
SELECT session_id, MAX(created_at) AS last_event_at
FROM orchestration_events
GROUP BY session_id
) e ON e.session_id = s.id
WHERE s.status = 'active'
AND (
e.last_event_at IS NULL
OR (julianday('now') - julianday(e.last_event_at)) * 86400 > ?
)
AND NOT EXISTS (
SELECT 1 FROM command_executions ce
WHERE ce.workflow_id = s.id
AND ce.finished_at IS NULL
)
`;
const rows = resultToRows<{ id: string }>(db.exec(sql, [thresholdSeconds]));

if (rows.length === 0) {
return { closedSessionIds: [] };
}

for (const row of rows) {
db.run(
`UPDATE sessions
SET status = 'closed',
current_phase = 'complete',
updated_at = datetime('now')
WHERE id = ?`,
[row.id],
);
db.run(
`INSERT INTO orchestration_events
(session_id, event_type, phase, phase_number, round, metadata, created_at)
VALUES (?, 'session_auto_closed_stale', 'complete', NULL, NULL, ?, datetime('now'))`,
[
row.id,
JSON.stringify({
reason: "no events past threshold; no in-flight dependents",
threshold_seconds: thresholdSeconds,
}),
],
);
}

return { closedSessionIds: rows.map((r) => r.id) };
}
1 change: 1 addition & 0 deletions packages/cli/src/lib/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export {
setAgentSessionStatus,
updateAgentSession,
sweepStaleAgentSessions,
sweepStaleSessions,
} from "./agent-sessions.js";

export type { WorkflowType, SessionStatus } from "../state/types.js";
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/lib/db/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ export type SweepResult = {
orphanedIds: string[];
};

/**
* Result of sweepStaleSessions — the workflow_ids that were
* auto-closed because they'd been idle past the threshold AND had no
* running dependent rows.
*/
export type StaleSessionSweepResult = {
closedSessionIds: string[];
};

// ── Migration types ──

export type Migration = {
Expand Down
56 changes: 49 additions & 7 deletions packages/cli/src/lib/progress/__tests__/progress-sqlite.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ function createSessionDir(sessionId: string): string {
return dir;
}

/**
* Ordered phase walks per workflow type. The state machine rejects
* direct jumps from `context` to (e.g.) `analysis`, so tests that want
* to land at a specific phase need to walk the legal edges in between.
*/
const REVIEW_PHASE_ORDER: ReadonlyArray<{ phase: ReviewPhase; n: number }> = [
{ phase: "context", n: 1 },
{ phase: "change-context", n: 2 },
{ phase: "analysis", n: 3 },
{ phase: "reviews", n: 4 },
{ phase: "aggregation", n: 5 },
{ phase: "discourse", n: 6 },
{ phase: "synthesis", n: 7 },
{ phase: "complete", n: 8 },
];

const MAP_PHASE_ORDER: ReadonlyArray<{ phase: MapPhase; n: number }> = [
{ phase: "map-context", n: 1 },
{ phase: "topology", n: 2 },
{ phase: "flow-analysis", n: 3 },
{ phase: "requirements-mapping", n: 4 },
{ phase: "synthesis", n: 5 },
{ phase: "complete", n: 6 },
];

async function initDbAndSession(
sessionId: string,
workflowType: "review" | "map",
Expand All @@ -60,13 +85,30 @@ async function initDbAndSession(
ocrDir,
});

if (phase !== "context" || phaseNumber !== 1) {
await stateTransition({
sessionId,
phase: phase as ReviewPhase | MapPhase,
phaseNumber,
ocrDir,
});
// Walk legal phase edges to reach the target phase. Direct jumps are
// rejected by stateTransition's phase-graph validation.
const order = workflowType === "review" ? REVIEW_PHASE_ORDER : MAP_PHASE_ORDER;
const targetIdx = order.findIndex((p) => p.phase === phase);
if (targetIdx > 0) {
for (let i = 1; i <= targetIdx; i++) {
const step = order[i]!;
await stateTransition({
sessionId,
phase: step.phase,
phaseNumber: step.n,
ocrDir,
});
}
// If the caller passed a phase_number that differs from the
// canonical position, do a final no-op transition to that number.
if (order[targetIdx]!.n !== phaseNumber) {
await stateTransition({
sessionId,
phase: order[targetIdx]!.phase,
phaseNumber,
ocrDir,
});
}
}

// Set up the progress DB cache
Expand Down
Loading
Loading