Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
733331a
fix(decopilot): cross-pod recovery actually fires on pod death
viktormarinho May 17, 2026
b27293e
fix(decopilot): keep purge on resume, guard with duplicate-detection …
viktormarinho May 18, 2026
3b61478
fix(heartbeat): re-arm death detection on NATS reconnect
viktormarinho May 18, 2026
0c01905
Merge remote-tracking branch 'origin/main' into viktormarinho/pod-dea…
viktormarinho May 19, 2026
f8bc4dd
test(multi-pod): clear DBOS workflow state between scenarios
viktormarinho May 19, 2026
f2ab135
refactor(heartbeat): swap NATS KV for Postgres advisory locks
viktormarinho May 19, 2026
fa4898a
test(pod-death): gate late watcher on chunk-10 to avoid kill-window race
viktormarinho May 19, 2026
5c800ed
fix(heartbeat): xact-scoped probe lock, ordered start, rename to stud…
viktormarinho May 20, 2026
05db218
fix(heartbeat): respect DB SSL config + hold xact lock through peer D…
viktormarinho May 20, 2026
38641d4
fix(claimOrphanedRun): real CAS on previous owner
viktormarinho May 20, 2026
0b8ec39
fix(recovery): scope startup orphan sweep to dead-owner threads
viktormarinho May 20, 2026
f4bfb1f
fix(heartbeat): route graceful shutdown through the same death signal
viktormarinho May 20, 2026
7d6af94
refactor: delete heartbeat + ownership, trust DBOS for recovery
viktormarinho May 20, 2026
9343540
fix(claimRunStart): drop pod-bound CAS so DBOS replay can claim
viktormarinho May 20, 2026
c6099d5
feat(settings): refuse to boot in production without POD_NAME
viktormarinho May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 20 additions & 143 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ import { NatsStreamBuffer } from "./routes/decopilot/nats-stream-buffer";
import { RunRegistry } from "./routes/decopilot/run-registry";
import type { RunReactorDeps } from "./routes/decopilot/run-reactor";
import { SqlThreadStorage } from "../storage/threads";
import type { Thread } from "../storage/types";
import { registerMonitoringRetentionWorkflow } from "../monitoring/dbos-retention-workflow";
import { cleanupOldMonitoringFiles } from "../monitoring/ndjson-retention";
import { getLogsDir, getTracesDir, getMetricsDir } from "../monitoring/schema";
Expand All @@ -114,12 +113,6 @@ import {
} from "../dispatch-queue";
import { DBOS } from "@dbos-inc/dbos-sdk";
import { dispatchRunAndWait } from "./routes/decopilot/dispatch-run";
import {
PersistedRunConfigSchema,
toModelsConfig,
} from "./routes/decopilot/run-config";
import { getPodId } from "../core/pod-identity";
import { NatsPodHeartbeat } from "../nats/pod-heartbeat";
import { createAutomationsStorage } from "../storage/automations";
import { KyselyKVStorage } from "../storage/kv";
import { KyselyTriggerCallbackTokenStorage } from "../storage/trigger-callback-tokens";
Expand Down Expand Up @@ -748,7 +741,7 @@ export async function createApp(options: CreateAppOptions = {}) {
})();
},
createTailStream: async () => null,
purge: () => {},
purge: async () => {},
teardown: () => {},
};
} else {
Expand Down Expand Up @@ -813,8 +806,7 @@ export async function createApp(options: CreateAppOptions = {}) {
sseHub,
};

const POD_ID = getPodId();
const runRegistry = new RunRegistry(cancelReactorDeps, POD_ID);
const runRegistry = new RunRegistry(cancelReactorDeps);

cancelBroadcast
.start((taskId) => {
Expand All @@ -839,38 +831,11 @@ export async function createApp(options: CreateAppOptions = {}) {
);
});

// Per-pod heartbeat via NATS KV (only when NATS is available)
let podHeartbeat: NatsPodHeartbeat | null = null;
if (natsProvider) {
podHeartbeat = new NatsPodHeartbeat({
getConnection: () => natsProvider!.getConnection(),
getJetStream: () => natsProvider!.getJetStream(),
});

// Attempt immediate init (may no-op if NATS not ready)
podHeartbeat
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
})
.catch(() => {});

