Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,63 @@ select pgflow.complete_task(
'{"data": "from_b"}'::jsonb
);

-- Test 3: step_c remaining_deps should be 0 (both deps resolved - a skipped, b completed)
-- Test 3: Verify step_c remaining_deps is 0 (ready to start)
select is(
(select remaining_deps from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
0,
'step_c remaining_deps should be 0 (a skipped + b completed)'
);

-- Test 4: step_c should now be started
-- Now read and start step_c - this replicates what read_and_start does
-- and allows us to inspect the returned input value
--
-- We need to do this in steps:
-- 1. Read the message from the queue
-- 2. Start the task with start_tasks
-- 3. Inspect the input returned by start_tasks

-- Read the message and store msg_id
with read_msg as (
select * from pgmq.read_with_poll('skip_diamond', 1, 1, 1, 50)
limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from read_msg
),
-- Start the task and get the input
start_result as (
select st.input, st.step_slug, st.run_id
from pgflow.start_tasks(
'skip_diamond',
(select ids from msg_ids),
pgflow_tests.ensure_worker('skip_diamond')
) st
)
-- Store the input for later testing
select input, step_slug, run_id into temporary step_c_inputs
from start_result
where step_slug = 'step_c';

-- Test 4: Verify step_c was started
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
'started',
'step_c should be started after step_b completes'
'step_c should be started after read_and_start'
);

-- Test 5: step_b output should be in step_states
-- Test 5: Verify the input does NOT contain step_a key
-- The handler input should only have step_b, NOT step_a
select is(
(select output from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'step_b'),
'{"data": "from_b"}'::jsonb,
'step_b output should be stored'
(select input from step_c_inputs),
'{"step_b": {"data": "from_b"}}'::jsonb,
'step_c input should only contain step_b, not step_a (skipped deps are excluded)'
);

-- Clean up
drop table if exists run_ids;
drop table if exists step_c_inputs;

select finish();
rollback;
60 changes: 30 additions & 30 deletions pkgs/dsl/__tests__/runtime/condition-options.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,69 @@ import { Flow } from '../../src/dsl.js';
import { compileFlow } from '../../src/compile-flow.js';

describe('Condition Options', () => {
describe('DSL accepts condition and whenUnmet', () => {
it('should accept condition option on a step', () => {
describe('DSL accepts if and else', () => {
it('should accept if option on a step', () => {
const flow = new Flow({ slug: 'test_flow' })
.step(
{ slug: 'conditional_step', condition: { enabled: true } },
{ slug: 'conditional_step', if: { enabled: true } },
() => 'result'
);

const step = flow.getStepDefinition('conditional_step');
expect(step.options.condition).toEqual({ enabled: true });
expect(step.options.if).toEqual({ enabled: true });
});

it('should accept whenUnmet option on a step', () => {
it('should accept else option on a step', () => {
const flow = new Flow({ slug: 'test_flow' })
.step(
{ slug: 'conditional_step', whenUnmet: 'skip' },
{ slug: 'conditional_step', else: 'skip' },
() => 'result'
);

const step = flow.getStepDefinition('conditional_step');
expect(step.options.whenUnmet).toBe('skip');
expect(step.options.else).toBe('skip');
});

it('should accept both condition and whenUnmet together', () => {
it('should accept both if and else together', () => {
const flow = new Flow({ slug: 'test_flow' })
.step(
{
slug: 'conditional_step',
condition: { status: 'active' },
whenUnmet: 'skip-cascade',
if: { status: 'active' },
else: 'skip-cascade',
},
() => 'result'
);

const step = flow.getStepDefinition('conditional_step');
expect(step.options.condition).toEqual({ status: 'active' });
expect(step.options.whenUnmet).toBe('skip-cascade');
expect(step.options.if).toEqual({ status: 'active' });
expect(step.options.else).toBe('skip-cascade');
});

it('should accept condition on dependent steps', () => {
it('should accept if on dependent steps', () => {
const flow = new Flow({ slug: 'test_flow' })
.step({ slug: 'first' }, () => ({ success: true }))
.step(
{
slug: 'conditional_step',
dependsOn: ['first'],
condition: { first: { success: true } },
whenUnmet: 'skip',
if: { first: { success: true } },
else: 'skip',
},
() => 'result'
);

const step = flow.getStepDefinition('conditional_step');
expect(step.options.condition).toEqual({ first: { success: true } });
expect(step.options.whenUnmet).toBe('skip');
expect(step.options.if).toEqual({ first: { success: true } });
expect(step.options.else).toBe('skip');
});
});

describe('compileFlow includes condition parameters', () => {
it('should compile condition_pattern for root step', () => {
const flow = new Flow({ slug: 'test_flow' })
.step(
{ slug: 'step1', condition: { enabled: true } },
{ slug: 'step1', if: { enabled: true } },
() => 'result'
);

Expand All @@ -78,7 +78,7 @@ describe('Condition Options', () => {
it('should compile when_unmet for step', () => {
const flow = new Flow({ slug: 'test_flow' })
.step(
{ slug: 'step1', whenUnmet: 'fail' },
{ slug: 'step1', else: 'fail' },
() => 'result'
);

Expand All @@ -93,8 +93,8 @@ describe('Condition Options', () => {
.step(
{
slug: 'step1',
condition: { active: true, type: 'premium' },
whenUnmet: 'skip-cascade',
if: { active: true, type: 'premium' },
else: 'skip-cascade',
},
() => 'result'
);
Expand All @@ -113,8 +113,8 @@ describe('Condition Options', () => {
slug: 'step1',
maxAttempts: 3,
timeout: 60,
condition: { enabled: true },
whenUnmet: 'skip',
if: { enabled: true },
else: 'skip',
},
() => 'result'
);
Expand All @@ -135,8 +135,8 @@ describe('Condition Options', () => {
{
slug: 'second',
dependsOn: ['first'],
condition: { first: { success: true } },
whenUnmet: 'skip',
if: { first: { success: true } },
else: 'skip',
},
() => 'result'
);
Expand All @@ -150,26 +150,26 @@ describe('Condition Options', () => {
});
});

describe('whenUnmet validation', () => {
it('should only accept valid whenUnmet values', () => {
describe('else validation', () => {
it('should only accept valid else values', () => {
// Valid values should work
expect(() =>
new Flow({ slug: 'test' }).step(
{ slug: 's1', whenUnmet: 'fail' },
{ slug: 's1', else: 'fail' },
() => 1
)
).not.toThrow();

expect(() =>
new Flow({ slug: 'test' }).step(
{ slug: 's1', whenUnmet: 'skip' },
{ slug: 's1', else: 'skip' },
() => 1
)
).not.toThrow();

expect(() =>
new Flow({ slug: 'test' }).step(
{ slug: 's1', whenUnmet: 'skip-cascade' },
{ slug: 's1', else: 'skip-cascade' },
() => 1
)
).not.toThrow();
Expand Down
Loading