From 9c18bb1738c1e01e959ffb4e0b3d343c96e608e0 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Wed, 17 Jun 2026 12:44:00 +0300 Subject: [PATCH 1/3] =?UTF-8?q?feat(agent-memory):=20Phase=206=20=E2=80=94?= =?UTF-8?q?=20consolidate()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - consolidate() summarizes old/low-importance memories into one durable source:'summary' memory via a caller-supplied summarize(items) callback - Select candidates by scope/tags + olderThanSeconds + maxImportance; write the summary before deleting sources so a failure never destroys memories without leaving the replacement - deleteSources defaults true; summaryImportance defaults 0.7 - Require at least one selection criterion to prevent whole-store consolidation (mirrors forgetByScope's mass-delete guard) - Fetch only the fields parseMemoryItem needs (no vector blobs) on the candidate scan - Add ConsolidateOptions/ConsolidateResult types --- packages/agent-memory/src/MemoryStore.ts | 93 +++++++++ .../__tests__/MemoryStore.consolidate.test.ts | 187 ++++++++++++++++++ packages/agent-memory/src/index.ts | 2 + packages/agent-memory/src/types.ts | 15 ++ 4 files changed, 297 insertions(+) create mode 100644 packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index c14f0262..7be123b5 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -6,8 +6,11 @@ import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; import type { + ConsolidateOptions, + ConsolidateResult, EmbedFn, MemoryHit, + MemoryItem, MemoryScope, MemoryStoreClient, RecallOptions, @@ -22,6 +25,8 @@ const RECALL_OVERFETCH = 4; const FORGET_BATCH_SIZE = 500; const FORGET_MAX_BATCHES = 10000; const EVICTION_SCAN_LIMIT = 10000; +const CONSOLIDATE_SCAN_LIMIT = 10000; +const DEFAULT_SUMMARY_IMPORTANCE = 0.7; export interface MemoryStoreOptions { client: MemoryStoreClient; @@ -204,6 +209,77 @@ export class MemoryStore { return id; } + async consolidate(options: ConsolidateOptions): Promise { + const now = Date.now(); + const tags = options.tags ?? []; + const scope: MemoryScope = { + threadId: options.threadId, + agentId: options.agentId, + namespace: options.namespace, + }; + + const hasCriteria = + scope.threadId !== undefined || + scope.agentId !== undefined || + scope.namespace !== undefined || + tags.length > 0 || + options.olderThanSeconds !== undefined || + options.maxImportance !== undefined; + if (!hasCriteria) { + throw new Error( + 'consolidate requires a scope, tags, olderThanSeconds, or maxImportance to select candidates', + ); + } + + const raw = await this.client.call( + 'FT.SEARCH', + `${this.name}:mem:idx`, + buildScopeFilter(scope, tags), + 'RETURN', + '10', + 'content', + 'importance', + 'tags', + 'created_at', + 'last_accessed_at', + 'access_count', + 'source', + 'threadId', + 'agentId', + 'namespace', + 'LIMIT', + '0', + String(CONSOLIDATE_SCAN_LIMIT), + 'DIALECT', + '2', + ); + const candidates = parseFtSearchResponse(raw) + .map((hit) => parseMemoryItem(this.name, hit)) + .filter((item) => isConsolidationCandidate(item, options, now)); + + if (candidates.length === 0) { + return { consolidated: 0, created: [], deleted: 0 }; + } + + // Write the summary before deleting sources so a failure can never destroy + // memories without leaving their consolidated replacement behind. + const summary = await options.summarize(candidates); + const summaryId = await this.remember(summary, { + ...scope, + tags, + source: 'summary', + importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE, + }); + + let deleted = 0; + if (options.deleteSources !== false) { + const keys = candidates.map((item) => `${this.name}:mem:${item.id}`); + deleted = Number(await this.client.call('DEL', ...keys)); + } + + return { consolidated: candidates.length, created: [summaryId], deleted }; + } + private async writeRecord(key: string, fields: (string | Buffer)[], ttl?: number): Promise { if (ttl === undefined || ttl <= 0) { await this.client.call('HSET', key, ...fields); @@ -323,3 +399,20 @@ function ftSearchTotal(raw: unknown): number { const total = typeof raw[0] === 'string' ? parseInt(raw[0], 10) : Number(raw[0]); return Number.isFinite(total) && total > 0 ? total : 0; } + +function isConsolidationCandidate( + item: MemoryItem, + options: ConsolidateOptions, + now: number, +): boolean { + if (options.maxImportance !== undefined && item.importance > options.maxImportance) { + return false; + } + if (options.olderThanSeconds !== undefined) { + const ageSeconds = (now - item.createdAt) / 1000; + if (ageSeconds < options.olderThanSeconds) { + return false; + } + } + return true; +} diff --git a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts new file mode 100644 index 00000000..3ff763c7 --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts @@ -0,0 +1,187 @@ +import { describe, it, expect, vi } from 'vitest'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +const now = Date.now(); + +interface HitSpec { + importance: number; + ageSeconds: number; + source?: string; +} + +function itemHit(id: string, spec: HitSpec): [string, string[]] { + const created = now - spec.ageSeconds * 1000; + const fields: Record = { + content: `c-${id}`, + importance: String(spec.importance), + created_at: String(created), + last_accessed_at: String(created), + access_count: '0', + }; + if (spec.source !== undefined) { + fields.source = spec.source; + } + const flat: string[] = []; + for (const [field, value] of Object.entries(fields)) { + flat.push(field, value); + } + return [`mem:mem:${id}`, flat]; +} + +function searchReply(hits: Array<[string, string[]]>): unknown[] { + const out: unknown[] = [String(hits.length)]; + for (const [key, flat] of hits) { + out.push(key, flat); + } + return out; +} + +function consolidatingClient(hits: Array<[string, string[]]>) { + return mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + return searchReply(hits); + } + if (command === 'DEL') { + return args.length; + } + return 'OK'; + }); +} + +function fieldValue(call: unknown[] | undefined, field: string): string | undefined { + if (!call) { + return undefined; + } + const idx = call.indexOf(field); + return idx >= 0 ? (call[idx + 1] as string) : undefined; +} + +describe('MemoryStore.consolidate', () => { + it('summarizes matching candidates, writes a summary, deletes sources, returns counts', async () => { + const summarize = vi.fn(async (items) => `summary of ${items.length}`); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.3, ageSeconds: 200000 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ + namespace: 'u1', + olderThanSeconds: 3600, + maxImportance: 0.5, + summarize, + }); + + expect(summarize).toHaveBeenCalledTimes(1); + expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['a', 'b']); + expect(result.consolidated).toBe(2); + expect(result.created).toHaveLength(1); + expect(result.deleted).toBe(2); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'content')).toBe('summary of 2'); + expect(fieldValue(hset, 'source')).toBe('summary'); + expect(hset?.[1]).toBe(`mem:mem:${result.created[0]}`); + + const del = client.call.mock.calls.find((c) => c[0] === 'DEL'); + expect(del?.slice(1).sort()).toEqual(['mem:mem:a', 'mem:mem:b']); + }); + + it('selects only candidates older than olderThanSeconds', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('old', { importance: 0.2, ageSeconds: 100000 }), + itemHit('recent', { importance: 0.2, ageSeconds: 10 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ olderThanSeconds: 3600, summarize }); + + expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['old']); + expect(result.consolidated).toBe(1); + const del = client.call.mock.calls.find((c) => c[0] === 'DEL'); + expect(del?.slice(1)).toEqual(['mem:mem:old']); + }); + + it('selects only candidates at or below maxImportance', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('low', { importance: 0.2, ageSeconds: 100000 }), + itemHit('high', { importance: 0.9, ageSeconds: 100000 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ maxImportance: 0.5, summarize }); + + expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['low']); + expect(result.consolidated).toBe(1); + }); + + it('writes the summary scoped to the request at summaryImportance', async () => { + const summarize = vi.fn(async () => 'merged'); + const client = consolidatingClient([itemHit('a', { importance: 0.1, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ namespace: 'u1', summarize, summaryImportance: 0.9 }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'importance')).toBe('0.9'); + expect(fieldValue(hset, 'namespace')).toBe('u1'); + expect(fieldValue(hset, 'source')).toBe('summary'); + }); + + it('keeps sources when deleteSources is false', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.2, ageSeconds: 100000 }), + ]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ + summarize, + deleteSources: false, + olderThanSeconds: 3600, + }); + + expect(result.consolidated).toBe(2); + expect(result.created).toHaveLength(1); + expect(result.deleted).toBe(0); + expect(client.call.mock.calls.some((c) => c[0] === 'DEL')).toBe(false); + }); + + it('returns zeros and does not summarize or write when nothing matches', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ olderThanSeconds: 3600, summarize }); + + expect(summarize).not.toHaveBeenCalled(); + expect(result).toEqual({ consolidated: 0, created: [], deleted: 0 }); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(false); + expect(client.call.mock.calls.some((c) => c[0] === 'DEL')).toBe(false); + }); + + it('throws when given no scope, tags, or selection criteria (prevents whole-store consolidation)', async () => { + const summarize = vi.fn(async () => 'summary'); + const store = new MemoryStore({ client: mockClient(), name: 'mem', embedFn: fakeEmbed(8) }); + + await expect(store.consolidate({ summarize })).rejects.toThrow(/scope|criteria/i); + expect(summarize).not.toHaveBeenCalled(); + }); + + it('defaults summaryImportance to 0.7 and deletes sources by default', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + const result = await store.consolidate({ summarize, olderThanSeconds: 3600 }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET'); + expect(fieldValue(hset, 'importance')).toBe('0.7'); + expect(result.deleted).toBe(1); + }); +}); diff --git a/packages/agent-memory/src/index.ts b/packages/agent-memory/src/index.ts index d8e7422c..bf1a904a 100644 --- a/packages/agent-memory/src/index.ts +++ b/packages/agent-memory/src/index.ts @@ -10,6 +10,8 @@ export type { MemoryItem, RecallOptions, MemoryHit, + ConsolidateOptions, + ConsolidateResult, } from './types'; export { compositeScore, similarityFromDistance } from './compositeScore'; export type { RecallWeights, CompositeScoreParams } from './compositeScore'; diff --git a/packages/agent-memory/src/types.ts b/packages/agent-memory/src/types.ts index 6577bd4a..d28b477a 100644 --- a/packages/agent-memory/src/types.ts +++ b/packages/agent-memory/src/types.ts @@ -50,3 +50,18 @@ export interface MemoryHit { /** Composite recall score (similarity + recency + importance); higher is better. */ score: number; } + +export interface ConsolidateOptions extends MemoryScope { + olderThanSeconds?: number; + maxImportance?: number; + summarize: (items: MemoryItem[]) => Promise; + deleteSources?: boolean; + summaryImportance?: number; + tags?: string[]; +} + +export interface ConsolidateResult { + consolidated: number; + created: string[]; + deleted: number; +} From 1b71d3e7e4eb37d90846159b93bfdcf60fb0c696 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Fri, 19 Jun 2026 12:31:04 +0300 Subject: [PATCH 2/3] fix(agent-memory): push consolidate predicates into the query and skip summaries - olderThanSeconds/maxImportance are now NUMERIC range clauses in the FT.SEARCH filter, so the scan limit applies to actual matches instead of an arbitrary first window, and content isn't transferred for discarded rows - the candidate scan always excludes @source:{summary}, so a run never re-folds prior summaries into a new summary - extract buildConsolidateFilter (shares scope-clause logic with buildScopeFilter) --- packages/agent-memory/src/MemoryStore.ts | 46 +++++++++---------- .../__tests__/MemoryStore.consolidate.test.ts | 42 +++++++++-------- .../src/__tests__/buildRecallQuery.test.ts | 24 +++++++++- packages/agent-memory/src/buildRecallQuery.ts | 34 +++++++++++++- 4 files changed, 103 insertions(+), 43 deletions(-) diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index 7be123b5..0fae5693 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -1,7 +1,12 @@ import { randomUUID } from 'node:crypto'; import { encodeFloat32, parseFtSearchResponse } from '@betterdb/valkey-search-kit'; import { buildMemoryRecord } from './buildMemoryRecord'; -import { buildRecallQuery, buildScopeFilter, SCORE_FIELD } from './buildRecallQuery'; +import { + buildConsolidateFilter, + buildRecallQuery, + buildScopeFilter, + SCORE_FIELD, +} from './buildRecallQuery'; import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; @@ -10,7 +15,6 @@ import type { ConsolidateResult, EmbedFn, MemoryHit, - MemoryItem, MemoryScope, MemoryStoreClient, RecallOptions, @@ -27,6 +31,7 @@ const FORGET_MAX_BATCHES = 10000; const EVICTION_SCAN_LIMIT = 10000; const CONSOLIDATE_SCAN_LIMIT = 10000; const DEFAULT_SUMMARY_IMPORTANCE = 0.7; +const SUMMARY_SOURCE = 'summary'; export interface MemoryStoreOptions { client: MemoryStoreClient; @@ -231,10 +236,23 @@ export class MemoryStore { ); } + // Push olderThanSeconds/maxImportance into the query (both are NUMERIC + // indexed) so the scan limit applies to actual matches, not an arbitrary + // first window, and we don't transfer rows we'd only discard. Prior + // summaries are always excluded (-@source:{summary}) so consolidation never + // re-folds its own output into a new summary. + const filter = buildConsolidateFilter(scope, tags, { + maxCreatedAt: + options.olderThanSeconds !== undefined + ? now - options.olderThanSeconds * 1000 + : undefined, + maxImportance: options.maxImportance, + excludeSource: SUMMARY_SOURCE, + }); const raw = await this.client.call( 'FT.SEARCH', `${this.name}:mem:idx`, - buildScopeFilter(scope, tags), + filter, 'RETURN', '10', 'content', @@ -253,9 +271,7 @@ export class MemoryStore { 'DIALECT', '2', ); - const candidates = parseFtSearchResponse(raw) - .map((hit) => parseMemoryItem(this.name, hit)) - .filter((item) => isConsolidationCandidate(item, options, now)); + const candidates = parseFtSearchResponse(raw).map((hit) => parseMemoryItem(this.name, hit)); if (candidates.length === 0) { return { consolidated: 0, created: [], deleted: 0 }; @@ -267,7 +283,7 @@ export class MemoryStore { const summaryId = await this.remember(summary, { ...scope, tags, - source: 'summary', + source: SUMMARY_SOURCE, importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE, }); @@ -400,19 +416,3 @@ function ftSearchTotal(raw: unknown): number { return Number.isFinite(total) && total > 0 ? total : 0; } -function isConsolidationCandidate( - item: MemoryItem, - options: ConsolidateOptions, - now: number, -): boolean { - if (options.maxImportance !== undefined && item.importance > options.maxImportance) { - return false; - } - if (options.olderThanSeconds !== undefined) { - const ageSeconds = (now - item.createdAt) / 1000; - if (ageSeconds < options.olderThanSeconds) { - return false; - } - } - return true; -} diff --git a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts index 3ff763c7..0888389c 100644 --- a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts +++ b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts @@ -89,34 +89,40 @@ describe('MemoryStore.consolidate', () => { expect(del?.slice(1).sort()).toEqual(['mem:mem:a', 'mem:mem:b']); }); - it('selects only candidates older than olderThanSeconds', async () => { + it('pushes olderThanSeconds into the query as a created_at upper bound', async () => { const summarize = vi.fn(async () => 'summary'); - const client = consolidatingClient([ - itemHit('old', { importance: 0.2, ageSeconds: 100000 }), - itemHit('recent', { importance: 0.2, ageSeconds: 10 }), - ]); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); - const result = await store.consolidate({ olderThanSeconds: 3600, summarize }); + await store.consolidate({ olderThanSeconds: 3600, summarize }); - expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['old']); - expect(result.consolidated).toBe(1); - const del = client.call.mock.calls.find((c) => c[0] === 'DEL'); - expect(del?.slice(1)).toEqual(['mem:mem:old']); + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + const filter = search?.[2] as string; + // Server-side range so the scan limit applies to actual matches, not an + // arbitrary first window. + expect(filter).toMatch(/@created_at:\[-inf \d+\]/); }); - it('selects only candidates at or below maxImportance', async () => { + it('pushes maxImportance into the query as an importance upper bound', async () => { const summarize = vi.fn(async () => 'summary'); - const client = consolidatingClient([ - itemHit('low', { importance: 0.2, ageSeconds: 100000 }), - itemHit('high', { importance: 0.9, ageSeconds: 100000 }), - ]); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ maxImportance: 0.5, summarize }); + + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + expect(search?.[2] as string).toContain('@importance:[-inf 0.5]'); + }); + + it('excludes prior summaries from the candidate scan so a default run does not re-fold them', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([itemHit('a', { importance: 0.2, ageSeconds: 100000 })]); const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); - const result = await store.consolidate({ maxImportance: 0.5, summarize }); + await store.consolidate({ namespace: 'u1', summarize }); - expect(summarize.mock.calls[0][0].map((i: { id: string }) => i.id)).toEqual(['low']); - expect(result.consolidated).toBe(1); + const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH'); + expect(search?.[2] as string).toContain('-@source:{summary}'); }); it('writes the summary scoped to the request at summaryImportance', async () => { diff --git a/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts b/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts index 2d771f6e..4f9a4359 100644 --- a/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts +++ b/packages/agent-memory/src/__tests__/buildRecallQuery.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { buildRecallQuery } from '../buildRecallQuery'; +import { buildRecallQuery, buildConsolidateFilter } from '../buildRecallQuery'; describe('buildRecallQuery', () => { it('builds a bare KNN query when there are no filters', () => { @@ -18,3 +18,25 @@ describe('buildRecallQuery', () => { ); }); }); + +describe('buildConsolidateFilter', () => { + it('appends inclusive NUMERIC ranges and a source exclusion to the scope filter', () => { + expect( + buildConsolidateFilter({ namespace: 'u1' }, ['pref'], { + maxCreatedAt: 1000, + maxImportance: 0.5, + excludeSource: 'summary', + }), + ).toBe('(@namespace:{u1} @tags:{pref} @created_at:[-inf 1000] @importance:[-inf 0.5] -@source:{summary})'); + }); + + it('omits absent predicates', () => { + expect(buildConsolidateFilter({ threadId: 't' }, [], { excludeSource: 'summary' })).toBe( + '(@threadId:{t} -@source:{summary})', + ); + }); + + it('still constrains by range when there is no scope', () => { + expect(buildConsolidateFilter({}, [], { maxImportance: 0.3 })).toBe('(@importance:[-inf 0.3])'); + }); +}); diff --git a/packages/agent-memory/src/buildRecallQuery.ts b/packages/agent-memory/src/buildRecallQuery.ts index 4e0c44d3..1bc6e7fc 100644 --- a/packages/agent-memory/src/buildRecallQuery.ts +++ b/packages/agent-memory/src/buildRecallQuery.ts @@ -4,7 +4,7 @@ import type { MemoryScope } from './types'; export const SCORE_FIELD = '__score'; export const VECTOR_FIELD = 'vector'; -export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { +function scopeClauses(scope: MemoryScope, tags: string[]): string[] { const clauses: string[] = []; if (scope.threadId !== undefined) { clauses.push(`@threadId:{${escapeTag(scope.threadId)}}`); @@ -18,12 +18,44 @@ export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { for (const tag of tags) { clauses.push(`@tags:{${escapeTag(tag)}}`); } + return clauses; +} + +function joinClauses(clauses: string[]): string { if (clauses.length === 0) { return '*'; } return `(${clauses.join(' ')})`; } +export function buildScopeFilter(scope: MemoryScope, tags: string[]): string { + return joinClauses(scopeClauses(scope, tags)); +} + +export interface ConsolidateFilterOptions { + maxCreatedAt?: number; + maxImportance?: number; + excludeSource?: string; +} + +export function buildConsolidateFilter( + scope: MemoryScope, + tags: string[], + options: ConsolidateFilterOptions, +): string { + const clauses = scopeClauses(scope, tags); + if (options.maxCreatedAt !== undefined) { + clauses.push(`@created_at:[-inf ${options.maxCreatedAt}]`); + } + if (options.maxImportance !== undefined) { + clauses.push(`@importance:[-inf ${options.maxImportance}]`); + } + if (options.excludeSource !== undefined) { + clauses.push(`-@source:{${escapeTag(options.excludeSource)}}`); + } + return joinClauses(clauses); +} + export function buildRecallQuery(k: number, scope: MemoryScope, tags: string[]): string { return `${buildScopeFilter(scope, tags)}=>[KNN ${k} @${VECTOR_FIELD} $vec AS ${SCORE_FIELD}]`; } From 384e10b7e1c7ab9cb6787d2285a62b9088382e95 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Fri, 19 Jun 2026 16:31:50 +0300 Subject: [PATCH 3/3] fix(agent-memory): write the consolidate summary without a capacity pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With maxItemsPerScope set, consolidate wrote the summary via remember(), whose enforceCapacity could evict that very summary while the sources still inflated the scope — then the sources were deleted, losing the consolidated content entirely. Write the summary via a capacity-free path; consolidation is a net reduction, so the cap stays honored without a pass. --- packages/agent-memory/src/MemoryStore.ts | 35 ++++++++++++++----- .../__tests__/MemoryStore.consolidate.test.ts | 24 +++++++++++++ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index 0fae5693..0fd47ad9 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -202,12 +202,21 @@ export class MemoryStore { return deleted; } - async remember(content: string, options: RememberOptions = {}): Promise { + private async writeMemory( + content: string, + options: RememberOptions, + now: number, + ): Promise { const vector = await this.embed(content); const id = randomUUID(); - const now = Date.now(); const record = buildMemoryRecord(this.name, id, content, vector, options, now); await this.writeRecord(record.key, record.fields, options.ttl); + return id; + } + + async remember(content: string, options: RememberOptions = {}): Promise { + const now = Date.now(); + const id = await this.writeMemory(content, options, now); // Capacity enforcement is best-effort: the memory is already durably stored, // so a failed eviction pass must not reject an otherwise successful write. await this.enforceCapacity(options, now).catch(() => undefined); @@ -278,14 +287,22 @@ export class MemoryStore { } // Write the summary before deleting sources so a failure can never destroy - // memories without leaving their consolidated replacement behind. + // memories without leaving their consolidated replacement behind. Use the + // capacity-free write path: consolidation is a net reduction (N sources -> 1 + // summary), and the sources still inflate the scope here, so an enforceCapacity + // pass could otherwise evict the summary we just wrote and then delete the + // sources — losing the content entirely. const summary = await options.summarize(candidates); - const summaryId = await this.remember(summary, { - ...scope, - tags, - source: SUMMARY_SOURCE, - importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE, - }); + const summaryId = await this.writeMemory( + summary, + { + ...scope, + tags, + source: SUMMARY_SOURCE, + importance: options.summaryImportance ?? DEFAULT_SUMMARY_IMPORTANCE, + }, + now, + ); let deleted = 0; if (options.deleteSources !== false) { diff --git a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts index 0888389c..539d0b25 100644 --- a/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts +++ b/packages/agent-memory/src/__tests__/MemoryStore.consolidate.test.ts @@ -190,4 +190,28 @@ describe('MemoryStore.consolidate', () => { expect(fieldValue(hset, 'importance')).toBe('0.7'); expect(result.deleted).toBe(1); }); + + it('writes the summary without a capacity pass so it cannot be evicted then orphaned', async () => { + const summarize = vi.fn(async () => 'summary'); + const client = consolidatingClient([ + itemHit('a', { importance: 0.2, ageSeconds: 100000 }), + itemHit('b', { importance: 0.2, ageSeconds: 100000 }), + ]); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + maxItemsPerScope: 1, + }); + + const result = await store.consolidate({ namespace: 'u1', summarize }); + + expect(result.created).toHaveLength(1); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(true); + // Exactly one FT.SEARCH (the candidate scan): the summary write triggers no + // capacity probe/scan, so enforceCapacity can't evict the just-written summary + // while the sources still inflate the count. + const searches = client.call.mock.calls.filter((c) => c[0] === 'FT.SEARCH'); + expect(searches).toHaveLength(1); + }); });