Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions packages/agent-cache/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ export type {
TierDefaults,
ConfigRefreshOptions,
} from './types';
export { AgentCacheError, AgentCacheUsageError, ValkeyCommandError } from './errors';
export type { DiscoveryOptions, MarkerMetadata } from './discovery';
export {
AgentCacheError,
AgentCacheUsageError,
ValkeyCommandError,
} from './errors';
export type { DiscoveryOptions } from './discovery';
PROTOCOL_VERSION,
REGISTRY_KEY,
PROTOCOL_KEY,
HEARTBEAT_KEY_PREFIX,
DEFAULT_HEARTBEAT_INTERVAL_MS,
HEARTBEAT_TTL_SECONDS,
} from './discovery';
export type { Analytics } from './analytics';
export type {
ContentBlock,
Expand Down
47 changes: 47 additions & 0 deletions packages/agent-memory/src/MemoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { parseMemoryItem } from './parseMemoryItem';
import { compositeScore, similarityFromDistance, type RecallWeights } from './compositeScore';
import { selectEvictions, type EvictionCandidate } from './selectEvictions';
import { MemoryDiscovery } from './discovery';
import type {
ConsolidateOptions,
ConsolidateResult,
Expand All @@ -33,6 +34,17 @@ const CONSOLIDATE_SCAN_LIMIT = 10000;
const DEFAULT_SUMMARY_IMPORTANCE = 0.7;
const SUMMARY_SOURCE = 'summary';

// Read lazily so only discovery users pay the disk read on import (and avoid a
// bundler hazard, since package.json is not always emitted).
function packageVersion(): string {
return (require('../package.json') as { version: string }).version;
}

export interface MemoryDiscoveryConfig {
version?: string;
heartbeatIntervalMs?: number;
}

export interface MemoryStoreOptions {
client: MemoryStoreClient;
name: string;
Expand All @@ -41,6 +53,7 @@ export interface MemoryStoreOptions {
weights?: RecallWeights;
halfLifeSeconds?: number;
maxItemsPerScope?: number;
discovery?: boolean | MemoryDiscoveryConfig;
}

export class MemoryStore {
Expand All @@ -51,6 +64,8 @@ export class MemoryStore {
private readonly weights: RecallWeights;
private readonly halfLifeSeconds: number;
private readonly maxItemsPerScope?: number;
private readonly discovery: MemoryDiscovery | null;
private discoveryReady: Promise<void> | null = null;
private dims?: number;

constructor(options: MemoryStoreOptions) {
Expand All @@ -61,6 +76,38 @@ export class MemoryStore {
this.weights = options.weights ?? DEFAULT_WEIGHTS;
this.halfLifeSeconds = options.halfLifeSeconds ?? DEFAULT_HALF_LIFE_SECONDS;
this.maxItemsPerScope = options.maxItemsPerScope;
this.discovery = this.createDiscovery(options.discovery);
}

private createDiscovery(config?: boolean | MemoryDiscoveryConfig): MemoryDiscovery | null {
if (!config) {
return null;
}
const settings = config === true ? {} : config;
const discovery = new MemoryDiscovery({
client: this.client,
name: this.name,
version: settings.version ?? packageVersion(),
statsKey: `${this.name}:__mem_stats`,
heartbeatIntervalMs: settings.heartbeatIntervalMs,
});
// Registration is fire-and-forget so construction stays synchronous;
// close() awaits it before tearing the marker down. The floating catch
// keeps any rejected registration from surfacing as an unhandled rejection
// when close() is never called.
const ready = discovery.register();
ready.catch(() => undefined);
this.discoveryReady = ready;
return discovery;
}

async close(): Promise<void> {
if (this.discoveryReady) {
await this.discoveryReady.catch(() => undefined);
}
if (this.discovery) {
await this.discovery.stop({ deleteHeartbeat: true });
}
}

async recall(query: string, options: RecallOptions = {}): Promise<MemoryHit[]> {
Expand Down
40 changes: 40 additions & 0 deletions packages/agent-memory/src/__tests__/MemoryStore.discovery.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { describe, it, expect } from 'vitest';
import { REGISTRY_KEY, HEARTBEAT_KEY_PREFIX } from '@betterdb/agent-cache';
import { MemoryStore } from '../MemoryStore';
import { fakeEmbed } from './helpers/fakeEmbed';
import { mockClient } from './helpers/mockClient';

const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`;

describe('MemoryStore discovery wiring', () => {
it('registers a discovery marker on construct when discovery is enabled', async () => {
const client = mockClient((command) => (command === 'HGET' ? null : 'OK'));
const store = new MemoryStore({
client,
name: 'mem',
embedFn: fakeEmbed(8),
discovery: { version: '1.0.0', heartbeatIntervalMs: 999_999 },
});

await store.close();

const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY);
expect(hset?.[2]).toBe('mem:mem');
const marker = JSON.parse(hset?.[3] as string);
expect(marker.type).toBe('agent_memory');
expect(marker.stats_key).toBe('mem:__mem_stats');
const del = client.call.mock.calls.find((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY);
expect(del).toBeDefined();
});

it('does not touch the registry when discovery is not enabled', async () => {
const client = mockClient(() => 'OK');
const store = new MemoryStore({ client, name: 'mem', embedFn: fakeEmbed(8) });

await store.close();

expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(
false,
);
});
});
180 changes: 180 additions & 0 deletions packages/agent-memory/src/__tests__/discovery.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import { describe, it, expect, vi } from 'vitest';
import {
REGISTRY_KEY,
PROTOCOL_KEY,
HEARTBEAT_KEY_PREFIX,
PROTOCOL_VERSION,
} from '@betterdb/agent-cache';
import { MemoryDiscovery } from '../discovery';
import { mockClient } from './helpers/mockClient';

const HEARTBEAT_KEY = `${HEARTBEAT_KEY_PREFIX}mem:mem`;

function freshClient() {
return mockClient((command) => (command === 'HGET' ? null : 'OK'));
}

function makeDiscovery(client: ReturnType<typeof mockClient>, overrides = {}) {
return new MemoryDiscovery({
client,
name: 'mem',
version: '1.2.3',
statsKey: 'mem:__mem_stats',
heartbeatIntervalMs: 999_999,
...overrides,
});
}

describe('MemoryDiscovery', () => {
it('registers an agent_memory marker with the memory capabilities and stats key', async () => {
const client = freshClient();
const disco = makeDiscovery(client);

await disco.register();
await disco.stop({ deleteHeartbeat: false });

const hset = client.call.mock.calls.find((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY);
expect(hset?.[2]).toBe('mem:mem');
const marker = JSON.parse(hset?.[3] as string);
expect(marker.type).toBe('agent_memory');
expect(marker.prefix).toBe('mem');
expect(marker.version).toBe('1.2.3');
expect(marker.protocol_version).toBe(PROTOCOL_VERSION);
expect(marker.capabilities).toEqual(['recall', 'consolidate', 'reinforce']);
expect(marker.stats_key).toBe('mem:__mem_stats');
});

it('sets the protocol key with NX and writes a heartbeat with a TTL', async () => {
const client = freshClient();
const disco = makeDiscovery(client);

await disco.register();
await disco.stop({ deleteHeartbeat: false });

const sets = client.call.mock.calls.filter((c) => c[0] === 'SET');
expect(sets.some((c) => c[1] === PROTOCOL_KEY && c[3] === 'NX')).toBe(true);
const heartbeat = sets.find((c) => c[1] === HEARTBEAT_KEY);
expect(heartbeat?.[3]).toBe('EX');
expect(heartbeat?.[4]).toBe('60');
});

it('warns (visibly) and overwrites on a collision with a different cache type', async () => {
const warn = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
const client = mockClient((command) =>
command === 'HGET' ? JSON.stringify({ type: 'agent_cache' }) : 'OK',
);
const disco = makeDiscovery(client);

// Registration must not reject into the swallowed promise; the collision is
// surfaced via a visible warning and registration proceeds last-writer-wins.
await expect(disco.register()).resolves.toBeUndefined();
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0][0]).toMatch(/marker/i);
expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true);
warn.mockRestore();
});

it('overwrites an existing marker of the same type without throwing', async () => {
const client = mockClient((command) =>
command === 'HGET' ? JSON.stringify({ type: 'agent_memory', version: '0.0.1' }) : 'OK',
);
const disco = makeDiscovery(client);

await disco.register();
await disco.stop({ deleteHeartbeat: false });

expect(client.call.mock.calls.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true);
});

it('deletes the heartbeat key on stop when asked', async () => {
const client = freshClient();
const disco = makeDiscovery(client);

await disco.register();
await disco.stop({ deleteHeartbeat: true });

expect(client.call.mock.calls.some((c) => c[0] === 'DEL' && c[1] === HEARTBEAT_KEY)).toBe(true);
});

it('re-writes the heartbeat and marker on tickHeartbeat', async () => {
const client = freshClient();
const disco = makeDiscovery(client);
await disco.register();
const before = client.call.mock.calls.length;

await disco.tickHeartbeat();
await disco.stop({ deleteHeartbeat: false });

const after = client.call.mock.calls.slice(before);
expect(after.some((c) => c[0] === 'SET' && c[1] === HEARTBEAT_KEY)).toBe(true);
expect(after.some((c) => c[0] === 'HSET' && c[1] === REGISTRY_KEY)).toBe(true);
});

it('heartbeats on the configured interval', async () => {
vi.useFakeTimers();
try {
const client = freshClient();
const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 });
await disco.register();
const before = client.call.mock.calls.filter((c) => c[0] === 'HSET').length;

await vi.advanceTimersByTimeAsync(1000);

const after = client.call.mock.calls.filter((c) => c[0] === 'HSET').length;
expect(after).toBeGreaterThan(before);
await disco.stop({ deleteHeartbeat: false });
} finally {
vi.useRealTimers();
}
});

it('waits for an in-flight heartbeat tick before deleting, so no write lands after stop', async () => {
vi.useFakeTimers();
try {
let releaseTick: (value: unknown) => void = () => {};
let gateArmed = false;
const order: string[] = [];
const client = mockClient((command, ...args) => {
order.push(command);
if (gateArmed && command === 'SET' && args[0] === HEARTBEAT_KEY) {
return new Promise((resolve) => {
releaseTick = resolve;
});
}
return command === 'HGET' ? null : 'OK';
});
const disco = makeDiscovery(client, { heartbeatIntervalMs: 1000 });
await disco.register();

gateArmed = true;
await vi.advanceTimersByTimeAsync(1000); // fire one tick; its heartbeat SET blocks

const stopP = disco.stop({ deleteHeartbeat: true });
expect(order.includes('DEL')).toBe(false); // DEL waits behind the in-flight tick

releaseTick('OK');
await stopP;
expect(order[order.length - 1]).toBe('DEL'); // DEL is the final write
} finally {
vi.useRealTimers();
}
});

it('never throws when a registry write fails (best-effort)', async () => {
const onWriteFailed = vi.fn();
const client = mockClient((command) => {
if (command === 'HGET') {
return null;
}
if (command === 'HSET') {
throw new Error('registry boom');
}
return 'OK';
});
const disco = makeDiscovery(client, { onWriteFailed });

await expect(disco.register()).resolves.toBeUndefined();
await disco.stop({ deleteHeartbeat: false });
expect(onWriteFailed).toHaveBeenCalled();
});
});
Loading
Loading