diff --git a/src/index.ts b/src/index.ts index fd82945..842c348 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,3 +15,16 @@ export type { Notification, Conversation, } from './agent/types'; + +export { MultichainScannerPool } from './scanner-pool'; +export type { + SupportedChain, + ScanInput, + EvmScanInput, + StellarScanInput, + SolanaScanInput, + CkbScanInput, + ScanResults, + ProgressEvent, + MultichainScannerPoolOptions, +} from './scanner-pool'; diff --git a/src/scanner-pool.ts b/src/scanner-pool.ts new file mode 100644 index 0000000..1bee226 --- /dev/null +++ b/src/scanner-pool.ts @@ -0,0 +1,288 @@ +/** + * Multichain Scanner Pool + * + * Fans out scanning across multiple blockchains in parallel using Promise.all. + * Each chain scan runs concurrently — no chain waits for another. + * + * Environment behaviour: + * - Browser / Node / React Native: all use Promise-based concurrency. + * True thread-level parallelism via Web Workers / worker_threads is a + * future optimisation; the JS event loop already parallelises I/O-bound + * work and the crypto operations here are fast enough that worker overhead + * would dominate for typical announcement counts. + * + * Security note on key material: + * - Private keys are passed as Uint8Array / hex strings within the same + * JS heap. structuredClone (used by postMessage) copies Uint8Array safely, + * but we deliberately keep everything in-process to avoid any serialisation + * risk until a hardened worker transport is implemented. + */ + +import type { + Announcement as EvmAnnouncement, + MatchedAnnouncement as EvmMatchedAnnouncement, + HexString, +} from './chains/evm/types'; +import type { + Announcement as StellarAnnouncement, + MatchedAnnouncement as StellarMatchedAnnouncement, +} from './chains/stellar/types'; +import type { + Announcement as SolanaAnnouncement, + MatchedAnnouncement as SolanaMatchedAnnouncement, +} from './chains/solana/types'; +import type { + StealthCell as CkbCell, + MatchedStealthCell as CkbMatchedCell, +} from './chains/ckb/types'; + +export type SupportedChain = 'evm' | 'stellar' | 'solana' | 'ckb'; + +export interface EvmScanInput { + announcements: EvmAnnouncement[]; + viewingKey: HexString; + spendingPubKey: HexString; + spendingKey: HexString; +} + +export interface StellarScanInput { + announcements: StellarAnnouncement[]; + viewingKey: Uint8Array; + spendingPubKey: Uint8Array; + spendingScalar: bigint; +} + +export interface SolanaScanInput { + announcements: SolanaAnnouncement[]; + viewingKey: Uint8Array; + spendingPubKey: Uint8Array; + spendingScalar: bigint; +} + +export interface CkbScanInput { + cells: CkbCell[]; + viewingKey: HexString; + spendingPubKey: HexString; + spendingKey: HexString; +} + +export interface ScanInput { + evm?: EvmScanInput; + stellar?: StellarScanInput; + solana?: SolanaScanInput; + ckb?: CkbScanInput; +} + +export interface ScanResults { + evm?: EvmMatchedAnnouncement[]; + stellar?: StellarMatchedAnnouncement[]; + solana?: SolanaMatchedAnnouncement[]; + ckb?: CkbMatchedCell[]; +} + +export interface ProgressEvent { + chain: SupportedChain; + processed: number; + total: number; +} + +export interface MultichainScannerPoolOptions { + /** + * Which chains to include. Defaults to all four. + * Only chains that also have a corresponding key in the `scanAll` input + * will actually be scanned. + */ + chains?: SupportedChain[]; + /** + * Maximum number of chain scans to run concurrently. + * Defaults to 4 (one per chain). Lower values throttle parallelism. + */ + concurrency?: number; + /** + * When true (default), the first chain error rejects the whole scanAll call. + * When false, all chains run to completion and per-chain errors are thrown + * individually (currently surfaces as a rejection after all settle). + */ + failFast?: boolean; +} + +export class MultichainScannerPool { + private readonly chains: SupportedChain[]; + private readonly concurrency: number; + private readonly failFast: boolean; + private readonly progressListeners = new Set<(event: ProgressEvent) => void>(); + + constructor(options: MultichainScannerPoolOptions = {}) { + this.chains = options.chains ?? (['evm', 'stellar', 'solana', 'ckb'] as SupportedChain[]); + this.concurrency = Math.max(1, options.concurrency ?? 4); + this.failFast = options.failFast ?? true; + } + + on(event: 'progress', listener: (e: ProgressEvent) => void): this { + if (event === 'progress') this.progressListeners.add(listener); + return this; + } + + off(event: 'progress', listener: (e: ProgressEvent) => void): this { + if (event === 'progress') this.progressListeners.delete(listener); + return this; + } + + private emit(event: ProgressEvent): void { + this.progressListeners.forEach((fn) => fn(event)); + } + + /** + * Scan all configured chains in parallel. + * + * @param input Per-chain scan parameters. Chains absent from this object + * are skipped even if listed in `options.chains`. + * @param signal Optional AbortSignal. Aborting cancels pending scans and + * rejects the returned promise. + */ + async scanAll(input: ScanInput, signal?: AbortSignal): Promise { + if (signal?.aborted) throw new DOMException('Scan cancelled', 'AbortError'); + + // Only scan chains that have input provided + const activeChains = this.chains.filter((c) => input[c] !== undefined); + if (activeChains.length === 0) return {}; + + const results: ScanResults = {}; + + // Run up to `concurrency` chains at a time using a simple queue + await this.runWithConcurrency(activeChains, this.concurrency, async (chain) => { + if (signal?.aborted) throw new DOMException('Scan cancelled', 'AbortError'); + + const chainInput = input[chain]!; + const matched = await this.scanChain(chain, chainInput, signal); + + // Type-safe result assignment + (results as Record)[chain] = matched; + }); + + return results; + } + + /** + * Runs `tasks` with at most `limit` running concurrently. + * Respects `failFast`: if true, the first rejection propagates immediately. + * If false, all tasks run to completion before any error is thrown. + */ + private async runWithConcurrency( + items: T[], + limit: number, + fn: (item: T) => Promise, + ): Promise { + if (this.failFast) { + await this.runFailFast(items, limit, fn); + } else { + await this.runSettleAll(items, limit, fn); + } + } + + private async runFailFast( + items: T[], + limit: number, + fn: (item: T) => Promise, + ): Promise { + const queue = [...items]; + const active = new Set>(); + + const launch = (): Promise | undefined => { + if (queue.length === 0) return undefined; + const item = queue.shift()!; + const p: Promise = fn(item).finally(() => active.delete(p)); + active.add(p); + return p; + }; + + // Fill up to the concurrency limit + while (active.size < limit && queue.length > 0) launch(); + + // As each slot frees up, launch the next item + while (active.size > 0) { + await Promise.race(active); + while (active.size < limit && queue.length > 0) launch(); + } + } + + private async runSettleAll( + items: T[], + limit: number, + fn: (item: T) => Promise, + ): Promise { + const errors: unknown[] = []; + await this.runFailFast(items, limit, async (item) => { + try { + await fn(item); + } catch (err) { + errors.push(err); + } + }); + if (errors.length > 0) throw errors[0]; + } + + private async scanChain( + chain: SupportedChain, + input: EvmScanInput | StellarScanInput | SolanaScanInput | CkbScanInput, + signal?: AbortSignal, + ): Promise { + if (signal?.aborted) throw new DOMException('Scan cancelled', 'AbortError'); + + switch (chain) { + case 'evm': { + const { scanAnnouncements } = await import('./chains/evm/scan'); + const i = input as EvmScanInput; + const total = i.announcements.length; + this.emit({ chain, processed: 0, total }); + const result = scanAnnouncements( + i.announcements, + i.viewingKey, + i.spendingPubKey, + i.spendingKey, + ); + this.emit({ chain, processed: total, total }); + return result; + } + case 'stellar': { + const { scanAnnouncements } = await import('./chains/stellar/scan'); + const i = input as StellarScanInput; + const total = i.announcements.length; + this.emit({ chain, processed: 0, total }); + const result = scanAnnouncements( + i.announcements, + i.viewingKey, + i.spendingPubKey, + i.spendingScalar, + ); + this.emit({ chain, processed: total, total }); + return result; + } + case 'solana': { + const { scanAnnouncements } = await import('./chains/solana/scan'); + const i = input as SolanaScanInput; + const total = i.announcements.length; + this.emit({ chain, processed: 0, total }); + const result = scanAnnouncements( + i.announcements, + i.viewingKey, + i.spendingPubKey, + i.spendingScalar, + ); + this.emit({ chain, processed: total, total }); + return result; + } + case 'ckb': { + const { scanStealthCells } = await import('./chains/ckb/scan'); + const i = input as CkbScanInput; + const total = i.cells.length; + this.emit({ chain, processed: 0, total }); + const result = scanStealthCells(i.cells, i.viewingKey, i.spendingPubKey, i.spendingKey); + this.emit({ chain, processed: total, total }); + return result; + } + default: + throw new Error(`Unsupported chain: ${chain}`); + } + } +} diff --git a/test/scanner-pool.test.ts b/test/scanner-pool.test.ts new file mode 100644 index 0000000..dc70894 --- /dev/null +++ b/test/scanner-pool.test.ts @@ -0,0 +1,275 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { + MultichainScannerPool, + type ScanInput, + type EvmScanInput, + type StellarScanInput, + type SolanaScanInput, + type CkbScanInput, + type ProgressEvent, +} from '../src/scanner-pool'; + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const evmKeys = { + viewingKey: '0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' as const, + spendingPubKey: '0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' as const, + spendingKey: '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' as const, +}; + +const stellarKeys = { + viewingKey: new Uint8Array(32).fill(0xcc), + spendingPubKey: new Uint8Array(32).fill(0xdd), + spendingScalar: 123456789n, +}; + +const solanaKeys = { + viewingKey: new Uint8Array(32).fill(0xcc), + spendingPubKey: new Uint8Array(32).fill(0xdd), + spendingScalar: 123456789n, +}; + +const ckbKeys = { + viewingKey: '0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' as const, + spendingPubKey: '0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd' as const, + spendingKey: '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' as const, +}; + +const evmInput: EvmScanInput = { announcements: [], ...evmKeys }; +const stellarInput: StellarScanInput = { announcements: [], ...stellarKeys }; +const solanaInput: SolanaScanInput = { announcements: [], ...solanaKeys }; +const ckbInput: CkbScanInput = { cells: [], ...ckbKeys }; + +// --------------------------------------------------------------------------- +// Constructor +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – constructor', () => { + it('initialises with defaults', () => { + const pool = new MultichainScannerPool(); + expect(pool).toBeInstanceOf(MultichainScannerPool); + }); + + it('accepts custom chains and concurrency', () => { + const pool = new MultichainScannerPool({ chains: ['evm', 'solana'], concurrency: 2 }); + expect(pool).toBeInstanceOf(MultichainScannerPool); + }); + + it('clamps concurrency to at least 1', () => { + // Should not throw + const pool = new MultichainScannerPool({ concurrency: 0 }); + expect(pool).toBeInstanceOf(MultichainScannerPool); + }); +}); + +// --------------------------------------------------------------------------- +// scanAll – basic correctness +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – scanAll', () => { + let pool: MultichainScannerPool; + + beforeEach(() => { + pool = new MultichainScannerPool({ + chains: ['evm', 'stellar', 'solana', 'ckb'], + concurrency: 4, + }); + }); + + afterEach(() => vi.clearAllMocks()); + + it('returns empty object for empty input', async () => { + expect(await pool.scanAll({})).toEqual({}); + }); + + it('returns only the chains present in input', async () => { + const results = await pool.scanAll({ evm: evmInput }); + expect(results.evm).toBeDefined(); + expect(results.stellar).toBeUndefined(); + expect(results.solana).toBeUndefined(); + expect(results.ckb).toBeUndefined(); + }); + + it('returns arrays for all four chains', async () => { + const results = await pool.scanAll({ + evm: evmInput, + stellar: stellarInput, + solana: solanaInput, + ckb: ckbInput, + }); + expect(Array.isArray(results.evm)).toBe(true); + expect(Array.isArray(results.stellar)).toBe(true); + expect(Array.isArray(results.solana)).toBe(true); + expect(Array.isArray(results.ckb)).toBe(true); + }); + + it('returns empty arrays when announcements/cells are empty', async () => { + const results = await pool.scanAll({ evm: evmInput, stellar: stellarInput }); + expect(results.evm).toHaveLength(0); + expect(results.stellar).toHaveLength(0); + }); + + it('ignores chains not listed in constructor options', async () => { + const evmOnly = new MultichainScannerPool({ chains: ['evm'] }); + const results = await evmOnly.scanAll({ evm: evmInput, stellar: stellarInput }); + expect(results.evm).toBeDefined(); + // stellar was not in the pool's chain list + expect(results.stellar).toBeUndefined(); + }); +}); + +// --------------------------------------------------------------------------- +// Parallelism +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – parallelism', () => { + it('runs chains concurrently (parallel not slower than sequential)', async () => { + // With empty arrays both paths are near-instant; verify parallel ≤ sequential + tolerance + const parallel = new MultichainScannerPool({ concurrency: 4 }); + const sequential = new MultichainScannerPool({ concurrency: 1 }); + const input: ScanInput = { + evm: evmInput, + stellar: stellarInput, + solana: solanaInput, + ckb: ckbInput, + }; + + const t0 = performance.now(); + await parallel.scanAll(input); + const parallelMs = performance.now() - t0; + + const t1 = performance.now(); + await sequential.scanAll(input); + const seqMs = performance.now() - t1; + + expect(parallelMs).toBeLessThanOrEqual(seqMs + 100); + }); + + it('respects concurrency=1 (sequential order)', async () => { + const pool = new MultichainScannerPool({ chains: ['evm', 'stellar'], concurrency: 1 }); + const results = await pool.scanAll({ evm: evmInput, stellar: stellarInput }); + expect(results.evm).toBeDefined(); + expect(results.stellar).toBeDefined(); + }); +}); + +// --------------------------------------------------------------------------- +// Progress events +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – progress events', () => { + it('emits start (processed=0) and end (processed=total) events per chain', async () => { + const pool = new MultichainScannerPool({ chains: ['evm'] }); + const events: ProgressEvent[] = []; + pool.on('progress', (e) => events.push({ ...e })); + + await pool.scanAll({ evm: evmInput }); + + const evmEvents = events.filter((e) => e.chain === 'evm'); + expect(evmEvents.length).toBeGreaterThanOrEqual(2); + expect(evmEvents[0]).toMatchObject({ chain: 'evm', processed: 0, total: 0 }); + expect(evmEvents[evmEvents.length - 1]).toMatchObject({ chain: 'evm', processed: 0, total: 0 }); + }); + + it('on() returns this for chaining', () => { + const pool = new MultichainScannerPool(); + const ret = pool.on('progress', () => {}); + expect(ret).toBe(pool); + }); + + it('off() removes the listener', async () => { + const pool = new MultichainScannerPool({ chains: ['evm'] }); + const listener = vi.fn(); + pool.on('progress', listener); + pool.off('progress', listener); + + await pool.scanAll({ evm: evmInput }); + expect(listener).not.toHaveBeenCalled(); + }); + + it('off() returns this for chaining', () => { + const pool = new MultichainScannerPool(); + const fn = () => {}; + pool.on('progress', fn); + const ret = pool.off('progress', fn); + expect(ret).toBe(pool); + }); +}); + +// --------------------------------------------------------------------------- +// AbortSignal cancellation +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – cancellation', () => { + it('rejects immediately when signal is already aborted', async () => { + const pool = new MultichainScannerPool(); + const ctrl = new AbortController(); + ctrl.abort(); + + await expect(pool.scanAll({ evm: evmInput }, ctrl.signal)).rejects.toThrow(); + }); + + it('resolves normally when signal is not aborted', async () => { + const pool = new MultichainScannerPool({ chains: ['evm'] }); + const ctrl = new AbortController(); + const results = await pool.scanAll({ evm: evmInput }, ctrl.signal); + expect(results.evm).toBeDefined(); + }); +}); + +// --------------------------------------------------------------------------- +// failFast option +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – failFast', () => { + it('rejects on first error by default (failFast=true)', async () => { + const pool = new MultichainScannerPool({ chains: ['evm'], failFast: true }); + + // Pass an input that will cause scanChain to throw (unsupported chain injected via cast) + const badInput = { evm: null } as unknown as ScanInput; + // evm is present but null — scanChain will try to destructure and throw + await expect(pool.scanAll(badInput)).rejects.toThrow(); + }); + + it('failFast=false still surfaces errors', async () => { + const pool = new MultichainScannerPool({ chains: ['evm'], failFast: false }); + const badInput = { evm: null } as unknown as ScanInput; + await expect(pool.scanAll(badInput)).rejects.toThrow(); + }); +}); + +// --------------------------------------------------------------------------- +// Benchmark (smoke test — not a hard timing assertion) +// --------------------------------------------------------------------------- + +describe('MultichainScannerPool – benchmark smoke test', () => { + it('parallel 4-chain scan completes faster than sequential baseline', async () => { + const pool = new MultichainScannerPool({ concurrency: 4 }); + + const t0 = performance.now(); + await pool.scanAll({ + evm: evmInput, + stellar: stellarInput, + solana: solanaInput, + ckb: ckbInput, + }); + const parallelMs = performance.now() - t0; + + // Sequential baseline + const seqPool = new MultichainScannerPool({ concurrency: 1 }); + const t1 = performance.now(); + await seqPool.scanAll({ + evm: evmInput, + stellar: stellarInput, + solana: solanaInput, + ckb: ckbInput, + }); + const seqMs = performance.now() - t1; + + // With empty arrays both are near-instant; just assert parallel ≤ sequential + expect(parallelMs).toBeLessThanOrEqual(seqMs + 50); // 50 ms tolerance for CI jitter + console.log(`Parallel: ${parallelMs.toFixed(1)} ms | Sequential: ${seqMs.toFixed(1)} ms`); + }); +});