fix(decopilot): cross-pod recovery actually fires on pod death#3393
fix(decopilot): cross-pod recovery actually fires on pod death#3393viktormarinho wants to merge 15 commits into
Conversation
Three coordinated changes make the previously-skipped pod-death-dbos-replay scenario pass end-to-end. **1. Heartbeat watcher gains a poller for hard kills.** NATS JetStream KV's TTL-based key expiry is a server-side stream cleanup pass — it does NOT emit a DEL/PURGE op on the watcher. So `kv.watch()` alone only catches *graceful* shutdowns (explicit `kv.delete()`). A SIGKILL'd pod's heartbeat key vanishes silently from the bucket with no notification, and `runRegistry.handlePodDeath` never fires. Added a parallel poller that lists `kv.keys()` every 10s and treats anything that drops out of the snapshot as a death. The watcher still handles the fast graceful-shutdown path; the poller is the safety net for hard kills. Detection latency: ~45s (TTL) + up to 10s (poll tick) ≈ 55s worst case. A Postgres advisory lock would be sub-second, but that's a deeper refactor and would deserve its own PR. **2. Heartbeat-recovered runs now reach JetStream.** `resumeOrphanedThread` in app.ts was deliberately dropping `streamBuffer` when calling `dispatchRunAndWait`, on the assumption that DBOS workflow replay would pump chunks separately. Turns out DBOS doesn't replay cross-pod (no built-in liveness scan, single shared executor_id), so the resumed run streamed to /dev/null and any /attach tails on survivor pods saw nothing. Pass `streamBuffer` so chunks land back on the per-thread subject. **3. `prepareRun` skips JetStream purge on resume.** `streamBuffer.purge()` ran unconditionally at the start of every dispatch, including the resume path. That wiped the prefix chunks the dead owner had already pumped, which survivor watchers were mid-consumption of. Gate on `!input.isResume`. Plus minor cleanups: - `PodHeartbeat.isReady()` so callers can distinguish "init resolved" from "init was a no-op because NATS wasn't ready yet". - `[PodHeartbeat] Started` and `[RunRegistry] handlePodDeath` log lines for production observability. - Compose: share one studio image across mesh-1/2/3 via a single `image: multi-pod-studio:latest` tag instead of three separate per-service builds. Eliminates `docker tag` workarounds. Verified with `tests/multi-pod/scenarios/` — pod-death scenario un-skipped, 7/7 pass in ~63s. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🧪 BenchmarkShould we run the Virtual MCP strategy benchmark for this PR? React with 👍 to run the benchmark.
Benchmark will run on the next push after you react. |
Release OptionsSuggested: Patch ( React with an emoji to override the release type:
Current version:
|
There was a problem hiding this comment.
1 issue found across 6 files
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Re-trigger cubic
…test A previous commit on this branch skipped streamBuffer.purge() when input.isResume was true, on the theory that the purge wiped chunks survivor /attach watchers were mid-consumption of. Empirical verification shows that theory was wrong: - NATS JetStream consumers track their own read position. A server- side purge removes messages from the stream but doesn't touch a consumer's local buffer for messages it has already pulled. The consumer just sees a sequence-number jump under the hood and keeps receiving new messages. - Meanwhile, skipping purge on resume creates a real bug: the dead pod's chunks 1..K stay in JetStream, and the resumed run publishes chunks 1..N alongside. Any /attach opened during the recovery window with deliverPolicy: "all" replays both — the user sees the assistant's response twice. So the unconditional purge in prepareRun is correct. Reverted to the original behavior with a much clearer comment about why. The pod-death scenario now opens a fresh /attach AFTER recovery has visibly started (we detect "chunk-3" on a survivor — chunks 1-2 can only come from the dead pod, chunk-3+ can only come from the resumed run) and asserts that the late watcher sees "chunk-1 " exactly once. Negative-tested by re-introducing the skip-on-resume branch: the assertion fails with chunk-1 count = 2, as expected. So the test is a real regression guard for this whole class of bug. The docstring is also rewritten now that the scenario is live — previously it described why it was skipped + a long architectural finding, now it describes what each step exercises and the three fixes that had to land. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`init()` aborts the watcher and clears the poller timer to drop stale state. On the first call, `start()` re-arms both via the `pendingDeathCallback` field that `onPodDeath()` populated before init completed. But `pendingDeathCallback` is consumed (set to null) inside that first `start()` — so on a NATS reconnect, `init()` happens again, the watcher/poller get torn down again, but `start()`'s re-arm check sees no pending callback and silently does nothing. Heartbeats keep refreshing, but every peer death goes undetected until the next pod restart. Fix: fall back to the persistent `deathCallback` field (which `onPodDeath()` always sets) when the pending-callback slot is empty. Plus two follow-ups from the same review: - Pod-death test budget bumped 180 → 220s. Sum of the three sequential pollUntil windows is 75+90+20=185s worst case; the previous budget could race the bun-test timeout in pathological cases. - dispatch-run.ts purge comment now honestly flags the remaining UX gap: /attach watchers already streaming when the owner dies receive the dead pod's prefix from their local consumer buffer plus the resumed run's full body — so they render the reply twice. The purge protects new attachers (the test asserts this), not in-flight ones. Proper fix is a "reset" sentinel on the subject so all consumers flush; left as a follow-up. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th-recovery # Conflicts: # apps/mesh/src/api/app.ts # tests/multi-pod/scenarios/pod-death-dbos-replay.test.ts
SIGKILL'd pods leave `dbos.workflow_status` rows in PENDING. When any pod sharing the same executor_id boots later (e.g. via the restoreStoppedPods hook), DBOS's launch-time recovery re-executes those workflows — which publishes chunks to a thread's JetStream subject long after the originating scenario has ended. Subsequent scenarios then see phantom runs they didn't initiate; pod-death-dbos-replay's chunk-1-count-1 assertion is the specific failure mode (the ghost run duplicates the prefix). Hook now DELETEs `dbos.workflow_status` (CASCADEs to operation_outputs, workflow_inputs, notifications, workflow_events) and `dbos.workflow_queue` in beforeAll, before restarting any stopped pods. Order matters: a dead pod that boots before the cleanup gets a window to start recovery against the stale rows. Verified with two back-to-back full-suite runs against the same cluster session — both 7/7 pass, where previously the second run could trip the duplicate-chunk assertion in pod-death-dbos-replay. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
NATS KV with bucket TTL gave us ~10-55s detection windows depending on
the failure mode — TTL expiry doesn't fire watcher events, so SIGKILL'd
pods only got detected by a polling-fallback layered on top. Three
moving parts (refresh timer + watch loop + poll diff) for what's
fundamentally a liveness check.
Postgres advisory locks collapse all of that:
- Each pod holds `pg_advisory_lock(hashtext(podId))` on a dedicated
long-lived `pg.Client`. The lock's session lifetime IS the
heartbeat — when the process dies for any reason (SIGKILL, OOM,
network drop), Postgres releases the lock immediately via TCP
RST/FIN. Sub-second detection on the common case.
- Survivors poll a `mesh_pods` registry every 5s and call
`pg_try_advisory_lock(hashtext(peerId))` for each peer. Success
means nobody holds it = peer is dead → fire `handlePodDeath`.
- TCP keepalives are tuned on the dedicated connection (10s initial
delay) so a network partition without RST is detected within ~25s
rather than the OS default 2h.
Files:
- `apps/mesh/src/core/pod-heartbeat.ts` (new) — `PgPodHeartbeat`
implementing the same `PodHeartbeat` interface; nothing downstream
of `app.ts` had to change.
- `apps/mesh/src/nats/pod-heartbeat.ts` (deleted).
- `apps/mesh/migrations/078-mesh-pods.ts` — single-column `mesh_pods`
registry. Each pod INSERTs on `start()`, DELETEs on `stop()`, peers
DELETE the row when they detect the death.
- `apps/mesh/src/api/app.ts` — wires the new heartbeat using
`database.pool` + `getSettings().databaseUrl`; `podHeartbeat` is no
longer nullable since it doesn't depend on NATS being up.
- `tests/multi-pod/docker-compose.yml` — `migrate` service shares the
same `multi-pod-studio:latest` tag as the mesh-N services, so a
rebuild of mesh-1's image automatically picks up new migrations
for migrate too. Previous setup tagged migrate's image separately
and silently drifted from mesh.
Result: pod-death scenario passes in 15s (was 53s with NATS heartbeat),
full suite 27s (was 63s). 7/7 pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The chunk-3 gate I used to detect "resumed pump is flowing" was racy. Between seeing chunk-2 (the kill trigger) and SIGKILL actually taking effect there's ~400-1000ms of latency: a DB roundtrip to read run_owner_pod, plus `docker compose kill`, plus SIGKILL propagation. At 500ms-per-chunk on the mock, that's enough for the dead pod to publish chunk-3 and sometimes chunk-4 before its process actually dies. So a survivor seeing chunk-3 might be looking at the dead pod's publish, not the resumed run's — opening the late watcher before prepareRun's purge fires, JetStream still holding the dead pod's prefix, and the duplicate-detection assertion failing on chunk-1=2 for the wrong reason. A real purge regression would be indistinguishable from this race. chunk-10 is 5s past the mock's start time, well clear of the kill window. Provably can only come from the resumed pump (the dead pod SIGKILL'd 3s+ earlier). Test runtime unchanged (~15s) because chunk-10 arrives a couple seconds before chunk-20 anyway. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…io_pods
Four review fixes, plus a rename:
**1. Advisory lock leak via pool queries.**
The probe was `pg_try_advisory_lock` on one pool client and
`pg_advisory_unlock` on (probably) a different pool client.
Session-scoped unlocks on a foreign session silently return false; the
session lock stayed on the try_lock's pool client forever. Switched
the probe to `pg_try_advisory_xact_lock`, which auto-releases at the
implicit single-statement transaction commit — no explicit unlock,
no leak, safe across pool clients.
**2. Startup race between row visibility and lock acquisition.**
`start()` previously fired the `INSERT INTO mesh_pods` and the
`pg_advisory_lock` independently and unawaited. A peer poll that
landed between row-visible and lock-held would probe, win the lock,
and falsely declare this freshly-booted pod dead. Now the lock is
acquired first, THEN the row is inserted, so any probe either finds
no row (nothing to probe) or finds row + held lock (alive).
`start()` returns a Promise so the caller can sequence on it.
**3. Stale path in migration comment.**
Migration referenced the deleted `apps/mesh/src/nats/pod-heartbeat.ts`;
updated to `apps/mesh/src/core/pod-heartbeat.ts`.
**4. Test docstring still described NATS-KV.**
"What had to be fixed" block, the 70s/45s pollUntil comments, and step
5 in the scenario walkthrough all talked about KV TTL and `kv.keys()`
diff. Rewrote to describe the PG advisory-lock implementation.
**Rename:** `mesh_pods` → `studio_pods`. The migration table is
namespaced to studio rather than abstract-mesh-internal naming. Also
folds in two pending refinements from earlier in this branch that
were uncommitted:
- `streamBuffer.purge()` now returns `Promise<void>`, and
`prepareRun` awaits it. Previously fire-and-forget; the dispatch's
first chunk could land in JetStream before purge completed,
racing a late /stream's "all" replay window.
- `mock-ai` stamps every chunk with `t<callStartedAt>` — the
wall-clock time the call began. The pod-death scenario uses this
to deterministically distinguish dead-pod chunks (call started
before kill) from resumed-pump chunks (call started after kill),
replacing the previous chunk-N timing heuristic.
Verified: full suite passes 7/7 twice back-to-back against the same
cluster (the prior version flaked on the second pass via a
double-recovery race).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
3 issues found across 12 files (changes from recent commits).
Tip: Review your code locally with the cubic CLI to iterate faster.
Re-trigger cubic
…ELETE
Two review-flagged issues, both real:
**1. Heartbeat client ignored DB SSL setting.**
The main pg pool was constructed with `ssl: getSsl()` (reads
`DATABASE_PG_SSL`), but the dedicated long-lived `Client` for the
liveness lock omitted it. On TLS-required Postgres deployments (RDS
with `rds.force_ssl=on`, etc.), the heartbeat client would fail to
connect, `init()` would throw, and pod-death recovery would be
silently disabled. Exported `getSsl()` from the database module,
added a required `ssl` field to `PgPodHeartbeatDeps`, and wired it
through from `createApp`.
**2. Probe lock released before peer-row DELETE.**
`pg_try_advisory_xact_lock` ran in its own implicit transaction, so
the xact-scoped lock was already released by the time the subsequent
`DELETE FROM studio_pods` executed. In the window between probe-commit
and DELETE, a replacement pod with the same POD_NAME could:
- boot, grab a fresh session lock on the same key, and INSERT its
row (no-op-or-new under ON CONFLICT)
- then have its row deleted by the surviving probe, leaving the
replacement unregistered (invisible to other peers' future polls)
Fixed by wrapping probe + DELETE in a single explicit BEGIN/COMMIT via
a new `runInTxn` helper. The xact lock now lives through the DELETE,
and any replacement pod's `pg_advisory_lock` blocks on the survivor's
transaction until COMMIT — at which point the row is gone, the
replacement INSERTs cleanly, and registry is consistent.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The old implementation UPDATEd any row with status='in_progress'
without keying on the previous run_owner_pod. Two survivors fired
handlePodDeath for the same dead peer, both ran claimOrphanedRun
concurrently, both got `claimed = true` (their UPDATEs found
matching rows), and both proceeded to dispatch a resumed run. Per-
thread JetStream subject ended up with two copies of every chunk,
and the pod-death test's chunk-1-count-1 assertion failed.
Fixed by adding `expectedCurrentOwner: string | null` and CASing on
it via `IS NOT DISTINCT FROM` (so NULL stays comparable). Now:
- `handlePodDeath` passes `deadPodId`. The first survivor flips
the column; the second's WHERE clause no longer matches, and
`claimed = false`.
- `recoverOrphanedRuns` (startup sweep) passes the row's current
`run_owner_pod` as read from the list query. Same-pod replays
still succeed (registry-lost recovery) because the column value
matches the expected.
Added a unit test covering the concurrent-survivor race:
`claimOrphanedRun` with the same `expectedCurrentOwner` from two
different `podId`s — first returns true, second returns false, row
ends up owned by the first. 22/22 unit tests pass.
Local multi-pod verification: pod-death scenario no longer shows
both survivors running `[decopilot:stream] resume` for the same
thread. (Re-runs still expose a different timing-related flake — a
chunk-20 pollUntil times out occasionally; left for separate
investigation.)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
1 issue found across 4 files (changes from recent commits).
Tip: Review your code locally with the cubic CLI to iterate faster.
Re-trigger cubic
listOrphanedRuns previously returned every in_progress row regardless of owner, intended for a registry that was empty at boot. With multiple pods alive, a freshly-restarted pod's 10s sweep would steal a thread another pod had just claimed: the CAS used the read-back owner as expectedCurrentOwner, which still matched because the original owner hadn't re-touched the row. Two dispatches landed on the same per-thread JetStream subject and CI's pod-death scenario timed out because the test killed the wrong pod. - listOrphanedRuns filters to threads whose owner is gone (NULL, this pod's previous incarnation, or absent from studio_pods) - recoverOrphanedRuns skips threads we're already running so a back-to-back handlePodDeath claim doesn't get re-dispatched by the startup timer that fires moments later - multi-pod hook clears stale in_progress rows between scenarios so rerun bursts don't inherit ghost recoveries Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
stop() used to DELETE the studio_pods row before ending the liveness connection, and stopAll() nulled run_owner_pod on all in-flight runs. Together that left graceful shutdown invisible to survivor polls: peers had no probe target, handlePodDeath never fired, and the orphan signal in the threads table was lost — runs sat in_progress until a future startup sweep happened by. - pod-heartbeat.stop(): drop the DELETE; just end the liveness client. The next survivor poll catches the released advisory lock, fires handlePodDeath, and cleans up our registry row in the same probe transaction. SIGKILL and SIGTERM now share one detection path. - run-registry.stopAll(): keep run_owner_pod set so survivors locate our runs via listOrphanedRunsByPod(thisPodId). Drop orphanRunsByPod entirely — no remaining caller. - app.ts shutdown: abort controllers before releasing the lock so the resumed dispatch doesn't briefly race our still-aborting one on the per-thread JetStream subject. Regression tests: - tests/multi-pod/scenarios/graceful-shutdown-recovery: full SIGTERM-mid-stream → survivor-completes-the-run path. - run-registry.test: isRunning guard in recoverOrphanedRuns (the startup-vs-handlePodDeath same-pod race fixed in 0b8ec3). - run-registry.test stopAll: asserts no DB ownership writes during shutdown. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously: PgPodHeartbeat (Postgres advisory locks, 5s peer poll), studio_pods registry, claimOrphanedRun CAS, listOrphanedRuns(ByPod), runRegistry.handlePodDeath / recoverOrphanedRuns, runOwnerPod-based recovery dispatch in app.ts. Net: ~500 LOC of custom Conductor-light. DBOS already provides the workflow durability we need: every threadGateWorkflow row in `dbos.workflow_status` carries the executor_id of the pod that started it, and DBOS's launch-time recovery re-runs the dispatchRunAndWaitStep from scratch when the same executor_id boots again. Wiring `executorID = POD_NAME` in index.ts pins workflows to a stable pod identity that K8s StatefulSet preserves across restarts, so a SIGKILL → restart cycle is the entire recovery path. What was kept: RunRegistry (in-memory dispatch state + abort controllers), thread-gate-workflow (the DBOS surface), JetStream pump, prepareRun's unconditional purge (still the regression guard against duplicated chunks on replay). Trade-off: recovery time is now bounded by pod restart (~30s in our docker test, similar in K8s) instead of the prior ~5s heartbeat detection. Acceptable for chat UX, and removes an entire class of race conditions the reviewers kept finding. Requires StatefulSet (or any deploy with stable pod IDs) — plain Deployment with random pod names breaks DBOS's executor_id-based recovery. pod-death-dbos-replay test rewritten to assert this exact path: kill the owner, restart it, wait for a chunk with mock-ai timestamp > killTime (proves replay ran), confirm chunk-1 count == 1 (proves purge still works). graceful-shutdown-recovery scenario deleted — SIGTERM and SIGKILL share the recovery path now. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The prior CAS allowed claim only when status was non-in_progress OR run_owner_pod was null OR matched the current pod. When DBOS recovers a workflow on a different executor than the one that originally took the START path, the row reads (status=in_progress, run_owner_pod= dead-pod) and the CAS rejected the legitimate replay — RunClaimError stalled recovery indefinitely. The thread-gate DBOS queue (concurrency=1 per threadId) already serialises dispatches per thread, so the CAS isn't load-bearing for preventing concurrent runs. Drop it entirely; let DBOS own that guarantee. Verified end-to-end with pod-death-dbos-replay (3/3 stable, ~17-19s recovery from kill to chunk-20 delivered) and the full multi-pod suite (7/7). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
You're iterating quickly on this pull request. To help protect your rate limits, cubic has paused automatic reviews on new pushes for now—when you're ready for another review, comment |
POD_NAME is the DBOS executorID. DBOS recovery only replays workflows whose executor_id matches the booting process, so a value that changes per restart (the prior crypto.randomUUID fallback) silently strands in-flight workflows the moment a pod dies. The failure mode is invisible: nothing crashes, runs just sit PENDING forever until manual intervention. In production (NODE_ENV=production), throw on startup if POD_NAME is missing or empty so the misconfiguration surfaces at boot with an actionable error message pointing at the K8s downward API or a stable constant. Dev and test still get the random UUID fallback — single-process recovery works fine against the same UUID across in-process restarts, and dev shouldn't require operator wiring. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What is this contribution about?
Makes the previously-skipped `pod-death-dbos-replay` scenario pass end-to-end. Three coordinated fixes, each independently load-bearing — the recovery path was broken in three places at once.
1. Heartbeat watcher gains a polling fallback for hard kills. NATS KV's TTL-based key expiry is server-side cleanup and does NOT emit DEL/PURGE on the watcher — `kv.watch()` only catches graceful `kv.delete()`. A SIGKILL'd pod's heartbeat vanished silently and `handlePodDeath` never fired. Added a parallel `kv.keys()` poller (~10s tick) that diffs snapshots; the watcher still handles graceful shutdowns, the poller catches the rest. Detection latency: ~55s worst case.
2. Heartbeat-recovered runs now pump to JetStream. `resumeOrphanedThread` was deliberately dropping `streamBuffer` when calling `dispatchRunAndWait`, on the assumption that DBOS workflow replay would carry chunks separately. DBOS doesn't replay cross-pod, so the resumed run streamed to /dev/null and survivor `/attach` tails saw nothing. Pass `streamBuffer` through.
3. `prepareRun` skips JetStream purge on resume. `streamBuffer.purge()` ran unconditionally at every dispatch start, including the resume path — wiping the prefix chunks survivor watchers were mid-consumption of. Gate on `!input.isResume`.
Future cleanup left for follow-up: replace the NATS-KV liveness path with Postgres advisory locks (sub-second detection vs ~55s) — already discussed, deserves its own PR.
How to Test
```bash
./tests/multi-pod/run.sh
```
Expected: `7/7 pass`. The `pod-death-dbos-replay` scenario was `.skip`ped before this PR; it's now un-skipped and exercises the full recovery loop (POST → run flowing → SIGKILL owner → poller detects → recovery on survivor → /attach sees final chunk).
Migration Notes
The mesh image is now tagged `multi-pod-studio:latest` and shared across all three mesh services in test compose, instead of being built three times under three different tags. No production impact — only affects test infrastructure.
Review Checklist
Summary by cubic
Switches decopilot recovery to
DBOSlaunch-time replay by pinning workflows toPOD_NAMEviaexecutorID. Killing and then restarting a pod cleanly replays the run;prepareRunnow awaits JetStream purge so late/streamsees a single reply.Refactors
handlePodDeath); recovery is DBOS-only on pod restart.DBOSexecutorID = POD_NAME; dropped pod-bound CAS inclaimRunStartso DBOS replay can claim rows.streamBuffer.purge()async and awaited inprepareRun; kept unconditional purge to prevent duplicate replies after replay.RunRegistry.stopAll()only aborts in‑memory state; no DB ownership writes.Migration
POD_NAME(stable pod identity) or the process exits; dev/test fall back to a random UUID.multi-pod-studio:latestimage; hooks now wipedbos.workflow_*and reset stalethreadsbetween scenarios.Written for commit c6099d5. Summary will update on new commits. Review in cubic