diff --git a/.changeset/skip-infrastructure-schema.md b/.changeset/skip-infrastructure-schema.md new file mode 100644 index 000000000..5fd952320 --- /dev/null +++ b/.changeset/skip-infrastructure-schema.md @@ -0,0 +1,5 @@ +--- +'@pgflow/core': patch +--- + +Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function diff --git a/pkgs/core/schemas/0050_tables_definitions.sql b/pkgs/core/schemas/0050_tables_definitions.sql index 42367280c..74c3d8b57 100644 --- a/pkgs/core/schemas/0050_tables_definitions.sql +++ b/pkgs/core/schemas/0050_tables_definitions.sql @@ -24,6 +24,9 @@ create table pgflow.steps ( opt_base_delay int, opt_timeout int, opt_start_delay int, + condition_pattern jsonb, -- JSON pattern for @> containment check + when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default) + when_failed text not null default 'fail', -- What to do when handler fails after retries created_at timestamptz not null default now(), primary key (flow_slug, step_slug), unique (flow_slug, step_index), -- Ensure step_index is unique within a flow @@ -32,7 +35,9 @@ create table pgflow.steps ( constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0), constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0), constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0), - constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0) + constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0), + constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')), + constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade')) ); -- Dependencies table - stores relationships between steps diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 7a408410c..33eac47eb 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -31,18 +31,20 @@ create table pgflow.step_states ( remaining_deps int not null default 0 check (remaining_deps >= 0), output jsonb, -- Step output: stored atomically with status=completed transition error_message text, + skip_reason text, -- Why step was skipped: condition_unmet, handler_failed, dependency_skipped created_at timestamptz not null default now(), started_at timestamptz, completed_at timestamptz, failed_at timestamptz, + skipped_at timestamptz, primary key (run_id, step_slug), foreign key (flow_slug, step_slug) references pgflow.steps (flow_slug, step_slug), - constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')), + constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed', 'skipped')), constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0), -- Add constraint to ensure remaining_tasks is only set when step has started constraint remaining_tasks_state_consistency check ( - remaining_tasks is null or status != 'created' + remaining_tasks is null or status not in ('created', 'skipped') ), constraint initial_tasks_known_when_started check ( status != 'started' or initial_tasks is not null @@ -52,16 +54,29 @@ create table pgflow.step_states ( constraint output_only_for_completed_or_null check ( output is null or status = 'completed' ), - constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)), + -- skip_reason is required for skipped status and forbidden for other statuses + constraint skip_reason_matches_status check ( + (status = 'skipped' and skip_reason is not null) or + (status != 'skipped' and skip_reason is null) + ), + constraint completed_at_or_failed_at_or_skipped_at check ( + ( + case when completed_at is not null then 1 else 0 end + + case when failed_at is not null then 1 else 0 end + + case when skipped_at is not null then 1 else 0 end + ) <= 1 + ), constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at), constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at), - constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at) + constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at), + constraint skipped_at_is_after_created_at check (skipped_at is null or skipped_at >= created_at) ); create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status = 'created' and remaining_deps = 0; create index if not exists idx_step_states_failed on pgflow.step_states (run_id, step_slug) where status = 'failed'; +create index if not exists idx_step_states_skipped on pgflow.step_states (run_id, step_slug) where status = 'skipped'; create index if not exists idx_step_states_flow_slug on pgflow.step_states (flow_slug); create index if not exists idx_step_states_run_id on pgflow.step_states (run_id); diff --git a/pkgs/core/schemas/0100_function_add_step.sql b/pkgs/core/schemas/0100_function_add_step.sql index 3fb8fbc54..0eecdcd7c 100644 --- a/pkgs/core/schemas/0100_function_add_step.sql +++ b/pkgs/core/schemas/0100_function_add_step.sql @@ -6,7 +6,10 @@ create or replace function pgflow.add_step( base_delay int default null, timeout int default null, start_delay int default null, - step_type text default 'single' + step_type text default 'single', + condition_pattern jsonb default null, + when_unmet text default 'skip', + when_failed text default 'fail' ) returns pgflow.steps language plpgsql @@ -22,7 +25,7 @@ BEGIN -- 0 dependencies (root map - maps over flow input array) -- 1 dependency (dependent map - maps over dependency output array) IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN - RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', add_step.step_slug, COALESCE(array_length(add_step.deps_slugs, 1), 0), array_to_string(add_step.deps_slugs, ', '); @@ -36,18 +39,22 @@ BEGIN -- Create the step INSERT INTO pgflow.steps ( flow_slug, step_slug, step_type, step_index, deps_count, - opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, when_unmet, when_failed ) VALUES ( add_step.flow_slug, add_step.step_slug, COALESCE(add_step.step_type, 'single'), - next_idx, + next_idx, COALESCE(array_length(add_step.deps_slugs, 1), 0), add_step.max_attempts, add_step.base_delay, add_step.timeout, - add_step.start_delay + add_step.start_delay, + add_step.condition_pattern, + add_step.when_unmet, + add_step.when_failed ) ON CONFLICT ON CONSTRAINT steps_pkey DO UPDATE SET step_slug = EXCLUDED.step_slug @@ -59,7 +66,7 @@ BEGIN FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; - + RETURN result_step; END; $$; diff --git a/pkgs/core/schemas/0100_function_cascade_skip_steps.sql b/pkgs/core/schemas/0100_function_cascade_skip_steps.sql new file mode 100644 index 000000000..62be9f53d --- /dev/null +++ b/pkgs/core/schemas/0100_function_cascade_skip_steps.sql @@ -0,0 +1,105 @@ +-- cascade_skip_steps: Skip a step and cascade to all downstream dependents +-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade) +create or replace function pgflow.cascade_skip_steps( + run_id uuid, + step_slug text, + skip_reason text +) +returns int +language plpgsql +as $$ +DECLARE + v_flow_slug text; + v_total_skipped int := 0; +BEGIN + -- Get flow_slug for this run + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = cascade_skip_steps.run_id; + + IF v_flow_slug IS NULL THEN + RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id; + END IF; + + -- ========================================== + -- SKIP STEPS IN TOPOLOGICAL ORDER + -- ========================================== + -- Use recursive CTE to find all downstream dependents, + -- then skip them in topological order (by step_index) + WITH RECURSIVE + -- ---------- Find all downstream steps ---------- + downstream_steps AS ( + -- Base case: the trigger step + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step + FROM pgflow.steps s + WHERE s.flow_slug = v_flow_slug + AND s.step_slug = cascade_skip_steps.step_slug + + UNION ALL + + -- Recursive case: steps that depend on already-found steps + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + 'dependency_skipped'::text AS reason -- Downstream steps get this reason + FROM pgflow.steps s + JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug + JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug + ), + -- ---------- Deduplicate and order by step_index ---------- + steps_to_skip AS ( + SELECT DISTINCT ON (ds.step_slug) + ds.flow_slug, + ds.step_slug, + ds.step_index, + ds.reason + FROM downstream_steps ds + ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason) + ), + -- ---------- Skip the steps ---------- + skipped AS ( + UPDATE pgflow.step_states ss + SET status = 'skipped', + skip_reason = sts.reason, + skipped_at = now(), + remaining_tasks = NULL -- Clear remaining_tasks for skipped steps + FROM steps_to_skip sts + WHERE ss.run_id = cascade_skip_steps.run_id + AND ss.step_slug = sts.step_slug + AND ss.status IN ('created', 'started') -- Only skip non-terminal steps + RETURNING + ss.*, + -- Broadcast step:skipped event + realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', ss.run_id, + 'flow_slug', ss.flow_slug, + 'step_slug', ss.step_slug, + 'status', 'skipped', + 'skip_reason', ss.skip_reason, + 'skipped_at', ss.skipped_at + ), + concat('step:', ss.step_slug, ':skipped'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result + ), + -- ---------- Update run counters ---------- + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - skipped_count.count + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + WHERE r.run_id = cascade_skip_steps.run_id + AND skipped_count.count > 0 + ) + SELECT COUNT(*) INTO v_total_skipped FROM skipped; + + RETURN v_total_skipped; +END; +$$; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 510d7e144..9a4fe7c7d 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -132,6 +132,8 @@ export type Database = { remaining_deps: number remaining_tasks: number | null run_id: string + skip_reason: string | null + skipped_at: string | null started_at: string | null status: string step_slug: string @@ -147,6 +149,8 @@ export type Database = { remaining_deps?: number remaining_tasks?: number | null run_id: string + skip_reason?: string | null + skipped_at?: string | null started_at?: string | null status?: string step_slug: string @@ -162,6 +166,8 @@ export type Database = { remaining_deps?: number remaining_tasks?: number | null run_id?: string + skip_reason?: string | null + skipped_at?: string | null started_at?: string | null status?: string step_slug?: string @@ -272,6 +278,7 @@ export type Database = { } steps: { Row: { + condition_pattern: Json | null created_at: string deps_count: number flow_slug: string @@ -282,8 +289,11 @@ export type Database = { step_index: number step_slug: string step_type: string + when_failed: string + when_unmet: string } Insert: { + condition_pattern?: Json | null created_at?: string deps_count?: number flow_slug: string @@ -294,8 +304,11 @@ export type Database = { step_index?: number step_slug: string step_type?: string + when_failed?: string + when_unmet?: string } Update: { + condition_pattern?: Json | null created_at?: string deps_count?: number flow_slug?: string @@ -306,6 +319,8 @@ export type Database = { step_index?: number step_slug?: string step_type?: string + when_failed?: string + when_unmet?: string } Relationships: [ { @@ -391,6 +406,7 @@ export type Database = { add_step: { Args: { base_delay?: number + condition_pattern?: Json deps_slugs?: string[] flow_slug: string max_attempts?: number @@ -398,8 +414,11 @@ export type Database = { step_slug: string step_type?: string timeout?: number + when_failed?: string + when_unmet?: string } Returns: { + condition_pattern: Json | null created_at: string deps_count: number flow_slug: string @@ -410,6 +429,8 @@ export type Database = { step_index: number step_slug: string step_type: string + when_failed: string + when_unmet: string } SetofOptions: { from: "*" @@ -426,6 +447,10 @@ export type Database = { Args: { run_id: string } Returns: number } + cascade_skip_steps: { + Args: { run_id: string; skip_reason: string; step_slug: string } + Returns: number + } cleanup_ensure_workers_logs: { Args: { retention_hours?: number } Returns: { diff --git a/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql b/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql new file mode 100644 index 000000000..178f74ac3 --- /dev/null +++ b/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql @@ -0,0 +1,173 @@ +-- Modify "step_states" table +ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "completed_at_or_failed_at", DROP CONSTRAINT "remaining_tasks_state_consistency", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> ALL (ARRAY['created'::text, 'skipped'::text]))), DROP CONSTRAINT "status_is_valid", ADD CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text, 'skipped'::text])), ADD CONSTRAINT "completed_at_or_failed_at_or_skipped_at" CHECK ((( +CASE + WHEN (completed_at IS NOT NULL) THEN 1 + ELSE 0 +END + +CASE + WHEN (failed_at IS NOT NULL) THEN 1 + ELSE 0 +END) + +CASE + WHEN (skipped_at IS NOT NULL) THEN 1 + ELSE 0 +END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipped'::text) AND (skip_reason IS NOT NULL)) OR ((status <> 'skipped'::text) AND (skip_reason IS NULL))), ADD CONSTRAINT "skipped_at_is_after_created_at" CHECK ((skipped_at IS NULL) OR (skipped_at >= created_at)), ADD COLUMN "skip_reason" text NULL, ADD COLUMN "skipped_at" timestamptz NULL; +-- Create index "idx_step_states_skipped" to table: "step_states" +CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); +-- Modify "steps" table +ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail'; +-- Create "add_step" function +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + result_step pgflow.steps; + next_idx int; +BEGIN + -- Validate map step constraints + -- Map steps can have either: + -- 0 dependencies (root map - maps over flow input array) + -- 1 dependency (dependent map - maps over dependency output array) + IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + add_step.step_slug, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + array_to_string(add_step.deps_slugs, ', '); + END IF; + + -- Get next step index + SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx + FROM pgflow.steps s + WHERE s.flow_slug = add_step.flow_slug; + + -- Create the step + INSERT INTO pgflow.steps ( + flow_slug, step_slug, step_type, step_index, deps_count, + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, when_unmet, when_failed + ) + VALUES ( + add_step.flow_slug, + add_step.step_slug, + COALESCE(add_step.step_type, 'single'), + next_idx, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + add_step.max_attempts, + add_step.base_delay, + add_step.timeout, + add_step.start_delay, + add_step.condition_pattern, + add_step.when_unmet, + add_step.when_failed + ) + ON CONFLICT ON CONSTRAINT steps_pkey + DO UPDATE SET step_slug = EXCLUDED.step_slug + RETURNING * INTO result_step; + + -- Insert dependencies + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug + FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) + WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 + ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; + + RETURN result_step; +END; +$$; +-- Create "cascade_skip_steps" function +CREATE FUNCTION "pgflow"."cascade_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_flow_slug text; + v_total_skipped int := 0; +BEGIN + -- Get flow_slug for this run + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = cascade_skip_steps.run_id; + + IF v_flow_slug IS NULL THEN + RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id; + END IF; + + -- ========================================== + -- SKIP STEPS IN TOPOLOGICAL ORDER + -- ========================================== + -- Use recursive CTE to find all downstream dependents, + -- then skip them in topological order (by step_index) + WITH RECURSIVE + -- ---------- Find all downstream steps ---------- + downstream_steps AS ( + -- Base case: the trigger step + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step + FROM pgflow.steps s + WHERE s.flow_slug = v_flow_slug + AND s.step_slug = cascade_skip_steps.step_slug + + UNION ALL + + -- Recursive case: steps that depend on already-found steps + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + 'dependency_skipped'::text AS reason -- Downstream steps get this reason + FROM pgflow.steps s + JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug + JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug + ), + -- ---------- Deduplicate and order by step_index ---------- + steps_to_skip AS ( + SELECT DISTINCT ON (ds.step_slug) + ds.flow_slug, + ds.step_slug, + ds.step_index, + ds.reason + FROM downstream_steps ds + ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason) + ), + -- ---------- Skip the steps ---------- + skipped AS ( + UPDATE pgflow.step_states ss + SET status = 'skipped', + skip_reason = sts.reason, + skipped_at = now(), + remaining_tasks = NULL -- Clear remaining_tasks for skipped steps + FROM steps_to_skip sts + WHERE ss.run_id = cascade_skip_steps.run_id + AND ss.step_slug = sts.step_slug + AND ss.status IN ('created', 'started') -- Only skip non-terminal steps + RETURNING + ss.*, + -- Broadcast step:skipped event + realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', ss.run_id, + 'flow_slug', ss.flow_slug, + 'step_slug', ss.step_slug, + 'status', 'skipped', + 'skip_reason', ss.skip_reason, + 'skipped_at', ss.skipped_at + ), + concat('step:', ss.step_slug, ':skipped'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result + ), + -- ---------- Update run counters ---------- + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - skipped_count.count + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + WHERE r.run_id = cascade_skip_steps.run_id + AND skipped_count.count > 0 + ) + SELECT COUNT(*) INTO v_total_skipped FROM skipped; + + RETURN v_total_skipped; +END; +$$; +-- Drop "add_step" function +DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text); diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index c0881d482..e23d991f8 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= +h1:95pJcIaIV04WBvPgFpjULl/TWBCArYhQTMB4IG69phs= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= +20260105074725_pgflow_temp_skip_infrastructure.sql h1:tjele0FyNwcK0DLlr7I8QxiAueTC36r7KYK27Mkbi2s= diff --git a/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql b/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql new file mode 100644 index 000000000..17458192e --- /dev/null +++ b/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql @@ -0,0 +1,24 @@ +-- Test: add_step - Invalid condition parameter values +-- Verifies CHECK constraints reject invalid when_unmet and when_failed values +begin; +select plan(2); + +select pgflow_tests.reset_db(); +select pgflow.create_flow('invalid_test'); + +-- Test 1: Invalid when_unmet value should fail +select throws_ok( + $$ SELECT pgflow.add_step('invalid_test', 'bad_step', when_unmet => 'invalid_value') $$, + 'new row for relation "steps" violates check constraint "when_unmet_is_valid"', + 'Invalid when_unmet value should be rejected' +); + +-- Test 2: Invalid when_failed value should fail +select throws_ok( + $$ SELECT pgflow.add_step('invalid_test', 'bad_step2', when_failed => 'invalid_value') $$, + 'new row for relation "steps" violates check constraint "when_failed_is_valid"', + 'Invalid when_failed value should be rejected' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql b/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql new file mode 100644 index 000000000..324aae2bf --- /dev/null +++ b/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql @@ -0,0 +1,125 @@ +-- Test: add_step - New condition parameters +-- Verifies condition_pattern, when_unmet, when_failed parameters work correctly +begin; +select plan(9); + +select pgflow_tests.reset_db(); +select pgflow.create_flow('condition_test'); + +-- Test 1: Add step with condition_pattern +select pgflow.add_step( + 'condition_test', + 'step_with_condition', + condition_pattern => '{"type": "premium"}'::jsonb +); + +select is( + (select condition_pattern from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_with_condition'), + '{"type": "premium"}'::jsonb, + 'condition_pattern should be stored correctly' +); + +-- Test 2: Add step with when_unmet = skip +select pgflow.add_step( + 'condition_test', + 'step_skip_unmet', + when_unmet => 'skip' +); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_unmet'), + 'skip', + 'when_unmet should be skip' +); + +-- Test 3: Add step with when_unmet = skip-cascade +select pgflow.add_step( + 'condition_test', + 'step_skip_cascade_unmet', + when_unmet => 'skip-cascade' +); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_cascade_unmet'), + 'skip-cascade', + 'when_unmet should be skip-cascade' +); + +-- Test 4: Add step with when_failed = skip +select pgflow.add_step( + 'condition_test', + 'step_skip_failed', + when_failed => 'skip' +); + +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_failed'), + 'skip', + 'when_failed should be skip' +); + +-- Test 5: Add step with when_failed = skip-cascade +select pgflow.add_step( + 'condition_test', + 'step_skip_cascade_failed', + when_failed => 'skip-cascade' +); + +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_cascade_failed'), + 'skip-cascade', + 'when_failed should be skip-cascade' +); + +-- Test 6: Default when_unmet should be skip (natural default for conditions) +select pgflow.add_step('condition_test', 'step_default_unmet'); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + 'skip', + 'Default when_unmet should be skip' +); + +-- Test 7: Default when_failed should be fail +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + 'fail', + 'Default when_failed should be fail' +); + +-- Test 8: Default condition_pattern should be NULL +select is( + (select condition_pattern from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + NULL::jsonb, + 'Default condition_pattern should be NULL' +); + +-- Test 9: Add step with all condition parameters +select pgflow.add_step( + 'condition_test', + 'step_all_params', + condition_pattern => '{"status": "active"}'::jsonb, + when_unmet => 'skip', + when_failed => 'skip-cascade' +); + +select ok( + (select + condition_pattern = '{"status": "active"}'::jsonb + AND when_unmet = 'skip' + AND when_failed = 'skip-cascade' + from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_all_params'), + 'All condition parameters should be stored correctly together' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql new file mode 100644 index 000000000..44e04d649 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql @@ -0,0 +1,64 @@ +-- Test: cascade_skip_steps - Broadcast order respects dependency graph +-- Verifies step:skipped events are sent in topological order +begin; +select plan(2); + +-- Reset database and create a chain: A -> B -> C +select pgflow_tests.reset_db(); +select pgflow.create_flow('order_test'); +select pgflow.add_step('order_test', 'step_a'); +select pgflow.add_step('order_test', 'step_b', ARRAY['step_a']); +select pgflow.add_step('order_test', 'step_c', ARRAY['step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('order_test', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (cascades to B and C) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: All 3 step:skipped events should exist +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 3::bigint, + 'Should have 3 step:skipped events' +); + +-- Test 2: Events should be in dependency order (A before B before C) +with ordered_events as ( + select + inserted_at, + payload->>'step_slug' as step_slug, + row_number() over (order by inserted_at) as event_order + from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids) +), +step_a_event as ( + select event_order from ordered_events where step_slug = 'step_a' +), +step_b_event as ( + select event_order from ordered_events where step_slug = 'step_b' +), +step_c_event as ( + select event_order from ordered_events where step_slug = 'step_c' +) +select ok( + (select event_order from step_a_event) < (select event_order from step_b_event) + AND (select event_order from step_b_event) < (select event_order from step_c_event), + 'Events must be in dependency order (A -> B -> C)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql new file mode 100644 index 000000000..888a8d6f8 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql @@ -0,0 +1,95 @@ +-- Test: cascade_skip_steps - Cascade through multiple DAG levels +-- Verifies skipping A cascades through A -> B -> C chain +begin; +select plan(8); + +-- Reset database and create a flow: A -> B -> C +select pgflow_tests.reset_db(); +select pgflow.create_flow('deep_cascade'); +select pgflow.add_step('deep_cascade', 'step_a'); +select pgflow.add_step('deep_cascade', 'step_b', ARRAY['step_a']); +select pgflow.add_step('deep_cascade', 'step_c', ARRAY['step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('deep_cascade', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_b and step_c) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'handler_failed' +); + +-- Test 1: step_a should be skipped with handler_failed reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'handler_failed', + 'step_a skip_reason should be handler_failed' +); + +-- Test 2: step_b should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'skipped', + 'step_b should be skipped (direct dependent of step_a)' +); + +-- Test 3: step_b should have dependency_skipped reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'dependency_skipped', + 'step_b skip_reason should be dependency_skipped' +); + +-- Test 4: step_c should also be skipped (transitive) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'skipped', + 'step_c should be skipped (transitive cascade)' +); + +-- Test 5: step_c should have dependency_skipped reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'dependency_skipped', + 'step_c skip_reason should be dependency_skipped' +); + +-- Test 6: All three steps should be skipped +select is( + (select count(*) from pgflow.step_states + where run_id = (select run_id from run_ids) and status = 'skipped'), + 3::bigint, + 'All 3 steps should be skipped' +); + +-- Test 7: remaining_steps should be 0 +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 0::int, + 'remaining_steps should be 0' +); + +-- Test 8: step:skipped events should be sent for all 3 steps +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 3::bigint, + 'Should send 3 step:skipped events' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql new file mode 100644 index 000000000..a6b086b41 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql @@ -0,0 +1,86 @@ +-- Test: cascade_skip_steps - Cascade to single dependent +-- Verifies skipping a step cascades to its direct dependent +begin; +select plan(7); + +-- Reset database and create a flow: A -> B +select pgflow_tests.reset_db(); +select pgflow.create_flow('cascade_flow'); +select pgflow.add_step('cascade_flow', 'step_a'); +select pgflow.add_step('cascade_flow', 'step_b', ARRAY['step_a']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('cascade_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_b) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: step_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped' +); + +-- Test 2: step_a should have skip_reason = condition_unmet +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'condition_unmet', + 'step_a skip_reason should be condition_unmet' +); + +-- Test 3: step_b should also be skipped (cascade) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'skipped', + 'step_b should be skipped due to cascade' +); + +-- Test 4: step_b should have skip_reason = dependency_skipped +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'dependency_skipped', + 'step_b skip_reason should be dependency_skipped' +); + +-- Test 5: Both steps should have skipped_at timestamp set +select ok( + (select count(*) = 2 from pgflow.step_states + where run_id = (select run_id from run_ids) + and skipped_at is not null), + 'Both steps should have skipped_at timestamp' +); + +-- Test 6: remaining_steps should be 0 (both skipped) +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 0::int, + 'remaining_steps should be 0 (both steps skipped)' +); + +-- Test 7: step:skipped events should be sent for both steps +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 2::bigint, + 'Should send step:skipped events for both steps' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql new file mode 100644 index 000000000..c7a1f2327 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql @@ -0,0 +1,80 @@ +-- Test: cascade_skip_steps - Multi-dependency scenario +-- Flow: A -> C, B -> C (C depends on both A and B) +-- Skipping A should cascade to C, even though B is still runnable +begin; +select plan(6); + +-- Reset database and create a diamond-ish flow +select pgflow_tests.reset_db(); +select pgflow.create_flow('multi_dep'); +select pgflow.add_step('multi_dep', 'step_a'); +select pgflow.add_step('multi_dep', 'step_b'); +select pgflow.add_step('multi_dep', 'step_c', ARRAY['step_a', 'step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('multi_dep', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_c) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: step_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped' +); + +-- Test 2: step_b should NOT be skipped (independent of step_a, root step so started) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'started', + 'step_b should remain in started status (independent root step)' +); + +-- Test 3: step_c should be skipped (depends on skipped step_a) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'skipped', + 'step_c should be skipped (one of its deps was skipped)' +); + +-- Test 4: step_c skip_reason should be dependency_skipped +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'dependency_skipped', + 'step_c skip_reason should be dependency_skipped' +); + +-- Test 5: remaining_steps should be 1 (only step_b) +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 1::int, + 'remaining_steps should be 1 (only step_b remains)' +); + +-- Test 6: 2 step:skipped events (step_a and step_c) +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 2::bigint, + 'Should send 2 step:skipped events (step_a and step_c)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql new file mode 100644 index 000000000..2df25cbd5 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql @@ -0,0 +1,69 @@ +-- Test: cascade_skip_steps - Single step skip (base case) +-- Verifies the function can skip a single step without dependencies +begin; +select plan(5); + +-- Reset database and create a simple flow with no dependencies +select pgflow_tests.reset_db(); +select pgflow.create_flow('simple_flow'); +select pgflow.add_step('simple_flow', 'step_a'); +select pgflow.add_step('simple_flow', 'step_b'); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('simple_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify step_a starts in 'started' status (root steps auto-start) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'started', + 'step_a should start in started status (root step auto-starts)' +); + +-- Skip step_a +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 2: step_a should now have status 'skipped' +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped after cascade_skip_steps' +); + +-- Test 3: step_a should have skip_reason set +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'condition_unmet', + 'step_a should have skip_reason = condition_unmet' +); + +-- Test 4: step_b should remain unaffected (still started, independent root step) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'started', + 'step_b (independent step) should remain in started status' +); + +-- Test 5: remaining_steps on run should be decremented by 1 +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 1::int, + 'remaining_steps should be decremented by 1 (was 2, now 1)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql new file mode 100644 index 000000000..6b546928f --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql @@ -0,0 +1,88 @@ +-- Test: cascade_skip_steps - step:skipped event payload format +-- Verifies the realtime event contains all required fields +begin; +select plan(8); + +-- Reset database and create a simple flow +select pgflow_tests.reset_db(); +select pgflow.create_flow('event_test'); +select pgflow.add_step('event_test', 'step_a'); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('event_test', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Get the event for assertions +select * into temporary skip_event +from pgflow_tests.get_realtime_message('step:skipped', (select run_id from run_ids), 'step_a'); + +-- Test 1: Event type should be step:skipped +select is( + (select payload->>'event_type' from skip_event), + 'step:skipped', + 'Event type should be step:skipped' +); + +-- Test 2: step_slug should be in payload +select is( + (select payload->>'step_slug' from skip_event), + 'step_a', + 'Payload should contain step_slug' +); + +-- Test 3: flow_slug should be in payload +select is( + (select payload->>'flow_slug' from skip_event), + 'event_test', + 'Payload should contain flow_slug' +); + +-- Test 4: run_id should be in payload +select is( + (select payload->>'run_id' from skip_event), + (select run_id::text from run_ids), + 'Payload should contain run_id' +); + +-- Test 5: status should be skipped +select is( + (select payload->>'status' from skip_event), + 'skipped', + 'Payload status should be skipped' +); + +-- Test 6: skip_reason should be in payload +select is( + (select payload->>'skip_reason' from skip_event), + 'condition_unmet', + 'Payload should contain skip_reason' +); + +-- Test 7: skipped_at timestamp should be present +select ok( + (select (payload->>'skipped_at')::timestamptz is not null from skip_event), + 'Payload should include skipped_at timestamp' +); + +-- Test 8: Event name format should be step::skipped +select is( + (select event from skip_event), + 'step:step_a:skipped', + 'Event name should be step::skipped' +); + +-- Clean up +drop table if exists run_ids; +drop table if exists skip_event; + +select finish(); +rollback;