From 4d53db6d2093ca55b441287ae435a65caa1f13e0 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Wed, 17 Jun 2026 16:52:02 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(agent-memory):=20Phase=209=20=E2=80=94?= =?UTF-8?q?=20observability?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add createMemoryTelemetry: prom-client metrics (items gauge, recall total/hits/empty, recall latency histogram, embedding calls, evictions, consolidations) with a configurable prefix, plus an OTel tracer - Emit spans for remember/recall/consolidate via a traced() helper with OK/ERROR status, recording k/candidate-count/result-count and candidate/created/deleted attributes - Wire metrics through remember/recall/consolidate/forget/forgetByScope/ embed and the capacity-eviction path - Telemetry option (tracerName/metricsPrefix/registry); no-op tracer and default registry when unset, mirroring agent-cache - Add @opentelemetry/api + prom-client deps and the OTel SDK dev dep --- packages/agent-memory/package.json | 5 +- packages/agent-memory/src/MemoryStore.ts | 219 ++++++++++++------ .../src/__tests__/MemoryStore.spans.test.ts | 121 ++++++++++ .../__tests__/MemoryStore.telemetry.test.ts | 175 ++++++++++++++ packages/agent-memory/src/index.ts | 2 + packages/agent-memory/src/telemetry.ts | 117 ++++++++++ pnpm-lock.yaml | 55 ++++- 7 files changed, 620 insertions(+), 74 deletions(-) create mode 100644 packages/agent-memory/src/__tests__/MemoryStore.spans.test.ts create mode 100644 packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts create mode 100644 packages/agent-memory/src/telemetry.ts diff --git a/packages/agent-memory/package.json b/packages/agent-memory/package.json index e526af23..4976d40e 100644 --- a/packages/agent-memory/package.json +++ b/packages/agent-memory/package.json @@ -37,9 +37,12 @@ }, "dependencies": { "@betterdb/agent-cache": "workspace:*", - "@betterdb/valkey-search-kit": "workspace:*" + "@betterdb/valkey-search-kit": "workspace:*", + "@opentelemetry/api": "^1.9.0", + "prom-client": "^15.1.3" }, "devDependencies": { + "@opentelemetry/sdk-trace-base": "^1.30.1", "@types/node": "^22.19.15", "typescript": "^5.9.3", "vitest": "^4.1.1" diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index ca1d77e5..cee41af9 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -1,4 +1,5 @@ import { randomUUID } from 'node:crypto'; +import { SpanStatusCode, type Span } from '@opentelemetry/api'; import { encodeFloat32, parseFtSearchResponse } from '@betterdb/valkey-search-kit'; import { buildMemoryRecord } from './buildMemoryRecord'; import { @@ -11,6 +12,11 @@ import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; import { MemoryDiscovery } from './discovery'; +import { + createMemoryTelemetry, + type MemoryTelemetry, + type MemoryTelemetryOptions, +} from './telemetry'; import type { ConsolidateOptions, ConsolidateResult, @@ -33,6 +39,7 @@ const EVICTION_SCAN_LIMIT = 10000; const CONSOLIDATE_SCAN_LIMIT = 10000; const DEFAULT_SUMMARY_IMPORTANCE = 0.7; const SUMMARY_SOURCE = 'summary'; +const DEFAULT_IMPORTANCE = 0.5; const DEFAULT_CONFIG_REFRESH_MS = 30000; const MIN_CONFIG_REFRESH_MS = 1000; const MAX_DISTANCE = 2; @@ -70,6 +77,7 @@ export interface MemoryStoreOptions { maxItemsPerScope?: number; discovery?: boolean | MemoryDiscoveryConfig; configRefresh?: boolean | MemoryConfigRefreshConfig; + telemetry?: MemoryTelemetryOptions; } export class MemoryStore { @@ -88,12 +96,16 @@ export class MemoryStore { private configRefreshHandle: ReturnType | null = null; private readonly discovery: MemoryDiscovery | null; private discoveryReady: Promise | null = null; + private readonly telemetry: MemoryTelemetry; + private readonly storeLabels: Record; private dims?: number; constructor(options: MemoryStoreOptions) { this.client = options.client; this.name = options.name; this.embedFn = options.embedFn; + this.telemetry = createMemoryTelemetry(options.telemetry); + this.storeLabels = { store_name: this.name }; this.initialThreshold = options.defaultThreshold ?? DEFAULT_THRESHOLD; this.initialWeights = { ...(options.weights ?? DEFAULT_WEIGHTS) }; this.initialHalfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS; @@ -245,75 +257,108 @@ export class MemoryStore { } async recall(query: string, options: RecallOptions = {}): Promise { - const k = options.k ?? DEFAULT_RECALL_K; - const threshold = options.threshold ?? this.defaultThreshold; - const weights = options.weights ?? this.weights; - // Snapshot the half-life alongside threshold/weights so a concurrent - // configRefresh can't score one recall with a mix of config versions. - const halfLifeSeconds = this.halfLifeSeconds; - const fetchK = k * RECALL_OVERFETCH; - const tags = options.tags ?? []; - const scope = { - threadId: options.threadId, - agentId: options.agentId, - namespace: options.namespace, - }; + return this.traced('recall', async (span) => { + const startedAt = Date.now(); + const k = options.k ?? DEFAULT_RECALL_K; + const threshold = options.threshold ?? this.defaultThreshold; + const weights = options.weights ?? this.weights; + // Snapshot the half-life alongside threshold/weights so a concurrent + // configRefresh can't score one recall with a mix of config versions. + const halfLifeSeconds = this.halfLifeSeconds; + const fetchK = k * RECALL_OVERFETCH; + const tags = options.tags ?? []; + const scope = { + threadId: options.threadId, + agentId: options.agentId, + namespace: options.namespace, + }; + span.setAttribute('recall.k', k); - const vector = await this.embed(query); - const queryString = buildRecallQuery(fetchK, scope, tags); - const raw = await this.client.call( - 'FT.SEARCH', - `${this.name}:mem:idx`, - queryString, - 'PARAMS', - '2', - 'vec', - encodeFloat32(vector), - 'LIMIT', - '0', - String(fetchK), - 'DIALECT', - '2', - ); + const vector = await this.embed(query); + const queryString = buildRecallQuery(fetchK, scope, tags); + const raw = await this.client.call( + 'FT.SEARCH', + `${this.name}:mem:idx`, + queryString, + 'PARAMS', + '2', + 'vec', + encodeFloat32(vector), + 'LIMIT', + '0', + String(fetchK), + 'DIALECT', + '2', + ); - const now = Date.now(); - const hits: MemoryHit[] = []; - for (const hit of parseFtSearchResponse(raw)) { - const rawScore = hit.fields[SCORE_FIELD]; - if (rawScore === undefined || rawScore.trim() === '') { - continue; - } - const distance = Number(rawScore); - if (!Number.isFinite(distance) || distance > threshold) { - continue; - } - const item = parseMemoryItem(this.name, hit); - // Recency decays from the last access, not creation, so reinforcement - // (which bumps last_accessed_at) actually makes a memory more recallable. - // max() guards against a clock-skewed last_accessed_at older than created_at. - const lastTouched = Math.max(item.createdAt, item.lastAccessedAt); - const ageSeconds = (now - lastTouched) / 1000; - const score = compositeScore({ - similarity: similarityFromDistance(distance), - ageSeconds, - importance: item.importance, - weights, - halfLifeSeconds, - }); - if (!Number.isFinite(score)) { - continue; + const now = Date.now(); + const hits: MemoryHit[] = []; + for (const hit of parseFtSearchResponse(raw)) { + const rawScore = hit.fields[SCORE_FIELD]; + if (rawScore === undefined || rawScore.trim() === '') { + continue; + } + const distance = Number(rawScore); + if (!Number.isFinite(distance) || distance > threshold) { + continue; + } + const item = parseMemoryItem(this.name, hit); + // Recency decays from the last access, not creation, so reinforcement + // (which bumps last_accessed_at) actually makes a memory more recallable. + // max() guards against a clock-skewed last_accessed_at older than created_at. + const lastTouched = Math.max(item.createdAt, item.lastAccessedAt); + const ageSeconds = (now - lastTouched) / 1000; + const score = compositeScore({ + similarity: similarityFromDistance(distance), + ageSeconds, + importance: item.importance, + weights, + halfLifeSeconds, + }); + if (!Number.isFinite(score)) { + continue; + } + hits.push({ item, similarity: distance, score }); } - hits.push({ item, similarity: distance, score }); - } - hits.sort((a, b) => b.score - a.score); - const result = hits.slice(0, k); + hits.sort((a, b) => b.score - a.score); + const result = hits.slice(0, k); + span.setAttribute('recall.candidate_count', hits.length); + span.setAttribute('recall.result_count', result.length); + this.recordRecall(result.length, (Date.now() - startedAt) / 1000); + + if (options.reinforce !== false) { + // Reinforcement is best-effort and must never break the recall read path. + await this.reinforce(result, now).catch(() => undefined); + } + return result; + }); + } - if (options.reinforce !== false) { - // Reinforcement is best-effort and must never break the recall read path. - await this.reinforce(result, now).catch(() => undefined); + private recordRecall(resultCount: number, latencySeconds: number): void { + const metrics = this.telemetry.metrics; + metrics.recallTotal.labels(this.storeLabels).inc(); + if (resultCount > 0) { + metrics.recallHits.labels(this.storeLabels).inc(); + } else { + metrics.recallEmpty.labels(this.storeLabels).inc(); } - return result; + metrics.recallLatency.labels(this.storeLabels).observe(latencySeconds); + } + + private traced(operation: string, fn: (span: Span) => Promise): Promise { + return this.telemetry.tracer.startActiveSpan(`agent_memory.${operation}`, async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (err) { + span.setStatus({ code: SpanStatusCode.ERROR, message: String(err) }); + throw err; + } finally { + span.end(); + } + }); } private async reinforce(hits: MemoryHit[], now: number): Promise { @@ -331,8 +376,11 @@ export class MemoryStore { } async forget(id: string): Promise { - const deleted = await this.client.call('DEL', `${this.name}:mem:${id}`); - return Number(deleted) > 0; + const removed = Number(await this.client.call('DEL', `${this.name}:mem:${id}`)); + if (removed > 0) { + this.telemetry.metrics.items.labels(this.storeLabels).dec(removed); + } + return removed > 0; } async forgetByScope(scope: MemoryScope & { tags?: string[] }): Promise { @@ -383,6 +431,9 @@ export class MemoryStore { ); } + if (deleted > 0) { + this.telemetry.metrics.items.labels(this.storeLabels).dec(deleted); + } return deleted; } @@ -395,19 +446,33 @@ export class MemoryStore { const id = randomUUID(); const record = buildMemoryRecord(this.name, id, content, vector, options, now); await this.writeRecord(record.key, record.fields, options.ttl); + this.telemetry.metrics.items.labels(this.storeLabels).inc(); return id; } async remember(content: string, options: RememberOptions = {}): Promise { - const now = Date.now(); - const id = await this.writeMemory(content, options, now); - // Capacity enforcement is best-effort: the memory is already durably stored, - // so a failed eviction pass must not reject an otherwise successful write. - await this.enforceCapacity(options, now).catch(() => undefined); - return id; + return this.traced('remember', async (span) => { + span.setAttribute('memory.importance', options.importance ?? DEFAULT_IMPORTANCE); + if (options.ttl !== undefined) { + span.setAttribute('memory.ttl', options.ttl); + } + const now = Date.now(); + const id = await this.writeMemory(content, options, now); + // Capacity enforcement is best-effort: the memory is already durably stored, + // so a failed eviction pass must not reject an otherwise successful write. + await this.enforceCapacity(options, now).catch(() => undefined); + return id; + }); } async consolidate(options: ConsolidateOptions): Promise { + return this.traced('consolidate', (span) => this.runConsolidate(options, span)); + } + + private async runConsolidate( + options: ConsolidateOptions, + span: Span, + ): Promise { const now = Date.now(); const tags = options.tags ?? []; const scope: MemoryScope = { @@ -465,8 +530,11 @@ export class MemoryStore { '2', ); const candidates = parseFtSearchResponse(raw).map((hit) => parseMemoryItem(this.name, hit)); + span.setAttribute('consolidate.candidates', candidates.length); if (candidates.length === 0) { + span.setAttribute('consolidate.created', 0); + span.setAttribute('consolidate.deleted', 0); return { consolidated: 0, created: [], deleted: 0 }; } @@ -492,8 +560,14 @@ export class MemoryStore { if (options.deleteSources !== false) { const keys = candidates.map((item) => `${this.name}:mem:${item.id}`); deleted = Number(await this.client.call('DEL', ...keys)); + if (deleted > 0) { + this.telemetry.metrics.items.labels(this.storeLabels).dec(deleted); + } } + this.telemetry.metrics.consolidations.labels(this.storeLabels).inc(); + span.setAttribute('consolidate.created', 1); + span.setAttribute('consolidate.deleted', deleted); return { consolidated: candidates.length, created: [summaryId], deleted }; } @@ -599,9 +673,12 @@ export class MemoryStore { 'evictions', String(evictKeys.length), ); + this.telemetry.metrics.evictions.labels(this.storeLabels).inc(evictKeys.length); + this.telemetry.metrics.items.labels(this.storeLabels).dec(evictKeys.length); } private async embed(content: string): Promise { + this.telemetry.metrics.embeddingCalls.labels(this.storeLabels).inc(); const vector = await this.embedFn(content); if (this.dims === undefined) { this.dims = vector.length; diff --git a/packages/agent-memory/src/__tests__/MemoryStore.spans.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.spans.test.ts new file mode 100644 index 00000000..cccf281c --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.spans.test.ts @@ -0,0 +1,121 @@ +import { describe, it, expect, beforeAll, afterAll, afterEach, vi } from 'vitest'; +import { trace } from '@opentelemetry/api'; +import { + BasicTracerProvider, + InMemorySpanExporter, + SimpleSpanProcessor, +} from '@opentelemetry/sdk-trace-base'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +const exporter = new InMemorySpanExporter(); +let provider: BasicTracerProvider; + +beforeAll(() => { + provider = new BasicTracerProvider({ spanProcessors: [new SimpleSpanProcessor(exporter)] }); + trace.setGlobalTracerProvider(provider); +}); + +afterEach(() => { + exporter.reset(); +}); + +afterAll(async () => { + await provider.shutdown(); + trace.disable(); +}); + +function spanNamed(name: string) { + return exporter.getFinishedSpans().find((s) => s.name === name); +} + +function recallHit(distance: number): unknown[] { + const now = Date.now(); + const fields: Record = { + __score: String(distance), + content: 'c', + importance: '0.5', + created_at: String(now), + last_accessed_at: String(now), + access_count: '0', + }; + const flat: string[] = []; + for (const [field, value] of Object.entries(fields)) { + flat.push(field, value); + } + return ['1', 'mem:mem:a', flat]; +} + +function consolidateHit(id: string): [string, string[]] { + const created = Date.now() - 100000 * 1000; + const fields: Record = { + content: `c-${id}`, + importance: '0.2', + created_at: String(created), + last_accessed_at: String(created), + access_count: '0', + }; + const flat: string[] = []; + for (const [f, v] of Object.entries(fields)) { + flat.push(f, v); + } + return [`mem:mem:${id}`, flat]; +} + +function searchReply(total: number, hits: Array<[string, string[]]> = []): unknown[] { + const out: unknown[] = [String(total)]; + for (const [key, flat] of hits) { + out.push(key, flat); + } + return out; +} + +describe('MemoryStore spans', () => { + it('emits a remember span with the importance attribute', async () => { + const store = new MemoryStore({ + client: mockClient(() => 'OK'), + name: 'mem', + embedFn: fakeEmbed(8), + }); + + await store.remember('hi', { importance: 0.8 }); + + const span = spanNamed('agent_memory.remember'); + expect(span).toBeDefined(); + expect(span?.attributes['memory.importance']).toBe(0.8); + }); + + it('emits a recall span with k and result count', async () => { + const client = mockClient((command) => (command === 'FT.SEARCH' ? recallHit(0.1) : 'OK')); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.recall('q', { k: 1 }); + + const span = spanNamed('agent_memory.recall'); + expect(span).toBeDefined(); + expect(span?.attributes['recall.k']).toBe(1); + expect(span?.attributes['recall.result_count']).toBe(1); + }); + + it('emits a consolidate span with candidate/created/deleted counts', async () => { + const client = mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + return searchReply(1, [consolidateHit('a')]); + } + if (command === 'DEL') { + return args.length; + } + return 'OK'; + }); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.consolidate({ namespace: 'u1', summarize: vi.fn(async () => 'summary') }); + + const span = spanNamed('agent_memory.consolidate'); + expect(span).toBeDefined(); + expect(span?.attributes['consolidate.candidates']).toBe(1); + expect(span?.attributes['consolidate.created']).toBe(1); + expect(span?.attributes['consolidate.deleted']).toBe(1); + }); +}); diff --git a/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts new file mode 100644 index 00000000..1d9b6818 --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts @@ -0,0 +1,175 @@ +import { describe, it, expect, vi } from 'vitest'; +import { Registry } from 'prom-client'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +function recallHit(distance: number): unknown[] { + const now = Date.now(); + const fields: Record = { + __score: String(distance), + content: 'c', + importance: '0.5', + created_at: String(now), + last_accessed_at: String(now), + access_count: '0', + }; + const flat: string[] = []; + for (const [field, value] of Object.entries(fields)) { + flat.push(field, value); + } + return ['1', 'mem:mem:a', flat]; +} + +function evictionFields(importance: number, lastAccessedAt: number): Record { + return { importance: String(importance), last_accessed_at: String(lastAccessedAt) }; +} + +function evictionSearch(total: number, hits: Array<[string, Record]>): unknown[] { + const out: unknown[] = [String(total)]; + for (const [key, fieldMap] of hits) { + const flat: string[] = []; + for (const [f, v] of Object.entries(fieldMap)) { + flat.push(f, v); + } + out.push(key, flat); + } + return out; +} + +function consolidateHit(id: string): [string, string[]] { + const created = Date.now() - 100000 * 1000; + const fields: Record = { + content: `c-${id}`, + importance: '0.2', + created_at: String(created), + last_accessed_at: String(created), + access_count: '0', + }; + const flat: string[] = []; + for (const [f, v] of Object.entries(fields)) { + flat.push(f, v); + } + return [`mem:mem:${id}`, flat]; +} + +describe('MemoryStore metrics', () => { + it('counts embedding calls and bumps the items gauge on remember', async () => { + const registry = new Registry(); + const store = new MemoryStore({ + client: mockClient(() => 'OK'), + name: 'mem', + embedFn: fakeEmbed(8), + telemetry: { registry }, + }); + + await store.remember('hi'); + + const text = await registry.metrics(); + expect(text).toMatch(/agent_memory_embedding_calls_total\{store_name="mem"\} 1/); + expect(text).toMatch(/agent_memory_items\{store_name="mem"\} 1/); + }); + + it('records a recall hit', async () => { + const registry = new Registry(); + const client = mockClient((command) => (command === 'FT.SEARCH' ? recallHit(0.1) : 'OK')); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + telemetry: { registry }, + }); + + await store.recall('q', { k: 1 }); + + const text = await registry.metrics(); + expect(text).toMatch(/agent_memory_recall_total\{store_name="mem"\} 1/); + expect(text).toMatch(/agent_memory_recall_hits_total\{store_name="mem"\} 1/); + expect(text).toMatch(/agent_memory_recall_latency_seconds_count\{store_name="mem"\} 1/); + }); + + it('records an empty recall', async () => { + const registry = new Registry(); + const client = mockClient((command) => (command === 'FT.SEARCH' ? ['0'] : 'OK')); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + telemetry: { registry }, + }); + + await store.recall('q', { k: 1 }); + + const text = await registry.metrics(); + expect(text).toMatch(/agent_memory_recall_empty_total\{store_name="mem"\} 1/); + expect(text).not.toMatch(/agent_memory_recall_hits_total\{store_name="mem"\} [1-9]/); + }); + + it('counts evictions when capacity is enforced', async () => { + const registry = new Registry(); + const client = mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + if (args.includes('RETURN')) { + return evictionSearch(3, [ + ['mem:mem:a', evictionFields(0.1, 1000)], + ['mem:mem:b', evictionFields(0.9, 5000)], + ['mem:mem:c', evictionFields(0.5, 9000)], + ]); + } + return evictionSearch(3, []); + } + return 'OK'; + }); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + maxItemsPerScope: 2, + telemetry: { registry }, + }); + + await store.remember('content', { namespace: 'u1' }); + + const text = await registry.metrics(); + expect(text).toMatch(/agent_memory_evictions_total\{store_name="mem"\} 1/); + }); + + it('counts consolidations', async () => { + const registry = new Registry(); + const client = mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + return evictionSearch(1, [consolidateHit('a')]); + } + if (command === 'DEL') { + return args.length; + } + return 'OK'; + }); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + telemetry: { registry }, + }); + + await store.consolidate({ namespace: 'u1', summarize: vi.fn(async () => 'summary') }); + + const text = await registry.metrics(); + expect(text).toMatch(/agent_memory_consolidations_total\{store_name="mem"\} 1/); + }); + + it('honours a configurable metrics prefix', async () => { + const registry = new Registry(); + const store = new MemoryStore({ + client: mockClient(() => 'OK'), + name: 'mem', + embedFn: fakeEmbed(8), + telemetry: { registry, metricsPrefix: 'mymem' }, + }); + + await store.remember('hi'); + + const text = await registry.metrics(); + expect(text).toMatch(/mymem_embedding_calls_total\{store_name="mem"\} 1/); + }); +}); diff --git a/packages/agent-memory/src/index.ts b/packages/agent-memory/src/index.ts index 634510d7..87552912 100644 --- a/packages/agent-memory/src/index.ts +++ b/packages/agent-memory/src/index.ts @@ -8,6 +8,8 @@ export type { } from './MemoryStore'; export { MemoryDiscovery, MEMORY_CACHE_TYPE, MEMORY_CAPABILITIES } from './discovery'; export type { MemoryDiscoveryDeps, MemoryMarker } from './discovery'; +export { createMemoryTelemetry, DEFAULT_METRICS_PREFIX, DEFAULT_TRACER_NAME } from './telemetry'; +export type { MemoryTelemetry, MemoryTelemetryOptions, MemoryMetrics } from './telemetry'; export { AgentMemory } from './AgentMemory'; export type { EmbedFn, diff --git a/packages/agent-memory/src/telemetry.ts b/packages/agent-memory/src/telemetry.ts new file mode 100644 index 00000000..0db00ac9 --- /dev/null +++ b/packages/agent-memory/src/telemetry.ts @@ -0,0 +1,117 @@ +import { trace, type Tracer } from '@opentelemetry/api'; +import { + Counter, + Gauge, + Histogram, + register as defaultRegistry, + Registry, + type CounterConfiguration, + type GaugeConfiguration, + type HistogramConfiguration, +} from 'prom-client'; + +export const DEFAULT_METRICS_PREFIX = 'agent_memory'; +export const DEFAULT_TRACER_NAME = '@betterdb/agent-memory'; + +const RECALL_LATENCY_BUCKETS = [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]; + +export interface MemoryTelemetryOptions { + tracerName?: string; + metricsPrefix?: string; + registry?: Registry; +} + +export interface MemoryMetrics { + items: Gauge; + recallTotal: Counter; + recallHits: Counter; + recallEmpty: Counter; + recallLatency: Histogram; + embeddingCalls: Counter; + evictions: Counter; + consolidations: Counter; +} + +export interface MemoryTelemetry { + tracer: Tracer; + metrics: MemoryMetrics; +} + +function getOrCreateCounter( + registry: Registry, + config: CounterConfiguration, +): Counter { + const existing = registry.getSingleMetric(config.name); + return existing + ? (existing as Counter) + : new Counter({ ...config, registers: [registry] }); +} + +function getOrCreateGauge(registry: Registry, config: GaugeConfiguration): Gauge { + const existing = registry.getSingleMetric(config.name); + return existing ? (existing as Gauge) : new Gauge({ ...config, registers: [registry] }); +} + +function getOrCreateHistogram( + registry: Registry, + config: HistogramConfiguration, +): Histogram { + const existing = registry.getSingleMetric(config.name); + return existing + ? (existing as Histogram) + : new Histogram({ ...config, registers: [registry] }); +} + +export function createMemoryTelemetry(options: MemoryTelemetryOptions = {}): MemoryTelemetry { + const registry = options.registry ?? defaultRegistry; + const prefix = options.metricsPrefix ?? DEFAULT_METRICS_PREFIX; + const tracer = trace.getTracer(options.tracerName ?? DEFAULT_TRACER_NAME); + const labelNames = ['store_name']; + + return { + tracer, + metrics: { + items: getOrCreateGauge(registry, { + name: `${prefix}_items`, + help: 'Approximate number of stored memories observed in-process', + labelNames, + }), + recallTotal: getOrCreateCounter(registry, { + name: `${prefix}_recall_total`, + help: 'Total recall queries', + labelNames, + }), + recallHits: getOrCreateCounter(registry, { + name: `${prefix}_recall_hits_total`, + help: 'Recall queries that returned at least one memory', + labelNames, + }), + recallEmpty: getOrCreateCounter(registry, { + name: `${prefix}_recall_empty_total`, + help: 'Recall queries that returned no memories', + labelNames, + }), + recallLatency: getOrCreateHistogram(registry, { + name: `${prefix}_recall_latency_seconds`, + help: 'Recall query latency in seconds', + labelNames, + buckets: RECALL_LATENCY_BUCKETS, + }), + embeddingCalls: getOrCreateCounter(registry, { + name: `${prefix}_embedding_calls_total`, + help: 'Total embedding function invocations', + labelNames, + }), + evictions: getOrCreateCounter(registry, { + name: `${prefix}_evictions_total`, + help: 'Total memories evicted for capacity', + labelNames, + }), + consolidations: getOrCreateCounter(registry, { + name: `${prefix}_consolidations_total`, + help: 'Total consolidation summaries created', + labelNames, + }), + }, + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 85381a1e..e211bf9a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -471,7 +471,16 @@ importers: '@betterdb/valkey-search-kit': specifier: workspace:* version: link:../valkey-search-kit + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 + prom-client: + specifier: ^15.1.3 + version: 15.1.3 devDependencies: + '@opentelemetry/sdk-trace-base': + specifier: ^1.30.1 + version: 1.30.1(@opentelemetry/api@1.9.0) '@types/node': specifier: ^22.19.15 version: 22.19.15 @@ -2908,6 +2917,12 @@ packages: resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==} engines: {node: '>=8.0.0'} + '@opentelemetry/core@1.30.1': + resolution: {integrity: sha512-OOCM2C/QIURhJMuKaekP3TRBxBKxG/TWWA0TL2J6nXUtDnuCtccy49LUJF8xPFXMX+0LMcxFpCo8M9cGY1W6rQ==} + engines: {node: '>=14'} + peerDependencies: + '@opentelemetry/api': '>=1.0.0 <1.10.0' + '@opentelemetry/core@2.2.0': resolution: {integrity: sha512-FuabnnUm8LflnieVxs6eP7Z383hgQU4W1e3KJS6aOG3RxWxcHyBxH8fDMHNgu/gFx/M2jvTOW/4/PHhLz6bjWw==} engines: {node: ^18.19.0 || >=20.6.0} @@ -2938,6 +2953,12 @@ packages: peerDependencies: '@opentelemetry/api': ^1.3.0 + '@opentelemetry/resources@1.30.1': + resolution: {integrity: sha512-5UxZqiAgLYGFjS4s9qm5mBVo433u+dSPUFWVWXmLAD4wB65oMCoXaJP1KJa9DIYYMeHu3z4BZcStG3LC593cWA==} + engines: {node: '>=14'} + peerDependencies: + '@opentelemetry/api': '>=1.0.0 <1.10.0' + '@opentelemetry/resources@2.2.0': resolution: {integrity: sha512-1pNQf/JazQTMA0BiO5NINUzH0cbLbbl7mntLa4aJNmCCXSj0q03T5ZXXL0zw4G55TjdL9Tz32cznGClf+8zr5A==} engines: {node: ^18.19.0 || >=20.6.0} @@ -2962,12 +2983,22 @@ packages: peerDependencies: '@opentelemetry/api': '>=1.9.0 <1.10.0' + '@opentelemetry/sdk-trace-base@1.30.1': + resolution: {integrity: sha512-jVPgBbH1gCy2Lb7X0AVQ8XAfgg0pJ4nvl8/IiQA6nxOsPvS+0zMJaFSs2ltXe0J6C8dqjcnpyqINDJmU30+uOg==} + engines: {node: '>=14'} + peerDependencies: + '@opentelemetry/api': '>=1.0.0 <1.10.0' + '@opentelemetry/sdk-trace-base@2.2.0': resolution: {integrity: sha512-xWQgL0Bmctsalg6PaXExmzdedSp3gyKV8mQBwK/j9VGdCDu2fmXIb2gAehBKbkXCpJ4HPkgv3QfoJWRT4dHWbw==} engines: {node: ^18.19.0 || >=20.6.0} peerDependencies: '@opentelemetry/api': '>=1.3.0 <1.10.0' + '@opentelemetry/semantic-conventions@1.28.0': + resolution: {integrity: sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA==} + engines: {node: '>=14'} + '@opentelemetry/semantic-conventions@1.40.0': resolution: {integrity: sha512-cifvXDhcqMwwTlTK04GBNeIe7yyo28Mfby85QXFe1Yk8nmi36Ab/5UQwptOx84SsoGNRg+EVSjwzfSZMy6pmlw==} engines: {node: '>=14'} @@ -11724,6 +11755,11 @@ snapshots: '@opentelemetry/api@1.9.0': {} + '@opentelemetry/core@1.30.1(@opentelemetry/api@1.9.0)': + dependencies: + '@opentelemetry/api': 1.9.0 + '@opentelemetry/semantic-conventions': 1.28.0 + '@opentelemetry/core@2.2.0(@opentelemetry/api@1.9.0)': dependencies: '@opentelemetry/api': 1.9.0 @@ -11760,6 +11796,12 @@ snapshots: '@opentelemetry/sdk-trace-base': 2.2.0(@opentelemetry/api@1.9.0) protobufjs: 7.6.3 + '@opentelemetry/resources@1.30.1(@opentelemetry/api@1.9.0)': + dependencies: + '@opentelemetry/api': 1.9.0 + '@opentelemetry/core': 1.30.1(@opentelemetry/api@1.9.0) + '@opentelemetry/semantic-conventions': 1.28.0 + '@opentelemetry/resources@2.2.0(@opentelemetry/api@1.9.0)': dependencies: '@opentelemetry/api': 1.9.0 @@ -11785,6 +11827,13 @@ snapshots: '@opentelemetry/core': 2.2.0(@opentelemetry/api@1.9.0) '@opentelemetry/resources': 2.2.0(@opentelemetry/api@1.9.0) + '@opentelemetry/sdk-trace-base@1.30.1(@opentelemetry/api@1.9.0)': + dependencies: + '@opentelemetry/api': 1.9.0 + '@opentelemetry/core': 1.30.1(@opentelemetry/api@1.9.0) + '@opentelemetry/resources': 1.30.1(@opentelemetry/api@1.9.0) + '@opentelemetry/semantic-conventions': 1.28.0 + '@opentelemetry/sdk-trace-base@2.2.0(@opentelemetry/api@1.9.0)': dependencies: '@opentelemetry/api': 1.9.0 @@ -11792,6 +11841,8 @@ snapshots: '@opentelemetry/resources': 2.2.0(@opentelemetry/api@1.9.0) '@opentelemetry/semantic-conventions': 1.40.0 + '@opentelemetry/semantic-conventions@1.28.0': {} + '@opentelemetry/semantic-conventions@1.40.0': {} '@oxc-project/types@0.133.0': {} @@ -15682,7 +15733,7 @@ snapshots: isstream: 0.1.2 jsonwebtoken: 9.0.3 mime-types: 2.1.35 - retry-axios: 2.6.0(axios@1.16.0) + retry-axios: 2.6.0(axios@1.16.0(debug@4.4.3)) tough-cookie: 4.1.4 transitivePeerDependencies: - supports-color @@ -17818,7 +17869,7 @@ snapshots: ret@0.5.0: {} - retry-axios@2.6.0(axios@1.16.0): + retry-axios@2.6.0(axios@1.16.0(debug@4.4.3)): dependencies: axios: 1.16.0(debug@4.4.3) From 9f7207920bea572224d4931484c48fd2b6d05b9c Mon Sep 17 00:00:00 2001 From: jamby77 Date: Fri, 19 Jun 2026 14:32:12 +0300 Subject: [PATCH 2/2] fix(agent-memory): count actual DEL removals in eviction stats and metrics enforceCapacity bumped the evictions stat/counter and decremented the items gauge by evictKeys.length without reading the DEL reply, so a stale index (keys already gone) drifted Prometheus. Use the actual removal count, guarded > 0, matching forget/forgetByScope/consolidate. --- packages/agent-memory/src/MemoryStore.ts | 20 +++++----- .../__tests__/MemoryStore.eviction.test.ts | 37 +++++++++++++++++++ .../__tests__/MemoryStore.telemetry.test.ts | 3 ++ 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index cee41af9..02d77961 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -666,15 +666,17 @@ export class MemoryStore { if (evictKeys.length === 0) { return; } - await this.client.call('DEL', ...evictKeys); - await this.client.call( - 'HINCRBY', - `${this.name}:__mem_stats`, - 'evictions', - String(evictKeys.length), - ); - this.telemetry.metrics.evictions.labels(this.storeLabels).inc(evictKeys.length); - this.telemetry.metrics.items.labels(this.storeLabels).dec(evictKeys.length); + // Count actual removals, not the keys we asked to drop: the index can list + // already-deleted keys (stale), so DEL may remove fewer. Using the reply + // keeps the stats and Prometheus gauges accurate, as forget/forgetByScope/ + // consolidate already do. + const removed = Number(await this.client.call('DEL', ...evictKeys)); + if (!(removed > 0)) { + return; + } + await this.client.call('HINCRBY', `${this.name}:__mem_stats`, 'evictions', String(removed)); + this.telemetry.metrics.evictions.labels(this.storeLabels).inc(removed); + this.telemetry.metrics.items.labels(this.storeLabels).dec(removed); } private async embed(content: string): Promise { diff --git a/packages/agent-memory/src/__tests__/MemoryStore.eviction.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.eviction.test.ts index 859374d8..f37ec279 100644 --- a/packages/agent-memory/src/__tests__/MemoryStore.eviction.test.ts +++ b/packages/agent-memory/src/__tests__/MemoryStore.eviction.test.ts @@ -84,6 +84,9 @@ describe('MemoryStore capacity eviction', () => { } return searchReply(3); } + if (command === 'DEL') { + return args.length; + } return 'OK'; }); const store = new MemoryStore({ @@ -103,6 +106,40 @@ describe('MemoryStore capacity eviction', () => { expect(hincr).toEqual(['HINCRBY', 'mem:__mem_stats', 'evictions', '1']); }); + it('counts only actual removals when the index lists already-deleted keys', async () => { + const client = mockClient((command, ...args) => { + if (command === 'FT.SEARCH') { + if (args.includes('RETURN')) { + return searchReply(4, [ + ['mem:mem:a', fields(0.1, 1000)], + ['mem:mem:b', fields(0.2, 2000)], + ['mem:mem:c', fields(0.9, 5000)], + ['mem:mem:d', fields(0.5, 9000)], + ]); + } + return searchReply(4); + } + // Two keys are evicted but only one was still live. + if (command === 'DEL') { + return 1; + } + return 'OK'; + }); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + maxItemsPerScope: 2, + }); + + await store.remember('content', { namespace: 'u1' }); + + const hincr = client.call.mock.calls.find( + (c) => c[0] === 'HINCRBY' && c[1] === 'mem:__mem_stats', + ); + expect(hincr).toEqual(['HINCRBY', 'mem:__mem_stats', 'evictions', '1']); + }); + it('queries capacity by the written item scope', async () => { const client = mockClient((command) => (command === 'FT.SEARCH' ? searchReply(2) : 'OK')); const store = new MemoryStore({ diff --git a/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts index 1d9b6818..ad3f9659 100644 --- a/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts +++ b/packages/agent-memory/src/__tests__/MemoryStore.telemetry.test.ts @@ -118,6 +118,9 @@ describe('MemoryStore metrics', () => { } return evictionSearch(3, []); } + if (command === 'DEL') { + return args.length; + } return 'OK'; }); const store = new MemoryStore({