Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 272 additions & 0 deletions packages/engine/src/__tests__/engine/semaphore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import { describe, it, expect } from 'vitest'
import { Semaphore } from '../../engine/semaphore.js'
import { FlowprintEngine } from '../../engine/engine.js'
import type { ExecutionContext } from '../../walker/types.js'

/**
* Minimal 3-node YAML: trigger -> action -> terminal.
*/
const SIMPLE_FLOW = `
schema: flowprint/1.0
name: test-flow
version: "1.0.0"
lanes:
default:
label: Default
visibility: internal
order: 0
nodes:
start:
type: trigger
lane: default
label: Start
trigger_type: manual
manual: {}
next: process
process:
type: action
lane: default
label: Process
next: done
done:
type: terminal
lane: default
label: Done
outcome: success
`

/** Create a deferred promise for controlling async flow in tests. */
function deferred<T = void>(): {
promise: Promise<T>
resolve: (value: T) => void
reject: (reason?: unknown) => void
} {
let resolve!: (value: T) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}

describe('Semaphore', () => {
describe('unit tests', () => {
it('acquires up to max without blocking', async () => {
const sem = new Semaphore(3)

// All three should resolve immediately
await sem.acquire()
await sem.acquire()
await sem.acquire()
// If we got here, none blocked
})

it('acquire beyond max blocks until release', async () => {
const sem = new Semaphore(1)
await sem.acquire()

let acquired = false
const pending = sem.acquire().then(() => {
acquired = true
})

// Give microtask queue a chance to flush
await Promise.resolve()
expect(acquired).toBe(false)

sem.release()
await pending
expect(acquired).toBe(true)
})

it('release unblocks waiting acquires in FIFO order', async () => {
const sem = new Semaphore(1)
await sem.acquire()

const order: number[] = []

const p1 = sem.acquire().then(() => {
order.push(1)
})
const p2 = sem.acquire().then(() => {
order.push(2)
})
const p3 = sem.acquire().then(() => {
order.push(3)
})

// Release one at a time and verify FIFO
sem.release()
await p1

sem.release()
await p2

sem.release()
await p3

expect(order).toEqual([1, 2, 3])
})

it('multiple waiters: release unblocks one at a time', async () => {
const sem = new Semaphore(2)
await sem.acquire()
await sem.acquire()

let count = 0
const p1 = sem.acquire().then(() => {
count++
})
const p2 = sem.acquire().then(() => {
count++
})

await Promise.resolve()
expect(count).toBe(0)

// Release one slot — only one waiter should proceed
sem.release()
await p1
expect(count).toBe(1)

// Release another slot — the second waiter should proceed
sem.release()
await p2
expect(count).toBe(2)
})
})

describe('backpressure integration', () => {
it('default: unlimited concurrency — multiple execute() calls run simultaneously', async () => {
const engine = new FlowprintEngine()

const barriers = [deferred(), deferred(), deferred()]
let callIndex = 0
const started: number[] = []

engine.register('process', async (_ctx: ExecutionContext) => {
const idx = callIndex++
started.push(idx)
await barriers[idx]!.promise
return { idx }
})

const flow = await engine.load(SIMPLE_FLOW)

// Launch 3 concurrent executions
const p1 = flow.execute({ id: 1 })
const p2 = flow.execute({ id: 2 })
const p3 = flow.execute({ id: 3 })

// Give microtasks time to schedule
await Promise.resolve()
await Promise.resolve()

// All three should have started simultaneously (no backpressure)
expect(started.length).toBe(3)

// Resolve all barriers
barriers[0]!.resolve()
barriers[1]!.resolve()
barriers[2]!.resolve()

const [r1, r2, r3] = await Promise.all([p1, p2, p3])
expect(r1.outcome).toBe('success')
expect(r2.outcome).toBe('success')
expect(r3.outcome).toBe('success')
})

it('maxConcurrency: 2 — third call waits until a slot is free', async () => {
const engine = new FlowprintEngine({ maxConcurrency: 2 })

const barriers = [deferred(), deferred(), deferred()]
let callIndex = 0
const started: number[] = []

engine.register('process', async (_ctx: ExecutionContext) => {
const idx = callIndex++
started.push(idx)
await barriers[idx]!.promise
return { idx }
})

const flow = await engine.load(SIMPLE_FLOW)

// Launch 3 concurrent executions
const p1 = flow.execute({ id: 1 })
const p2 = flow.execute({ id: 2 })
const p3 = flow.execute({ id: 3 })

// Give microtasks time to schedule
await Promise.resolve()
await Promise.resolve()

// Only first 2 should have started (semaphore max = 2)
expect(started.length).toBe(2)

// Complete the first execution — this frees a slot
barriers[0]!.resolve()
await p1

// Give microtasks time to propagate semaphore release
await Promise.resolve()
await Promise.resolve()

// Third should now have started
expect(started.length).toBe(3)

// Resolve remaining
barriers[1]!.resolve()
barriers[2]!.resolve()

const [r2, r3] = await Promise.all([p2, p3])
expect(r2.outcome).toBe('success')
expect(r3.outcome).toBe('success')
})

it('completed execution releases slot, allowing waiting execution to proceed', async () => {
const engine = new FlowprintEngine({ maxConcurrency: 1 })

const barriers = [deferred(), deferred()]
let callIndex = 0
const started: number[] = []

engine.register('process', async (_ctx: ExecutionContext) => {
const idx = callIndex++
started.push(idx)
await barriers[idx]!.promise
return { idx }
})

const flow = await engine.load(SIMPLE_FLOW)

// Launch 2 concurrent executions with maxConcurrency=1
const p1 = flow.execute({ id: 1 })
const p2 = flow.execute({ id: 2 })

// Give microtasks time to schedule
await Promise.resolve()
await Promise.resolve()

// Only first should have started
expect(started.length).toBe(1)

// Complete the first execution
barriers[0]!.resolve()
await p1

// Give microtasks time to propagate
await Promise.resolve()
await Promise.resolve()

// Second should now have started
expect(started.length).toBe(2)

// Complete the second
barriers[1]!.resolve()
const r2 = await p2
expect(r2.outcome).toBe('success')
})
})
})
44 changes: 28 additions & 16 deletions packages/engine/src/engine/compiled-flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { evaluateExpression } from '../runner/evaluator.js'
import { loadRulesFile, evaluateRules } from '../rules/evaluator.js'
import { buildLegacyContext } from './engine.js'
import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } from './types.js'
import { Semaphore } from './semaphore.js'

