diff --git a/packages/app/electron.vite.config.ts b/packages/app/electron.vite.config.ts index 232e9e11..bb03b1d3 100644 --- a/packages/app/electron.vite.config.ts +++ b/packages/app/electron.vite.config.ts @@ -35,6 +35,7 @@ export default defineConfig({ index: resolve(__dirname, 'src/main/index.ts'), 'sync-worker': resolve(__dirname, 'src/main/sync-worker.ts'), 'scan-worker-thread': resolve(__dirname, 'src/main/scan-worker-thread.ts'), + 'mutation-worker-thread': resolve(__dirname, 'src/main/mutation-worker-thread.ts'), }, }, }, diff --git a/packages/app/src/main/index.ts b/packages/app/src/main/index.ts index 97047b1c..17fc7b48 100644 --- a/packages/app/src/main/index.ts +++ b/packages/app/src/main/index.ts @@ -39,6 +39,7 @@ import { SPOOL_DIR, } from '@spool-lab/core' import { spawnScanWorker, type ScanWorkerProxy } from './scan-worker-proxy.js' +import { spawnMutationWorker, type MutationWorkerProxy } from './mutation-worker-proxy.js' import { Effect } from 'effect' import { registerSecurityIpc, registerSecurityReadinessIpc, SECURITY_IPC_CHANNELS, type SecurityReadiness } from './ipc/security.js' import { loadSecurityPreferences, saveSecurityPreferences } from './securityPreferences.js' @@ -113,6 +114,7 @@ let watcher: SpoolWatcher let acpManager: AcpManager let isSyncActive = false let scanWorker: ScanWorkerProxy | null = null +let mutationWorker: MutationWorkerProxy | null = null let disposeSecurityIpc: (() => void) | null = null let setSecurityReadiness: ((next: SecurityReadiness) => void) | null = null let disposeSecurityReadinessIpc: (() => void) | null = null @@ -233,6 +235,20 @@ async function bootScanWorker(): Promise { } } +/** Bring up the mutation worker — purge / dismiss / undismiss SQL + * runs there so the main process event loop stays unblocked through + * the ~1s tail of bulk operations on large archives. Failure is + * non-fatal: the IPC handlers fall back to running the same SQL + * in-process on the main DB handle, which is the legacy behaviour. */ +async function bootMutationWorker(): Promise { + try { + mutationWorker = await spawnMutationWorker(join(__dirname, 'mutation-worker-thread.js')) + } catch (err) { + console.error('[security] mutation worker failed to boot:', err) + mutationWorker = null + } +} + async function shutdownScanWorker(): Promise { if (disposeSecurityIpc) { try { disposeSecurityIpc() } catch { /* best effort */ } @@ -249,6 +265,12 @@ async function shutdownScanWorker(): Promise { } catch { /* best effort */ } scanWorker = null } + if (mutationWorker) { + try { + await mutationWorker.shutdown() + } catch { /* best effort */ } + mutationWorker = null + } try { await pfRuntime.stop() } catch { /* best effort */ } } @@ -319,7 +341,16 @@ async function ensureSecurityBooted(): Promise { setSecurityReadiness?.({ ready: false, reason: 'scanner-unavailable' }) return } - disposeSecurityIpc = registerSecurityIpc({ + // Register IPC immediately after the scan worker is ready so the + // renderer's `security:get-scan-status` polling on first window + // open finds a handler. Mutation-worker boot is deferred and + // plumbed in via `securityIpc.attachMutationWorker` once it + // reports ready — the handlers fall back to in-process SQL in the + // meantime. Without this split the e2e harness saw the + // first-window polling rejected with "No handler registered for + // security:get-scan-status" because both worker boots were + // awaited sequentially before the IPC was registered. + const securityIpc = registerSecurityIpc({ db, worker: scanWorker, runPromise: runWithObservability, @@ -332,11 +363,23 @@ async function ensureSecurityBooted(): Promise { }) }, }) + disposeSecurityIpc = securityIpc.dispose setSecurityReadiness?.({ ready: true }) console.log('[security.lifecycle] booted — worker + IPC ready, backfilling') runWithObservability(scanWorker.backfill()).catch((err) => { console.error('[security] boot backfill failed:', err) }) + + // Mutation worker boots in the background. Until it's ready the + // IPC handlers run their in-process fallback path on the main + // thread — same SQL, same correctness, just no off-main offload. + // On success, attach so subsequent calls route through the worker + // AND start the per-mutation change forwarder. + void bootMutationWorker().then(() => { + if (mutationWorker) { + securityIpc.attachMutationWorker(mutationWorker) + } + }) // If the user enabled PF before this boot, bring the inference window // up now that the rest of Spool is ready. if (loadSecurityPreferences().pfEnabled) { diff --git a/packages/app/src/main/ipc/security.test.ts b/packages/app/src/main/ipc/security.test.ts index 72aa2415..63441bf8 100644 --- a/packages/app/src/main/ipc/security.test.ts +++ b/packages/app/src/main/ipc/security.test.ts @@ -24,6 +24,7 @@ import { type ScanStatus, type FindingsChange, } from '@spool-lab/core' +import type { MutationWorkerProxy } from '../mutation-worker-proxy.js' // ─── electron mock ──────────────────────────────────────────────── // vi.hoisted so the stub objects exist before vi.mock evaluates. @@ -123,7 +124,11 @@ async function setupFixture(): Promise { handlers.clear() sentEvents.length = 0 - const dispose = registerSecurityIpc({ + // Mutation worker is never attached in the default fixture so the + // existing assertions exercise the in-process fallback path; the + // suite further down attaches a fake proxy and pins the worker- + // delegated path. + const { dispose } = registerSecurityIpc({ db, worker, runPromise: (eff: Effect.Effect) => Effect.runPromise(eff as unknown as Effect.Effect), @@ -498,6 +503,143 @@ describe('registerSecurityIpc', () => { // must broadcast a readiness event so the renderer can switch // from skeleton → ready or → "Scanner unavailable" banner instead // of swallowing IPC errors. +// When a mutation worker proxy is wired in, the IPC layer must +// delegate purge / dismiss / undismiss to it (so the synchronous SQL +// runs on the worker thread, not on the main process event loop) AND +// forward the proxy's per-mutation FindingsChange events to the +// renderer via the same EVT_FINDINGS_CHANGED channel the scan worker +// uses — so renderer subscribers can't tell whether a change came +// from a scan or a mutation. +describe('registerSecurityIpc with mutationWorker', () => { + it('routes purge / dismiss / undismiss IPCs through the proxy instead of running SQL in-process', async () => { + handlers.clear() + sentEvents.length = 0 + const db = setupDb() + const status: ScanStatus = { queued: 0, scanning: null, backfillRemaining: 0, backfillTotal: 0, manualBurstInFlight: false, currentProfile: 'regex@4' } + const worker: ScanWorker = { + enqueue: () => Effect.void, + rescanAll: () => Effect.sync(() => 0), + backfill: () => Effect.sync(() => 0), + changes: Stream.empty, + statusChanges: Stream.empty, + getStatus: Effect.sync(() => status), + } as ScanWorker + + // Track every call so we can assert the right proxy method was + // hit and the in-process path was not. + const calls: Array<{ method: string; args: unknown[] }> = [] + const fakeProxy: MutationWorkerProxy = { + purgeFinding: async (id) => { calls.push({ method: 'purgeFinding', args: [id] }); return { findingId: id, sessionId: 1, maskUsed: '[redacted]', purgedAt: 'now' } }, + purgeFindings: async (ids) => { calls.push({ method: 'purgeFindings', args: [ids] }); return [] }, + purgeEverywhere: async (kind, hash) => { calls.push({ method: 'purgeEverywhere', args: [kind, hash] }); return { results: [], sessionIds: [] } }, + dismissFinding: async (id, scope) => { calls.push({ method: 'dismissFinding', args: [id, scope] }); return null }, + dismissFindings: async (ids, scope) => { calls.push({ method: 'dismissFindings', args: [ids, scope] }); return [] }, + undismissFinding: async (id) => { calls.push({ method: 'undismissFinding', args: [id] }); return null }, + changes: Stream.empty, + shutdown: async () => { /* no-op */ }, + } + + const fakeWindow = { + webContents: { send: (channel: string, payload: unknown) => { sentEvents.push({ channel, payload }) } }, + } as unknown as import('electron').BrowserWindow + + const { dispose, attachMutationWorker } = registerSecurityIpc({ + db, + worker, + runPromise: (eff: Effect.Effect) => Effect.runPromise(eff as unknown as Effect.Effect), + getMainWindow: () => fakeWindow, + }) + // Late-attach the fake proxy — mirrors how production wires the + // mutation worker in after the IPC layer is already live. + attachMutationWorker(fakeProxy) + + try { + await invoke('security:purge-finding', 42) + await invoke('security:purge-findings', [1, 2, 3]) + await invoke('security:purge-everywhere', { kind: 'api-key', valueHash: 'h' }) + await invoke('security:dismiss-finding', { findingId: 7, scope: 'session' }) + await invoke('security:dismiss-findings', { findingIds: [8, 9], scope: 'global' }) + await invoke('security:undismiss-finding', { findingId: 7 }) + + expect(calls).toEqual([ + { method: 'purgeFinding', args: [42] }, + { method: 'purgeFindings', args: [[1, 2, 3]] }, + { method: 'purgeEverywhere', args: ['api-key', 'h'] }, + { method: 'dismissFinding', args: [7, 'session'] }, + { method: 'dismissFindings', args: [[8, 9], 'global'] }, + { method: 'undismissFinding', args: [7] }, + ]) + + // The proxy's per-mutation publishes ride a separate forwarder + // (a Stream.empty proxy here emits nothing); the in-process + // fallback's manual webContents.send is what would normally + // populate sentEvents during a dismiss. With a real proxy + // those events flow through `mutationWorker.changes` instead + // — covered separately in the next test. + expect(sentEvents.length).toBe(0) + } finally { + dispose() + db.close() + } + }) + + it('forwards mutation worker change events to EVT_FINDINGS_CHANGED', async () => { + handlers.clear() + sentEvents.length = 0 + const db = setupDb() + const status: ScanStatus = { queued: 0, scanning: null, backfillRemaining: 0, backfillTotal: 0, manualBurstInFlight: false, currentProfile: 'regex@4' } + const worker: ScanWorker = { + enqueue: () => Effect.void, + rescanAll: () => Effect.sync(() => 0), + backfill: () => Effect.sync(() => 0), + changes: Stream.empty, + statusChanges: Stream.empty, + getStatus: Effect.sync(() => status), + } as ScanWorker + + const mutationPubsub = await Effect.runPromise(PubSub.unbounded()) + const fakeProxy: MutationWorkerProxy = { + purgeFinding: async () => { throw new Error('not used') }, + purgeFindings: async () => [], + purgeEverywhere: async () => ({ results: [], sessionIds: [] }), + dismissFinding: async () => null, + dismissFindings: async () => [], + undismissFinding: async () => null, + changes: Stream.fromPubSub(mutationPubsub), + shutdown: async () => { /* no-op */ }, + } + + const fakeWindow = { + webContents: { send: (channel: string, payload: unknown) => { sentEvents.push({ channel, payload }) } }, + } as unknown as import('electron').BrowserWindow + + const { dispose, attachMutationWorker } = registerSecurityIpc({ + db, + worker, + runPromise: (eff: Effect.Effect) => Effect.runPromise(eff as unknown as Effect.Effect), + getMainWindow: () => fakeWindow, + }) + attachMutationWorker(fakeProxy) + + try { + // The forwarder fiber is forked via Effect.runPromise which is + // async; give it a tick to subscribe to the PubSub before we + // publish. Mirrors the 50ms wait in the worker.changes forwarder + // test above. + await new Promise((r) => setTimeout(r, 50)) + const change: FindingsChange = { type: 'state-changed', sessionId: 5, state: 'purged' } + await Effect.runPromise(PubSub.publish(mutationPubsub, change)) + await new Promise((r) => setTimeout(r, 50)) + expect(sentEvents).toEqual([ + { channel: 'security:evt-findings-changed', payload: change }, + ]) + } finally { + dispose() + db.close() + } + }) +}) + describe('registerSecurityReadinessIpc', () => { let fakeWin: { sent: Array<{ channel: string; payload: unknown }> } let getWindow: () => import('electron').BrowserWindow | null diff --git a/packages/app/src/main/ipc/security.ts b/packages/app/src/main/ipc/security.ts index cff5d4a8..55719a41 100644 --- a/packages/app/src/main/ipc/security.ts +++ b/packages/app/src/main/ipc/security.ts @@ -46,6 +46,7 @@ import { } from '../securityPreferences.js' import type { PfCoordinator } from '../security/pf-coordinator.js' import type { PfRuntime } from '../security/pf-runtime.js' +import type { MutationWorkerProxy } from '../mutation-worker-proxy.js' /** Channel name table. Shared via type only with the renderer adapter * (no runtime import — preload uses the strings literally). */ @@ -176,11 +177,41 @@ export interface SecurityIpcDeps { pfRuntime?: PfRuntime | null } +export interface SecurityIpcHandle { + /** Interrupts all forwarder daemons + removes every ipcMain.handle + * registration. Idempotent — fine to call after a failed attach. */ + dispose: () => void + /** Late-bind a mutation worker so the IPC handlers start delegating + * purge / dismiss / undismiss to it, AND fork the change-event + * forwarder onto the worker's `changes` stream. Mutation worker + * boot happens in the background after `registerSecurityIpc` + * returns so the IPC layer is live the moment the scan worker is + * ready — without this split, the e2e harness opens the first + * window before mutation-worker boot completes and `security: + * get-scan-status` rejects with "No handler registered" before + * the worker proxies the call. + * + * Calling twice or with the same proxy is a no-op-ish: the prior + * forwarder is left running (no harm — the previous PubSub + * becomes unreachable and GCs once the proxy reference drops). + * Real-world flow boots one proxy, attaches it, and replaces only + * on teardown / re-boot. */ + attachMutationWorker: (proxy: MutationWorkerProxy) => void +} + /** Register every Security Scan ipcMain.handle and start a background - * fiber that forwards worker change events to the main window. The - * returned disposer interrupts the forwarding fiber. */ -export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { + * fiber that forwards scan-worker change + status events to the main + * window. Mutation-worker boot is intentionally deferred — call + * `attachMutationWorker(proxy)` on the returned handle once the + * worker is ready so the IPC layer becomes live before that boot + * completes. */ +export function registerSecurityIpc(deps: SecurityIpcDeps): SecurityIpcHandle { const { db, worker, runPromise, getMainWindow, pfCoordinator, pfRuntime: hostRuntime, onPfEnabledChanged } = deps + // Closure-local ref read by every mutation handler at call time — + // lets `attachMutationWorker` swap the worker in after IPC has + // already started taking calls. Until it's set the handlers fall + // back to in-process SQL on the main thread. + let currentMutationWorker: MutationWorkerProxy | null = null ipcMain.handle(SECURITY_IPC_CHANNELS.LIST_FINDINGS, (_e, filter: FindingFilter) => listFindings(db, filter), @@ -218,9 +249,22 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { runPromise(worker.getStatus), ) + // All write paths route through `currentMutationWorker` when present so + // the main event loop stays free during multi-second bulk + // operations on a large archive. The fallback to in-process is + // kept for the rare worker-boot-failure case (and for the IPC + // unit-tests that don't spin up a real worker) — the existing core + // helpers run synchronously on `db` exactly as before. ipcMain.handle( SECURITY_IPC_CHANNELS.DISMISS_FINDING, - (_e, args: { findingId: number; scope: 'session' | 'global' }) => { + async (_e, args: { findingId: number; scope: 'session' | 'global' }) => { + if (currentMutationWorker) { + // Per-event publish lands via the proxy's `changes` stream + // (subscribed below in the forwarder fiber), so no need to + // webContents.send on success here. + await currentMutationWorker.dismissFinding(args.findingId, args.scope) + return { ok: true } + } const sessionId = dismissFinding(db, args.findingId, args.scope, true) if (sessionId != null) { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, { @@ -232,7 +276,11 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { ) ipcMain.handle( SECURITY_IPC_CHANNELS.DISMISS_FINDINGS, - (_e, args: { findingIds: number[]; scope: 'session' | 'global' }) => { + async (_e, args: { findingIds: number[]; scope: 'session' | 'global' }) => { + if (currentMutationWorker) { + await currentMutationWorker.dismissFindings(args.findingIds, args.scope) + return { ok: true } + } const sessionIds = dismissFindings(db, args.findingIds, args.scope) for (const sessionId of sessionIds) { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, { @@ -244,7 +292,11 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { ) ipcMain.handle( SECURITY_IPC_CHANNELS.UNDISMISS_FINDING, - (_e, args: { findingId: number }) => { + async (_e, args: { findingId: number }) => { + if (currentMutationWorker) { + await currentMutationWorker.undismissFinding(args.findingId) + return { ok: true } + } const sessionId = undismissFinding(db, args.findingId) if (sessionId != null) { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, { @@ -255,6 +307,7 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { }, ) ipcMain.handle(SECURITY_IPC_CHANNELS.PURGE_FINDING, async (_e, findingId: number) => { + if (currentMutationWorker) return currentMutationWorker.purgeFinding(findingId) const publish = (change: Parameters infer R ? R : never>['webContents']['send']>[1]) => Effect.sync(() => { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, change) @@ -263,6 +316,7 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { return result }) ipcMain.handle(SECURITY_IPC_CHANNELS.PURGE_FINDINGS, async (_e, findingIds: number[]) => { + if (currentMutationWorker) return currentMutationWorker.purgeFindings(findingIds) const publish = (change: unknown) => Effect.sync(() => { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, change) @@ -273,6 +327,10 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { ipcMain.handle( SECURITY_IPC_CHANNELS.PURGE_EVERYWHERE, async (_e, args: { kind: SensitiveKind; valueHash: string }) => { + if (currentMutationWorker) { + const out = await currentMutationWorker.purgeEverywhere(args.kind, args.valueHash) + return { count: out.results.length, sessionIds: out.sessionIds } + } const publish = (change: unknown) => Effect.sync(() => { getMainWindow()?.webContents.send(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, change) @@ -394,6 +452,7 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { // until explicitly interrupted by the returned disposer below. let forwarderFiber: Fiber.RuntimeFiber | null = null let statusForwarderFiber: Fiber.RuntimeFiber | null = null + let mutationForwarderFiber: Fiber.RuntimeFiber | null = null // Stream consumers wrapped in `Effect.catchAllDefect` — webContents // .send synchronously throws on a destroyed window (e.g. user closed // the main window while a scan was mid-burst). Without this the @@ -433,16 +492,45 @@ export function registerSecurityIpc(deps: SecurityIpcDeps): () => void { }), ).catch(() => { /* fork rejected; ignore (cleanup path) */ }) - return () => { + // Mutation-worker change forwarder is forked when `attachMutationWorker` + // fires, not at registration time, so the IPC layer is live even + // before the worker has finished booting. Stored here so `dispose` + // can interrupt it alongside the scan-worker forwarders. + const dispose = (): void => { if (forwarderFiber) { Effect.runFork(Fiber.interrupt(forwarderFiber)) } if (statusForwarderFiber) { Effect.runFork(Fiber.interrupt(statusForwarderFiber)) } + if (mutationForwarderFiber) { + Effect.runFork(Fiber.interrupt(mutationForwarderFiber)) + } unsubscribePf?.() for (const ch of Object.values(SECURITY_IPC_CHANNELS)) { ipcMain.removeHandler(ch) } } + + const attachMutationWorker = (proxy: MutationWorkerProxy): void => { + currentMutationWorker = proxy + // Mutation worker has its own per-mutation FindingsChange stream; + // forward it onto the SAME EVT_FINDINGS_CHANGED renderer channel + // so consumers can't tell whether the publish came from a scan + // or a purge / dismiss. Until this fork runs, the per-handler + // fallback path sends its own EVT_FINDINGS_CHANGED for the same + // mutations — events still flow, they just take the in-process + // route. + Effect.runPromise( + Effect.gen(function* () { + mutationForwarderFiber = yield* Effect.forkDaemon( + Stream.runForEach(proxy.changes, (change) => + safeSend(SECURITY_IPC_CHANNELS.EVT_FINDINGS_CHANGED, change), + ), + ) + }), + ).catch(() => { /* fork rejected; ignore */ }) + } + + return { dispose, attachMutationWorker } } diff --git a/packages/app/src/main/mutation-worker-proxy.ts b/packages/app/src/main/mutation-worker-proxy.ts new file mode 100644 index 00000000..76f287f8 --- /dev/null +++ b/packages/app/src/main/mutation-worker-proxy.ts @@ -0,0 +1,205 @@ +// Main-process proxy that exposes the security mutation worker as a +// regular set of async functions. IPC handlers in ipc/security.ts +// don't need to know the SQL actually runs in a child thread — they +// call `proxy.purgeFindings(ids)` and get back a promise. Each command +// is round-tripped over postMessage; the per-event `FindingsChange` +// stream is forwarded straight onto a PubSub the existing +// registerSecurityIpc forwarder already subscribes to. +// +// See mutation-worker-thread.ts for the worker side of the protocol. + +import { Worker } from 'node:worker_threads' +import { PubSub, Effect, Stream } from 'effect' +import type { FindingsChange, PurgeResult } from '@spool-lab/core' +import type { SensitiveKind } from '@spool-lab/redact' +import type { + FromWorker, + MutationCommand, + MutationResult, + PurgeErrorWire, + ToWorker, +} from './mutation-worker-thread.js' + +export interface MutationWorkerProxy { + purgeFinding: (findingId: number) => Promise + purgeFindings: (findingIds: number[]) => Promise + purgeEverywhere: ( + kind: SensitiveKind, + valueHash: string, + ) => Promise<{ results: PurgeResult[]; sessionIds: number[] }> + dismissFinding: (findingId: number, scope: 'session' | 'global') => Promise + dismissFindings: (findingIds: number[], scope: 'session' | 'global') => Promise + undismissFinding: (findingId: number) => Promise + /** PubSub the change-event forwarder subscribes to — same shape as + * the scan worker's `worker.changes` stream so the existing + * registerSecurityIpc daemon fiber can fork-and-forget on both. */ + changes: Stream.Stream + /** Asks the worker thread to drain cleanly and waits for `exit`. */ + shutdown: () => Promise +} + +interface PendingCommand { + resolve: (value: MutationResult) => void + reject: (err: Error) => void +} + +/** Custom error type carrying the worker-flattened `PurgeError` shape + * so IPC handlers can branch on `err.reason` (the renderer's existing + * error UI). */ +export class MutationWorkerError extends Error { + public readonly reason: PurgeErrorWire['reason'] | 'unknown' + public readonly findingId: number | undefined + constructor(error: PurgeErrorWire | { reason: 'unknown'; message: string }) { + super('message' in error && typeof error.message === 'string' ? error.message : `mutation worker error: ${error.reason}`) + this.name = 'MutationWorkerError' + this.reason = error.reason + this.findingId = 'findingId' in error ? error.findingId : undefined + } +} + +const BOOT_TIMEOUT_MS = 10_000 + +export async function spawnMutationWorker(workerPath: string): Promise { + const worker = new Worker(workerPath) + const pending = new Map() + let nextReqId = 1 + + const changes = await Effect.runPromise(PubSub.unbounded()) + + function postSafe(msg: ToWorker): void { + try { worker.postMessage(msg) } catch { /* worker gone */ } + } + + function send(payload: MutationCommand): Promise { + const reqId = nextReqId++ + return new Promise((resolve, reject) => { + pending.set(reqId, { resolve, reject }) + try { + worker.postMessage({ type: 'cmd', reqId, payload } satisfies ToWorker) + } catch (err) { + pending.delete(reqId) + reject(err instanceof Error ? err : new Error(String(err))) + } + }) + } + + // Boot handshake — wait for `ready` before resolving. Mirrors the + // scan-worker-proxy contract. + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + worker.off('message', onMessage) + worker.off('error', onError) + worker.terminate().catch(() => { /* nothing to do */ }) + reject(new Error(`mutation worker did not report ready within ${BOOT_TIMEOUT_MS}ms`)) + }, BOOT_TIMEOUT_MS) + function clear(): void { + clearTimeout(timeout) + worker.off('message', onMessage) + worker.off('error', onError) + } + function onMessage(msg: FromWorker): void { + if (msg.type === 'ready') { + clear() + worker.on('message', onSteadyState) + worker.on('error', onSteadyError) + worker.on('exit', onExit) + resolve() + return + } + if (msg.type === 'fatal') { + clear() + reject(new Error(`mutation worker failed during boot: ${msg.error}`)) + } + } + function onError(err: Error): void { + clear() + reject(err) + } + worker.on('message', onMessage) + worker.on('error', onError) + }) + + function onSteadyState(msg: FromWorker): void { + switch (msg.type) { + case 'cmd-result': { + const p = pending.get(msg.reqId) + if (p) { + pending.delete(msg.reqId) + p.resolve(msg.result) + } + return + } + case 'event-change': { + void Effect.runPromise(PubSub.publish(changes, msg.change)).catch((err) => { + console.error('[security] mutation worker change publish failed:', err) + }) + return + } + case 'fatal': + console.error('[security] mutation worker reported fatal:', msg.error) + return + default: + return + } + } + function onSteadyError(err: Error): void { + console.error('[security] mutation worker emitted error event:', err) + } + function onExit(code: number): void { + // Sweep any in-flight requests; without this, the IPC handlers + // that awaited them would hang forever and the renderer would + // see "loading" indefinitely. + const err = new Error(`mutation worker exited with code ${code}`) + for (const p of pending.values()) p.reject(err) + pending.clear() + } + + function unwrap(result: MutationResult): MutationResult & { ok: true } { + if (!result.ok) throw new MutationWorkerError(result.error) + return result + } + + return { + purgeFinding: async (findingId) => { + const out = unwrap(await send({ cmd: 'purgeFinding', findingId })) + if (out.cmd !== 'purgeFinding') throw new Error('mutation worker: cmd mismatch') + return out.result + }, + purgeFindings: async (findingIds) => { + const out = unwrap(await send({ cmd: 'purgeFindings', findingIds })) + if (out.cmd !== 'purgeFindings') throw new Error('mutation worker: cmd mismatch') + return out.results + }, + purgeEverywhere: async (kind, valueHash) => { + const out = unwrap(await send({ cmd: 'purgeEverywhere', kind, valueHash })) + if (out.cmd !== 'purgeEverywhere') throw new Error('mutation worker: cmd mismatch') + return { results: out.results, sessionIds: out.sessionIds } + }, + dismissFinding: async (findingId, scope) => { + const out = unwrap(await send({ cmd: 'dismissFinding', findingId, scope })) + if (out.cmd !== 'dismissFinding') throw new Error('mutation worker: cmd mismatch') + return out.sessionId + }, + dismissFindings: async (findingIds, scope) => { + const out = unwrap(await send({ cmd: 'dismissFindings', findingIds, scope })) + if (out.cmd !== 'dismissFindings') throw new Error('mutation worker: cmd mismatch') + return out.sessionIds + }, + undismissFinding: async (findingId) => { + const out = unwrap(await send({ cmd: 'undismissFinding', findingId })) + if (out.cmd !== 'undismissFinding') throw new Error('mutation worker: cmd mismatch') + return out.sessionId + }, + changes: Stream.fromPubSub(changes), + shutdown: async () => { + postSafe({ type: 'shutdown' }) + await new Promise((resolve) => { + worker.once('exit', () => resolve()) + setTimeout(() => { + worker.terminate().catch(() => { /* nothing to do */ }) + resolve() + }, 3000) + }) + }, + } +} diff --git a/packages/app/src/main/mutation-worker-thread.ts b/packages/app/src/main/mutation-worker-thread.ts new file mode 100644 index 00000000..c4f77d95 --- /dev/null +++ b/packages/app/src/main/mutation-worker-thread.ts @@ -0,0 +1,192 @@ +// Security mutation worker — runs purge / dismiss / undismiss SQL +// in a dedicated worker_thread so the main process event loop stays +// unblocked even during the multi-second tail of bulk operations on +// a large archive (a fresh `purgeFindings(8k)` was ~1.4s post-#346; +// fast enough that we shipped the bug fix but slow enough that +// hover responsiveness and other IPC channels still froze for the +// duration). +// +// Architecture (mirror of scan-worker-thread.ts): +// +// • Opens its own better-sqlite3 handle via getDB(). WAL mode (db.ts) +// lets the main-process read handle plus the scan worker's write +// handle coexist with this thread's write handle — SQLite +// serialises writes via the WAL but never blocks reads. +// • Receives mutation commands on parentPort, runs them via the +// existing core helpers (`purgeFinding`, `dismissFindings`, …), +// and posts back either a typed result or a typed error. +// • Publishes `FindingsChange` events as they happen so main can +// forward them to the renderer without changing the existing +// EVT_FINDINGS_CHANGED contract. +// • A 'shutdown' message closes the Effect Scope cleanly and +// exits 0. Unhandled rejections / exceptions post a 'fatal' +// message and exit(1) — the parent's `worker.on('exit')` handler +// surfaces that as "mutation worker died, fall back to main +// thread" in the IPC layer. + +import { parentPort, threadId } from 'node:worker_threads' +import { Effect } from 'effect' +import { + getDB, + purgeFinding as purgeFindingEff, + purgeFindings as purgeFindingsEff, + purgeEverywhere as purgeEverywhereEff, + dismissFinding, + dismissFindings, + undismissFinding, + type FindingsChange, + type PurgeResult, +} from '@spool-lab/core' +import type { SensitiveKind } from '@spool-lab/redact' +import { + exitToWireResult, + flattenPurgeError, + type PurgeErrorWire, + type WireError, +} from './security/mutation-error-wire.js' + +export type { PurgeErrorWire } + +if (!parentPort) { + throw new Error('mutation-worker-thread.ts is only meant to run as a worker_thread child') +} +const port = parentPort + +export type MutationCommand = + | { cmd: 'purgeFinding'; findingId: number } + | { cmd: 'purgeFindings'; findingIds: number[] } + | { cmd: 'purgeEverywhere'; kind: SensitiveKind; valueHash: string } + | { cmd: 'dismissFinding'; findingId: number; scope: 'session' | 'global' } + | { cmd: 'dismissFindings'; findingIds: number[]; scope: 'session' | 'global' } + | { cmd: 'undismissFinding'; findingId: number } + +export type MutationResult = + | { ok: true; cmd: 'purgeFinding'; result: PurgeResult } + | { ok: true; cmd: 'purgeFindings'; results: PurgeResult[] } + | { ok: true; cmd: 'purgeEverywhere'; results: PurgeResult[]; sessionIds: number[] } + | { ok: true; cmd: 'dismissFinding'; sessionId: number | null } + | { ok: true; cmd: 'dismissFindings'; sessionIds: number[] } + | { ok: true; cmd: 'undismissFinding'; sessionId: number | null } + | { ok: false; error: WireError } + +export type ToWorker = + | { type: 'cmd'; reqId: number; payload: MutationCommand } + | { type: 'shutdown' } + +export type FromWorker = + | { type: 'ready' } + | { type: 'cmd-result'; reqId: number; result: MutationResult } + | { type: 'event-change'; change: FindingsChange } + | { type: 'fatal'; error: string } + +function reportFatal(err: unknown): void { + const error = err instanceof Error ? (err.stack ?? err.message) : String(err) + try { + port.postMessage({ type: 'fatal', error } satisfies FromWorker) + } catch { /* parent gone */ } + process.exit(1) +} + +// Node 22's --unhandled-rejections=strict default takes down the host +// process on any unhandled rejection inside a worker. Capturing both +// channels here converts those into structured 'fatal' messages. +process.on('unhandledRejection', reportFatal) +process.on('uncaughtException', reportFatal) + +console.log('[security] mutation worker thread booted; threadId =', threadId) + +// Main process already migrated the file before spawning this thread; +// skipping here avoids the FTS-trigger DROP/CREATE race between the +// three worker threads (sync, scan, mutation) that all open the same DB. +const db = getDB({ runMigrations: false }) + +function postSafe(msg: FromWorker): void { + try { port.postMessage(msg) } catch { /* parent gone */ } +} + +/** Forward a `FindingsChange` to main as it happens. The core purge + * helpers accept a `publish` callback that fires per touched session; + * we wrap it here so each emission becomes a postMessage. */ +const publish = (change: FindingsChange) => + Effect.sync(() => postSafe({ type: 'event-change', change })) + +async function handle(cmd: MutationCommand): Promise { + switch (cmd.cmd) { + case 'purgeFinding': { + const exit = await Effect.runPromiseExit(purgeFindingEff(cmd.findingId, { db, publish })) + const wire = exitToWireResult(exit, (result) => result) + return wire.ok + ? { ok: true, cmd: 'purgeFinding', result: wire.success } + : { ok: false, error: wire.error } + } + case 'purgeFindings': { + const exit = await Effect.runPromiseExit(purgeFindingsEff(cmd.findingIds, { db, publish })) + const wire = exitToWireResult(exit, (results) => results) + return wire.ok + ? { ok: true, cmd: 'purgeFindings', results: wire.success } + : { ok: false, error: wire.error } + } + case 'purgeEverywhere': { + const exit = await Effect.runPromiseExit(purgeEverywhereEff(cmd.kind, cmd.valueHash, { db, publish })) + const wire = exitToWireResult(exit, (value) => value) + return wire.ok + ? { ok: true, cmd: 'purgeEverywhere', results: wire.success.results, sessionIds: wire.success.sessionIds } + : { ok: false, error: wire.error } + } + case 'dismissFinding': { + const sessionId = dismissFinding(db, cmd.findingId, cmd.scope) + if (sessionId != null) { + postSafe({ + type: 'event-change', + change: { type: 'state-changed', sessionId, findingId: cmd.findingId, state: 'dismissed' }, + }) + } + return { ok: true, cmd: 'dismissFinding', sessionId } + } + case 'dismissFindings': { + const sessionIds = dismissFindings(db, cmd.findingIds, cmd.scope) + for (const sessionId of sessionIds) { + postSafe({ + type: 'event-change', + change: { type: 'state-changed', sessionId, state: 'dismissed' }, + }) + } + return { ok: true, cmd: 'dismissFindings', sessionIds } + } + case 'undismissFinding': { + const sessionId = undismissFinding(db, cmd.findingId) + if (sessionId != null) { + postSafe({ + type: 'event-change', + change: { type: 'state-changed', sessionId, findingId: cmd.findingId, state: 'active' }, + }) + } + return { ok: true, cmd: 'undismissFinding', sessionId } + } + } +} + +port.on('message', (msg: ToWorker) => { + if (msg.type === 'shutdown') { + try { db.close() } catch { /* best effort */ } + process.exit(0) + } + if (msg.type !== 'cmd') return + // Don't await — the per-command Promise is awaited inside `handle`, + // and each response postMessage carries the originating reqId so the + // proxy can route it to the right pending promise. Running them in + // parallel matches the "multiple concurrent IPC actions" reality of + // the renderer (e.g. user opens BlastRadius while a bulk purge is in + // flight). SQLite WAL serialises the actual writes; the JS scheduling + // here just hands the next request to better-sqlite3, which queues. + void handle(msg.payload).then( + (result) => postSafe({ type: 'cmd-result', reqId: msg.reqId, result }), + (err) => postSafe({ + type: 'cmd-result', + reqId: msg.reqId, + result: { ok: false, error: flattenPurgeError(err) }, + }), + ) +}) + +postSafe({ type: 'ready' }) diff --git a/packages/app/src/main/security/mutation-error-wire.test.ts b/packages/app/src/main/security/mutation-error-wire.test.ts new file mode 100644 index 00000000..abde25bc --- /dev/null +++ b/packages/app/src/main/security/mutation-error-wire.test.ts @@ -0,0 +1,137 @@ +// Regression tests for the Effect.Exit → wire transformation in the +// mutation worker. The test file motivating this module is the +// self-reviewed bug on PR #N that was caught only because the human +// reviewer asked the agent to double-check the `Exit.causeOption` +// shape — without these tests, the same regression would slip back in +// silently the next time someone touches the worker. +// +// What we're guarding: +// - Typed `PurgeError` from `@spool-lab/core` must round-trip with +// its `reason` AND `findingId` intact, so the renderer's +// reason-keyed error UI fires for worker-routed calls (which is +// every purge call once the worker is booted). +// - Defects (Effect.die / thrown inside Effect.gen) and interrupts +// must NOT pretend to be typed PurgeErrors. They flatten to a +// `reason: 'unknown'` envelope so the renderer falls back to its +// generic error toast instead of branching on the wrong reason. + +import { describe, it, expect } from 'vitest' +import { Data, Effect } from 'effect' +import { + exitToWireResult, + flattenPurgeError, + unwrapEffectFailure, + type PurgeErrorWire, +} from './mutation-error-wire.js' + +// Local mirror of the core PurgeError tagged class — keeping this +// test file zero-dep on @spool-lab/core so the regression survives +// even if the core package fails to build for unrelated reasons. +class TestPurgeError extends Data.TaggedError('PurgeError')<{ + readonly findingId: number + readonly reason: PurgeErrorWire['reason'] + readonly cause?: unknown +}> {} + +describe('unwrapEffectFailure', () => { + it('returns null for a successful exit', async () => { + const exit = await Effect.runPromiseExit(Effect.succeed(42)) + expect(unwrapEffectFailure(exit)).toBeNull() + }) + + it('unwraps a typed Fail down to the raw E (the bug-fix path)', async () => { + const err = new TestPurgeError({ findingId: 7, reason: 'not-found' }) + const exit = await Effect.runPromiseExit(Effect.fail(err)) + const out = unwrapEffectFailure(exit) + // Bug pre-fix returned a Cause here, not E. Asserting the + // unwrapped instance has the typed fields the renderer reads + // pins the contract. + expect(out).toBeInstanceOf(TestPurgeError) + expect(out).toMatchObject({ findingId: 7, reason: 'not-found' }) + }) + + it('returns null for a defect (Effect.die) — defects are not typed failures', async () => { + const exit = await Effect.runPromiseExit(Effect.die(new Error('boom'))) + expect(unwrapEffectFailure(exit)).toBeNull() + }) + + it('returns null for a thrown synchronous exception inside a generator', async () => { + const exit = await Effect.runPromiseExit( + Effect.gen(function* () { + yield* Effect.sync(() => { throw new Error('oops') }) + }), + ) + expect(unwrapEffectFailure(exit)).toBeNull() + }) +}) + +describe('flattenPurgeError', () => { + it.each([ + 'not-found', + 'already-purged', + 'message-missing', + 'db-failed', + ] as const)('preserves reason=%s and findingId from a typed PurgeError-shaped object', (reason) => { + expect(flattenPurgeError({ reason, findingId: 5 })).toEqual({ reason, findingId: 5 }) + }) + + it('attaches cause.message when present, omits when not', () => { + expect(flattenPurgeError({ reason: 'db-failed', findingId: 9, cause: new Error('disk full') })) + .toEqual({ reason: 'db-failed', findingId: 9, message: 'disk full' }) + expect(flattenPurgeError({ reason: 'db-failed', findingId: 9 })) + .toEqual({ reason: 'db-failed', findingId: 9 }) + }) + + it('downgrades a plain Error to reason=unknown with its message', () => { + expect(flattenPurgeError(new Error('something else'))).toEqual({ + reason: 'unknown', + message: 'something else', + }) + }) + + it('downgrades null / undefined / strings to reason=unknown', () => { + expect(flattenPurgeError(null)).toEqual({ reason: 'unknown', message: 'null' }) + expect(flattenPurgeError(undefined)).toEqual({ reason: 'unknown', message: 'undefined' }) + expect(flattenPurgeError('boom')).toEqual({ reason: 'unknown', message: 'boom' }) + }) + + it('rejects shapes that look close but miss the contract (no findingId, wrong type)', () => { + // Missing findingId — must NOT be accepted as a PurgeError shape. + expect(flattenPurgeError({ reason: 'not-found' })).toEqual({ + reason: 'unknown', + message: expect.any(String), + }) + // findingId of wrong type — must NOT be accepted. + expect(flattenPurgeError({ reason: 'not-found', findingId: '5' })).toEqual({ + reason: 'unknown', + message: expect.any(String), + }) + }) +}) + +describe('exitToWireResult', () => { + it('shapes a successful exit through the per-command success transformer', async () => { + const exit = await Effect.runPromiseExit(Effect.succeed({ findingId: 1, sessionId: 2 })) + const wire = exitToWireResult(exit, (v) => ({ id: v.findingId, session: v.sessionId })) + expect(wire).toEqual({ ok: true, success: { id: 1, session: 2 } }) + }) + + it('shapes a typed failure as { ok: false, error: PurgeErrorWire }', async () => { + const err = new TestPurgeError({ findingId: 12, reason: 'already-purged' }) + const exit = await Effect.runPromiseExit(Effect.fail(err)) + const wire = exitToWireResult(exit, (v) => v) + expect(wire).toEqual({ + ok: false, + error: { reason: 'already-purged', findingId: 12 }, + }) + }) + + it('shapes a defect as { ok: false, error: { reason: "unknown" } }', async () => { + const exit = await Effect.runPromiseExit(Effect.die(new Error('out of memory'))) + const wire = exitToWireResult(exit, (v) => v) + expect(wire).toEqual({ + ok: false, + error: { reason: 'unknown', message: expect.any(String) }, + }) + }) +}) diff --git a/packages/app/src/main/security/mutation-error-wire.ts b/packages/app/src/main/security/mutation-error-wire.ts new file mode 100644 index 00000000..8d1d6658 --- /dev/null +++ b/packages/app/src/main/security/mutation-error-wire.ts @@ -0,0 +1,72 @@ +// Pure helpers that convert the worker thread's `Effect.Exit` results +// into the serialisable `PurgeErrorWire` shape the proxy + IPC layer +// consume. Extracted from `mutation-worker-thread.ts` so the (easy to +// get subtly wrong) Effect-Cause unwrap has its own unit-test surface: +// the previous shape used `Exit.causeOption(exit).value` directly, +// which returns a `Cause` rather than the raw `E` — every typed +// `PurgeError` landed on the wire as `{ reason: 'unknown' }` and the +// renderer's reason-keyed error branches silently died. Catching that +// at test time is much cheaper than catching it at runtime. + +import { Cause, Exit } from 'effect' + +/** Serialisable mirror of `PurgeError` from @spool-lab/core. The core + * type is a tagged class; postMessage strips the prototype, so we + * flatten to plain fields and rebuild selectively on the proxy side + * when the IPC handler needs them. */ +export interface PurgeErrorWire { + reason: 'not-found' | 'already-purged' | 'message-missing' | 'db-failed' + findingId: number + message?: string +} + +export type WireError = PurgeErrorWire | { reason: 'unknown'; message: string } + +/** Unwrap an `Exit.Failure`'s `Cause` down to the typed `E` (or + * `null` when the cause is a defect / interrupt rather than a + * business failure). Two hops: + * + * 1. `Exit.causeOption(exit)` → Option> + * 2. `Cause.failureOption(cause)` → Option + * + * Skipping step 2 was the original bug. */ +export function unwrapEffectFailure(exit: Exit.Exit): E | null { + if (Exit.isSuccess(exit)) return null + const causeOpt = Exit.causeOption(exit) + if (causeOpt._tag !== 'Some') return null + const failOpt = Cause.failureOption(causeOpt.value) + return failOpt._tag === 'Some' ? failOpt.value : null +} + +/** Duck-type a raw failure into the wire shape. Anything that + * doesn't structurally match a `PurgeError` (including `null`, plain + * `Error`, defects, interrupts) degrades to `{ reason: 'unknown' }` + * so the wire schema stays closed even when the Effect runtime hands + * us something exotic. */ +export function flattenPurgeError(err: unknown): WireError { + if ( + err && typeof err === 'object' && 'reason' in err && 'findingId' in err && + typeof (err as { reason: unknown }).reason === 'string' && + typeof (err as { findingId: unknown }).findingId === 'number' + ) { + const e = err as { reason: PurgeErrorWire['reason']; findingId: number; cause?: unknown } + return { + reason: e.reason, + findingId: e.findingId, + ...(e.cause instanceof Error ? { message: e.cause.message } : {}), + } + } + return { reason: 'unknown', message: err instanceof Error ? err.message : String(err) } +} + +/** One-shot Exit → wire helper. `onSuccess` shapes the per-command + * success result so each command can stay strongly typed without + * duplicating the (Exit.isSuccess → Exit.causeOption → Cause.failureOption) + * walk in every case. */ +export function exitToWireResult( + exit: Exit.Exit, + onSuccess: (value: A) => S, +): { ok: true; success: S } | { ok: false; error: WireError } { + if (Exit.isSuccess(exit)) return { ok: true, success: onSuccess(exit.value) } + return { ok: false, error: flattenPurgeError(unwrapEffectFailure(exit)) } +}