Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/daemon/rpc-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* `RpcKnowledgeStore` — client-side Adapter (parent DESIGN §Design
* patterns, "Adapter, client side") that presents the existing
* synchronous `KnowledgeStore` interface while routing every operation
* through the daemon over the L4 wire.
*
* LSP: behaviourally substitutable with the local `MykbStore` behind the
* `KnowledgeStore` interface — including throwing the SAME typed core
* errors, so call sites that `catch (EntryNotFoundError)` keep working
* unchanged. The wire's typed `kind` (contract §6) is reconstructed back
* into the core error class.
*/

import type {
KnowledgeStore,
KnowledgeEntry,
AddFactOptions,
AddDecisionOptions,
AddGotchaOptions,
AddPatternOptions,
AddLinkOptions,
EntryFilter,
} from '../core/types.js';
import {
EntryNotFoundError,
AreaNotFoundError,
EntryValidationError,
WorkspaceNotFoundError,
} from '../core/errors.js';
import { DaemonError } from './errors.js';
import { SyncRpcClient } from './sync-client.js';

function flatten(opts?: Record<string, unknown>): Record<string, unknown> {
const o: Record<string, unknown> = {};
if (!opts) return o;
if (Array.isArray(opts.tags)) o.tags = opts.tags;
if (typeof opts.zone === 'string') o.zone = opts.zone;
const prov = opts.provenance as { source?: string } | undefined;
if (prov?.source) o.source = prov.source;
for (const k of ['why', 'rejected', 'context', 'failed'] as const) {
if (opts[k] !== undefined) o[k] = opts[k];
}
return o;
}

/** Map a wire error kind back onto the core error class (LSP parity). */
function rethrow(e: unknown): never {
if (e instanceof DaemonError) {
switch (e.kind) {
case 'ENTRY_NOT_FOUND':
throw new EntryNotFoundError(e.message);
case 'AREA_NOT_FOUND':
throw new AreaNotFoundError(e.message);
case 'SCHEMA_INVALID':
case 'INVALID_PARAMS':
throw new EntryValidationError(e.message);
case 'WORKSPACE_NOT_FOUND':
throw new WorkspaceNotFoundError(e.message);
default:
throw e;
}
}
throw e;
}

export class RpcKnowledgeStore implements KnowledgeStore {
private readonly client: SyncRpcClient;

constructor(socketPath: string) {
this.client = new SyncRpcClient(socketPath);
}

private call(method: string, params: Record<string, unknown>): unknown {
try {
return this.client.call(method, params);
} catch (e) {
rethrow(e);
}
}

private addId(verb: string, params: Record<string, unknown>): string {
return (this.call(verb, params) as { id: string }).id;
}

addFact(area: string, text: string, options?: AddFactOptions): string {
return this.addId('add_fact', { area, text, ...flatten(options) });
}
addDecision(area: string, text: string, options?: AddDecisionOptions): string {
return this.addId('add_decision', { area, text, ...flatten(options) });
}
addGotcha(area: string, text: string, options?: AddGotchaOptions): string {
return this.addId('add_gotcha', { area, text, ...flatten(options) });
}
addPattern(area: string, text: string, options?: AddPatternOptions): string {
return this.addId('add_pattern', { area, text, ...flatten(options) });
}
addLink(area: string, text: string, url: string, options?: AddLinkOptions): string {
return this.addId('add_link', { area, text, url, ...flatten(options) });
}

updateEntry(area: string, id: string, updates: Partial<KnowledgeEntry>): void {
this.call('update_entry', { area, id, updates });
}
deleteEntry(area: string, id: string): void {
this.call('delete_entry', { area, id });
}
verifyEntry(area: string, id: string): void {
this.call('verify_entry', { area, id });
}
promoteEntry(area: string, id: string): void {
this.call('promote_entry', { area, id });
}
archiveEntry(area: string, id: string): void {
this.call('archive_entry', { area, id });
}

loadArea(area: string, filter?: EntryFilter): KnowledgeEntry[] {
return (this.call('load_area', { area, filter }) as { entries: KnowledgeEntry[] }).entries;
}
search(query: string): KnowledgeEntry[] {
return (this.call('search', { query }) as { entries: KnowledgeEntry[] }).entries;
}
matchAreas(text: string): { area: string; score: number }[] {
return (
this.call('match_areas', { text }) as {
matches: { area: string; score: number }[];
}
).matches;
}
compact(area?: string): void {
this.call('compact', area ? { area } : {});
}

/** Release the worker thread. Not part of KnowledgeStore; lifecycle. */
close(): void {
this.client.close();
}
}
161 changes: 161 additions & 0 deletions src/daemon/sync-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Synchronous RPC client — blocking Unix-socket round-trip.
*
* The `KnowledgeStore` / `WorkspaceStorage` interfaces are synchronous
* (`addFact(): string`, not `Promise`). The parent DESIGN's Adapter
* (`RpcMykbStore adapts the EXISTING MykbStore interface`) requires the
* RPC client to present synchronously so there is zero ripple into the
* extension hooks and CLI subcommands (Phases 4–5). Socket I/O in Node is
* async, so a worker thread owns the socket and the caller blocks on
* `Atomics.wait` until the worker signals — the canonical sync-over-async
* bridge (one in-flight request at a time, which is exactly the call
* pattern of the store interface).
*
* Caveat (inherent, not a bug): a synchronous client blocks its own
* event loop while waiting. It therefore MUST NOT share a process/event
* loop with an in-process `MykbDaemon` — the blocked loop would starve
* the daemon and deadlock. In the real v2 topology the daemon is a
* separate host process, so this never arises in production; tests run
* the daemon as a child process for the same reason.
*
* The worker is an inline `eval` worker so it needs no compiled-module
* resolution (a known Worker+TS pain). It re-implements the 4-byte
* length-prefix frame codec — deliberately kept identical to
* `transport.ts` §4.1; a divergence here is a wire bug, covered by the
* scenario tests that exercise this client against the real daemon.
*/

