From 14569995b5e91c2debab7187cf7541157479a8ed Mon Sep 17 00:00:00 2001 From: albertgwo Date: Tue, 17 Mar 2026 21:45:32 -0400 Subject: [PATCH 1/2] feat(engine): add maxConcurrency backpressure semaphore Add an optional maxConcurrency option to EngineOptions that limits concurrent execute() calls on a CompiledFlow via a counting semaphore. When all slots are taken, subsequent callers wait until a slot frees up. Zero overhead when omitted (no semaphore created). --- packages/engine/src/engine/compiled-flow.ts | 44 +++++++++++++-------- packages/engine/src/engine/index.ts | 1 + packages/engine/src/engine/semaphore.ts | 30 ++++++++++++++ packages/engine/src/engine/types.ts | 2 + 4 files changed, 61 insertions(+), 16 deletions(-) create mode 100644 packages/engine/src/engine/semaphore.ts diff --git a/packages/engine/src/engine/compiled-flow.ts b/packages/engine/src/engine/compiled-flow.ts index e801b40..a125984 100644 --- a/packages/engine/src/engine/compiled-flow.ts +++ b/packages/engine/src/engine/compiled-flow.ts @@ -16,6 +16,7 @@ import { evaluateExpression } from '../runner/evaluator.js' import { loadRulesFile, evaluateRules } from '../rules/evaluator.js' import { buildLegacyContext } from './engine.js' import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } from './types.js' +import { Semaphore } from './semaphore.js' /** * An immutable, pre-compiled flow ready for execution. @@ -24,11 +25,17 @@ import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } fro * so subsequent engine mutations do not affect this instance. */ export class CompiledFlow { + private readonly semaphore: Semaphore | undefined + constructor( private readonly doc: FlowprintDocument, private readonly resolvedHandlers: ReadonlyMap, private readonly options: EngineOptions, - ) {} + ) { + if (options.maxConcurrency != null) { + this.semaphore = new Semaphore(options.maxConcurrency) + } + } /** * Execute a flow that has no wait nodes. Returns when complete. @@ -37,23 +44,28 @@ export class CompiledFlow { * on the same CompiledFlow instance do not interfere. */ async execute(input: Record): Promise { - const hooks = this.options.hooks - const projectRoot = this.options.projectRoot ?? process.cwd() - const expressionTimeout = this.options.expressionTimeout - - const callbacks = this.buildCallbacks(hooks, projectRoot, expressionTimeout) - + if (this.semaphore) await this.semaphore.acquire() try { - const result = await walkGraph(this.doc, input, callbacks) - return { - output: result.output, - trace: result.trace, - outcome: result.outcome, + const hooks = this.options.hooks + const projectRoot = this.options.projectRoot ?? process.cwd() + const expressionTimeout = this.options.expressionTimeout + + const callbacks = this.buildCallbacks(hooks, projectRoot, expressionTimeout) + + try { + const result = await walkGraph(this.doc, input, callbacks) + return { + output: result.output, + trace: result.trace, + outcome: result.outcome, + } + } catch (err: unknown) { + const error = err instanceof Error ? err : new Error(String(err)) + safeCallHook(() => hooks?.onFlowError?.(error)) + throw err } - } catch (err: unknown) { - const error = err instanceof Error ? err : new Error(String(err)) - safeCallHook(() => hooks?.onFlowError?.(error)) - throw err + } finally { + if (this.semaphore) this.semaphore.release() } } diff --git a/packages/engine/src/engine/index.ts b/packages/engine/src/engine/index.ts index 964c1f4..20a3c62 100644 --- a/packages/engine/src/engine/index.ts +++ b/packages/engine/src/engine/index.ts @@ -1,5 +1,6 @@ export { FlowprintEngine } from './engine.js' export { CompiledFlow } from './compiled-flow.js' +export { Semaphore } from './semaphore.js' export type { EngineOptions, EngineHooks, diff --git a/packages/engine/src/engine/semaphore.ts b/packages/engine/src/engine/semaphore.ts new file mode 100644 index 0000000..210373d --- /dev/null +++ b/packages/engine/src/engine/semaphore.ts @@ -0,0 +1,30 @@ +/** + * Simple counting semaphore for limiting concurrent execute() calls. + * When all slots are taken, acquire() returns a promise that resolves + * when a slot becomes available. + */ +export class Semaphore { + private current = 0 + private queue: (() => void)[] = [] + + constructor(private readonly max: number) {} + + async acquire(): Promise { + if (this.current < this.max) { + this.current++ + return + } + return new Promise((resolve) => { + this.queue.push(() => { + this.current++ + resolve() + }) + }) + } + + release(): void { + this.current-- + const next = this.queue.shift() + if (next) next() + } +} diff --git a/packages/engine/src/engine/types.ts b/packages/engine/src/engine/types.ts index b7011f9..21064ab 100644 --- a/packages/engine/src/engine/types.ts +++ b/packages/engine/src/engine/types.ts @@ -25,6 +25,8 @@ export interface EngineOptions { expressionTimeout?: number /** Observability hooks. */ hooks?: EngineHooks + /** Maximum concurrent execute() calls on a CompiledFlow. Unlimited if omitted. */ + maxConcurrency?: number } /** Result of a successful execution. */ From 0878b157cee40eda05efed73ad316f2a1a41b0d5 Mon Sep 17 00:00:00 2001 From: albertgwo Date: Tue, 17 Mar 2026 21:45:41 -0400 Subject: [PATCH 2/2] test(engine): add semaphore and backpressure tests Unit tests for the Semaphore class (acquire/release, FIFO ordering, blocking beyond max). Integration tests verifying backpressure behavior: unlimited default, maxConcurrency limiting concurrent execute() calls, and slot release allowing queued executions to proceed. --- .../src/__tests__/engine/semaphore.test.ts | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 packages/engine/src/__tests__/engine/semaphore.test.ts diff --git a/packages/engine/src/__tests__/engine/semaphore.test.ts b/packages/engine/src/__tests__/engine/semaphore.test.ts new file mode 100644 index 0000000..5462117 --- /dev/null +++ b/packages/engine/src/__tests__/engine/semaphore.test.ts @@ -0,0 +1,272 @@ +import { describe, it, expect } from 'vitest' +import { Semaphore } from '../../engine/semaphore.js' +import { FlowprintEngine } from '../../engine/engine.js' +import type { ExecutionContext } from '../../walker/types.js' + +/** + * Minimal 3-node YAML: trigger -> action -> terminal. + */ +const SIMPLE_FLOW = ` +schema: flowprint/1.0 +name: test-flow +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: process + process: + type: action + lane: default + label: Process + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** Create a deferred promise for controlling async flow in tests. */ +function deferred(): { + promise: Promise + resolve: (value: T) => void + reject: (reason?: unknown) => void +} { + let resolve!: (value: T) => void + let reject!: (reason?: unknown) => void + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + return { promise, resolve, reject } +} + +describe('Semaphore', () => { + describe('unit tests', () => { + it('acquires up to max without blocking', async () => { + const sem = new Semaphore(3) + + // All three should resolve immediately + await sem.acquire() + await sem.acquire() + await sem.acquire() + // If we got here, none blocked + }) + + it('acquire beyond max blocks until release', async () => { + const sem = new Semaphore(1) + await sem.acquire() + + let acquired = false + const pending = sem.acquire().then(() => { + acquired = true + }) + + // Give microtask queue a chance to flush + await Promise.resolve() + expect(acquired).toBe(false) + + sem.release() + await pending + expect(acquired).toBe(true) + }) + + it('release unblocks waiting acquires in FIFO order', async () => { + const sem = new Semaphore(1) + await sem.acquire() + + const order: number[] = [] + + const p1 = sem.acquire().then(() => { + order.push(1) + }) + const p2 = sem.acquire().then(() => { + order.push(2) + }) + const p3 = sem.acquire().then(() => { + order.push(3) + }) + + // Release one at a time and verify FIFO + sem.release() + await p1 + + sem.release() + await p2 + + sem.release() + await p3 + + expect(order).toEqual([1, 2, 3]) + }) + + it('multiple waiters: release unblocks one at a time', async () => { + const sem = new Semaphore(2) + await sem.acquire() + await sem.acquire() + + let count = 0 + const p1 = sem.acquire().then(() => { + count++ + }) + const p2 = sem.acquire().then(() => { + count++ + }) + + await Promise.resolve() + expect(count).toBe(0) + + // Release one slot — only one waiter should proceed + sem.release() + await p1 + expect(count).toBe(1) + + // Release another slot — the second waiter should proceed + sem.release() + await p2 + expect(count).toBe(2) + }) + }) + + describe('backpressure integration', () => { + it('default: unlimited concurrency — multiple execute() calls run simultaneously', async () => { + const engine = new FlowprintEngine() + + const barriers = [deferred(), deferred(), deferred()] + let callIndex = 0 + const started: number[] = [] + + engine.register('process', async (_ctx: ExecutionContext) => { + const idx = callIndex++ + started.push(idx) + await barriers[idx]!.promise + return { idx } + }) + + const flow = await engine.load(SIMPLE_FLOW) + + // Launch 3 concurrent executions + const p1 = flow.execute({ id: 1 }) + const p2 = flow.execute({ id: 2 }) + const p3 = flow.execute({ id: 3 }) + + // Give microtasks time to schedule + await Promise.resolve() + await Promise.resolve() + + // All three should have started simultaneously (no backpressure) + expect(started.length).toBe(3) + + // Resolve all barriers + barriers[0]!.resolve() + barriers[1]!.resolve() + barriers[2]!.resolve() + + const [r1, r2, r3] = await Promise.all([p1, p2, p3]) + expect(r1.outcome).toBe('success') + expect(r2.outcome).toBe('success') + expect(r3.outcome).toBe('success') + }) + + it('maxConcurrency: 2 — third call waits until a slot is free', async () => { + const engine = new FlowprintEngine({ maxConcurrency: 2 }) + + const barriers = [deferred(), deferred(), deferred()] + let callIndex = 0 + const started: number[] = [] + + engine.register('process', async (_ctx: ExecutionContext) => { + const idx = callIndex++ + started.push(idx) + await barriers[idx]!.promise + return { idx } + }) + + const flow = await engine.load(SIMPLE_FLOW) + + // Launch 3 concurrent executions + const p1 = flow.execute({ id: 1 }) + const p2 = flow.execute({ id: 2 }) + const p3 = flow.execute({ id: 3 }) + + // Give microtasks time to schedule + await Promise.resolve() + await Promise.resolve() + + // Only first 2 should have started (semaphore max = 2) + expect(started.length).toBe(2) + + // Complete the first execution — this frees a slot + barriers[0]!.resolve() + await p1 + + // Give microtasks time to propagate semaphore release + await Promise.resolve() + await Promise.resolve() + + // Third should now have started + expect(started.length).toBe(3) + + // Resolve remaining + barriers[1]!.resolve() + barriers[2]!.resolve() + + const [r2, r3] = await Promise.all([p2, p3]) + expect(r2.outcome).toBe('success') + expect(r3.outcome).toBe('success') + }) + + it('completed execution releases slot, allowing waiting execution to proceed', async () => { + const engine = new FlowprintEngine({ maxConcurrency: 1 }) + + const barriers = [deferred(), deferred()] + let callIndex = 0 + const started: number[] = [] + + engine.register('process', async (_ctx: ExecutionContext) => { + const idx = callIndex++ + started.push(idx) + await barriers[idx]!.promise + return { idx } + }) + + const flow = await engine.load(SIMPLE_FLOW) + + // Launch 2 concurrent executions with maxConcurrency=1 + const p1 = flow.execute({ id: 1 }) + const p2 = flow.execute({ id: 2 }) + + // Give microtasks time to schedule + await Promise.resolve() + await Promise.resolve() + + // Only first should have started + expect(started.length).toBe(1) + + // Complete the first execution + barriers[0]!.resolve() + await p1 + + // Give microtasks time to propagate + await Promise.resolve() + await Promise.resolve() + + // Second should now have started + expect(started.length).toBe(2) + + // Complete the second + barriers[1]!.resolve() + const r2 = await p2 + expect(r2.outcome).toBe('success') + }) + }) +})