Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion packages/agent-memory/src/MemoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { buildMemoryRecord } from './buildMemoryRecord';
import { buildRecallQuery, buildScopeFilter, SCORE_FIELD } from './buildRecallQuery';
import { parseMemoryItem } from './parseMemoryItem';
import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore';
import { selectEvictions, type EvictionCandidate } from './selectEvictions';
import type {
EmbedFn,
MemoryHit,
Expand All @@ -20,6 +21,7 @@ const DEFAULT_RECALL_K = 8;
// NOTE(review): the use of RECALL_OVERFETCH is outside this view — presumably a
// multiplier on the requested k when recalling; confirm against recall().
const RECALL_OVERFETCH = 4;
// NOTE(review): the two FORGET_* limits are not referenced in the visible code —
// presumably the page size and the loop bound of a batched forget; confirm.
const FORGET_BATCH_SIZE = 500;
const FORGET_MAX_BATCHES = 10000;
// Upper bound on the number of candidate rows a single capacity-enforcement
// pass requests from FT.SEARCH (passed as the LIMIT count in enforceCapacity).
const EVICTION_SCAN_LIMIT = 10000;

export interface MemoryStoreOptions {
client: MemoryStoreClient;
Expand All @@ -28,6 +30,7 @@ export interface MemoryStoreOptions {
defaultThreshold?: number;
weights?: RecallWeights;
halfLifeSeconds?: number;
maxItemsPerScope?: number;
}

export class MemoryStore {
Expand All @@ -37,6 +40,7 @@ export class MemoryStore {
private readonly defaultThreshold: number;
private readonly weights: RecallWeights;
private readonly halfLifeSeconds: number;
private readonly maxItemsPerScope?: number;
private dims?: number;

constructor(options: MemoryStoreOptions) {
Expand All @@ -46,6 +50,7 @@ export class MemoryStore {
this.defaultThreshold = options.defaultThreshold ?? DEFAULT_THRESHOLD;
this.weights = options.weights ?? DEFAULT_WEIGHTS;
this.halfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS;
this.maxItemsPerScope = options.maxItemsPerScope;
}

async recall(query: string, options: RecallOptions = {}): Promise<MemoryHit[]> {
Expand Down Expand Up @@ -192,10 +197,112 @@ export class MemoryStore {
const id = randomUUID();
const now = Date.now();
const record = buildMemoryRecord(this.name, id, content, vector, options, now);
await this.client.call('HSET', record.key, ...record.fields);
await this.writeRecord(record.key, record.fields, options.ttl);
// Capacity enforcement is best-effort: the memory is already durably stored,
// so a failed eviction pass must not reject an otherwise successful write.
await this.enforceCapacity(options, now).catch(() => undefined);
return id;
}

/**
 * Writes a memory hash, optionally with an expiry.
 *
 * Without a usable ttl the record is stored with a single `HSET`. With one,
 * `HSET` and `EXPIRE` are sent inside a `MULTI`/`EXEC` transaction.
 *
 * @param key - Redis key of the memory hash.
 * @param fields - Flat field/value list forwarded to `HSET`.
 * @param ttl - Lifetime in seconds. `undefined`, non-positive and non-finite
 *   values (NaN, Infinity) mean "durable"; fractional values are rounded up
 *   to the next whole second because `EXPIRE` only accepts integers.
 * @throws Whatever the client throws, and the first error reported inside the
 *   `EXEC` reply (a command that failed while the transaction executed).
 */
private async writeRecord(key: string, fields: (string | Buffer)[], ttl?: number): Promise<void> {
  // Normalise the ttl up front: a fractional or non-finite value would be
  // rejected by EXPIRE at execution time, i.e. after HSET has already been
  // applied, leaving a memory that should expire living forever.
  const ttlSeconds =
    ttl !== undefined && Number.isFinite(ttl) && ttl > 0 ? Math.ceil(ttl) : undefined;
  if (ttlSeconds === undefined) {
    await this.client.call('HSET', key, ...fields);
    return;
  }
  // Set the hash and its expiry in one transaction so a crash between the two
  // can't leave a memory that should expire living forever. Atomicity assumes
  // the client routes these calls to a single connection (the MemoryStoreClient
  // contract); on a pooled client that splits them the guarantee is lost.
  await this.client.call('MULTI');
  let execReply: unknown;
  try {
    await this.client.call('HSET', key, ...fields);
    await this.client.call('EXPIRE', key, String(ttlSeconds));
    execReply = await this.client.call('EXEC');
  } catch (err) {
    // Clear the half-built transaction so the connection isn't left mid-MULTI.
    await this.client.call('DISCARD').catch(() => undefined);
    throw err;
  }
  // Commands that fail while the transaction executes do not abort EXEC; they
  // come back as error entries in its reply array. Surface the first one so
  // a failed EXPIRE is not mistaken for a successful expiring write.
  if (Array.isArray(execReply)) {
    const failure: unknown = execReply.find((entry: unknown) => entry instanceof Error);
    if (failure !== undefined) {
      throw failure;
    }
  }
}

/**
 * Best-effort cap on how many memories the just-written scope may hold.
 *
 * Counts the index entries matching the write's scope filter and, when that
 * count exceeds `maxItemsPerScope`, deletes the keys chosen by
 * `selectEvictions` and adds the number actually removed to the `evictions`
 * field of the `<name>:__mem_stats` hash. Does nothing when no cap is
 * configured or when the write is fully unscoped.
 *
 * @param scope - Scope (and optional tags) of the memory that was just written.
 * @param now - Timestamp of the write, forwarded to `selectEvictions`.
 */
private async enforceCapacity(scope: MemoryScope & { tags?: string[] }, now: number): Promise<void> {
  const max = this.maxItemsPerScope;
  if (max === undefined) {
    return;
  }
  // Tags are part of the partition (as in recall/forgetByScope), so a
  // tag-scoped write caps its own tag bucket.
  const filter = buildScopeFilter(scope, scope.tags ?? []);
  if (filter === '*') {
    // A fully-unscoped write has no scope to bound: enforcing here would count
    // and evict across the entire index (every other scope's memories), which
    // `maxItemsPerScope` does not promise. Skip — the write stays, uncapped.
    return;
  }
  const index = `${this.name}:mem:idx`;
  // Count-first so the common in-capacity write pays only a cheap LIMIT 0 0
  // probe and never fetches candidate rows. Both the count and the candidate
  // scan go through FT.SEARCH, so under index lag the cap is enforced
  // approximately and up to one write behind (the unit tests mock the search
  // replies as exact).
  const countRaw = await this.client.call(
    'FT.SEARCH',
    index,
    filter,
    'LIMIT',
    '0',
    '0',
    'DIALECT',
    '2',
  );
  const total = ftSearchTotal(countRaw);
  if (total <= max) {
    return;
  }

  // Eviction selection is exact while the scope fits EVICTION_SCAN_LIMIT (the
  // expected case); a larger scope evicts from the scanned window and the
  // remainder is reclaimed on subsequent writes.
  const raw = await this.client.call(
    'FT.SEARCH',
    index,
    filter,
    'RETURN',
    '2',
    'importance',
    'last_accessed_at',
    'LIMIT',
    '0',
    String(EVICTION_SCAN_LIMIT),
    'DIALECT',
    '2',
  );
  const candidates: EvictionCandidate[] = parseFtSearchResponse(raw).map((hit) => {
    const importance = Number(hit.fields.importance);
    const lastAccessedAt = Number(hit.fields.last_accessed_at);
    return {
      key: hit.key,
      // Missing or non-numeric fields rank as 0 instead of poisoning the score.
      importance: Number.isFinite(importance) ? importance : 0,
      lastAccessedAt: Number.isFinite(lastAccessedAt) ? lastAccessedAt : 0,
    };
  });
  // Never try to drop more rows than the scan actually returned.
  const dropCount = Math.min(total - max, candidates.length);
  // NOTE(review): the second argument is presumably the number of candidates
  // to keep — confirm against selectEvictions.
  const evictKeys = selectEvictions(candidates, candidates.length - dropCount, {
    now,
    halfLifeSeconds: this.halfLifeSeconds,
    weights: this.weights,
  });
  if (evictKeys.length === 0) {
    return;
  }
  const delReply: unknown = await this.client.call('DEL', ...evictKeys);
  // DEL replies with the number of keys that were actually removed. Keys that
  // already expired, or that a concurrent pass deleted first, must not inflate
  // the eviction counter. Fall back to the requested count when the client
  // does not hand back a usable integer.
  const reported =
    typeof delReply === 'number' || typeof delReply === 'bigint' ? Number(delReply) : Number.NaN;
  const evicted =
    Number.isInteger(reported) && reported >= 0
      ? Math.min(reported, evictKeys.length)
      : evictKeys.length;
  if (evicted === 0) {
    return;
  }
  await this.client.call(
    'HINCRBY',
    `${this.name}:__mem_stats`,
    'evictions',
    String(evicted),
  );
}

private async embed(content: string): Promise<number[]> {
const vector = await this.embedFn(content);
if (this.dims === undefined) {
Expand All @@ -208,3 +315,11 @@ export class MemoryStore {
return vector;
}
}

/**
 * Extracts the total-hit count from a raw FT.SEARCH reply.
 *
 * The count is read from the first element of the reply array. Anything that
 * is not a non-empty array, or whose first element does not parse to a
 * finite positive number, yields 0.
 *
 * @param raw - Reply as returned by the client; shape is not trusted.
 * @returns The parsed total, or 0 when it is missing or unusable.
 */
function ftSearchTotal(raw: unknown): number {
  if (!Array.isArray(raw) || raw.length === 0) {
    return 0;
  }
  const head: unknown = raw[0];
  // String replies are parsed as base-10 integers; everything else is coerced.
  const parsed = typeof head === 'string' ? parseInt(head, 10) : Number(head);
  if (!Number.isFinite(parsed) || parsed <= 0) {
    return 0;
  }
  return parsed;
}
198 changes: 198 additions & 0 deletions packages/agent-memory/src/__tests__/MemoryStore.eviction.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import { describe, it, expect } from 'vitest';
import { MemoryStore } from '../MemoryStore';
import { fakeEmbed } from './helpers/fakeEmbed';
import { mockClient } from './helpers/mockClient';

/**
 * Builds the field map of one mocked search hit, with both values rendered
 * as strings the way they would appear in a reply.
 */
function fields(importance: number, lastAccessedAt: number): Record<string, string> {
  const hitFields: Record<string, string> = {};
  hitFields.importance = String(importance);
  hitFields.last_accessed_at = String(lastAccessedAt);
  return hitFields;
}

/**
 * Builds a mocked FT.SEARCH reply: the total as a string, followed by one
 * `key, [field, value, ...]` pair per hit.
 */
function searchReply(total: number, hits: Array<[string, Record<string, string>]> = []): unknown[] {
  const rows = hits.flatMap(([key, fieldMap]): unknown[] => {
    // Flatten { field: value } into [field, value, field, value, ...].
    const flat: string[] = Object.entries(fieldMap).flat();
    return [key, flat];
  });
  return [String(total), ...rows];
}

// Covers how remember() issues its write commands depending on the ttl option:
// a plain HSET for durable memories, MULTI/HSET/EXPIRE/EXEC for expiring ones,
// and DISCARD plus rejection when the transaction fails part-way.
// NOTE(review): mockClient/fakeEmbed are helpers defined outside this view;
// the handler passed to mockClient presumably supplies the reply per command.
describe('MemoryStore TTL writes', () => {
it('writes a durable memory with a plain HSET when no ttl is given', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await store.remember('durable');

// Only the command names (first argument of each call) matter here.
const commands = client.call.mock.calls.map((c) => c[0]);
expect(commands).toContain('HSET');
expect(commands).not.toContain('EXPIRE');
expect(commands).not.toContain('MULTI');
});

it('writes an expiring memory atomically when ttl is set', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await store.remember('temporary', { ttl: 3600 });

// The exact sequence is asserted, so no other command may be issued.
const commands = client.call.mock.calls.map((c) => c[0]);
expect(commands).toEqual(['MULTI', 'HSET', 'EXPIRE', 'EXEC']);
const hset = client.call.mock.calls.find((c) => c[0] === 'HSET');
const expire = client.call.mock.calls.find((c) => c[0] === 'EXPIRE');
// EXPIRE must target the same key as HSET and carry the ttl as a string.
expect(expire?.[1]).toBe(hset?.[1]);
expect(expire?.[2]).toBe('3600');
});

it('treats a non-positive ttl as durable (no EXPIRE)', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await store.remember('x', { ttl: 0 });

const commands = client.call.mock.calls.map((c) => c[0]);
expect(commands).toContain('HSET');
expect(commands).not.toContain('EXPIRE');
});

it('DISCARDs and propagates when a ttl write fails mid-transaction', async () => {
// Fail only the EXPIRE step so the failure lands after MULTI and HSET.
const client = mockClient((command) => {
if (command === 'EXPIRE') {
throw new Error('boom');
}
return 'OK';
});
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await expect(store.remember('x', { ttl: 60 })).rejects.toThrow(/boom/);
expect(client.call.mock.calls.some((c) => c[0] === 'DISCARD')).toBe(true);
});
});

// Covers the capacity pass that remember() runs after a write when
// maxItemsPerScope is configured: which FT.SEARCH queries are issued, when
// DEL/HINCRBY follow, and that failures in the pass never fail the write.
// NOTE(review): mockClient/fakeEmbed are helpers defined outside this view.
describe('MemoryStore capacity eviction', () => {
it('evicts the lowest-ranked item and bumps the eviction counter when over capacity', async () => {
const client = mockClient((command, ...args) => {
if (command === 'FT.SEARCH') {
// The candidate scan is told apart from the count probe by its RETURN clause.
if (args.includes('RETURN')) {
return searchReply(3, [
['mem:mem:a', fields(0.1, 1000)],
['mem:mem:b', fields(0.9, 5000)],
['mem:mem:c', fields(0.5, 9000)],
]);
}
return searchReply(3);
}
return 'OK';
});
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 2,
});

await store.remember('content', { namespace: 'u1' });

// Three items against a cap of two: exactly one key is expected to go, and
// the test expects it to be 'a' (lowest importance, oldest access).
const del = client.call.mock.calls.find((c) => c[0] === 'DEL');
expect(del).toEqual(['DEL', 'mem:mem:a']);
const hincr = client.call.mock.calls.find(
(c) => c[0] === 'HINCRBY' && c[1] === 'mem:__mem_stats',
);
expect(hincr).toEqual(['HINCRBY', 'mem:__mem_stats', 'evictions', '1']);
});

it('queries capacity by the written item scope', async () => {
const client = mockClient((command) => (command === 'FT.SEARCH' ? searchReply(2) : 'OK'));
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 2,
});

await store.remember('content', { namespace: 'u1' });

// Argument 1 is the index name, argument 2 the scope filter.
const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH');
expect(search?.[1]).toBe('mem:mem:idx');
expect(search?.[2]).toBe('(@namespace:{u1})');
});

it('partitions capacity by tags so a tag-scoped write does not cap the whole index', async () => {
const client = mockClient((command) => (command === 'FT.SEARCH' ? searchReply(2) : 'OK'));
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 2,
});

await store.remember('content', { tags: ['teamx'] });

const search = client.call.mock.calls.find((c) => c[0] === 'FT.SEARCH');
expect(search?.[2]).toBe('(@tags:{teamx})');
expect(search?.[2]).not.toBe('*');
});

