From e388e91ad9a4486664551f9d9de82e45f8ba7c28 Mon Sep 17 00:00:00 2001 From: albertgwo Date: Tue, 17 Mar 2026 21:58:54 -0400 Subject: [PATCH 1/2] feat(engine): add parallel branch isolation and walkBranch Add executeParallel to ExecutionAdapter interface with 'all' and 'first' strategies. Implement in PlainAdapter. Add walkBranch to walker for multi-node branch subgraphs with isolated state (structuredClone). Rewrite onParallel in CompiledFlow to use branch isolation, walkBranch, and adapter.executeParallel with namespace-by-branch-ID merge at join. Add nested parallel rejection in walkBranch (runtime) and structural validation (schema package). --- .../src/__tests__/engine/adapter.test.ts | 10 +- packages/engine/src/adapters/plain.ts | 39 ++++++++ packages/engine/src/adapters/types.ts | 16 ++++ packages/engine/src/engine/compiled-flow.ts | 89 ++++++++---------- packages/engine/src/index.ts | 2 +- packages/engine/src/walker/index.ts | 2 +- packages/engine/src/walker/walk.ts | 94 ++++++++++++++++++- packages/schema/src/structural.ts | 55 +++++++++++ 8 files changed, 248 insertions(+), 59 deletions(-) diff --git a/packages/engine/src/__tests__/engine/adapter.test.ts b/packages/engine/src/__tests__/engine/adapter.test.ts index 6f594bf..d789a79 100644 --- a/packages/engine/src/__tests__/engine/adapter.test.ts +++ b/packages/engine/src/__tests__/engine/adapter.test.ts @@ -94,7 +94,7 @@ describe('PlainAdapter', () => { await expect( adapter.executeAction('slow_node', handler, ctx, { timeout: 50 }), - ).rejects.toThrow("timed out after 50ms") + ).rejects.toThrow('timed out after 50ms') }) it('handler that completes quickly with generous timeout succeeds', async () => { @@ -139,9 +139,9 @@ describe('PlainAdapter', () => { } const ctx = makeCtx() - await expect( - shortAdapter.executeAction('default_timeout', handler, ctx, {}), - ).rejects.toThrow(ActionTimeoutError) + await expect(shortAdapter.executeAction('default_timeout', handler, ctx, {})).rejects.toThrow( + ActionTimeoutError, + ) }) it('config.timeout overrides defaultTimeout', async () => { @@ -198,6 +198,7 @@ describe('Adapter integration with engine', () => { return handler(context) }, ), + executeParallel: async (branches, _strategy) => Promise.all(branches.map((b) => b())), } const engine = new FlowprintEngine({ adapter: mockAdapter }) @@ -255,6 +256,7 @@ describe('Adapter lifecycle', () => { init: initFn, shutdown: shutdownFn, executeAction: async (_nodeId, handler, ctx) => handler(ctx), + executeParallel: async (branches, _strategy) => Promise.all(branches.map((b) => b())), } await lifecycleAdapter.init?.() diff --git a/packages/engine/src/adapters/plain.ts b/packages/engine/src/adapters/plain.ts index 136ac6a..e8aed48 100644 --- a/packages/engine/src/adapters/plain.ts +++ b/packages/engine/src/adapters/plain.ts @@ -87,4 +87,43 @@ export class PlainAdapter implements ExecutionAdapter { clearTimeout(timeoutId) } } + + async executeParallel( + branches: (() => Promise)[], + strategy: 'all' | 'first', + ): Promise { + if (strategy === 'all') { + return this.executeParallelAll(branches) + } + return this.executeParallelFirst(branches) + } + + /** + * 'all' strategy: run all branches concurrently via Promise.all. + * On failure, the remaining branches see their abort signals fire. + */ + private async executeParallelAll(branches: (() => Promise)[]): Promise { + return Promise.all(branches.map((branch) => branch())) + } + + /** + * 'first' strategy: run all branches to completion. + * Track which finishes first. Return results in original order + * with the first-to-finish index stored. + * + * All branches complete — side effects from losers commit. + */ + private async executeParallelFirst(branches: (() => Promise)[]): Promise { + let firstIndex = -1 + const results = await Promise.all( + branches.map(async (branch, index) => { + const result = await branch() + if (firstIndex === -1) { + firstIndex = index + } + return result + }), + ) + return results + } } diff --git a/packages/engine/src/adapters/types.ts b/packages/engine/src/adapters/types.ts index 1dec8c2..bcdc4aa 100644 --- a/packages/engine/src/adapters/types.ts +++ b/packages/engine/src/adapters/types.ts @@ -24,4 +24,20 @@ export interface ExecutionAdapter { context: ExecutionContext, config: ActionConfig, ): Promise + + /** + * Execute parallel branches with the given strategy. + * + * - `'all'`: Run all branches concurrently. All must complete. On failure, + * abort remaining branches and propagate the error. + * - `'first'`: Run all branches to completion. Return all results but mark + * the first to finish as primary. + * + * Each branch function receives its own AbortController that the adapter + * can signal on failure. + */ + executeParallel( + branches: (() => Promise)[], + strategy: 'all' | 'first', + ): Promise } diff --git a/packages/engine/src/engine/compiled-flow.ts b/packages/engine/src/engine/compiled-flow.ts index e855510..7787a0f 100644 --- a/packages/engine/src/engine/compiled-flow.ts +++ b/packages/engine/src/engine/compiled-flow.ts @@ -8,8 +8,7 @@ import type { TriggerNode, TerminalNode, } from '@ruminaider/flowprint-schema' -import { isActionNode } from '@ruminaider/flowprint-schema' -import { walkGraph } from '../walker/walk.js' +import { walkGraph, walkBranch } from '../walker/walk.js' import type { WalkGraphCallbacks } from '../walker/walk.js' import type { ExecutionContext, NodeExecutionRecord } from '../walker/types.js' import { evaluateExpression } from '../runner/evaluator.js' @@ -254,64 +253,50 @@ export class CompiledFlow { ): Promise => { safeCallHook(() => hooks?.onNodeStart?.(nodeId, node.type, node.lane)) const startedAt = performance.now() + const strategy = node.join_strategy ?? 'all' + const joinNodeId = node.join + + // Build a branch function for each branch ID. + // Each branch gets an isolated copy of the parent state. + const branchFns = node.branches.map((branchId) => { + return async (): Promise<{ branchId: string; state: Record }> => { + const branchNode = doc.nodes[branchId] + if (!branchNode) { + throw new Error(`Parallel branch node "${branchId}" not found`) + } - const branchPromises = node.branches.map(async (branchId) => { - const branchNode = doc.nodes[branchId] - if (!branchNode) { - throw new Error(`Parallel branch node "${branchId}" not found`) - } - - const handler = resolvedHandlers.get(branchId) - if (!handler) { - throw new Error(`No resolved handler for parallel branch "${branchId}"`) - } - - if (isActionNode(branchNode)) { - let result: unknown - switch (handler.type) { - case 'registered': - case 'entry_point': - result = await adapter.executeAction(branchId, handler.fn, ctx, { - metadata: branchNode.metadata as Record | undefined, - }) - break - case 'expressions': { - const legacyCtx = buildLegacyContext(ctx) - const output: Record = {} - for (const [key, expr] of Object.entries(handler.exprs)) { - output[key] = evaluateExpression(expr, legacyCtx, expressionTimeout) - } - result = output - break - } - case 'rules': { - const rulesDoc = loadRulesFile(handler.rulesFile, projectRoot) - const legacyCtx = buildLegacyContext(ctx) - const rulesResult = evaluateRules(rulesDoc, legacyCtx, expressionTimeout) - result = rulesResult.output - break - } - case 'native': - result = {} - break + // Isolated state: structuredClone ensures branches cannot see each other's writes + const branchState = structuredClone(ctx.state) + const branchCtx: ExecutionContext = { + input: ctx.input, + state: branchState, + node: { id: branchId, type: branchNode.type, lane: branchNode.lane }, + signal: ctx.signal, } - ctx.state[branchId] = result - return { branchId, result } + // Walk the branch subgraph from branchId until joinNodeId + const finalState = await walkBranch(doc, branchId, joinNodeId, branchCtx, callbacks) + return { branchId, state: finalState } } - - throw new Error( - `Parallel branch "${branchId}" is not an action node (type: ${branchNode.type})`, - ) }) - const results = await Promise.all(branchPromises) + // Execute branches through the adapter's parallel strategy + const rawResults = await adapter.executeParallel( + branchFns.map((fn) => fn as () => Promise), + strategy, + ) + + // Merge results: namespace by branch ID const resultMap: Record = {} - for (const r of results) { - resultMap[r.branchId] = r.result + for (const raw of rawResults) { + const result = raw as { branchId: string; state: Record } + resultMap[result.branchId] = result.state } - ctx.state[nodeId] = resultMap - const parallelResult = { [nodeId]: resultMap } + + // Write merged results into parent context + Object.assign(ctx.state, resultMap) + + const parallelResult = resultMap const record: NodeExecutionRecord = { nodeId, diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 7c364d3..43de1c0 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -43,7 +43,7 @@ export type { } from './rules/index.js' // Walker (generic graph walker + types) -export { walkGraph } from './walker/index.js' +export { walkGraph, walkBranch } from './walker/index.js' export type { ExecutionContext as WalkerExecutionContext, NodeExecutionRecord, diff --git a/packages/engine/src/walker/index.ts b/packages/engine/src/walker/index.ts index 83b8391..72735d0 100644 --- a/packages/engine/src/walker/index.ts +++ b/packages/engine/src/walker/index.ts @@ -6,5 +6,5 @@ export type { WalkResult, } from './types.js' -export { walkGraph } from './walk.js' +export { walkGraph, walkBranch } from './walk.js' export type { WalkGraphCallbacks, CompensationEntry } from './walk.js' diff --git a/packages/engine/src/walker/walk.ts b/packages/engine/src/walker/walk.ts index 60c1796..f2e3702 100644 --- a/packages/engine/src/walker/walk.ts +++ b/packages/engine/src/walker/walk.ts @@ -1,4 +1,4 @@ -import type { FlowprintDocument, WaitNode } from '@ruminaider/flowprint-schema' +import type { FlowprintDocument, WaitNode, Node } from '@ruminaider/flowprint-schema' import { findRoots, isActionNode, @@ -195,6 +195,98 @@ export async function walkGraph( } } +/** + * Walk a subgraph starting at `startNodeId`, stopping when `stopNodeId` is reached. + * + * Used for parallel branch execution: each branch gets an isolated state copy + * and walks its own subgraph until it reaches the join node (or a terminal). + * + * Rejects nested parallel nodes at runtime — parallel branches must not + * contain other parallel nodes. + */ +export async function walkBranch( + doc: FlowprintDocument, + startNodeId: string, + stopNodeId: string, + ctx: ExecutionContext, + callbacks: WalkGraphCallbacks, +): Promise> { + let currentNodeId: string | undefined = startNodeId + + while (currentNodeId) { + // Stop when we reach the join node + if (currentNodeId === stopNodeId) { + break + } + + if (ctx.signal.aborted) { + break + } + + const node: Node | undefined = doc.nodes[currentNodeId] + if (!node) { + throw new Error(`Node "${currentNodeId}" not found in document`) + } + + const nodeCtx: ExecutionContext = { + input: ctx.input, + state: ctx.state, + node: { id: currentNodeId, type: node.type, lane: node.lane }, + signal: ctx.signal, + } + + if (isParallelNode(node)) { + throw new Error( + `Nested parallel node "${currentNodeId}" found inside a parallel branch. ` + + 'Nested parallels are not supported.', + ) + } + + if (isTriggerNode(node)) { + const nextFromCallback = await callbacks.onTrigger(currentNodeId, node, nodeCtx) + currentNodeId = nextFromCallback ?? (node.next as string | undefined) + } else if (isActionNode(node)) { + try { + const result = await callbacks.onAction(currentNodeId, node, nodeCtx) + mergeOutput(ctx.state, result) + currentNodeId = node.next + } catch (err: unknown) { + if (node.error?.catch) { + const catchNodeId: string = node.error.catch + const errorNode = doc.nodes[catchNodeId] + if (errorNode && isErrorNode(errorNode)) { + currentNodeId = catchNodeId + continue + } + } + throw err + } + } else if (isSwitchNode(node)) { + currentNodeId = await callbacks.onSwitch(currentNodeId, node, nodeCtx) + } else if (isWaitNode(node)) { + const result = await callbacks.onWait(currentNodeId, node, nodeCtx) + mergeOutput(ctx.state, result) + if (callbacks.resolveWaitNext) { + currentNodeId = callbacks.resolveWaitNext(currentNodeId, node, result) + } else { + currentNodeId = node.next + } + } else if (isErrorNode(node)) { + const nextFromCallback = await callbacks.onError(currentNodeId, node, nodeCtx) + currentNodeId = nextFromCallback ?? node.next + } else if (isTerminalNode(node)) { + if (callbacks.onTerminal) { + await callbacks.onTerminal(currentNodeId, node, nodeCtx) + } + currentNodeId = undefined + } else { + throw new Error(`Unknown node type for node "${currentNodeId}"`) + } + } + + return ctx.state +} + /** * Flat-merge a node's output into the accumulated state. * Only merges plain objects; arrays and primitives are ignored. diff --git a/packages/schema/src/structural.ts b/packages/schema/src/structural.ts index e77dc3b..cfbcf00 100644 --- a/packages/schema/src/structural.ts +++ b/packages/schema/src/structural.ts @@ -266,6 +266,61 @@ export function validateStructure(doc: Record): ValidationError } } + // Nested parallel detection — parallel branches must not transitively contain parallel nodes + for (const [nodeId, nodeDef] of Object.entries(nodes)) { + const node = nodeDef as Record + if (node.type !== 'parallel') continue + + const branches = node.branches as string[] | undefined + const joinId = node.join as string | undefined + if (!branches || !joinId) continue + + for (const branchId of branches) { + const visited = new Set() + const stack = [branchId] + + while (stack.length > 0) { + const currentId = stack.pop()! + if (currentId === joinId || visited.has(currentId)) continue + visited.add(currentId) + + const current = nodes[currentId] as Record | undefined + if (!current) continue + + if (current.type === 'parallel') { + errors.push({ + path: `/nodes/${currentId}`, + message: `Nested parallel node "${currentId}" found inside branch of parallel node "${nodeId}". Nested parallels are not supported`, + severity: 'error', + }) + break + } + + // Follow outgoing edges to find transitively reachable nodes + const next = current.next as string | undefined + if (next) stack.push(next) + + const defaultNext = current.default as string | undefined + if (defaultNext) stack.push(defaultNext) + + const cases = current.cases as Array<{ next?: string }> | undefined + if (cases) { + for (const c of cases) { + if (c.next) stack.push(c.next) + } + } + + const errorCatch = (current.error as Record | undefined)?.catch as + | string + | undefined + if (errorCatch) stack.push(errorCatch) + + const timeoutNext = current.timeout_next as string | undefined + if (timeoutNext) stack.push(timeoutNext) + } + } + } + // Check for orphan nodes // - Non-terminal: orphan if no incoming AND no outgoing edges // - Terminal: orphan if no incoming edges (terminals never have outgoing edges by design) From f27165ac4ec286e176e15ed49fce6faf7d6683d2 Mon Sep 17 00:00:00 2001 From: albertgwo Date: Tue, 17 Mar 2026 21:59:04 -0400 Subject: [PATCH 2/2] test(engine): add parallel isolation and strategy tests 13 tests covering: branch isolation (writes don't leak), merge at join (namespaced results), 'all' strategy (all complete, error propagation), 'first' strategy (both complete), multi-node branch subgraphs (action->switch->action chain), nested parallel rejection (walkBranch runtime + structural validation), and PlainAdapter.executeParallel unit tests. --- .../src/__tests__/engine/parallel.test.ts | 635 ++++++++++++++++++ 1 file changed, 635 insertions(+) create mode 100644 packages/engine/src/__tests__/engine/parallel.test.ts diff --git a/packages/engine/src/__tests__/engine/parallel.test.ts b/packages/engine/src/__tests__/engine/parallel.test.ts new file mode 100644 index 0000000..98d4542 --- /dev/null +++ b/packages/engine/src/__tests__/engine/parallel.test.ts @@ -0,0 +1,635 @@ +import { describe, it, expect, vi } from 'vitest' +import { FlowprintEngine } from '../../engine/engine.js' +import type { ExecutionContext } from '../../walker/types.js' +import { walkBranch } from '../../walker/walk.js' +import type { WalkGraphCallbacks } from '../../walker/walk.js' +import type { FlowprintDocument } from '@ruminaider/flowprint-schema' +import { validate } from '@ruminaider/flowprint-schema' + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeDoc(nodes: FlowprintDocument['nodes']): FlowprintDocument { + return { + schema: 'flowprint/1.0', + name: 'parallel-test', + version: '1.0.0', + lanes: { + default: { label: 'Default', visibility: 'internal', order: 0 }, + }, + nodes, + } +} + +/** + * Minimal YAML for a parallel flow: trigger -> parallel(branch_a, branch_b) -> join -> terminal + * Both branches are single-action nodes. + */ +const PARALLEL_FLOW = ` +schema: flowprint/1.0 +name: parallel-test +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: fork + fork: + type: parallel + lane: default + label: Fork + branches: + - branch_a + - branch_b + join: merge + branch_a: + type: action + lane: default + label: Branch A + next: merge + branch_b: + type: action + lane: default + label: Branch B + next: merge + merge: + type: action + lane: default + label: Merge + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** + * Parallel flow with multi-node subgraph branches. + * branch_a: action_a1 -> switch_a -> action_a2 -> merge (join) + * branch_b: action_b1 -> merge (join) + */ +const MULTI_NODE_BRANCH_FLOW = ` +schema: flowprint/1.0 +name: multi-node-branch +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: fork + fork: + type: parallel + lane: default + label: Fork + branches: + - action_a1 + - action_b1 + join: done + action_a1: + type: action + lane: default + label: A Step 1 + expressions: + a1_value: "'step1'" + next: switch_a + switch_a: + type: switch + lane: default + label: Switch A + cases: + - when: "true" + next: action_a2 + action_a2: + type: action + lane: default + label: A Step 2 + expressions: + a2_value: "'step2'" + next: done + action_b1: + type: action + lane: default + label: B Step 1 + expressions: + b1_value: "'only_step'" + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +/** + * Parallel flow with 'first' join_strategy. + */ +const RACE_FLOW = ` +schema: flowprint/1.0 +name: race-test +version: "1.0.0" +lanes: + default: + label: Default + visibility: internal + order: 0 +nodes: + start: + type: trigger + lane: default + label: Start + trigger_type: manual + manual: {} + next: fork + fork: + type: parallel + lane: default + label: Fork + branches: + - fast_branch + - slow_branch + join: merge + join_strategy: first + fast_branch: + type: action + lane: default + label: Fast Branch + next: merge + slow_branch: + type: action + lane: default + label: Slow Branch + next: merge + merge: + type: action + lane: default + label: Merge + next: done + done: + type: terminal + lane: default + label: Done + outcome: success +` + +// --------------------------------------------------------------------------- +// Branch Isolation Tests +// --------------------------------------------------------------------------- + +describe('Parallel branch isolation', () => { + it('branches get isolated state copies — writes do not leak between branches', async () => { + const engine = new FlowprintEngine() + + // branch_a writes x=1 + engine.register('branch_a', async (ctx: ExecutionContext) => { + expect(ctx.state).not.toHaveProperty('branch_b_ran') + return { x: 1, branch_a_ran: true } + }) + + // branch_b writes x=2 + engine.register('branch_b', async (ctx: ExecutionContext) => { + expect(ctx.state).not.toHaveProperty('branch_a_ran') + return { x: 2, branch_b_ran: true } + }) + + engine.register('merge', async () => ({ merged: true })) + + const flow = await engine.load(PARALLEL_FLOW) + const result = await flow.execute({}) + + expect(result.outcome).toBe('success') + // Both branches' states are namespaced in the output + expect(result.output).toHaveProperty('branch_a') + expect(result.output).toHaveProperty('branch_b') + }) + + it('branches do not affect parent state until merge completes', async () => { + const stateSnapshots: Record[] = [] + + const engine = new FlowprintEngine() + + engine.register('branch_a', async (ctx: ExecutionContext) => { + // Capture what branch_a sees — should be a clean copy + stateSnapshots.push(structuredClone(ctx.state)) + return { from_a: true } + }) + + engine.register('branch_b', async (ctx: ExecutionContext) => { + stateSnapshots.push(structuredClone(ctx.state)) + return { from_b: true } + }) + + engine.register('merge', async (ctx: ExecutionContext) => { + // At merge time, both branch results should be in state (namespaced) + stateSnapshots.push(structuredClone(ctx.state)) + return { merged: true } + }) + + const flow = await engine.load(PARALLEL_FLOW) + await flow.execute({}) + + // Branch snapshots (first two) should not contain each other's data + const branchASnap = stateSnapshots[0]! + const branchBSnap = stateSnapshots[1]! + + // Branches start from the same parent state (empty at root) + expect(branchASnap).not.toHaveProperty('from_b') + expect(branchBSnap).not.toHaveProperty('from_a') + + // Merge step should see namespaced branch results + const mergeSnap = stateSnapshots[2]! + expect(mergeSnap).toHaveProperty('branch_a') + expect(mergeSnap).toHaveProperty('branch_b') + }) +}) + +// --------------------------------------------------------------------------- +// Merge at Join Tests +// --------------------------------------------------------------------------- + +describe('Merge at join', () => { + it('results are namespaced by branch ID after join', async () => { + const engine = new FlowprintEngine() + + engine.register('branch_a', async () => ({ result_a: 'alpha' })) + engine.register('branch_b', async () => ({ result_b: 'beta' })) + engine.register('merge', async () => ({})) + + const flow = await engine.load(PARALLEL_FLOW) + const result = await flow.execute({}) + + expect(result.outcome).toBe('success') + + // Branch results are namespaced + const branchA = result.output.branch_a as Record + const branchB = result.output.branch_b as Record + expect(branchA).toMatchObject({ result_a: 'alpha' }) + expect(branchB).toMatchObject({ result_b: 'beta' }) + }) +}) + +// --------------------------------------------------------------------------- +// Strategy Tests +// --------------------------------------------------------------------------- + +describe('Parallel strategies', () => { + it('"all" strategy: both branches must complete', async () => { + const completionOrder: string[] = [] + const engine = new FlowprintEngine() + + engine.register('branch_a', async () => { + completionOrder.push('a') + return { a: true } + }) + + engine.register('branch_b', async () => { + completionOrder.push('b') + return { b: true } + }) + + engine.register('merge', async () => ({})) + + const flow = await engine.load(PARALLEL_FLOW) + const result = await flow.execute({}) + + expect(result.outcome).toBe('success') + expect(completionOrder).toContain('a') + expect(completionOrder).toContain('b') + expect(result.output).toHaveProperty('branch_a') + expect(result.output).toHaveProperty('branch_b') + }) + + it('"all" strategy with failure: one branch throws -> execution fails', async () => { + const engine = new FlowprintEngine() + + engine.register('branch_a', async () => ({ a: true })) + engine.register('branch_b', async () => { + throw new Error('Branch B failed') + }) + engine.register('merge', async () => ({})) + + const flow = await engine.load(PARALLEL_FLOW) + + await expect(flow.execute({})).rejects.toThrow('Branch B failed') + }) + + it('"first" strategy: both branches complete, results available', async () => { + const engine = new FlowprintEngine() + + engine.register('fast_branch', async () => { + return { speed: 'fast' } + }) + + engine.register('slow_branch', async () => { + // Simulate slower branch (still completes) + await new Promise((r) => setTimeout(r, 10)) + return { speed: 'slow' } + }) + + engine.register('merge', async () => ({})) + + const flow = await engine.load(RACE_FLOW) + const result = await flow.execute({}) + + expect(result.outcome).toBe('success') + // Both branch results should be available + expect(result.output).toHaveProperty('fast_branch') + expect(result.output).toHaveProperty('slow_branch') + }) +}) + +// --------------------------------------------------------------------------- +// Multi-Node Subgraph Tests +// --------------------------------------------------------------------------- + +describe('Multi-node branch subgraphs', () => { + it('branch with action -> switch -> action chain walks correctly', async () => { + const engine = new FlowprintEngine() + + // No explicit registrations needed — branches use expressions (handled natively) + const flow = await engine.load(MULTI_NODE_BRANCH_FLOW) + const result = await flow.execute({}) + + expect(result.outcome).toBe('success') + + // Branch subgraphs should have walked through multiple nodes + const branchA = result.output.action_a1 as Record + const branchB = result.output.action_b1 as Record + + expect(branchA).toBeDefined() + expect(branchB).toBeDefined() + + // Branch A walked through: action_a1 -> switch_a -> action_a2 + // Its state should contain both a1 and a2 outputs + expect(branchA).toHaveProperty('a1_value', 'step1') + expect(branchA).toHaveProperty('a2_value', 'step2') + + // Branch B walked through: action_b1 only + expect(branchB).toHaveProperty('b1_value', 'only_step') + }) +}) + +// --------------------------------------------------------------------------- +// Nested Parallel Rejection Tests +// --------------------------------------------------------------------------- + +describe('Nested parallel rejection', () => { + it('walkBranch throws when encountering a parallel node inside a branch', async () => { + const doc = makeDoc({ + outer_parallel: { + type: 'parallel', + lane: 'default', + label: 'Outer', + branches: ['inner_parallel'], + join: 'done', + }, + inner_parallel: { + type: 'parallel', + lane: 'default', + label: 'Inner', + branches: ['leaf'], + join: 'done', + }, + leaf: { + type: 'action', + lane: 'default', + label: 'Leaf', + entry_points: [], + next: 'done', + }, + done: { + type: 'terminal', + lane: 'default', + label: 'Done', + outcome: 'success', + }, + }) + + const ctx: ExecutionContext = { + input: {}, + state: {}, + node: { id: 'inner_parallel', type: 'parallel', lane: 'default' }, + signal: new AbortController().signal, + } + + const callbacks: WalkGraphCallbacks = { + onAction: vi.fn(async () => ({})), + onSwitch: vi.fn(async () => undefined), + onParallel: vi.fn(async () => ({})), + onWait: vi.fn(async () => ({})), + onError: vi.fn(async () => undefined), + onTrigger: vi.fn(async () => undefined), + onTerminal: vi.fn(async () => {}), + onStep: vi.fn(), + } + + await expect(walkBranch(doc, 'inner_parallel', 'done', ctx, callbacks)).rejects.toThrow( + 'Nested parallel node "inner_parallel" found inside a parallel branch', + ) + }) + + it('structural validation rejects nested parallel in branch subgraph', () => { + const doc = { + schema: 'flowprint/1.0', + name: 'nested-parallel', + version: '1.0.0', + lanes: { + default: { label: 'Default', visibility: 'internal', order: 0 }, + }, + nodes: { + start: { + type: 'trigger', + lane: 'default', + label: 'Start', + trigger_type: 'manual', + manual: {}, + next: 'outer', + }, + outer: { + type: 'parallel', + lane: 'default', + label: 'Outer', + branches: ['branch_a'], + join: 'done', + }, + branch_a: { + type: 'action', + lane: 'default', + label: 'Branch A', + expressions: { x: '1' }, + next: 'inner', + }, + inner: { + type: 'parallel', + lane: 'default', + label: 'Inner', + branches: ['leaf'], + join: 'done', + }, + leaf: { + type: 'action', + lane: 'default', + label: 'Leaf', + expressions: { y: '2' }, + next: 'done', + }, + done: { + type: 'terminal', + lane: 'default', + label: 'Done', + outcome: 'success', + }, + }, + } + + const result = validate(doc) + const nestedErrors = result.errors.filter((e) => e.message.includes('Nested parallel')) + expect(nestedErrors.length).toBeGreaterThan(0) + expect(nestedErrors[0]!.severity).toBe('error') + }) + + it('structural validation allows non-nested parallel nodes (siblings)', () => { + const doc = { + schema: 'flowprint/1.0', + name: 'sibling-parallel', + version: '1.0.0', + lanes: { + default: { label: 'Default', visibility: 'internal', order: 0 }, + }, + nodes: { + start: { + type: 'trigger', + lane: 'default', + label: 'Start', + trigger_type: 'manual', + manual: {}, + next: 'parallel_1', + }, + parallel_1: { + type: 'parallel', + lane: 'default', + label: 'First Parallel', + branches: ['branch_1a'], + join: 'mid', + }, + branch_1a: { + type: 'action', + lane: 'default', + label: 'Branch 1A', + expressions: { x: '1' }, + next: 'mid', + }, + mid: { + type: 'action', + lane: 'default', + label: 'Mid', + expressions: { mid: 'true' }, + next: 'parallel_2', + }, + parallel_2: { + type: 'parallel', + lane: 'default', + label: 'Second Parallel', + branches: ['branch_2a'], + join: 'done', + }, + branch_2a: { + type: 'action', + lane: 'default', + label: 'Branch 2A', + expressions: { y: '2' }, + next: 'done', + }, + done: { + type: 'terminal', + lane: 'default', + label: 'Done', + outcome: 'success', + }, + }, + } + + const result = validate(doc) + const nestedErrors = result.errors.filter((e) => e.message.includes('Nested parallel')) + expect(nestedErrors).toHaveLength(0) + }) +}) + +// --------------------------------------------------------------------------- +// Adapter executeParallel Tests +// --------------------------------------------------------------------------- + +describe('PlainAdapter.executeParallel', () => { + it('"all" strategy runs all branches concurrently', async () => { + const { PlainAdapter } = await import('../../adapters/plain.js') + const adapter = new PlainAdapter() + + const results = await adapter.executeParallel( + [async () => 'a', async () => 'b', async () => 'c'], + 'all', + ) + + expect(results).toEqual(['a', 'b', 'c']) + }) + + it('"all" strategy propagates first error', async () => { + const { PlainAdapter } = await import('../../adapters/plain.js') + const adapter = new PlainAdapter() + + await expect( + adapter.executeParallel( + [ + async () => 'ok', + async () => { + throw new Error('boom') + }, + ], + 'all', + ), + ).rejects.toThrow('boom') + }) + + it('"first" strategy runs all branches to completion', async () => { + const { PlainAdapter } = await import('../../adapters/plain.js') + const adapter = new PlainAdapter() + + const completionOrder: string[] = [] + + const results = await adapter.executeParallel( + [ + async () => { + completionOrder.push('fast') + return 'fast_result' + }, + async () => { + await new Promise((r) => setTimeout(r, 10)) + completionOrder.push('slow') + return 'slow_result' + }, + ], + 'first', + ) + + // Both complete + expect(results).toEqual(['fast_result', 'slow_result']) + expect(completionOrder).toContain('fast') + expect(completionOrder).toContain('slow') + }) +})