From a35be8e1822db0cf73f9a8d4e60eb6e8a9d41a52 Mon Sep 17 00:00:00 2001 From: vilosource Date: Sat, 16 May 2026 07:01:30 +0300 Subject: [PATCH] =?UTF-8?q?feat(daemon):=20RpcKnowledgeStore=20=E2=80=94?= =?UTF-8?q?=20sync=20client=20Adapter=20over=20the=20wire=20(phase=203)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TDD red→green. Client-side Adapter (parent DESIGN §Design patterns, "Adapter, client side") presenting the EXISTING synchronous KnowledgeStore interface — zero ripple to extension/CLI call sites (decision: sync via worker-thread + Atomics blocking bridge). - sync-client.ts: SyncRpcClient — inline eval-worker owns the socket; caller blocks on Atomics.wait + receiveMessageOnPort; fail-fast timeout → BACKEND_UNAVAILABLE (contract §2.3); worker-error capture so a bootstrap failure surfaces instead of deadlocking. Documents the inherent constraint: a sync client must not share an event loop with an in-process daemon (separate process in real topology). - rpc-store.ts: RpcKnowledgeStore implements KnowledgeStore; reconstructs the wire's typed kind (§6) back into the core error class so callers catching EntryNotFoundError etc. keep working unchanged (LSP). - 5 tests against a REAL daemon child process (faithful v2 topology): sync addFact round-trip, LSP parity vs local MykbStore, typed-error reconstruction, lifecycle verbs, 25 sequential blocking calls. Found+fixed a load-flake: per-test daemon spawns tipped an unrelated concurrency test under vitest cross-file parallelism. Fix: one shared daemon per file, tests namespaced by area — minimal faithful footprint, also faster. Full suite 639/639 stable across 3 consecutive runs. --- src/daemon/rpc-store.ts | 138 ++++++++++++++++++++++++++++ src/daemon/sync-client.ts | 161 +++++++++++++++++++++++++++++++++ tests/daemon/rpc-store.test.ts | 130 ++++++++++++++++++++++++++ 3 files changed, 429 insertions(+) create mode 100644 src/daemon/rpc-store.ts create mode 100644 src/daemon/sync-client.ts create mode 100644 tests/daemon/rpc-store.test.ts diff --git a/src/daemon/rpc-store.ts b/src/daemon/rpc-store.ts new file mode 100644 index 0000000..6b8f6c7 --- /dev/null +++ b/src/daemon/rpc-store.ts @@ -0,0 +1,138 @@ +/** + * `RpcKnowledgeStore` — client-side Adapter (parent DESIGN §Design + * patterns, "Adapter, client side") that presents the existing + * synchronous `KnowledgeStore` interface while routing every operation + * through the daemon over the L4 wire. + * + * LSP: behaviourally substitutable with the local `MykbStore` behind the + * `KnowledgeStore` interface — including throwing the SAME typed core + * errors, so call sites that `catch (EntryNotFoundError)` keep working + * unchanged. The wire's typed `kind` (contract §6) is reconstructed back + * into the core error class. + */ + +import type { + KnowledgeStore, + KnowledgeEntry, + AddFactOptions, + AddDecisionOptions, + AddGotchaOptions, + AddPatternOptions, + AddLinkOptions, + EntryFilter, +} from '../core/types.js'; +import { + EntryNotFoundError, + AreaNotFoundError, + EntryValidationError, + WorkspaceNotFoundError, +} from '../core/errors.js'; +import { DaemonError } from './errors.js'; +import { SyncRpcClient } from './sync-client.js'; + +function flatten(opts?: Record): Record { + const o: Record = {}; + if (!opts) return o; + if (Array.isArray(opts.tags)) o.tags = opts.tags; + if (typeof opts.zone === 'string') o.zone = opts.zone; + const prov = opts.provenance as { source?: string } | undefined; + if (prov?.source) o.source = prov.source; + for (const k of ['why', 'rejected', 'context', 'failed'] as const) { + if (opts[k] !== undefined) o[k] = opts[k]; + } + return o; +} + +/** Map a wire error kind back onto the core error class (LSP parity). */ +function rethrow(e: unknown): never { + if (e instanceof DaemonError) { + switch (e.kind) { + case 'ENTRY_NOT_FOUND': + throw new EntryNotFoundError(e.message); + case 'AREA_NOT_FOUND': + throw new AreaNotFoundError(e.message); + case 'SCHEMA_INVALID': + case 'INVALID_PARAMS': + throw new EntryValidationError(e.message); + case 'WORKSPACE_NOT_FOUND': + throw new WorkspaceNotFoundError(e.message); + default: + throw e; + } + } + throw e; +} + +export class RpcKnowledgeStore implements KnowledgeStore { + private readonly client: SyncRpcClient; + + constructor(socketPath: string) { + this.client = new SyncRpcClient(socketPath); + } + + private call(method: string, params: Record): unknown { + try { + return this.client.call(method, params); + } catch (e) { + rethrow(e); + } + } + + private addId(verb: string, params: Record): string { + return (this.call(verb, params) as { id: string }).id; + } + + addFact(area: string, text: string, options?: AddFactOptions): string { + return this.addId('add_fact', { area, text, ...flatten(options) }); + } + addDecision(area: string, text: string, options?: AddDecisionOptions): string { + return this.addId('add_decision', { area, text, ...flatten(options) }); + } + addGotcha(area: string, text: string, options?: AddGotchaOptions): string { + return this.addId('add_gotcha', { area, text, ...flatten(options) }); + } + addPattern(area: string, text: string, options?: AddPatternOptions): string { + return this.addId('add_pattern', { area, text, ...flatten(options) }); + } + addLink(area: string, text: string, url: string, options?: AddLinkOptions): string { + return this.addId('add_link', { area, text, url, ...flatten(options) }); + } + + updateEntry(area: string, id: string, updates: Partial): void { + this.call('update_entry', { area, id, updates }); + } + deleteEntry(area: string, id: string): void { + this.call('delete_entry', { area, id }); + } + verifyEntry(area: string, id: string): void { + this.call('verify_entry', { area, id }); + } + promoteEntry(area: string, id: string): void { + this.call('promote_entry', { area, id }); + } + archiveEntry(area: string, id: string): void { + this.call('archive_entry', { area, id }); + } + + loadArea(area: string, filter?: EntryFilter): KnowledgeEntry[] { + return (this.call('load_area', { area, filter }) as { entries: KnowledgeEntry[] }).entries; + } + search(query: string): KnowledgeEntry[] { + return (this.call('search', { query }) as { entries: KnowledgeEntry[] }).entries; + } + matchAreas(text: string): { area: string; score: number }[] { + return ( + this.call('match_areas', { text }) as { + matches: { area: string; score: number }[]; + } + ).matches; + } + compact(area?: string): void { + this.call('compact', area ? { area } : {}); + } + + /** Release the worker thread. Not part of KnowledgeStore; lifecycle. */ + close(): void { + this.client.close(); + } +} diff --git a/src/daemon/sync-client.ts b/src/daemon/sync-client.ts new file mode 100644 index 0000000..f66954e --- /dev/null +++ b/src/daemon/sync-client.ts @@ -0,0 +1,161 @@ +/** + * Synchronous RPC client — blocking Unix-socket round-trip. + * + * The `KnowledgeStore` / `WorkspaceStorage` interfaces are synchronous + * (`addFact(): string`, not `Promise`). The parent DESIGN's Adapter + * (`RpcMykbStore adapts the EXISTING MykbStore interface`) requires the + * RPC client to present synchronously so there is zero ripple into the + * extension hooks and CLI subcommands (Phases 4–5). Socket I/O in Node is + * async, so a worker thread owns the socket and the caller blocks on + * `Atomics.wait` until the worker signals — the canonical sync-over-async + * bridge (one in-flight request at a time, which is exactly the call + * pattern of the store interface). + * + * Caveat (inherent, not a bug): a synchronous client blocks its own + * event loop while waiting. It therefore MUST NOT share a process/event + * loop with an in-process `MykbDaemon` — the blocked loop would starve + * the daemon and deadlock. In the real v2 topology the daemon is a + * separate host process, so this never arises in production; tests run + * the daemon as a child process for the same reason. + * + * The worker is an inline `eval` worker so it needs no compiled-module + * resolution (a known Worker+TS pain). It re-implements the 4-byte + * length-prefix frame codec — deliberately kept identical to + * `transport.ts` §4.1; a divergence here is a wire bug, covered by the + * scenario tests that exercise this client against the real daemon. + */ + +import { + Worker, + MessageChannel, + receiveMessageOnPort, + type MessagePort, +} from 'node:worker_threads'; +import { DaemonError, type DaemonErrorKind } from './errors.js'; + +const WORKER_SRC = ` +const { workerData } = require('node:worker_threads'); +const net = require('node:net'); +const { socketPath, sab, port } = workerData; +const sig = new Int32Array(sab); +let seq = 0; + +function encodeFrame(body) { + const payload = Buffer.from(body, 'utf8'); + const header = Buffer.allocUnsafe(4); + header.writeUInt32BE(payload.length, 0); + return Buffer.concat([header, payload]); +} + +function request(method, params) { + return new Promise((resolve, reject) => { + const c = net.connect(socketPath); + let buf = Buffer.alloc(0); + const id = ++seq; + c.on('connect', () => { + c.write(encodeFrame(JSON.stringify({ jsonrpc: '2.0', id, method, params }))); + }); + c.on('data', (chunk) => { + buf = buf.length === 0 ? chunk : Buffer.concat([buf, chunk]); + if (buf.length < 4) return; + const len = buf.readUInt32BE(0); + if (buf.length < 4 + len) return; + const body = buf.subarray(4, 4 + len).toString('utf8'); + c.end(); + try { resolve(JSON.parse(body)); } catch (e) { reject(e); } + }); + c.on('error', reject); + }); +} + +port.on('message', async (req) => { + let reply; + try { + reply = await request(req.method, req.params); + } catch (e) { + reply = { error: { message: String(e && e.message || e), data: { kind: 'BACKEND_UNAVAILABLE' } } }; + } + // postMessage BEFORE the Atomics notify so the main thread's + // receiveMessageOnPort finds the reply already queued when it wakes. + port.postMessage(reply); + Atomics.store(sig, 0, 1); + Atomics.notify(sig, 0); +}); +`; + +interface RpcReply { + result?: unknown; + error?: { message: string; data: { kind: DaemonErrorKind; detail?: Record } }; +} + +export class SyncRpcClient { + private readonly worker: Worker; + private readonly port: MessagePort; + private readonly sig: Int32Array; + private closed = false; + + private workerError?: Error; + + constructor( + socketPath: string, + private readonly timeoutMs = 15_000, + ) { + const sab = new SharedArrayBuffer(8); + this.sig = new Int32Array(sab); + const channel = new MessageChannel(); + this.port = channel.port1; + this.worker = new Worker(WORKER_SRC, { + eval: true, + workerData: { socketPath, sab, port: channel.port2 }, + transferList: [channel.port2], + }); + // A worker bootstrap/runtime failure would otherwise deadlock the + // blocked caller forever (Atomics.wait freezes the main thread, so the + // 'error' event can't be processed until the wait times out). Capture + // it so the post-timeout path can surface the real cause. + this.worker.on('error', (e: Error) => { + this.workerError = e; + }); + // The worker thread must not keep the process alive on its own. + this.worker.unref(); + } + + /** Blocking request. Returns the JSON-RPC `result`; throws DaemonError. */ + call(method: string, params: Record): unknown { + if (this.closed) { + throw new DaemonError('BACKEND_UNAVAILABLE', 'RPC client already closed'); + } + Atomics.store(this.sig, 0, 0); + this.port.postMessage({ method, params }); + const woke = Atomics.wait(this.sig, 0, 0, this.timeoutMs); + if (woke === 'timed-out') { + // Fail-fast (contract §2.3). If the worker died, surface why. + this.close(); + throw new DaemonError( + 'BACKEND_UNAVAILABLE', + this.workerError + ? `RPC worker failed: ${this.workerError.message}` + : `RPC call '${method}' timed out after ${this.timeoutMs}ms`, + ); + } + const msg = receiveMessageOnPort(this.port); + if (!msg) { + throw new DaemonError('INTERNAL_ERROR', 'no reply from RPC worker'); + } + const reply = msg.message as RpcReply; + if (reply.error) { + throw new DaemonError( + reply.error.data?.kind ?? 'INTERNAL_ERROR', + reply.error.message, + reply.error.data?.detail, + ); + } + return reply.result; + } + + close(): void { + if (this.closed) return; + this.closed = true; + void this.worker.terminate(); + } +} diff --git a/tests/daemon/rpc-store.test.ts b/tests/daemon/rpc-store.test.ts new file mode 100644 index 0000000..08f9633 --- /dev/null +++ b/tests/daemon/rpc-store.test.ts @@ -0,0 +1,130 @@ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { spawn, execFileSync, type ChildProcess } from 'node:child_process'; +import { fileURLToPath } from 'node:url'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import * as fs from 'node:fs'; +import { initBrain } from '../../src/core/init.js'; +import { createArea } from '../../src/core/area.js'; +import { MykbStore } from '../../src/core/knowledge-store.js'; +import { RpcKnowledgeStore } from '../../src/daemon/rpc-store.js'; +import { EntryNotFoundError } from '../../src/core/errors.js'; + +// Phase 3 — RpcKnowledgeStore is the client-side Adapter behind the +// EXISTING (synchronous) KnowledgeStore interface (decision: sync via a +// worker-thread + Atomics blocking bridge; zero ripple to call sites). +// +// The daemon runs as a SEPARATE PROCESS — both because that is the real +// v2 topology (host daemon, in-container client) and because a +// synchronous client necessarily blocks its own event loop, so an +// in-process daemon sharing that loop would be starved (documented in +// sync-client.ts). +// +// ONE shared daemon + brain for the whole file, tests namespaced by area: +// per-test daemon spawns produced enough process churn to load-flake an +// unrelated concurrency test under vitest's cross-file parallelism. One +// idle daemon is the minimal faithful footprint. + +const repoRoot = path.resolve(fileURLToPath(import.meta.url), '../../..'); +const daemonMain = path.join(repoRoot, 'dist', 'daemon', 'main.js'); + +let brainPath: string; +let socketPath: string; +let child: ChildProcess; +let store: RpcKnowledgeStore; + +beforeAll(async () => { + if (!fs.existsSync(daemonMain)) { + execFileSync('npm', ['run', 'build'], { cwd: repoRoot, stdio: 'inherit' }); + } + const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'mykbd-rpc-')); + brainPath = path.join(dir, 'brain'); + fs.mkdirSync(brainPath); + initBrain(brainPath); + socketPath = path.join(dir, 'd.sock'); + child = spawn(process.execPath, [daemonMain], { + env: { ...process.env, MYKB_DIR: brainPath, MYKB_SOCKET: socketPath }, + stdio: ['ignore', 'ignore', 'pipe'], + }); + await new Promise((resolve, reject) => { + const t = setTimeout(() => reject(new Error('daemon did not start')), 20_000); + const tick = setInterval(() => { + if (fs.existsSync(socketPath)) { + clearInterval(tick); + clearTimeout(t); + resolve(); + } + }, 50); + child.on('error', reject); + }); + store = new RpcKnowledgeStore(socketPath); +}, 60_000); + +afterAll(() => { + store?.close(); + child?.kill('SIGTERM'); +}); + +/** Each test gets its own area in the shared brain (isolation). */ +function area(name: string): string { + createArea(brainPath, name, name, `${name} area`); + return name; +} + +describe('RpcKnowledgeStore — synchronous adapter over the wire', () => { + it('addFact returns an id synchronously and the entry is loadable', () => { + const a = area('add'); + const id = store.addFact(a, 'cgroups isolate processes'); + expect(typeof id).toBe('string'); + expect(id).toMatch(/\w+/); + expect(store.loadArea(a).map((e) => e.text)).toContain('cgroups isolate processes'); + }); + + it('is behaviourally substitutable with the local MykbStore (LSP)', () => { + const a = area('lsp'); + store.addFact(a, 'layered images'); + store.addDecision(a, 'use overlayfs', { why: 'fast' } as never); + + const local = MykbStore.open(brainPath); + try { + expect( + store + .loadArea(a) + .map((e) => e.text) + .sort(), + ).toEqual( + local + .loadArea(a) + .map((e) => e.text) + .sort(), + ); + expect(store.search('layered').map((e) => e.id)).toEqual( + local.search('layered').map((e) => e.id), + ); + expect(store.matchAreas('layered images')).toEqual(local.matchAreas('layered images')); + } finally { + local.close(); + } + }); + + it('reconstructs a typed core error from the wire error envelope', () => { + const a = area('err'); + expect(() => store.updateEntry(a, 'no-such-id', { text: 'x' })).toThrow(EntryNotFoundError); + }); + + it('lifecycle verbs round-trip (verify/promote/archive/delete)', () => { + const a = area('life'); + const id = store.addFact(a, 'ephemeral'); + // daemon default capability is operator → verify allowed + expect(() => store.verifyEntry(a, id)).not.toThrow(); + expect(() => store.promoteEntry(a, id)).not.toThrow(); + store.deleteEntry(a, id); + expect(store.loadArea(a).find((e) => e.id === id)).toBeUndefined(); + }); + + it('survives many sequential blocking calls on one client', () => { + const a = area('many'); + for (let i = 0; i < 25; i++) store.addFact(a, `fact ${i}`); + expect(store.loadArea(a)).toHaveLength(25); + }); +});