it('does not evict or fetch candidates when within capacity', async () => {
// The count probe reports 2 against a cap of 2, i.e. exactly at capacity.
const client = mockClient((command) => (command === 'FT.SEARCH' ? searchReply(2) : 'OK'));
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 2,
});

await store.remember('content', { namespace: 'u1' });

// A single FT.SEARCH means only the count probe ran, not the candidate scan.
const searches = client.call.mock.calls.filter((c) => c[0] === 'FT.SEARCH');
expect(searches).toHaveLength(1);
const commands = client.call.mock.calls.map((c) => c[0]);
expect(commands).not.toContain('DEL');
expect(commands).not.toContain('HINCRBY');
});

it('performs no capacity check when maxItemsPerScope is not configured', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await store.remember('content', { namespace: 'u1' });

expect(client.call.mock.calls.some((c) => c[0] === 'FT.SEARCH')).toBe(false);
});

it('skips capacity enforcement for a fully-unscoped write (no global eviction)', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 1,
});

// No namespace and no tags: the write carries no scope at all.
await store.remember('content');

expect(client.call.mock.calls.some((c) => c[0] === 'FT.SEARCH')).toBe(false);
});

it('never lets a capacity-enforcement failure break the write', async () => {
// Every FT.SEARCH throws, so the capacity pass fails on its first command.
const client = mockClient((command) => {
if (command === 'FT.SEARCH') {
throw new Error('search boom');
}
return 'OK';
});
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
maxItemsPerScope: 2,
});

const id = await store.remember('content', { namespace: 'u1' });

// remember() must still resolve with the new memory's id.
expect(typeof id).toBe('string');
expect(id.length).toBeGreaterThan(0);
});
});
Loading
Loading