Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 113 additions & 3 deletions packages/agent-memory/src/MemoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import { randomUUID } from 'node:crypto';
import { encodeFloat32, parseFtSearchResponse } from '@betterdb/valkey-search-kit';
import { buildMemoryRecord } from './buildMemoryRecord';
import { buildRecallQuery, buildScopeFilter, SCORE_FIELD } from './buildRecallQuery';
import {
buildConsolidateFilter,
buildRecallQuery,
buildScopeFilter,
SCORE_FIELD,
} from './buildRecallQuery';
import { parseMemoryItem } from './parseMemoryItem';
import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore';
import { selectEvictions, type EvictionCandidate } from './selectEvictions';
import type {
ConsolidateOptions,
ConsolidateResult,
EmbedFn,
MemoryHit,
MemoryScope,
Expand All @@ -22,6 +29,9 @@ const RECALL_OVERFETCH = 4;
// NOTE(review): the call sites of the next three limits are outside the visible
// hunks — presumably forget() batching and the eviction scan cap; confirm there.
const FORGET_BATCH_SIZE = 500;
const FORGET_MAX_BATCHES = 10000;
const EVICTION_SCAN_LIMIT = 10000;
// Row count passed as the LIMIT of consolidate()'s candidate FT.SEARCH.
const CONSOLIDATE_SCAN_LIMIT = 10000;
// Importance written on a consolidation summary when options.summaryImportance is unset.
const DEFAULT_SUMMARY_IMPORTANCE = 0.7;
// `source` value written on summaries and excluded from the consolidate candidate scan.
const SUMMARY_SOURCE = 'summary';

export interface MemoryStoreOptions {
client: MemoryStoreClient;
Expand Down Expand Up @@ -192,18 +202,117 @@ export class MemoryStore {
return deleted;
}

/**
 * Embeds `content`, builds a memory record under a freshly generated UUID and
 * persists it via `writeRecord`.
 *
 * This path performs no capacity enforcement; callers that need it (see
 * `remember`) run that pass themselves after the write.
 *
 * @param content - Text to embed and store.
 * @param options - Options forwarded to `buildMemoryRecord`; `options.ttl` is
 *   also passed to `writeRecord`.
 * @param now - Timestamp supplied by the caller and forwarded to
 *   `buildMemoryRecord`, so one operation uses a single consistent clock value.
 * @returns The id of the newly written memory.
 */
private async writeMemory(
  content: string,
  options: RememberOptions,
  now: number,
): Promise<string> {
  const vector = await this.embed(content);
  const id = randomUUID();
  // `now` comes from the caller; it must not be re-read from the clock here.
  const record = buildMemoryRecord(this.name, id, content, vector, options, now);
  await this.writeRecord(record.key, record.fields, options.ttl);
  return id;
}

/**
 * Stores `content` as a new memory, then runs a capacity pass.
 *
 * @param content - Text to remember.
 * @param options - Write options; also handed to the capacity pass.
 * @returns The id of the stored memory.
 */
async remember(content: string, options: RememberOptions = {}): Promise<string> {
  const writtenAt = Date.now();
  const memoryId = await this.writeMemory(content, options, writtenAt);

  // The write above has already succeeded, so eviction is best-effort: a
  // failing capacity pass is swallowed instead of rejecting the call.
  const capacityPass = this.enforceCapacity(options, writtenAt);
  await capacityPass.catch(() => undefined);

  return memoryId;
}

/**
 * Folds the memories selected by `options` into a single summary memory.
 *
 * Steps: validate that at least one selector is present, search the index for
 * candidates, ask `options.summarize` for the summary text, write the summary,
 * and finally delete the source memories unless `options.deleteSources` is
 * `false`.
 *
 * @param options - Scope (`threadId` / `agentId` / `namespace`), `tags`,
 *   `olderThanSeconds`, `maxImportance`, the `summarize` callback, and the
 *   optional `summaryImportance` / `deleteSources` switches.
 * @returns Number of candidates consolidated, ids of created summaries, and
 *   the delete count reported by the client.
 * @throws Error when no scope, tags, `olderThanSeconds` or `maxImportance` is
 *   given, so a bare call cannot consolidate the whole store.
 */
async consolidate(options: ConsolidateOptions): Promise<ConsolidateResult> {
  const now = Date.now();
  const tags = options.tags ?? [];
  const scope: MemoryScope = {
    threadId: options.threadId,
    agentId: options.agentId,
    namespace: options.namespace,
  };

  const hasCriteria =
    scope.threadId !== undefined ||
    scope.agentId !== undefined ||
    scope.namespace !== undefined ||
    tags.length > 0 ||
    options.olderThanSeconds !== undefined ||
    options.maxImportance !== undefined;
  if (!hasCriteria) {
    throw new Error(
      'consolidate requires a scope, tags, olderThanSeconds, or maxImportance to select candidates',
    );
  }

  // Push olderThanSeconds/maxImportance into the query (both are NUMERIC
  // indexed) so the scan limit applies to actual matches, not an arbitrary
  // first window, and we don't transfer rows we'd only discard. Prior
  // summaries are always excluded (-@source:{summary}) so consolidation never
  // re-folds its own output into a new summary.
  const filter = buildConsolidateFilter(scope, tags, {
    maxCreatedAt:
      options.olderThanSeconds !== undefined
        ? now - options.olderThanSeconds * 1000
        : undefined,
    maxImportance: options.maxImportance,
    excludeSource: SUMMARY_SOURCE,
  });
  const raw = await this.client.call(
    'FT.SEARCH',
    `${this.name}:mem:idx`,
    filter,
    'RETURN',
    '10',
    'content',
    'importance',
    'tags',
    'created_at',
    'last_accessed_at',
    'access_count',
    'source',
    'threadId',
    'agentId',
    'namespace',
    'LIMIT',
    '0',
    String(CONSOLIDATE_SCAN_LIMIT),
    'DIALECT',
    '2',
  );
  const candidates = parseFtSearchResponse(raw).map((hit) => parseMemoryItem(this.name, hit));

  // Nothing matched: do not call summarize, write, or delete.
  if (candidates.length === 0) {
    return { consolidated: 0, created: [], deleted: 0 };
  }

  // Write the summary before deleting sources so a failure can never destroy
  // memories without leaving their consolidated replacement behind. Use the
  // capacity-free write path: consolidation is a net reduction (N sources -> 1
  // summary), and the sources still inflate the scope here, so an enforceCapacity
  // pass could otherwise evict the summary we just wrote and then delete the
  // sources — losing the content entirely.
  const summary = await options.summarize(candidates);
  const summaryId = await this.writeMemory(
    summary,
    {
      ...scope,
      tags,
      source: SUMMARY_SOURCE,
      importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE,
    },
    now,
  );

  // Sources are removed by default; only an explicit `false` keeps them.
  let deleted = 0;
  if (options.deleteSources !== false) {
    const keys = candidates.map((item) => `${this.name}:mem:${item.id}`);
    deleted = Number(await this.client.call('DEL', ...keys));
  }

  return { consolidated: candidates.length, created: [summaryId], deleted };
}

private async writeRecord(key: string, fields: (string | Buffer)[], ttl?: number): Promise<void> {
if (ttl === undefined || ttl <= 0) {
await this.client.call('HSET', key, ...fields);
Expand Down Expand Up @@ -323,3 +432,4 @@ function ftSearchTotal(raw: unknown): number {
const total = typeof raw[0] === 'string' ? parseInt(raw[0], 10) : Number(raw[0]);
return Number.isFinite(total) && total > 0 ? total : 0;
}

217 changes: 217 additions & 0 deletions packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import { describe, it, expect, vi } from 'vitest';
import { MemoryStore } from '../MemoryStore';
import { fakeEmbed } from './helpers/fakeEmbed';
import { mockClient } from './helpers/mockClient';

// Reference timestamp captured once at module load; itemHit derives each fake
// hit's created_at / last_accessed_at from it.
const now = Date.now();

/** Parameters describing one fake search hit built by itemHit. */
interface HitSpec {
  // Written (stringified) into the hit's `importance` field.
  importance: number;
  // Age relative to `now`; itemHit computes the timestamp as now - ageSeconds * 1000.
  ageSeconds: number;
  // Optional `source` field; itemHit leaves it off the hit when undefined.
  source?: string;
}

/**
 * Builds one fake FT.SEARCH hit as a `[key, flatFieldList]` pair.
 *
 * The key is `mem:mem:<id>`; the flat list alternates field name and value and
 * carries `source` last, only when the spec provides one.
 */
function itemHit(id: string, spec: HitSpec): [string, string[]] {
  // created_at and last_accessed_at share the same derived timestamp.
  const createdAt = String(now - spec.ageSeconds * 1000);
  const pairs: Array<[string, string]> = [
    ['content', `c-${id}`],
    ['importance', String(spec.importance)],
    ['created_at', createdAt],
    ['last_accessed_at', createdAt],
    ['access_count', '0'],
  ];
  if (spec.source !== undefined) {
    pairs.push(['source', spec.source]);
  }
  return [`mem:mem:${id}`, pairs.flat()];
}

/**
 * Assembles a raw search reply from hits: the hit count as a string, followed
 * by each hit's key and its flat field list (kept as a nested array).
 */
function searchReply(hits: Array<[string, string[]]>): unknown[] {
  const total = String(hits.length);
  // flatMap flattens a single level, so each field list stays one array element.
  return [total, ...hits.flatMap(([key, flat]) => [key, flat])];
}

/**
 * Mock client for consolidate tests: FT.SEARCH answers with a reply built from
 * `hits`, DEL answers with the number of arguments it received, and every
 * other command answers 'OK'.
 */
function consolidatingClient(hits: Array<[string, string[]]>) {
  return mockClient((command, ...args) => {
    switch (command) {
      case 'FT.SEARCH':
        return searchReply(hits);
      case 'DEL':
        return args.length;
      default:
        return 'OK';
    }
  });
}

/**
 * Returns the element that directly follows the first occurrence of `field`
 * in a recorded call's argument list, or undefined when the call is missing
 * or `field` does not occur in it.
 */
function fieldValue(call: unknown[] | undefined, field: string): string | undefined {
  const position = call?.indexOf(field) ?? -1;
  if (call === undefined || position < 0) {
    return undefined;
  }
  return call[position + 1] as string;
}

// Behavioural suite for MemoryStore.consolidate, run against a mocked client.
describe('MemoryStore.consolidate', () => {
  type FakeClient = ReturnType<typeof consolidatingClient>;

  // First recorded client call whose command name equals `command`.
  const firstCall = (client: FakeClient, command: string) =>
    client.call.mock.calls.find((args) => args[0] === command);

  // Whether the client ever received `command`.
  const sawCall = (client: FakeClient, command: string) =>
    client.call.mock.calls.some((args) => args[0] === command);

  // Store named 'mem' over the given client, using the 8-dimension fake embedder.
  const storeOver = (client: FakeClient) =>
    new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

  it('summarizes matching candidates, writes a summary, deletes sources, returns counts', async () => {
    const summarize = vi.fn(async (items) => `summary of ${items.length}`);
    const client = consolidatingClient([
      itemHit('a', { importance: 0.2, ageSeconds: 100000 }),
      itemHit('b', { importance: 0.3, ageSeconds: 200000 }),
    ]);

    const outcome = await storeOver(client).consolidate({
      namespace: 'u1',
      olderThanSeconds: 3600,
      maxImportance: 0.5,
      summarize,
    });

    expect(summarize).toHaveBeenCalledTimes(1);
    expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['a', 'b']);
    expect(outcome.consolidated).toBe(2);
    expect(outcome.created).toHaveLength(1);
    expect(outcome.deleted).toBe(2);

    const hsetCall = firstCall(client, 'HSET');
    expect(fieldValue(hsetCall, 'content')).toBe('summary of 2');
    expect(fieldValue(hsetCall, 'source')).toBe('summary');
    expect(hsetCall?.[1]).toBe(`mem:mem:${outcome.created[0]}`);

    const delCall = firstCall(client, 'DEL');
    expect(delCall?.slice(1).sort()).toEqual(['mem:mem:a', 'mem:mem:b']);
  });

  it('pushes olderThanSeconds into the query as a created_at upper bound', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]);

    await storeOver(client).consolidate({ olderThanSeconds: 3600, summarize });

    const searchCall = firstCall(client, 'FT.SEARCH');
    const filter = searchCall?.[2] as string;
    // The age bound must be a server-side range, so the scan limit counts real
    // matches rather than an arbitrary first window.
    expect(filter).toMatch(/@created_at:\[-inf \d+\]/);
  });

  it('pushes maxImportance into the query as an importance upper bound', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]);

    await storeOver(client).consolidate({ maxImportance: 0.5, summarize });

    const searchCall = firstCall(client, 'FT.SEARCH');
    expect(searchCall?.[2] as string).toContain('@importance:[-inf 0.5]');
  });

  it('excludes prior summaries from the candidate scan so a default run does not re-fold them', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]);

    await storeOver(client).consolidate({ namespace: 'u1', summarize });

    const searchCall = firstCall(client, 'FT.SEARCH');
    expect(searchCall?.[2] as string).toContain('-@source:{summary}');
  });

  it('writes the summary scoped to the request at summaryImportance', async () => {
    const summarize = vi.fn(async () => 'merged');
    const client = consolidatingClient([itemHit('a', { importance: 0.1, ageSeconds: 100000 })]);

    await storeOver(client).consolidate({ namespace: 'u1', summarize, summaryImportance: 0.9 });

    const hsetCall = firstCall(client, 'HSET');
    expect(fieldValue(hsetCall, 'importance')).toBe('0.9');
    expect(fieldValue(hsetCall, 'namespace')).toBe('u1');
    expect(fieldValue(hsetCall, 'source')).toBe('summary');
  });

  it('keeps sources when deleteSources is false', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([
      itemHit('a', { importance: 0.2, ageSeconds: 100000 }),
      itemHit('b', { importance: 0.2, ageSeconds: 100000 }),
    ]);

    const outcome = await storeOver(client).consolidate({
      summarize,
      deleteSources: false,
      olderThanSeconds: 3600,
    });

    expect(outcome.consolidated).toBe(2);
    expect(outcome.created).toHaveLength(1);
    expect(outcome.deleted).toBe(0);
    expect(sawCall(client, 'DEL')).toBe(false);
  });

  it('returns zeros and does not summarize or write when nothing matches', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([]);

    const outcome = await storeOver(client).consolidate({ olderThanSeconds: 3600, summarize });

    expect(summarize).not.toHaveBeenCalled();
    expect(outcome).toEqual({ consolidated: 0, created: [], deleted: 0 });
    expect(sawCall(client, 'HSET')).toBe(false);
    expect(sawCall(client, 'DEL')).toBe(false);
  });

  it('throws when given no scope, tags, or selection criteria (prevents whole-store consolidation)', async () => {
    const summarize = vi.fn(async () => 'summary');
    const store = new MemoryStore({ client: mockClient(), name: 'mem', embedFn: fakeEmbed(8) });

    await expect(store.consolidate({ summarize })).rejects.toThrow(/scope|criteria/i);
    expect(summarize).not.toHaveBeenCalled();
  });

  it('defaults summaryImportance to 0.7 and deletes sources by default', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]);

    const outcome = await storeOver(client).consolidate({ summarize, olderThanSeconds: 3600 });

    const hsetCall = firstCall(client, 'HSET');
    expect(fieldValue(hsetCall, 'importance')).toBe('0.7');
    expect(outcome.deleted).toBe(1);
  });

  it('writes the summary without a capacity pass so it cannot be evicted then orphaned', async () => {
    const summarize = vi.fn(async () => 'summary');
    const client = consolidatingClient([
      itemHit('a', { importance: 0.2, ageSeconds: 100000 }),
      itemHit('b', { importance: 0.2, ageSeconds: 100000 }),
    ]);
    const store = new MemoryStore({
      client,
      name: 'mem',
      embedFn: fakeEmbed(8),
      maxItemsPerScope: 1,
    });

    const outcome = await store.consolidate({ namespace: 'u1', summarize });

    expect(outcome.created).toHaveLength(1);
    expect(sawCall(client, 'HSET')).toBe(true);
    // Only the candidate scan may hit FT.SEARCH: the summary write runs no
    // capacity probe/scan, so enforceCapacity cannot evict the fresh summary
    // while the sources still inflate the count.
    const searchCalls = client.call.mock.calls.filter((args) => args[0] === 'FT.SEARCH');
    expect(searchCalls).toHaveLength(1);
  });
});
Loading
Loading