From 6d4f2a15160d9a36d8338a7e6b57a7edbb85b610 Mon Sep 17 00:00:00 2001 From: Justin Lee Date: Thu, 18 Jun 2026 11:35:55 +0800 Subject: [PATCH] =?UTF-8?q?fix(#82):=20bound=20rebuild-index=20memory=20?= =?UTF-8?q?=E2=80=94=20strip=20delta=20cache=20+=20lazy=20eviction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reconstructReq now caches only the messages array (needed for delta splicing), not the full parsedBody (system 50-100KB + tools 10-50KB per entry). After projection, entries are deleted from cache via lazy reference counting — once an ancestor's last downstream consumer finishes, the entry is evicted. Peak memory drops from O(total orphans × full body) to O(active chain depth × messages-only). Co-Authored-By: Claude Opus 4.6 (1M context) --- server/rebuild-index.js | 37 ++++++++++++++++++------ test/rebuild-index.test.js | 58 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/server/rebuild-index.js b/server/rebuild-index.js index 7a04a5d..2852fca 100644 --- a/server/rebuild-index.js +++ b/server/rebuild-index.js @@ -103,8 +103,11 @@ async function reconstructReq(id, storage, cache, seen = new Set()) { delete parsedBody.prevId; delete parsedBody.msgOffset; - const result = { provider: 'anthropic', parsedBody }; - cache.set(id, result); + const result = { provider: 'anthropic', parsedBody, prevId: stripped.prevId || null }; + // ponytail: cache only what delta splicing needs (messages array). Full parsedBody + // (system 50-100KB + tools 10-50KB per entry) is returned to caller but NOT retained. + // Without this, 65K entries × 100KB = 4GB+ cache → OOM. + cache.set(id, { provider: 'anthropic', parsedBody: { messages } }); return result; } @@ -203,7 +206,7 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con // ── 3. Pass 1: reconstruct every orphan body; extend the explicit timeline. ── // (Only Anthropic turns survive reconstructReq; non-Anthropic/WS are skipped.) const cache = new Map(); - const recon = []; // { id, parsedBody, explicitSid } + const recon = []; // { id, parsedBody, explicitSid, prevId } let unrecoverable = 0; for (const id of orphanIds) { let r; @@ -212,9 +215,18 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con // Canonical explicit-session read (handles metadata.session_id + the // user_id-embedded formats), same primitive the live pipeline uses. const explicitSid = store.extractSessionId(r.parsedBody); - recon.push({ id, parsedBody: r.parsedBody, explicitSid }); + recon.push({ id, parsedBody: r.parsedBody, explicitSid, prevId: r.prevId }); if (explicitSid) explicitTimeline.push({ id, sid: explicitSid, cwd: store.extractCwd(r.parsedBody) }); } + + // ponytail: lazy refcount eviction — delete cache entries once their last + // downstream consumer is projected. Combined with the stripped cache in + // reconstructReq, memory stays O(active chain depth × messages-only size). + const ancestorRefs = new Map(); + for (const { prevId } of recon) { + if (prevId) ancestorRefs.set(prevId, (ancestorRefs.get(prevId) || 0) + 1); + } + explicitTimeline.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0)); // Latest cwd wins per session (timeline is id-ascending) — mirrors the live // pipeline using the session's current store.sessionMeta[sid].cwd. @@ -222,7 +234,7 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con // ── 4. Pass 2: project each (Anthropic) orphan through the canonical pipeline. ── const recovered = []; // [{ id, line }] - for (const { id, parsedBody, explicitSid } of recon) { + for (const { id, parsedBody, explicitSid, prevId } of recon) { const events = await readResEvents(storage, id); // Session attribution. Explicit metadata.session_id is authoritative (every @@ -278,6 +290,15 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con ...fields, }; recovered.push({ id, line: buildIndexLine(entry) }); + + // ── Cache eviction: entries are already stripped (messages-only) by + // reconstructReq; here we delete entries no longer needed as ancestors. ── + if (!ancestorRefs.has(id)) cache.delete(id); + if (prevId && ancestorRefs.has(prevId)) { + const remaining = ancestorRefs.get(prevId) - 1; + if (remaining <= 0) { ancestorRefs.delete(prevId); cache.delete(prevId); } + else ancestorRefs.set(prevId, remaining); + } } // ── 5. Report. ── @@ -288,11 +309,11 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con if (N === 0) { log(apply ? ' nothing to add — index left unchanged.' : ' dry run — nothing to add.'); - return { refused: false, recovered: 0, total: M, unrecoverable, applied: false }; + return { refused: false, recovered: 0, total: M, unrecoverable, applied: false, cacheFinalSize: cache.size }; } if (!apply) { log(` dry run — pass --apply to write ${storage.location || 'index.ndjson'}.`); - return { refused: false, recovered: N, total: M, unrecoverable, applied: false }; + return { refused: false, recovered: N, total: M, unrecoverable, applied: false, cacheFinalSize: cache.size }; } // ── 6. Atomic merge-write (local filesystem only). ── @@ -312,7 +333,7 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con fs.writeFileSync(tmpPath, merged.join('\n') + '\n'); fs.renameSync(tmpPath, indexPath); log(` wrote ${indexPath} (${existingIds.size + N} lines). Restart the dashboard to see recovered turns.`); - return { refused: false, recovered: N, total: M, unrecoverable, applied: true }; + return { refused: false, recovered: N, total: M, unrecoverable, applied: true, cacheFinalSize: cache.size }; } module.exports = { rebuildIndex, reconstructReq, tsFromId, nearestPrecedingSession }; diff --git a/test/rebuild-index.test.js b/test/rebuild-index.test.js index bcc84ad..a5fedc1 100644 --- a/test/rebuild-index.test.js +++ b/test/rebuild-index.test.js @@ -272,6 +272,64 @@ describe('rebuild-index', () => { assert.equal(old.stopReason, 'end_turn'); }); + it('cache is bounded: 2-session × 3-turn chains evict ancestors after use (OOM fix #82)', async () => { + writeShared(); + // Session A: 3-turn delta chain (anchor → delta1 → delta2) + writeReq('2026-06-01T06-00-00-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + messages: [{ role: 'user', content: 'A0' }], metadata: { session_id: 'SA' }, + }); + writeRes('2026-06-01T06-00-00-000', sseEvents()); + writeReq('2026-06-01T06-00-01-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + prevId: '2026-06-01T06-00-00-000', msgOffset: 1, + messages: [{ role: 'assistant', content: 'a1' }, { role: 'user', content: 'A1' }], + metadata: { session_id: 'SA' }, + }); + writeRes('2026-06-01T06-00-01-000', sseEvents()); + writeReq('2026-06-01T06-00-02-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + prevId: '2026-06-01T06-00-01-000', msgOffset: 3, + messages: [{ role: 'assistant', content: 'a2' }, { role: 'user', content: 'A2' }], + metadata: { session_id: 'SA' }, + }); + writeRes('2026-06-01T06-00-02-000', sseEvents()); + + // Session B: 3-turn delta chain (anchor → delta1 → delta2) + writeReq('2026-06-01T07-00-00-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + messages: [{ role: 'user', content: 'B0' }], metadata: { session_id: 'SB' }, + }); + writeRes('2026-06-01T07-00-00-000', sseEvents()); + writeReq('2026-06-01T07-00-01-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + prevId: '2026-06-01T07-00-00-000', msgOffset: 1, + messages: [{ role: 'assistant', content: 'b1' }, { role: 'user', content: 'B1' }], + metadata: { session_id: 'SB' }, + }); + writeRes('2026-06-01T07-00-01-000', sseEvents()); + writeReq('2026-06-01T07-00-02-000', { + model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH, + prevId: '2026-06-01T07-00-01-000', msgOffset: 3, + messages: [{ role: 'assistant', content: 'b2' }, { role: 'user', content: 'B2' }], + metadata: { session_id: 'SB' }, + }); + writeRes('2026-06-01T07-00-02-000', sseEvents()); + + const r = await rebuildIndex({ apply: true, storage, log }); + assert.equal(r.recovered, 6); + + // Correctness: all delta chains spliced correctly + const byId = new Map(readIndexIds().map(o => [o.id, o])); + assert.equal(byId.get('2026-06-01T06-00-02-000').msgCount, 5, 'SA chain: 1+2+2'); + assert.equal(byId.get('2026-06-01T07-00-02-000').msgCount, 5, 'SB chain: 1+2+2'); + + // OOM proxy: after eviction, cache should be empty — every entry was either + // projected (and deleted if no downstream refs) or consumed as an ancestor + // (and deleted when last child finished). Old code: cache.size === 6. + assert.equal(r.cacheFinalSize, 0, 'cache fully evicted after rebuild (OOM fix #82)'); + }); + it('refuses to run while a live hub holds the index', async () => { hub.readHubLock = () => ({ pid: process.pid, port: 5577 }); // our own pid is alive try {