Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/agent-memory/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
},
"dependencies": {
"@betterdb/agent-cache": "workspace:*",
"@betterdb/valkey-search-kit": "workspace:*"
"@betterdb/valkey-search-kit": "workspace:*",
"@opentelemetry/api": "^1.9.0",
"prom-client": "^15.1.3"
},
"devDependencies": {
"@opentelemetry/sdk-trace-base": "^1.30.1",
"@types/node": "^22.19.15",
"typescript": "^5.9.3",
"vitest": "^4.1.1"
Expand Down
235 changes: 157 additions & 78 deletions packages/agent-memory/src/MemoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { randomUUID } from 'node:crypto';
import { SpanStatusCode, type Span } from '@opentelemetry/api';
import { encodeFloat32, parseFtSearchResponse } from '@betterdb/valkey-search-kit';
import { buildMemoryRecord } from './buildMemoryRecord';
import {
Expand All @@ -11,6 +12,11 @@ import { parseMemoryItem } from './parseMemoryItem';
import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore';
import { selectEvictions, type EvictionCandidate } from './selectEvictions';
import { MemoryDiscovery } from './discovery';
import {
createMemoryTelemetry,
type MemoryTelemetry,
type MemoryTelemetryOptions,
} from './telemetry';
import type {
ConsolidateOptions,
ConsolidateResult,
Expand All @@ -33,6 +39,7 @@ const EVICTION_SCAN_LIMIT = 10000;
const CONSOLIDATE_SCAN_LIMIT = 10000;
const DEFAULT_SUMMARY_IMPORTANCE = 0.7;
const SUMMARY_SOURCE = 'summary';
const DEFAULT_IMPORTANCE = 0.5;
const DEFAULT_CONFIG_REFRESH_MS = 30000;
const MIN_CONFIG_REFRESH_MS = 1000;
const MAX_DISTANCE = 2;
Expand Down Expand Up @@ -70,6 +77,7 @@ export interface MemoryStoreOptions {
maxItemsPerScope?: number;
discovery?: boolean | MemoryDiscoveryConfig;
configRefresh?: boolean | MemoryConfigRefreshConfig;
telemetry?: MemoryTelemetryOptions;
}

export class MemoryStore {
Expand All @@ -88,12 +96,16 @@ export class MemoryStore {
private configRefreshHandle: ReturnType<typeof setInterval> | null = null;
private readonly discovery: MemoryDiscovery | null;
private discoveryReady: Promise<void> | null = null;
private readonly telemetry: MemoryTelemetry;
private readonly storeLabels: Record<string, string>;
private dims?: number;

constructor(options: MemoryStoreOptions) {
this.client = options.client;
this.name = options.name;
this.embedFn = options.embedFn;
this.telemetry = createMemoryTelemetry(options.telemetry);
this.storeLabels = { store_name: this.name };
this.initialThreshold = options.defaultThreshold ?? DEFAULT_THRESHOLD;
this.initialWeights = { ...(options.weights ?? DEFAULT_WEIGHTS) };
this.initialHalfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS;
Expand Down Expand Up @@ -245,75 +257,108 @@ export class MemoryStore {
}

/**
 * Recalls the memories most relevant to `query`.
 *
 * Embeds the query, runs an `FT.SEARCH` against the `${name}:mem:idx` index
 * (fetching `k * RECALL_OVERFETCH` candidates), drops hits whose distance
 * score is missing, non-finite, or above the threshold, ranks the rest by
 * `compositeScore`, and returns the top `k`. Unless `options.reinforce` is
 * `false`, the returned hits are then reinforced on a best-effort basis
 * (reinforcement failures are swallowed).
 *
 * The work runs inside the `recall` traced span, which records `recall.k`,
 * `recall.candidate_count`, and `recall.result_count`; recall metrics are
 * updated through `recordRecall`.
 *
 * NOTE(review): each hit's `similarity` field is assigned the raw distance
 * value, not `similarityFromDistance(distance)` — confirm this is intended.
 *
 * @param query - Text to embed and search for.
 * @param options - Per-call overrides (`k`, `threshold`, `weights`, `tags`,
 *   scope fields, `reinforce`); unset values fall back to the store defaults.
 * @returns Hits sorted by descending composite score, at most `k` of them.
 */
async recall(query: string, options: RecallOptions = {}): Promise<MemoryHit[]> {
const k = options.k ?? DEFAULT_RECALL_K;
const threshold = options.threshold ?? this.defaultThreshold;
const weights = options.weights ?? this.weights;
// Snapshot the half-life alongside threshold/weights so a concurrent
// configRefresh can't score one recall with a mix of config versions.
const halfLifeSeconds = this.halfLifeSeconds;
const fetchK = k * RECALL_OVERFETCH;
const tags = options.tags ?? [];
const scope = {
threadId: options.threadId,
agentId: options.agentId,
namespace: options.namespace,
};
return this.traced('recall', async (span) => {
const startedAt = Date.now();
const k = options.k ?? DEFAULT_RECALL_K;
const threshold = options.threshold ?? this.defaultThreshold;
const weights = options.weights ?? this.weights;
// Snapshot the half-life alongside threshold/weights so a concurrent
// configRefresh can't score one recall with a mix of config versions.
const halfLifeSeconds = this.halfLifeSeconds;
const fetchK = k * RECALL_OVERFETCH;
const tags = options.tags ?? [];
const scope = {
threadId: options.threadId,
agentId: options.agentId,
namespace: options.namespace,
};
span.setAttribute('recall.k', k);

const vector = await this.embed(query);
const queryString = buildRecallQuery(fetchK, scope, tags);
const raw = await this.client.call(
'FT.SEARCH',
`${this.name}:mem:idx`,
queryString,
'PARAMS',
'2',
'vec',
encodeFloat32(vector),
'LIMIT',
'0',
String(fetchK),
'DIALECT',
'2',
);
const vector = await this.embed(query);
const queryString = buildRecallQuery(fetchK, scope, tags);
const raw = await this.client.call(
'FT.SEARCH',
`${this.name}:mem:idx`,
queryString,
'PARAMS',
'2',
'vec',
encodeFloat32(vector),
'LIMIT',
'0',
String(fetchK),
'DIALECT',
'2',
);

const now = Date.now();
const hits: MemoryHit[] = [];
for (const hit of parseFtSearchResponse(raw)) {
const rawScore = hit.fields[SCORE_FIELD];
if (rawScore === undefined || rawScore.trim() === '') {
continue;
}
const distance = Number(rawScore);
if (!Number.isFinite(distance) || distance > threshold) {
continue;
const now = Date.now();
const hits: MemoryHit[] = [];
for (const hit of parseFtSearchResponse(raw)) {
const rawScore = hit.fields[SCORE_FIELD];
if (rawScore === undefined || rawScore.trim() === '') {
continue;
}
const distance = Number(rawScore);
if (!Number.isFinite(distance) || distance > threshold) {
continue;
}
const item = parseMemoryItem(this.name, hit);
// Recency decays from the last access, not creation, so reinforcement
// (which bumps last_accessed_at) actually makes a memory more recallable.
// max() guards against a clock-skewed last_accessed_at older than created_at.
const lastTouched = Math.max(item.createdAt, item.lastAccessedAt);
const ageSeconds = (now - lastTouched) / 1000;
const score = compositeScore({
similarity: similarityFromDistance(distance),
ageSeconds,
importance: item.importance,
weights,
halfLifeSeconds,
});
if (!Number.isFinite(score)) {
continue;
}
hits.push({ item, similarity: distance, score });
}
const item = parseMemoryItem(this.name, hit);
// Recency decays from the last access, not creation, so reinforcement
// (which bumps last_accessed_at) actually makes a memory more recallable.
// max() guards against a clock-skewed last_accessed_at older than created_at.
const lastTouched = Math.max(item.createdAt, item.lastAccessedAt);
const ageSeconds = (now - lastTouched) / 1000;
const score = compositeScore({
similarity: similarityFromDistance(distance),
ageSeconds,
importance: item.importance,
weights,
halfLifeSeconds,
});
if (!Number.isFinite(score)) {
continue;
}
hits.push({ item, similarity: distance, score });
}

hits.sort((a, b) => b.score - a.score);
const result = hits.slice(0, k);
hits.sort((a, b) => b.score - a.score);
const result = hits.slice(0, k);
span.setAttribute('recall.candidate_count', hits.length);
span.setAttribute('recall.result_count', result.length);
this.recordRecall(result.length, (Date.now() - startedAt) / 1000);

if (options.reinforce !== false) {
// Reinforcement is best-effort and must never break the recall read path.
await this.reinforce(result, now).catch(() => undefined);
if (options.reinforce !== false) {
// Reinforcement is best-effort and must never break the recall read path.
await this.reinforce(result, now).catch(() => undefined);
}
return result;
});
}

/**
 * Updates the recall metrics for one completed recall.
 *
 * Always increments `recallTotal`; increments `recallHits` when at least one
 * result was returned and `recallEmpty` otherwise; and observes the latency
 * on `recallLatency`. Every metric is labelled with this store's labels.
 *
 * @param resultCount - Number of hits returned to the caller.
 * @param latencySeconds - Duration of the recall, in seconds.
 */
private recordRecall(resultCount: number, latencySeconds: number): void {
const metrics = this.telemetry.metrics;
metrics.recallTotal.labels(this.storeLabels).inc();
if (resultCount > 0) {
metrics.recallHits.labels(this.storeLabels).inc();
} else {
metrics.recallEmpty.labels(this.storeLabels).inc();
}
return result;
metrics.recallLatency.labels(this.storeLabels).observe(latencySeconds);
}

/**
 * Runs `fn` inside an active span named `agent_memory.<operation>`.
 *
 * On success the span status is set to OK. On failure the thrown value is
 * recorded on the span as an exception event, the status is set to ERROR with
 * the stringified error as its message, and the error is rethrown unchanged.
 * The span is always ended, whichever path is taken.
 *
 * @param operation - Suffix appended to `agent_memory.` to form the span name.
 * @param fn - Work to run; receives the span so it can attach attributes.
 * @returns Whatever `fn` resolves to.
 * @throws Rethrows anything `fn` throws or rejects with.
 */
private traced<T>(operation: string, fn: (span: Span) => Promise<T>): Promise<T> {
  return this.telemetry.tracer.startActiveSpan(`agent_memory.${operation}`, async (span) => {
    try {
      const result = await fn(span);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (err) {
      // A status message alone drops the exception type and stack trace;
      // recording the exception keeps them available as a span event.
      // Non-Error throwables are stringified so the call is always well-typed.
      span.recordException(err instanceof Error ? err : String(err));
      span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) });
      throw err;
    } finally {
      span.end();
    }
  });
}

private async reinforce(hits: MemoryHit[], now: number): Promise<void> {
Expand All @@ -331,8 +376,11 @@ export class MemoryStore {
}

/**
 * Deletes the single memory stored under `${name}:mem:${id}`.
 *
 * When the `DEL` reply reports a removal, the `items` metric for this store
 * is decremented by that count.
 *
 * @param id - Identifier of the memory to delete.
 * @returns `true` if `DEL` reported at least one removed key, otherwise `false`.
 */
async forget(id: string): Promise<boolean> {
const deleted = await this.client.call('DEL', `${this.name}:mem:${id}`);
return Number(deleted) > 0;
const removed = Number(await this.client.call('DEL', `${this.name}:mem:${id}`));
if (removed > 0) {
this.telemetry.metrics.items.labels(this.storeLabels).dec(removed);
}
return removed > 0;
}

async forgetByScope(scope: MemoryScope & { tags?: string[] }): Promise<number> {
Expand Down Expand Up @@ -383,6 +431,9 @@ export class MemoryStore {
);
}

if (deleted > 0) {
this.telemetry.metrics.items.labels(this.storeLabels).dec(deleted);
}
return deleted;
}

Expand All @@ -395,19 +446,33 @@ export class MemoryStore {
const id = randomUUID();
const record = buildMemoryRecord(this.name, id, content, vector, options, now);
await this.writeRecord(record.key, record.fields, options.ttl);
this.telemetry.metrics.items.labels(this.storeLabels).inc();
return id;
}

/**
 * Stores `content` as a new memory and returns its id.
 *
 * Runs inside the `remember` traced span, tagging it with `memory.importance`
 * (falling back to `DEFAULT_IMPORTANCE`) and, when provided, `memory.ttl`.
 * After the write, capacity enforcement is attempted; its failures are
 * swallowed so they never reject a write that already succeeded.
 *
 * @param content - Text of the memory.
 * @param options - Write options such as `importance` and `ttl`.
 * @returns The id returned by `writeMemory`.
 */
async remember(content: string, options: RememberOptions = {}): Promise<string> {
const now = Date.now();
const id = await this.writeMemory(content, options, now);
// Capacity enforcement is best-effort: the memory is already durably stored,
// so a failed eviction pass must not reject an otherwise successful write.
await this.enforceCapacity(options, now).catch(() => undefined);
return id;
return this.traced('remember', async (span) => {
span.setAttribute('memory.importance', options.importance ?? DEFAULT_IMPORTANCE);
if (options.ttl !== undefined) {
span.setAttribute('memory.ttl', options.ttl);
}
const now = Date.now();
const id = await this.writeMemory(content, options, now);
// Capacity enforcement is best-effort: the memory is already durably stored,
// so a failed eviction pass must not reject an otherwise successful write.
await this.enforceCapacity(options, now).catch(() => undefined);
return id;
});
}

/**
 * Public entry point for consolidation: delegates to `runConsolidate`,
 * executing it inside the `consolidate` traced span so the run can attach
 * its attributes to that span.
 *
 * @param options - Consolidation options, forwarded unchanged.
 * @returns The result produced by `runConsolidate`.
 */
async consolidate(options: ConsolidateOptions): Promise<ConsolidateResult> {
  const run = (activeSpan: Span): Promise<ConsolidateResult> =>
    this.runConsolidate(options, activeSpan);
  return this.traced('consolidate', run);
}

private async runConsolidate(
options: ConsolidateOptions,
span: Span,
): Promise<ConsolidateResult> {
const now = Date.now();
const tags = options.tags ?? [];
const scope: MemoryScope = {
Expand Down Expand Up @@ -465,8 +530,11 @@ export class MemoryStore {
'2',
);
const candidates = parseFtSearchResponse(raw).map((hit) => parseMemoryItem(this.name, hit));
span.setAttribute('consolidate.candidates', candidates.length);

if (candidates.length === 0) {
span.setAttribute('consolidate.created', 0);
span.setAttribute('consolidate.deleted', 0);
return { consolidated: 0, created: [], deleted: 0 };
}

Expand All @@ -492,8 +560,14 @@ export class MemoryStore {
if (options.deleteSources !== false) {
const keys = candidates.map((item) => `${this.name}:mem:${item.id}`);
deleted = Number(await this.client.call('DEL', ...keys));
if (deleted > 0) {
this.telemetry.metrics.items.labels(this.storeLabels).dec(deleted);
}
}

this.telemetry.metrics.consolidations.labels(this.storeLabels).inc();
span.setAttribute('consolidate.created', 1);
span.setAttribute('consolidate.deleted', deleted);
return { consolidated: candidates.length, created: [summaryId], deleted };
}

Expand Down Expand Up @@ -592,16 +666,21 @@ export class MemoryStore {
if (evictKeys.length === 0) {
return;
}
await this.client.call('DEL', ...evictKeys);
await this.client.call(
'HINCRBY',
`${this.name}:__mem_stats`,
'evictions',
String(evictKeys.length),
);
// Count actual removals, not the keys we asked to drop: the index can list
// already-deleted keys (stale), so DEL may remove fewer. Using the reply
// keeps the stats and Prometheus gauges accurate, as forget/forgetByScope/
// consolidate already do.
const removed = Number(await this.client.call('DEL', ...evictKeys));
if (!(removed > 0)) {
return;
}
await this.client.call('HINCRBY', `${this.name}:__mem_stats`, 'evictions', String(removed));
this.telemetry.metrics.evictions.labels(this.storeLabels).inc(removed);
this.telemetry.metrics.items.labels(this.storeLabels).dec(removed);
}

private async embed(content: string): Promise<number[]> {
this.telemetry.metrics.embeddingCalls.labels(this.storeLabels).inc();
const vector = await this.embedFn(content);
if (this.dims === undefined) {
this.dims = vector.length;
Expand Down
Loading
Loading