diff --git a/packages/agent-cache/src/index.ts b/packages/agent-cache/src/index.ts index 638766bf..c9938b05 100644 --- a/packages/agent-cache/src/index.ts +++ b/packages/agent-cache/src/index.ts @@ -20,12 +20,16 @@ export type { TierDefaults, ConfigRefreshOptions, } from './types'; +export { AgentCacheError, AgentCacheUsageError, ValkeyCommandError } from './errors'; +export type { DiscoveryOptions, MarkerMetadata } from './discovery'; export { - AgentCacheError, - AgentCacheUsageError, - ValkeyCommandError, -} from './errors'; -export type { DiscoveryOptions } from './discovery'; + PROTOCOL_VERSION, + REGISTRY_KEY, + PROTOCOL_KEY, + HEARTBEAT_KEY_PREFIX, + DEFAULT_HEARTBEAT_INTERVAL_MS, + HEARTBEAT_TTL_SECONDS, +} from './discovery'; export type { Analytics } from './analytics'; export type { ContentBlock, diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index 0fd47ad9..7717ef43 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -10,6 +10,7 @@ import { import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; +import { MemoryDiscovery } from './discovery'; import type { ConsolidateOptions, ConsolidateResult, @@ -33,6 +34,17 @@ const CONSOLIDATE_SCAN_LIMIT = 10000; const DEFAULT_SUMMARY_IMPORTANCE = 0.7; const SUMMARY_SOURCE = 'summary'; +// Read lazily so only discovery users pay the disk read on import (and avoid a +// bundler hazard, since package.json is not always emitted). +function packageVersion(): string { + return (require('../package.json') as { version: string }).version; +} + +export interface MemoryDiscoveryConfig { + version?: string; + heartbeatIntervalMs?: number; +} + export interface MemoryStoreOptions { client: MemoryStoreClient; name: string; @@ -41,6 +53,7 @@ export interface MemoryStoreOptions { weights?: RecallWeights; halfLifeSeconds?: number; maxItemsPerScope?: number; + discovery?: boolean | MemoryDiscoveryConfig; } export class MemoryStore { @@ -51,6 +64,8 @@ export class MemoryStore { private readonly weights: RecallWeights; private readonly halfLifeSeconds: number; private readonly maxItemsPerScope?: number; + private readonly discovery: MemoryDiscovery | null; + private discoveryReady: Promise | null = null; private dims?: number; constructor(options: MemoryStoreOptions) { @@ -61,6 +76,38 @@ export class MemoryStore { this.weights = options.weights ?? DEFAULT_WEIGHTS; this.halfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS; this.maxItemsPerScope = options.maxItemsPerScope; + this.discovery = this.createDiscovery(options.discovery); + } + + private createDiscovery(config?: boolean | MemoryDiscoveryConfig): MemoryDiscovery | null { + if (!config) { + return null; + } + const settings = config === true ? {} : config; + const discovery = new MemoryDiscovery({ + client: this.client, + name: this.name, + version: settings.version ?? packageVersion(), + statsKey: `${this.name}:__mem_stats`, + heartbeatIntervalMs: settings.heartbeatIntervalMs, + }); + // Registration is fire-and-forget so construction stays synchronous; + // close() awaits it before tearing the marker down. The floating catch + // keeps any rejected registration from surfacing as an unhandled rejection + // when close() is never called. + const ready = discovery.register(); + ready.catch(() => undefined); + this.discoveryReady = ready; + return discovery; + } + + async close(): Promise { + if (this.discoveryReady) { + await this.discoveryReady.catch(() => undefined); + } + if (this.discovery) { + await this.discovery.stop({ deleteHeartbeat: true }); + } } async recall(query: string, options: RecallOptions = {}): Promise { diff --git a/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts new file mode 100644 index 00000000..a8e237d7 --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts @@ -0,0 +1,40 @@ +import { describe, it, expect } from 'vitest'; +import { REGISTRY_KEY, HEARTBEAT_KEY_PREFIX } from '@betterdb/agent-cache'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`; + +describe('MemoryStore discovery wiring', () => { + it('registers a discovery marker on construct when discovery is enabled', async () => { + const client = mockClient((command) => (command === 'HGET' ? null : 'OK')); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + discovery: { version: '1.0.0', heartbeatIntervalMs: 999_999 }, + }); + + await store.close(); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); + expect(hset?.[2]).toBe('mem:mem'); + const marker = JSON.parse(hset?.[3] as string); + expect(marker.type).toBe('agent_memory'); + expect(marker.stats_key).toBe('mem:__mem_stats'); + const del = client.call.mock.calls.find((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY); + expect(del).toBeDefined(); + }); + + it('does not touch the registry when discovery is not enabled', async () => { + const client = mockClient(() => 'OK'); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.close(); + + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe( + false, + ); + }); +}); diff --git a/packages/agent-memory/src/__tests__/discovery.test.ts b/packages/agent-memory/src/__tests__/discovery.test.ts new file mode 100644 index 00000000..6ad56913 --- /dev/null +++ b/packages/agent-memory/src/__tests__/discovery.test.ts @@ -0,0 +1,180 @@ +import { describe, it, expect, vi } from 'vitest'; +import { + REGISTRY_KEY, + PROTOCOL_KEY, + HEARTBEAT_KEY_PREFIX, + PROTOCOL_VERSION, +} from '@betterdb/agent-cache'; +import { MemoryDiscovery } from '../discovery'; +import { mockClient } from './helpers/mockClient'; + +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`; + +function freshClient() { + return mockClient((command) => (command === 'HGET' ? null : 'OK')); +} + +function makeDiscovery(client: ReturnType, overrides = {}) { + return new MemoryDiscovery({ + client, + name: 'mem', + version: '1.2.3', + statsKey: 'mem:__mem_stats', + heartbeatIntervalMs: 999_999, + ...overrides, + }); +} + +describe('MemoryDiscovery', () => { + it('registers an agent_memory marker with the memory capabilities and stats key', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); + expect(hset?.[2]).toBe('mem:mem'); + const marker = JSON.parse(hset?.[3] as string); + expect(marker.type).toBe('agent_memory'); + expect(marker.prefix).toBe('mem'); + expect(marker.version).toBe('1.2.3'); + expect(marker.protocol_version).toBe(PROTOCOL_VERSION); + expect(marker.capabilities).toEqual(['recall', 'consolidate', 'reinforce']); + expect(marker.stats_key).toBe('mem:__mem_stats'); + }); + + it('sets the protocol key with NX and writes a heartbeat with a TTL', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + const sets = client.call.mock.calls.filter((c) => c[0] === 'SET'); + expect(sets.some((c) => c[1] === PROTOCOL_KEY && c[3] === 'NX')).toBe(true); + const heartbeat = sets.find((c) => c[1] === HEARTBEAT_KEY); + expect(heartbeat?.[3]).toBe('EX'); + expect(heartbeat?.[4]).toBe('60'); + }); + + it('warns (visibly) and overwrites on a collision with a different cache type', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); + const client = mockClient((command) => + command === 'HGET' ? JSON.stringify({ type: 'agent_cache' }) : 'OK', + ); + const disco = makeDiscovery(client); + + // Registration must not reject into the swallowed promise; the collision is + // surfaced via a visible warning and registration proceeds last-writer-wins. + await expect(disco.register()).resolves.toBeUndefined(); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0][0]).toMatch(/marker/i); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + warn.mockRestore(); + }); + + it('overwrites an existing marker of the same type without throwing', async () => { + const client = mockClient((command) => + command === 'HGET' ? JSON.stringify({ type: 'agent_memory', version: '0.0.1' }) : 'OK', + ); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + }); + + it('deletes the heartbeat key on stop when asked', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: true }); + + expect(client.call.mock.calls.some((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY)).toBe(true); + }); + + it('re-writes the heartbeat and marker on tickHeartbeat', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + await disco.register(); + const before = client.call.mock.calls.length; + + await disco.tickHeartbeat(); + await disco.stop({ deleteHeartbeat: false }); + + const after = client.call.mock.calls.slice(before); + expect(after.some((c) => c[0] === 'SET' && c[1] === HEARTBEAT_KEY)).toBe(true); + expect(after.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + }); + + it('heartbeats on the configured interval', async () => { + vi.useFakeTimers(); + try { + const client = freshClient(); + const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 }); + await disco.register(); + const before = client.call.mock.calls.filter((c) => c[0] === 'HSET').length; + + await vi.advanceTimersByTimeAsync(1000); + + const after = client.call.mock.calls.filter((c) => c[0] === 'HSET').length; + expect(after).toBeGreaterThan(before); + await disco.stop({ deleteHeartbeat: false }); + } finally { + vi.useRealTimers(); + } + }); + + it('waits for an in-flight heartbeat tick before deleting, so no write lands after stop', async () => { + vi.useFakeTimers(); + try { + let releaseTick: (value: unknown) => void = () => {}; + let gateArmed = false; + const order: string[] = []; + const client = mockClient((command, ...args) => { + order.push(command); + if (gateArmed && command === 'SET' && args[0] === HEARTBEAT_KEY) { + return new Promise((resolve) => { + releaseTick = resolve; + }); + } + return command === 'HGET' ? null : 'OK'; + }); + const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 }); + await disco.register(); + + gateArmed = true; + await vi.advanceTimersByTimeAsync(1000); // fire one tick; its heartbeat SET blocks + + const stopP = disco.stop({ deleteHeartbeat: true }); + expect(order.includes('DEL')).toBe(false); // DEL waits behind the in-flight tick + + releaseTick('OK'); + await stopP; + expect(order[order.length - 1]).toBe('DEL'); // DEL is the final write + } finally { + vi.useRealTimers(); + } + }); + + it('never throws when a registry write fails (best-effort)', async () => { + const onWriteFailed = vi.fn(); + const client = mockClient((command) => { + if (command === 'HGET') { + return null; + } + if (command === 'HSET') { + throw new Error('registry boom'); + } + return 'OK'; + }); + const disco = makeDiscovery(client, { onWriteFailed }); + + await expect(disco.register()).resolves.toBeUndefined(); + await disco.stop({ deleteHeartbeat: false }); + expect(onWriteFailed).toHaveBeenCalled(); + }); +}); diff --git a/packages/agent-memory/src/discovery.ts b/packages/agent-memory/src/discovery.ts new file mode 100644 index 00000000..9f197f5c --- /dev/null +++ b/packages/agent-memory/src/discovery.ts @@ -0,0 +1,194 @@ +import { hostname } from 'node:os'; +import { + DEFAULT_HEARTBEAT_INTERVAL_MS, + HEARTBEAT_KEY_PREFIX, + HEARTBEAT_TTL_SECONDS, + PROTOCOL_KEY, + PROTOCOL_VERSION, + REGISTRY_KEY, +} from '@betterdb/agent-cache'; +import type { MemoryStoreClient } from './types'; + +export const MEMORY_CACHE_TYPE = 'agent_memory'; +export const MEMORY_CAPABILITIES = ['recall', 'consolidate', 'reinforce']; + +export interface MemoryMarker { + type: typeof MEMORY_CACHE_TYPE; + prefix: string; + version: string; + protocol_version: number; + capabilities: string[]; + stats_key: string; + started_at: string; + pid?: number; + hostname?: string; +} + +export interface MemoryDiscoveryDeps { + client: MemoryStoreClient; + name: string; + version: string; + statsKey: string; + heartbeatIntervalMs?: number; + onWriteFailed?: () => void; +} + +export class MemoryDiscovery { + private readonly client: MemoryStoreClient; + private readonly name: string; + private readonly version: string; + private readonly statsKey: string; + private readonly heartbeatIntervalMs: number; + private readonly markerField: string; + private readonly heartbeatKey: string; + private readonly startedAt: string; + private readonly onWriteFailed: () => void; + private heartbeatHandle: ReturnType | null = null; + private inFlightTick: Promise | null = null; + + constructor(deps: MemoryDiscoveryDeps) { + this.client = deps.client; + this.name = deps.name; + this.version = deps.version; + this.statsKey = deps.statsKey; + this.heartbeatIntervalMs = deps.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS; + // Namespace the marker under `{name}:mem` so a memory store and an + // agent-cache sharing the same name register distinct registry fields and + // heartbeat keys instead of clobbering each other. + this.markerField = `${deps.name}:mem`; + this.heartbeatKey = `${HEARTBEAT_KEY_PREFIX}${this.markerField}`; + this.startedAt = new Date().toISOString(); + this.onWriteFailed = deps.onWriteFailed ?? (() => {}); + } + + buildMarker(): MemoryMarker { + const marker: MemoryMarker = { + type: MEMORY_CACHE_TYPE, + prefix: this.name, + version: this.version, + protocol_version: PROTOCOL_VERSION, + capabilities: [...MEMORY_CAPABILITIES], + stats_key: this.statsKey, + started_at: this.startedAt, + pid: process.pid, + hostname: hostname(), + }; + return marker; + } + + async register(): Promise { + // HGET-then-HSET is not atomic (TOCTOU); acceptable for best-effort + // discovery — a racing writer just means last-writer-wins on the marker. + const existing = await this.safeHget(); + if (existing !== null) { + this.checkCollision(existing); + } + await this.writeMarker(); + await this.safeCall(() => + this.client.call('SET', PROTOCOL_KEY, String(PROTOCOL_VERSION), 'NX'), + ); + await this.writeHeartbeat(); + this.startHeartbeat(); + } + + async stop(opts: { deleteHeartbeat: boolean }): Promise { + if (this.heartbeatHandle) { + clearInterval(this.heartbeatHandle); + this.heartbeatHandle = null; + } + // Wait out a tick already in flight so its heartbeat/marker writes can't + // land after the DEL below and make the store look alive post-shutdown. + if (this.inFlightTick) { + await this.inFlightTick; + } + if (!opts.deleteHeartbeat) { + return; + } + try { + await this.client.call('DEL', this.heartbeatKey); + } catch { + this.onWriteFailed(); + } + } + + async tickHeartbeat(): Promise { + await this.writeHeartbeat(); + await this.writeMarker(); + // PROTOCOL_KEY is set once in register(); the NX SET is a guaranteed no-op + // on every subsequent tick, so it's not re-issued from the heartbeat path. + } + + private startHeartbeat(): void { + const handle = setInterval(() => { + this.inFlightTick = this.tickHeartbeat() + .catch(() => undefined) + .finally(() => { + this.inFlightTick = null; + }); + }, this.heartbeatIntervalMs); + handle.unref?.(); + this.heartbeatHandle = handle; + } + + private async writeHeartbeat(): Promise { + try { + await this.client.call( + 'SET', + this.heartbeatKey, + new Date().toISOString(), + 'EX', + String(HEARTBEAT_TTL_SECONDS), + ); + } catch { + this.onWriteFailed(); + } + } + + private async writeMarker(): Promise { + let payload: string; + try { + payload = JSON.stringify(this.buildMarker()); + } catch { + this.onWriteFailed(); + return; + } + await this.safeCall(() => this.client.call('HSET', REGISTRY_KEY, this.markerField, payload)); + } + + private async safeHget(): Promise { + try { + const value = await this.client.call('HGET', REGISTRY_KEY, this.markerField); + return value === null || value === undefined ? null : String(value); + } catch { + this.onWriteFailed(); + return null; + } + } + + private async safeCall(fn: () => Promise): Promise { + try { + await fn(); + } catch { + this.onWriteFailed(); + } + } + + private checkCollision(existingJson: string): void { + let parsed: { type?: string }; + try { + parsed = JSON.parse(existingJson) as { type?: string }; + } catch { + return; + } + if (parsed.type && parsed.type !== MEMORY_CACHE_TYPE) { + // Reachable only if a non-agent_memory marker already occupies this field. + // The memory marker lives under `{name}:mem`, distinct from agent-cache's + // `{name}`, so the two tiers never collide here. Surface it with a visible + // warning rather than throwing into a swallowed registration promise; + // registration then proceeds last-writer-wins, matching agent-cache. + console.warn( + `agent-memory discovery: field '${this.markerField}' already holds a '${parsed.type}' marker; overwriting`, + ); + } + } +} diff --git a/packages/agent-memory/src/index.ts b/packages/agent-memory/src/index.ts index bf1a904a..a12bb20c 100644 --- a/packages/agent-memory/src/index.ts +++ b/packages/agent-memory/src/index.ts @@ -1,6 +1,8 @@ export * from '@betterdb/agent-cache'; export { MemoryStore } from './MemoryStore'; -export type { MemoryStoreOptions } from './MemoryStore'; +export type { MemoryStoreOptions, MemoryDiscoveryConfig } from './MemoryStore'; +export { MemoryDiscovery, MEMORY_CACHE_TYPE, MEMORY_CAPABILITIES } from './discovery'; +export type { MemoryDiscoveryDeps, MemoryMarker } from './discovery'; export { AgentMemory } from './AgentMemory'; export type { EmbedFn,