diff --git a/.changeset/add-when-failed-option.md b/.changeset/add-when-failed-option.md new file mode 100644 index 000000000..254800c84 --- /dev/null +++ b/.changeset/add-when-failed-option.md @@ -0,0 +1,6 @@ +--- +'@pgflow/core': patch +'@pgflow/dsl': patch +--- + +Add whenFailed option for error handling after retries exhausted (fail, skip, skip-cascade) diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index cab0cd846..dc2aeec90 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -12,6 +12,9 @@ as $$ DECLARE v_run_failed boolean; v_step_failed boolean; + v_step_skipped boolean; + v_when_failed text; + v_task_exhausted boolean; -- True if task has exhausted retries begin -- If run is already failed, no retries allowed @@ -62,7 +65,8 @@ flow_info AS ( config AS ( SELECT COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts, - COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay + COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay, + s.when_failed FROM pgflow.steps s JOIN pgflow.flows f ON f.flow_slug = s.flow_slug JOIN flow_info fi ON fi.flow_slug = s.flow_slug @@ -90,27 +94,53 @@ fail_or_retry_task as ( AND task.status = 'started' RETURNING * ), +-- Determine if task exhausted retries and get when_failed mode +task_status AS ( + SELECT + (select status from fail_or_retry_task) AS new_task_status, + (select when_failed from config) AS when_failed_mode, + -- Task is exhausted when it's failed (no more retries) + ((select status from fail_or_retry_task) = 'failed') AS is_exhausted +), maybe_fail_step AS ( UPDATE pgflow.step_states SET + -- Status logic: + -- - If task not exhausted (retrying): keep current status + -- - If exhausted AND when_failed='fail': set to 'failed' + -- - If exhausted AND when_failed IN ('skip', 'skip-cascade'): set to 'skipped' status = CASE - WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed' - ELSE pgflow.step_states.status + WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status + WHEN (select when_failed_mode from task_status) = 'fail' THEN 'failed' + ELSE 'skipped' -- skip or skip-cascade END, failed_at = CASE - WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now() + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) = 'fail' THEN now() ELSE NULL END, error_message = CASE - WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message + WHEN (select is_exhausted from task_status) THEN fail_task.error_message ELSE NULL - END + END, + skip_reason = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed' + ELSE pgflow.step_states.skip_reason + END, + skipped_at = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN now() + ELSE pgflow.step_states.skipped_at + END, + -- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint) + remaining_tasks = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL + ELSE pgflow.step_states.remaining_tasks + END FROM fail_or_retry_task WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* ) --- Update run status +-- Update run status: only fail when when_failed='fail' and step was failed UPDATE pgflow.runs SET status = CASE WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' @@ -119,10 +149,27 @@ SET status = CASE failed_at = CASE WHEN (select status from maybe_fail_step) = 'failed' THEN now() ELSE NULL - END + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END WHERE pgflow.runs.run_id = fail_task.run_id RETURNING (status = 'failed') INTO v_run_failed; +-- Capture when_failed mode and check if step was skipped for later processing +SELECT s.when_failed INTO v_when_failed +FROM pgflow.steps s +JOIN pgflow.runs r ON r.flow_slug = s.flow_slug +WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; + +SELECT (status = 'skipped') INTO v_step_skipped +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + -- Check if step failed by querying the step_states table SELECT (status = 'failed') INTO v_step_failed FROM pgflow.step_states @@ -146,6 +193,33 @@ IF v_step_failed THEN ); END IF; +-- Handle step skipping (when_failed = 'skip' or 'skip-cascade') +IF v_step_skipped THEN + -- Send broadcast event for step skipped + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'skipped', + 'skip_reason', 'handler_failed', + 'error_message', fail_task.error_message, + 'skipped_at', now() + ), + concat('step:', fail_task.step_slug, ':skipped'), + concat('pgflow:run:', fail_task.run_id), + false + ); + + -- For skip-cascade: cascade skip to all downstream dependents + IF v_when_failed = 'skip-cascade' THEN + PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed'); + END IF; + + -- Try to complete the run (remaining_steps may now be 0) + PERFORM pgflow.maybe_complete_run(fail_task.run_id); +END IF; + -- Send broadcast event for run failure if the run was failed IF v_run_failed THEN DECLARE diff --git a/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql b/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql deleted file mode 100644 index 054e39593..000000000 --- a/pkgs/core/supabase/migrations/20260105203847_pgflow_rename_cascade_skip_steps.sql +++ /dev/null @@ -1,348 +0,0 @@ --- Create "_cascade_force_skip_steps" function -CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ -DECLARE - v_flow_slug text; - v_total_skipped int := 0; -BEGIN - -- Get flow_slug for this run - SELECT r.flow_slug INTO v_flow_slug - FROM pgflow.runs r - WHERE r.run_id = _cascade_force_skip_steps.run_id; - - IF v_flow_slug IS NULL THEN - RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id; - END IF; - - -- ========================================== - -- SKIP STEPS IN TOPOLOGICAL ORDER - -- ========================================== - -- Use recursive CTE to find all downstream dependents, - -- then skip them in topological order (by step_index) - WITH RECURSIVE - -- ---------- Find all downstream steps ---------- - downstream_steps AS ( - -- Base case: the trigger step - SELECT - s.flow_slug, - s.step_slug, - s.step_index, - _cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step - FROM pgflow.steps s - WHERE s.flow_slug = v_flow_slug - AND s.step_slug = _cascade_force_skip_steps.step_slug - - UNION ALL - - -- Recursive case: steps that depend on already-found steps - SELECT - s.flow_slug, - s.step_slug, - s.step_index, - 'dependency_skipped'::text AS reason -- Downstream steps get this reason - FROM pgflow.steps s - JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug - JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug - ), - -- ---------- Deduplicate and order by step_index ---------- - steps_to_skip AS ( - SELECT DISTINCT ON (ds.step_slug) - ds.flow_slug, - ds.step_slug, - ds.step_index, - ds.reason - FROM downstream_steps ds - ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason) - ), - -- ---------- Skip the steps ---------- - skipped AS ( - UPDATE pgflow.step_states ss - SET status = 'skipped', - skip_reason = sts.reason, - skipped_at = now(), - remaining_tasks = NULL -- Clear remaining_tasks for skipped steps - FROM steps_to_skip sts - WHERE ss.run_id = _cascade_force_skip_steps.run_id - AND ss.step_slug = sts.step_slug - AND ss.status IN ('created', 'started') -- Only skip non-terminal steps - RETURNING - ss.*, - -- Broadcast step:skipped event - realtime.send( - jsonb_build_object( - 'event_type', 'step:skipped', - 'run_id', ss.run_id, - 'flow_slug', ss.flow_slug, - 'step_slug', ss.step_slug, - 'status', 'skipped', - 'skip_reason', ss.skip_reason, - 'skipped_at', ss.skipped_at - ), - concat('step:', ss.step_slug, ':skipped'), - concat('pgflow:run:', ss.run_id), - false - ) as _broadcast_result - ), - -- ---------- Update run counters ---------- - run_updates AS ( - UPDATE pgflow.runs r - SET remaining_steps = r.remaining_steps - skipped_count.count - FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count - WHERE r.run_id = _cascade_force_skip_steps.run_id - AND skipped_count.count > 0 - ) - SELECT COUNT(*) INTO v_total_skipped FROM skipped; - - RETURN v_total_skipped; -END; -$$; --- Modify "cascade_resolve_conditions" function -CREATE OR REPLACE FUNCTION "pgflow"."cascade_resolve_conditions" ("run_id" uuid) RETURNS boolean LANGUAGE plpgsql SET "search_path" = '' AS $$ -DECLARE - v_run_input jsonb; - v_run_status text; - v_first_fail record; - v_iteration_count int := 0; - v_max_iterations int := 50; - v_processed_count int; -BEGIN - -- ========================================== - -- GUARD: Early return if run is already terminal - -- ========================================== - SELECT r.status, r.input INTO v_run_status, v_run_input - FROM pgflow.runs r - WHERE r.run_id = cascade_resolve_conditions.run_id; - - IF v_run_status IN ('failed', 'completed') THEN - RETURN v_run_status != 'failed'; - END IF; - - -- ========================================== - -- ITERATE UNTIL CONVERGENCE - -- ========================================== - -- After skipping steps, dependents may become ready and need evaluation. - -- Loop until no more steps are processed. - LOOP - v_iteration_count := v_iteration_count + 1; - IF v_iteration_count > v_max_iterations THEN - RAISE EXCEPTION 'cascade_resolve_conditions exceeded safety limit of % iterations', v_max_iterations; - END IF; - - v_processed_count := 0; - - -- ========================================== - -- PHASE 1a: CHECK FOR FAIL CONDITIONS - -- ========================================== - -- Find first step (by topological order) with unmet condition and 'fail' mode. - WITH steps_with_conditions AS ( - SELECT - step_state.flow_slug, - step_state.step_slug, - step.condition_pattern, - step.when_unmet, - step.deps_count, - step.step_index - FROM pgflow.step_states AS step_state - JOIN pgflow.steps AS step - ON step.flow_slug = step_state.flow_slug - AND step.step_slug = step_state.step_slug - WHERE step_state.run_id = cascade_resolve_conditions.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - AND step.condition_pattern IS NOT NULL - ), - step_deps_output AS ( - SELECT - swc.step_slug, - jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output - FROM steps_with_conditions swc - JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug - JOIN pgflow.step_states dep_state - ON dep_state.run_id = cascade_resolve_conditions.run_id - AND dep_state.step_slug = dep.dep_slug - AND dep_state.status = 'completed' -- Only completed deps (not skipped) - WHERE swc.deps_count > 0 - GROUP BY swc.step_slug - ), - condition_evaluations AS ( - SELECT - swc.*, - CASE - WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern - ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern - END AS condition_met - FROM steps_with_conditions swc - LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug - ) - SELECT flow_slug, step_slug, condition_pattern - INTO v_first_fail - FROM condition_evaluations - WHERE NOT condition_met AND when_unmet = 'fail' - ORDER BY step_index - LIMIT 1; - - -- Handle fail mode: fail step and run, return false - IF v_first_fail IS NOT NULL THEN - UPDATE pgflow.step_states - SET status = 'failed', - failed_at = now(), - error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text - WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id - AND pgflow.step_states.step_slug = v_first_fail.step_slug; - - UPDATE pgflow.runs - SET status = 'failed', - failed_at = now() - WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id; - - RETURN false; - END IF; - - -- ========================================== - -- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation) - -- ========================================== - -- Skip steps with unmet conditions and whenUnmet='skip'. - -- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents. - WITH steps_with_conditions AS ( - SELECT - step_state.flow_slug, - step_state.step_slug, - step.condition_pattern, - step.when_unmet, - step.deps_count, - step.step_index - FROM pgflow.step_states AS step_state - JOIN pgflow.steps AS step - ON step.flow_slug = step_state.flow_slug - AND step.step_slug = step_state.step_slug - WHERE step_state.run_id = cascade_resolve_conditions.run_id - AND step_state.status = 'created' - AND step_state.remaining_deps = 0 - AND step.condition_pattern IS NOT NULL - ), - step_deps_output AS ( - SELECT - swc.step_slug, - jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output - FROM steps_with_conditions swc - JOIN pgflow.deps dep ON dep.flow_slug = swc.flow_slug AND dep.step_slug = swc.step_slug - JOIN pgflow.step_states dep_state - ON dep_state.run_id = cascade_resolve_conditions.run_id - AND dep_state.step_slug = dep.dep_slug - AND dep_state.status = 'completed' -- Only completed deps (not skipped) - WHERE swc.deps_count > 0 - GROUP BY swc.step_slug - ), - condition_evaluations AS ( - SELECT - swc.*, - CASE - WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern - ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern - END AS condition_met - FROM steps_with_conditions swc - LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug - ), - unmet_skip_steps AS ( - SELECT * FROM condition_evaluations - WHERE NOT condition_met AND when_unmet = 'skip' - ), - skipped_steps AS ( - UPDATE pgflow.step_states ss - SET status = 'skipped', - skip_reason = 'condition_unmet', - skipped_at = now() - FROM unmet_skip_steps uss - WHERE ss.run_id = cascade_resolve_conditions.run_id - AND ss.step_slug = uss.step_slug - RETURNING - ss.*, - realtime.send( - jsonb_build_object( - 'event_type', 'step:skipped', - 'run_id', ss.run_id, - 'flow_slug', ss.flow_slug, - 'step_slug', ss.step_slug, - 'status', 'skipped', - 'skip_reason', 'condition_unmet', - 'skipped_at', ss.skipped_at - ), - concat('step:', ss.step_slug, ':skipped'), - concat('pgflow:run:', ss.run_id), - false - ) AS _broadcast_result - ), - -- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps) - dependent_updates AS ( - UPDATE pgflow.step_states child_state - SET remaining_deps = child_state.remaining_deps - 1, - -- If child is a map step and this skipped step is its only dependency, - -- set initial_tasks = 0 (skipped dep = empty array) - initial_tasks = CASE - WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0 - ELSE child_state.initial_tasks - END - FROM skipped_steps parent - JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug - JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug - WHERE child_state.run_id = cascade_resolve_conditions.run_id - AND child_state.step_slug = dep.step_slug - ), - run_update AS ( - UPDATE pgflow.runs r - SET remaining_steps = r.remaining_steps - (SELECT COUNT(*) FROM skipped_steps) - WHERE r.run_id = cascade_resolve_conditions.run_id - AND (SELECT COUNT(*) FROM skipped_steps) > 0 - ) - SELECT COUNT(*)::int INTO v_processed_count FROM skipped_steps; - - -- ========================================== - -- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS - -- ========================================== - -- Call _cascade_force_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'. - -- Process in topological order; _cascade_force_skip_steps is idempotent. - PERFORM pgflow._cascade_force_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet') - FROM pgflow.step_states AS ready_step - JOIN pgflow.steps AS step - ON step.flow_slug = ready_step.flow_slug - AND step.step_slug = ready_step.step_slug - LEFT JOIN LATERAL ( - SELECT jsonb_object_agg(dep_state.step_slug, dep_state.output) AS deps_output - FROM pgflow.deps dep - JOIN pgflow.step_states dep_state - ON dep_state.run_id = cascade_resolve_conditions.run_id - AND dep_state.step_slug = dep.dep_slug - AND dep_state.status = 'completed' -- Only completed deps (not skipped) - WHERE dep.flow_slug = ready_step.flow_slug - AND dep.step_slug = ready_step.step_slug - ) AS agg_deps ON step.deps_count > 0 - WHERE ready_step.run_id = cascade_resolve_conditions.run_id - AND ready_step.status = 'created' - AND ready_step.remaining_deps = 0 - AND step.condition_pattern IS NOT NULL - AND step.when_unmet = 'skip-cascade' - AND NOT ( - CASE - WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern - ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern - END - ) - ORDER BY step.step_index; - - -- Check if run was failed during cascade (e.g., if _cascade_force_skip_steps triggers fail) - SELECT r.status INTO v_run_status - FROM pgflow.runs r - WHERE r.run_id = cascade_resolve_conditions.run_id; - - IF v_run_status IN ('failed', 'completed') THEN - RETURN v_run_status != 'failed'; - END IF; - - -- Exit loop if no steps were processed in this iteration - EXIT WHEN v_processed_count = 0; - END LOOP; - - RETURN true; -END; -$$; --- Drop "cascade_skip_steps" function -DROP FUNCTION "pgflow"."cascade_skip_steps"; diff --git a/pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql b/pkgs/core/supabase/migrations/20260105214940_pgflow_step_conditions.sql similarity index 80% rename from pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql rename to pkgs/core/supabase/migrations/20260105214940_pgflow_step_conditions.sql index 0c3bcb0d7..b28e8044b 100644 --- a/pkgs/core/supabase/migrations/20260105121138_pgflow_condition_evaluation.sql +++ b/pkgs/core/supabase/migrations/20260105214940_pgflow_step_conditions.sql @@ -16,8 +16,8 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); -- Modify "steps" table ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail'; --- Create "cascade_skip_steps" function -CREATE FUNCTION "pgflow"."cascade_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ +-- Create "_cascade_force_skip_steps" function +CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ DECLARE v_flow_slug text; v_total_skipped int := 0; @@ -25,10 +25,10 @@ BEGIN -- Get flow_slug for this run SELECT r.flow_slug INTO v_flow_slug FROM pgflow.runs r - WHERE r.run_id = cascade_skip_steps.run_id; + WHERE r.run_id = _cascade_force_skip_steps.run_id; IF v_flow_slug IS NULL THEN - RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id; + RAISE EXCEPTION 'Run not found: %', _cascade_force_skip_steps.run_id; END IF; -- ========================================== @@ -44,10 +44,10 @@ BEGIN s.flow_slug, s.step_slug, s.step_index, - cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step + _cascade_force_skip_steps.skip_reason AS reason -- Original reason for trigger step FROM pgflow.steps s WHERE s.flow_slug = v_flow_slug - AND s.step_slug = cascade_skip_steps.step_slug + AND s.step_slug = _cascade_force_skip_steps.step_slug UNION ALL @@ -79,7 +79,7 @@ BEGIN skipped_at = now(), remaining_tasks = NULL -- Clear remaining_tasks for skipped steps FROM steps_to_skip sts - WHERE ss.run_id = cascade_skip_steps.run_id + WHERE ss.run_id = _cascade_force_skip_steps.run_id AND ss.step_slug = sts.step_slug AND ss.status IN ('created', 'started') -- Only skip non-terminal steps RETURNING @@ -105,7 +105,7 @@ BEGIN UPDATE pgflow.runs r SET remaining_steps = r.remaining_steps - skipped_count.count FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count - WHERE r.run_id = cascade_skip_steps.run_id + WHERE r.run_id = _cascade_force_skip_steps.run_id AND skipped_count.count > 0 ) SELECT COUNT(*) INTO v_total_skipped FROM skipped; @@ -316,9 +316,9 @@ BEGIN -- ========================================== -- PHASE 1c: HANDLE SKIP-CASCADE CONDITIONS -- ========================================== - -- Call cascade_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'. - -- Process in topological order; cascade_skip_steps is idempotent. - PERFORM pgflow.cascade_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet') + -- Call _cascade_force_skip_steps for each step with unmet condition and whenUnmet='skip-cascade'. + -- Process in topological order; _cascade_force_skip_steps is idempotent. + PERFORM pgflow._cascade_force_skip_steps(cascade_resolve_conditions.run_id, ready_step.step_slug, 'condition_unmet') FROM pgflow.step_states AS ready_step JOIN pgflow.steps AS step ON step.flow_slug = ready_step.flow_slug @@ -346,7 +346,7 @@ BEGIN ) ORDER BY step.step_index; - -- Check if run was failed during cascade (e.g., if cascade_skip_steps triggers fail) + -- Check if run was failed during cascade (e.g., if _cascade_force_skip_steps triggers fail) SELECT r.status INTO v_run_status FROM pgflow.runs r WHERE r.run_id = cascade_resolve_conditions.run_id; @@ -844,6 +844,302 @@ WHERE step_task.run_id = complete_task.run_id AND step_task.step_slug = complete_task.step_slug AND step_task.task_index = complete_task.task_index; +end; +$$; +-- Modify "fail_task" function +CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_run_failed boolean; + v_step_failed boolean; + v_step_skipped boolean; + v_when_failed text; + v_task_exhausted boolean; -- True if task has exhausted retries +begin + +-- If run is already failed, no retries allowed +IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN + UPDATE pgflow.step_tasks + SET status = 'failed', + failed_at = now(), + error_message = fail_task.error_message + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index + AND pgflow.step_tasks.status = 'started'; + + -- Archive the task's message + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; +END IF; + +WITH run_lock AS ( + SELECT * FROM pgflow.runs + WHERE pgflow.runs.run_id = fail_task.run_id + FOR UPDATE +), +step_lock AS ( + SELECT * FROM pgflow.step_states + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + FOR UPDATE +), +flow_info AS ( + SELECT r.flow_slug + FROM pgflow.runs r + WHERE r.run_id = fail_task.run_id +), +config AS ( + SELECT + COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts, + COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay, + s.when_failed + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN flow_info fi ON fi.flow_slug = s.flow_slug + WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug +), +fail_or_retry_task as ( + UPDATE pgflow.step_tasks as task + SET + status = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued' + ELSE 'failed' + END, + failed_at = CASE + WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now() + ELSE NULL + END, + started_at = CASE + WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL + ELSE task.started_at + END, + error_message = fail_task.error_message + WHERE task.run_id = fail_task.run_id + AND task.step_slug = fail_task.step_slug + AND task.task_index = fail_task.task_index + AND task.status = 'started' + RETURNING * +), +-- Determine if task exhausted retries and get when_failed mode +task_status AS ( + SELECT + (select status from fail_or_retry_task) AS new_task_status, + (select when_failed from config) AS when_failed_mode, + -- Task is exhausted when it's failed (no more retries) + ((select status from fail_or_retry_task) = 'failed') AS is_exhausted +), +maybe_fail_step AS ( + UPDATE pgflow.step_states + SET + -- Status logic: + -- - If task not exhausted (retrying): keep current status + -- - If exhausted AND when_failed='fail': set to 'failed' + -- - If exhausted AND when_failed IN ('skip', 'skip-cascade'): set to 'skipped' + status = CASE + WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status + WHEN (select when_failed_mode from task_status) = 'fail' THEN 'failed' + ELSE 'skipped' -- skip or skip-cascade + END, + failed_at = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) = 'fail' THEN now() + ELSE NULL + END, + error_message = CASE + WHEN (select is_exhausted from task_status) THEN fail_task.error_message + ELSE NULL + END, + skip_reason = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed' + ELSE pgflow.step_states.skip_reason + END, + skipped_at = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN now() + ELSE pgflow.step_states.skipped_at + END, + -- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint) + remaining_tasks = CASE + WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL + ELSE pgflow.step_states.remaining_tasks + END + FROM fail_or_retry_task + WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug + RETURNING pgflow.step_states.* +) +-- Update run status: only fail when when_failed='fail' and step was failed +UPDATE pgflow.runs +SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END +WHERE pgflow.runs.run_id = fail_task.run_id +RETURNING (status = 'failed') INTO v_run_failed; + +-- Capture when_failed mode and check if step was skipped for later processing +SELECT s.when_failed INTO v_when_failed +FROM pgflow.steps s +JOIN pgflow.runs r ON r.flow_slug = s.flow_slug +WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; + +SELECT (status = 'skipped') INTO v_step_skipped +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Check if step failed by querying the step_states table +SELECT (status = 'failed') INTO v_step_failed +FROM pgflow.step_states +WHERE pgflow.step_states.run_id = fail_task.run_id + AND pgflow.step_states.step_slug = fail_task.step_slug; + +-- Send broadcast event for step failure if the step was failed +IF v_step_failed THEN + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + concat('step:', fail_task.step_slug, ':failed'), + concat('pgflow:run:', fail_task.run_id), + false + ); +END IF; + +-- Handle step skipping (when_failed = 'skip' or 'skip-cascade') +IF v_step_skipped THEN + -- Send broadcast event for step skipped + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', fail_task.run_id, + 'step_slug', fail_task.step_slug, + 'status', 'skipped', + 'skip_reason', 'handler_failed', + 'error_message', fail_task.error_message, + 'skipped_at', now() + ), + concat('step:', fail_task.step_slug, ':skipped'), + concat('pgflow:run:', fail_task.run_id), + false + ); + + -- For skip-cascade: cascade skip to all downstream dependents + IF v_when_failed = 'skip-cascade' THEN + PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed'); + END IF; + + -- Try to complete the run (remaining_steps may now be 0) + PERFORM pgflow.maybe_complete_run(fail_task.run_id); +END IF; + +-- Send broadcast event for run failure if the run was failed +IF v_run_failed THEN + DECLARE + v_flow_slug text; + BEGIN + SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id; + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', fail_task.run_id, + 'flow_slug', v_flow_slug, + 'status', 'failed', + 'error_message', fail_task.error_message, + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', fail_task.run_id), + false + ); + END; +END IF; + +-- Archive all active messages (both queued and started) when run fails +IF v_run_failed THEN + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +END IF; + +-- For queued tasks: delay the message for retry with exponential backoff +PERFORM ( + WITH retry_config AS ( + SELECT + COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay + FROM pgflow.steps s + JOIN pgflow.flows f ON f.flow_slug = s.flow_slug + JOIN pgflow.runs r ON r.flow_slug = f.flow_slug + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug + ), + queued_tasks AS ( + SELECT + r.flow_slug, + st.message_id, + pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'queued' + ) + SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay) + FROM queued_tasks qt + WHERE EXISTS (SELECT 1 FROM queued_tasks) +); + +-- For failed tasks: archive the message +PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) +FROM pgflow.step_tasks st +JOIN pgflow.runs r ON st.run_id = r.run_id +WHERE st.run_id = fail_task.run_id + AND st.step_slug = fail_task.step_slug + AND st.task_index = fail_task.task_index + AND st.status = 'failed' + AND st.message_id IS NOT NULL +GROUP BY r.flow_slug +HAVING COUNT(st.message_id) > 0; + +return query select * +from pgflow.step_tasks st +where st.run_id = fail_task.run_id + and st.step_slug = fail_task.step_slug + and st.task_index = fail_task.task_index; + end; $$; -- Modify "start_flow" function diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index bf02c11e5..ebb8f266a 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:U+A8OjhnUoZjydIbgkI6blxH+7ZJslRQeQEn5tdID4k= +h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -16,5 +16,4 @@ h1:U+A8OjhnUoZjydIbgkI6blxH+7ZJslRQeQEn5tdID4k= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= -20260105121138_pgflow_condition_evaluation.sql h1:OR8KpHBKBB87VBhnZB+NkW8Rs7pGBPuPL8GF8FcM+oU= -20260105203847_pgflow_rename_cascade_skip_steps.sql h1:chw/UFU1F7vGcWKkqfApPfLt+k48fKBnrQO31kT/zKg= +20260105214940_pgflow_step_conditions.sql h1:DIta8qrr+qRvA9aFCdWefk72qp27mcPvGGlAJswmitw= diff --git a/pkgs/core/supabase/tests/fail_task_when_failed/type_violation_always_hard_fails.test.sql b/pkgs/core/supabase/tests/fail_task_when_failed/type_violation_always_hard_fails.test.sql new file mode 100644 index 000000000..6d5e642a4 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_failed/type_violation_always_hard_fails.test.sql @@ -0,0 +1,57 @@ +-- Test: TYPE_VIOLATION in complete_task always hard fails regardless of when_failed +-- TYPE_VIOLATION is a programming error (wrong return type), not a runtime condition +-- It should always cause the run to fail, even with when_failed='skip' or 'skip-cascade' +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- SETUP: Create a flow where step_a feeds into a map step +-- step_a has when_failed='skip-cascade' but TYPE_VIOLATION should override this +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'step_a', when_failed => 'skip-cascade'); +select pgflow.add_step('test_flow', 'step_b', ARRAY['step_a'], step_type => 'map'); + +-- Start flow +select pgflow.start_flow('test_flow', '{}'::jsonb); + +-- Poll for step_a's task and complete it with non-array output (causes TYPE_VIOLATION) +with task as ( + select * from pgflow_tests.read_and_start('test_flow', 1, 1) +) +select pgflow.complete_task( + (select run_id from task), + (select step_slug from task), + 0, + '"not_an_array"'::jsonb -- String instead of array - TYPE_VIOLATION +); + +-- TEST 1: step_a should be marked as failed (not skipped) +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'failed', + 'step_a should be failed on TYPE_VIOLATION (not skipped despite when_failed=skip-cascade)' +); + +-- TEST 2: error_message should contain TYPE_VIOLATION +select ok( + (select error_message from pgflow.step_states + where flow_slug = 'test_flow' and step_slug = 'step_a') LIKE '%TYPE_VIOLATION%', + 'Error message should indicate TYPE_VIOLATION' +); + +-- TEST 3: Run should be failed +select is( + (select status from pgflow.runs where flow_slug = 'test_flow'), + 'failed', + 'Run should be failed on TYPE_VIOLATION regardless of when_failed setting' +); + +-- TEST 4: step_b should NOT be skipped (run failed before cascade could happen) +select isnt( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_b'), + 'skipped', + 'step_b should not be skipped - run failed before any cascade' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_fail_marks_run_failed.test.sql b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_fail_marks_run_failed.test.sql new file mode 100644 index 000000000..057581b3b --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_fail_marks_run_failed.test.sql @@ -0,0 +1,37 @@ +-- Test: fail_task with when_failed='fail' (default) marks run as failed +-- This is the current behavior and should remain unchanged +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- SETUP: Create a flow with default when_failed='fail' (0 retries so it fails immediately) +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'step_a', max_attempts => 0); + +-- Start flow and fail the task +select pgflow.start_flow('test_flow', '{}'::jsonb); +select pgflow_tests.poll_and_fail('test_flow'); + +-- TEST 1: Task should be failed +select is( + (select status from pgflow.step_tasks where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'failed', + 'Task should be marked as failed' +); + +-- TEST 2: Step should be failed +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'failed', + 'Step should be marked as failed' +); + +-- TEST 3: Run should be failed +select is( + (select status from pgflow.runs where flow_slug = 'test_flow'), + 'failed', + 'Run should be marked as failed when when_failed=fail (default)' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_cascade_skips_dependents.test.sql b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_cascade_skips_dependents.test.sql new file mode 100644 index 000000000..ea0ecef9a --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_cascade_skips_dependents.test.sql @@ -0,0 +1,68 @@ +-- Test: fail_task with when_failed='skip-cascade' skips step and cascades to dependents +begin; +select plan(7); +select pgflow_tests.reset_db(); + +-- SETUP: Create a flow with when_failed='skip-cascade' +-- step_a (will fail) -> step_b (depends on a) -> step_c (depends on b) +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'step_a', max_attempts => 0, when_failed => 'skip-cascade'); +select pgflow.add_step('test_flow', 'step_b', ARRAY['step_a']); +select pgflow.add_step('test_flow', 'step_c', ARRAY['step_b']); +select pgflow.add_step('test_flow', 'step_d'); -- Independent step to verify run continues + +-- Start flow and fail step_a's task +select pgflow.start_flow('test_flow', '{}'::jsonb); +select pgflow_tests.poll_and_fail('test_flow'); + +-- TEST 1: step_a should be skipped (not failed) +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'skipped', + 'step_a should be marked as skipped when when_failed=skip-cascade' +); + +-- TEST 2: step_a skip reason should be handler_failed +select is( + (select skip_reason from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'handler_failed', + 'step_a skip reason should be handler_failed' +); + +-- TEST 3: step_b (dependent) should be skipped via cascade +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_b'), + 'skipped', + 'step_b should be cascaded-skipped' +); + +-- TEST 4: step_b skip reason should indicate dependency skipped +select is( + (select skip_reason from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_b'), + 'dependency_skipped', + 'step_b skip reason should be dependency_skipped' +); + +-- TEST 5: step_c (transitive dependent) should also be skipped +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_c'), + 'skipped', + 'step_c should be transitively cascade-skipped' +); + +-- TEST 6: step_d (independent) should remain started (not affected) +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_d'), + 'started', + 'step_d (independent step) should remain started' +); + +-- TEST 7: Run should NOT be failed (continues running) +select isnt( + (select status from pgflow.runs where flow_slug = 'test_flow'), + 'failed', + 'Run should NOT be marked as failed when when_failed=skip-cascade' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_skips_step.test.sql b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_skips_step.test.sql new file mode 100644 index 000000000..08d029cf1 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_failed/when_failed_skip_skips_step.test.sql @@ -0,0 +1,51 @@ +-- Test: fail_task with when_failed='skip' skips the step and continues run +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- SETUP: Create a flow with when_failed='skip' (0 retries so it fails immediately) +select pgflow.create_flow('test_flow'); +select pgflow.add_step('test_flow', 'step_a', max_attempts => 0, when_failed => 'skip'); +select pgflow.add_step('test_flow', 'step_b'); -- Independent step to verify run continues + +-- Start flow and fail step_a's task +select pgflow.start_flow('test_flow', '{}'::jsonb); +select pgflow_tests.poll_and_fail('test_flow'); + +-- TEST 1: Task should be failed (it still failed, but skip mode affects step/run) +select is( + (select status from pgflow.step_tasks where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'failed', + 'Task should be marked as failed' +); + +-- TEST 2: Step should be skipped (not failed) +select is( + (select status from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'skipped', + 'Step should be marked as skipped when when_failed=skip' +); + +-- TEST 3: Skip reason should indicate handler failure +select is( + (select skip_reason from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'handler_failed', + 'Skip reason should be handler_failed' +); + +-- TEST 4: Run should NOT be failed (continues running) +select isnt( + (select status from pgflow.runs where flow_slug = 'test_flow'), + 'failed', + 'Run should NOT be marked as failed when when_failed=skip' +); + +-- TEST 5: Error message should be preserved in step_states +select is( + (select error_message from pgflow.step_states where flow_slug = 'test_flow' and step_slug = 'step_a'), + 'step_a FAILED', + 'Error message should be preserved on skipped step' +); + +select finish(); +rollback; diff --git a/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts b/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts new file mode 100644 index 000000000..b1b77044f --- /dev/null +++ b/pkgs/dsl/__tests__/runtime/when-failed-options.test.ts @@ -0,0 +1,196 @@ +import { describe, it, expect } from 'vitest'; +import { Flow } from '../../src/dsl.js'; +import { compileFlow } from '../../src/compile-flow.js'; + +describe('whenFailed Options', () => { + describe('DSL accepts whenFailed option', () => { + it('should accept whenFailed option on a step', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'skip' }, + () => 'result' + ); + + const step = flow.getStepDefinition('step1'); + expect(step.options.whenFailed).toBe('skip'); + }); + + it('should accept whenFailed: fail (default behavior)', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'fail' }, + () => 'result' + ); + + const step = flow.getStepDefinition('step1'); + expect(step.options.whenFailed).toBe('fail'); + }); + + it('should accept whenFailed: skip-cascade', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'skip-cascade' }, + () => 'result' + ); + + const step = flow.getStepDefinition('step1'); + expect(step.options.whenFailed).toBe('skip-cascade'); + }); + + it('should accept whenFailed on dependent steps', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step({ slug: 'first' }, () => ({ data: 'test' })) + .step( + { + slug: 'second', + dependsOn: ['first'], + whenFailed: 'skip', + }, + () => 'result' + ); + + const step = flow.getStepDefinition('second'); + expect(step.options.whenFailed).toBe('skip'); + }); + + it('should accept whenFailed together with other options', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { + slug: 'step1', + maxAttempts: 3, + timeout: 60, + whenFailed: 'skip-cascade', + }, + () => 'result' + ); + + const step = flow.getStepDefinition('step1'); + expect(step.options.maxAttempts).toBe(3); + expect(step.options.timeout).toBe(60); + expect(step.options.whenFailed).toBe('skip-cascade'); + }); + + it('should accept both whenUnmet and whenFailed together', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { + slug: 'step1', + condition: { enabled: true }, + whenUnmet: 'skip', + whenFailed: 'skip-cascade', + }, + () => 'result' + ); + + const step = flow.getStepDefinition('step1'); + expect(step.options.condition).toEqual({ enabled: true }); + expect(step.options.whenUnmet).toBe('skip'); + expect(step.options.whenFailed).toBe('skip-cascade'); + }); + }); + + describe('compileFlow includes when_failed parameter', () => { + it('should compile when_failed for step', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'skip' }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain("when_failed => 'skip'"); + }); + + it('should compile when_failed: fail', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'fail' }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain("when_failed => 'fail'"); + }); + + it('should compile when_failed: skip-cascade', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1', whenFailed: 'skip-cascade' }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain("when_failed => 'skip-cascade'"); + }); + + it('should compile step with all options including whenFailed', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { + slug: 'step1', + maxAttempts: 3, + timeout: 60, + condition: { enabled: true }, + whenUnmet: 'skip', + whenFailed: 'skip-cascade', + }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain('max_attempts => 3'); + expect(statements[1]).toContain('timeout => 60'); + expect(statements[1]).toContain("condition_pattern => '{\"enabled\":true}'"); + expect(statements[1]).toContain("when_unmet => 'skip'"); + expect(statements[1]).toContain("when_failed => 'skip-cascade'"); + }); + + it('should not include when_failed when not specified', () => { + const flow = new Flow({ slug: 'test_flow' }) + .step( + { slug: 'step1' }, + () => 'result' + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).not.toContain('when_failed'); + }); + }); + + describe('whenFailed on map steps', () => { + it('should accept whenFailed on map step', () => { + const flow = new Flow({ slug: 'test_flow' }) + .map( + { slug: 'map_step', whenFailed: 'skip' }, + (item) => item.toUpperCase() + ); + + const step = flow.getStepDefinition('map_step'); + expect(step.options.whenFailed).toBe('skip'); + }); + + it('should compile when_failed for map step', () => { + const flow = new Flow({ slug: 'test_flow' }) + .map( + { slug: 'map_step', whenFailed: 'skip-cascade' }, + (item) => item.toUpperCase() + ); + + const statements = compileFlow(flow); + + expect(statements).toHaveLength(2); + expect(statements[1]).toContain("when_failed => 'skip-cascade'"); + }); + }); +}); diff --git a/pkgs/dsl/src/compile-flow.ts b/pkgs/dsl/src/compile-flow.ts index 0e6435b7e..f6ed91bc0 100644 --- a/pkgs/dsl/src/compile-flow.ts +++ b/pkgs/dsl/src/compile-flow.ts @@ -72,5 +72,9 @@ function formatRuntimeOptions(options: RuntimeOptions | StepRuntimeOptions): str parts.push(`when_unmet => '${options.whenUnmet}'`); } + if ('whenFailed' in options && options.whenFailed !== undefined) { + parts.push(`when_failed => '${options.whenFailed}'`); + } + return parts.length > 0 ? `, ${parts.join(', ')}` : ''; } diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts index 3f8102e09..a6e0f6689 100644 --- a/pkgs/dsl/src/dsl.ts +++ b/pkgs/dsl/src/dsl.ts @@ -320,11 +320,15 @@ export type Context = Record, T // Valid values for whenUnmet option export type WhenUnmetMode = 'fail' | 'skip' | 'skip-cascade'; +// Valid values for whenFailed option (same values as whenUnmet) +export type WhenFailedMode = 'fail' | 'skip' | 'skip-cascade'; + // Step runtime options interface that extends flow options with step-specific options export interface StepRuntimeOptions extends RuntimeOptions { startDelay?: number; condition?: Json; // JSON pattern for @> containment check whenUnmet?: WhenUnmetMode; // What to do when condition not met + whenFailed?: WhenFailedMode; // What to do when handler fails after retries } // Define the StepDefinition interface with integrated options @@ -484,6 +488,7 @@ export class Flow< if (opts.startDelay !== undefined) options.startDelay = opts.startDelay; if (opts.condition !== undefined) options.condition = opts.condition; if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet; + if (opts.whenFailed !== undefined) options.whenFailed = opts.whenFailed; // Validate runtime options (optional for step level) validateRuntimeOptions(options, { optional: true }); @@ -649,6 +654,7 @@ export class Flow< if (opts.startDelay !== undefined) options.startDelay = opts.startDelay; if (opts.condition !== undefined) options.condition = opts.condition; if (opts.whenUnmet !== undefined) options.whenUnmet = opts.whenUnmet; + if (opts.whenFailed !== undefined) options.whenFailed = opts.whenFailed; // Validate runtime options validateRuntimeOptions(options, { optional: true });