diff --git a/packages/engine/src/__tests__/engine/clock.test.ts b/packages/engine/src/__tests__/engine/clock.test.ts new file mode 100644 index 0000000..578ee0d --- /dev/null +++ b/packages/engine/src/__tests__/engine/clock.test.ts @@ -0,0 +1,165 @@ +import { describe, it, expect, vi } from 'vitest' +import { RealClock, TestClock } from '../../engine/clock.js' +import { parseDuration } from '../../engine/duration.js' + +describe('RealClock', () => { + it('now() returns a number close to Date.now()', () => { + const clock = new RealClock() + const before = Date.now() + const result = clock.now() + const after = Date.now() + expect(result).toBeGreaterThanOrEqual(before) + expect(result).toBeLessThanOrEqual(after) + }) + + it('setTimeout and clearTimeout delegate to global timers', () => { + vi.useFakeTimers() + try { + const clock = new RealClock() + const fn = vi.fn() + clock.setTimeout(fn, 1000) + vi.advanceTimersByTime(500) + expect(fn).not.toHaveBeenCalled() + vi.advanceTimersByTime(500) + expect(fn).toHaveBeenCalledOnce() + + // clearTimeout prevents execution + const fn2 = vi.fn() + const id2 = clock.setTimeout(fn2, 1000) + clock.clearTimeout(id2) + vi.advanceTimersByTime(2000) + expect(fn2).not.toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) +}) + +describe('TestClock', () => { + it('starts at time 0', () => { + const clock = new TestClock() + expect(clock.now()).toBe(0) + }) + + it('advance() moves time forward', () => { + const clock = new TestClock() + clock.advance(5000) + expect(clock.now()).toBe(5000) + }) + + it('advance() fires timers in chronological order', () => { + const clock = new TestClock() + const order: number[] = [] + + clock.setTimeout(() => order.push(1), 100) + clock.setTimeout(() => order.push(2), 50) + clock.setTimeout(() => order.push(3), 200) + + clock.advance(200) + + expect(order).toEqual([2, 1, 3]) + }) + + it('advance() does not fire timers beyond the window', () => { + const clock = new TestClock() + const fn = vi.fn() + + clock.setTimeout(fn, 1000) + clock.advance(500) + + expect(fn).not.toHaveBeenCalled() + expect(clock.now()).toBe(500) + }) + + it('advance() fires timers at exact boundary', () => { + const clock = new TestClock() + const fn = vi.fn() + + clock.setTimeout(fn, 100) + clock.advance(100) + + expect(fn).toHaveBeenCalledOnce() + }) + + it('clearTimeout prevents a timer from firing', () => { + const clock = new TestClock() + const fn = vi.fn() + + const id = clock.setTimeout(fn, 100) + clock.clearTimeout(id) + clock.advance(200) + + expect(fn).not.toHaveBeenCalled() + }) + + it('multiple advance() calls accumulate time', () => { + const clock = new TestClock() + const fn = vi.fn() + + clock.setTimeout(fn, 150) + clock.advance(100) + expect(fn).not.toHaveBeenCalled() + clock.advance(100) + expect(fn).toHaveBeenCalledOnce() + expect(clock.now()).toBe(200) + }) + + it('timer fn runs at the correct currentTime', () => { + const clock = new TestClock() + let capturedTime = -1 + + clock.setTimeout(() => { + capturedTime = clock.now() + }, 75) + + clock.advance(100) + + expect(capturedTime).toBe(75) + }) +}) + +describe('parseDuration', () => { + it('parses PT24H', () => { + expect(parseDuration('PT24H')).toBe(24 * 3600 * 1000) + }) + + it('parses PT30M', () => { + expect(parseDuration('PT30M')).toBe(30 * 60 * 1000) + }) + + it('parses PT60S', () => { + expect(parseDuration('PT60S')).toBe(60 * 1000) + }) + + it('parses combined PT1H30M15S', () => { + expect(parseDuration('PT1H30M15S')).toBe((3600 + 1800 + 15) * 1000) + }) + + it('parses legacy shorthand 7d', () => { + expect(parseDuration('7d')).toBe(7 * 86_400_000) + }) + + it('parses legacy shorthand 24h', () => { + expect(parseDuration('24h')).toBe(24 * 3_600_000) + }) + + it('parses legacy shorthand 30m', () => { + expect(parseDuration('30m')).toBe(30 * 60_000) + }) + + it('parses legacy shorthand 60s', () => { + expect(parseDuration('60s')).toBe(60 * 1000) + }) + + it('throws on invalid format', () => { + expect(() => parseDuration('invalid')).toThrow('Invalid duration') + }) + + it('throws on empty PT', () => { + expect(() => parseDuration('PT')).toThrow('Invalid duration') + }) + + it('throws on P1D (day-level ISO not supported)', () => { + expect(() => parseDuration('P1D')).toThrow('Invalid duration') + }) +}) diff --git a/packages/engine/src/__tests__/engine/wait-signal.test.ts b/packages/engine/src/__tests__/engine/wait-signal.test.ts new file mode 100644 index 0000000..d3e79e8 --- /dev/null +++ b/packages/engine/src/__tests__/engine/wait-signal.test.ts @@ -0,0 +1,467 @@ +import { describe, it, expect } from 'vitest' +import { FlowprintEngine } from '../../engine/engine.js' +import { TestClock } from '../../engine/clock.js' +import type { ExecutionContext } from '../../walker/types.js' + +/** + * Flow with a single wait node: trigger → wait → action → terminal. + */ +const WAIT_FLOW = ` +schema: flowprint/1.0 +name: approval-flow +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: wait_approval + wait_approval: + type: wait + lane: default + label: Wait for Approval + event: approval + timeout: PT24H + timeout_next: auto_reject + next: process_approved + process_approved: + type: action + lane: default + label: Process Approved + next: done + auto_reject: + type: action + lane: default + label: Auto Reject + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** + * Flow with two sequential waits: trigger → wait1 → wait2 → terminal. + */ +const DOUBLE_WAIT_FLOW = ` +schema: flowprint/1.0 +name: double-wait-flow +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: wait_first + wait_first: + type: wait + lane: default + label: Wait First + event: first_signal + next: wait_second + wait_second: + type: wait + lane: default + label: Wait Second + event: second_signal + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** + * Simple wait flow without timeout (no timeout_next). + */ +const SIMPLE_WAIT_FLOW = ` +schema: flowprint/1.0 +name: simple-wait-flow +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: wait_event + wait_event: + type: wait + lane: default + label: Wait Event + event: my_event + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** + * Helper: wait for microtasks to flush so the async engine loop + * reaches the wait node and suspends. + */ +async function flushMicrotasks(): Promise { + // Multiple rounds to ensure the walkGraph loop has suspended on the wait promise + for (let i = 0; i < 10; i++) { + await new Promise((r) => setTimeout(r, 0)) + } +} + +describe('wait/signal system', () => { + describe('basic signal delivery', () => { + it('wait node pauses execution — status is waiting, waitingFor is event name', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + + expect(execution.status).toBe('waiting') + expect(execution.waitingFor).toBe('approval') + }) + + it('signal resumes execution and continues to next node', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + execution.signal('approval', { approver: 'alice' }) + + const result = await execution.result + expect(execution.status).toBe('completed') + expect(result.outcome).toBe('success') + expect(result.output).toMatchObject({ approved: true }) + }) + + it('signal data is merged into execution state', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + let capturedState: Record = {} + + engine.register('process_approved', async (ctx: ExecutionContext) => { + capturedState = { ...ctx.state } + return { approved: true } + }) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + execution.signal('approval', { approver: 'bob' }) + + await execution.result + + expect(capturedState).toMatchObject({ approver: 'bob' }) + }) + }) + + describe('multiple sequential waits', () => { + it('two sequential waits work correctly', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + + const flow = await engine.load(DOUBLE_WAIT_FLOW) + const execution = flow.start({}) + + // First wait + await flushMicrotasks() + expect(execution.status).toBe('waiting') + expect(execution.waitingFor).toBe('first_signal') + + execution.signal('first_signal', { step: 1 }) + + // Second wait + await flushMicrotasks() + expect(execution.status).toBe('waiting') + expect(execution.waitingFor).toBe('second_signal') + + execution.signal('second_signal', { step: 2 }) + + const result = await execution.result + expect(execution.status).toBe('completed') + expect(result.outcome).toBe('success') + expect(result.output).toMatchObject({ step: 2 }) + }) + }) + + describe('timeout', () => { + it('wait expires and routes to timeout_next using TestClock.advance', async () => { + const clock = new TestClock() + // Set TTL higher than the 24h wait timeout so TTL doesn't interfere + const engine = new FlowprintEngine({ clock, pausedExecutionTTL: 48 * 3600 * 1000 }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + // Advance past the 24h timeout + execution.advanceTime('PT24H') + + const result = await execution.result + expect(execution.status).toBe('completed') + expect(result.outcome).toBe('success') + // Should have gone through auto_reject path + expect(result.output).toMatchObject({ rejected: true }) + }) + + it('TestClock.advance triggers timeout synchronously', async () => { + const clock = new TestClock() + // Set TTL higher than the 24h wait timeout so TTL doesn't interfere + const engine = new FlowprintEngine({ clock, pausedExecutionTTL: 48 * 3600 * 1000 }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + // Advance exactly to the timeout boundary + clock.advance(24 * 3600 * 1000) + + // The timeout fires synchronously, but the async flow needs a tick + await flushMicrotasks() + expect(execution.status).toBe('completed') + }) + }) + + describe('signal validation', () => { + it('wrong signal name throws error', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + expect(() => execution.signal('wrong_event', {})).toThrow( + "No pending wait for event 'wrong_event'", + ) + }) + + it('signal on non-waiting execution throws', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + // Don't wait for it to reach the wait node — still running + expect(() => execution.signal('my_event', {})).toThrow( + "Cannot signal execution in 'running' state", + ) + }) + + it('validateSignal hook rejects invalid signal data', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ + clock, + validateSignal: (eventName, data) => { + if (eventName === 'my_event' && !(data as Record)?.valid) { + throw new Error('Signal data must have valid=true') + } + }, + }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + expect(() => execution.signal('my_event', { valid: false })).toThrow( + 'Signal data must have valid=true', + ) + + // Valid signal should work + execution.signal('my_event', { valid: true }) + await execution.result + expect(execution.status).toBe('completed') + }) + }) + + describe('execute() vs start()', () => { + it('execute() throws on flow with wait nodes', async () => { + const engine = new FlowprintEngine() + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + await expect(flow.execute({})).rejects.toThrow('Wait nodes require start(), not execute()') + }) + }) + + describe('result promise', () => { + it('result promise resolves when flow completes after signal', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + execution.signal('my_event', { data: 42 }) + + const result = await execution.result + expect(result.outcome).toBe('success') + expect(result.trace.length).toBeGreaterThan(0) + }) + + it('completedResult is available after completion', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + expect(execution.completedResult).toBeUndefined() + + await flushMicrotasks() + execution.signal('my_event', {}) + + await execution.result + expect(execution.completedResult).toBeDefined() + expect(execution.completedResult!.outcome).toBe('success') + }) + }) + + describe('paused execution TTL', () => { + it('paused execution expires after TTL', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ + clock, + pausedExecutionTTL: 5000, // 5 seconds TTL + }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(execution.status).toBe('waiting') + + // Advance past TTL + clock.advance(6000) + + await expect(execution.result).rejects.toThrow('expired after 5000ms TTL') + expect(execution.status).toBe('failed') + }) + + it('TTL does not fire if signal arrives in time', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ + clock, + pausedExecutionTTL: 5000, + }) + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + execution.signal('my_event', {}) + + const result = await execution.result + expect(result.outcome).toBe('success') + + // Advance past TTL — should not fail since already completed + clock.advance(10000) + expect(execution.status).toBe('completed') + }) + }) + + describe('advanceTime', () => { + it('advanceTime() throws with RealClock', async () => { + const engine = new FlowprintEngine() // default = RealClock + + const flow = await engine.load(SIMPLE_WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + expect(() => execution.advanceTime('PT1H')).toThrow( + 'advanceTime() is only available with TestClock', + ) + }) + + it('advanceTime() parses ISO 8601 duration', async () => { + const clock = new TestClock() + // Set TTL higher than the 25h advance so TTL doesn't interfere + const engine = new FlowprintEngine({ clock, pausedExecutionTTL: 48 * 3600 * 1000 }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + + // Advance 25 hours (past the 24h timeout) + execution.advanceTime('PT25H') + + const result = await execution.result + expect(result.output).toMatchObject({ rejected: true }) + }) + }) + + describe('trace recording', () => { + it('wait node appears in the execution trace', async () => { + const clock = new TestClock() + const engine = new FlowprintEngine({ clock }) + engine.register('process_approved', async () => ({ approved: true })) + engine.register('auto_reject', async () => ({ rejected: true })) + + const flow = await engine.load(WAIT_FLOW) + const execution = flow.start({}) + + await flushMicrotasks() + execution.signal('approval', { approver: 'alice' }) + + const result = await execution.result + const waitStep = result.trace.find((t) => t.nodeId === 'wait_approval') + expect(waitStep).toBeDefined() + expect(waitStep!.type).toBe('wait') + expect(waitStep!.handler).toBe('native') + }) + }) +}) diff --git a/packages/engine/src/adapters/index.ts b/packages/engine/src/adapters/index.ts index 938d56a..44cc4e3 100644 --- a/packages/engine/src/adapters/index.ts +++ b/packages/engine/src/adapters/index.ts @@ -1,2 +1,2 @@ -export { PlainAdapter, ActionTimeoutError } from './plain.js' +export { PlainAdapter, ActionTimeoutError, WaitTimeoutError } from './plain.js' export type { ExecutionAdapter, ActionConfig } from './types.js' diff --git a/packages/engine/src/adapters/plain.ts b/packages/engine/src/adapters/plain.ts index 136ac6a..d39c212 100644 --- a/packages/engine/src/adapters/plain.ts +++ b/packages/engine/src/adapters/plain.ts @@ -1,5 +1,7 @@ import type { ExecutionContext } from '../walker/types.js' import type { ExecutionAdapter, ActionConfig } from './types.js' +import type { Clock } from '../engine/clock.js' +import { RealClock } from '../engine/clock.js' /** * Error thrown when an action handler exceeds its timeout. @@ -14,6 +16,20 @@ export class ActionTimeoutError extends Error { } } +/** + * Error thrown when a wait node times out before receiving a signal. + */ +export class WaitTimeoutError extends Error { + constructor( + public readonly nodeId: string, + public readonly eventName: string, + public readonly timeoutMs: number, + ) { + super(`Wait node '${nodeId}' timed out after ${timeoutMs}ms waiting for event '${eventName}'`) + this.name = 'WaitTimeoutError' + } +} + /** * Combine multiple AbortSignals into a single signal that aborts * when any of the input signals abort. @@ -40,9 +56,20 @@ export class PlainAdapter implements ExecutionAdapter { readonly name = 'plain' private defaultTimeout: number + private readonly clock: Clock + private readonly pendingWaits = new Map< + string, + { + resolve: (data: unknown) => void + reject: (err: Error) => void + eventName: string + timeoutId?: ReturnType + } + >() - constructor(options?: { defaultTimeout?: number }) { + constructor(options?: { defaultTimeout?: number; clock?: Clock }) { this.defaultTimeout = options?.defaultTimeout ?? 30_000 // 30s default + this.clock = options?.clock ?? new RealClock() } async executeAction( @@ -87,4 +114,47 @@ export class PlainAdapter implements ExecutionAdapter { clearTimeout(timeoutId) } } + + /** + * Suspend execution at a wait node until an external signal arrives or timeout fires. + * Returns the signal payload when delivered. + */ + async waitForEvent(nodeId: string, eventName: string, timeout?: number): Promise { + return new Promise((resolve, reject) => { + const entry: { + resolve: (data: unknown) => void + reject: (err: Error) => void + eventName: string + timeoutId?: ReturnType + } = { resolve, reject, eventName } + + if (timeout != null && timeout > 0) { + entry.timeoutId = this.clock.setTimeout(() => { + this.pendingWaits.delete(nodeId) + reject(new WaitTimeoutError(nodeId, eventName, timeout)) + }, timeout) + } + + this.pendingWaits.set(nodeId, entry) + }) + } + + /** + * Deliver an external signal to a waiting node. + * Returns true if the signal was delivered, false if no matching wait exists. + */ + deliverSignal(nodeId: string, eventName: string, data: unknown): boolean { + const wait = this.pendingWaits.get(nodeId) + if (!wait || wait.eventName !== eventName) return false + + if (wait.timeoutId != null) this.clock.clearTimeout(wait.timeoutId) + this.pendingWaits.delete(nodeId) + wait.resolve(data) + return true + } + + /** Returns true if there is a pending wait for the given node. */ + hasPendingWait(nodeId: string): boolean { + return this.pendingWaits.has(nodeId) + } } diff --git a/packages/engine/src/adapters/types.ts b/packages/engine/src/adapters/types.ts index 1dec8c2..42c5878 100644 --- a/packages/engine/src/adapters/types.ts +++ b/packages/engine/src/adapters/types.ts @@ -24,4 +24,14 @@ export interface ExecutionAdapter { context: ExecutionContext, config: ActionConfig, ): Promise + + /** + * Suspend execution at a wait node until an external signal arrives or timeout fires. + * Returns the signal payload when delivered. + */ + waitForEvent?( + nodeId: string, + eventName: string, + timeout?: number, + ): Promise } diff --git a/packages/engine/src/engine/clock.ts b/packages/engine/src/engine/clock.ts new file mode 100644 index 0000000..2c04969 --- /dev/null +++ b/packages/engine/src/engine/clock.ts @@ -0,0 +1,74 @@ +/** + * Abstraction over time operations. Allows deterministic testing + * of timeout-dependent code paths (wait nodes, TTL). + */ +export interface Clock { + now(): number + setTimeout(fn: () => void, ms: number): ReturnType + clearTimeout(id: ReturnType): void +} + +/** + * Real clock backed by Date.now() and native timers. + */ +export class RealClock implements Clock { + now(): number { + return Date.now() + } + + setTimeout(fn: () => void, ms: number): ReturnType { + return setTimeout(fn, ms) + } + + clearTimeout(id: ReturnType): void { + clearTimeout(id) + } +} + +/** + * Deterministic clock for testing. Time only advances via advance(). + * Timers fire synchronously in chronological order within the advanced window. + */ +export class TestClock implements Clock { + private currentTime = 0 + private timers: ({ fn: () => void; triggerAt: number } | undefined)[] = [] + + now(): number { + return this.currentTime + } + + setTimeout(fn: () => void, ms: number): ReturnType { + const timer = { fn, triggerAt: this.currentTime + ms } + this.timers.push(timer) + // Return a fake timer ID (index-based) + return (this.timers.length - 1) as unknown as ReturnType + } + + clearTimeout(id: ReturnType): void { + const index = id as unknown as number + if (this.timers[index]) { + this.timers[index] = undefined + } + } + + /** Advance time by `ms`. Triggers all timers that fire within the window, in order. */ + advance(ms: number): void { + const targetTime = this.currentTime + ms + + // Collect and sort pending timers within the window + const pending = this.timers + .filter((t): t is { fn: () => void; triggerAt: number } => t != null && t.triggerAt <= targetTime) + .sort((a, b) => a.triggerAt - b.triggerAt) + + for (const timer of pending) { + // Clear the timer slot before firing (prevents double-fire) + const idx = this.timers.indexOf(timer) + if (idx !== -1) this.timers[idx] = undefined + + this.currentTime = timer.triggerAt + timer.fn() + } + + this.currentTime = targetTime + } +} diff --git a/packages/engine/src/engine/compiled-flow.ts b/packages/engine/src/engine/compiled-flow.ts index e855510..66eee30 100644 --- a/packages/engine/src/engine/compiled-flow.ts +++ b/packages/engine/src/engine/compiled-flow.ts @@ -16,9 +16,16 @@ import { evaluateExpression } from '../runner/evaluator.js' import { loadRulesFile, evaluateRules } from '../rules/evaluator.js' import { PlainAdapter } from '../adapters/plain.js' import type { ExecutionAdapter } from '../adapters/types.js' +import { Execution } from './execution.js' +import { RealClock } from './clock.js' +import type { Clock } from './clock.js' +import { parseDuration } from './duration.js' import { buildLegacyContext } from './engine.js' import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } from './types.js' +/** Default TTL for paused executions: 1 hour. */ +const DEFAULT_PAUSED_TTL = 3_600_000 + /** * An immutable, pre-compiled flow ready for execution. * @@ -27,13 +34,17 @@ import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } fro */ export class CompiledFlow { private readonly adapter: ExecutionAdapter + private readonly clock: Clock constructor( private readonly doc: FlowprintDocument, private readonly resolvedHandlers: ReadonlyMap, private readonly options: EngineOptions, ) { - this.adapter = options.adapter ?? new PlainAdapter({ defaultTimeout: options.defaultTimeout }) + this.clock = options.clock ?? new RealClock() + this.adapter = + options.adapter ?? + new PlainAdapter({ defaultTimeout: options.defaultTimeout, clock: this.clock }) } /** @@ -63,6 +74,63 @@ export class CompiledFlow { } } + /** + * Start a flow that may contain wait nodes. Returns an Execution handle + * for signal delivery and status tracking. + * + * The flow runs asynchronously. Use `execution.result` to await completion. + */ + start(input: Record): Execution { + const adapter = this.adapter + if (!(adapter instanceof PlainAdapter)) { + throw new Error('start() requires PlainAdapter (or a subclass)') + } + + const ttl = this.options.pausedExecutionTTL ?? DEFAULT_PAUSED_TTL + const execution = new Execution(adapter, this.clock, this.options.validateSignal, ttl) + + // Run the flow asynchronously + this.runAsync(input, execution, adapter).then( + (result) => execution.complete(result), + (error) => { + const err = error instanceof Error ? error : new Error(String(error)) + execution.fail(err) + }, + ) + + return execution + } + + /** + * Run the flow asynchronously with wait-node support. + * When a wait node is hit, calls adapter.waitForEvent() which suspends + * until a signal is delivered via Execution.signal(). + */ + private async runAsync( + input: Record, + execution: Execution, + adapter: PlainAdapter, + ): Promise { + const hooks = this.options.hooks + const projectRoot = this.options.projectRoot ?? process.cwd() + const expressionTimeout = this.options.expressionTimeout + + const callbacks = this.buildCallbacks(hooks, projectRoot, expressionTimeout, execution, adapter) + + try { + const result = await walkGraph(this.doc, input, callbacks) + return { + output: result.output, + trace: result.trace, + outcome: result.outcome, + } + } catch (err: unknown) { + const error = err instanceof Error ? err : new Error(String(err)) + safeCallHook(() => hooks?.onFlowError?.(error)) + throw err + } + } + /** * Build WalkerCallbacks that dispatch to resolvedHandlers. * @@ -75,6 +143,8 @@ export class CompiledFlow { hooks: EngineHooks | undefined, projectRoot: string, expressionTimeout: number | undefined, + execution?: Execution, + plainAdapter?: PlainAdapter, ): WalkGraphCallbacks { const resolvedHandlers = this.resolvedHandlers const doc = this.doc @@ -328,11 +398,67 @@ export class CompiledFlow { }, onWait: async ( - _nodeId: string, - _node: WaitNode, + nodeId: string, + node: WaitNode, _ctx: ExecutionContext, ): Promise => { - throw new Error('Wait nodes require start(), not execute()') + if (!execution || !plainAdapter) { + throw new Error('Wait nodes require start(), not execute()') + } + + safeCallHook(() => hooks?.onNodeStart?.(nodeId, node.type, node.lane)) + const startedAt = performance.now() + + // Parse timeout from the node's duration string + let timeoutMs: number | undefined + if (node.timeout) { + timeoutMs = parseDuration(node.timeout) + } + + // Suspend: mark execution as waiting, then block on adapter + execution.setWaiting(nodeId, node.event) + let signalData: unknown + + try { + signalData = await plainAdapter.waitForEvent(nodeId, node.event, timeoutMs) + } catch (err: unknown) { + // WaitTimeoutError — route to timeout_next if available + if (err instanceof Error && err.name === 'WaitTimeoutError' && node.timeout_next) { + signalData = undefined + } else { + throw err + } + } + + execution.setRunning() + + const record: NodeExecutionRecord = { + nodeId, + type: node.type, + lane: node.lane, + startedAt, + completedAt: performance.now(), + output: signalData && typeof signalData === 'object' && !Array.isArray(signalData) + ? (signalData as Record) + : {}, + handler: 'native', + } + safeCallHook(() => hooks?.onNodeComplete?.(record)) + recordStep(record) + + return signalData + }, + + resolveWaitNext: ( + _nodeId: string, + node: WaitNode, + result: unknown, + ): string | undefined => { + // If result is undefined (timeout case) and timeout_next exists, route there + if (result === undefined && node.timeout_next) { + return node.timeout_next + } + return node.next }, onError: async ( diff --git a/packages/engine/src/engine/duration.ts b/packages/engine/src/engine/duration.ts new file mode 100644 index 0000000..ee9ba00 --- /dev/null +++ b/packages/engine/src/engine/duration.ts @@ -0,0 +1,34 @@ +/** + * Parse a subset of ISO 8601 duration strings into milliseconds. + * + * Supports: PT##H##M##S (hours, minutes, seconds — all optional but at least one required). + * Also supports the legacy engine shorthand: ##d, ##h, ##m, ##s. + */ +export function parseDuration(iso: string): number { + // ISO 8601 PT format + const ptMatch = iso.match(/^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$/) + if (ptMatch && (ptMatch[1] || ptMatch[2] || ptMatch[3])) { + const hours = parseInt(ptMatch[1] ?? '0', 10) + const minutes = parseInt(ptMatch[2] ?? '0', 10) + const seconds = parseInt(ptMatch[3] ?? '0', 10) + return (hours * 3600 + minutes * 60 + seconds) * 1000 + } + + // Legacy shorthand: 7d, 24h, 30m, 60s + const shortMatch = iso.match(/^(\d+)(d|h|m|s)$/) + if (shortMatch) { + const value = parseInt(shortMatch[1]!, 10) + switch (shortMatch[2]) { + case 'd': + return value * 86_400_000 + case 'h': + return value * 3_600_000 + case 'm': + return value * 60_000 + case 's': + return value * 1000 + } + } + + throw new Error(`Invalid duration: "${iso}". Expected ISO 8601 PT format (e.g. PT24H, PT30M) or shorthand (e.g. 24h, 30m).`) +} diff --git a/packages/engine/src/engine/execution.ts b/packages/engine/src/engine/execution.ts new file mode 100644 index 0000000..1be35bb --- /dev/null +++ b/packages/engine/src/engine/execution.ts @@ -0,0 +1,149 @@ +import type { PlainAdapter } from '../adapters/plain.js' +import type { Clock } from './clock.js' +import { TestClock } from './clock.js' +import { parseDuration } from './duration.js' +import type { ExecutionResult, ValidateSignalFn } from './types.js' + +export type ExecutionStatus = 'running' | 'waiting' | 'completed' | 'failed' + +/** + * Handle returned by `CompiledFlow.start()` for flows that may contain wait nodes. + * + * Provides: + * - Status tracking (running / waiting / completed / failed) + * - Signal delivery to resume waiting nodes + * - Time advancement for testing with TestClock + * - A result promise that resolves when the flow completes + */ +export class Execution { + private _status: ExecutionStatus = 'running' + private _waitingNodeId?: string + private _waitingFor?: string + private _result?: ExecutionResult + private _error?: Error + private readonly resultPromise: Promise + private resolveResult!: (result: ExecutionResult) => void + private rejectResult!: (error: Error) => void + private ttlTimerId?: ReturnType + + constructor( + private readonly adapter: PlainAdapter, + private readonly clock: Clock, + private readonly validateSignal?: ValidateSignalFn, + private readonly ttl?: number, + ) { + this.resultPromise = new Promise((resolve, reject) => { + this.resolveResult = resolve + this.rejectResult = reject + }) + } + + /** Current execution status. */ + get status(): ExecutionStatus { + return this._status + } + + /** The event name the execution is currently waiting for, if any. */ + get waitingFor(): string | undefined { + return this._waitingFor + } + + /** Promise that resolves with the final result when the flow completes. */ + get result(): Promise { + return this.resultPromise + } + + /** The synchronous result snapshot, available after status is 'completed'. */ + get completedResult(): ExecutionResult | undefined { + return this._result + } + + /** The error, available after status is 'failed'. */ + get error(): Error | undefined { + return this._error + } + + /** Send a signal to resume a waiting execution. */ + signal(eventName: string, data?: unknown): void { + if (this._status !== 'waiting') { + throw new Error(`Cannot signal execution in '${this._status}' state`) + } + + if (this.validateSignal) { + this.validateSignal(eventName, data) + } + + const payload = data ?? {} + const delivered = this.adapter.deliverSignal(this._waitingNodeId!, eventName, payload) + if (!delivered) { + throw new Error(`No pending wait for event '${eventName}'`) + } + } + + /** Advance synthetic time (TestClock only). Throws if using RealClock. */ + advanceTime(duration: string): void { + if (!(this.clock instanceof TestClock)) { + throw new Error('advanceTime() is only available with TestClock') + } + const ms = parseDuration(duration) + this.clock.advance(ms) + } + + // ── Internal methods called by the engine ────────────────────────── + + /** @internal */ + setWaiting(nodeId: string, eventName: string): void { + this._status = 'waiting' + this._waitingNodeId = nodeId + this._waitingFor = eventName + this.startTtl() + } + + /** @internal */ + setRunning(): void { + this._status = 'running' + this._waitingNodeId = undefined + this._waitingFor = undefined + this.clearTtl() + } + + /** @internal */ + complete(result: ExecutionResult): void { + this._status = 'completed' + this._result = result + this.clearTtl() + this.resolveResult(result) + } + + /** @internal */ + fail(error: Error): void { + this._status = 'failed' + this._error = error + this.clearTtl() + this.rejectResult(error) + } + + // ── TTL management ───────────────────────────────────────────────── + + private startTtl(): void { + this.clearTtl() + if (this.ttl != null && this.ttl > 0) { + this.ttlTimerId = this.clock.setTimeout(() => { + if (this._status === 'waiting') { + this.fail( + new Error( + `Paused execution expired after ${this.ttl}ms TTL (waiting for '${this._waitingFor}')`, + ), + ) + } + }, this.ttl) + } + } + + private clearTtl(): void { + if (this.ttlTimerId != null) { + this.clock.clearTimeout(this.ttlTimerId) + this.ttlTimerId = undefined + } + } +} diff --git a/packages/engine/src/engine/index.ts b/packages/engine/src/engine/index.ts index 1f212ef..f59a58c 100644 --- a/packages/engine/src/engine/index.ts +++ b/packages/engine/src/engine/index.ts @@ -1,5 +1,10 @@ export { FlowprintEngine } from './engine.js' export { CompiledFlow } from './compiled-flow.js' +export { Execution } from './execution.js' +export type { ExecutionStatus } from './execution.js' +export { RealClock, TestClock } from './clock.js' +export type { Clock } from './clock.js' +export { parseDuration } from './duration.js' export type { EngineOptions, EngineHooks, @@ -7,6 +12,7 @@ export type { HandlerFn, RegisterOptions, ResolvedHandler, + ValidateSignalFn, } from './types.js' // Re-export adapter types used by EngineOptions diff --git a/packages/engine/src/engine/types.ts b/packages/engine/src/engine/types.ts index d8a77be..b0b253a 100644 --- a/packages/engine/src/engine/types.ts +++ b/packages/engine/src/engine/types.ts @@ -1,5 +1,6 @@ import type { ExecutionContext, NodeExecutionRecord } from '../walker/types.js' import type { ExecutionAdapter } from '../adapters/types.js' +import type { Clock } from './clock.js' /** How a node's handler was resolved at load() time. */ export type ResolvedHandler = @@ -9,6 +10,9 @@ export type ResolvedHandler = | { type: 'entry_point'; fn: (ctx: ExecutionContext) => Promise } | { type: 'native' } // terminals, triggers, switches, waits, parallels, errors +/** Signal validation function. Throw to reject a signal. */ +export type ValidateSignalFn = (eventName: string, data: unknown) => void + /** Observability hooks. Called synchronously. Must not throw. */ export interface EngineHooks { onNodeStart?(nodeId: string, type: string, lane: string): void @@ -28,6 +32,12 @@ export interface EngineOptions { hooks?: EngineHooks /** Execution adapter for action handlers. Defaults to PlainAdapter. */ adapter?: ExecutionAdapter + /** Clock implementation for time-dependent operations. Defaults to RealClock. */ + clock?: Clock + /** Signal validation function. Called before delivering a signal to a wait node. */ + validateSignal?: ValidateSignalFn + /** TTL for paused executions in ms. Default: 3600000 (1 hour). */ + pausedExecutionTTL?: number } /** Result of a successful execution. */ diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 7c364d3..9c6f5cd 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -58,11 +58,25 @@ export type { export { assertWithinProject } from './security/index.js' // Engine -export { FlowprintEngine, CompiledFlow } from './engine/index.js' -export type { EngineOptions, EngineHooks, ExecutionResult } from './engine/index.js' +export { + FlowprintEngine, + CompiledFlow, + Execution, + RealClock, + TestClock, + parseDuration, +} from './engine/index.js' +export type { + Clock, + ExecutionStatus, + EngineOptions, + EngineHooks, + ExecutionResult, + ValidateSignalFn, +} from './engine/index.js' // Adapters -export { PlainAdapter, ActionTimeoutError } from './adapters/index.js' +export { PlainAdapter, ActionTimeoutError, WaitTimeoutError } from './adapters/index.js' export type { ExecutionAdapter, ActionConfig } from './adapters/index.js' // Codegen