// Re-init when NATS connects
natsProvider.onReady(() => {
podHeartbeat!
.init()
.then(() => {
podHeartbeat!.start(POD_ID);
})
.catch((err: unknown) => {
console.error("[PodHeartbeat] Deferred init failed:", err);
});
});
}

currentDecopilotCleanup = async () => {
// Delete KV key first → watcher fires on other pods → immediate handoff
await podHeartbeat?.stop();
// Abort in-flight runs so streamText loops stop cleanly. DBOS's
// launch-time recovery picks up the workflows on the next start of
// this same pod (executorID = POD_NAME, stable on K8s StatefulSet),
// so we don't need any in-process death detection here.
await runRegistry.stopAll();
runRegistry.dispose();
cancelBroadcast.stop().catch(() => {});
Expand Down Expand Up @@ -1219,108 +1184,20 @@ export async function createApp(options: CreateAppOptions = {}) {
).setAutomationEventDispatcher(automationEventDispatcher);
}

// ============================================================================
// Crash Recovery — resume orphaned automation runs after rolling deploy
// ============================================================================

/** Shared resume function for both startup recovery and pod-death watcher. */
const resumeOrphanedThread = async (thread: Thread) => {
const parsed = PersistedRunConfigSchema.safeParse(thread.run_config);
if (!parsed.success) {
console.warn(
`[recovery] Invalid run_config for ${thread.id}, force-failing`,
);
await threadStorage.forceFailIfInProgress(
thread.id,
thread.organization_id,
);
return;
}
const config = parsed.data;

// Build context for the original user
const resumeCtx = await automationContextFactory(
thread.organization_id,
thread.created_by,
);
if (!resumeCtx) {
console.warn(
`[recovery] Cannot build context for ${thread.id}, force-failing`,
);
await threadStorage.forceFailIfInProgress(
thread.id,
thread.organization_id,
);
return;
}

// Audit trail: record that this run was auto-resumed
const now = new Date().toISOString();
await threadStorage.saveMessages(
[
{
id: crypto.randomUUID(),
thread_id: thread.id,
role: "system",
parts: [
{
type: "text",
text: "Run resumed automatically after infrastructure restart.",
},
],
metadata: undefined,
created_at: now,
updated_at: now,
},
],
thread.organization_id,
);

// Pod-death recovery: a different pod's run was claimed by us. Drain
// synchronously to know when the run completes server-side. We
// deliberately don't pass a streamBuffer here — this background
// recovery is the safety net for threads no DBOS replay or attached
// client picks up; clients reconnecting via /stream see the run via
// the workflow's own JetStream pump on the pod that DBOS replayed it
// onto.
await dispatchRunAndWait(
{
messages: [],
models: toModelsConfig(config.models),
agent: config.agent,
temperature: config.temperature,
toolApprovalLevel: config.toolApprovalLevel,
mode: config.mode,
organizationId: thread.organization_id,
userId: thread.created_by,
taskId: thread.id,
windowSize: config.windowSize,
isResume: true,
},
resumeCtx,
{ runRegistry, cancelBroadcast },
);
};

// Wire pod death watcher → orphan recovery
if (podHeartbeat) {
podHeartbeat.onPodDeath((deadPodId) => {
runRegistry
.handlePodDeath(deadPodId, resumeOrphanedThread, cancelBroadcast)
.catch((err) => {
console.error(
`[Decopilot] Pod death recovery failed for ${deadPodId}:`,
err,
);
});
});
}

setTimeout(() => {
runRegistry.recoverOrphanedRuns(resumeOrphanedThread).catch((err) => {
console.error("[recovery] Orphan recovery failed:", err);
});
}, 10_000); // 10s grace for rolling deploys
// Crash recovery is delegated to DBOS. A pod that dies mid-stream
// leaves its threadGateWorkflow row in `dbos.workflow_status` with
// status PENDING and executor_id = POD_NAME. When K8s restarts the
// pod with the same name (StatefulSet), DBOS launch-time recovery
// re-runs the `dispatchRunAndWait` step from scratch — streamText
// produces chunks again into the same per-thread JetStream subject,
// and SSE tails on any pod see the resumed stream. No cross-pod
// detection or claim CAS lives here on purpose.
//
// Pre-DBOS: a custom Postgres-advisory-lock heartbeat detected dead
// peers in ~5s and CAS-stole their runs. Deleted in favor of trusting
// DBOS recovery — fewer races, single source of truth, and the
// bounded-by-pod-restart-time recovery window is acceptable for chat
// UX. See thread-gate-workflow.ts for the durable workflow surface.

