Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions server/rebuild-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ async function reconstructReq(id, storage, cache, seen = new Set()) {
delete parsedBody.prevId;
delete parsedBody.msgOffset;

const result = { provider: 'anthropic', parsedBody };
cache.set(id, result);
const result = { provider: 'anthropic', parsedBody, prevId: stripped.prevId || null };
// ponytail: cache only what delta splicing needs (messages array). Full parsedBody
// (system 50-100KB + tools 10-50KB per entry) is returned to caller but NOT retained.
// Without this, 65K entries × 100KB = 4GB+ cache → OOM.
cache.set(id, { provider: 'anthropic', parsedBody: { messages } });
return result;
}

Expand Down Expand Up @@ -203,7 +206,7 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con
// ── 3. Pass 1: reconstruct every orphan body; extend the explicit timeline. ──
// (Only Anthropic turns survive reconstructReq; non-Anthropic/WS are skipped.)
const cache = new Map();
const recon = []; // { id, parsedBody, explicitSid }
const recon = []; // { id, parsedBody, explicitSid, prevId }
let unrecoverable = 0;
for (const id of orphanIds) {
let r;
Expand All @@ -212,17 +215,26 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con
// Canonical explicit-session read (handles metadata.session_id + the
// user_id-embedded formats), same primitive the live pipeline uses.
const explicitSid = store.extractSessionId(r.parsedBody);
recon.push({ id, parsedBody: r.parsedBody, explicitSid });
recon.push({ id, parsedBody: r.parsedBody, explicitSid, prevId: r.prevId });
if (explicitSid) explicitTimeline.push({ id, sid: explicitSid, cwd: store.extractCwd(r.parsedBody) });
}

// ponytail: lazy refcount eviction — delete cache entries once their last
// downstream consumer is projected. Combined with the stripped cache in
// reconstructReq, memory stays O(active chain depth × messages-only size).
const ancestorRefs = new Map();
for (const { prevId } of recon) {
if (prevId) ancestorRefs.set(prevId, (ancestorRefs.get(prevId) || 0) + 1);
}

explicitTimeline.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
// Latest cwd wins per session (timeline is id-ascending) — mirrors the live
// pipeline using the session's current store.sessionMeta[sid].cwd.
for (const e of explicitTimeline) if (e.cwd) sessionCwd.set(e.sid, e.cwd);

// ── 4. Pass 2: project each (Anthropic) orphan through the canonical pipeline. ──
const recovered = []; // [{ id, line }]
for (const { id, parsedBody, explicitSid } of recon) {
for (const { id, parsedBody, explicitSid, prevId } of recon) {
const events = await readResEvents(storage, id);

// Session attribution. Explicit metadata.session_id is authoritative (every
Expand Down Expand Up @@ -278,6 +290,15 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con
...fields,
};
recovered.push({ id, line: buildIndexLine(entry) });

// ── Cache eviction: entries are already stripped (messages-only) by
// reconstructReq; here we delete entries no longer needed as ancestors. ──
if (!ancestorRefs.has(id)) cache.delete(id);
if (prevId && ancestorRefs.has(prevId)) {
const remaining = ancestorRefs.get(prevId) - 1;
if (remaining <= 0) { ancestorRefs.delete(prevId); cache.delete(prevId); }
else ancestorRefs.set(prevId, remaining);
}
}

// ── 5. Report. ──
Expand All @@ -288,11 +309,11 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con

if (N === 0) {
log(apply ? ' nothing to add — index left unchanged.' : ' dry run — nothing to add.');
return { refused: false, recovered: 0, total: M, unrecoverable, applied: false };
return { refused: false, recovered: 0, total: M, unrecoverable, applied: false, cacheFinalSize: cache.size };
}
if (!apply) {
log(` dry run — pass --apply to write ${storage.location || 'index.ndjson'}.`);
return { refused: false, recovered: N, total: M, unrecoverable, applied: false };
return { refused: false, recovered: N, total: M, unrecoverable, applied: false, cacheFinalSize: cache.size };
}

