Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/functions/evict.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import type {
RawObservation,
SessionSummary,
Memory,
GraphNode,
GraphEdge,
} from "../types.js";
import { KV } from "../state/schema.js";
import { StateKV } from "../state/kv.js";
Expand Down Expand Up @@ -34,6 +36,9 @@ interface EvictionStats {
capEvictions: number;
expiredMemories: number;
nonLatestMemories: number;
staleGraphNodes?: number;
staleGraphEdges?: number;
isolatedNodes?: number;
dryRun: boolean;
}

Expand Down Expand Up @@ -340,6 +345,94 @@ export function registerEvictFunction(sdk: ISdk, kv: StateKV): void {
}
}

// === Fix 1: Stale graph node/edge garbage collection ===
// One KV scan per scope (nodes, edges) — the stale and full lists are derived from the same results
const allNodes = await kv.list<GraphNode>(KV.graphNodes);
const allEdges = await kv.list<GraphEdge>(KV.graphEdges);
const staleNodes = allNodes.filter((n) => n.stale);
const staleEdges = allEdges.filter((e) => e.stale);

for (const node of staleNodes) {
if (stats.staleGraphNodes === undefined) stats.staleGraphNodes = 0;
if (dryRun) {
stats.staleGraphNodes++;
continue;
}
try {
await kv.delete(KV.graphNodes, node.id);
stats.staleGraphNodes++;
await recordAudit(kv, "delete", "mem::evict", [node.id], {
resource: "graph-node",
reason: "stale",
dryRun,
});
} catch (err) {
logger.warn("Eviction stale node delete failed", {
nodeId: node.id,
error: err instanceof Error ? err.message : String(err),
});
}
}
for (const edge of staleEdges) {
if (stats.staleGraphEdges === undefined) stats.staleGraphEdges = 0;
if (dryRun) {
stats.staleGraphEdges++;
continue;
}
try {
await kv.delete(KV.graphEdges, edge.id);
stats.staleGraphEdges++;
await recordAudit(kv, "delete", "mem::evict", [edge.id], {
resource: "graph-edge",
reason: "stale",
dryRun,
});
} catch (err) {
logger.warn("Eviction stale edge delete failed", {
edgeId: edge.id,
error: err instanceof Error ? err.message : String(err),
});
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// === Fix 3: Isolated node cleanup (0 edges, no observations, >7 days old) ===
// Reuses allNodes/allEdges from Fix 1 above
const nowTime = Date.now();
const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;

for (const node of allNodes) {
if (node.stale) continue; // handled above
const hasEdges = allEdges.some(
(e) => e.sourceNodeId === node.id || e.targetNodeId === node.id,
);
if (hasEdges) continue;
const hasObs = (node.sourceObservationIds ?? []).length > 0;
if (hasObs) continue;
const createdAtMs = node.createdAt ? new Date(node.createdAt).getTime() : NaN;
if (!Number.isFinite(createdAtMs)) continue; // unknown age — preserve
if (nowTime - createdAtMs < SEVEN_DAYS_MS) continue; // keep recent nodes
if (dryRun) {
if (stats.isolatedNodes === undefined) stats.isolatedNodes = 0;
stats.isolatedNodes++;
} else {
try {
await kv.delete(KV.graphNodes, node.id);
if (stats.isolatedNodes === undefined) stats.isolatedNodes = 0;
stats.isolatedNodes++;
await recordAudit(kv, "delete", "mem::evict", [node.id], {
resource: "graph-node",
reason: "isolated_no_edges_no_observations",
dryRun,
});
} catch (err) {
logger.warn("Eviction isolated node delete failed", {
nodeId: node.id,
error: err instanceof Error ? err.message : String(err),
});
}
}
}

logger.info("Eviction complete", { stats });
return stats;
},
Expand Down
51 changes: 45 additions & 6 deletions src/functions/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type {
CompressedObservation,
MemoryProvider,
} from "../types.js";
import { KV, generateId } from "../state/schema.js";
import { KV, generateId, jaccardSimilarity } from "../state/schema.js";
import type { StateKV } from "../state/kv.js";
import {
GRAPH_EXTRACTION_SYSTEM,
Expand Down Expand Up @@ -399,6 +399,9 @@ function parseGraphXml(
const type = attrs["type"] as GraphNode["type"] | undefined;
const name = attrs["name"];
if (!type || !name) return;
const VALID_TYPES = new Set(["file", "function", "concept", "error", "decision", "pattern", "library", "person"]);
// Must match types listed in GRAPH_EXTRACTION_SYSTEM prompt — update both together
if (!VALID_TYPES.has(type)) return; // skip corrupt/invalid types from LLM
const properties: Record<string, string> = {};
const propRegex = /<property\s+key="([^"]+)">([^<]*)<\/property>/g;
let propMatch;
Expand Down Expand Up @@ -490,6 +493,9 @@ export function registerGraphFunction(
let newNodeCount = 0;
let newEdgeCount = 0;
const newEdgesForTopCheck: GraphEdge[] = [];
// #813: cache all nodes once for fuzzy dedup, avoiding per-node kv.list calls
const allNodes = await kv.list<GraphNode>(KV.graphNodes);
Comment on lines +496 to +497
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Avoid reintroducing a full graph scan in mem::graph-extract.

Line 497 brings back the exact kv.list<GraphNode>(KV.graphNodes) enumeration that Lines 486-490 say was removed because large corpora can blow the iii heartbeat budget. Since this now runs on every extract and has no timeout/fallback, fuzzy dedup can make extraction fail again at scale. Please move the fuzzy lookup onto a narrower index/candidate set instead of a whole-scope list.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/functions/graph.ts` around lines 496 - 497, The change reintroduces a
full-scope kv.list<GraphNode>(KV.graphNodes) inside mem::graph-extract
(allNodes) which will re-trigger full graph scans at scale; replace this with a
targeted candidate lookup for fuzzy dedup instead of enumerating KV.graphNodes:
e.g., consult an existing narrower index or embedding nearest-neighbour store
(or a prefix/time-windowed index, recentIds list, or KV.graphIndex) to produce a
small candidate id set, then fetch only those nodes via kv.get/kv.mget and run
fuzzy dedup against that set; ensure the new flow keeps the variable name
(allNodes or candidates) but limits size and is bounded/timeboxed to avoid
heartbeat/timeout issues.

const nodeIdMap = new Map<string, string>();
Comment on lines +496 to +498
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep the fuzzy-dedup candidate cache in sync within the same extraction run.

allNodes is snapshotted once before the loop, then never updated after Line 537 inserts a new node. If one LLM response emits two similar names in the same batch, the second node will miss the fuzzy match and be persisted as a duplicate because it only searches the pre-existing corpus.

Suggested fix
         } else {
           await Promise.all([
             kv.set(KV.graphNodes, node.id, node),
             kv.set(KV.graphNameIndex, indexKey, node.id),
             kv.set(KV.graphNodeDegree, node.id, 0),
           ]);
+          allNodes.push(node);
           snap.stats.totalNodes += 1;
           snap.stats.nodesByType[node.type] =
             (snap.stats.nodesByType[node.type] ?? 0) + 1;
           newNodeCount += 1;
           nodeIdMap.set(node.id, node.id);

Also applies to: 536-545

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/functions/graph.ts` around lines 496 - 498, The in-memory fuzzy-dedup
candidate list (allNodes) is snapshotted once and never updated when you insert
new nodes, so subsequent similar names in the same extraction run can be
persisted as duplicates (nodeIdMap is already updated on both the merge and
insert paths, so it needs no change); after you create/persist a new GraphNode
(the code path that inserts to KV.graphNodes), append that new node to the
allNodes array (or insert it into whatever in-memory index you use for fuzzy
matching) so future fuzzy-match checks in the same run will see it; locate the
usages of allNodes and the block that persists a new node and add a single-step
cache update there.


for (const node of nodes) {
const indexKey = nameIndexKey(node.type, node.name);
Expand All @@ -501,8 +507,20 @@ export function registerGraphFunction(
let existing: GraphNode | null = null;
if (existingId) {
existing = await kv.get<GraphNode>(KV.graphNodes, existingId);
// #813: skip stale nodes — they shouldn't merge
if (existing?.stale) existing = null;
}

// #813: Fuzzy dedup via Jaccard similarity (if exact match failed)
if (!existing && node.name.length >= 3) {
existing = allNodes.find(
(n) =>
!n.stale &&
n.type === node.type &&
n.name.length >= 3 &&
jaccardSimilarity(n.name.toLowerCase(), node.name.toLowerCase()) > 0.8,
) ?? null;
}
if (existing) {
const merged = mergeNode(existing, node, obsIds, capturedAt);
await kv.set(KV.graphNodes, existing.id, merged);
Expand All @@ -512,14 +530,19 @@ export function registerGraphFunction(
(n) => n.id === existing!.id,
);
if (topIdx !== -1) snap.topNodes[topIdx] = merged;
// #813: track mapping from parsed ID -> persisted ID
nodeIdMap.set(node.id, existing.id);
} else {
await kv.set(KV.graphNodes, node.id, node);
await kv.set(KV.graphNameIndex, indexKey, node.id);
await kv.set(KV.graphNodeDegree, node.id, 0);
await Promise.all([
kv.set(KV.graphNodes, node.id, node),
kv.set(KV.graphNameIndex, indexKey, node.id),
kv.set(KV.graphNodeDegree, node.id, 0),
]);
snap.stats.totalNodes += 1;
snap.stats.nodesByType[node.type] =
(snap.stats.nodesByType[node.type] ?? 0) + 1;
newNodeCount += 1;
nodeIdMap.set(node.id, node.id);
if (snap.topNodes.length < SNAPSHOT_TOP_NODES) {
// Degree 0 still beats an empty slot — sit at the tail
// until edges arrive and promote.
Expand All @@ -530,6 +553,20 @@ export function registerGraphFunction(
}

for (const edge of edges) {
// #813: remap transient parsed node IDs to persisted node IDs
const srcId = nodeIdMap.get(edge.sourceNodeId);
const tgtId = nodeIdMap.get(edge.targetNodeId);
if (!srcId || !tgtId) {
logger.warn("Edge references unknown node, skipping", {
edgeId: edge.id,
sourceNodeId: edge.sourceNodeId,
targetNodeId: edge.targetNodeId,
});
continue;
}
edge.sourceNodeId = srcId;
edge.targetNodeId = tgtId;

const eKey = edgeIndexKey(
edge.sourceNodeId,
edge.targetNodeId,
Expand All @@ -551,8 +588,10 @@ export function registerGraphFunction(
);
if (topIdx !== -1) snap.topEdges[topIdx] = merged;
} else {
await kv.set(KV.graphEdges, edge.id, edge);
await kv.set(KV.graphEdgeKey, eKey, edge.id);
await Promise.all([
kv.set(KV.graphEdges, edge.id, edge),
kv.set(KV.graphEdgeKey, eKey, edge.id),
]);
snap.stats.totalEdges += 1;
snap.stats.edgesByType[edge.type] =
(snap.stats.edgesByType[edge.type] ?? 0) + 1;
Expand Down
42 changes: 27 additions & 15 deletions src/triggers/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isGraphExtractionEnabled } from "../config.js";
import { logger } from "../logger.js";

export function registerEventTriggers(sdk: ISdk, kv: StateKV): void {
const extractionLocks = new Set<string>();
sdk.registerFunction(
"event::session::started",
async (data: { sessionId: string; project: string; cwd: string }) => {
Expand Down Expand Up @@ -61,23 +62,34 @@ export function registerEventTriggers(sdk: ISdk, kv: StateKV): void {
}
}
if (isGraphExtractionEnabled()) {
try {
const observations = await kv.list<CompressedObservation>(
KV.observations(data.sessionId),
);
const compressed = observations.filter((o) => o.title);
if (compressed.length > 0) {
sdk.trigger({
function_id: "mem::graph-extract",
payload: { observations: compressed },
action: TriggerAction.Void(),
// In-process dedup guard: prevents redundant graph-extract triggers
// for the same session. Single-process only — distributed deployments
// rely on graph-extract's own idempotency.
const lockKey = `graph-extract-${data.sessionId}`;
if (extractionLocks.has(lockKey)) {
logger.info("Skipping duplicate graph-extract (lock held)", { sessionId: data.sessionId });
} else {
extractionLocks.add(lockKey);
try {
const observations = await kv.list<CompressedObservation>(
KV.observations(data.sessionId),
);
const compressed = observations.filter((o) => o.title);
if (compressed.length > 0) {
await sdk.trigger({
function_id: "mem::graph-extract",
payload: { observations: compressed },
action: TriggerAction.Void(),
});
}
} catch (err) {
logger.warn("graph-extract trigger failed", {
sessionId: data.sessionId,
error: err instanceof Error ? err.message : String(err),
});
} finally {
extractionLocks.delete(lockKey);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} catch (err) {
logger.warn("graph-extract trigger failed", {
sessionId: data.sessionId,
error: err instanceof Error ? err.message : String(err),
});
}
}
return summary;
Expand Down
Loading