From 073526449dc07d07891fbedce2b75bb0212cf7bc Mon Sep 17 00:00:00 2001
From: Wojtek Majewski
Date: Mon, 5 Jan 2026 09:37:01 +0100
Subject: [PATCH] add condition evaluation in start_ready_steps and DSL support

---
 ...00_function__cascade_force_skip_steps.sql} |   16 +-
 ...00_function_cascade_resolve_conditions.sql |  259 ++++
 .../schemas/0100_function_complete_task.sql   |   20 +
 .../core/schemas/0100_function_start_flow.sql |    8 +
 .../0100_function_start_ready_steps.sql       |   96 +-
 .../schemas/0120_function_start_tasks.sql     |    3 +-
 pkgs/core/src/database-types.ts               |    9 +-
 ...074725_pgflow_temp_skip_infrastructure.sql |  173 ---
 ...0105121138_pgflow_condition_evaluation.sql | 1203 +++++++++++++++++
 ...03847_pgflow_rename_cascade_skip_steps.sql |  348 +++++
 pkgs/core/supabase/migrations/atlas.sum       |    5 +-
 .../broadcast_order.test.sql                  |    4 +-
 .../cascade_through_multiple_levels.test.sql  |    4 +-
 .../cascade_to_single_dependent.test.sql      |    4 +-
 .../multi_dependency_partial_skip.test.sql    |    4 +-
 .../single_step_skip.test.sql                 |    6 +-
 .../skipped_event_payload.test.sql            |    4 +-
 .../dependent_step_condition_met.test.sql     |   68 +
 ...pendent_step_condition_unmet_skip.test.sql |   75 +
 .../no_condition_always_executes.test.sql     |   40 +
 ...n_skip_iterates_until_convergence.test.sql |  121 ++
 .../plain_skip_propagates_to_map.test.sql     |  108 ++
 .../root_step_condition_met.test.sql          |   56 +
 .../root_step_condition_unmet_fail.test.sql   |   62 +
 .../root_step_condition_unmet_skip.test.sql   |   73 +
 ...step_condition_unmet_skip_cascade.test.sql |   86 ++
 .../skipped_deps_excluded_from_input.test.sql |  107 ++
 .../runtime/condition-options.test.ts         |  178 +++
 pkgs/dsl/src/compile-flow.ts                  |   10 +
 pkgs/dsl/src/dsl.ts                           |    9 +
 30 files changed, 2881 insertions(+), 278 deletions(-)
 rename pkgs/core/schemas/{0100_function_cascade_skip_steps.sql => 0100_function__cascade_force_skip_steps.sql} (84%)
 create mode 100644 pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
 delete mode 100644 pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql
 create mode 100644 pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql
 create mode 100644 pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/broadcast_order.test.sql (93%)
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/cascade_through_multiple_levels.test.sql (95%)
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/cascade_to_single_dependent.test.sql (95%)
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/multi_dependency_partial_skip.test.sql (95%)
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/single_step_skip.test.sql (91%)
 rename pkgs/core/supabase/tests/{cascade_skip_steps => _cascade_force_skip_steps}/skipped_event_payload.test.sql (94%)
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_met.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_unmet_skip.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/no_condition_always_executes.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/plain_skip_iterates_until_convergence.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/plain_skip_propagates_to_map.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_step_condition_met.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip_cascade.test.sql
 create mode 100644 pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql
 create mode 100644 pkgs/dsl/__tests__/runtime/condition-options.test.ts

diff --git a/pkgs/core/schemas/0100_function_cascade_skip_steps.sql b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql
similarity index 84%
rename from pkgs/core/schemas/0100_function_cascade_skip_steps.sql
rename to pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql
index 62be9f53d..18d481d4e 100644
--- a/pkgs/core/schemas/0100_function_cascade_skip_steps.sql
+++ b/pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql
@@ -1,6 +1,6 @@
--- cascade_skip_steps: Skip a step and cascade to all downstream dependents
+-- _cascade_force_skip_steps: Skip a step and cascade to all downstream dependents
 -- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade)
-create or replace function pgflow.cascade_skip_steps(
+create or replace function pgflow._cascade_force_skip_steps(
   run_id uuid,
   step_slug text,
   skip_reason text
@@ -15,10 +15,10 @@ BEGIN
   -- Get flow_slug for this run
   SELECT r.flow_slug INTO v_flow_slug
   FROM pgflow.runs r
-  WHERE r.run_id = cascade_skip_steps.run_id;
+  WHERE r.run_id = _cascade_force_skip_steps.run_id;

   IF v_flow_slug IS NULL THEN
-    RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id;
+    RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id;
   END IF;

   -- ==========================================
@@ -34,10 +34,10 @@ BEGIN
       s.flow_slug,
       s.step_slug,
       s.step_index,
-      cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step
+      _cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step
     FROM pgflow.steps s
     WHERE s.flow_slug = v_flow_slug
-      AND s.step_slug = cascade_skip_steps.step_slug
+      AND s.step_slug = _cascade_force_skip_steps.step_slug

     UNION ALL

@@ -69,7 +69,7 @@
       skipped_at = now(),
       remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
     FROM steps_to_skip sts
-    WHERE ss.run_id = cascade_skip_steps.run_id
+    WHERE ss.run_id = _cascade_force_skip_steps.run_id
       AND ss.step_slug = sts.step_slug
       AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
     RETURNING
@@ -95,7 +95,7 @@
     UPDATE pgflow.runs r
     SET remaining_steps = r.remaining_steps - skipped_count.count
     FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
-    WHERE r.run_id = cascade_skip_steps.run_id
+    WHERE r.run_id = _cascade_force_skip_steps.run_id
      AND skipped_count.count > 0
   )
   SELECT COUNT(*) INTO v_total_skipped FROM skipped;

diff --git a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
new file mode 100644
index 000000000..04a21893f
--- /dev/null
+++ b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
@@ -0,0 +1,259 @@
+-- cascade_resolve_conditions: Evaluate step conditions and handle skip/fail modes
+-- Called before cascade_complete_taskless_steps to evaluate conditions on ready steps.
+-- Must iterate until convergence since skipping a step can make dependents ready.
+--
+-- Returns:
+--   true  = run can continue (or nothing to do)
+--   false = run was failed (due to fail mode)
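+--
+-- Condition semantics, illustrated (editor's sketch, not part of the patch; the
+-- patterns below are hypothetical): condition_pattern is matched with jsonb
+-- containment (@>). Root steps match against the run input; dependent steps match
+-- against their deps output keyed by dep slug:
+--
+--   select '{"tier": "premium", "region": "eu"}'::jsonb @> '{"tier": "premium"}'::jsonb;
+--   -- => true (root step: pattern is contained in the run input)
+--
+--   select jsonb_build_object('check_user', '{"active": true}'::jsonb)
+--          @> '{"check_user": {"active": true}}'::jsonb;
+--   -- => true (dependent step: pattern is contained in the aggregated deps output)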
+create or replace function pgflow.cascade_resolve_conditions(run_id uuid)
+returns boolean
+language plpgsql
+set search_path to ''
+as $$
+DECLARE
+  v_run_input jsonb;
+  v_run_status text;
+  v_first_fail record;
+  v_iteration_count int := 0;
+  v_max_iterations int := 50;
+  v_processed_count int;
+BEGIN
+  -- ==========================================
+  -- GUARD: Early return if run is already terminal
+  -- ==========================================
+  SELECT r.status, r.input INTO v_run_status, v_run_input
+  FROM pgflow.runs r
+  WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+  IF v_run_status IN ('failed', 'completed') THEN
+    RETURN v_run_status != 'failed';
+  END IF;
+
+  -- ==========================================
+  -- ITERATE UNTIL CONVERGENCE
+  -- ==========================================
+  -- After skipping steps, dependents may become ready and need evaluation.
+  -- Loop until no more steps are processed.
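+  -- Illustrative chain (editor's note; hypothetical slugs): if step_a has
+  -- whenUnmet='skip' and step_b depends on step_a with its own condition,
+  -- iteration 1 skips step_a, which decrements step_b.remaining_deps to 0;
+  -- only iteration 2 can then evaluate (and possibly skip) step_b. A linear
+  -- chain of N conditional steps thus needs N iterations, hence the
+  -- v_max_iterations safety limit rather than a single pass.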
+  LOOP
+    v_iteration_count := v_iteration_count + 1;
+    IF v_iteration_count > v_max_iterations THEN
+      RAISE EXCEPTION 'cascade_resolve_conditions exceeded safety limit of % iterations', v_max_iterations;
+    END IF;
+
+    v_processed_count := 0;
+
+    -- ==========================================
+    -- PHASE 1a: CHECK FOR FAIL CONDITIONS
+    -- ==========================================
+    -- Find first step (by topological order) with unmet condition and 'fail' mode.
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+       AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    )
+    SELECT flow_slug, step_slug, condition_pattern
+    INTO v_first_fail
+    FROM condition_evaluations
+    WHERE NOT condition_met AND when_unmet = 'fail'
+    ORDER BY step_index
+    LIMIT 1;
+
+    -- Handle fail mode: fail step and run, return false
+    IF v_first_fail IS NOT NULL THEN
+      UPDATE pgflow.step_states
+      SET status = 'failed',
+          failed_at = now(),
+          error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
+      WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
+        AND pgflow.step_states.step_slug = v_first_fail.step_slug;
+
+      UPDATE pgflow.runs
+      SET status = 'failed',
+          failed_at = now()
+      WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;

+      RETURN false;
+    END IF;
+
+    -- ==========================================
+    -- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
+    -- ==========================================
+    -- Skip steps with unmet conditions and whenUnmet='skip'.
+    -- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+       AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    ),
+    unmet_skip_steps AS (
+      SELECT * FROM condition_evaluations
+      WHERE NOT condition_met AND when_unmet = 'skip'
+    ),
+    skipped_steps AS (
+      UPDATE pgflow.step_states ss
+      SET status = 'skipped',
+          skip_reason = 'condition_unmet',
+          skipped_at = now()
+      FROM unmet_skip_steps uss
+      WHERE ss.run_id = cascade_resolve_conditions.run_id
+        AND ss.step_slug = uss.step_slug
+      RETURNING
+        ss.*,
+        realtime.send(
+          jsonb_build_object(
+            'event_type', 'step:skipped',
+            'run_id', ss.run_id,
+            'flow_slug', ss.flow_slug,
+            'step_slug', ss.step_slug,
+            'status', 'skipped',
+            'skip_reason', 'condition_unmet',
+            'skipped_at', ss.skipped_at
+          ),
+          concat('step:', ss.step_slug, ':skipped'),
+          concat('pgflow:run:', ss.run_id),
+          false
+        ) AS _broadcast_result
+    ),
+    -- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
+    dependent_updates AS (
+      UPDATE pgflow.step_states child_state
+      SET remaining_deps = child_state.remaining_deps - 1,
+          -- If child is a map step and this skipped step is its only dependency,
+          -- set initial_tasks = 0 (skipped dep = empty array)
+          initial_tasks = CASE
+            WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
+            ELSE child_state.initial_tasks
+          END
+      FROM skipped_steps parent
+      JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
+      JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
+      WHERE child_state.run_id = cascade_resolve_conditions.run_id
+        AND child_state.step_slug = dep.step_slug
+    ),
+    run_update AS (
+      UPDATE pgflow.runs r
+      SET remaining_steps = r.remaining_steps - (SELECT COUNT(*) FROM skipped_steps)
+      WHERE r.run_id = cascade_resolve_conditions.run_id
+        AND (SELECT COUNT(*) FROM skipped_steps) > 0
+    )
+    SELECT COUNT(*)::int INTO v_processed_count FROM skipped_steps;
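+    -- Illustrative propagation (editor's note; hypothetical slugs): if a map
+    -- step process_items has deps_count = 1 and its only dependency is skipped
+    -- here, initial_tasks is forced to 0, so the later call to
+    -- cascade_complete_taskless_steps can complete it as an empty map (the
+    -- empty-map handling elsewhere in this patch produces output '[]'::jsonb)
+    -- instead of spawning any tasks:
+    --   -- skipped dep => map over an empty array
+    --   select pgflow.cascade_complete_taskless_steps('<run-id>'::uuid);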
+    -- ==========================================
+    -- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS
+    -- ==========================================
+    -- Call _cascade_force_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'.
+    -- Process in topological order; _cascade_force_skip_steps is idempotent.
+    PERFORM pgflow._cascade_force_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet')
+    FROM pgflow.step_states AS ready_step
+    JOIN pgflow.steps AS step
+      ON step.flow_slug = ready_step.flow_slug
+     AND step.step_slug = ready_step.step_slug
+    LEFT JOIN LATERAL (
+      SELECT jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM pgflow.deps dep
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE dep.flow_slug = ready_step.flow_slug
+        AND dep.step_slug = ready_step.step_slug
+    ) AS agg_deps ON step.deps_count > 0
+    WHERE ready_step.run_id = cascade_resolve_conditions.run_id
+      AND ready_step.status = 'created'
+      AND ready_step.remaining_deps = 0
+      AND step.condition_pattern IS NOT NULL
+      AND step.when_unmet = 'skip-cascade'
+      AND NOT (
+        CASE
+          WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
+          ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
+        END
+      )
+    ORDER BY step.step_index;
+
+    -- Check if run was failed during cascade (e.g., if _cascade_force_skip_steps triggers fail)
+    SELECT r.status INTO v_run_status
+    FROM pgflow.runs r
+    WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+    IF v_run_status IN ('failed', 'completed') THEN
+      RETURN v_run_status != 'failed';
+    END IF;
+
+    -- Exit loop if no steps were processed in this iteration
+    EXIT WHEN v_processed_count = 0;
+  END LOOP;
+
+  RETURN true;
+END;
+$$;

diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql
index 9daa172dc..521e8a3af 100644
--- a/pkgs/core/schemas/0100_function_complete_task.sql
+++ b/pkgs/core/schemas/0100_function_complete_task.sql
@@ -312,6 +312,26 @@ IF v_step_state.status = 'completed' THEN
     false
   );

+  -- THEN evaluate conditions on newly-ready dependent steps
+  -- This must happen before cascade_complete_taskless_steps so that
+  -- skipped steps can set initial_tasks=0 for their map dependents
+  IF NOT pgflow.cascade_resolve_conditions(complete_task.run_id) THEN
+    -- Run was failed due to a condition with when_unmet='fail'
+    -- Archive the current task's message before returning
+    PERFORM pgmq.archive(
+      (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id),
+      (SELECT st.message_id FROM pgflow.step_tasks st
+       WHERE st.run_id = complete_task.run_id
+         AND st.step_slug = complete_task.step_slug
+         AND st.task_index = complete_task.task_index)
+    );
+    RETURN QUERY SELECT * FROM pgflow.step_tasks
+    WHERE pgflow.step_tasks.run_id = complete_task.run_id
+      AND pgflow.step_tasks.step_slug = complete_task.step_slug
+      AND pgflow.step_tasks.task_index = complete_task.task_index;
+    RETURN;
+  END IF;
+
   -- THEN cascade complete any taskless steps that are now ready
   -- This ensures dependent children broadcast AFTER their parent
   PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id);
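-- Illustrative follow-up (editor's sketch, not part of the patch): when a run is
-- failed by an unmet 'fail' condition, the failed step records the pattern in
-- error_message, so it can be inspected after the fact:
--   select step_slug, status, error_message
--   from pgflow.step_states
--   where run_id = '<run-id>' and status = 'failed';
--   -- error_message => 'Condition not met: {"tier": "premium"}'  (hypothetical pattern)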
diff --git a/pkgs/core/schemas/0100_function_start_flow.sql b/pkgs/core/schemas/0100_function_start_flow.sql
index f0a2bfed3..abae94696 100644
--- a/pkgs/core/schemas/0100_function_start_flow.sql
+++ b/pkgs/core/schemas/0100_function_start_flow.sql
@@ -110,6 +110,14 @@ PERFORM realtime.send(
   false
 );

+-- ---------- Evaluate conditions on ready steps ----------
+-- Skip steps with unmet conditions, propagate to dependents
+IF NOT pgflow.cascade_resolve_conditions(v_created_run.run_id) THEN
+  -- Run was failed due to a condition with when_unmet='fail'
+  RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id;
+  RETURN;
+END IF;
+
 -- ---------- Complete taskless steps ----------
 -- Handle empty array maps that should auto-complete
 PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id);
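-- Illustrative usage (editor's sketch; 'demo_flow' is hypothetical, and the
-- start_flow(flow_slug, input) shape is assumed from the existing API rather than
-- shown in this patch; add_step's positional signature is taken from the migration
-- below):
--   select pgflow.add_step('demo_flow', 'premium_only', '{}'::text[], null, null,
--     null, null, 'single', '{"tier": "premium"}'::jsonb, 'skip', 'fail');
--   select * from pgflow.start_flow('demo_flow', '{"tier": "basic"}'::jsonb);
--   -- premium_only is skipped (skip_reason = 'condition_unmet') before any task is queued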
diff --git a/pkgs/core/schemas/0100_function_start_ready_steps.sql b/pkgs/core/schemas/0100_function_start_ready_steps.sql
index 5e82a3659..a70ca9f26 100644
--- a/pkgs/core/schemas/0100_function_start_ready_steps.sql
+++ b/pkgs/core/schemas/0100_function_start_ready_steps.sql
@@ -3,69 +3,25 @@
 returns void
 language plpgsql
 set search_path to ''
 as $$
-begin
+BEGIN
 -- ==========================================
--- GUARD: No mutations on failed runs
+-- GUARD: No mutations on terminal runs
 -- ==========================================
-IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = start_ready_steps.run_id AND pgflow.runs.status = 'failed') THEN
+IF EXISTS (
+  SELECT 1 FROM pgflow.runs
+  WHERE pgflow.runs.run_id = start_ready_steps.run_id
+    AND pgflow.runs.status IN ('failed', 'completed')
+) THEN
   RETURN;
 END IF;

 -- ==========================================
--- HANDLE EMPTY ARRAY MAPS (initial_tasks = 0)
--- ==========================================
--- These complete immediately without spawning tasks
-WITH empty_map_steps AS (
-  SELECT step_state.*
-  FROM pgflow.step_states AS step_state
-  JOIN pgflow.steps AS step
-    ON step.flow_slug = step_state.flow_slug
-   AND step.step_slug = step_state.step_slug
-  WHERE step_state.run_id = start_ready_steps.run_id
-    AND step_state.status = 'created'
-    AND step_state.remaining_deps = 0
-    AND step.step_type = 'map'
-    AND step_state.initial_tasks = 0
-  ORDER BY step_state.step_slug
-  FOR UPDATE OF step_state
-),
--- ---------- Complete empty map steps ----------
-completed_empty_steps AS (
-  UPDATE pgflow.step_states
-  SET status = 'completed',
-      started_at = now(),
-      completed_at = now(),
-      remaining_tasks = 0,
-      output = '[]'::jsonb -- Empty map produces empty array output
-  FROM empty_map_steps
-  WHERE pgflow.step_states.run_id = start_ready_steps.run_id
-    AND pgflow.step_states.step_slug = empty_map_steps.step_slug
-  RETURNING
-    pgflow.step_states.*,
-    -- Broadcast step:completed event atomically with the UPDATE
-    -- Using RETURNING ensures this executes during row processing
-    -- and cannot be optimized away by the query planner
-    realtime.send(
-      jsonb_build_object(
-        'event_type', 'step:completed',
-        'run_id', pgflow.step_states.run_id,
-        'step_slug', pgflow.step_states.step_slug,
-        'status', 'completed',
-        'started_at', pgflow.step_states.started_at,
-        'completed_at', pgflow.step_states.completed_at,
-        'remaining_tasks', 0,
-        'remaining_deps', 0,
-        'output', pgflow.step_states.output -- Use stored output instead of hardcoded []
-      ),
-      concat('step:', pgflow.step_states.step_slug, ':completed'),
-      concat('pgflow:run:', pgflow.step_states.run_id),
-      false
-    ) as _broadcast_completed -- Prefix with _ to indicate internal use only
-),
-
--- ==========================================
--- HANDLE NORMAL STEPS (initial_tasks > 0)
+-- PHASE 1: START READY STEPS
 -- ==========================================
+-- NOTE: Condition evaluation and empty map handling are done by
+-- cascade_resolve_conditions() and cascade_complete_taskless_steps()
+-- which are called before this function.
+WITH
 -- ---------- Find ready steps ----------
 -- Steps with no remaining deps and known task count
 ready_steps AS (
   SELECT *
   FROM pgflow.step_states AS step_state
   WHERE step_state.run_id = start_ready_steps.run_id
     AND step_state.status = 'created'
     AND step_state.remaining_deps = 0
-    AND step_state.initial_tasks IS NOT NULL -- NEW: Cannot start with unknown count
-    AND step_state.initial_tasks > 0 -- Don't start taskless steps
-    -- Exclude empty map steps already handled
-    AND NOT EXISTS (
-      SELECT 1 FROM empty_map_steps
-      WHERE empty_map_steps.run_id = step_state.run_id
-        AND empty_map_steps.step_slug = step_state.step_slug
-    )
+    AND step_state.initial_tasks IS NOT NULL -- Cannot start with unknown count
+    AND step_state.initial_tasks > 0 -- Don't start taskless steps (handled by cascade_complete_taskless_steps)
   ORDER BY step_state.step_slug
   FOR UPDATE
 ),
@@ -115,7 +65,7 @@ started_step_states AS (
 ),

 -- ==========================================
--- TASK GENERATION AND QUEUE MESSAGES
+-- PHASE 2: TASK GENERATION AND QUEUE MESSAGES
 -- ==========================================
 -- ---------- Generate tasks and batch messages ----------
 -- Single steps: 1 task (index 0)
@@ -136,8 +86,8 @@ message_batches AS (
     ) AS messages,
     array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
   FROM started_step_states AS started_step
-  JOIN pgflow.steps AS step
-    ON step.flow_slug = started_step.flow_slug
+  JOIN pgflow.steps AS step
+    ON step.flow_slug = started_step.flow_slug
   AND step.step_slug = started_step.step_slug
   -- Generate task indices from 0 to initial_tasks-1
   CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
@@ -159,7 +109,7 @@ sent_messages AS (
 )

 -- ==========================================
--- RECORD TASKS IN DATABASE
+-- PHASE 3: RECORD TASKS IN DATABASE
 -- ==========================================
 INSERT INTO pgflow.step_tasks (flow_slug, run_id, step_slug, task_index, message_id)
 SELECT
   sent_messages.flow_slug,
   sent_messages.run_id,
   sent_messages.step_slug,
   sent_messages.task_index,
   sent_messages.msg_id
 FROM sent_messages;

--- ==========================================
--- BROADCAST REALTIME EVENTS
--- ==========================================
--- Note: Both step:completed events for empty maps and step:started events
--- are now broadcast atomically in their respective CTEs using RETURNING pattern.
--- This ensures correct ordering, prevents duplicate broadcasts, and guarantees
--- that events are sent for exactly the rows that were updated.
-
-end;
+END;
 $$;

diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql
index 9fbe69a49..627497e3a 100644
--- a/pkgs/core/schemas/0120_function_start_tasks.sql
+++ b/pkgs/core/schemas/0120_function_start_tasks.sql
@@ -53,7 +53,8 @@ as $$
       join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
       join pgflow.step_states dep_state on
         dep_state.run_id = st.run_id and
-        dep_state.step_slug = dep.dep_slug
+        dep_state.step_slug = dep.dep_slug and
+        dep_state.status = 'completed' -- Only include completed deps (not skipped)
     ),
     deps_outputs as (
       select
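-- Illustrative effect of the start_tasks change (editor's sketch; slugs and outputs
-- are hypothetical): the deps object handed to a task simply omits keys for skipped
-- dependencies, e.g. with deps check_user (completed) and enrich (skipped):
--   select jsonb_object_agg(dep_slug, output)
--   from (values ('check_user', '{"active": true}'::jsonb)) as completed_deps(dep_slug, output);
--   -- => {"check_user": {"active": true}}   (no "enrich" key for the skipped dep)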
diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts
index 9a4fe7c7d..a89789c36 100644
--- a/pkgs/core/src/database-types.ts
+++ b/pkgs/core/src/database-types.ts
@@ -394,6 +394,10 @@ export type Database = {
       [_ in never]: never
     }
     Functions: {
+      _cascade_force_skip_steps: {
+        Args: { run_id: string; skip_reason: string; step_slug: string }
+        Returns: number
+      }
       _compare_flow_shapes: {
         Args: { p_db: Json; p_local: Json }
         Returns: string[]
@@ -447,10 +451,7 @@
         Args: { run_id: string }
         Returns: number
       }
-      cascade_skip_steps: {
-        Args: { run_id: string; skip_reason: string; step_slug: string }
-        Returns: number
-      }
+      cascade_resolve_conditions: { Args: { run_id: string }; Returns: boolean }
       cleanup_ensure_workers_logs: {
         Args: { retention_hours?: number }
         Returns: {

diff --git a/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql b/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql
deleted file mode 100644
index 178f74ac3..000000000
--- a/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql
+++ /dev/null
@@ -1,173 +0,0 @@
--- Modify "step_states" table
-ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "completed_at_or_failed_at", DROP CONSTRAINT "remaining_tasks_state_consistency", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> ALL (ARRAY['created'::text, 'skipped'::text]))), DROP CONSTRAINT "status_is_valid", ADD CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text, 'skipped'::text])), ADD CONSTRAINT "completed_at_or_failed_at_or_skipped_at" CHECK (((
-CASE
-  WHEN (completed_at IS NOT NULL) THEN 1
-  ELSE 0
-END +
-CASE
-  WHEN (failed_at IS NOT NULL) THEN 1
-  ELSE 0
-END) +
-CASE
-  WHEN (skipped_at IS NOT NULL) THEN 1
-  ELSE 0
-END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipped'::text) AND (skip_reason IS NOT NULL)) OR ((status <> 'skipped'::text) AND (skip_reason IS NULL))), ADD CONSTRAINT "skipped_at_is_after_created_at" CHECK ((skipped_at IS NULL) OR (skipped_at >= created_at)), ADD COLUMN "skip_reason" text NULL, ADD COLUMN "skipped_at" timestamptz NULL;
--- Create index "idx_step_states_skipped" to table: "step_states"
-CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
--- Modify "steps" table
-ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
--- Create "add_step" function
-CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
-DECLARE
-  result_step pgflow.steps;
-  next_idx int;
-BEGIN
-  -- Validate map step constraints
-  -- Map steps can have either:
-  --   0 dependencies (root map - maps over flow input array)
-  --   1 dependency (dependent map - maps over dependency output array)
-  IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
-    RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
-      add_step.step_slug,
-      COALESCE(array_length(add_step.deps_slugs, 1), 0),
-      array_to_string(add_step.deps_slugs, ', ');
-  END IF;
-
-  -- Get next step index
-  SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
-  FROM pgflow.steps s
-  WHERE s.flow_slug = add_step.flow_slug;
-
-  -- Create the step
-  INSERT INTO pgflow.steps (
-    flow_slug, step_slug, step_type, step_index, deps_count,
-    opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
-    condition_pattern, when_unmet, when_failed
-  )
-  VALUES (
-    add_step.flow_slug,
-    add_step.step_slug,
-    COALESCE(add_step.step_type, 'single'),
-    next_idx,
-    COALESCE(array_length(add_step.deps_slugs, 1), 0),
-    add_step.max_attempts,
-    add_step.base_delay,
-    add_step.timeout,
-    add_step.start_delay,
-    add_step.condition_pattern,
-    add_step.when_unmet,
-    add_step.when_failed
-  )
-  ON CONFLICT ON CONSTRAINT steps_pkey
-  DO UPDATE SET step_slug = EXCLUDED.step_slug
-  RETURNING * INTO result_step;
-
-  -- Insert dependencies
-  INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
-  SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
-  FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
-  WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
-  ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;
-
-  RETURN result_step;
-END;
-$$;
--- Create "cascade_skip_steps" function
-CREATE FUNCTION "pgflow"."cascade_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
-DECLARE
-  v_flow_slug text;
-  v_total_skipped int := 0;
-BEGIN
-  -- Get flow_slug for this run
-  SELECT r.flow_slug INTO v_flow_slug
-  FROM pgflow.runs r
-  WHERE r.run_id = cascade_skip_steps.run_id;
-
-  IF v_flow_slug IS NULL THEN
-    RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id;
-  END IF;
-
-  -- ==========================================
-  -- SKIP STEPS IN TOPOLOGICAL ORDER
-  -- ==========================================
-  -- Use recursive CTE to find all downstream dependents,
-  -- then skip them in topological order (by step_index)
-  WITH RECURSIVE
-  -- ---------- Find all downstream steps ----------
-  downstream_steps AS (
-    -- Base case: the trigger step
-    SELECT
-      s.flow_slug,
-      s.step_slug,
-      s.step_index,
-      cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step
-    FROM pgflow.steps s
-    WHERE s.flow_slug = v_flow_slug
-      AND s.step_slug = cascade_skip_steps.step_slug
-
-    UNION ALL
-
-    -- Recursive case: steps that depend on already-found steps
-    SELECT
-      s.flow_slug,
-      s.step_slug,
-      s.step_index,
-      'dependency_skipped'::text AS reason -- Downstream steps get this reason
-    FROM pgflow.steps s
-    JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
-    JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
-  ),
-  -- ---------- Deduplicate and order by step_index ----------
-  steps_to_skip AS (
-    SELECT DISTINCT ON (ds.step_slug)
-      ds.flow_slug,
-      ds.step_slug,
-      ds.step_index,
-      ds.reason
-    FROM downstream_steps ds
-    ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
-  ),
-  -- ---------- Skip the steps ----------
-  skipped AS (
-    UPDATE pgflow.step_states ss
-    SET status = 'skipped',
-        skip_reason = sts.reason,
-        skipped_at = now(),
-        remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
-    FROM steps_to_skip sts
-    WHERE ss.run_id = cascade_skip_steps.run_id
-      AND ss.step_slug = sts.step_slug
-      AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
-    RETURNING
-      ss.*,
-      -- Broadcast step:skipped event
-      realtime.send(
-        jsonb_build_object(
-          'event_type', 'step:skipped',
-          'run_id', ss.run_id,
-          'flow_slug', ss.flow_slug,
-          'step_slug', ss.step_slug,
-          'status', 'skipped',
-          'skip_reason', ss.skip_reason,
-          'skipped_at', ss.skipped_at
-        ),
-        concat('step:', ss.step_slug, ':skipped'),
-        concat('pgflow:run:', ss.run_id),
-        false
-      ) as _broadcast_result
-  ),
-  -- ---------- Update run counters ----------
-  run_updates AS (
-    UPDATE pgflow.runs r
-    SET remaining_steps = r.remaining_steps - skipped_count.count
-    FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
-    WHERE r.run_id = cascade_skip_steps.run_id
-      AND skipped_count.count > 0
-  )
-  SELECT COUNT(*) INTO v_total_skipped FROM skipped;
-
-  RETURN v_total_skipped;
-END;
-$$;
--- Drop "add_step" function
-DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text);
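-- Illustrative registration of a dependent conditional step (editor's sketch;
-- positional args follow the add_step signature above, and the 'demo_flow' objects
-- are hypothetical):
--   select pgflow.add_step('demo_flow', 'notify', array['check_user'], null, null,
--     null, null, 'single', '{"check_user": {"active": true}}'::jsonb, 'skip-cascade', 'fail');
--   -- unmet condition at runtime => notify and all of its dependents are skipped
--   -- via pgflow._cascade_force_skip_steps(run_id, 'notify', 'condition_unmet')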
diff --git a/pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql b/pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql
new file mode 100644
index 000000000..0c3bcb0d7
--- /dev/null
+++ b/pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql
@@ -0,0 +1,1203 @@
+-- Modify "step_states" table
+ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "completed_at_or_failed_at", DROP CONSTRAINT "remaining_tasks_state_consistency", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> ALL (ARRAY['created'::text, 'skipped'::text]))), DROP CONSTRAINT "status_is_valid", ADD CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text, 'skipped'::text])), ADD CONSTRAINT "completed_at_or_failed_at_or_skipped_at" CHECK (((
+CASE
+  WHEN (completed_at IS NOT NULL) THEN 1
+  ELSE 0
+END +
+CASE
+  WHEN (failed_at IS NOT NULL) THEN 1
+  ELSE 0
+END) +
+CASE
+  WHEN (skipped_at IS NOT NULL) THEN 1
+  ELSE 0
+END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipped'::text) AND (skip_reason IS NOT NULL)) OR ((status <> 'skipped'::text) AND (skip_reason IS NULL))), ADD CONSTRAINT "skipped_at_is_after_created_at" CHECK ((skipped_at IS NULL) OR (skipped_at >= created_at)), ADD COLUMN "skip_reason" text NULL, ADD COLUMN "skipped_at" timestamptz NULL;
+-- Create index "idx_step_states_skipped" to table: "step_states"
+CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
+-- Modify "steps" table
+ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
+-- Create "cascade_skip_steps" function
+CREATE FUNCTION "pgflow"."cascade_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
+DECLARE
+  v_flow_slug text;
+  v_total_skipped int := 0;
+BEGIN
+  -- Get flow_slug for this run
+  SELECT r.flow_slug INTO v_flow_slug
+  FROM pgflow.runs r
+  WHERE r.run_id = cascade_skip_steps.run_id;
+
+  IF v_flow_slug IS NULL THEN
+    RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id;
+  END IF;
+
+  -- ==========================================
+  -- SKIP STEPS IN TOPOLOGICAL ORDER
+  -- ==========================================
+  -- Use recursive CTE to find all downstream dependents,
+  -- then skip them in topological order (by step_index)
+  WITH RECURSIVE
+  -- ---------- Find all downstream steps ----------
+  downstream_steps AS (
+    -- Base case: the trigger step
+    SELECT
+      s.flow_slug,
+      s.step_slug,
+      s.step_index,
+      cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step
+    FROM pgflow.steps s
+    WHERE s.flow_slug = v_flow_slug
+      AND s.step_slug = cascade_skip_steps.step_slug
+
+    UNION ALL
+
+    -- Recursive case: steps that depend on already-found steps
+    SELECT
+      s.flow_slug,
+      s.step_slug,
+      s.step_index,
+      'dependency_skipped'::text AS reason -- Downstream steps get this reason
+    FROM pgflow.steps s
+    JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
+    JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
+  ),
+  -- ---------- Deduplicate and order by step_index ----------
+  steps_to_skip AS (
+    SELECT DISTINCT ON (ds.step_slug)
+      ds.flow_slug,
+      ds.step_slug,
+      ds.step_index,
+      ds.reason
+    FROM downstream_steps ds
+    ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
+  ),
+  -- ---------- Skip the steps ----------
+  skipped AS (
+    UPDATE pgflow.step_states ss
+    SET status = 'skipped',
+        skip_reason = sts.reason,
+        skipped_at = now(),
+        remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
+    FROM steps_to_skip sts
+    WHERE ss.run_id = cascade_skip_steps.run_id
+      AND ss.step_slug = sts.step_slug
+      AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
+    RETURNING
+      ss.*,
+      -- Broadcast step:skipped event
+      realtime.send(
+        jsonb_build_object(
+          'event_type', 'step:skipped',
+          'run_id', ss.run_id,
+          'flow_slug', ss.flow_slug,
+          'step_slug', ss.step_slug,
+          'status', 'skipped',
+          'skip_reason', ss.skip_reason,
+          'skipped_at', ss.skipped_at
+        ),
+        concat('step:', ss.step_slug, ':skipped'),
+        concat('pgflow:run:', ss.run_id),
+        false
+      ) as _broadcast_result
+  ),
+  -- ---------- Update run counters ----------
+  run_updates AS (
+    UPDATE pgflow.runs r
+    SET remaining_steps = r.remaining_steps - skipped_count.count
+    FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
+    WHERE r.run_id = cascade_skip_steps.run_id
+      AND skipped_count.count > 0
+  )
+  SELECT COUNT(*) INTO v_total_skipped FROM skipped;
+
+  RETURN v_total_skipped;
+END;
+$$;
+-- Create "cascade_resolve_conditions" function
+CREATE FUNCTION "pgflow"."cascade_resolve_conditions" ("run_id" uuid) RETURNS boolean LANGUAGE plpgsql SET "search_path" = '' AS $$
+DECLARE
+  v_run_input jsonb;
+  v_run_status text;
+  v_first_fail record;
+  v_iteration_count int := 0;
+  v_max_iterations int := 50;
+  v_processed_count int;
+BEGIN
+  -- ==========================================
+  -- GUARD: Early return if run is already terminal
+  -- ==========================================
+  SELECT r.status, r.input INTO v_run_status, v_run_input
+  FROM pgflow.runs r
+  WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+  IF v_run_status IN ('failed', 'completed') THEN
+    RETURN v_run_status != 'failed';
+  END IF;
+
+  -- ==========================================
+  -- ITERATE UNTIL CONVERGENCE
+  -- ==========================================
+  -- After skipping steps, dependents may become ready and need evaluation.
+  -- Loop until no more steps are processed.
+  LOOP
+    v_iteration_count := v_iteration_count + 1;
+    IF v_iteration_count > v_max_iterations THEN
+      RAISE EXCEPTION 'cascade_resolve_conditions exceeded safety limit of % iterations', v_max_iterations;
+    END IF;
+
+    v_processed_count := 0;
+
+    -- ==========================================
+    -- PHASE 1a: CHECK FOR FAIL CONDITIONS
+    -- ==========================================
+    -- Find first step (by topological order) with unmet condition and 'fail' mode.
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+       AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    )
+    SELECT flow_slug, step_slug, condition_pattern
+    INTO v_first_fail
+    FROM condition_evaluations
+    WHERE NOT condition_met AND when_unmet = 'fail'
+    ORDER BY step_index
+    LIMIT 1;
+
+    -- Handle fail mode: fail step and run, return false
+    IF v_first_fail IS NOT NULL THEN
+      UPDATE pgflow.step_states
+      SET status = 'failed',
+          failed_at = now(),
+          error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
+      WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
+        AND pgflow.step_states.step_slug = v_first_fail.step_slug;
+
+      UPDATE pgflow.runs
+      SET status = 'failed',
+          failed_at = now()
+      WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;
+
+      RETURN false;
+    END IF;
+
+    -- ==========================================
+    -- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
+    -- ==========================================
+    -- Skip steps with unmet conditions and whenUnmet='skip'.
+    -- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+       AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    ),
+    unmet_skip_steps AS (
+      SELECT * FROM condition_evaluations
+      WHERE NOT condition_met AND when_unmet = 'skip'
+    ),
+    skipped_steps AS (
+      UPDATE pgflow.step_states ss
+      SET status = 'skipped',
+          skip_reason = 'condition_unmet',
+          skipped_at = now()
+      FROM unmet_skip_steps uss
+      WHERE ss.run_id = cascade_resolve_conditions.run_id
+        AND ss.step_slug = uss.step_slug
+      RETURNING
+        ss.*,
+        realtime.send(
+          jsonb_build_object(
+            'event_type', 'step:skipped',
+            'run_id', ss.run_id,
+            'flow_slug', ss.flow_slug,
+            'step_slug', ss.step_slug,
+            'status', 'skipped',
+            'skip_reason', 'condition_unmet',
+            'skipped_at', ss.skipped_at
+          ),
+          concat('step:', ss.step_slug, ':skipped'),
+          concat('pgflow:run:', ss.run_id),
+          false
+        ) AS _broadcast_result
+    ),
+    -- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
+    dependent_updates AS (
+      UPDATE pgflow.step_states child_state
+      SET remaining_deps = child_state.remaining_deps - 1,
+          -- If child is a map step and this skipped step is its only dependency,
+          -- set initial_tasks = 0 (skipped dep = empty array)
+          initial_tasks = CASE
+            WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
+            ELSE child_state.initial_tasks
+          END
+      FROM skipped_steps parent
+      JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
+      JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
+      WHERE child_state.run_id = cascade_resolve_conditions.run_id
+        AND child_state.step_slug = dep.step_slug
+    ),
+    run_update AS (
+      UPDATE pgflow.runs r
+      SET remaining_steps = r.remaining_steps - (SELECT COUNT(*) FROM skipped_steps)
+      WHERE r.run_id = cascade_resolve_conditions.run_id
+        AND (SELECT COUNT(*) FROM skipped_steps) > 0
+    )
+    SELECT COUNT(*)::int INTO v_processed_count FROM skipped_steps;
+
+    -- ==========================================
+    -- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS
+    -- ==========================================
+    -- Call cascade_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'.
+    -- Process in topological order; cascade_skip_steps is idempotent.
+    PERFORM pgflow.cascade_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet')
+    FROM pgflow.step_states AS ready_step
+    JOIN pgflow.steps AS step
+      ON step.flow_slug = ready_step.flow_slug
+     AND step.step_slug = ready_step.step_slug
+    LEFT JOIN LATERAL (
+      SELECT jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM pgflow.deps dep
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+       AND dep_state.step_slug = dep.dep_slug
+       AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE dep.flow_slug = ready_step.flow_slug
+        AND dep.step_slug = ready_step.step_slug
+    ) AS agg_deps ON step.deps_count > 0
+    WHERE ready_step.run_id = cascade_resolve_conditions.run_id
+      AND ready_step.status = 'created'
+      AND ready_step.remaining_deps = 0
+      AND step.condition_pattern IS NOT NULL
+      AND step.when_unmet = 'skip-cascade'
+      AND NOT (
+        CASE
+          WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
+          ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
+        END
+      )
+    ORDER BY step.step_index;
+
+    -- Check if run was failed during cascade (e.g., if cascade_skip_steps triggers fail)
+    SELECT r.status INTO v_run_status
+    FROM pgflow.runs r
+    WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+    IF v_run_status IN ('failed', 'completed') THEN
+      RETURN v_run_status != 'failed';
+    END IF;
+
+    -- Exit loop if no steps were processed in this iteration
+    EXIT WHEN v_processed_count = 0;
+  END LOOP;
+
+  RETURN true;
+END;
+$$;
+-- Modify "start_ready_steps" function
+CREATE OR REPLACE FUNCTION "pgflow"."start_ready_steps" ("run_id" uuid) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
+BEGIN
+-- ==========================================
+-- GUARD: No mutations on terminal runs
+-- ==========================================
+IF EXISTS (
+  SELECT 1 FROM pgflow.runs
+  WHERE pgflow.runs.run_id = start_ready_steps.run_id
+    AND pgflow.runs.status IN ('failed', 'completed')
+) THEN
+  RETURN;
+END IF;
+
+-- ==========================================
+-- PHASE 1: START READY STEPS
+-- ==========================================
+-- NOTE: Condition evaluation and empty map handling are done by
+-- cascade_resolve_conditions() and cascade_complete_taskless_steps()
+-- which are called before this function.
+WITH
+-- ---------- Find ready steps ----------
+-- Steps with no remaining deps and known task count
+ready_steps AS (
+  SELECT *
+  FROM pgflow.step_states AS step_state
+  WHERE step_state.run_id = start_ready_steps.run_id
+    AND step_state.status = 'created'
+    AND step_state.remaining_deps = 0
+    AND step_state.initial_tasks IS NOT NULL -- Cannot start with unknown count
+    AND step_state.initial_tasks > 0 -- Don't start taskless steps (handled by cascade_complete_taskless_steps)
+  ORDER BY step_state.step_slug
+  FOR UPDATE
+),
+-- ---------- Mark steps as started ----------
+started_step_states AS (
+  UPDATE pgflow.step_states
+  SET status = 'started',
+      started_at = now(),
+      remaining_tasks = ready_steps.initial_tasks -- Copy initial_tasks to remaining_tasks when starting
+  FROM ready_steps
+  WHERE pgflow.step_states.run_id = start_ready_steps.run_id
+    AND pgflow.step_states.step_slug = ready_steps.step_slug
+  RETURNING pgflow.step_states.*,
+    -- Broadcast step:started event atomically with the UPDATE
+    -- Using RETURNING ensures this executes during row processing
+    -- and cannot be optimized away by the query planner
+    realtime.send(
+      jsonb_build_object(
+        'event_type', 'step:started',
+        'run_id', pgflow.step_states.run_id,
+        'step_slug', pgflow.step_states.step_slug,
+        'status', 'started',
+        'started_at', pgflow.step_states.started_at,
+        'remaining_tasks', pgflow.step_states.remaining_tasks,
+        'remaining_deps', pgflow.step_states.remaining_deps
+      ),
+      concat('step:', pgflow.step_states.step_slug, ':started'),
+      concat('pgflow:run:', pgflow.step_states.run_id),
+      false
+    ) as _broadcast_result -- Prefix with _ to indicate internal use only
+),
+
+-- ==========================================
+-- PHASE 2: TASK GENERATION AND QUEUE MESSAGES
+-- ==========================================
+-- ---------- Generate tasks and batch messages ----------
+-- Single steps: 1 task (index 0)
+-- Map steps: N tasks (indices 0..N-1)
+message_batches AS (
+  SELECT
+    started_step.flow_slug,
+    started_step.run_id,
+    started_step.step_slug,
+    COALESCE(step.opt_start_delay, 0) as delay,
+    array_agg(
+      jsonb_build_object(
+        'flow_slug', started_step.flow_slug,
+        'run_id', started_step.run_id,
+        'step_slug', started_step.step_slug,
+        'task_index', task_idx.task_index
+      ) ORDER BY task_idx.task_index
+    ) AS messages,
+    array_agg(task_idx.task_index ORDER BY task_idx.task_index) AS task_indices
+  FROM started_step_states AS started_step
+  JOIN pgflow.steps AS step
+    ON step.flow_slug = started_step.flow_slug
+   AND step.step_slug = started_step.step_slug
+  -- Generate task indices from 0 to initial_tasks-1
+  CROSS JOIN LATERAL generate_series(0, started_step.initial_tasks - 1) AS task_idx(task_index)
+  GROUP BY started_step.flow_slug, started_step.run_id, started_step.step_slug, step.opt_start_delay
+),
+-- ---------- Send messages to queue ----------
+-- Uses batch sending for performance with large arrays
+sent_messages AS (
+  SELECT
+    mb.flow_slug,
+    mb.run_id,
+    mb.step_slug,
+    task_indices.task_index,
+    msg_ids.msg_id
+  FROM message_batches mb
+  CROSS JOIN LATERAL unnest(mb.task_indices) WITH ORDINALITY AS task_indices(task_index, idx_ord)
+  CROSS JOIN LATERAL pgmq.send_batch(mb.flow_slug, mb.messages, mb.delay) WITH ORDINALITY AS msg_ids(msg_id, msg_ord)
+  WHERE task_indices.idx_ord = msg_ids.msg_ord
+)
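+-- Illustrative pairing semantics (editor's sketch, not executed here): WITH
+-- ORDINALITY numbers both the task indices and the pgmq.send_batch msg_ids, so
+-- the join on the ordinals zips them positionally:
+--   select t.task_index, m.msg_id
+--   from unnest(array[0, 1, 2]) with ordinality as t(task_index, t_ord)
+--   cross join lateral unnest(array[101, 102, 103]::bigint[]) with ordinality as m(msg_id, m_ord)
+--   where t.t_ord = m.m_ord;
+--   -- => (0, 101), (1, 102), (2, 103)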
task_index, message_id) +SELECT + sent_messages.flow_slug, + sent_messages.run_id, + sent_messages.step_slug, + sent_messages.task_index, + sent_messages.msg_id +FROM sent_messages; + +END; +$$; +-- Modify "complete_task" function +CREATE OR REPLACE FUNCTION "pgflow"."complete_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "output" jsonb) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_step_state pgflow.step_states%ROWTYPE; + v_dependent_map_slug text; + v_run_record pgflow.runs%ROWTYPE; + v_step_record pgflow.step_states%ROWTYPE; +begin + +-- ========================================== +-- GUARD: No mutations on failed runs +-- ========================================== +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = complete_task.run_id AND pgflow.runs.status = 'failed') THEN + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; +END IF; + +-- ========================================== +-- LOCK ACQUISITION AND TYPE VALIDATION +-- ========================================== +-- Acquire locks first to prevent race conditions +SELECT * INTO v_run_record FROM pgflow.runs +WHERE pgflow.runs.run_id = complete_task.run_id +FOR UPDATE; + +SELECT * INTO v_step_record FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug +FOR UPDATE; + +-- Check for type violations AFTER acquiring locks +SELECT child_step.step_slug INTO v_dependent_map_slug +FROM pgflow.deps dependency +JOIN pgflow.steps child_step ON child_step.flow_slug = dependency.flow_slug + AND child_step.step_slug = dependency.step_slug +JOIN pgflow.steps parent_step ON parent_step.flow_slug = dependency.flow_slug + AND parent_step.step_slug = dependency.dep_slug +JOIN pgflow.step_states child_state ON child_state.flow_slug = child_step.flow_slug + AND child_state.step_slug = child_step.step_slug +WHERE dependency.dep_slug = complete_task.step_slug -- parent is the completing step + AND dependency.flow_slug = v_run_record.flow_slug + AND parent_step.step_type = 'single' -- Only validate single steps + AND child_step.step_type = 'map' + AND child_state.run_id = complete_task.run_id + AND child_state.initial_tasks IS NULL + AND (complete_task.output IS NULL OR jsonb_typeof(complete_task.output) != 'array') +LIMIT 1; + +-- Handle type violation if detected +IF v_dependent_map_slug IS NOT NULL THEN + -- Mark run as failed immediately + UPDATE pgflow.runs + SET status = 'failed', + failed_at = now() + WHERE pgflow.runs.run_id = complete_task.run_id; + + -- Broadcast run:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', complete_task.run_id, + 'flow_slug', v_run_record.flow_slug, + 'status', 'failed', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- Archive all active messages (both queued and started) to prevent orphaned messages + PERFORM pgmq.archive( + v_run_record.flow_slug, + array_agg(st.message_id) + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + HAVING count(*) > 0; -- Only call archive if there are 
messages to archive + + -- Mark current task as failed and store the output + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + output = complete_task.output, -- Store the output that caused the violation + error_message = '[TYPE_VIOLATION] Produced ' || + CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END || + ' instead of array' + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + + -- Mark step state as failed + UPDATE pgflow.step_states + SET status = 'failed', + failed_at = now(), + error_message = '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug; + + -- Broadcast step:failed event + -- Uses PERFORM pattern to ensure execution (proven reliable pattern in this function) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'failed', + 'error_message', '[TYPE_VIOLATION] Map step ' || v_dependent_map_slug || + ' expects array input but dependency ' || complete_task.step_slug || + ' produced ' || CASE WHEN complete_task.output IS NULL THEN 'null' + ELSE jsonb_typeof(complete_task.output) END, + 'failed_at', now() + ), + concat('step:', complete_task.step_slug, ':failed'), + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- Archive the current task's message (it was started, now failed) + PERFORM pgmq.archive( + v_run_record.flow_slug, + st.message_id -- Single message, use scalar form + ) + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index + AND st.message_id IS NOT NULL; + + -- Return empty result + RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + RETURN; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Update task and propagate changes +-- ========================================== +WITH +-- ---------- Task completion ---------- +-- Update the task record with completion status and output +task AS ( + UPDATE pgflow.step_tasks + SET + status = 'completed', + completed_at = now(), + output = complete_task.output + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index + AND pgflow.step_tasks.status = 'started' + RETURNING * +), +-- ---------- Get step type for output handling ---------- +step_def AS ( + SELECT step.step_type + FROM pgflow.steps step + JOIN pgflow.runs run ON run.flow_slug = step.flow_slug + WHERE run.run_id = complete_task.run_id + AND step.step_slug = complete_task.step_slug +), +-- ---------- Step state update ---------- +-- Decrement remaining_tasks and potentially mark step as completed +-- Also store output atomically with status transition to completed +step_state AS ( + UPDATE pgflow.step_states + SET + status = CASE + WHEN pgflow.step_states.remaining_tasks = 1 THEN 'completed' -- Will be 0 after decrement + ELSE 'started' + END, + completed_at = CASE + WHEN 
pgflow.step_states.remaining_tasks = 1 THEN now() -- Will be 0 after decrement + ELSE NULL + END, + remaining_tasks = pgflow.step_states.remaining_tasks - 1, + -- Store output atomically with completion (only when remaining_tasks = 1, meaning step completes) + output = CASE + -- Single step: store task output directly when completing + WHEN (SELECT step_type FROM step_def) = 'single' AND pgflow.step_states.remaining_tasks = 1 THEN + complete_task.output + -- Map step: aggregate on completion (ordered by task_index) + WHEN (SELECT step_type FROM step_def) = 'map' AND pgflow.step_states.remaining_tasks = 1 THEN + (SELECT COALESCE(jsonb_agg(all_outputs.output ORDER BY all_outputs.task_index), '[]'::jsonb) + FROM ( + -- All previously completed tasks + SELECT st.output, st.task_index + FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.status = 'completed' + UNION ALL + -- Current task being completed (not yet visible as completed in snapshot) + SELECT complete_task.output, complete_task.task_index + ) all_outputs) + ELSE pgflow.step_states.output + END + FROM task + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug = complete_task.step_slug + RETURNING pgflow.step_states.* +), +-- ---------- Dependency resolution ---------- +-- Find all child steps that depend on the completed parent step (only if parent completed) +child_steps AS ( + SELECT deps.step_slug AS child_step_slug + FROM pgflow.deps deps + JOIN step_state parent_state ON parent_state.status = 'completed' AND deps.flow_slug = parent_state.flow_slug + WHERE deps.dep_slug = complete_task.step_slug -- dep_slug is the parent, step_slug is the child + ORDER BY deps.step_slug -- Ensure consistent ordering +), +-- ---------- Lock child steps ---------- +-- Acquire locks on all child steps before updating them +child_steps_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = complete_task.run_id + AND pgflow.step_states.step_slug IN (SELECT child_step_slug FROM child_steps) + FOR UPDATE +), +-- ---------- Update child steps ---------- +-- Decrement remaining_deps and resolve NULL initial_tasks for map steps +child_steps_update AS ( + UPDATE pgflow.step_states child_state + SET remaining_deps = child_state.remaining_deps - 1, + -- Resolve NULL initial_tasks for child map steps + -- This is where child maps learn their array size from the parent + -- This CTE only runs when the parent step is complete (see child_steps JOIN) + initial_tasks = CASE + WHEN child_step.step_type = 'map' AND child_state.initial_tasks IS NULL THEN + CASE + WHEN parent_step.step_type = 'map' THEN + -- Map->map: Count all completed tasks from parent map + -- We add 1 because the current task is being completed in this transaction + -- but isn't yet visible as 'completed' in the step_tasks table + -- TODO: Refactor to use future column step_states.total_tasks + -- Would eliminate the COUNT query and just use parent_state.total_tasks + (SELECT COUNT(*)::int + 1 + FROM pgflow.step_tasks parent_tasks + WHERE parent_tasks.run_id = complete_task.run_id + AND parent_tasks.step_slug = complete_task.step_slug + AND parent_tasks.status = 'completed' + AND parent_tasks.task_index != complete_task.task_index) + ELSE + -- Single->map: Use output array length (single steps complete immediately) + CASE + WHEN complete_task.output IS NOT NULL + AND jsonb_typeof(complete_task.output) = 'array' THEN + jsonb_array_length(complete_task.output) + ELSE 
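+                  -- Illustrative sizing (assumed outputs): [10, 20, 30] -> initial_tasks = 3.
+                  -- A non-array like {"a": 1} is normally caught by the [TYPE_VIOLATION]
+                  -- guard earlier in this function, so this NULL branch is defensive.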
NULL -- Keep NULL if not an array + END + END + ELSE child_state.initial_tasks -- Keep existing value (including NULL) + END + FROM child_steps children + JOIN pgflow.steps child_step ON child_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND child_step.step_slug = children.child_step_slug + JOIN pgflow.steps parent_step ON parent_step.flow_slug = (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id) + AND parent_step.step_slug = complete_task.step_slug + WHERE child_state.run_id = complete_task.run_id + AND child_state.step_slug = children.child_step_slug +) +-- ---------- Update run remaining_steps ---------- +-- Decrement the run's remaining_steps counter if step completed +UPDATE pgflow.runs +SET remaining_steps = pgflow.runs.remaining_steps - 1 +FROM step_state +WHERE pgflow.runs.run_id = complete_task.run_id + AND step_state.status = 'completed'; + +-- ========================================== +-- POST-COMPLETION ACTIONS +-- ========================================== + +-- ---------- Get updated state for broadcasting ---------- +SELECT * INTO v_step_state FROM pgflow.step_states +WHERE pgflow.step_states.run_id = complete_task.run_id AND pgflow.step_states.step_slug = complete_task.step_slug; + +-- ---------- Handle step completion ---------- +IF v_step_state.status = 'completed' THEN + -- Broadcast step:completed event FIRST (before cascade) + -- This ensures parent broadcasts before its dependent children + -- Use stored output from step_states (set atomically during status transition) + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:completed', + 'run_id', complete_task.run_id, + 'step_slug', complete_task.step_slug, + 'status', 'completed', + 'output', v_step_state.output, -- Use stored output instead of re-aggregating + 'completed_at', v_step_state.completed_at + ), + concat('step:', complete_task.step_slug, ':completed'), + concat('pgflow:run:', complete_task.run_id), + false + ); + + -- THEN evaluate conditions on newly-ready dependent steps + -- This must happen before cascade_complete_taskless_steps so that + -- skipped steps can set initial_tasks=0 for their map dependents + IF NOT pgflow.cascade_resolve_conditions(complete_task.run_id) THEN + -- Run was failed due to a condition with when_unmet='fail' + -- Archive the current task's message before returning + PERFORM pgmq.archive( + (SELECT r.flow_slug FROM pgflow.runs r WHERE r.run_id = complete_task.run_id), + (SELECT st.message_id FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index) + ); + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = complete_task.run_id + AND pgflow.step_tasks.step_slug = complete_task.step_slug + AND pgflow.step_tasks.task_index = complete_task.task_index; + RETURN; + END IF; + + -- THEN cascade complete any taskless steps that are now ready + -- This ensures dependent children broadcast AFTER their parent + PERFORM pgflow.cascade_complete_taskless_steps(complete_task.run_id); +END IF; + +-- ---------- Archive completed task message ---------- +-- Move message from active queue to archive table +PERFORM ( + WITH completed_tasks AS ( + SELECT r.flow_slug, st.message_id + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = 
complete_task.task_index + AND st.status = 'completed' + ) + SELECT pgmq.archive(ct.flow_slug, ct.message_id) + FROM completed_tasks ct + WHERE EXISTS (SELECT 1 FROM completed_tasks) +); + +-- ---------- Trigger next steps ---------- +-- Start any steps that are now ready (deps satisfied) +PERFORM pgflow.start_ready_steps(complete_task.run_id); + +-- Check if the entire run is complete +PERFORM pgflow.maybe_complete_run(complete_task.run_id); + +-- ---------- Return completed task ---------- +RETURN QUERY SELECT * +FROM pgflow.step_tasks AS step_task +WHERE step_task.run_id = complete_task.run_id + AND step_task.step_slug = complete_task.step_slug + AND step_task.task_index = complete_task.task_index; + +end; +$$; +-- Modify "start_flow" function +CREATE OR REPLACE FUNCTION "pgflow"."start_flow" ("flow_slug" text, "input" jsonb, "run_id" uuid DEFAULT NULL::uuid) RETURNS SETOF "pgflow"."runs" LANGUAGE plpgsql SET "search_path" = '' AS $$ +declare + v_created_run pgflow.runs%ROWTYPE; + v_root_map_count int; +begin + +-- ========================================== +-- VALIDATION: Root map array input +-- ========================================== +WITH root_maps AS ( + SELECT step_slug + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + AND steps.step_type = 'map' + AND steps.deps_count = 0 +) +SELECT COUNT(*) INTO v_root_map_count FROM root_maps; + +-- If we have root map steps, validate that input is an array +IF v_root_map_count > 0 THEN + -- First check for NULL (should be caught by NOT NULL constraint, but be defensive) + IF start_flow.input IS NULL THEN + RAISE EXCEPTION 'Flow % has root map steps but input is NULL', start_flow.flow_slug; + END IF; + + -- Then check if it's not an array + IF jsonb_typeof(start_flow.input) != 'array' THEN + RAISE EXCEPTION 'Flow % has root map steps but input is not an array (got %)', + start_flow.flow_slug, jsonb_typeof(start_flow.input); + END IF; +END IF; + +-- ========================================== +-- MAIN CTE CHAIN: Create run and step states +-- ========================================== +WITH + -- ---------- Gather flow metadata ---------- + flow_steps AS ( + SELECT steps.flow_slug, steps.step_slug, steps.step_type, steps.deps_count + FROM pgflow.steps + WHERE steps.flow_slug = start_flow.flow_slug + ), + -- ---------- Create run record ---------- + created_run AS ( + INSERT INTO pgflow.runs (run_id, flow_slug, input, remaining_steps) + VALUES ( + COALESCE(start_flow.run_id, gen_random_uuid()), + start_flow.flow_slug, + start_flow.input, + (SELECT count(*) FROM flow_steps) + ) + RETURNING * + ), + -- ---------- Create step states ---------- + -- Sets initial_tasks: known for root maps, NULL for dependent maps + created_step_states AS ( + INSERT INTO pgflow.step_states (flow_slug, run_id, step_slug, remaining_deps, initial_tasks) + SELECT + fs.flow_slug, + (SELECT created_run.run_id FROM created_run), + fs.step_slug, + fs.deps_count, + -- Updated logic for initial_tasks: + CASE + WHEN fs.step_type = 'map' AND fs.deps_count = 0 THEN + -- Root map: get array length from input + CASE + WHEN jsonb_typeof(start_flow.input) = 'array' THEN + jsonb_array_length(start_flow.input) + ELSE + 1 + END + WHEN fs.step_type = 'map' AND fs.deps_count > 0 THEN + -- Dependent map: unknown until dependencies complete + NULL + ELSE + -- Single steps: always 1 task + 1 + END + FROM flow_steps fs + ) +SELECT * FROM created_run INTO v_created_run; + +-- ========================================== +-- POST-CREATION ACTIONS +-- 
========================================== + +-- ---------- Broadcast run:started event ---------- +PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:started', + 'run_id', v_created_run.run_id, + 'flow_slug', v_created_run.flow_slug, + 'input', v_created_run.input, + 'status', 'started', + 'remaining_steps', v_created_run.remaining_steps, + 'started_at', v_created_run.started_at + ), + 'run:started', + concat('pgflow:run:', v_created_run.run_id), + false +); + +-- ---------- Evaluate conditions on ready steps ---------- +-- Skip steps with unmet conditions, propagate to dependents +IF NOT pgflow.cascade_resolve_conditions(v_created_run.run_id) THEN + -- Run was failed due to a condition with when_unmet='fail' + RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; + RETURN; +END IF; + +-- ---------- Complete taskless steps ---------- +-- Handle empty array maps that should auto-complete +PERFORM pgflow.cascade_complete_taskless_steps(v_created_run.run_id); + +-- ---------- Start initial steps ---------- +-- Start root steps (those with no dependencies) +PERFORM pgflow.start_ready_steps(v_created_run.run_id); + +-- ---------- Check for run completion ---------- +-- If cascade completed all steps (zero-task flows), finalize the run +PERFORM pgflow.maybe_complete_run(v_created_run.run_id); + +RETURN QUERY SELECT * FROM pgflow.runs where pgflow.runs.run_id = v_created_run.run_id; + +end; +$$; +-- Modify "start_tasks" function +CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$ +with tasks as ( + select + task.flow_slug, + task.run_id, + task.step_slug, + task.task_index, + task.message_id + from pgflow.step_tasks as task + join pgflow.runs r on r.run_id = task.run_id + where task.flow_slug = start_tasks.flow_slug + and task.message_id = any(msg_ids) + and task.status = 'queued' + -- MVP: Don't start tasks on failed runs + and r.status != 'failed' + ), + start_tasks_update as ( + update pgflow.step_tasks + set + attempts_count = attempts_count + 1, + status = 'started', + started_at = now(), + last_worker_id = worker_id + from tasks + where step_tasks.message_id = tasks.message_id + and step_tasks.flow_slug = tasks.flow_slug + and step_tasks.status = 'queued' + ), + runs as ( + select + r.run_id, + r.input + from pgflow.runs r + where r.run_id in (select run_id from tasks) + ), + deps as ( + select + st.run_id, + st.step_slug, + dep.dep_slug, + -- Read output directly from step_states (already aggregated by writers) + dep_state.output as dep_output + from tasks st + join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug + join pgflow.step_states dep_state on + dep_state.run_id = st.run_id and + dep_state.step_slug = dep.dep_slug and + dep_state.status = 'completed' -- Only include completed deps (not skipped) + ), + deps_outputs as ( + select + d.run_id, + d.step_slug, + jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output, + count(*) as dep_count + from deps d + group by d.run_id, d.step_slug + ), + timeouts as ( + select + task.message_id, + task.flow_slug, + coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay + from tasks task + join pgflow.flows flow on flow.flow_slug = task.flow_slug + join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug + ), + -- Batch update visibility timeouts for all messages + set_vt_batch as ( + 
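+    -- Illustrative (assumed values): a step with opt_timeout = 30 gets vt_delay = 32,
+    -- so its message stays invisible to other workers for 32s after being started
+    -- (the +2s appears to be a small buffer over the handler timeout).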
select pgflow.set_vt_batch( + start_tasks.flow_slug, + array_agg(t.message_id order by t.message_id), + array_agg(t.vt_delay order by t.message_id) + ) + from timeouts t + ) + select + st.flow_slug, + st.run_id, + st.step_slug, + -- ========================================== + -- INPUT CONSTRUCTION LOGIC + -- ========================================== + -- This nested CASE statement determines how to construct the input + -- for each task based on the step type (map vs non-map). + -- + -- The fundamental difference: + -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello") + -- - Non-map steps: Receive structured objects with named keys + -- (e.g., {"run": {...}, "dependency1": {...}}) + -- ========================================== + CASE + -- -------------------- MAP STEPS -------------------- + -- Map steps process arrays element-by-element. + -- Each task receives ONE element from the array at its task_index position. + WHEN step.step_type = 'map' THEN + -- Map steps get raw array elements without any wrapper object + CASE + -- ROOT MAP: Gets array from run input + -- Example: run input = [1, 2, 3] + -- task 0 gets: 1 + -- task 1 gets: 2 + -- task 2 gets: 3 + WHEN step.deps_count = 0 THEN + -- Root map (deps_count = 0): no dependencies, reads from run input. + -- Extract the element at task_index from the run's input array. + -- Note: If run input is not an array, this will return NULL + -- and the flow will fail (validated in start_flow). + jsonb_array_element(r.input, st.task_index) + + -- DEPENDENT MAP: Gets array from its single dependency + -- Example: dependency output = ["a", "b", "c"] + -- task 0 gets: "a" + -- task 1 gets: "b" + -- task 2 gets: "c" + ELSE + -- Has dependencies (should be exactly 1 for map steps). + -- Extract the element at task_index from the dependency's output array. + -- + -- Why the subquery with jsonb_each? + -- - The dependency outputs a raw array: [1, 2, 3] + -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]} + -- - We need to unwrap and get just the array value + -- - Map steps have exactly 1 dependency (enforced by add_step) + -- - So jsonb_each will return exactly 1 row + -- - We extract the 'value' which is the raw array [1, 2, 3] + -- - Then get the element at task_index from that array + (SELECT jsonb_array_element(value, st.task_index) + FROM jsonb_each(dep_out.deps_output) + LIMIT 1) + END + + -- -------------------- NON-MAP STEPS -------------------- + -- Regular (non-map) steps receive dependency outputs as a structured object. + -- Root steps (no dependencies) get empty object - they access flowInput via context. + -- Dependent steps get only their dependency outputs. + ELSE + -- Non-map steps get structured input with dependency keys only + -- Example for dependent step: { + -- "step1": {"output": "from_step1"}, + -- "step2": {"output": "from_step2"} + -- } + -- Example for root step: {} + -- + -- Note: flow_input is available separately in the returned record + -- for workers to access via context.flowInput + coalesce(dep_out.deps_output, '{}'::jsonb) + END as input, + st.message_id as msg_id, + st.task_index as task_index, + -- flow_input: Original run input for worker context + -- Only included for root non-map steps to avoid data duplication. 
+ -- Root map steps: flowInput IS the array, useless to include + -- Dependent steps: lazy load via ctx.flowInput when needed + CASE + WHEN step.step_type != 'map' AND step.deps_count = 0 + THEN r.input + ELSE NULL + END as flow_input + from tasks st + join runs r on st.run_id = r.run_id + join pgflow.steps step on + step.flow_slug = st.flow_slug and + step.step_slug = st.step_slug + left join deps_outputs dep_out on + dep_out.run_id = st.run_id and + dep_out.step_slug = st.step_slug +$$; +-- Create "add_step" function +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + result_step pgflow.steps; + next_idx int; +BEGIN + -- Validate map step constraints + -- Map steps can have either: + -- 0 dependencies (root map - maps over flow input array) + -- 1 dependency (dependent map - maps over dependency output array) + IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + add_step.step_slug, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + array_to_string(add_step.deps_slugs, ', '); + END IF; + + -- Get next step index + SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx + FROM pgflow.steps s + WHERE s.flow_slug = add_step.flow_slug; + + -- Create the step + INSERT INTO pgflow.steps ( + flow_slug, step_slug, step_type, step_index, deps_count, + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, when_unmet, when_failed + ) + VALUES ( + add_step.flow_slug, + add_step.step_slug, + COALESCE(add_step.step_type, 'single'), + next_idx, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + add_step.max_attempts, + add_step.base_delay, + add_step.timeout, + add_step.start_delay, + add_step.condition_pattern, + add_step.when_unmet, + add_step.when_failed + ) + ON CONFLICT ON CONSTRAINT steps_pkey + DO UPDATE SET step_slug = EXCLUDED.step_slug + RETURNING * INTO result_step; + + -- Insert dependencies + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug + FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) + WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 + ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; + + RETURN result_step; +END; +$$; +-- Drop "add_step" function +DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text); diff --git a/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql b/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql new file mode 100644 index 000000000..054e39593 --- /dev/null +++ b/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql @@ -0,0 +1,348 @@ +-- Create "_cascade_force_skip_steps" function +CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_flow_slug text; + v_total_skipped int 
:= 0;
+BEGIN
+  -- Get flow_slug for this run
+  SELECT r.flow_slug INTO v_flow_slug
+  FROM pgflow.runs r
+  WHERE r.run_id = _cascade_force_skip_steps.run_id;
+
+  IF v_flow_slug IS NULL THEN
+    RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id;
+  END IF;
+
+  -- ==========================================
+  -- SKIP STEPS IN TOPOLOGICAL ORDER
+  -- ==========================================
+  -- Use recursive CTE to find all downstream dependents,
+  -- then skip them in topological order (by step_index)
+  WITH RECURSIVE
+    -- ---------- Find all downstream steps ----------
+    downstream_steps AS (
+      -- Base case: the trigger step
+      SELECT
+        s.flow_slug,
+        s.step_slug,
+        s.step_index,
+        _cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step
+      FROM pgflow.steps s
+      WHERE s.flow_slug = v_flow_slug
+        AND s.step_slug = _cascade_force_skip_steps.step_slug
+
+      UNION ALL
+
+      -- Recursive case: steps that depend on already-found steps
+      SELECT
+        s.flow_slug,
+        s.step_slug,
+        s.step_index,
+        'dependency_skipped'::text AS reason -- Downstream steps get this reason
+      FROM pgflow.steps s
+      JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
+      JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
+    ),
+    -- ---------- Deduplicate and order by step_index ----------
+    steps_to_skip AS (
+      SELECT DISTINCT ON (ds.step_slug)
+        ds.flow_slug,
+        ds.step_slug,
+        ds.step_index,
+        ds.reason
+      FROM downstream_steps ds
+      ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
+    ),
+    -- ---------- Skip the steps ----------
+    skipped AS (
+      UPDATE pgflow.step_states ss
+      SET status = 'skipped',
+          skip_reason = sts.reason,
+          skipped_at = now(),
+          remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
+      FROM steps_to_skip sts
+      WHERE ss.run_id = _cascade_force_skip_steps.run_id
+        AND ss.step_slug = sts.step_slug
+        AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
+      RETURNING
+        ss.*,
+        -- Broadcast step:skipped event
+        realtime.send(
+          jsonb_build_object(
+            'event_type', 'step:skipped',
+            'run_id', ss.run_id,
+            'flow_slug', ss.flow_slug,
+            'step_slug', ss.step_slug,
+            'status', 'skipped',
+            'skip_reason', ss.skip_reason,
+            'skipped_at', ss.skipped_at
+          ),
+          concat('step:', ss.step_slug, ':skipped'),
+          concat('pgflow:run:', ss.run_id),
+          false
+        ) as _broadcast_result
+    ),
+    -- ---------- Update run counters ----------
+    run_updates AS (
+      UPDATE pgflow.runs r
+      SET remaining_steps = r.remaining_steps - skipped_count.count
+      FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
+      WHERE r.run_id = _cascade_force_skip_steps.run_id
+        AND skipped_count.count > 0
+    )
+  SELECT COUNT(*) INTO v_total_skipped FROM skipped;
+
+  RETURN v_total_skipped;
+END;
+$$;
+-- Modify "cascade_resolve_conditions" function
+CREATE OR REPLACE FUNCTION "pgflow"."cascade_resolve_conditions" ("run_id" uuid) RETURNS boolean LANGUAGE plpgsql SET "search_path" = '' AS $$
+DECLARE
+  v_run_input jsonb;
+  v_run_status text;
+  v_first_fail record;
+  v_iteration_count int := 0;
+  v_max_iterations int := 50;
+  v_processed_count int;
+BEGIN
+  -- ==========================================
+  -- GUARD: Early return if run is already terminal
+  -- ==========================================
+  SELECT r.status, r.input INTO v_run_status, v_run_input
+  FROM pgflow.runs r
+  WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+  IF v_run_status IN ('failed', 'completed') THEN
+    RETURN
v_run_status != 'failed';
+  END IF;
+
+  -- ==========================================
+  -- ITERATE UNTIL CONVERGENCE
+  -- ==========================================
+  -- After skipping steps, dependents may become ready and need evaluation.
+  -- Loop until no more steps are processed.
+  LOOP
+    v_iteration_count := v_iteration_count + 1;
+    IF v_iteration_count > v_max_iterations THEN
+      RAISE EXCEPTION 'cascade_resolve_conditions exceeded safety limit of % iterations', v_max_iterations;
+    END IF;
+
+    v_processed_count := 0;
+
+    -- ==========================================
+    -- PHASE 1a: CHECK FOR FAIL CONDITIONS
+    -- ==========================================
+    -- Find first step (by topological order) with unmet condition and 'fail' mode.
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+        AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+        AND dep_state.step_slug = dep.dep_slug
+        AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    )
+    SELECT flow_slug, step_slug, condition_pattern
+    INTO v_first_fail
+    FROM condition_evaluations
+    WHERE NOT condition_met AND when_unmet = 'fail'
+    ORDER BY step_index
+    LIMIT 1;
+
+    -- Handle fail mode: fail step and run, return false
+    IF v_first_fail IS NOT NULL THEN
+      UPDATE pgflow.step_states
+      SET status = 'failed',
+          failed_at = now(),
+          error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
+      WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
+        AND pgflow.step_states.step_slug = v_first_fail.step_slug;
+
+      UPDATE pgflow.runs
+      SET status = 'failed',
+          failed_at = now()
+      WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;
+
+      RETURN false;
+    END IF;
+
+    -- ==========================================
+    -- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
+    -- ==========================================
+    -- Skip steps with unmet conditions and whenUnmet='skip'.
+    -- Also decrements remaining_deps on dependents and sets initial_tasks=0 for map dependents.
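+    -- Condition matching uses standard jsonb containment (@>); values below are
+    -- illustrative, mirroring the condition_evaluation tests:
+    --   '{"enabled": false}'::jsonb @> '{"enabled": true}'::jsonb          => false (unmet)
+    --   '{"first": {"success": true, "n": 1}}'::jsonb
+    --       @> '{"first": {"success": true}}'::jsonb                       => true  (met)
+    --   '{}'::jsonb @> '{"first": {"success": true}}'::jsonb               => false (all deps skipped)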
+    WITH steps_with_conditions AS (
+      SELECT
+        step_state.flow_slug,
+        step_state.step_slug,
+        step.condition_pattern,
+        step.when_unmet,
+        step.deps_count,
+        step.step_index
+      FROM pgflow.step_states AS step_state
+      JOIN pgflow.steps AS step
+        ON step.flow_slug = step_state.flow_slug
+        AND step.step_slug = step_state.step_slug
+      WHERE step_state.run_id = cascade_resolve_conditions.run_id
+        AND step_state.status = 'created'
+        AND step_state.remaining_deps = 0
+        AND step.condition_pattern IS NOT NULL
+    ),
+    step_deps_output AS (
+      SELECT
+        swc.step_slug,
+        jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM steps_with_conditions swc
+      JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+        AND dep_state.step_slug = dep.dep_slug
+        AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE swc.deps_count > 0
+      GROUP BY swc.step_slug
+    ),
+    condition_evaluations AS (
+      SELECT
+        swc.*,
+        CASE
+          WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
+          ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
+        END AS condition_met
+      FROM steps_with_conditions swc
+      LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
+    ),
+    unmet_skip_steps AS (
+      SELECT * FROM condition_evaluations
+      WHERE NOT condition_met AND when_unmet = 'skip'
+    ),
+    skipped_steps AS (
+      UPDATE pgflow.step_states ss
+      SET status = 'skipped',
+          skip_reason = 'condition_unmet',
+          skipped_at = now()
+      FROM unmet_skip_steps uss
+      WHERE ss.run_id = cascade_resolve_conditions.run_id
+        AND ss.step_slug = uss.step_slug
+      RETURNING
+        ss.*,
+        realtime.send(
+          jsonb_build_object(
+            'event_type', 'step:skipped',
+            'run_id', ss.run_id,
+            'flow_slug', ss.flow_slug,
+            'step_slug', ss.step_slug,
+            'status', 'skipped',
+            'skip_reason', 'condition_unmet',
+            'skipped_at', ss.skipped_at
+          ),
+          concat('step:', ss.step_slug, ':skipped'),
+          concat('pgflow:run:', ss.run_id),
+          false
+        ) AS _broadcast_result
+    ),
+    -- Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
+    dependent_updates AS (
+      UPDATE pgflow.step_states child_state
+      SET remaining_deps = child_state.remaining_deps - 1,
+          -- If child is a map step and this skipped step is its only dependency,
+          -- set initial_tasks = 0 (skipped dep = empty array)
+          initial_tasks = CASE
+            WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
+            ELSE child_state.initial_tasks
+          END
+      FROM skipped_steps parent
+      JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
+      JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
+      WHERE child_state.run_id = cascade_resolve_conditions.run_id
+        AND child_state.step_slug = dep.step_slug
+    ),
+    run_update AS (
+      UPDATE pgflow.runs r
+      SET remaining_steps = r.remaining_steps - (SELECT COUNT(*) FROM skipped_steps)
+      WHERE r.run_id = cascade_resolve_conditions.run_id
+        AND (SELECT COUNT(*) FROM skipped_steps) > 0
+    )
+    SELECT COUNT(*)::int INTO v_processed_count FROM skipped_steps;
+
+    -- ==========================================
+    -- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS
+    -- ==========================================
+    -- Call _cascade_force_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'.
+    -- Process in topological order; _cascade_force_skip_steps is idempotent.
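+    -- Illustrative: in a chain a -> b -> c where a has when_unmet='skip-cascade'
+    -- and an unmet condition, one call skips a with skip_reason='condition_unmet'
+    -- and b, c with skip_reason='dependency_skipped' (see the cascade tests below).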
+    PERFORM pgflow._cascade_force_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet')
+    FROM pgflow.step_states AS ready_step
+    JOIN pgflow.steps AS step
+      ON step.flow_slug = ready_step.flow_slug
+      AND step.step_slug = ready_step.step_slug
+    LEFT JOIN LATERAL (
+      SELECT jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output
+      FROM pgflow.deps dep
+      JOIN pgflow.step_states dep_state
+        ON dep_state.run_id = cascade_resolve_conditions.run_id
+        AND dep_state.step_slug = dep.dep_slug
+        AND dep_state.status = 'completed' -- Only completed deps (not skipped)
+      WHERE dep.flow_slug = ready_step.flow_slug
+        AND dep.step_slug = ready_step.step_slug
+    ) AS agg_deps ON step.deps_count > 0
+    WHERE ready_step.run_id = cascade_resolve_conditions.run_id
+      AND ready_step.status = 'created'
+      AND ready_step.remaining_deps = 0
+      AND step.condition_pattern IS NOT NULL
+      AND step.when_unmet = 'skip-cascade'
+      AND NOT (
+        CASE
+          WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
+          ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
+        END
+      )
+    ORDER BY step.step_index;
+
+    -- Re-check run status in case a cascade left the run in a terminal state
+    SELECT r.status INTO v_run_status
+    FROM pgflow.runs r
+    WHERE r.run_id = cascade_resolve_conditions.run_id;
+
+    IF v_run_status IN ('failed', 'completed') THEN
+      RETURN v_run_status != 'failed';
+    END IF;
+
+    -- Exit loop if no steps were processed in this iteration
+    EXIT WHEN v_processed_count = 0;
+  END LOOP;
+
+  RETURN true;
+END;
+$$;
+-- Drop "cascade_skip_steps" function
+DROP FUNCTION "pgflow"."cascade_skip_steps";
diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum
index e23d991f8..bf02c11e5 100644
--- a/pkgs/core/supabase/migrations/atlas.sum
+++ b/pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:95pJcIaIV04WBvPgFpjULl/TWBCArYhQTMB4IG69phs=
+h1:U+A8OjhnUoZjydIbgkI6blxH+7ZJslRQeQEn5tdID4k=
 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,4 +16,5 @@ h1:95pJcIaIV04WBvPgFpjULl/TWBCArYhQTMB4IG69phs=
 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
-20260105074725_pgflow_temp_skip_infrastructure.sql h1:tjele0FyNwcK0DLlr7I8QxiAueTC36r7KYK27Mkbi2s=
+20260105121138_pgflow_condition_evaluation.sql h1:OR8KpHBKBB87VBhnZB+NkW8Rs7pGBPuPL8GF8FcM+oU=
+20260105203847_pgflow_rename_cascade_skip_steps.sql h1:chw/UFU1F7vGcWKkqfApPfLt+k48fKBnrQO31kT/zKg=
diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql
similarity index 93%
rename from pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql
rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql
index 44e04d649..012f3368e 100644
--- a/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql
+++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql
@@ -1,4 +1,4 @@
--- Test:
cascade_skip_steps - Broadcast order respects dependency graph +-- Test: _cascade_force_skip_steps - Broadcast order respects dependency graph -- Verifies step:skipped events are sent in topological order begin; select plan(2); @@ -17,7 +17,7 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (cascades to B and C) -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'condition_unmet' diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql similarity index 95% rename from pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql index 888a8d6f8..5faa21cff 100644 --- a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql @@ -1,4 +1,4 @@ --- Test: cascade_skip_steps - Cascade through multiple DAG levels +-- Test: _cascade_force_skip_steps - Cascade through multiple DAG levels -- Verifies skipping A cascades through A -> B -> C chain begin; select plan(8); @@ -17,7 +17,7 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (should cascade to step_b and step_c) -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'handler_failed' diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql similarity index 95% rename from pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql index a6b086b41..e6be5c41f 100644 --- a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql @@ -1,4 +1,4 @@ --- Test: cascade_skip_steps - Cascade to single dependent +-- Test: _cascade_force_skip_steps - Cascade to single dependent -- Verifies skipping a step cascades to its direct dependent begin; select plan(7); @@ -16,7 +16,7 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (should cascade to step_b) -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'condition_unmet' diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/multi_dependency_partial_skip.test.sql similarity index 95% rename from pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/multi_dependency_partial_skip.test.sql index c7a1f2327..fc14700d5 100644 --- a/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/multi_dependency_partial_skip.test.sql @@ -1,4 +1,4 @@ --- Test: cascade_skip_steps - Multi-dependency scenario +-- Test: _cascade_force_skip_steps - Multi-dependency scenario -- Flow: A -> C, B -> C (C depends on both A and B) -- Skipping A should 
cascade to C, even though B is still runnable begin; @@ -18,7 +18,7 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (should cascade to step_c) -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'condition_unmet' diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql similarity index 91% rename from pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql index 2df25cbd5..e892ef21b 100644 --- a/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql @@ -1,4 +1,4 @@ --- Test: cascade_skip_steps - Single step skip (base case) +-- Test: _cascade_force_skip_steps - Single step skip (base case) -- Verifies the function can skip a single step without dependencies begin; select plan(5); @@ -24,7 +24,7 @@ select is( ); -- Skip step_a -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'condition_unmet' @@ -35,7 +35,7 @@ select is( (select status from pgflow.step_states where run_id = (select run_id from run_ids) and step_slug = 'step_a'), 'skipped', - 'step_a should be skipped after cascade_skip_steps' + 'step_a should be skipped after _cascade_force_skip_steps' ); -- Test 3: step_a should have skip_reason set diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/skipped_event_payload.test.sql similarity index 94% rename from pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql rename to pkgs/core/supabase/tests/_cascade_force_skip_steps/skipped_event_payload.test.sql index 6b546928f..015f31c29 100644 --- a/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/skipped_event_payload.test.sql @@ -1,4 +1,4 @@ --- Test: cascade_skip_steps - step:skipped event payload format +-- Test: _cascade_force_skip_steps - step:skipped event payload format -- Verifies the realtime event contains all required fields begin; select plan(8); @@ -15,7 +15,7 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a -select pgflow.cascade_skip_steps( +select pgflow._cascade_force_skip_steps( (select run_id from run_ids), 'step_a', 'condition_unmet' diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_met.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_met.test.sql new file mode 100644 index 000000000..a3e275eb9 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_met.test.sql @@ -0,0 +1,68 @@ +-- Test: Dependent step condition met - step executes normally +-- Verifies that a dependent step with a condition pattern that matches +-- the aggregated dependency outputs starts normally +begin; +select plan(3); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a root step and a conditional dependent step +select pgflow.create_flow('conditional_flow'); +select pgflow.add_step('conditional_flow', 'first'); +select pgflow.add_step( + 'conditional_flow', + 'checked_step', + '{first}', -- depends on first + null, null, null, null, -- 
default options + 'single', -- step_type + '{"first": {"success": true}}'::jsonb, -- condition: first.success must be true + 'skip' -- when_unmet +); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('conditional_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Read and start the first step's task +select pgflow_tests.read_and_start('conditional_flow'); + +-- Complete first step with output that MATCHES condition +select pgflow.complete_task( + (select run_id from run_ids), + 'first', + 0, + '{"success": true, "data": "hello"}'::jsonb +); + +-- Test 1: checked_step should be 'started' (condition met) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'started', + 'Dependent step with met condition should start' +); + +-- Test 2: skip_reason should be NULL +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + NULL, + 'Step with met condition should have no skip_reason' +); + +-- Test 3: Task should be created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 1, + 'Task should be created for step with met condition' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_unmet_skip.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_unmet_skip.test.sql new file mode 100644 index 000000000..3dbf44a9a --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_step_condition_unmet_skip.test.sql @@ -0,0 +1,75 @@ +-- Test: Dependent step condition unmet with whenUnmet='skip' +-- Verifies that a dependent step with unmet condition is skipped +-- when its dependency output doesn't match the pattern +begin; +select plan(4); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a root step and a conditional dependent step +select pgflow.create_flow('conditional_flow'); +select pgflow.add_step('conditional_flow', 'first'); +select pgflow.add_step( + 'conditional_flow', + 'checked_step', + '{first}', -- depends on first + null, null, null, null, -- default options + 'single', -- step_type + '{"first": {"success": true}}'::jsonb, -- condition: first.success must be true + 'skip' -- when_unmet +); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('conditional_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Read and start the first step's task +select pgflow_tests.read_and_start('conditional_flow'); + +-- Complete first step with output that does NOT match condition +select pgflow.complete_task( + (select run_id from run_ids), + 'first', + 0, + '{"success": false, "error": "something went wrong"}'::jsonb +); + +-- Test 1: checked_step should be 'skipped' (condition unmet) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'skipped', + 'Dependent step with unmet condition should be skipped' +); + +-- Test 2: skip_reason should be 'condition_unmet' +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'condition_unmet', + 'Step should have skip_reason = condition_unmet' +); + +-- Test 3: No task should be created +select 
is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 0, + 'No task should be created for skipped step' +); + +-- Test 4: Run should complete (all steps done) +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'completed', + 'Run should complete when skipped step was the last step' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/no_condition_always_executes.test.sql b/pkgs/core/supabase/tests/condition_evaluation/no_condition_always_executes.test.sql new file mode 100644 index 000000000..46f172760 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/no_condition_always_executes.test.sql @@ -0,0 +1,40 @@ +-- Test: Step with no condition (NULL pattern) always executes +-- Verifies that steps without condition_pattern execute normally +-- regardless of input content +begin; +select plan(2); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a step that has no condition (default) +select pgflow.create_flow('simple_flow'); +select pgflow.add_step('simple_flow', 'unconditioned'); + +-- Start flow with any input +with flow as ( + select * from pgflow.start_flow('simple_flow', '{"anything": "goes"}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Step should be started (no condition means always execute) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'unconditioned'), + 'started', + 'Step with no condition should start regardless of input' +); + +-- Test 2: Task should be created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'unconditioned'), + 1, + 'Task should be created for step with no condition' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/plain_skip_iterates_until_convergence.test.sql b/pkgs/core/supabase/tests/condition_evaluation/plain_skip_iterates_until_convergence.test.sql new file mode 100644 index 000000000..49d43091e --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/plain_skip_iterates_until_convergence.test.sql @@ -0,0 +1,121 @@ +-- Test: Plain skip iterates until convergence +-- Verifies that after skipping a step: +-- 1. Dependents' remaining_deps are decremented +-- 2. Those newly-ready dependents get their conditions evaluated +-- 3. If they also have unmet conditions, they're also skipped +-- 4. 
Process repeats until no more steps need skipping +-- +-- Flow: a (skip) -> b (skip) -> c (no condition) +-- When 'a' is skipped, 'b' becomes ready and should also be skipped +-- Then 'c' becomes ready (but has no condition, so starts normally) +begin; +select plan(8); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with chain: a -> b -> c +-- a has unmet condition (skip) +-- b depends on a.success (also skip) +-- c has no condition +select pgflow.create_flow('chain_skip'); +select pgflow.add_step( + 'chain_skip', + 'step_a', + '{}', -- root step + null, null, null, null, + 'single', + '{"enabled": true}'::jsonb, -- condition: requires enabled=true + 'skip' -- plain skip +); +select pgflow.add_step( + 'chain_skip', + 'step_b', + '{step_a}', -- depends on a + null, null, null, null, + 'single', + '{"step_a": {"success": true}}'::jsonb, -- condition: a.success must be true + 'skip' -- plain skip (won't be met since a was skipped) +); +select pgflow.add_step( + 'chain_skip', + 'step_c', + '{step_b}', -- depends on b + null, null, null, null, + 'single' -- no condition +); + +-- Start flow with input that does NOT match step_a's condition +with flow as ( + select * from pgflow.start_flow('chain_skip', '{"enabled": false}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: step_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a with unmet condition should be skipped' +); + +-- Test 2: step_b should also be skipped (its condition references skipped step_a) +-- The condition '{"step_a": {"success": true}}' cannot be met when step_a is skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'skipped', + 'step_b should be skipped (condition references skipped dependency)' +); + +-- Test 3: step_b skip_reason should be 'condition_unmet' +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'condition_unmet', + 'step_b should have skip_reason = condition_unmet' +); + +-- Test 4: step_c remaining_deps should be 0 +select is( + (select remaining_deps from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 0, + 'step_c remaining_deps should be 0 (both a and b skipped)' +); + +-- Test 5: step_c should be started (has no condition) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'started', + 'step_c with no condition should be started' +); + +-- Test 6: step_c should have a task created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 1, + 'step_c should have one task created' +); + +-- Test 7: Run remaining_steps should be 1 (only step_c) +select is( + (select remaining_steps from pgflow.runs where run_id = (select run_id from run_ids)), + 1, + 'Run remaining_steps should be 1 (only step_c remaining)' +); + +-- Test 8: Run should be started (not completed yet, step_c still running) +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'started', + 'Run should be started while step_c is running' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git 
a/pkgs/core/supabase/tests/condition_evaluation/plain_skip_propagates_to_map.test.sql b/pkgs/core/supabase/tests/condition_evaluation/plain_skip_propagates_to_map.test.sql new file mode 100644 index 000000000..7d0d31d0c --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/plain_skip_propagates_to_map.test.sql @@ -0,0 +1,108 @@ +-- Test: Plain skip (whenUnmet='skip') propagates correctly to dependent map step +-- Verifies that when a step is skipped with plain 'skip' mode: +-- 1. remaining_deps on dependents is decremented +-- 2. initial_tasks is set to 0 for map dependents +-- 3. The run completes properly (not hanging) +-- +-- This tests the bug fix: Before this fix, plain skip didn't update +-- remaining_deps on dependents, causing runs to hang forever. +begin; +select plan(8); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow: +-- producer (conditional, skip) -> map_consumer (map step) +select pgflow.create_flow('skip_to_map'); +select pgflow.add_step( + 'skip_to_map', + 'producer', + '{}', -- root step + null, null, null, null, -- default options + 'single', -- step_type + '{"enabled": true}'::jsonb, -- condition: requires enabled=true + 'skip' -- when_unmet - plain skip (not skip-cascade) +); +-- Map consumer: no condition, just depends on producer +select pgflow.add_step( + 'skip_to_map', + 'map_consumer', + '{producer}', -- depends on producer + null, null, null, null, -- default options + 'map' -- map step type (no condition_pattern or when_unmet needed) +); + +-- Start flow with input that does NOT match producer's condition +with flow as ( + select * from pgflow.start_flow('skip_to_map', '{"enabled": false}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: producer should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'producer'), + 'skipped', + 'Producer with unmet condition should be skipped' +); + +-- Test 2: producer skip_reason should be 'condition_unmet' +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'producer'), + 'condition_unmet', + 'Producer should have skip_reason = condition_unmet' +); + +-- Test 3: map_consumer remaining_deps should be 0 (decremented from 1) +select is( + (select remaining_deps from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'map_consumer'), + 0, + 'Map consumer remaining_deps should be decremented to 0' +); + +-- Test 4: map_consumer initial_tasks should be 0 (skipped parent = empty array) +select is( + (select initial_tasks from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'map_consumer'), + 0, + 'Map consumer initial_tasks should be 0 (skipped dep = empty array)' +); + +-- Test 5: map_consumer should be completed (cascade_complete_taskless_steps handles 0 tasks) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'map_consumer'), + 'completed', + 'Map consumer should be completed (empty map auto-completes)' +); + +-- Test 6: map_consumer output should be empty array +select is( + (select output from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'map_consumer'), + '[]'::jsonb, + 'Map consumer output should be empty array' +); + +-- Test 7: Run remaining_steps should be 0 +select is( + (select remaining_steps from pgflow.runs where run_id = (select run_id from run_ids)), + 
0, + 'Run remaining_steps should be 0' +); + +-- Test 8: Run should be completed (not hanging!) +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'completed', + 'Run should complete (not hang) when skip propagates to map' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_met.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_met.test.sql new file mode 100644 index 000000000..d7c853548 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_met.test.sql @@ -0,0 +1,56 @@ +-- Test: Root step condition met - step executes normally +-- Verifies that a root step with a condition pattern that matches the flow input +-- starts normally without being skipped +begin; +select plan(3); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a root step that has a condition +select pgflow.create_flow('conditional_flow'); +select pgflow.add_step( + 'conditional_flow', + 'checked_step', + '{}', -- no deps (root step) + null, null, null, null, -- default options + 'single', -- step_type + '{"enabled": true}'::jsonb, -- condition_pattern: requires enabled=true + 'skip' -- when_unmet +); + +-- Start flow with input that matches condition +with flow as ( + select * from pgflow.start_flow('conditional_flow', '{"enabled": true, "value": 42}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Step should be in 'started' status (condition met, step executes) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'started', + 'Step with met condition should start normally' +); + +-- Test 2: skip_reason should be NULL (not skipped) +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + NULL, + 'Step with met condition should have no skip_reason' +); + +-- Test 3: Task should be created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 1, + 'Task should be created for step with met condition' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql new file mode 100644 index 000000000..c12703a85 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_fail.test.sql @@ -0,0 +1,62 @@ +-- Test: Root step condition unmet with whenUnmet='fail' - run fails +-- Verifies that a root step with unmet condition and whenUnmet='fail' +-- causes the run to fail immediately +begin; +select plan(4); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a root step that has a condition with fail mode +select pgflow.create_flow('conditional_flow'); +select pgflow.add_step( + 'conditional_flow', + 'checked_step', + '{}', -- no deps (root step) + null, null, null, null, -- default options + 'single', -- step_type + '{"enabled": true}'::jsonb, -- condition_pattern: requires enabled=true + 'fail' -- when_unmet - causes run to fail +); + +-- Start flow with input that does NOT match condition +with flow as ( + select * from pgflow.start_flow('conditional_flow', '{"enabled": 
false, "value": 42}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: checked_step should be 'failed' (condition unmet + fail mode) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'failed', + 'Step with unmet condition and whenUnmet=fail should be failed' +); + +-- Test 2: error_message should indicate condition unmet +select ok( + (select error_message from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step') ILIKE '%condition%', + 'Failed step should have error message about condition' +); + +-- Test 3: No task should be created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 0, + 'No task should be created for failed step' +); + +-- Test 4: Run should be failed +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'failed', + 'Run should fail when step condition fails with fail mode' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip.test.sql new file mode 100644 index 000000000..aacdff2c2 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip.test.sql @@ -0,0 +1,73 @@ +-- Test: Root step condition unmet with whenUnmet='skip' - step skipped +-- Verifies that a root step with unmet condition and whenUnmet='skip' +-- is skipped but the run continues +begin; +select plan(5); + +-- Reset database +select pgflow_tests.reset_db(); + +-- Create flow with a root step that has a condition +select pgflow.create_flow('conditional_flow'); +select pgflow.add_step( + 'conditional_flow', + 'checked_step', + '{}', -- no deps (root step) + null, null, null, null, -- default options + 'single', -- step_type + '{"enabled": true}'::jsonb, -- condition_pattern: requires enabled=true + 'skip' -- when_unmet +); +-- Add another root step without condition +select pgflow.add_step('conditional_flow', 'other_step'); + +-- Start flow with input that does NOT match condition +with flow as ( + select * from pgflow.start_flow('conditional_flow', '{"enabled": false, "value": 42}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: checked_step should be 'skipped' (condition unmet) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'skipped', + 'Step with unmet condition and whenUnmet=skip should be skipped' +); + +-- Test 2: skip_reason should be 'condition_unmet' +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 'condition_unmet', + 'Step with unmet condition should have skip_reason = condition_unmet' +); + +-- Test 3: No task should be created for skipped step +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'checked_step'), + 0, + 'No task should be created for skipped step' +); + +-- Test 4: other_step should be started (independent root step) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'other_step'), + 'started', + 'Other step without condition should 
+-- Test 4: other_step should be started (independent root step)
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'other_step'),
+  'started',
+  'Other step without condition should start normally'
+);
+
+-- Test 5: Run should continue (not failed)
+select is(
+  (select status from pgflow.runs where run_id = (select run_id from run_ids)),
+  'started',
+  'Run should continue when step is skipped'
+);
+
+-- Clean up
+drop table if exists run_ids;
+
+select finish();
+rollback;
diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip_cascade.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip_cascade.test.sql
new file mode 100644
index 000000000..3cde76fc2
--- /dev/null
+++ b/pkgs/core/supabase/tests/condition_evaluation/root_step_condition_unmet_skip_cascade.test.sql
@@ -0,0 +1,86 @@
+-- Test: Root step condition unmet with whenUnmet='skip-cascade' - step and dependents skipped
+-- Verifies that a root step with unmet condition and whenUnmet='skip-cascade'
+-- skips the step AND all its dependents
+begin;
+select plan(6);
+
+-- Reset database
+select pgflow_tests.reset_db();
+
+-- Create flow with a root step that has a condition and a dependent
+select pgflow.create_flow('conditional_flow');
+select pgflow.add_step(
+  'conditional_flow',
+  'checked_step',
+  '{}', -- no deps (root step)
+  null, null, null, null, -- default options
+  'single', -- step_type
+  '{"enabled": true}'::jsonb, -- condition_pattern
+  'skip-cascade' -- when_unmet - skip this AND dependents
+);
+select pgflow.add_step(
+  'conditional_flow',
+  'dependent_step',
+  '{checked_step}' -- depends on checked_step
+);
+-- Add an independent root step that should still run
+select pgflow.add_step('conditional_flow', 'other_step');
+
+-- Start flow with input that does NOT match condition
+with flow as (
+  select * from pgflow.start_flow('conditional_flow', '{"enabled": false}'::jsonb)
+)
+select run_id into temporary run_ids from flow;
+
+-- Test 1: checked_step should be 'skipped'
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'checked_step'),
+  'skipped',
+  'Step with unmet condition and skip-cascade should be skipped'
+);
+
+-- Test 2: checked_step skip_reason should be 'condition_unmet'
+select is(
+  (select skip_reason from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'checked_step'),
+  'condition_unmet',
+  'Original step should have skip_reason = condition_unmet'
+);
+
+-- Test 3: dependent_step should be 'skipped' (cascaded)
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'dependent_step'),
+  'skipped',
+  'Dependent step should be skipped due to cascade'
+);
+
+-- Test 4: dependent_step skip_reason should be 'dependency_skipped'
+select is(
+  (select skip_reason from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'dependent_step'),
+  'dependency_skipped',
+  'Cascaded step should have skip_reason = dependency_skipped'
+);
+
+-- Test 5: other_step should be started (independent)
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'other_step'),
+  'started',
+  'Independent step should start normally'
+);
+
+-- Test 6: Run should continue (remaining_steps decremented by skipped steps)
+select is(
+  (select status from pgflow.runs where run_id = (select run_id from run_ids)),
+  'started',
+  'Run should continue after skip-cascade'
+);
+
+-- Clean up
+drop table if exists run_ids;
+
+select finish();
+rollback;
diff --git a/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql b/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql
new file mode 100644
index 000000000..2547c6712
--- /dev/null
+++ b/pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql
@@ -0,0 +1,107 @@
+-- Test: Skipped deps are excluded from handler input (missing key, not null)
+-- Verifies that when a dependency is skipped:
+-- 1. The handler receives deps_output WITHOUT the skipped dep key
+-- 2. The key is missing entirely, not present with null value
+--
+-- Flow:
+--   step_a (conditional, skip) \
+--                               -> step_c (no condition)
+--   step_b (always runs)       /
+--
+-- When step_a is skipped, step_c should receive: {"step_b": <step_b output>}
+-- (NOT: {"step_a": null, "step_b": <step_b output>})
+begin;
+select plan(5);
+
+-- Reset database
+select pgflow_tests.reset_db();
+
+-- Create flow with diamond: a + b -> c
+-- a has unmet condition (will be skipped)
+-- b always runs
+-- c depends on both
+select pgflow.create_flow('skip_diamond');
+select pgflow.add_step(
+  'skip_diamond',
+  'step_a',
+  '{}', -- root step
+  null, null, null, null,
+  'single',
+  '{"enabled": true}'::jsonb, -- condition: requires enabled=true
+  'skip' -- plain skip
+);
+select pgflow.add_step(
+  'skip_diamond',
+  'step_b',
+  '{}' -- root step, no condition
+);
+select pgflow.add_step(
+  'skip_diamond',
+  'step_c',
+  '{step_a, step_b}', -- depends on both
+  null, null, null, null,
+  'single' -- no condition
+);
+
+-- Start flow with input that skips step_a
+with flow as (
+  select * from pgflow.start_flow('skip_diamond', '{"enabled": false}'::jsonb)
+)
+select run_id into temporary run_ids from flow;
+
+-- Test 1: step_a should be skipped
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'step_a'),
+  'skipped',
+  'step_a with unmet condition should be skipped'
+);
+
+-- Test 2: step_b should be started
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'step_b'),
+  'started',
+  'step_b without condition should be started'
+);
+
+-- Read and start step_b's task
+select pgflow_tests.read_and_start('skip_diamond');
+
+-- Complete step_b with some output
+select pgflow.complete_task(
+  (select run_id from run_ids),
+  'step_b',
+  0,
+  '{"data": "from_b"}'::jsonb
+);
+
+-- Test 3: step_c remaining_deps should be 0 (both deps resolved - a skipped, b completed)
+select is(
+  (select remaining_deps from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
+  0,
+  'step_c remaining_deps should be 0 (a skipped + b completed)'
+);
+
+-- Test 4: step_c should now be started
+select is(
+  (select status from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
+  'started',
+  'step_c should be started after step_b completes'
+);
+
+-- Test 5: step_b output should be in step_states
+select is(
+  (select output from pgflow.step_states
+   where run_id = (select run_id from run_ids) and step_slug = 'step_b'),
+  '{"data": "from_b"}'::jsonb,
+  'step_b output should be stored'
+);
+
+-- Clean up
+drop table if exists run_ids;
+
+select finish();
+rollback;
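The same diamond can be written with the DSL exercised by the next test file. A sketch, with the import path and the handler signature of step_c assumed for illustration:

import { Flow } from './src/dsl.js';

// step_a is conditionally skipped; step_c then sees {"step_b": ...} only --
// the skipped dependency's key is absent from its input, not null.
const diamond = new Flow({ slug: 'skip_diamond' })
  .step({ slug: 'step_a', condition: { enabled: true }, whenUnmet: 'skip' }, () => 'a')
  .step({ slug: 'step_b' }, () => ({ data: 'from_b' }))
  .step({ slug: 'step_c', dependsOn: ['step_a', 'step_b'] }, (input) => input);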
diff --git a/pkgs/dsl/__tests__/runtime/condition-options.test.ts b/pkgs/dsl/__tests__/runtime/condition-options.test.ts
new file mode 100644
index 000000000..e630444e8
--- /dev/null
+++ b/pkgs/dsl/__tests__/runtime/condition-options.test.ts
@@ -0,0 +1,178 @@
+import { describe, it, expect } from 'vitest';
+import { Flow } from '../../src/dsl.js';
+import { compileFlow } from '../../src/compile-flow.js';
+
+describe('Condition Options', () => {
+  describe('DSL accepts condition and whenUnmet', () => {
+    it('should accept condition option on a step', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          { slug: 'conditional_step', condition: { enabled: true } },
+          () => 'result'
+        );
+
+      const step = flow.getStepDefinition('conditional_step');
+      expect(step.options.condition).toEqual({ enabled: true });
+    });
+
+    it('should accept whenUnmet option on a step', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          { slug: 'conditional_step', whenUnmet: 'skip' },
+          () => 'result'
+        );
+
+      const step = flow.getStepDefinition('conditional_step');
+      expect(step.options.whenUnmet).toBe('skip');
+    });
+
+    it('should accept both condition and whenUnmet together', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          {
+            slug: 'conditional_step',
+            condition: { status: 'active' },
+            whenUnmet: 'skip-cascade',
+          },
+          () => 'result'
+        );
+
+      const step = flow.getStepDefinition('conditional_step');
+      expect(step.options.condition).toEqual({ status: 'active' });
+      expect(step.options.whenUnmet).toBe('skip-cascade');
+    });
+
+    it('should accept condition on dependent steps', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step({ slug: 'first' }, () => ({ success: true }))
+        .step(
+          {
+            slug: 'conditional_step',
+            dependsOn: ['first'],
+            condition: { first: { success: true } },
+            whenUnmet: 'skip',
+          },
+          () => 'result'
+        );
+
+      const step = flow.getStepDefinition('conditional_step');
+      expect(step.options.condition).toEqual({ first: { success: true } });
+      expect(step.options.whenUnmet).toBe('skip');
+    });
+  });
+
+  describe('compileFlow includes condition parameters', () => {
+    it('should compile condition_pattern for root step', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          { slug: 'step1', condition: { enabled: true } },
+          () => 'result'
+        );
+
+      const statements = compileFlow(flow);
+
+      expect(statements).toHaveLength(2);
+      expect(statements[1]).toContain("condition_pattern => '{\"enabled\":true}'");
+    });
+
+    it('should compile when_unmet for step', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          { slug: 'step1', whenUnmet: 'fail' },
+          () => 'result'
+        );
+
+      const statements = compileFlow(flow);
+
+      expect(statements).toHaveLength(2);
+      expect(statements[1]).toContain("when_unmet => 'fail'");
+    });
+
+    it('should compile both condition_pattern and when_unmet together', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          {
+            slug: 'step1',
+            condition: { active: true, type: 'premium' },
+            whenUnmet: 'skip-cascade',
+          },
+          () => 'result'
+        );
+
+      const statements = compileFlow(flow);
+
+      expect(statements).toHaveLength(2);
+      expect(statements[1]).toContain("condition_pattern => '{\"active\":true,\"type\":\"premium\"}'");
+      expect(statements[1]).toContain("when_unmet => 'skip-cascade'");
+    });
+
+    it('should compile step with all options including condition', () => {
+      const flow = new Flow({ slug: 'test_flow' })
+        .step(
+          {
+            slug: 'step1',
+            maxAttempts: 3,
+            timeout: 60,
+            condition: { enabled: true },
+            whenUnmet: 'skip',
+          },
+          () => 'result'
+        );
+
+      const statements = compileFlow(flow);
+
+      expect(statements).toHaveLength(2);
+      expect(statements[1]).toContain('max_attempts => 3');
+      expect(statements[1]).toContain('timeout => 60');
expect(statements[1]).toContain("condition_pattern => '{\"enabled\":true}'"); + expect(statements[1]).toContain("when_unmet => 'skip'"); + }); + + it('should compile dependent step with condition checking deps output', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step({ slug: 'first' }, () => ({ success: true })) + .step( + { + slug: 'second', + dependsOn: ['first'], + condition: { first: { success: true } }, + whenUnmet: 'skip', + }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(3); + expect(statements[2]).toContain("ARRAY['first']"); + expect(statements[2]).toContain("condition_pattern => '{\"first\":{\"success\":true}}'"); + expect(statements[2]).toContain("when_unmet => 'skip'"); + }); + }); + + describe('whenUnmet validation', () => { + it('should only accept valid whenUnmet values', () => { + // Valid values should work + expect(() => + new Flow({ slug: 'test' }).step( + { slug: 's1', whenUnmet: 'fail' }, + () => 1 + ) + ).not.toThrow(); + + expect(() => + new Flow({ slug: 'test' }).step( + { slug: 's1', whenUnmet: 'skip' }, + () => 1 + ) + ).not.toThrow(); + + expect(() => + new Flow({ slug: 'test' }).step( + { slug: 's1', whenUnmet: 'skip-cascade' }, + () => 1 + ) + ).not.toThrow(); + }); + }); +}); diff --git a/pkgs/dsl/src/compile-flow.ts b/pkgs/dsl/src/compile-flow.ts index bbf169fe1..0e6435b7e 100644 --- a/pkgs/dsl/src/compile-flow.ts +++ b/pkgs/dsl/src/compile-flow.ts @@ -62,5 +62,15 @@ function formatRuntimeOptions(options: RuntimeOptions | StepRuntimeOptions): str parts.push(`start_delay => ${options.startDelay}`); } + if ('condition' in options && options.condition !== undefined) { + // Serialize JSON pattern and escape for SQL + const jsonStr = JSON.stringify(options.condition); + parts.push(`condition_pattern => '${jsonStr}'`); + } + + if ('whenUnmet' in options && options.whenUnmet !== undefined) { + parts.push(`when_unmet => '${options.whenUnmet}'`); + } + return parts.length > 0 ? 
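A note on the serialization above: the stringified pattern is embedded in a single-quoted SQL literal, so any single quote inside the pattern must be doubled. A quick illustration of what the escaping produces, using a hypothetical pattern:

// Hypothetical pattern containing a single quote:
const pattern = { name: "O'Brien" };
const jsonStr = JSON.stringify(pattern).replace(/'/g, "''");
// jsonStr === `{"name":"O''Brien"}` -- safe inside '...' in the generated SQL
console.log(`condition_pattern => '${jsonStr}'`);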
diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts
index 13d05d885..3f8102e09 100644
--- a/pkgs/dsl/src/dsl.ts
+++ b/pkgs/dsl/src/dsl.ts
@@ -317,9 +317,14 @@ export interface FlowContext
 = Record, TEnv extends Env = Env> = FlowContext & T;
 
+// Valid values for whenUnmet option
+export type WhenUnmetMode = 'fail' | 'skip' | 'skip-cascade';
+
 // Step runtime options interface that extends flow options with step-specific options
 export interface StepRuntimeOptions extends RuntimeOptions {
   startDelay?: number;
+  condition?: Json; // JSON pattern for @> containment check
+  whenUnmet?: WhenUnmetMode; // What to do when condition not met
 }
 
 // Define the StepDefinition interface with integrated options
@@ -477,6 +482,8 @@ export class Flow<
     if (opts.baseDelay !== undefined) options.baseDelay = opts.baseDelay;
     if (opts.timeout !== undefined) options.timeout = opts.timeout;
     if (opts.startDelay !== undefined) options.startDelay = opts.startDelay;
+    if (opts.condition !== undefined) options.condition = opts.condition;
+    if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet;
 
     // Validate runtime options (optional for step level)
     validateRuntimeOptions(options, { optional: true });
@@ -640,6 +647,8 @@ export class Flow<
     if (opts.baseDelay !== undefined) options.baseDelay = opts.baseDelay;
     if (opts.timeout !== undefined) options.timeout = opts.timeout;
     if (opts.startDelay !== undefined) options.startDelay = opts.startDelay;
+    if (opts.condition !== undefined) options.condition = opts.condition;
+    if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet;
 
     // Validate runtime options
     validateRuntimeOptions(options, { optional: true });
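End-to-end, the new surface looks like this. A sketch: the flow slug, step slugs, and handlers are illustrative, and the import paths mirror the test file above rather than a published package entry point:

import { Flow } from '../../src/dsl.js';
import { compileFlow } from '../../src/compile-flow.js';

const flow = new Flow({ slug: 'example_flow' })
  .step(
    // Root step: the pattern is matched against the flow input via @> containment.
    { slug: 'producer', condition: { enabled: true }, whenUnmet: 'skip' },
    () => ({ success: true })
  )
  .step(
    // Dependent step: pattern keys are dependency slugs, matched against their outputs.
    {
      slug: 'consumer',
      dependsOn: ['producer'],
      condition: { producer: { success: true } },
      whenUnmet: 'skip-cascade',
    },
    () => 'done'
  );

// compileFlow emits one SQL statement per flow/step; the step statements carry:
//   condition_pattern => '{"enabled":true}', when_unmet => 'skip'
//   condition_pattern => '{"producer":{"success":true}}', when_unmet => 'skip-cascade'
const statements = compileFlow(flow);
console.log(statements.join('\n'));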