Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkgs/core/schemas/0050_tables_definitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ create table pgflow.steps (
opt_base_delay int,
opt_timeout int,
opt_start_delay int,
condition_pattern jsonb, -- JSON pattern for @> containment check
condition_pattern jsonb, -- JSON pattern for @> containment check (if)
condition_not_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
when_failed text not null default 'fail', -- What to do when handler fails after retries
created_at timestamptz not null default now(),
Expand Down
4 changes: 3 additions & 1 deletion pkgs/core/schemas/0100_function_add_step.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ create or replace function pgflow.add_step(
start_delay int default null,
step_type text default 'single',
condition_pattern jsonb default null,
condition_not_pattern jsonb default null,
when_unmet text default 'skip',
when_failed text default 'fail'
)
Expand Down Expand Up @@ -40,7 +41,7 @@ BEGIN
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, when_unmet, when_failed
condition_pattern, condition_not_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
Expand All @@ -53,6 +54,7 @@ BEGIN
add_step.timeout,
add_step.start_delay,
add_step.condition_pattern,
add_step.condition_not_pattern,
add_step.when_unmet,
add_step.when_failed
)
Expand Down
53 changes: 34 additions & 19 deletions pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ BEGIN
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
-- ==========================================
-- Find first step (by topological order) with unmet condition and 'fail' mode.
-- Condition is unmet when:
-- (condition_pattern is set AND input does NOT contain it) OR
-- (condition_not_pattern is set AND input DOES contain it)
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -61,7 +65,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -79,26 +83,31 @@ BEGIN
condition_evaluations AS (
SELECT
swc.*,
CASE
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
END AS condition_met
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT flow_slug, step_slug, condition_pattern
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
ORDER BY step_index
LIMIT 1;

-- Handle fail mode: fail step and run, return false
IF v_first_fail IS NOT NULL THEN
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
IF FOUND THEN
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
error_message = 'Condition not met'
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
AND pgflow.step_states.step_slug = v_first_fail.step_slug;

Expand All @@ -114,12 +123,13 @@ BEGIN
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
-- ==========================================
-- Skip steps with unmet conditions and whenUnmet='skip'.
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -130,7 +140,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -148,10 +158,13 @@ BEGIN
condition_evaluations AS (
SELECT
swc.*,
CASE
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
END AS condition_met
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
),
Expand Down Expand Up @@ -231,13 +244,15 @@ BEGIN
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
AND ready_step.status = 'created'
AND ready_step.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
AND step.when_unmet = 'skip-cascade'
-- Condition is NOT met when: (if fails) OR (ifNot fails)
AND NOT (
CASE
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
END
(step.condition_pattern IS NULL OR
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
AND
(step.condition_not_pattern IS NULL OR
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
)
ORDER BY step.step_index;

Expand Down
5 changes: 5 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ export type Database = {
}
steps: {
Row: {
condition_not_pattern: Json | null
condition_pattern: Json | null
created_at: string
deps_count: number
Expand All @@ -293,6 +294,7 @@ export type Database = {
when_unmet: string
}
Insert: {
condition_not_pattern?: Json | null
condition_pattern?: Json | null
created_at?: string
deps_count?: number
Expand All @@ -308,6 +310,7 @@ export type Database = {
when_unmet?: string
}
Update: {
condition_not_pattern?: Json | null
condition_pattern?: Json | null
created_at?: string
deps_count?: number
Expand Down Expand Up @@ -410,6 +413,7 @@ export type Database = {
add_step: {
Args: {
base_delay?: number
condition_not_pattern?: Json
condition_pattern?: Json
deps_slugs?: string[]
flow_slug: string
Expand All @@ -422,6 +426,7 @@ export type Database = {
when_unmet?: string
}
Returns: {
condition_not_pattern: Json | null
condition_pattern: Json | null
created_at: string
deps_count: number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
-- Create index "idx_step_states_skipped" to table: "step_states"
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
-- Modify "steps" table
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
-- Create "_cascade_force_skip_steps" function
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
DECLARE
Expand Down Expand Up @@ -151,11 +151,15 @@ BEGIN
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
-- ==========================================
-- Find first step (by topological order) with unmet condition and 'fail' mode.
-- Condition is unmet when:
-- (condition_pattern is set AND input does NOT contain it) OR
-- (condition_not_pattern is set AND input DOES contain it)
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -166,7 +170,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -184,26 +188,31 @@ BEGIN
condition_evaluations AS (
SELECT
swc.*,
CASE
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
END AS condition_met
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT flow_slug, step_slug, condition_pattern
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
ORDER BY step_index
LIMIT 1;

-- Handle fail mode: fail step and run, return false
IF v_first_fail IS NOT NULL THEN
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
IF FOUND THEN
UPDATE pgflow.step_states
SET status = 'failed',
failed_at = now(),
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
error_message = 'Condition not met'
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
AND pgflow.step_states.step_slug = v_first_fail.step_slug;

Expand All @@ -219,12 +228,13 @@ BEGIN
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
-- ==========================================
-- Skip steps with unmet conditions and whenUnmet='skip'.
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -235,7 +245,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -253,10 +263,13 @@ BEGIN
condition_evaluations AS (
SELECT
swc.*,
CASE
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
END AS condition_met
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
),
Expand Down Expand Up @@ -336,13 +349,15 @@ BEGIN
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
AND ready_step.status = 'created'
AND ready_step.remaining_deps = 0
AND step.condition_pattern IS NOT NULL
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
AND step.when_unmet = 'skip-cascade'
-- Condition is NOT met when: (if fails) OR (ifNot fails)
AND NOT (
CASE
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
END
(step.condition_pattern IS NULL OR
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
AND
(step.condition_not_pattern IS NULL OR
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
)
ORDER BY step.step_index;

Expand Down Expand Up @@ -1440,7 +1455,7 @@ with tasks as (
dep_out.step_slug = st.step_slug
$$;
-- Create "add_step" function
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
result_step pgflow.steps;
next_idx int;
Expand All @@ -1465,7 +1480,7 @@ BEGIN
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, when_unmet, when_failed
condition_pattern, condition_not_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
Expand All @@ -1478,6 +1493,7 @@ BEGIN
add_step.timeout,
add_step.start_delay,
add_step.condition_pattern,
add_step.condition_not_pattern,
add_step.when_unmet,
add_step.when_failed
)
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
h1:UUZln51my4XRIQECtp1HayMW7tGjk5w8qLQhW0x7gEY=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -16,4 +16,4 @@ h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260105214940_pgflow_step_conditions.sql h1:DIta8qrr+qRvA9aFCdWefk72qp27mcPvGGlAJswmitw=
20260108131350_pgflow_step_conditions.sql h1:7YMszmTlExOtx9PyYLB7hIc3RiMmtB4ZOc2EOQVfuPs=
Loading