Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/app/electron.vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export default defineConfig({
index: resolve(__dirname, 'src/main/index.ts'),
'sync-worker': resolve(__dirname, 'src/main/sync-worker.ts'),
'scan-worker-thread': resolve(__dirname, 'src/main/scan-worker-thread.ts'),
'mutation-worker-thread': resolve(__dirname, 'src/main/mutation-worker-thread.ts'),
},
},
},
Expand Down
45 changes: 44 additions & 1 deletion packages/app/src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
SPOOL_DIR,
} from '@spool-lab/core'
import { spawnScanWorker, type ScanWorkerProxy } from './scan-worker-proxy.js'
import { spawnMutationWorker, type MutationWorkerProxy } from './mutation-worker-proxy.js'
import { Effect } from 'effect'
import { registerSecurityIpc, registerSecurityReadinessIpc, SECURITY_IPC_CHANNELS, type SecurityReadiness } from './ipc/security.js'
import { loadSecurityPreferences, saveSecurityPreferences } from './securityPreferences.js'
Expand Down Expand Up @@ -113,6 +114,7 @@ let watcher: SpoolWatcher
let acpManager: AcpManager
let isSyncActive = false
let scanWorker: ScanWorkerProxy | null = null
let mutationWorker: MutationWorkerProxy | null = null
let disposeSecurityIpc: (() => void) | null = null
let setSecurityReadiness: ((next: SecurityReadiness) => void) | null = null
let disposeSecurityReadinessIpc: (() => void) | null = null
Expand Down Expand Up @@ -233,6 +235,20 @@ async function bootScanWorker(): Promise<void> {
}
}

/** Bring up the mutation worker — purge / dismiss / undismiss SQL
* runs there so the main process event loop stays unblocked through
* the ~1s tail of bulk operations on large archives. Failure is
* non-fatal: the IPC handlers fall back to running the same SQL
* in-process on the main DB handle, which is the legacy behaviour. */
async function bootMutationWorker(): Promise<void> {
try {
mutationWorker = await spawnMutationWorker(join(__dirname, 'mutation-worker-thread.js'))
} catch (err) {
console.error('[security] mutation worker failed to boot:', err)
mutationWorker = null
}
}

async function shutdownScanWorker(): Promise<void> {
if (disposeSecurityIpc) {
try { disposeSecurityIpc() } catch { /* best effort */ }
Expand All @@ -249,6 +265,12 @@ async function shutdownScanWorker(): Promise<void> {
} catch { /* best effort */ }
scanWorker = null
}
if (mutationWorker) {
try {
await mutationWorker.shutdown()
} catch { /* best effort */ }
mutationWorker = null
}
try { await pfRuntime.stop() } catch { /* best effort */ }
}

Expand Down Expand Up @@ -319,7 +341,16 @@ async function ensureSecurityBooted(): Promise<void> {
setSecurityReadiness?.({ ready: false, reason: 'scanner-unavailable' })
return
}
disposeSecurityIpc = registerSecurityIpc({
// Register IPC immediately after the scan worker is ready so the
// renderer's `security:get-scan-status` polling on first window
// open finds a handler. Mutation-worker boot is deferred and
// plumbed in via `securityIpc.attachMutationWorker` once it
// reports ready — the handlers fall back to in-process SQL in the
// meantime. Without this split the e2e harness saw the
// first-window polling rejected with "No handler registered for
// security:get-scan-status" because both worker boots were
// awaited sequentially before the IPC was registered.
const securityIpc = registerSecurityIpc({
db,
worker: scanWorker,
runPromise: runWithObservability,
Expand All @@ -332,11 +363,23 @@ async function ensureSecurityBooted(): Promise<void> {
})
},
})
disposeSecurityIpc = securityIpc.dispose
setSecurityReadiness?.({ ready: true })
console.log('[security.lifecycle] booted — worker + IPC ready, backfilling')
runWithObservability(scanWorker.backfill()).catch((err) => {
console.error('[security] boot backfill failed:', err)
})