// ── 6. Atomic merge-write (local filesystem only). ──
Expand All @@ -312,7 +333,7 @@ async function rebuildIndex({ apply = false, storage = config.storage, log = con
fs.writeFileSync(tmpPath, merged.join('\n') + '\n');
fs.renameSync(tmpPath, indexPath);
log(` wrote ${indexPath} (${existingIds.size + N} lines). Restart the dashboard to see recovered turns.`);
return { refused: false, recovered: N, total: M, unrecoverable, applied: true };
return { refused: false, recovered: N, total: M, unrecoverable, applied: true, cacheFinalSize: cache.size };
}

module.exports = { rebuildIndex, reconstructReq, tsFromId, nearestPrecedingSession };
58 changes: 58 additions & 0 deletions test/rebuild-index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,64 @@ describe('rebuild-index', () => {
assert.equal(old.stopReason, 'end_turn');
});

it('cache is bounded: 2-session × 3-turn chains evict ancestors after use (OOM fix #82)', async () => {
writeShared();
// Session A: 3-turn delta chain (anchor → delta1 → delta2)
writeReq('2026-06-01T06-00-00-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
messages: [{ role: 'user', content: 'A0' }], metadata: { session_id: 'SA' },
});
writeRes('2026-06-01T06-00-00-000', sseEvents());
writeReq('2026-06-01T06-00-01-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
prevId: '2026-06-01T06-00-00-000', msgOffset: 1,
messages: [{ role: 'assistant', content: 'a1' }, { role: 'user', content: 'A1' }],
metadata: { session_id: 'SA' },
});
writeRes('2026-06-01T06-00-01-000', sseEvents());
writeReq('2026-06-01T06-00-02-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
prevId: '2026-06-01T06-00-01-000', msgOffset: 3,
messages: [{ role: 'assistant', content: 'a2' }, { role: 'user', content: 'A2' }],
metadata: { session_id: 'SA' },
});
writeRes('2026-06-01T06-00-02-000', sseEvents());

// Session B: 3-turn delta chain (anchor → delta1 → delta2)
writeReq('2026-06-01T07-00-00-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
messages: [{ role: 'user', content: 'B0' }], metadata: { session_id: 'SB' },
});
writeRes('2026-06-01T07-00-00-000', sseEvents());
writeReq('2026-06-01T07-00-01-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
prevId: '2026-06-01T07-00-00-000', msgOffset: 1,
messages: [{ role: 'assistant', content: 'b1' }, { role: 'user', content: 'B1' }],
metadata: { session_id: 'SB' },
});
writeRes('2026-06-01T07-00-01-000', sseEvents());
writeReq('2026-06-01T07-00-02-000', {
model: 'claude-x', max_tokens: 100, sysHash: SYS_HASH,
prevId: '2026-06-01T07-00-01-000', msgOffset: 3,
messages: [{ role: 'assistant', content: 'b2' }, { role: 'user', content: 'B2' }],
metadata: { session_id: 'SB' },
});
writeRes('2026-06-01T07-00-02-000', sseEvents());

const r = await rebuildIndex({ apply: true, storage, log });
assert.equal(r.recovered, 6);

// Correctness: all delta chains spliced correctly
const byId = new Map(readIndexIds().map(o => [o.id, o]));
assert.equal(byId.get('2026-06-01T06-00-02-000').msgCount, 5, 'SA chain: 1+2+2');
assert.equal(byId.get('2026-06-01T07-00-02-000').msgCount, 5, 'SB chain: 1+2+2');

// OOM proxy: after eviction, cache should be empty — every entry was either
// projected (and deleted if no downstream refs) or consumed as an ancestor
// (and deleted when last child finished). Old code: cache.size === 6.
assert.equal(r.cacheFinalSize, 0, 'cache fully evicted after rebuild (OOM fix #82)');
});

it('refuses to run while a live hub holds the index', async () => {
hub.readHubLock = () => ({ pid: process.pid, port: 5577 }); // our own pid is alive
try {
Expand Down
Loading