From bdabd075502b1c40f5fc5fa79f66404a37587aeb Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Tue, 6 Jan 2026 13:52:51 +0100 Subject: [PATCH] add StepMeta type structure for skippable deps tracking --- .../skipped_deps_excluded_from_input.test.sql | 47 +- .../runtime/condition-options.test.ts | 60 +-- .../runtime/when-failed-options.test.ts | 72 +-- .../types/condition-pattern.test-d.ts | 60 +-- pkgs/dsl/__tests__/types/map-method.test-d.ts | 38 +- .../types/map-return-type-inference.test-d.ts | 22 +- .../__tests__/types/skippable-deps.test-d.ts | 440 ++++++++++++++++ pkgs/dsl/src/compile-flow.ts | 12 +- pkgs/dsl/src/dsl.ts | 489 ++++++++++++++---- 9 files changed, 999 insertions(+), 241 deletions(-) create mode 100644 pkgs/dsl/__tests__/types/skippable-deps.test-d.ts diff --git a/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql b/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql index 2547c6712..8d3380d4f 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql @@ -76,7 +76,7 @@ select pgflow.complete_task( '{"data": "from_b"}'::jsonb ); --- Test 3: step_c remaining_deps should be 0 (both deps resolved - a skipped, b completed) +-- Test 3: Verify step_c remaining_deps is 0 (ready to start) select is( (select remaining_deps from pgflow.step_states where run_id = (select run_id from run_ids) and step_slug = 'step_c'), @@ -84,24 +84,55 @@ select is( 'step_c remaining_deps should be 0 (a skipped + b completed)' ); --- Test 4: step_c should now be started +-- Now read and start step_c - this replicates what read_and_start does +-- and allows us to inspect the returned input value +-- +-- We need to do this in steps: +-- 1. Read the message from the queue +-- 2. Start the task with start_tasks +-- 3. Inspect the input returned by start_tasks + +-- Read the message and store msg_id +with read_msg as ( + select * from pgmq.read_with_poll('skip_diamond', 1, 1, 1, 50) + limit 1 +), +msg_ids as ( + select array_agg(msg_id) as ids from read_msg +), +-- Start the task and get the input +start_result as ( + select st.input, st.step_slug, st.run_id + from pgflow.start_tasks( + 'skip_diamond', + (select ids from msg_ids), + pgflow_tests.ensure_worker('skip_diamond') + ) st +) +-- Store the input for later testing +select input, step_slug, run_id into temporary step_c_inputs +from start_result +where step_slug = 'step_c'; + +-- Test 4: Verify step_c was started select is( (select status from pgflow.step_states where run_id = (select run_id from run_ids) and step_slug = 'step_c'), 'started', - 'step_c should be started after step_b completes' + 'step_c should be started after read_and_start' ); --- Test 5: step_b output should be in step_states +-- Test 5: Verify the input does NOT contain step_a key +-- The handler input should only have step_b, NOT step_a select is( - (select output from pgflow.step_states - where run_id = (select run_id from run_ids) and step_slug = 'step_b'), - '{"data": "from_b"}'::jsonb, - 'step_b output should be stored' + (select input from step_c_inputs), + '{"step_b": {"data": "from_b"}}'::jsonb, + 'step_c input should only contain step_b, not step_a (skipped deps are excluded)' ); -- Clean up drop table if exists run_ids; +drop table if exists step_c_inputs; select finish(); rollback; diff --git a/pkgs/dsl/__tests__/runtime/condition-options.test.ts b/pkgs/dsl/__tests__/runtime/condition-options.test.ts index e630444e8..4c52dfda7 100644 --- a/pkgs/dsl/__tests__/runtime/condition-options.test.ts +++ b/pkgs/dsl/__tests__/runtime/condition-options.test.ts @@ -3,61 +3,61 @@ import { Flow } from '../../src/dsl.js'; import { compileFlow } from '../../src/compile-flow.js'; describe('Condition Options', () => { - describe('DSL accepts condition and whenUnmet', () => { - it('should accept condition option on a step', () => { + describe('DSL accepts if and else', () => { + it('should accept if option on a step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'conditional_step', condition: { enabled: true } }, + { slug: 'conditional_step', if: { enabled: true } }, () => 'result' ); const step = flow.getStepDefinition('conditional_step'); - expect(step.options.condition).toEqual({ enabled: true }); + expect(step.options.if).toEqual({ enabled: true }); }); - it('should accept whenUnmet option on a step', () => { + it('should accept else option on a step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'conditional_step', whenUnmet: 'skip' }, + { slug: 'conditional_step', else: 'skip' }, () => 'result' ); const step = flow.getStepDefinition('conditional_step'); - expect(step.options.whenUnmet).toBe('skip'); + expect(step.options.else).toBe('skip'); }); - it('should accept both condition and whenUnmet together', () => { + it('should accept both if and else together', () => { const flow = new Flow({ slug: 'test_flow' }) .step( { slug: 'conditional_step', - condition: { status: 'active' }, - whenUnmet: 'skip-cascade', + if: { status: 'active' }, + else: 'skip-cascade', }, () => 'result' ); const step = flow.getStepDefinition('conditional_step'); - expect(step.options.condition).toEqual({ status: 'active' }); - expect(step.options.whenUnmet).toBe('skip-cascade'); + expect(step.options.if).toEqual({ status: 'active' }); + expect(step.options.else).toBe('skip-cascade'); }); - it('should accept condition on dependent steps', () => { + it('should accept if on dependent steps', () => { const flow = new Flow({ slug: 'test_flow' }) .step({ slug: 'first' }, () => ({ success: true })) .step( { slug: 'conditional_step', dependsOn: ['first'], - condition: { first: { success: true } }, - whenUnmet: 'skip', + if: { first: { success: true } }, + else: 'skip', }, () => 'result' ); const step = flow.getStepDefinition('conditional_step'); - expect(step.options.condition).toEqual({ first: { success: true } }); - expect(step.options.whenUnmet).toBe('skip'); + expect(step.options.if).toEqual({ first: { success: true } }); + expect(step.options.else).toBe('skip'); }); }); @@ -65,7 +65,7 @@ describe('Condition Options', () => { it('should compile condition_pattern for root step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', condition: { enabled: true } }, + { slug: 'step1', if: { enabled: true } }, () => 'result' ); @@ -78,7 +78,7 @@ describe('Condition Options', () => { it('should compile when_unmet for step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenUnmet: 'fail' }, + { slug: 'step1', else: 'fail' }, () => 'result' ); @@ -93,8 +93,8 @@ describe('Condition Options', () => { .step( { slug: 'step1', - condition: { active: true, type: 'premium' }, - whenUnmet: 'skip-cascade', + if: { active: true, type: 'premium' }, + else: 'skip-cascade', }, () => 'result' ); @@ -113,8 +113,8 @@ describe('Condition Options', () => { slug: 'step1', maxAttempts: 3, timeout: 60, - condition: { enabled: true }, - whenUnmet: 'skip', + if: { enabled: true }, + else: 'skip', }, () => 'result' ); @@ -135,8 +135,8 @@ describe('Condition Options', () => { { slug: 'second', dependsOn: ['first'], - condition: { first: { success: true } }, - whenUnmet: 'skip', + if: { first: { success: true } }, + else: 'skip', }, () => 'result' ); @@ -150,26 +150,26 @@ describe('Condition Options', () => { }); }); - describe('whenUnmet validation', () => { - it('should only accept valid whenUnmet values', () => { + describe('else validation', () => { + it('should only accept valid else values', () => { // Valid values should work expect(() => new Flow({ slug: 'test' }).step( - { slug: 's1', whenUnmet: 'fail' }, + { slug: 's1', else: 'fail' }, () => 1 ) ).not.toThrow(); expect(() => new Flow({ slug: 'test' }).step( - { slug: 's1', whenUnmet: 'skip' }, + { slug: 's1', else: 'skip' }, () => 1 ) ).not.toThrow(); expect(() => new Flow({ slug: 'test' }).step( - { slug: 's1', whenUnmet: 'skip-cascade' }, + { slug: 's1', else: 'skip-cascade' }, () => 1 ) ).not.toThrow(); diff --git a/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts b/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts index b1b77044f..1691f7e93 100644 --- a/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts +++ b/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts @@ -2,65 +2,65 @@ import { describe, it, expect } from 'vitest'; import { Flow } from '../../src/dsl.js'; import { compileFlow } from '../../src/compile-flow.js'; -describe('whenFailed Options', () => { - describe('DSL accepts whenFailed option', () => { - it('should accept whenFailed option on a step', () => { +describe('retriesExhausted Options', () => { + describe('DSL accepts retriesExhausted option', () => { + it('should accept retriesExhausted option on a step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'skip' }, + { slug: 'step1', retriesExhausted: 'skip' }, () => 'result' ); const step = flow.getStepDefinition('step1'); - expect(step.options.whenFailed).toBe('skip'); + expect(step.options.retriesExhausted).toBe('skip'); }); - it('should accept whenFailed: fail (default behavior)', () => { + it('should accept retriesExhausted: fail (default behavior)', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'fail' }, + { slug: 'step1', retriesExhausted: 'fail' }, () => 'result' ); const step = flow.getStepDefinition('step1'); - expect(step.options.whenFailed).toBe('fail'); + expect(step.options.retriesExhausted).toBe('fail'); }); - it('should accept whenFailed: skip-cascade', () => { + it('should accept retriesExhausted: skip-cascade', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'skip-cascade' }, + { slug: 'step1', retriesExhausted: 'skip-cascade' }, () => 'result' ); const step = flow.getStepDefinition('step1'); - expect(step.options.whenFailed).toBe('skip-cascade'); + expect(step.options.retriesExhausted).toBe('skip-cascade'); }); - it('should accept whenFailed on dependent steps', () => { + it('should accept retriesExhausted on dependent steps', () => { const flow = new Flow({ slug: 'test_flow' }) .step({ slug: 'first' }, () => ({ data: 'test' })) .step( { slug: 'second', dependsOn: ['first'], - whenFailed: 'skip', + retriesExhausted: 'skip', }, () => 'result' ); const step = flow.getStepDefinition('second'); - expect(step.options.whenFailed).toBe('skip'); + expect(step.options.retriesExhausted).toBe('skip'); }); - it('should accept whenFailed together with other options', () => { + it('should accept retriesExhausted together with other options', () => { const flow = new Flow({ slug: 'test_flow' }) .step( { slug: 'step1', maxAttempts: 3, timeout: 60, - whenFailed: 'skip-cascade', + retriesExhausted: 'skip-cascade', }, () => 'result' ); @@ -68,25 +68,25 @@ describe('whenFailed Options', () => { const step = flow.getStepDefinition('step1'); expect(step.options.maxAttempts).toBe(3); expect(step.options.timeout).toBe(60); - expect(step.options.whenFailed).toBe('skip-cascade'); + expect(step.options.retriesExhausted).toBe('skip-cascade'); }); - it('should accept both whenUnmet and whenFailed together', () => { + it('should accept both else and retriesExhausted together', () => { const flow = new Flow({ slug: 'test_flow' }) .step( { slug: 'step1', - condition: { enabled: true }, - whenUnmet: 'skip', - whenFailed: 'skip-cascade', + if: { enabled: true }, + else: 'skip', + retriesExhausted: 'skip-cascade', }, () => 'result' ); const step = flow.getStepDefinition('step1'); - expect(step.options.condition).toEqual({ enabled: true }); - expect(step.options.whenUnmet).toBe('skip'); - expect(step.options.whenFailed).toBe('skip-cascade'); + expect(step.options.if).toEqual({ enabled: true }); + expect(step.options.else).toBe('skip'); + expect(step.options.retriesExhausted).toBe('skip-cascade'); }); }); @@ -94,7 +94,7 @@ describe('whenFailed Options', () => { it('should compile when_failed for step', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'skip' }, + { slug: 'step1', retriesExhausted: 'skip' }, () => 'result' ); @@ -107,7 +107,7 @@ describe('whenFailed Options', () => { it('should compile when_failed: fail', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'fail' }, + { slug: 'step1', retriesExhausted: 'fail' }, () => 'result' ); @@ -120,7 +120,7 @@ describe('whenFailed Options', () => { it('should compile when_failed: skip-cascade', () => { const flow = new Flow({ slug: 'test_flow' }) .step( - { slug: 'step1', whenFailed: 'skip-cascade' }, + { slug: 'step1', retriesExhausted: 'skip-cascade' }, () => 'result' ); @@ -130,16 +130,16 @@ describe('whenFailed Options', () => { expect(statements[1]).toContain("when_failed => 'skip-cascade'"); }); - it('should compile step with all options including whenFailed', () => { + it('should compile step with all options including retriesExhausted', () => { const flow = new Flow({ slug: 'test_flow' }) .step( { slug: 'step1', maxAttempts: 3, timeout: 60, - condition: { enabled: true }, - whenUnmet: 'skip', - whenFailed: 'skip-cascade', + if: { enabled: true }, + else: 'skip', + retriesExhausted: 'skip-cascade', }, () => 'result' ); @@ -168,22 +168,22 @@ describe('whenFailed Options', () => { }); }); - describe('whenFailed on map steps', () => { - it('should accept whenFailed on map step', () => { + describe('retriesExhausted on map steps', () => { + it('should accept retriesExhausted on map step', () => { const flow = new Flow({ slug: 'test_flow' }) .map( - { slug: 'map_step', whenFailed: 'skip' }, + { slug: 'map_step', retriesExhausted: 'skip' }, (item) => item.toUpperCase() ); const step = flow.getStepDefinition('map_step'); - expect(step.options.whenFailed).toBe('skip'); + expect(step.options.retriesExhausted).toBe('skip'); }); it('should compile when_failed for map step', () => { const flow = new Flow({ slug: 'test_flow' }) .map( - { slug: 'map_step', whenFailed: 'skip-cascade' }, + { slug: 'map_step', retriesExhausted: 'skip-cascade' }, (item) => item.toUpperCase() ); diff --git a/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts b/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts index e61f7a130..684286dba 100644 --- a/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts +++ b/pkgs/dsl/__tests__/types/condition-pattern.test-d.ts @@ -133,48 +133,48 @@ describe('ContainmentPattern utility type', () => { }); }); -describe('condition option typing in step methods', () => { - describe('root step condition', () => { - it('should type condition as ContainmentPattern', () => { +describe('if option typing in step methods', () => { + describe('root step if', () => { + it('should type if as ContainmentPattern', () => { type FlowInput = { userId: string; role: string }; // This should compile - valid partial pattern const flow = new Flow({ slug: 'test_flow' }).step( - { slug: 'check', condition: { role: 'admin' } }, + { slug: 'check', if: { role: 'admin' } }, (input) => input.userId ); expectTypeOf(flow).toBeObject(); }); - it('should reject invalid keys in condition', () => { + it('should reject invalid keys in if', () => { type FlowInput = { userId: string; role: string }; // @ts-expect-error - 'invalidKey' does not exist on FlowInput new Flow({ slug: 'test_flow' }).step( - { slug: 'check', condition: { invalidKey: 'value' } }, + { slug: 'check', if: { invalidKey: 'value' } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId ); }); - it('should reject wrong value types in condition', () => { + it('should reject wrong value types in if', () => { type FlowInput = { userId: string; role: string }; // @ts-expect-error - role should be string, not number new Flow({ slug: 'test_flow' }).step( - { slug: 'check', condition: { role: 123 } }, + { slug: 'check', if: { role: 123 } }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (input: any) => input.userId ); }); - it('should allow empty object condition (always matches)', () => { + it('should allow empty object if (always matches)', () => { type FlowInput = { userId: string; role: string }; // Empty object should be valid const flow = new Flow({ slug: 'test_flow' }).step( - { slug: 'check', condition: {} }, + { slug: 'check', if: {} }, (input) => input.userId ); @@ -185,7 +185,7 @@ describe('condition option typing in step methods', () => { type FlowInput = { user: { name: string; role: string } }; const flow = new Flow({ slug: 'test_flow' }).step( - { slug: 'check', condition: { user: { role: 'admin' } } }, + { slug: 'check', if: { user: { role: 'admin' } } }, (input) => input.user.name ); @@ -193,15 +193,15 @@ describe('condition option typing in step methods', () => { }); }); - describe('dependent step condition', () => { - it('should type condition as ContainmentPattern', () => { + describe('dependent step if', () => { + it('should type if as ContainmentPattern', () => { const flow = new Flow<{ initial: string }>({ slug: 'test_flow' }) .step({ slug: 'fetch' }, () => ({ status: 'ok', data: 'result' })) .step( { slug: 'process', dependsOn: ['fetch'], - condition: { fetch: { status: 'ok' } }, + if: { fetch: { status: 'ok' } }, }, (deps) => deps.fetch.data ); @@ -209,7 +209,7 @@ describe('condition option typing in step methods', () => { expectTypeOf(flow).toBeObject(); }); - it('should reject invalid dep slug in condition', () => { + it('should reject invalid dep slug in if', () => { new Flow<{ initial: string }>({ slug: 'test_flow' }) .step({ slug: 'fetch' }, () => ({ status: 'ok' })) .step( @@ -217,7 +217,7 @@ describe('condition option typing in step methods', () => { slug: 'process', dependsOn: ['fetch'], // @ts-expect-error - 'nonexistent' is not a dependency - condition: { nonexistent: { status: 'ok' } }, + if: { nonexistent: { status: 'ok' } }, }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (deps: any) => deps.fetch.status @@ -232,14 +232,14 @@ describe('condition option typing in step methods', () => { slug: 'process', dependsOn: ['fetch'], // @ts-expect-error - 'invalidField' does not exist on fetch output - condition: { fetch: { invalidField: 'value' } }, + if: { fetch: { invalidField: 'value' } }, }, // eslint-disable-next-line @typescript-eslint/no-explicit-any (deps: any) => deps.fetch.status ); }); - it('should handle multiple dependencies in condition', () => { + it('should handle multiple dependencies in if', () => { const flow = new Flow<{ initial: string }>({ slug: 'test_flow' }) .step({ slug: 'step1' }, () => ({ ready: true })) .step({ slug: 'step2' }, () => ({ valid: true })) @@ -247,7 +247,7 @@ describe('condition option typing in step methods', () => { { slug: 'final', dependsOn: ['step1', 'step2'], - condition: { step1: { ready: true }, step2: { valid: true } }, + if: { step1: { ready: true }, step2: { valid: true } }, }, (deps) => deps.step1.ready && deps.step2.valid ); @@ -256,26 +256,26 @@ describe('condition option typing in step methods', () => { }); }); - describe('array step condition', () => { - it('should type condition for root array step', () => { + describe('array step if', () => { + it('should type if for root array step', () => { type FlowInput = { items: string[]; enabled: boolean }; const flow = new Flow({ slug: 'test_flow' }).array( - { slug: 'getItems', condition: { enabled: true } }, + { slug: 'getItems', if: { enabled: true } }, (input) => input.items ); expectTypeOf(flow).toBeObject(); }); - it('should type condition for dependent array step', () => { + it('should type if for dependent array step', () => { const flow = new Flow<{ initial: string }>({ slug: 'test_flow' }) .step({ slug: 'fetch' }, () => ({ ready: true, items: ['a', 'b'] })) .array( { slug: 'process', dependsOn: ['fetch'], - condition: { fetch: { ready: true } }, + if: { fetch: { ready: true } }, }, (deps) => deps.fetch.items ); @@ -284,20 +284,20 @@ describe('condition option typing in step methods', () => { }); }); - describe('map step condition', () => { - it('should type condition for root map step', () => { + describe('map step if', () => { + it('should type if for root map step', () => { type FlowInput = { type: string; value: number }[]; const flow = new Flow({ slug: 'test_flow' }).map( - // Root map condition checks the array itself - { slug: 'process', condition: [{ type: 'active' }] }, + // Root map if checks the array itself + { slug: 'process', if: [{ type: 'active' }] }, (item) => item.value * 2 ); expectTypeOf(flow).toBeObject(); }); - it('should type condition for dependent map step', () => { + it('should type if for dependent map step', () => { const flow = new Flow<{ initial: string }>({ slug: 'test_flow' }) .step({ slug: 'fetch' }, () => [ { id: 1, active: true }, @@ -308,7 +308,7 @@ describe('condition option typing in step methods', () => { slug: 'process', array: 'fetch', // Condition checks the array dep - condition: { fetch: [{ active: true }] }, + if: { fetch: [{ active: true }] }, }, (item) => item.id ); diff --git a/pkgs/dsl/__tests__/types/map-method.test-d.ts b/pkgs/dsl/__tests__/types/map-method.test-d.ts index cb52f264a..960ef03ef 100644 --- a/pkgs/dsl/__tests__/types/map-method.test-d.ts +++ b/pkgs/dsl/__tests__/types/map-method.test-d.ts @@ -1,4 +1,4 @@ -import { Flow, type Json, type StepInput, type ExtractFlowContext } from '../../src/index.js'; +import { Flow, type Json, type StepInput, type ExtractFlowContext, type ExtractFlowSteps } from '../../src/index.js'; import { describe, it, expectTypeOf } from 'vitest'; describe('.map() method type constraints', () => { @@ -11,9 +11,7 @@ describe('.map() method type constraints', () => { }); // The map step should return an array of the handler return type - type ProcessOutput = typeof flow extends Flow - ? Steps['process'] - : never; + type ProcessOutput = ExtractFlowSteps['process']; expectTypeOf().toEqualTypeOf<{ processed: string }[]>(); }); @@ -34,9 +32,7 @@ describe('.map() method type constraints', () => { return item.length; }); - type FlattenOutput = typeof flow extends Flow - ? Steps['flatten'] - : never; + type FlattenOutput = ExtractFlowSteps['flatten']; expectTypeOf().toEqualTypeOf(); }); @@ -47,9 +43,7 @@ describe('.map() method type constraints', () => { return String(item); }); - type StringifyOutput = typeof flow extends Flow - ? Steps['stringify'] - : never; + type StringifyOutput = ExtractFlowSteps['stringify']; expectTypeOf().toEqualTypeOf(); }); }); @@ -65,9 +59,7 @@ describe('.map() method type constraints', () => { return item * 2; }); - type DoubleOutput = typeof flow extends Flow - ? Steps['double'] - : never; + type DoubleOutput = ExtractFlowSteps['double']; expectTypeOf().toEqualTypeOf(); }); @@ -105,9 +97,7 @@ describe('.map() method type constraints', () => { return user.name; }); - type NamesOutput = typeof flow extends Flow - ? Steps['extractNames'] - : never; + type NamesOutput = ExtractFlowSteps['extractNames']; expectTypeOf().toEqualTypeOf(); }); }); @@ -149,9 +139,7 @@ describe('.map() method type constraints', () => { return item.length; }); - type LengthsOutput = typeof flow extends Flow - ? Steps['lengths'] - : never; + type LengthsOutput = ExtractFlowSteps['lengths']; expectTypeOf().toEqualTypeOf(); }); @@ -163,9 +151,7 @@ describe('.map() method type constraints', () => { return deps.double.reduce((a, b) => a + b, 0); }); - type SumOutput = typeof flow extends Flow - ? Steps['sum'] - : never; + type SumOutput = ExtractFlowSteps['sum']; expectTypeOf().toEqualTypeOf(); }); }); @@ -239,9 +225,7 @@ describe('.map() method type constraints', () => { return String(item); }); - type StringifyOutput = typeof flow extends Flow - ? Steps['stringify'] - : never; + type StringifyOutput = ExtractFlowSteps['stringify']; expectTypeOf().toEqualTypeOf(); }); @@ -252,9 +236,7 @@ describe('.map() method type constraints', () => { return item !== null; }); - type FilterOutput = typeof flow extends Flow - ? Steps['filter'] - : never; + type FilterOutput = ExtractFlowSteps['filter']; expectTypeOf().toEqualTypeOf(); }); }); diff --git a/pkgs/dsl/__tests__/types/map-return-type-inference.test-d.ts b/pkgs/dsl/__tests__/types/map-return-type-inference.test-d.ts index 907be82c5..a266fe01c 100644 --- a/pkgs/dsl/__tests__/types/map-return-type-inference.test-d.ts +++ b/pkgs/dsl/__tests__/types/map-return-type-inference.test-d.ts @@ -1,4 +1,4 @@ -import { Flow } from '../../src/index.js'; +import { Flow, type ExtractFlowSteps } from '../../src/index.js'; import { describe, it, expectTypeOf } from 'vitest'; describe('map step return type inference bug', () => { @@ -38,9 +38,7 @@ describe('map step return type inference bug', () => { ); // Verify the map step output type is not any[] - type ProcessChunksOutput = typeof flow extends Flow - ? Steps['processChunks'] - : never; + type ProcessChunksOutput = ExtractFlowSteps['processChunks']; expectTypeOf().not.toEqualTypeOf(); }); @@ -73,9 +71,7 @@ describe('map step return type inference bug', () => { return { ok: true }; }); - type TransformOutput = typeof flow extends Flow - ? Steps['transform'] - : never; + type TransformOutput = ExtractFlowSteps['transform']; expectTypeOf().toEqualTypeOf(); expectTypeOf().not.toEqualTypeOf(); @@ -100,9 +96,7 @@ describe('map step return type inference bug', () => { return { done: true }; }); - type ProcessOutput = typeof flow extends Flow - ? Steps['process'] - : never; + type ProcessOutput = ExtractFlowSteps['process']; expectTypeOf().not.toEqualTypeOf(); }); @@ -127,9 +121,7 @@ describe('map step return type inference bug', () => { return { ok: true }; }); - type TransformOutput = typeof flow extends Flow - ? Steps['transform'] - : never; + type TransformOutput = ExtractFlowSteps['transform']; expectTypeOf().toEqualTypeOf<{ value: string; length: number }[]>(); expectTypeOf().not.toEqualTypeOf(); @@ -155,9 +147,7 @@ describe('map step return type inference bug', () => { return { count: deps.uppercase.length }; }); - type UppercaseOutput = typeof flow extends Flow - ? Steps['uppercase'] - : never; + type UppercaseOutput = ExtractFlowSteps['uppercase']; expectTypeOf().toEqualTypeOf<{ original: string; transformed: string }[]>(); expectTypeOf().not.toEqualTypeOf(); diff --git a/pkgs/dsl/__tests__/types/skippable-deps.test-d.ts b/pkgs/dsl/__tests__/types/skippable-deps.test-d.ts new file mode 100644 index 000000000..fc7d66330 --- /dev/null +++ b/pkgs/dsl/__tests__/types/skippable-deps.test-d.ts @@ -0,0 +1,440 @@ +import { Flow, type StepInput, type StepOutput } from '../../src/index.js'; +import { describe, it, expectTypeOf } from 'vitest'; + +/** + * Type tests for skippable step dependencies + * + * When a step has `else: 'skip' | 'skip-cascade'` or `retriesExhausted: 'skip' | 'skip-cascade'`, + * it may not execute. Dependent steps should receive that step's output as an optional key. + */ + +describe('skippable deps type safety', () => { + describe('core skippability - else', () => { + it('step with else: skip makes output optional for dependents', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step( + { slug: 'conditional', if: { value: 42 }, else: 'skip' }, + (input) => ({ result: input.value * 2 }) + ) + .step({ slug: 'dependent', dependsOn: ['conditional'] }, (deps) => { + // conditional should be optional - can't access without null check + expectTypeOf(deps.conditional).toEqualTypeOf< + { result: number } | undefined + >(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + conditional?: { result: number }; + }>(); + }); + + it('step with else: skip-cascade makes output optional for dependents', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step( + { slug: 'conditional', if: { value: 42 }, else: 'skip-cascade' }, + (input) => ({ result: input.value * 2 }) + ) + .step({ slug: 'dependent', dependsOn: ['conditional'] }, (deps) => { + expectTypeOf(deps.conditional).toEqualTypeOf< + { result: number } | undefined + >(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + conditional?: { result: number }; + }>(); + }); + + it('step with else: fail keeps output required (default behavior)', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step( + { slug: 'conditional', if: { value: 42 }, else: 'fail' }, + (input) => ({ result: input.value * 2 }) + ) + .step({ slug: 'dependent', dependsOn: ['conditional'] }, (deps) => { + // else: 'fail' means step either runs or flow fails - output is guaranteed + expectTypeOf(deps.conditional).toEqualTypeOf<{ result: number }>(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + conditional: { result: number }; + }>(); + }); + + it('step without else keeps output required', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'normal' }, (input) => ({ result: input.value * 2 })) + .step({ slug: 'dependent', dependsOn: ['normal'] }, (deps) => { + expectTypeOf(deps.normal).toEqualTypeOf<{ result: number }>(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + normal: { result: number }; + }>(); + }); + }); + + describe('core skippability - retriesExhausted', () => { + it('step with retriesExhausted: skip makes output optional for dependents', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'risky', retriesExhausted: 'skip' }, (input) => ({ + result: input.value * 2, + })) + .step({ slug: 'dependent', dependsOn: ['risky'] }, (deps) => { + expectTypeOf(deps.risky).toEqualTypeOf< + { result: number } | undefined + >(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + risky?: { result: number }; + }>(); + }); + + it('step with retriesExhausted: skip-cascade makes output optional for dependents', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'risky', retriesExhausted: 'skip-cascade' }, (input) => ({ + result: input.value * 2, + })) + .step({ slug: 'dependent', dependsOn: ['risky'] }, (deps) => { + expectTypeOf(deps.risky).toEqualTypeOf< + { result: number } | undefined + >(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + risky?: { result: number }; + }>(); + }); + + it('step with retriesExhausted: fail keeps output required', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'risky', retriesExhausted: 'fail' }, (input) => ({ + result: input.value * 2, + })) + .step({ slug: 'dependent', dependsOn: ['risky'] }, (deps) => { + expectTypeOf(deps.risky).toEqualTypeOf<{ result: number }>(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + risky: { result: number }; + }>(); + }); + }); + + describe('multiple dependencies - mixed skippability', () => { + it('mixed deps: some optional, some required', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'skippable', if: { value: 42 }, else: 'skip' }, () => ({ + a: 1, + })) + .step({ slug: 'required' }, () => ({ b: 2 })) + .step( + { slug: 'dependent', dependsOn: ['skippable', 'required'] }, + (deps) => { + expectTypeOf(deps.skippable).toEqualTypeOf< + { a: number } | undefined + >(); + expectTypeOf(deps.required).toEqualTypeOf<{ b: number }>(); + return { done: true }; + } + ); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + skippable?: { a: number }; + required: { b: number }; + }>(); + }); + + it('all deps skippable: all optional', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'skip1', retriesExhausted: 'skip' }, () => ({ a: 1 })) + .step({ slug: 'skip2', retriesExhausted: 'skip' }, () => ({ b: 2 })) + .step({ slug: 'dependent', dependsOn: ['skip1', 'skip2'] }, (deps) => { + expectTypeOf(deps.skip1).toEqualTypeOf<{ a: number } | undefined>(); + expectTypeOf(deps.skip2).toEqualTypeOf<{ b: number } | undefined>(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + skip1?: { a: number }; + skip2?: { b: number }; + }>(); + }); + + it('all deps required: none optional', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'req1' }, () => ({ a: 1 })) + .step({ slug: 'req2' }, () => ({ b: 2 })) + .step({ slug: 'dependent', dependsOn: ['req1', 'req2'] }, (deps) => { + expectTypeOf(deps.req1).toEqualTypeOf<{ a: number }>(); + expectTypeOf(deps.req2).toEqualTypeOf<{ b: number }>(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + req1: { a: number }; + req2: { b: number }; + }>(); + }); + }); + + describe('chains and graphs', () => { + it('chain A(skip) -> B -> C: A optional in B, B required in C', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'a', retriesExhausted: 'skip' }, () => ({ aVal: 1 })) + .step({ slug: 'b', dependsOn: ['a'] }, (deps) => { + expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number } | undefined>(); + return { bVal: 2 }; + }) + .step({ slug: 'c', dependsOn: ['b'] }, (deps) => { + // B is not skippable, so B's output is required + expectTypeOf(deps.b).toEqualTypeOf<{ bVal: number }>(); + return { cVal: 3 }; + }); + + type BInput = StepInput; + expectTypeOf().toEqualTypeOf<{ a?: { aVal: number } }>(); + + type CInput = StepInput; + expectTypeOf().toEqualTypeOf<{ b: { bVal: number } }>(); + }); + + it('diamond: A(skip) -> B, A -> C, B+C -> D: A optional in B and C', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'a', retriesExhausted: 'skip' }, () => ({ aVal: 1 })) + .step({ slug: 'b', dependsOn: ['a'] }, (deps) => { + expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number } | undefined>(); + return { bVal: 2 }; + }) + .step({ slug: 'c', dependsOn: ['a'] }, (deps) => { + expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number } | undefined>(); + return { cVal: 3 }; + }) + .step({ slug: 'd', dependsOn: ['b', 'c'] }, (deps) => { + // B and C are not skippable themselves + expectTypeOf(deps.b).toEqualTypeOf<{ bVal: number }>(); + expectTypeOf(deps.c).toEqualTypeOf<{ cVal: number }>(); + return { dVal: 4 }; + }); + + type BInput = StepInput; + expectTypeOf().toEqualTypeOf<{ a?: { aVal: number } }>(); + + type CInput = StepInput; + expectTypeOf().toEqualTypeOf<{ a?: { aVal: number } }>(); + + type DInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + b: { bVal: number }; + c: { cVal: number }; + }>(); + }); + + it('cascade does NOT propagate: A(skip-cascade) -> B: B output NOT automatically optional', () => { + // skip-cascade means A and its dependents get skipped at RUNTIME + // but B itself is not marked as skippable in its definition + // so if B does run, its output is required for its own dependents + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'a', retriesExhausted: 'skip-cascade' }, () => ({ + aVal: 1, + })) + .step({ slug: 'b', dependsOn: ['a'] }, (deps) => { + expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number } | undefined>(); + return { bVal: 2 }; + }) + .step({ slug: 'c', dependsOn: ['b'] }, (deps) => { + // B is not skippable in its own definition, so its output is required + expectTypeOf(deps.b).toEqualTypeOf<{ bVal: number }>(); + return { cVal: 3 }; + }); + + type CInput = StepInput; + expectTypeOf().toEqualTypeOf<{ b: { bVal: number } }>(); + }); + }); + + describe('edge cases', () => { + it('root step with skip: valid config, no dependents affected (no deps)', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }).step( + { slug: 'root', retriesExhausted: 'skip' }, + (input) => ({ result: input.value }) + ); + + // Root step has no deps, so StepInput is the flow input + type RootInput = StepInput; + expectTypeOf().toEqualTypeOf<{ value: number }>(); + }); + + it('map step with skip: entire output array is optional type', () => { + const flow = new Flow({ slug: 'test' }) + .map({ slug: 'process', retriesExhausted: 'skip' }, (item) => + item.toUpperCase() + ) + .step({ slug: 'aggregate', dependsOn: ['process'] }, (deps) => { + expectTypeOf(deps.process).toEqualTypeOf(); + return { done: true }; + }); + + type AggInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + process?: string[]; + }>(); + }); + + it('array step with skip: entire output array is optional type', () => { + const flow = new Flow<{ count: number }>({ slug: 'test' }) + .array({ slug: 'generate', retriesExhausted: 'skip' }, (input) => + Array(input.count) + .fill(0) + .map((_, i) => i) + ) + .step({ slug: 'sum', dependsOn: ['generate'] }, (deps) => { + expectTypeOf(deps.generate).toEqualTypeOf(); + return { done: true }; + }); + + type SumInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + generate?: number[]; + }>(); + }); + + it('both else and retriesExhausted set: still skippable', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step( + { + slug: 'both', + if: { value: 42 }, + else: 'skip', + retriesExhausted: 'skip', + }, + () => ({ result: 1 }) + ) + .step({ slug: 'dependent', dependsOn: ['both'] }, (deps) => { + expectTypeOf(deps.both).toEqualTypeOf< + { result: number } | undefined + >(); + return { done: true }; + }); + + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ + both?: { result: number }; + }>(); + }); + }); + + describe('type inference and narrowing', () => { + it('cannot access property on optional dep without null check', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'skippable', retriesExhausted: 'skip' }, () => ({ + foo: 'bar', + })) + .step({ slug: 'dependent', dependsOn: ['skippable'] }, (deps) => { + // Direct property access should be a compile error - we test via runtime pattern + // The type system should make deps.skippable potentially undefined + expectTypeOf(deps.skippable).toEqualTypeOf< + { foo: string } | undefined + >(); + return { done: true }; + }); + + // Type verification + type DepInput = StepInput; + expectTypeOf().toEqualTypeOf<{ skippable?: { foo: string } }>(); + }); + + it('type narrowing works after existence check', () => { + new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'skippable', retriesExhausted: 'skip' }, () => ({ + foo: 'bar', + })) + .step({ slug: 'dependent', dependsOn: ['skippable'] }, (deps) => { + if (deps.skippable) { + // After narrowing, foo is accessible + expectTypeOf(deps.skippable.foo).toEqualTypeOf(); + } + return { done: true }; + }); + }); + + it('handler receives correctly typed deps object', () => { + new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'skip1', retriesExhausted: 'skip' }, () => ({ a: 1 })) + .step({ slug: 'req1' }, () => ({ b: 'str' })) + .step({ slug: 'dependent', dependsOn: ['skip1', 'req1'] }, (deps) => { + // Handler parameter should have correct mixed optionality + expectTypeOf(deps).toEqualTypeOf<{ + skip1?: { a: number }; + req1: { b: string }; + }>(); + return { done: true }; + }); + }); + }); + + describe('utility types', () => { + it('StepOutput returns output type (not metadata)', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'normal' }, () => ({ result: 42 })) + .step({ slug: 'skippable', retriesExhausted: 'skip' }, () => ({ + other: 'str', + })); + + // StepOutput should return the actual output type, not the metadata structure + type NormalOutput = StepOutput; + expectTypeOf().toEqualTypeOf<{ result: number }>(); + + type SkippableOutput = StepOutput; + expectTypeOf().toEqualTypeOf<{ other: string }>(); + }); + + it('keyof ExtractFlowSteps still returns slug union', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .step({ slug: 'a' }, () => 1) + .step({ slug: 'b', retriesExhausted: 'skip' }, () => 2) + .step({ slug: 'c', dependsOn: ['a', 'b'] }, () => 3); + + type StepSlugs = keyof import('../../src/index.js').ExtractFlowSteps< + typeof flow + >; + expectTypeOf().toEqualTypeOf<'a' | 'b' | 'c'>(); + }); + }); + + describe('dependent map with skippable array source', () => { + it('dependent map on skippable array: deps should be optional', () => { + const flow = new Flow<{ value: number }>({ slug: 'test' }) + .array({ slug: 'items', retriesExhausted: 'skip' }, () => [1, 2, 3]) + .map({ slug: 'double', array: 'items' }, (item) => item * 2) + .step({ slug: 'sum', dependsOn: ['double'] }, (deps) => { + // The map step itself doesn't have skip, but its source does + // This is an interesting edge case - map depends on skippable array + // For now, map's own skippability determines its output optionality + expectTypeOf(deps.double).toEqualTypeOf(); + return { done: true }; + }); + + type SumInput = StepInput; + expectTypeOf().toEqualTypeOf<{ double: number[] }>(); + }); + }); +}); diff --git a/pkgs/dsl/src/compile-flow.ts b/pkgs/dsl/src/compile-flow.ts index f6ed91bc0..d321d6dbf 100644 --- a/pkgs/dsl/src/compile-flow.ts +++ b/pkgs/dsl/src/compile-flow.ts @@ -62,18 +62,18 @@ function formatRuntimeOptions(options: RuntimeOptions | StepRuntimeOptions): str parts.push(`start_delay => ${options.startDelay}`); } - if ('condition' in options && options.condition !== undefined) { + if ('if' in options && options.if !== undefined) { // Serialize JSON pattern and escape for SQL - const jsonStr = JSON.stringify(options.condition); + const jsonStr = JSON.stringify(options.if); parts.push(`condition_pattern => '${jsonStr}'`); } - if ('whenUnmet' in options && options.whenUnmet !== undefined) { - parts.push(`when_unmet => '${options.whenUnmet}'`); + if ('else' in options && options.else !== undefined) { + parts.push(`when_unmet => '${options.else}'`); } - if ('whenFailed' in options && options.whenFailed !== undefined) { - parts.push(`when_failed => '${options.whenFailed}'`); + if ('retriesExhausted' in options && options.retriesExhausted !== undefined) { + parts.push(`when_failed => '${options.retriesExhausted}'`); } return parts.length > 0 ? `, ${parts.join(', ')}` : ''; diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts index 555274d9c..8703705a2 100644 --- a/pkgs/dsl/src/dsl.ts +++ b/pkgs/dsl/src/dsl.ts @@ -25,12 +25,11 @@ export type Simplify = { [KeyType in keyof T]: T[KeyType] } & {}; * - Objects: all keys optional, recursively applied * - Arrays: elements expected to be present in target array */ -export type ContainmentPattern = - T extends readonly (infer U)[] - ? ContainmentPattern[] // Array: elements expected to be present - : T extends object - ? { [K in keyof T]?: ContainmentPattern } // Object: all keys optional - : T; // Primitive: exact value match +export type ContainmentPattern = T extends readonly (infer U)[] + ? ContainmentPattern[] // Array: elements expected to be present + : T extends object + ? { [K in keyof T]?: ContainmentPattern } // Object: all keys optional + : T; // Primitive: exact value match // Utility that unwraps Promise and keeps plain values unchanged // Note: `any[]` is required here for proper type inference in conditional types @@ -38,8 +37,8 @@ export type ContainmentPattern = type AwaitedReturn = T extends (...args: any[]) => Promise ? R : T extends (...args: any[]) => infer R - ? R - : never; + ? R + : never; // ======================== // ENVIRONMENT TYPE SYSTEM @@ -67,8 +66,18 @@ export type AnyInput = Json; export type AnyOutput = Json; // Step Types +// Step metadata structure - enriched type that tracks output and skippability +export interface StepMeta< + TOutput = AnyOutput, + TSkippable extends boolean = boolean +> { + output: TOutput; + skippable: TSkippable; +} + export type EmptySteps = Record; -export type AnySteps = Record; // Could use unknown if needed +// AnySteps now uses StepMeta structure for enriched step information +export type AnySteps = Record; // Dependency Types export type EmptyDeps = Record; @@ -115,7 +124,7 @@ export type ExtractFlowInput = TFlow extends Flow< * This creates a union of all step input types */ export type AllStepInputs = { - [K in keyof ExtractFlowSteps & string]: StepInput + [K in keyof ExtractFlowSteps & string]: StepInput; }[keyof ExtractFlowSteps & string]; /** @@ -137,7 +146,7 @@ export type ExtractFlowOutput = TFlow extends Flow< : never; /** - * Extracts the steps type from a Flow + * Extracts the steps type from a Flow (unwraps StepMeta to just output types) * @template TFlow - The Flow type to extract from */ export type ExtractFlowSteps = TFlow extends Flow< @@ -146,6 +155,20 @@ export type ExtractFlowSteps = TFlow extends Flow< infer TS, infer _TD, infer _TEnv +> + ? { [K in keyof TS]: TS[K]['output'] } + : never; + +/** + * Extracts the raw steps type from a Flow (includes StepMeta structure with skippable info) + * @template TFlow - The Flow type to extract from + */ +export type ExtractFlowStepsRaw = TFlow extends Flow< + infer _TI, + infer _TC, + infer TS, + infer _TD, + infer _TEnv > ? TS : never; @@ -220,10 +243,11 @@ export type CompatibleFlow< F extends AnyFlow, PlatformResources extends Record, UserResources extends Record = Record -> = - (FlowContext> & PlatformResources & UserResources) extends ExtractFlowContext - ? F - : never; +> = FlowContext> & + PlatformResources & + UserResources extends ExtractFlowContext + ? F + : never; /** * Extracts the dependencies type from a Flow @@ -238,6 +262,7 @@ type StepDepsOf< /** * Extracts only the leaf steps from a Flow (steps that are not dependencies of any other steps) + * Returns the output types, not the full StepMeta structure * @template TFlow - The Flow type to extract from */ export type ExtractFlowLeafSteps = { @@ -251,6 +276,7 @@ export type ExtractFlowLeafSteps = { // Utility type to extract the output type of a step handler from a Flow // Usage: // StepOutput +// Returns the output type (ExtractFlowSteps already unwraps StepMeta) export type StepOutput< TFlow extends AnyFlow, TStepSlug extends string @@ -258,23 +284,53 @@ export type StepOutput< ? ExtractFlowSteps[TStepSlug] : never; +/** + * Checks if a step is skippable (has else: 'skip' | 'skip-cascade' or retriesExhausted: 'skip' | 'skip-cascade') + * @template TFlow - The Flow type + * @template TStepSlug - The step slug to check + */ +export type IsStepSkippable< + TFlow extends AnyFlow, + TStepSlug extends string +> = TStepSlug extends keyof ExtractFlowStepsRaw + ? ExtractFlowStepsRaw[TStepSlug]['skippable'] + : false; + +// Helper types for StepInput with optional skippable deps +type RequiredDeps = { + [K in Extract< + keyof ExtractFlowSteps, + StepDepsOf + > as IsStepSkippable extends true + ? never + : K]: ExtractFlowSteps[K]; +}; + +type OptionalDeps = { + [K in Extract< + keyof ExtractFlowSteps, + StepDepsOf + > as IsStepSkippable extends true + ? K + : never]?: ExtractFlowSteps[K]; +}; + /** * Asymmetric step input type: * - Root steps (no dependencies): receive flow input directly * - Dependent steps: receive only their dependencies (flow input available via context) + * - Skippable deps (else/retriesExhausted: 'skip' | 'skip-cascade') are optional + * - Required deps are required * * This enables functional composition where subflows can receive typed inputs * without the 'run' wrapper that previously blocked type matching. */ -export type StepInput = - StepDepsOf extends never - ? ExtractFlowInput // Root step: flow input directly - : { - [K in Extract< - keyof ExtractFlowSteps, - StepDepsOf - >]: ExtractFlowSteps[K]; - }; // Dependent step: only deps +export type StepInput< + TFlow extends AnyFlow, + TStepSlug extends string +> = StepDepsOf extends never + ? ExtractFlowInput // Root step: flow input directly + : Simplify & OptionalDeps>; // Runtime options interface for flow-level options export interface RuntimeOptions { @@ -325,44 +381,164 @@ export interface BaseContext { * receive flow_input from SQL; other step types lazy-load it on demand. * Use `await ctx.flowInput` to access the original flow input. */ -export interface FlowContext extends BaseContext { +export interface FlowContext< + TEnv extends Env = Env, + TFlowInput extends AnyInput = AnyInput +> extends BaseContext { stepTask: StepTaskRecord; flowInput: Promise; } // Generic context type helper (uses FlowContext for flow handlers) -export type Context = Record, TEnv extends Env = Env> = FlowContext & T; +export type Context< + T extends Record = Record, + TEnv extends Env = Env +> = FlowContext & T; -// Valid values for whenUnmet option -export type WhenUnmetMode = 'fail' | 'skip' | 'skip-cascade'; +/** + * Options for handling unmet conditions (when 'if' pattern doesn't match input) + * + * @example + * // Fail the step (and run) when pattern doesn't match + * { if: { enabled: true }, else: 'fail' } + * + * @example + * // Skip this step only when pattern doesn't match + * { if: { enabled: true }, else: 'skip' } + * + * @example + * // Skip this step and all dependents when pattern doesn't match + * { if: { enabled: true }, else: 'skip-cascade' } + * + * @remarks + * - `'fail'`: When pattern doesn't match, step fails -> run fails (default) + * - `'skip'`: When pattern doesn't match, skip step and continue (step key omitted from dependent inputs) + * - `'skip-cascade'`: When pattern doesn't match, skip step + mark all dependents as skipped + */ +export type ElseMode = 'fail' | 'skip' | 'skip-cascade'; -// Valid values for whenFailed option (same values as whenUnmet) -export type WhenFailedMode = 'fail' | 'skip' | 'skip-cascade'; +/** + * Options for handling errors after all retries are exhausted + * + * @example + * // Fail the run after retries exhausted (default) + * { retriesExhausted: 'fail' } + * + * @example + * // Skip this step after retries exhausted, continue run + * { retriesExhausted: 'skip' } + * + * @example + * // Skip this step and all dependents after retries exhausted + * { retriesExhausted: 'skip-cascade' } + * + * @remarks + * - `'fail'`: Step fails -> run fails (default behavior) + * - `'skip'`: Mark step as skipped, continue run (step key omitted from dependent inputs) + * - `'skip-cascade'`: Skip step + mark all dependents as skipped too + * + * @note + * TYPE_VIOLATION errors (e.g., single step returns non-array for map dependent) + * are NOT subject to retriesExhausted - these always hard fail as they indicate + * programming errors, not runtime conditions. + */ +export type RetriesExhaustedMode = 'fail' | 'skip' | 'skip-cascade'; + +/** + * Helper type for dependent step handlers - creates deps object with correct optionality. + * Skippable deps (steps with else/retriesExhausted: 'skip' | 'skip-cascade') are optional. + * Required deps are required. + */ +type DepsWithOptionalSkippable< + TSteps extends AnySteps, + TDeps extends string +> = { + [K in TDeps as K extends keyof TSteps + ? TSteps[K]['skippable'] extends true + ? never + : K + : K]: K extends keyof TSteps ? TSteps[K]['output'] : never; +} & { + [K in TDeps as K extends keyof TSteps + ? TSteps[K]['skippable'] extends true + ? K + : never + : never]?: K extends keyof TSteps ? TSteps[K]['output'] : never; +}; // Step runtime options interface that extends flow options with step-specific options -// Note: condition is typed as Json here for internal storage; overloads provide type safety +// Note: 'if' is typed as Json here for internal storage; overloads provide type safety export interface StepRuntimeOptions extends RuntimeOptions { startDelay?: number; - condition?: Json; // JSON pattern for @> containment check - whenUnmet?: WhenUnmetMode; // What to do when condition not met - whenFailed?: WhenFailedMode; // What to do when handler fails after retries + + /** + * Pattern to match using PostgreSQL's @> (contains) operator + * + * @example + * // Root step: match against flow input + * { if: { role: 'admin', active: true } } + * + * @example + * // Dependent step: match against dependency outputs + * { if: { prevStep: { status: 'success' } } } + * + * @remarks + * - Primitives: exact value match + * - Objects: all keys optional, recursively applied + * - Arrays: elements expected to be present in target array + * + * @see ElseMode for controlling what happens when pattern doesn't match + */ + if?: Json; + + /** + * What to do when the 'if' pattern doesn't match the input + * + * @default 'fail' + * + * @example + * { else: 'fail' } // Pattern doesn't match -> step fails -> run fails + * { else: 'skip' } // Pattern doesn't match -> skip step, continue run + * { else: 'skip-cascade' } // Pattern doesn't match -> skip step + all dependents + * + * @see ElseMode for detailed documentation of each mode + */ + else?: ElseMode; + + /** + * What to do when handler throws an error after all retries are exhausted + * + * @default 'fail' + * + * @example + * { retriesExhausted: 'fail' } // Step fails -> run fails + * { retriesExhausted: 'skip' } // Skip step, continue run + * { retriesExhausted: 'skip-cascade' } // Skip step + all dependents + * + * @remarks + * Only applies after maxAttempts retries are exhausted. + * TYPE_VIOLATION errors always fail regardless of this setting. + * + * @see RetriesExhaustedMode for detailed documentation of each mode + */ + retriesExhausted?: RetriesExhaustedMode; } -// Base runtime options without condition (for typed overloads) +// Base runtime options without 'if' (for typed overloads) interface BaseStepRuntimeOptions extends RuntimeOptions { startDelay?: number; - whenUnmet?: WhenUnmetMode; - whenFailed?: WhenFailedMode; + else?: ElseMode; + retriesExhausted?: RetriesExhaustedMode; } -// Typed step options for root steps (condition matches FlowInput pattern) +// Typed step options for root steps (if matches FlowInput pattern) type RootStepOptions = BaseStepRuntimeOptions & { - condition?: ContainmentPattern; + if?: ContainmentPattern; }; -// Typed step options for dependent steps (condition matches deps object pattern) +// Typed step options for dependent steps (if matches deps object pattern) type DependentStepOptions = BaseStepRuntimeOptions & { - condition?: ContainmentPattern; + if?: ContainmentPattern; }; // Define the StepDefinition interface with integrated options @@ -426,6 +602,7 @@ export class Flow< * Returns the step definition with asymmetric input typing: * - Root steps (no dependencies): input is flowInput directly * - Dependent steps: input is deps object only (flowInput available via context) + * - Skippable deps are optional, required deps are required * * @throws Error if the step with the given slug doesn't exist */ @@ -434,12 +611,22 @@ export class Flow< ): StepDefinition< StepDependencies[SlugType] extends [] | readonly [] ? TFlowInput // Root step: flow input directly - : Simplify<{ - [K in StepDependencies[SlugType][number]]: K extends keyof Steps - ? Steps[K] - : never; - }>, // Dependent step: only deps - Steps[SlugType], + : Simplify< + { + [K in StepDependencies[SlugType][number] as K extends keyof Steps + ? Steps[K]['skippable'] extends true + ? never + : K + : never]: K extends keyof Steps ? Steps[K]['output'] : never; + } & { + [K in StepDependencies[SlugType][number] as K extends keyof Steps + ? Steps[K]['skippable'] extends true + ? K + : never + : never]?: K extends keyof Steps ? Steps[K]['output'] : never; + } + >, // Dependent step: only deps (skippable deps optional) + Steps[SlugType]['output'], FlowContext & TContext > { // Check if the slug exists in stepDefinitions using a more explicit pattern @@ -449,19 +636,25 @@ export class Flow< ); } - // Use a type assertion directive to tell TypeScript that this is safe - // @ts-expect-error The type system cannot track that this.stepDefinitions[slug] has the correct type - // but we know it's safe because we only add steps through the strongly-typed `step` method return this.stepDefinitions[slug as string]; } // Overload 1: Root step (no dependsOn) - receives flowInput directly - // condition is typed as ContainmentPattern + // if is typed as ContainmentPattern step< Slug extends string, - TOutput + TOutput, + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn?: never } & RootStepOptions>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn?: never; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit, 'else' | 'retriesExhausted'> + >, handler: ( flowInput: TFlowInput, context: FlowContext & TContext @@ -469,28 +662,59 @@ export class Flow< ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: Awaited }, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: [] }, TEnv >; // Overload 2: Dependent step (with dependsOn) - receives deps, flowInput via context - // condition is typed as ContainmentPattern + // if is typed as ContainmentPattern // Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time + // Handler receives deps with correct optionality based on upstream steps' skippability step< Slug extends string, Deps extends Extract, - TOutput + TOutput, + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & DependentStepOptions<{ [K in Deps]: K extends keyof Steps ? Steps[K] : never }>>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn: [Deps, ...Deps[]]; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit< + DependentStepOptions>>, + 'else' | 'retriesExhausted' + > + >, handler: ( - deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never }, + deps: Simplify>, context: FlowContext & TContext ) => TOutput | Promise ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: Awaited }, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: Deps[] }, TEnv >; @@ -522,9 +746,10 @@ export class Flow< if (opts.baseDelay !== undefined) options.baseDelay = opts.baseDelay; if (opts.timeout !== undefined) options.timeout = opts.timeout; if (opts.startDelay !== undefined) options.startDelay = opts.startDelay; - if (opts.condition !== undefined) options.condition = opts.condition; - if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet; - if (opts.whenFailed !== undefined) options.whenFailed = opts.whenFailed; + if (opts.if !== undefined) options.if = opts.if; + if (opts.else !== undefined) options.else = opts.else; + if (opts.retriesExhausted !== undefined) + options.retriesExhausted = opts.retriesExhausted; // Validate runtime options (optional for step level) validateRuntimeOptions(options, { optional: true }); @@ -568,12 +793,21 @@ export class Flow< * @returns A new Flow instance with the array step added */ // Overload 1: Root array (no dependsOn) - receives flowInput directly - // condition is typed as ContainmentPattern + // if is typed as ContainmentPattern array< Slug extends string, - TOutput extends readonly any[] + TOutput extends readonly any[], + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn?: never } & RootStepOptions>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn?: never; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit, 'else' | 'retriesExhausted'> + >, handler: ( flowInput: TFlowInput, context: FlowContext & TContext @@ -581,28 +815,58 @@ export class Flow< ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: Awaited }, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: [] }, TEnv >; // Overload 2: Dependent array (with dependsOn) - receives deps, flowInput via context - // condition is typed as ContainmentPattern + // if is typed as ContainmentPattern // Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time array< Slug extends string, Deps extends Extract, - TOutput extends readonly any[] + TOutput extends readonly any[], + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & DependentStepOptions<{ [K in Deps]: K extends keyof Steps ? Steps[K] : never }>>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + dependsOn: [Deps, ...Deps[]]; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit< + DependentStepOptions>>, + 'else' | 'retriesExhausted' + > + >, handler: ( - deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never }, + deps: Simplify>, context: FlowContext & TContext ) => TOutput | Promise ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: Awaited }, + Steps & { + [K in Slug]: StepMeta< + Awaited, + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: Deps[] }, TEnv >; @@ -625,38 +889,82 @@ export class Flow< * @returns A new Flow instance with the map step added */ // Overload for root map - handler receives item, context includes flowInput - // condition is typed as ContainmentPattern (checks the array itself) + // if is typed as ContainmentPattern (checks the array itself) map< Slug extends string, THandler extends TFlowInput extends readonly (infer Item)[] - ? (item: Item, context: FlowContext & TContext) => Json | Promise - : never + ? ( + item: Item, + context: FlowContext & TContext + ) => Json | Promise + : never, + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug } & RootStepOptions>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit, 'else' | 'retriesExhausted'> + >, handler: THandler ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: AwaitedReturn[] }, + Steps & { + [K in Slug]: StepMeta< + AwaitedReturn[], + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: [] }, TEnv >; // Overload for dependent map - handler receives item, context includes flowInput - // condition is typed as ContainmentPattern<{ arrayDep: ArrayOutput }> (checks the dep object) + // if is typed as ContainmentPattern<{ arrayDep: ArrayOutput }> (checks the dep object) map< Slug extends string, TArrayDep extends Extract, - THandler extends Steps[TArrayDep] extends readonly (infer Item)[] - ? (item: Item, context: FlowContext & TContext) => Json | Promise - : never + THandler extends Steps[TArrayDep]['output'] extends readonly (infer Item)[] + ? ( + item: Item, + context: FlowContext & TContext + ) => Json | Promise + : never, + TElse extends ElseMode | undefined = undefined, + TRetries extends RetriesExhaustedMode | undefined = undefined >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; array: TArrayDep } & DependentStepOptions<{ [K in TArrayDep]: Steps[K] }>>, + opts: Simplify< + { + slug: Slug extends keyof Steps ? never : Slug; + array: TArrayDep; + else?: TElse; + retriesExhausted?: TRetries; + } & Omit< + DependentStepOptions<{ [K in TArrayDep]: Steps[K]['output'] }>, + 'else' | 'retriesExhausted' + > + >, handler: THandler ): Flow< TFlowInput, TContext, - Steps & { [K in Slug]: AwaitedReturn[] }, + Steps & { + [K in Slug]: StepMeta< + AwaitedReturn[], + TElse extends 'skip' | 'skip-cascade' + ? true + : TRetries extends 'skip' | 'skip-cascade' + ? true + : false + >; + }, StepDependencies & { [K in Slug]: [TArrayDep] }, TEnv >; @@ -678,7 +986,9 @@ export class Flow< if (arrayDep) { // Dependent map - validate single dependency exists and returns array if (!this.stepDefinitions[arrayDep]) { - throw new Error(`Step "${slug}" depends on undefined step "${arrayDep}"`); + throw new Error( + `Step "${slug}" depends on undefined step "${arrayDep}"` + ); } dependencies = [arrayDep]; } else { @@ -692,16 +1002,21 @@ export class Flow< if (opts.baseDelay !== undefined) options.baseDelay = opts.baseDelay; if (opts.timeout !== undefined) options.timeout = opts.timeout; if (opts.startDelay !== undefined) options.startDelay = opts.startDelay; - if (opts.condition !== undefined) options.condition = opts.condition; - if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet; - if (opts.whenFailed !== undefined) options.whenFailed = opts.whenFailed; + if (opts.if !== undefined) options.if = opts.if; + if (opts.else !== undefined) options.else = opts.else; + if (opts.retriesExhausted !== undefined) + options.retriesExhausted = opts.retriesExhausted; // Validate runtime options validateRuntimeOptions(options, { optional: true }); // Create the map step definition with stepType // Note: We use AnyInput/AnyOutput here because the actual types are handled at the type level via overloads - const newStepDefinition: StepDefinition = { + const newStepDefinition: StepDefinition< + AnyInput, + AnyOutput, + BaseContext & TContext + > = { slug, handler: handler as any, // Type assertion needed due to complex generic constraints dependencies,