import {
Worker,
MessageChannel,
receiveMessageOnPort,
type MessagePort,
} from 'node:worker_threads';
import { DaemonError, type DaemonErrorKind } from './errors.js';

const WORKER_SRC = `
const { workerData } = require('node:worker_threads');
const net = require('node:net');
const { socketPath, sab, port } = workerData;
const sig = new Int32Array(sab);
let seq = 0;

function encodeFrame(body) {
const payload = Buffer.from(body, 'utf8');
const header = Buffer.allocUnsafe(4);
header.writeUInt32BE(payload.length, 0);
return Buffer.concat([header, payload]);
}

function request(method, params) {
return new Promise((resolve, reject) => {
const c = net.connect(socketPath);
let buf = Buffer.alloc(0);
const id = ++seq;
c.on('connect', () => {
c.write(encodeFrame(JSON.stringify({ jsonrpc: '2.0', id, method, params })));
});
c.on('data', (chunk) => {
buf = buf.length === 0 ? chunk : Buffer.concat([buf, chunk]);
if (buf.length < 4) return;
const len = buf.readUInt32BE(0);
if (buf.length < 4 + len) return;
const body = buf.subarray(4, 4 + len).toString('utf8');
c.end();
try { resolve(JSON.parse(body)); } catch (e) { reject(e); }
});
c.on('error', reject);
});
}

port.on('message', async (req) => {
let reply;
try {
reply = await request(req.method, req.params);
} catch (e) {
reply = { error: { message: String(e && e.message || e), data: { kind: 'BACKEND_UNAVAILABLE' } } };
}
// postMessage BEFORE the Atomics notify so the main thread's
// receiveMessageOnPort finds the reply already queued when it wakes.
port.postMessage(reply);
Atomics.store(sig, 0, 1);
Atomics.notify(sig, 0);
});
`;

interface RpcReply {
result?: unknown;
error?: { message: string; data: { kind: DaemonErrorKind; detail?: Record<string, unknown> } };
}

export class SyncRpcClient {
private readonly worker: Worker;
private readonly port: MessagePort;
private readonly sig: Int32Array;
private closed = false;

private workerError?: Error;

constructor(
socketPath: string,
private readonly timeoutMs = 15_000,
) {
const sab = new SharedArrayBuffer(8);
this.sig = new Int32Array(sab);
const channel = new MessageChannel();
this.port = channel.port1;
this.worker = new Worker(WORKER_SRC, {
eval: true,
workerData: { socketPath, sab, port: channel.port2 },
transferList: [channel.port2],
});
// A worker bootstrap/runtime failure would otherwise deadlock the
// blocked caller forever (Atomics.wait freezes the main thread, so the
// 'error' event can't be processed until the wait times out). Capture
// it so the post-timeout path can surface the real cause.
this.worker.on('error', (e: Error) => {
this.workerError = e;
});
// The worker thread must not keep the process alive on its own.
this.worker.unref();
}

/** Blocking request. Returns the JSON-RPC `result`; throws DaemonError. */
call(method: string, params: Record<string, unknown>): unknown {
if (this.closed) {
throw new DaemonError('BACKEND_UNAVAILABLE', 'RPC client already closed');
}
Atomics.store(this.sig, 0, 0);
this.port.postMessage({ method, params });
const woke = Atomics.wait(this.sig, 0, 0, this.timeoutMs);
if (woke === 'timed-out') {
// Fail-fast (contract §2.3). If the worker died, surface why.
this.close();
throw new DaemonError(
'BACKEND_UNAVAILABLE',
this.workerError
? `RPC worker failed: ${this.workerError.message}`
: `RPC call '${method}' timed out after ${this.timeoutMs}ms`,
);
}
const msg = receiveMessageOnPort(this.port);
if (!msg) {
throw new DaemonError('INTERNAL_ERROR', 'no reply from RPC worker');
}
const reply = msg.message as RpcReply;
if (reply.error) {
throw new DaemonError(
reply.error.data?.kind ?? 'INTERNAL_ERROR',
reply.error.message,
reply.error.data?.detail,
);
}
return reply.result;
}

close(): void {
if (this.closed) return;
this.closed = true;
void this.worker.terminate();
}
}
Loading