// Mutation worker boots in the background. Until it's ready the
// IPC handlers run their in-process fallback path on the main
// thread — same SQL, same correctness, just no off-main offload.
// On success, attach so subsequent calls route through the worker
// AND start the per-mutation change forwarder.
void bootMutationWorker().then(() => {
if (mutationWorker) {
securityIpc.attachMutationWorker(mutationWorker)
}
})
// If the user enabled PF before this boot, bring the inference window
// up now that the rest of Spool is ready.
if (loadSecurityPreferences().pfEnabled) {
Expand Down
144 changes: 143 additions & 1 deletion packages/app/src/main/ipc/security.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type ScanStatus,
type FindingsChange,
} from '@spool-lab/core'
import type { MutationWorkerProxy } from '../mutation-worker-proxy.js'

// ─── electron mock ────────────────────────────────────────────────
// vi.hoisted so the stub objects exist before vi.mock evaluates.
Expand Down Expand Up @@ -123,7 +124,11 @@ async function setupFixture(): Promise<Fixture> {
handlers.clear()
sentEvents.length = 0

const dispose = registerSecurityIpc({
// Mutation worker is never attached in the default fixture so the
// existing assertions exercise the in-process fallback path; the
// suite further down attaches a fake proxy and pins the worker-
// delegated path.
const { dispose } = registerSecurityIpc({
db,
worker,
runPromise: <A, E>(eff: Effect.Effect<A, E>) => Effect.runPromise(eff as unknown as Effect.Effect<A>),
Expand Down Expand Up @@ -498,6 +503,143 @@ describe('registerSecurityIpc', () => {
// must broadcast a readiness event so the renderer can switch
// from skeleton → ready or → "Scanner unavailable" banner instead
// of swallowing IPC errors.
// When a mutation worker proxy is wired in, the IPC layer must
// delegate purge / dismiss / undismiss to it (so the synchronous SQL
// runs on the worker thread, not on the main process event loop) AND
// forward the proxy's per-mutation FindingsChange events to the
// renderer via the same EVT_FINDINGS_CHANGED channel the scan worker
// uses — so renderer subscribers can't tell whether a change came
// from a scan or a mutation.
describe('registerSecurityIpc with mutationWorker', () => {
it('routes purge / dismiss / undismiss IPCs through the proxy instead of running SQL in-process', async () => {
handlers.clear()
sentEvents.length = 0
const db = setupDb()
const status: ScanStatus = { queued: 0, scanning: null, backfillRemaining: 0, backfillTotal: 0, manualBurstInFlight: false, currentProfile: 'regex@4' }
const worker: ScanWorker = {
enqueue: () => Effect.void,
rescanAll: () => Effect.sync(() => 0),
backfill: () => Effect.sync(() => 0),
changes: Stream.empty,
statusChanges: Stream.empty,
getStatus: Effect.sync(() => status),
} as ScanWorker

// Track every call so we can assert the right proxy method was
// hit and the in-process path was not.
const calls: Array<{ method: string; args: unknown[] }> = []
const fakeProxy: MutationWorkerProxy = {
purgeFinding: async (id) => { calls.push({ method: 'purgeFinding', args: [id] }); return { findingId: id, sessionId: 1, maskUsed: '[redacted]', purgedAt: 'now' } },
purgeFindings: async (ids) => { calls.push({ method: 'purgeFindings', args: [ids] }); return [] },
purgeEverywhere: async (kind, hash) => { calls.push({ method: 'purgeEverywhere', args: [kind, hash] }); return { results: [], sessionIds: [] } },
dismissFinding: async (id, scope) => { calls.push({ method: 'dismissFinding', args: [id, scope] }); return null },
dismissFindings: async (ids, scope) => { calls.push({ method: 'dismissFindings', args: [ids, scope] }); return [] },
undismissFinding: async (id) => { calls.push({ method: 'undismissFinding', args: [id] }); return null },
changes: Stream.empty,
shutdown: async () => { /* no-op */ },
}

const fakeWindow = {
webContents: { send: (channel: string, payload: unknown) => { sentEvents.push({ channel, payload }) } },
} as unknown as import('electron').BrowserWindow

const { dispose, attachMutationWorker } = registerSecurityIpc({
db,
worker,
runPromise: <A, E>(eff: Effect.Effect<A, E>) => Effect.runPromise(eff as unknown as Effect.Effect<A>),
getMainWindow: () => fakeWindow,
})
// Late-attach the fake proxy — mirrors how production wires the
// mutation worker in after the IPC layer is already live.
attachMutationWorker(fakeProxy)

try {
await invoke('security:purge-finding', 42)
await invoke('security:purge-findings', [1, 2, 3])
await invoke('security:purge-everywhere', { kind: 'api-key', valueHash: 'h' })
await invoke('security:dismiss-finding', { findingId: 7, scope: 'session' })
await invoke('security:dismiss-findings', { findingIds: [8, 9], scope: 'global' })
await invoke('security:undismiss-finding', { findingId: 7 })

expect(calls).toEqual([
{ method: 'purgeFinding', args: [42] },
{ method: 'purgeFindings', args: [[1, 2, 3]] },
{ method: 'purgeEverywhere', args: ['api-key', 'h'] },
{ method: 'dismissFinding', args: [7, 'session'] },
{ method: 'dismissFindings', args: [[8, 9], 'global'] },
{ method: 'undismissFinding', args: [7] },
])

// The proxy's per-mutation publishes ride a separate forwarder
// (a Stream.empty proxy here emits nothing); the in-process
// fallback's manual webContents.send is what would normally
// populate sentEvents during a dismiss. With a real proxy
// those events flow through `mutationWorker.changes` instead
// — covered separately in the next test.
expect(sentEvents.length).toBe(0)
} finally {
dispose()
db.close()
}
})

it('forwards mutation worker change events to EVT_FINDINGS_CHANGED', async () => {
handlers.clear()
sentEvents.length = 0
const db = setupDb()
const status: ScanStatus = { queued: 0, scanning: null, backfillRemaining: 0, backfillTotal: 0, manualBurstInFlight: false, currentProfile: 'regex@4' }
const worker: ScanWorker = {
enqueue: () => Effect.void,
rescanAll: () => Effect.sync(() => 0),
backfill: () => Effect.sync(() => 0),
changes: Stream.empty,
statusChanges: Stream.empty,
getStatus: Effect.sync(() => status),
} as ScanWorker

const mutationPubsub = await Effect.runPromise(PubSub.unbounded<FindingsChange>())
const fakeProxy: MutationWorkerProxy = {
purgeFinding: async () => { throw new Error('not used') },
purgeFindings: async () => [],
purgeEverywhere: async () => ({ results: [], sessionIds: [] }),
dismissFinding: async () => null,
dismissFindings: async () => [],
undismissFinding: async () => null,
changes: Stream.fromPubSub(mutationPubsub),
shutdown: async () => { /* no-op */ },
}

const fakeWindow = {
webContents: { send: (channel: string, payload: unknown) => { sentEvents.push({ channel, payload }) } },
} as unknown as import('electron').BrowserWindow

const { dispose, attachMutationWorker } = registerSecurityIpc({
db,
worker,
runPromise: <A, E>(eff: Effect.Effect<A, E>) => Effect.runPromise(eff as unknown as Effect.Effect<A>),
getMainWindow: () => fakeWindow,
})
attachMutationWorker(fakeProxy)

try {
// The forwarder fiber is forked via Effect.runPromise which is
// async; give it a tick to subscribe to the PubSub before we
// publish. Mirrors the 50ms wait in the worker.changes forwarder
// test above.
await new Promise((r) => setTimeout(r, 50))
const change: FindingsChange = { type: 'state-changed', sessionId: 5, state: 'purged' }
await Effect.runPromise(PubSub.publish(mutationPubsub, change))
await new Promise((r) => setTimeout(r, 50))
expect(sentEvents).toEqual([
{ channel: 'security:evt-findings-changed', payload: change },
])
} finally {
dispose()
db.close()
}
})
})

describe('registerSecurityReadinessIpc', () => {
let fakeWin: { sent: Array<{ channel: string; payload: unknown }> }
let getWindow: () => import('electron').BrowserWindow | null
Expand Down
Loading