diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index c14f0262..0fd47ad9 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -1,11 +1,18 @@ import { randomUUID } from 'node:crypto'; import { encodeFloat32, parseFtSearchResponse } from '@betterdb/valkey-search-kit'; import { buildMemoryRecord } from './buildMemoryRecord'; -import { buildRecallQuery, buildScopeFilter, SCORE_FIELD } from './buildRecallQuery'; +import { + buildConsolidateFilter, + buildRecallQuery, + buildScopeFilter, + SCORE_FIELD, +} from './buildRecallQuery'; import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; import type { + ConsolidateOptions, + ConsolidateResult, EmbedFn, MemoryHit, MemoryScope, @@ -22,6 +29,9 @@ const RECALL_OVERFETCH = 4; const FORGET_BATCH_SIZE = 500; const FORGET_MAX_BATCHES = 10000; const EVICTION_SCAN_LIMIT = 10000; +const CONSOLIDATE_SCAN_LIMIT = 10000; +const DEFAULT_SUMMARY_IMPORTANCE = 0.7; +const SUMMARY_SOURCE = 'summary'; export interface MemoryStoreOptions { client: MemoryStoreClient; @@ -192,18 +202,117 @@ export class MemoryStore { return deleted; } - async remember(content: string, options: RememberOptions = {}): Promise { + private async writeMemory( + content: string, + options: RememberOptions, + now: number, + ): Promise { const vector = await this.embed(content); const id = randomUUID(); - const now = Date.now(); const record = buildMemoryRecord(this.name, id, content, vector, options, now); await this.writeRecord(record.key, record.fields, options.ttl); + return id; + } + + async remember(content: string, options: RememberOptions = {}): Promise { + const now = Date.now(); + const id = await this.writeMemory(content, options, now); // Capacity enforcement is best-effort: the memory is already durably stored, // so a failed eviction pass must not reject an otherwise successful write. await this.enforceCapacity(options, now).catch(() => undefined); return id; } + async consolidate(options: ConsolidateOptions): Promise { + const now = Date.now(); + const tags = options.tags ?? []; + const scope: MemoryScope = { + threadId: options.threadId, + agentId: options.agentId, + namespace: options.namespace, + }; + + const hasCriteria = + scope.threadId !== undefined || + scope.agentId !== undefined || + scope.namespace !== undefined || + tags.length > 0 || + options.olderThanSeconds !== undefined || + options.maxImportance !== undefined; + if (!hasCriteria) { + throw new Error( + 'consolidate requires a scope, tags, olderThanSeconds, or maxImportance to select candidates', + ); + } + + // Push olderThanSeconds/maxImportance into the query (both are NUMERIC + // indexed) so the scan limit applies to actual matches, not an arbitrary + // first window, and we don't transfer rows we'd only discard. Prior + // summaries are always excluded (-@source:{summary}) so consolidation never + // re-folds its own output into a new summary. + const filter = buildConsolidateFilter(scope, tags, { + maxCreatedAt: + options.olderThanSeconds !== undefined + ? now - options.olderThanSeconds * 1000 + : undefined, + maxImportance: options.maxImportance, + excludeSource: SUMMARY_SOURCE, + }); + const raw = await this.client.call( + 'FT.SEARCH', + `${this.name}:mem:idx`, + filter, + 'RETURN', + '10', + 'content', + 'importance', + 'tags', + 'created_at', + 'last_accessed_at', + 'access_count', + 'source', + 'threadId', + 'agentId', + 'namespace', + 'LIMIT', + '0', + String(CONSOLIDATE_SCAN_LIMIT), + 'DIALECT', + '2', + ); + const candidates = parseFtSearchResponse(raw).map((hit) => parseMemoryItem(this.name, hit)); + + if (candidates.length === 0) { + return { consolidated: 0, created: [], deleted: 0 }; + } + + // Write the summary before deleting sources so a failure can never destroy + // memories without leaving their consolidated replacement behind. Use the + // capacity-free write path: consolidation is a net reduction (N sources -> 1 + // summary), and the sources still inflate the scope here, so an enforceCapacity + // pass could otherwise evict the summary we just wrote and then delete the + // sources — losing the content entirely. + const summary = await options.summarize(candidates); + const summaryId = await this.writeMemory( + summary, + { + ...scope, + tags, + source: SUMMARY_SOURCE, + importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE, + }, + now, + ); + + let deleted = 0; + if (options.deleteSources !== false) { + const keys = candidates.map((item) => `${this.name}:mem:${item.id}`); + deleted = Number(await this.client.call('DEL', ...keys)); + } + + return { consolidated: candidates.length, created: [summaryId], deleted }; + } + private async writeRecord(key: string, fields: (string | Buffer)[], ttl?: number): Promise { if (ttl === undefined || ttl <= 0) { await this.client.call('HSET', key, ...fields); @@ -323,3 +432,4 @@ function ftSearchTotal(raw: unknown): number { const total = typeof raw[0] === 'string' ? parseInt(raw[0], 10) : Number(raw[0]); return Number.isFinite(total) && total > 0 ? total : 0; } + diff --git a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts new file mode 100644 index 00000000..539d0b25 --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts @@ -0,0 +1,217 @@ +import { describe, it, expect, vi } from 'vitest'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +const now = Date.now(); + +interface HitSpec { + importance: number; + ageSeconds: number; + source?: string; +} + +function itemHit(id: string, spec: HitSpec): [string, string[]] { + const created = now - spec.ageSeconds * 1000; + const fields: Record = { + content: `c-${id}`, + importance: String(spec.importance), + created_at: String(created), + last_accessed_at: String(created), + access_count: '0', + }; + if (spec.source !== undefined) { + fields.source = spec.source; + } + const flat: string[] = []; + for (const [field, value] of Object.entries(fields)) { + flat.push(field, value); + } + return [`mem:mem:${id}`, flat]; +} + +function searchReply(hits: Array<[string, string[]]>): unknown[] { + const out: unknown[] = [String(hits.length)]; + for (const [key, flat] of hits) { + out.push(key, flat); + } + return out; +} + +function consolidatingClient(hits: Array<[string, string[]]>) { + return mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + return searchReply(hits); + } + if (command === 'DEL') { + return args.length; + } + return 'OK'; + }); +} + +function fieldValue(call: unknown[] | undefined, field: string): string | undefined { + if (!call) { + return undefined; + } + const idx = call.indexOf(field); + return idx >= 0 ? (call[idx + 1] as string) : undefined; +} + +describe('MemoryStore.consolidate', () => { + it('summarizes matching candidates, writes a summary, deletes sources, returns counts', async () => { + const summarize = vi.fn(async (items) => `summary of ${items.length}`); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.3, ageSeconds: 200000 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ + namespace: 'u1', + olderThanSeconds: 3600, + maxImportance: 0.5, + summarize, + }); + + expect(summarize).toHaveBeenCalledTimes(1); + expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['a', 'b']); + expect(result.consolidated).toBe(2); + expect(result.created).toHaveLength(1); + expect(result.deleted).toBe(2); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'content')).toBe('summary of 2'); + expect(fieldValue(hset, 'source')).toBe('summary'); + expect(hset?.[1]).toBe(`mem:mem:${result.created[0]}`); + + const del = client.call.mock.calls.find((c) => c[0] === 'DEL'); + expect(del?.slice(1).sort()).toEqual(['mem:mem:a', 'mem:mem:b']); + }); + + it('pushes olderThanSeconds into the query as a created_at upper bound', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ olderThanSeconds: 3600, summarize }); + + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + const filter = search?.[2] as string; + // Server-side range so the scan limit applies to actual matches, not an + // arbitrary first window. + expect(filter).toMatch(/@created_at:\[-inf \d+\]/); + }); + + it('pushes maxImportance into the query as an importance upper bound', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ maxImportance: 0.5, summarize }); + + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + expect(search?.[2] as string).toContain('@importance:[-inf 0.5]'); + }); + + it('excludes prior summaries from the candidate scan so a default run does not re-fold them', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ namespace: 'u1', summarize }); + + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + expect(search?.[2] as string).toContain('-@source:{summary}'); + }); + + it('writes the summary scoped to the request at summaryImportance', async () => { + const summarize = vi.fn(async () => 'merged'); + const client = consolidatingClient([itemHit('a', { importance: 0.1, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ namespace: 'u1', summarize, summaryImportance: 0.9 }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'importance')).toBe('0.9'); + expect(fieldValue(hset, 'namespace')).toBe('u1'); + expect(fieldValue(hset, 'source')).toBe('summary'); + }); + + it('keeps sources when deleteSources is false', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.2, ageSeconds: 100000 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ + summarize, + deleteSources: false, + olderThanSeconds: 3600, + }); + + expect(result.consolidated).toBe(2); + expect(result.created).toHaveLength(1); + expect(result.deleted).toBe(0); + expect(client.call.mock.calls.some((c) => c[0] === 'DEL')).toBe(false); + }); + + it('returns zeros and does not summarize or write when nothing matches', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ olderThanSeconds: 3600, summarize }); + + expect(summarize).not.toHaveBeenCalled(); + expect(result).toEqual({ consolidated: 0, created: [], deleted: 0 }); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(false); + expect(client.call.mock.calls.some((c) => c[0] === 'DEL')).toBe(false); + }); + + it('throws when given no scope, tags, or selection criteria (prevents whole-store consolidation)', async () => { + const summarize = vi.fn(async () => 'summary'); + const store = new MemoryStore({ client: mockClient(), name: 'mem', embedFn: fakeEmbed(8) }); + + await expect(store.consolidate({ summarize })).rejects.toThrow(/scope|criteria/i); + expect(summarize).not.toHaveBeenCalled(); + }); + + it('defaults summaryImportance to 0.7 and deletes sources by default', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ summarize, olderThanSeconds: 3600 }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'importance')).toBe('0.7'); + expect(result.deleted).toBe(1); + }); + + it('writes the summary without a capacity pass so it cannot be evicted then orphaned', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.2, ageSeconds: 100000 }), + ]); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + maxItemsPerScope: 1, + }); + + const result = await store.consolidate({ namespace: 'u1', summarize }); + + expect(result.created).toHaveLength(1); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(true); + // Exactly one FT.SEARCH (the candidate scan): the summary write triggers no + // capacity probe/scan, so enforceCapacity can't evict the just-written summary + // while the sources still inflate the count. + const searches = client.call.mock.calls.filter((c) => c[0] === 'FT.SEARCH'); + expect(searches).toHaveLength(1); + }); +}); diff --git a/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts b/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts index 2d771f6e..4f9a4359 100644 --- a/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts +++ b/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { buildRecallQuery } from '../buildRecallQuery'; +import { buildRecallQuery, buildConsolidateFilter } from '../buildRecallQuery'; describe('buildRecallQuery', () => { it('builds a bare KNN query when there are no filters', () => { @@ -18,3 +18,25 @@ describe('buildRecallQuery', () => { ); }); }); + +describe('buildConsolidateFilter', () => { + it('appends inclusive NUMERIC ranges and a source exclusion to the scope filter', () => { + expect( + buildConsolidateFilter({ namespace: 'u1' }, ['pref'], { + maxCreatedAt: 1000, + maxImportance: 0.5, + excludeSource: 'summary', + }), + ).toBe('(@namespace:{u1} @tags:{pref} @created_at:[-inf 1000] @importance:[-inf 0.5] -@source:{summary})'); + }); + + it('omits absent predicates', () => { + expect(buildConsolidateFilter({ threadId: 't' }, [], { excludeSource: 'summary' })).toBe( + '(@threadId:{t} -@source:{summary})', + ); + }); + + it('still constrains by range when there is no scope', () => { + expect(buildConsolidateFilter({}, [], { maxImportance: 0.3 })).toBe('(@importance:[-inf 0.3])'); + }); +}); diff --git a/packages/agent-memory/src/buildRecallQuery.ts b/packages/agent-memory/src/buildRecallQuery.ts index 4e0c44d3..1bc6e7fc 100644 --- a/packages/agent-memory/src/buildRecallQuery.ts +++ b/packages/agent-memory/src/buildRecallQuery.ts @@ -4,7 +4,7 @@ import type { MemoryScope } from './types'; export const SCORE_FIELD = '__score'; export const VECTOR_FIELD = 'vector'; -export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { +function scopeClauses(scope: MemoryScope, tags: string[]): string[] { const clauses: string[] = []; if (scope.threadId !== undefined) { clauses.push(`@threadId:{${escapeTag(scope.threadId)}}`); @@ -18,12 +18,44 @@ export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { for (const tag of tags) { clauses.push(`@tags:{${escapeTag(tag)}}`); } + return clauses; +} + +function joinClauses(clauses: string[]): string { if (clauses.length === 0) { return '*'; } return `(${clauses.join(' ')})`; } +export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { + return joinClauses(scopeClauses(scope, tags)); +} + +export interface ConsolidateFilterOptions { + maxCreatedAt?: number; + maxImportance?: number; + excludeSource?: string; +} + +export function buildConsolidateFilter( + scope: MemoryScope, + tags: string[], + options: ConsolidateFilterOptions, +): string { + const clauses = scopeClauses(scope, tags); + if (options.maxCreatedAt !== undefined) { + clauses.push(`@created_at:[-inf ${options.maxCreatedAt}]`); + } + if (options.maxImportance !== undefined) { + clauses.push(`@importance:[-inf ${options.maxImportance}]`); + } + if (options.excludeSource !== undefined) { + clauses.push(`-@source:{${escapeTag(options.excludeSource)}}`); + } + return joinClauses(clauses); +} + export function buildRecallQuery(k: number, scope: MemoryScope, tags: string[]): string { return `${buildScopeFilter(scope, tags)}=>[KNN ${k} @${VECTOR_FIELD} $vec AS ${SCORE_FIELD}]`; } diff --git a/packages/agent-memory/src/index.ts b/packages/agent-memory/src/index.ts index d8e7422c..bf1a904a 100644 --- a/packages/agent-memory/src/index.ts +++ b/packages/agent-memory/src/index.ts @@ -10,6 +10,8 @@ export type { MemoryItem, RecallOptions, MemoryHit, + ConsolidateOptions, + ConsolidateResult, } from './types'; export { compositeScore, similarityFromDistance } from './compositeScore'; export type { RecallWeights, CompositeScoreParams } from './compositeScore'; diff --git a/packages/agent-memory/src/types.ts b/packages/agent-memory/src/types.ts index 6577bd4a..d28b477a 100644 --- a/packages/agent-memory/src/types.ts +++ b/packages/agent-memory/src/types.ts @@ -50,3 +50,18 @@ export interface MemoryHit { /** Composite recall score (similarity + recency + importance); higher is better. */ score: number; } + +export interface ConsolidateOptions extends MemoryScope { + olderThanSeconds?: number; + maxImportance?: number; + summarize: (items: MemoryItem[]) => Promise; + deleteSources?: boolean; + summaryImportance?: number; + tags?: string[]; +} + +export interface ConsolidateResult { + consolidated: number; + created: string[]; + deleted: number; +}