// NDJSON monitoring retention cleanup runs as a DBOS scheduled workflow
// (see `initDbos` below). Kick off a single eager sweep at boot so a fresh
Expand Down
20 changes: 18 additions & 2 deletions apps/mesh/src/api/routes/decopilot/dispatch-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,24 @@ async function prepareRun(
}
}

// Purge stale buffered chunks from any previous run on this thread
streamBuffer?.purge(mem.thread.id);
// Purge stale buffered chunks from any previous run on this thread.
// Always purges — including on resume — because the resumed run
// re-invokes the LLM from scratch and produces chunk-1..chunk-N
// again. Without this, any /stream opened after recovery starts
// would see the assistant's reply twice (deliverPolicy:"all"
// replays the dead-pod prefix and then the resumed run's full
// body). Regression guard:
// tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts.
//
// ⚠️ Known UX gap, not addressed here: a /stream that was already
// tailing when the owner pod died will still receive the dead
// pod's prefix from its local consumer buffer (the purge is
// server-side and doesn't reach into already-delivered messages)
// AND the resumed run's full body afterwards — so it sees the
// reply rendered twice. A proper fix would publish a "reset"
// sentinel to the subject before the resume pump starts so all
// consumers flush their UI buffer; left as a follow-up.
await streamBuffer?.purge(mem.thread.id);

// Split system messages from user message
const systemMessages = input.messages.filter((m) => m.role === "system");
Expand Down
12 changes: 8 additions & 4 deletions apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,15 @@ export class NatsStreamBuffer implements StreamBuffer {
});
}

purge(taskId: string): void {
async purge(taskId: string): Promise<void> {
if (!this.jsm) return;
this.jsm.streams
.purge(STREAM_NAME, { filter: streamSubject(taskId) })
.catch(() => {});
try {
await this.jsm.streams.purge(STREAM_NAME, {
filter: streamSubject(taskId),
});
} catch {
// Best-effort cleanup; never propagate to the caller.
}
}

teardown(): void {
Expand Down
4 changes: 0 additions & 4 deletions apps/mesh/src/api/routes/decopilot/run-reactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ function makeDeps(): RunReactorDeps {
listMessages: mock(() => Promise.resolve({ messages: [], total: 0 })),
listByTriggerIds: mock(() => Promise.resolve({ threads: [], total: 0 })),
forceFailIfInProgress: mock(() => Promise.resolve(true)),
claimOrphanedRun: mock(() => Promise.resolve(false)),
claimRunStart: mock(() => Promise.resolve(true)),
listOrphanedRuns: mock(() => Promise.resolve([])),
listOrphanedRunsByPod: mock(() => Promise.resolve([])),
orphanRunsByPod: mock(() => Promise.resolve([])),
addInflightAsyncJob: mock(() => Promise.resolve()),
findInflightAsyncJob: mock(() => Promise.resolve(null)),
removeInflightAsyncJob: mock(() => Promise.resolve()),
Expand Down
4 changes: 2 additions & 2 deletions apps/mesh/src/api/routes/decopilot/run-reactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async function handleTerminalStatus(
run_config: null,
run_started_at: null,
});
streamBuffer.purge(taskId);
void streamBuffer.purge(taskId);
sseHub.emit(
orgId,
createDecopilotThreadStatusEvent(taskId, status, {
Expand Down Expand Up @@ -168,7 +168,7 @@ async function react(event: RunEvent, deps: RunReactorDeps): Promise<void> {
run_started_at: null,
});
}
streamBuffer.purge(event.taskId);
void streamBuffer.purge(event.taskId);
const failedThread = await storage.get(event.taskId, event.orgId);
sseHub.emit(
event.orgId,
Expand Down
Loading
Loading