From b7e2b464e2e3a5f3ddf81125f630b382f2b69c92 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Wed, 17 Jun 2026 13:12:15 +0300 Subject: [PATCH 1/4] =?UTF-8?q?feat(agent-memory):=20Phase=207=20=E2=80=94?= =?UTF-8?q?=20discovery=20marker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - MemoryDiscovery registers an agent_memory marker on the shared __betterdb:caches registry: type 'agent_memory', prefix, version, capabilities ['recall','consolidate','reinforce'], stats_key - Reuses the agent-cache discovery protocol constants (additively re-exported) so Monitor reads memory markers identically; writes via MemoryStore's .call client (no method-style client needed) - Heartbeat on an unref'd interval with a TTL'd heartbeat key; best-effort writes; name-collision detection against a different cache type - MemoryStore gains an opt-in discovery option and close() to stop the heartbeat and remove the marker - Re-export discovery constants/MarkerMetadata from @betterdb/agent-cache --- packages/agent-cache/src/index.ts | 14 +- packages/agent-memory/src/MemoryStore.ts | 42 +++++ .../__tests__/MemoryStore.discovery.test.ts | 40 ++++ .../src/__tests__/discovery.test.ts | 142 ++++++++++++++ packages/agent-memory/src/discovery.ts | 173 ++++++++++++++++++ packages/agent-memory/src/index.ts | 4 +- 6 files changed, 409 insertions(+), 6 deletions(-) create mode 100644 packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts create mode 100644 packages/agent-memory/src/__tests__/discovery.test.ts create mode 100644 packages/agent-memory/src/discovery.ts diff --git a/packages/agent-cache/src/index.ts b/packages/agent-cache/src/index.ts index 638766bf..c9938b05 100644 --- a/packages/agent-cache/src/index.ts +++ b/packages/agent-cache/src/index.ts @@ -20,12 +20,16 @@ export type { TierDefaults, ConfigRefreshOptions, } from './types'; +export { AgentCacheError, AgentCacheUsageError, ValkeyCommandError } from './errors'; +export type { DiscoveryOptions, MarkerMetadata } from './discovery'; export { - AgentCacheError, - AgentCacheUsageError, - ValkeyCommandError, -} from './errors'; -export type { DiscoveryOptions } from './discovery'; + PROTOCOL_VERSION, + REGISTRY_KEY, + PROTOCOL_KEY, + HEARTBEAT_KEY_PREFIX, + DEFAULT_HEARTBEAT_INTERVAL_MS, + HEARTBEAT_TTL_SECONDS, +} from './discovery'; export type { Analytics } from './analytics'; export type { ContentBlock, diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index 0fd47ad9..09e62c18 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -10,6 +10,7 @@ import { import { parseMemoryItem } from './parseMemoryItem'; import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore'; import { selectEvictions, type EvictionCandidate } from './selectEvictions'; +import { MemoryDiscovery } from './discovery'; import type { ConsolidateOptions, ConsolidateResult, @@ -32,6 +33,12 @@ const EVICTION_SCAN_LIMIT = 10000; const CONSOLIDATE_SCAN_LIMIT = 10000; const DEFAULT_SUMMARY_IMPORTANCE = 0.7; const SUMMARY_SOURCE = 'summary'; +const PACKAGE_VERSION = (require('../package.json') as { version: string }).version; + +export interface MemoryDiscoveryConfig { + version?: string; + heartbeatIntervalMs?: number; +} export interface MemoryStoreOptions { client: MemoryStoreClient; @@ -41,6 +48,7 @@ export interface MemoryStoreOptions { weights?: RecallWeights; halfLifeSeconds?: number; maxItemsPerScope?: number; + discovery?: boolean | MemoryDiscoveryConfig; } export class MemoryStore { @@ -51,6 +59,8 @@ export class MemoryStore { private readonly weights: RecallWeights; private readonly halfLifeSeconds: number; private readonly maxItemsPerScope?: number; + private readonly discovery: MemoryDiscovery | null; + private discoveryReady: Promise | null = null; private dims?: number; constructor(options: MemoryStoreOptions) { @@ -61,6 +71,38 @@ export class MemoryStore { this.weights = options.weights ?? DEFAULT_WEIGHTS; this.halfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS; this.maxItemsPerScope = options.maxItemsPerScope; + this.discovery = this.createDiscovery(options.discovery); + } + + private createDiscovery(config?: boolean | MemoryDiscoveryConfig): MemoryDiscovery | null { + if (!config) { + return null; + } + const settings = config === true ? {} : config; + const discovery = new MemoryDiscovery({ + client: this.client, + name: this.name, + version: settings.version ?? PACKAGE_VERSION, + statsKey: `${this.name}:__mem_stats`, + heartbeatIntervalMs: settings.heartbeatIntervalMs, + }); + // Registration is fire-and-forget so construction stays synchronous; + // close() awaits it before tearing the marker down. The floating catch + // keeps a rejected registration (e.g. a name collision) from surfacing as + // an unhandled rejection when close() is never called. + const ready = discovery.register(); + ready.catch(() => undefined); + this.discoveryReady = ready; + return discovery; + } + + async close(): Promise { + if (this.discoveryReady) { + await this.discoveryReady.catch(() => undefined); + } + if (this.discovery) { + await this.discovery.stop({ deleteHeartbeat: true }); + } } async recall(query: string, options: RecallOptions = {}): Promise { diff --git a/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts new file mode 100644 index 00000000..75b2d680 --- /dev/null +++ b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts @@ -0,0 +1,40 @@ +import { describe, it, expect } from 'vitest'; +import { REGISTRY_KEY, HEARTBEAT_KEY_PREFIX } from '@betterdb/agent-cache'; +import { MemoryStore } from '../MemoryStore'; +import { fakeEmbed } from './helpers/fakeEmbed'; +import { mockClient } from './helpers/mockClient'; + +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem`; + +describe('MemoryStore discovery wiring', () => { + it('registers a discovery marker on construct when discovery is enabled', async () => { + const client = mockClient((command) => (command === 'HGET' ? null : 'OK')); + const store = new MemoryStore({ + client, + name: 'mem', + embedFn: fakeEmbed(8), + discovery: { version: '1.0.0', heartbeatIntervalMs: 999_999 }, + }); + + await store.close(); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); + expect(hset?.[2]).toBe('mem'); + const marker = JSON.parse(hset?.[3] as string); + expect(marker.type).toBe('agent_memory'); + expect(marker.stats_key).toBe('mem:__mem_stats'); + const del = client.call.mock.calls.find((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY); + expect(del).toBeDefined(); + }); + + it('does not touch the registry when discovery is not enabled', async () => { + const client = mockClient(() => 'OK'); + const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) }); + + await store.close(); + + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe( + false, + ); + }); +}); diff --git a/packages/agent-memory/src/__tests__/discovery.test.ts b/packages/agent-memory/src/__tests__/discovery.test.ts new file mode 100644 index 00000000..2a7eed9f --- /dev/null +++ b/packages/agent-memory/src/__tests__/discovery.test.ts @@ -0,0 +1,142 @@ +import { describe, it, expect, vi } from 'vitest'; +import { + REGISTRY_KEY, + PROTOCOL_KEY, + HEARTBEAT_KEY_PREFIX, + PROTOCOL_VERSION, +} from '@betterdb/agent-cache'; +import { MemoryDiscovery } from '../discovery'; +import { mockClient } from './helpers/mockClient'; + +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem`; + +function freshClient() { + return mockClient((command) => (command === 'HGET' ? null : 'OK')); +} + +function makeDiscovery(client: ReturnType, overrides = {}) { + return new MemoryDiscovery({ + client, + name: 'mem', + version: '1.2.3', + statsKey: 'mem:__mem_stats', + heartbeatIntervalMs: 999_999, + ...overrides, + }); +} + +describe('MemoryDiscovery', () => { + it('registers an agent_memory marker with the memory capabilities and stats key', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); + expect(hset?.[2]).toBe('mem'); + const marker = JSON.parse(hset?.[3] as string); + expect(marker.type).toBe('agent_memory'); + expect(marker.prefix).toBe('mem'); + expect(marker.version).toBe('1.2.3'); + expect(marker.protocol_version).toBe(PROTOCOL_VERSION); + expect(marker.capabilities).toEqual(['recall', 'consolidate', 'reinforce']); + expect(marker.stats_key).toBe('mem:__mem_stats'); + }); + + it('sets the protocol key with NX and writes a heartbeat with a TTL', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + const sets = client.call.mock.calls.filter((c) => c[0] === 'SET'); + expect(sets.some((c) => c[1] === PROTOCOL_KEY && c[3] === 'NX')).toBe(true); + const heartbeat = sets.find((c) => c[1] === HEARTBEAT_KEY); + expect(heartbeat?.[3]).toBe('EX'); + expect(heartbeat?.[4]).toBe('60'); + }); + + it('throws on a name collision with a different cache type', async () => { + const client = mockClient((command) => + command === 'HGET' ? JSON.stringify({ type: 'agent_cache' }) : 'OK', + ); + const disco = makeDiscovery(client); + + await expect(disco.register()).rejects.toThrow(/collision/i); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(false); + }); + + it('overwrites an existing marker of the same type without throwing', async () => { + const client = mockClient((command) => + command === 'HGET' ? JSON.stringify({ type: 'agent_memory', version: '0.0.1' }) : 'OK', + ); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: false }); + + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + }); + + it('deletes the heartbeat key on stop when asked', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + + await disco.register(); + await disco.stop({ deleteHeartbeat: true }); + + expect(client.call.mock.calls.some((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY)).toBe(true); + }); + + it('re-writes the heartbeat and marker on tickHeartbeat', async () => { + const client = freshClient(); + const disco = makeDiscovery(client); + await disco.register(); + const before = client.call.mock.calls.length; + + await disco.tickHeartbeat(); + await disco.stop({ deleteHeartbeat: false }); + + const after = client.call.mock.calls.slice(before); + expect(after.some((c) => c[0] === 'SET' && c[1] === HEARTBEAT_KEY)).toBe(true); + expect(after.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + }); + + it('heartbeats on the configured interval', async () => { + vi.useFakeTimers(); + try { + const client = freshClient(); + const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 }); + await disco.register(); + const before = client.call.mock.calls.filter((c) => c[0] === 'HSET').length; + + await vi.advanceTimersByTimeAsync(1000); + + const after = client.call.mock.calls.filter((c) => c[0] === 'HSET').length; + expect(after).toBeGreaterThan(before); + await disco.stop({ deleteHeartbeat: false }); + } finally { + vi.useRealTimers(); + } + }); + + it('never throws when a registry write fails (best-effort)', async () => { + const onWriteFailed = vi.fn(); + const client = mockClient((command) => { + if (command === 'HGET') { + return null; + } + if (command === 'HSET') { + throw new Error('registry boom'); + } + return 'OK'; + }); + const disco = makeDiscovery(client, { onWriteFailed }); + + await expect(disco.register()).resolves.toBeUndefined(); + await disco.stop({ deleteHeartbeat: false }); + expect(onWriteFailed).toHaveBeenCalled(); + }); +}); diff --git a/packages/agent-memory/src/discovery.ts b/packages/agent-memory/src/discovery.ts new file mode 100644 index 00000000..df143d74 --- /dev/null +++ b/packages/agent-memory/src/discovery.ts @@ -0,0 +1,173 @@ +import { hostname } from 'node:os'; +import { + DEFAULT_HEARTBEAT_INTERVAL_MS, + HEARTBEAT_KEY_PREFIX, + HEARTBEAT_TTL_SECONDS, + PROTOCOL_KEY, + PROTOCOL_VERSION, + REGISTRY_KEY, +} from '@betterdb/agent-cache'; +import type { MemoryStoreClient } from './types'; + +export const MEMORY_CACHE_TYPE = 'agent_memory'; +export const MEMORY_CAPABILITIES = ['recall', 'consolidate', 'reinforce']; + +export interface MemoryMarker { + type: typeof MEMORY_CACHE_TYPE; + prefix: string; + version: string; + protocol_version: number; + capabilities: string[]; + stats_key: string; + started_at: string; + pid?: number; + hostname?: string; +} + +export interface MemoryDiscoveryDeps { + client: MemoryStoreClient; + name: string; + version: string; + statsKey: string; + heartbeatIntervalMs?: number; + onWriteFailed?: () => void; +} + +export class MemoryDiscovery { + private readonly client: MemoryStoreClient; + private readonly name: string; + private readonly version: string; + private readonly statsKey: string; + private readonly heartbeatIntervalMs: number; + private readonly heartbeatKey: string; + private readonly startedAt: string; + private readonly onWriteFailed: () => void; + private heartbeatHandle: ReturnType | null = null; + + constructor(deps: MemoryDiscoveryDeps) { + this.client = deps.client; + this.name = deps.name; + this.version = deps.version; + this.statsKey = deps.statsKey; + this.heartbeatIntervalMs = deps.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS; + this.heartbeatKey = `${HEARTBEAT_KEY_PREFIX}${deps.name}`; + this.startedAt = new Date().toISOString(); + this.onWriteFailed = deps.onWriteFailed ?? (() => {}); + } + + buildMarker(): MemoryMarker { + const marker: MemoryMarker = { + type: MEMORY_CACHE_TYPE, + prefix: this.name, + version: this.version, + protocol_version: PROTOCOL_VERSION, + capabilities: [...MEMORY_CAPABILITIES], + stats_key: this.statsKey, + started_at: this.startedAt, + pid: process.pid, + hostname: hostname(), + }; + return marker; + } + + async register(): Promise { + const existing = await this.safeHget(); + if (existing !== null) { + this.checkCollision(existing); + } + await this.writeMarker(); + await this.safeCall(() => + this.client.call('SET', PROTOCOL_KEY, String(PROTOCOL_VERSION), 'NX'), + ); + await this.writeHeartbeat(); + this.startHeartbeat(); + } + + async stop(opts: { deleteHeartbeat: boolean }): Promise { + if (this.heartbeatHandle) { + clearInterval(this.heartbeatHandle); + this.heartbeatHandle = null; + } + if (!opts.deleteHeartbeat) { + return; + } + try { + await this.client.call('DEL', this.heartbeatKey); + } catch { + this.onWriteFailed(); + } + } + + async tickHeartbeat(): Promise { + await this.writeHeartbeat(); + await this.writeMarker(); + await this.safeCall(() => + this.client.call('SET', PROTOCOL_KEY, String(PROTOCOL_VERSION), 'NX'), + ); + } + + private startHeartbeat(): void { + const handle = setInterval(() => { + void this.tickHeartbeat(); + }, this.heartbeatIntervalMs); + handle.unref?.(); + this.heartbeatHandle = handle; + } + + private async writeHeartbeat(): Promise { + try { + await this.client.call( + 'SET', + this.heartbeatKey, + new Date().toISOString(), + 'EX', + String(HEARTBEAT_TTL_SECONDS), + ); + } catch { + this.onWriteFailed(); + } + } + + private async writeMarker(): Promise { + let payload: string; + try { + payload = JSON.stringify(this.buildMarker()); + } catch { + this.onWriteFailed(); + return; + } + await this.safeCall(() => this.client.call('HSET', REGISTRY_KEY, this.name, payload)); + } + + private async safeHget(): Promise { + try { + const value = await this.client.call('HGET', REGISTRY_KEY, this.name); + return value === null || value === undefined ? null : String(value); + } catch { + this.onWriteFailed(); + return null; + } + } + + private async safeCall(fn: () => Promise): Promise { + try { + await fn(); + } catch { + this.onWriteFailed(); + } + } + + private checkCollision(existingJson: string): void { + let parsed: { type?: string }; + try { + parsed = JSON.parse(existingJson) as { type?: string }; + } catch { + return; + } + if (parsed.type && parsed.type !== MEMORY_CACHE_TYPE) { + throw new Error( + `memory store name collision: '${this.name}' is already registered as type '${parsed.type}' on this Valkey instance`, + ); + } + } +} diff --git a/packages/agent-memory/src/index.ts b/packages/agent-memory/src/index.ts index bf1a904a..a12bb20c 100644 --- a/packages/agent-memory/src/index.ts +++ b/packages/agent-memory/src/index.ts @@ -1,6 +1,8 @@ export * from '@betterdb/agent-cache'; export { MemoryStore } from './MemoryStore'; -export type { MemoryStoreOptions } from './MemoryStore'; +export type { MemoryStoreOptions, MemoryDiscoveryConfig } from './MemoryStore'; +export { MemoryDiscovery, MEMORY_CACHE_TYPE, MEMORY_CAPABILITIES } from './discovery'; +export type { MemoryDiscoveryDeps, MemoryMarker } from './discovery'; export { AgentMemory } from './AgentMemory'; export type { EmbedFn, From 4ac07f9ff8a9cc392b6d62a5f028d58fc3f69f12 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Wed, 17 Jun 2026 17:50:23 +0300 Subject: [PATCH 2/4] fix(agent-memory): namespace the memory discovery marker under {name}:mem Register the agent_memory marker under the {name}:mem registry field and heartbeat key so a memory store and an agent-cache sharing the same name no longer collide on the same __betterdb:caches field / heartbeat key (reported on the AgentMemory facade, which discovers both tiers). --- .../src/__tests__/MemoryStore.discovery.test.ts | 4 ++-- packages/agent-memory/src/__tests__/discovery.test.ts | 4 ++-- packages/agent-memory/src/discovery.ts | 11 ++++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts index 75b2d680..a8e237d7 100644 --- a/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts +++ b/packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts @@ -4,7 +4,7 @@ import { MemoryStore } from '../MemoryStore'; import { fakeEmbed } from './helpers/fakeEmbed'; import { mockClient } from './helpers/mockClient'; -const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem`; +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`; describe('MemoryStore discovery wiring', () => { it('registers a discovery marker on construct when discovery is enabled', async () => { @@ -19,7 +19,7 @@ describe('MemoryStore discovery wiring', () => { await store.close(); const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); - expect(hset?.[2]).toBe('mem'); + expect(hset?.[2]).toBe('mem:mem'); const marker = JSON.parse(hset?.[3] as string); expect(marker.type).toBe('agent_memory'); expect(marker.stats_key).toBe('mem:__mem_stats'); diff --git a/packages/agent-memory/src/__tests__/discovery.test.ts b/packages/agent-memory/src/__tests__/discovery.test.ts index 2a7eed9f..b985f613 100644 --- a/packages/agent-memory/src/__tests__/discovery.test.ts +++ b/packages/agent-memory/src/__tests__/discovery.test.ts @@ -8,7 +8,7 @@ import { import { MemoryDiscovery } from '../discovery'; import { mockClient } from './helpers/mockClient'; -const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem`; +const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`; function freshClient() { return mockClient((command) => (command === 'HGET' ? null : 'OK')); @@ -34,7 +34,7 @@ describe('MemoryDiscovery', () => { await disco.stop({ deleteHeartbeat: false }); const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY); - expect(hset?.[2]).toBe('mem'); + expect(hset?.[2]).toBe('mem:mem'); const marker = JSON.parse(hset?.[3] as string); expect(marker.type).toBe('agent_memory'); expect(marker.prefix).toBe('mem'); diff --git a/packages/agent-memory/src/discovery.ts b/packages/agent-memory/src/discovery.ts index df143d74..0476141e 100644 --- a/packages/agent-memory/src/discovery.ts +++ b/packages/agent-memory/src/discovery.ts @@ -39,6 +39,7 @@ export class MemoryDiscovery { private readonly version: string; private readonly statsKey: string; private readonly heartbeatIntervalMs: number; + private readonly markerField: string; private readonly heartbeatKey: string; private readonly startedAt: string; private readonly onWriteFailed: () => void; @@ -50,7 +51,11 @@ export class MemoryDiscovery { this.version = deps.version; this.statsKey = deps.statsKey; this.heartbeatIntervalMs = deps.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS; - this.heartbeatKey = `${HEARTBEAT_KEY_PREFIX}${deps.name}`; + // Namespace the marker under `{name}:mem` so a memory store and an + // agent-cache sharing the same name register distinct registry fields and + // heartbeat keys instead of clobbering each other. + this.markerField = `${deps.name}:mem`; + this.heartbeatKey = `${HEARTBEAT_KEY_PREFIX}${this.markerField}`; this.startedAt = new Date().toISOString(); this.onWriteFailed = deps.onWriteFailed ?? (() => {}); } @@ -136,12 +141,12 @@ export class MemoryDiscovery { this.onWriteFailed(); return; } - await this.safeCall(() => this.client.call('HSET', REGISTRY_KEY, this.name, payload)); + await this.safeCall(() => this.client.call('HSET', REGISTRY_KEY, this.markerField, payload)); } private async safeHget(): Promise { try { - const value = await this.client.call('HGET', REGISTRY_KEY, this.name); + const value = await this.client.call('HGET', REGISTRY_KEY, this.markerField); return value === null || value === undefined ? null : String(value); } catch { this.onWriteFailed(); From ffda27a026a4f4ccc9501eb2ff105fc0cd9eb21c Mon Sep 17 00:00:00 2001 From: jamby77 Date: Fri, 19 Jun 2026 12:37:10 +0300 Subject: [PATCH 3/4] fix(agent-memory): make discovery collisions visible and trim heartbeat work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - a cross-type marker collision now emits a visible console.warn and proceeds last-writer-wins, instead of throwing into a swallowed registration promise (the memory marker lives under {name}:mem, so it never collides with agent-cache's {name} field — the check is purely defensive) - read package.json lazily inside createDiscovery so non-discovery importers don't pay a disk read (and avoid the bundler-emit hazard) at module load - drop the redundant SET protocol NX from every heartbeat tick (no-op after register); note the best-effort HGET->HSET TOCTOU --- packages/agent-memory/src/MemoryStore.ts | 13 +++++++++---- .../agent-memory/src/__tests__/discovery.test.ts | 12 +++++++++--- packages/agent-memory/src/discovery.ts | 16 +++++++++++----- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/packages/agent-memory/src/MemoryStore.ts b/packages/agent-memory/src/MemoryStore.ts index 09e62c18..7717ef43 100644 --- a/packages/agent-memory/src/MemoryStore.ts +++ b/packages/agent-memory/src/MemoryStore.ts @@ -33,7 +33,12 @@ const EVICTION_SCAN_LIMIT = 10000; const CONSOLIDATE_SCAN_LIMIT = 10000; const DEFAULT_SUMMARY_IMPORTANCE = 0.7; const SUMMARY_SOURCE = 'summary'; -const PACKAGE_VERSION = (require('../package.json') as { version: string }).version; + +// Read lazily so only discovery users pay the disk read on import (and avoid a +// bundler hazard, since package.json is not always emitted). +function packageVersion(): string { + return (require('../package.json') as { version: string }).version; +} export interface MemoryDiscoveryConfig { version?: string; @@ -82,14 +87,14 @@ export class MemoryStore { const discovery = new MemoryDiscovery({ client: this.client, name: this.name, - version: settings.version ?? PACKAGE_VERSION, + version: settings.version ?? packageVersion(), statsKey: `${this.name}:__mem_stats`, heartbeatIntervalMs: settings.heartbeatIntervalMs, }); // Registration is fire-and-forget so construction stays synchronous; // close() awaits it before tearing the marker down. The floating catch - // keeps a rejected registration (e.g. a name collision) from surfacing as - // an unhandled rejection when close() is never called. + // keeps any rejected registration from surfacing as an unhandled rejection + // when close() is never called. const ready = discovery.register(); ready.catch(() => undefined); this.discoveryReady = ready; diff --git a/packages/agent-memory/src/__tests__/discovery.test.ts b/packages/agent-memory/src/__tests__/discovery.test.ts index b985f613..87aa3f96 100644 --- a/packages/agent-memory/src/__tests__/discovery.test.ts +++ b/packages/agent-memory/src/__tests__/discovery.test.ts @@ -58,14 +58,20 @@ describe('MemoryDiscovery', () => { expect(heartbeat?.[4]).toBe('60'); }); - it('throws on a name collision with a different cache type', async () => { + it('warns (visibly) and overwrites on a collision with a different cache type', async () => { + const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined); const client = mockClient((command) => command === 'HGET' ? JSON.stringify({ type: 'agent_cache' }) : 'OK', ); const disco = makeDiscovery(client); - await expect(disco.register()).rejects.toThrow(/collision/i); - expect(client.call.mock.calls.some((c) => c[0] === 'HSET')).toBe(false); + // Registration must not reject into the swallowed promise; the collision is + // surfaced via a visible warning and registration proceeds last-writer-wins. + await expect(disco.register()).resolves.toBeUndefined(); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0][0]).toMatch(/marker/i); + expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true); + warn.mockRestore(); }); it('overwrites an existing marker of the same type without throwing', async () => { diff --git a/packages/agent-memory/src/discovery.ts b/packages/agent-memory/src/discovery.ts index 0476141e..f8003b6a 100644 --- a/packages/agent-memory/src/discovery.ts +++ b/packages/agent-memory/src/discovery.ts @@ -76,6 +76,8 @@ export class MemoryDiscovery { } async register(): Promise { + // HGET-then-HSET is not atomic (TOCTOU); acceptable for best-effort + // discovery — a racing writer just means last-writer-wins on the marker. const existing = await this.safeHget(); if (existing !== null) { this.checkCollision(existing); @@ -106,9 +108,8 @@ export class MemoryDiscovery { async tickHeartbeat(): Promise { await this.writeHeartbeat(); await this.writeMarker(); - await this.safeCall(() => - this.client.call('SET', PROTOCOL_KEY, String(PROTOCOL_VERSION), 'NX'), - ); + // PROTOCOL_KEY is set once in register(); the NX SET is a guaranteed no-op + // on every subsequent tick, so it's not re-issued from the heartbeat path. } private startHeartbeat(): void { @@ -170,8 +171,13 @@ export class MemoryDiscovery { return; } if (parsed.type && parsed.type !== MEMORY_CACHE_TYPE) { - throw new Error( - `memory store name collision: '${this.name}' is already registered as type '${parsed.type}' on this Valkey instance`, + // Reachable only if a non-agent_memory marker already occupies this field. + // The memory marker lives under `{name}:mem`, distinct from agent-cache's + // `{name}`, so the two tiers never collide here. Surface it with a visible + // warning rather than throwing into a swallowed registration promise; + // registration then proceeds last-writer-wins, matching agent-cache. + console.warn( + `agent-memory discovery: field '${this.markerField}' already holds a '${parsed.type}' marker; overwriting`, ); } } From c83f1e68ed4eda07aaf5e40baa591bb6247ad1c2 Mon Sep 17 00:00:00 2001 From: jamby77 Date: Fri, 19 Jun 2026 14:41:52 +0300 Subject: [PATCH 4/4] fix(agent-memory): await an in-flight heartbeat tick before stop deletes it stop() cleared the interval and DEL'd the heartbeat but didn't wait for a tick already running, so a tick that fired just before close() could re-write the heartbeat/marker after the DEL and make the store look alive post-shutdown. Track the in-flight tick and await it before deleting. --- .../src/__tests__/discovery.test.ts | 32 +++++++++++++++++++ packages/agent-memory/src/discovery.ts | 12 ++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/packages/agent-memory/src/__tests__/discovery.test.ts b/packages/agent-memory/src/__tests__/discovery.test.ts index 87aa3f96..6ad56913 100644 --- a/packages/agent-memory/src/__tests__/discovery.test.ts +++ b/packages/agent-memory/src/__tests__/discovery.test.ts @@ -128,6 +128,38 @@ describe('MemoryDiscovery', () => { } }); + it('waits for an in-flight heartbeat tick before deleting, so no write lands after stop', async () => { + vi.useFakeTimers(); + try { + let releaseTick: (value: unknown) => void = () => {}; + let gateArmed = false; + const order: string[] = []; + const client = mockClient((command, ...args) => { + order.push(command); + if (gateArmed && command === 'SET' && args[0] === HEARTBEAT_KEY) { + return new Promise((resolve) => { + releaseTick = resolve; + }); + } + return command === 'HGET' ? null : 'OK'; + }); + const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 }); + await disco.register(); + + gateArmed = true; + await vi.advanceTimersByTimeAsync(1000); // fire one tick; its heartbeat SET blocks + + const stopP = disco.stop({ deleteHeartbeat: true }); + expect(order.includes('DEL')).toBe(false); // DEL waits behind the in-flight tick + + releaseTick('OK'); + await stopP; + expect(order[order.length - 1]).toBe('DEL'); // DEL is the final write + } finally { + vi.useRealTimers(); + } + }); + it('never throws when a registry write fails (best-effort)', async () => { const onWriteFailed = vi.fn(); const client = mockClient((command) => { diff --git a/packages/agent-memory/src/discovery.ts b/packages/agent-memory/src/discovery.ts index f8003b6a..9f197f5c 100644 --- a/packages/agent-memory/src/discovery.ts +++ b/packages/agent-memory/src/discovery.ts @@ -44,6 +44,7 @@ export class MemoryDiscovery { private readonly startedAt: string; private readonly onWriteFailed: () => void; private heartbeatHandle: ReturnType | null = null; + private inFlightTick: Promise | null = null; constructor(deps: MemoryDiscoveryDeps) { this.client = deps.client; @@ -95,6 +96,11 @@ export class MemoryDiscovery { clearInterval(this.heartbeatHandle); this.heartbeatHandle = null; } + // Wait out a tick already in flight so its heartbeat/marker writes can't + // land after the DEL below and make the store look alive post-shutdown. + if (this.inFlightTick) { + await this.inFlightTick; + } if (!opts.deleteHeartbeat) { return; } @@ -114,7 +120,11 @@ export class MemoryDiscovery { private startHeartbeat(): void { const handle = setInterval(() => { - void this.tickHeartbeat(); + this.inFlightTick = this.tickHeartbeat() + .catch(() => undefined) + .finally(() => { + this.inFlightTick = null; + }); }, this.heartbeatIntervalMs); handle.unref?.(); this.heartbeatHandle = handle;