/**
* An immutable, pre-compiled flow ready for execution.
Expand All @@ -24,11 +25,17 @@ import type { EngineOptions, ExecutionResult, ResolvedHandler, EngineHooks } fro
* so subsequent engine mutations do not affect this instance.
*/
export class CompiledFlow {
private readonly semaphore: Semaphore | undefined

constructor(
private readonly doc: FlowprintDocument,
private readonly resolvedHandlers: ReadonlyMap<string, ResolvedHandler>,
private readonly options: EngineOptions,
) {}
) {
if (options.maxConcurrency != null) {
this.semaphore = new Semaphore(options.maxConcurrency)
Comment on lines +35 to +36
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options.maxConcurrency is passed into new Semaphore without validation — should we validate or clamp it to a positive integer before constructing the Semaphore and add defensive checks in the Semaphore constructor?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 35 to 36, the constructor
currently does new Semaphore(options.maxConcurrency) without validating the value.
Change this to validate that options.maxConcurrency is a positive integer before
constructing the Semaphore: if it is null/undefined leave semaphore undefined
(unlimited), if it is a positive integer create the semaphore, and if it is
zero/negative or non-integer either clamp to a minimum of 1 or throw a clear
configuration error (prefer throwing to avoid surprising behavior). Also add a short
defensive check in packages/engine/src/engine/semaphore.ts constructor to assert max is
a positive integer and throw on invalid input so invalid configs cannot silently
deadlock the engine.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

}
}

/**
* Execute a flow that has no wait nodes. Returns when complete.
Expand All @@ -37,23 +44,28 @@ export class CompiledFlow {
* on the same CompiledFlow instance do not interfere.
*/
async execute(input: Record<string, unknown>): Promise<ExecutionResult> {
const hooks = this.options.hooks
const projectRoot = this.options.projectRoot ?? process.cwd()
const expressionTimeout = this.options.expressionTimeout

const callbacks = this.buildCallbacks(hooks, projectRoot, expressionTimeout)

if (this.semaphore) await this.semaphore.acquire()
try {
const result = await walkGraph(this.doc, input, callbacks)
return {
output: result.output,
trace: result.trace,
outcome: result.outcome,
const hooks = this.options.hooks
const projectRoot = this.options.projectRoot ?? process.cwd()
const expressionTimeout = this.options.expressionTimeout

const callbacks = this.buildCallbacks(hooks, projectRoot, expressionTimeout)

try {
const result = await walkGraph(this.doc, input, callbacks)
return {
output: result.output,
trace: result.trace,
outcome: result.outcome,
}
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err))
safeCallHook(() => hooks?.onFlowError?.(error))
throw err
}
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err))
safeCallHook(() => hooks?.onFlowError?.(error))
throw err
} finally {
if (this.semaphore) this.semaphore.release()
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/engine/src/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export { FlowprintEngine } from './engine.js'
export { CompiledFlow } from './compiled-flow.js'
export { Semaphore } from './semaphore.js'
Comment on lines 1 to +3
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruminaider/flowprint-engine was changed without a .changeset entry, should we add a Changeset to bump its version and review CLAUDE.md?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/index.ts around lines 1-3 the new export `Semaphore` was
added which changes the public API of the @ruminaider/flowprint-engine package. Add a
new file in packages/engine/.changeset (e.g., add-semaphore.md) that names the changed
package (@ruminaider/flowprint-engine) and specifies a version bump (patch if
backwards-compatible, minor if you consider this a feature), with a short description
like "Export Semaphore from engine" so the release tooling will bump the package and
publish it. Also confirm CLAUDE.md lines 89-91 are satisfied and that the
repository-level .changeset config will pick up the new file.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

export type {
EngineOptions,
EngineHooks,
Expand Down
30 changes: 30 additions & 0 deletions packages/engine/src/engine/semaphore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Simple counting semaphore for limiting concurrent execute() calls.
* When all slots are taken, acquire() returns a promise that resolves
* when a slot becomes available.
*/
export class Semaphore {
private current = 0
private queue: (() => void)[] = []

constructor(private readonly max: number) {}

async acquire(): Promise<void> {
if (this.current < this.max) {
this.current++
return
}
return new Promise<void>((resolve) => {
this.queue.push(() => {
this.current++
resolve()
})
})
}

release(): void {
this.current--
const next = this.queue.shift()
if (next) next()
}
}
2 changes: 2 additions & 0 deletions packages/engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export interface EngineOptions {
expressionTimeout?: number
/** Observability hooks. */
hooks?: EngineHooks
/** Maximum concurrent execute() calls on a CompiledFlow. Unlimited if omitted. */
maxConcurrency?: number
}

/** Result of a successful execution. */
Expand Down