Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/add-when-failed-option.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@pgflow/core': patch
'@pgflow/dsl': patch
---

Add whenFailed option for error handling after retries exhausted (fail, skip, skip-cascade)
90 changes: 82 additions & 8 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ as $$
DECLARE
v_run_failed boolean;
v_step_failed boolean;
v_step_skipped boolean;
v_when_failed text;
v_task_exhausted boolean; -- True if task has exhausted retries
begin

-- If run is already failed, no retries allowed
Expand Down Expand Up @@ -62,7 +65,8 @@ flow_info AS (
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay,
s.when_failed
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
Expand Down Expand Up @@ -90,27 +94,53 @@ fail_or_retry_task as (
AND task.status = 'started'
RETURNING *
),
-- Determine if task exhausted retries and get when_failed mode
task_status AS (
SELECT
(select status from fail_or_retry_task) AS new_task_status,
(select when_failed from config) AS when_failed_mode,
-- Task is exhausted when it's failed (no more retries)
((select status from fail_or_retry_task) = 'failed') AS is_exhausted
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
-- Status logic:
-- - If task not exhausted (retrying): keep current status
-- - If exhausted AND when_failed='fail': set to 'failed'
-- - If exhausted AND when_failed IN ('skip', 'skip-cascade'): set to 'skipped'
status = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
ELSE pgflow.step_states.status
WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
WHEN (select when_failed_mode from task_status) = 'fail' THEN 'failed'
ELSE 'skipped' -- skip or skip-cascade
END,
failed_at = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) = 'fail' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
WHEN (select is_exhausted from task_status) THEN fail_task.error_message
ELSE NULL
END
END,
skip_reason = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
ELSE pgflow.step_states.skip_reason
END,
skipped_at = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
ELSE pgflow.step_states.skipped_at
END,
-- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint)
remaining_tasks = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
ELSE pgflow.step_states.remaining_tasks
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Update run status
-- Update run status: only fail when when_failed='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
Expand All @@ -119,10 +149,27 @@ SET status = CASE
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;

-- Capture when_failed mode and check if step was skipped for later processing
SELECT s.when_failed INTO v_when_failed
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

SELECT (status = 'skipped') INTO v_step_skipped
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
Expand All @@ -146,6 +193,33 @@ IF v_step_failed THEN
);
END IF;

-- Handle step skipping (when_failed = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:skipped',
'run_id', fail_task.run_id,
'step_slug', fail_task.step_slug,
'status', 'skipped',
'skip_reason', 'handler_failed',
'error_message', fail_task.error_message,
'skipped_at', now()
),
concat('step:', fail_task.step_slug, ':skipped'),
concat('pgflow:run:', fail_task.run_id),
false
);

-- For skip-cascade: cascade skip to all downstream dependents
IF v_when_failed = 'skip-cascade' THEN
PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed');
END IF;

-- Try to complete the run (remaining_steps may now be 0)
PERFORM pgflow.maybe_complete_run(fail_task.run_id);
END IF;

-- Send broadcast event for run failure if the run was failed
IF v_run_failed THEN
DECLARE
Expand Down
Loading
Loading