5 changes: 5 additions & 0 deletions .changeset/skip-infrastructure-schema.md
@@ -0,0 +1,5 @@
---
'@pgflow/core': patch
---

Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function
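A minimal usage sketch of the new columns through pgflow.add_step (the flow and step slugs are hypothetical, and the pattern shape, keyed by the dependency's slug, is an assumption based on the @> containment comment in the steps table below):

-- Hypothetical step: only send a welcome email when the upstream check reports an active user;
-- if the condition is unmet, skip this step and cascade the skip to its downstream dependents.
select pgflow.add_step(
  flow_slug         => 'onboarding',                              -- placeholder flow
  step_slug         => 'send_welcome_email',                      -- placeholder step
  deps_slugs        => array['check_user'],
  condition_pattern => '{"check_user": {"active": true}}'::jsonb, -- assumed shape: keyed by dep slug
  when_unmet        => 'skip-cascade',                            -- one of 'fail' | 'skip' | 'skip-cascade'
  when_failed       => 'skip'                                     -- applied after retries are exhausted
);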
7 changes: 6 additions & 1 deletion pkgs/core/schemas/0050_tables_definitions.sql
@@ -24,6 +24,9 @@ create table pgflow.steps (
opt_base_delay int,
opt_timeout int,
opt_start_delay int,
condition_pattern jsonb, -- JSON pattern for @> containment check
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
when_failed text not null default 'fail', -- What to do when handler fails after retries
created_at timestamptz not null default now(),
primary key (flow_slug, step_slug),
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
@@ -32,7 +35,9 @@ create table pgflow.steps (
constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0),
constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0),
constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0),
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0)
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0),
constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')),
constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade'))
);

-- Dependencies table - stores relationships between steps
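The condition_pattern comment above refers to Postgres jsonb containment. As a standalone illustration of that operator (not part of this diff), the pattern matches when it is contained in the value being checked:

select '{"check_user": {"active": true, "plan": "pro"}}'::jsonb
       @> '{"check_user": {"active": true}}'::jsonb;  -- true: the pattern is contained in the input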
23 changes: 19 additions & 4 deletions pkgs/core/schemas/0060_tables_runtime.sql
@@ -31,18 +31,20 @@ create table pgflow.step_states (
remaining_deps int not null default 0 check (remaining_deps >= 0),
output jsonb, -- Step output: stored atomically with status=completed transition
error_message text,
skip_reason text, -- Why step was skipped: condition_unmet, handler_failed, dependency_skipped
created_at timestamptz not null default now(),
started_at timestamptz,
completed_at timestamptz,
failed_at timestamptz,
skipped_at timestamptz,
primary key (run_id, step_slug),
foreign key (flow_slug, step_slug)
references pgflow.steps (flow_slug, step_slug),
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')),
constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed', 'skipped')),
constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0),
-- Add constraint to ensure remaining_tasks is only set when step has started
constraint remaining_tasks_state_consistency check (
remaining_tasks is null or status != 'created'
remaining_tasks is null or status not in ('created', 'skipped')
),
constraint initial_tasks_known_when_started check (
status != 'started' or initial_tasks is not null
@@ -52,16 +54,29 @@
constraint output_only_for_completed_or_null check (
output is null or status = 'completed'
),
constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)),
-- skip_reason is required for skipped status and forbidden for other statuses
constraint skip_reason_matches_status check (
(status = 'skipped' and skip_reason is not null) or
(status != 'skipped' and skip_reason is null)
),
constraint completed_at_or_failed_at_or_skipped_at check (
(
case when completed_at is not null then 1 else 0 end +
case when failed_at is not null then 1 else 0 end +
case when skipped_at is not null then 1 else 0 end
) <= 1
),
constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at),
constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at),
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at)
constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at),
constraint skipped_at_is_after_created_at check (skipped_at is null or skipped_at >= created_at)
);

create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status
= 'created'
and remaining_deps = 0;
create index if not exists idx_step_states_failed on pgflow.step_states (run_id, step_slug) where status = 'failed';
create index if not exists idx_step_states_skipped on pgflow.step_states (run_id, step_slug) where status = 'skipped';
create index if not exists idx_step_states_flow_slug on pgflow.step_states (flow_slug);
create index if not exists idx_step_states_run_id on pgflow.step_states (run_id);

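With the new 'skipped' status, skip_reason column, and the partial index above, skipped steps for a run can be inspected with a query along these lines (a sketch; the run id is a placeholder):

select step_slug, skip_reason, skipped_at
from pgflow.step_states
where run_id = '00000000-0000-0000-0000-000000000000'
  and status = 'skipped';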
19 changes: 13 additions & 6 deletions pkgs/core/schemas/0100_function_add_step.sql
@@ -6,7 +6,10 @@ create or replace function pgflow.add_step(
base_delay int default null,
timeout int default null,
start_delay int default null,
step_type text default 'single'
step_type text default 'single',
condition_pattern jsonb default null,
when_unmet text default 'skip',
when_failed text default 'fail'
)
returns pgflow.steps
language plpgsql
@@ -22,7 +25,7 @@ BEGIN
-- 0 dependencies (root map - maps over flow input array)
-- 1 dependency (dependent map - maps over dependency output array)
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
add_step.step_slug,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
array_to_string(add_step.deps_slugs, ', ');
@@ -36,18 +39,22 @@ BEGIN
-- Create the step
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
add_step.step_slug,
COALESCE(add_step.step_type, 'single'),
next_idx,
next_idx,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
add_step.max_attempts,
add_step.base_delay,
add_step.timeout,
add_step.start_delay
add_step.start_delay,
add_step.condition_pattern,
add_step.when_unmet,
add_step.when_failed
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
@@ -59,7 +66,7 @@ BEGIN
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;

RETURN result_step;
END;
$$;
105 changes: 105 additions & 0 deletions pkgs/core/schemas/0100_function_cascade_skip_steps.sql
@@ -0,0 +1,105 @@
-- cascade_skip_steps: Skip a step and cascade to all downstream dependents
-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade)
create or replace function pgflow.cascade_skip_steps(
run_id uuid,
step_slug text,
skip_reason text
)
returns int
language plpgsql
as $$
DECLARE
v_flow_slug text;
v_total_skipped int := 0;
BEGIN
-- Get flow_slug for this run
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = cascade_skip_steps.run_id;

IF v_flow_slug IS NULL THEN
RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id;
END IF;

-- ==========================================
-- SKIP STEPS IN TOPOLOGICAL ORDER
-- ==========================================
-- Use recursive CTE to find all downstream dependents,
-- then skip them in topological order (by step_index)
WITH RECURSIVE
-- ---------- Find all downstream steps ----------
downstream_steps AS (
-- Base case: the trigger step
SELECT
s.flow_slug,
s.step_slug,
s.step_index,
cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step
FROM pgflow.steps s
WHERE s.flow_slug = v_flow_slug
AND s.step_slug = cascade_skip_steps.step_slug

UNION ALL

-- Recursive case: steps that depend on already-found steps
SELECT
s.flow_slug,
s.step_slug,
s.step_index,
'dependency_skipped'::text AS reason -- Downstream steps get this reason
FROM pgflow.steps s
JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug
JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug
),
-- ---------- Deduplicate and order by step_index ----------
steps_to_skip AS (
SELECT DISTINCT ON (ds.step_slug)
ds.flow_slug,
ds.step_slug,
ds.step_index,
ds.reason
FROM downstream_steps ds
ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason)
),
-- ---------- Skip the steps ----------
skipped AS (
UPDATE pgflow.step_states ss
SET status = 'skipped',
skip_reason = sts.reason,
skipped_at = now(),
remaining_tasks = NULL -- Clear remaining_tasks for skipped steps
FROM steps_to_skip sts
WHERE ss.run_id = cascade_skip_steps.run_id
AND ss.step_slug = sts.step_slug
AND ss.status IN ('created', 'started') -- Only skip non-terminal steps
RETURNING
ss.*,
-- Broadcast step:skipped event
realtime.send(
jsonb_build_object(
'event_type', 'step:skipped',
'run_id', ss.run_id,
'flow_slug', ss.flow_slug,
'step_slug', ss.step_slug,
'status', 'skipped',
'skip_reason', ss.skip_reason,
'skipped_at', ss.skipped_at
),
concat('step:', ss.step_slug, ':skipped'),
concat('pgflow:run:', ss.run_id),
false
) as _broadcast_result
),
-- ---------- Update run counters ----------
run_updates AS (
UPDATE pgflow.runs r
SET remaining_steps = r.remaining_steps - skipped_count.count
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
WHERE r.run_id = cascade_skip_steps.run_id
AND skipped_count.count > 0
)
SELECT COUNT(*) INTO v_total_skipped FROM skipped;

RETURN v_total_skipped;
END;
$$;
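A usage sketch of the function above; the run id and step slug are placeholders, and the skip_reason value mirrors the ones listed in the step_states column comment:

select pgflow.cascade_skip_steps(
  run_id      => '00000000-0000-0000-0000-000000000000'::uuid,
  step_slug   => 'send_welcome_email',
  skip_reason => 'condition_unmet'
);
-- Returns the number of step_states transitioned to 'skipped': the trigger step plus any
-- downstream dependents that were still in 'created' or 'started' when the cascade ran.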
25 changes: 25 additions & 0 deletions pkgs/core/src/database-types.ts
@@ -132,6 +132,8 @@ export type Database = {
remaining_deps: number
remaining_tasks: number | null
run_id: string
skip_reason: string | null
skipped_at: string | null
started_at: string | null
status: string
step_slug: string
@@ -147,6 +149,8 @@ export type Database = {
remaining_deps?: number
remaining_tasks?: number | null
run_id: string
skip_reason?: string | null
skipped_at?: string | null
started_at?: string | null
status?: string
step_slug: string
@@ -162,6 +166,8 @@ export type Database = {
remaining_deps?: number
remaining_tasks?: number | null
run_id?: string
skip_reason?: string | null
skipped_at?: string | null
started_at?: string | null
status?: string
step_slug?: string
@@ -272,6 +278,7 @@
}
steps: {
Row: {
condition_pattern: Json | null
created_at: string
deps_count: number
flow_slug: string
@@ -282,8 +289,11 @@
step_index: number
step_slug: string
step_type: string
when_failed: string
when_unmet: string
}
Insert: {
condition_pattern?: Json | null
created_at?: string
deps_count?: number
flow_slug: string
@@ -294,8 +304,11 @@
step_index?: number
step_slug: string
step_type?: string
when_failed?: string
when_unmet?: string
}
Update: {
condition_pattern?: Json | null
created_at?: string
deps_count?: number
flow_slug?: string
@@ -306,6 +319,8 @@
step_index?: number
step_slug?: string
step_type?: string
when_failed?: string
when_unmet?: string
}
Relationships: [
{
@@ -391,15 +406,19 @@
add_step: {
Args: {
base_delay?: number
condition_pattern?: Json
deps_slugs?: string[]
flow_slug: string
max_attempts?: number
start_delay?: number
step_slug: string
step_type?: string
timeout?: number
when_failed?: string
when_unmet?: string
}
Returns: {
condition_pattern: Json | null
created_at: string
deps_count: number
flow_slug: string
@@ -410,6 +429,8 @@
step_index: number
step_slug: string
step_type: string
when_failed: string
when_unmet: string
}
SetofOptions: {
from: "*"
@@ -426,6 +447,10 @@
Args: { run_id: string }
Returns: number
}
cascade_skip_steps: {
Args: { run_id: string; skip_reason: string; step_slug: string }
Returns: number
}
cleanup_ensure_workers_logs: {
Args: { retention_hours?: number }
Returns: {