diff --git a/pkgs/core/schemas/0100_function_compare_flow_shapes.sql b/pkgs/core/schemas/0100_function_compare_flow_shapes.sql index 193d8c564..ae543c138 100644 --- a/pkgs/core/schemas/0100_function_compare_flow_shapes.sql +++ b/pkgs/core/schemas/0100_function_compare_flow_shapes.sql @@ -107,6 +107,32 @@ BEGIN ) ); END IF; + + -- Compare whenUnmet (structural - affects DAG execution semantics) + IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: whenUnmet differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'whenUnmet', + v_db_step->>'whenUnmet' + ) + ); + END IF; + + -- Compare whenFailed (structural - affects DAG execution semantics) + IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: whenFailed differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'whenFailed', + v_db_step->>'whenFailed' + ) + ); + END IF; END IF; END LOOP; diff --git a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql index daf2dc548..7c9e445c2 100644 --- a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql +++ b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql @@ -47,7 +47,9 @@ BEGIN base_delay => (v_step_options->>'baseDelay')::int, timeout => (v_step_options->>'timeout')::int, start_delay => (v_step_options->>'startDelay')::int, - step_type => v_step->>'stepType' + step_type => v_step->>'stepType', + when_unmet => v_step->>'whenUnmet', + when_failed => v_step->>'whenFailed' ); END LOOP; END; diff --git a/pkgs/core/schemas/0100_function_get_flow_shape.sql b/pkgs/core/schemas/0100_function_get_flow_shape.sql index a725c820c..725388ebc 100644 --- a/pkgs/core/schemas/0100_function_get_flow_shape.sql +++ b/pkgs/core/schemas/0100_function_get_flow_shape.sql @@ -22,7 +22,9 @@ as $$ AND dep.step_slug = step.step_slug ), '[]'::jsonb - ) + ), + 'whenUnmet', step.when_unmet, + 'whenFailed', step.when_failed ) ORDER BY step.step_index ), diff --git a/pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql index 840c2d134..529436eaf 100644 --- a/pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql @@ -16,6 +16,270 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); -- Modify "steps" table ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail'; +-- Modify "_compare_flow_shapes" function +CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$ +DECLARE + v_differences text[] := '{}'; + v_local_steps jsonb; + v_db_steps jsonb; + v_local_count int; + v_db_count int; + v_max_count int; + v_idx int; + v_local_step jsonb; 
+ v_db_step jsonb; + v_local_deps text; + v_db_deps text; +BEGIN + v_local_steps := p_local->'steps'; + v_db_steps := p_db->'steps'; + v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb)); + v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb)); + + -- Compare step counts + IF v_local_count != v_db_count THEN + v_differences := array_append( + v_differences, + format('Step count differs: %s vs %s', v_local_count, v_db_count) + ); + END IF; + + -- Compare steps by index + v_max_count := GREATEST(v_local_count, v_db_count); + + FOR v_idx IN 0..(v_max_count - 1) LOOP + v_local_step := v_local_steps->v_idx; + v_db_step := v_db_steps->v_idx; + + IF v_local_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in first shape (second has '%s')$$, + v_idx, + v_db_step->>'slug' + ) + ); + ELSIF v_db_step IS NULL THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: missing in second shape (first has '%s')$$, + v_idx, + v_local_step->>'slug' + ) + ); + ELSE + -- Compare slug + IF v_local_step->>'slug' != v_db_step->>'slug' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: slug differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'slug', + v_db_step->>'slug' + ) + ); + END IF; + + -- Compare step type + IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: type differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'stepType', + v_db_step->>'stepType' + ) + ); + END IF; + + -- Compare dependencies (convert arrays to comma-separated strings) + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_local_deps + FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep; + + SELECT string_agg(dep, ', ' ORDER BY dep) + INTO v_db_deps + FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep; + + IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: dependencies differ [%s] vs [%s]$$, + v_idx, + COALESCE(v_local_deps, ''), + COALESCE(v_db_deps, '') + ) + ); + END IF; + + -- Compare whenUnmet (structural - affects DAG execution semantics) + IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: whenUnmet differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'whenUnmet', + v_db_step->>'whenUnmet' + ) + ); + END IF; + + -- Compare whenFailed (structural - affects DAG execution semantics) + IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN + v_differences := array_append( + v_differences, + format( + $$Step at index %s: whenFailed differs '%s' vs '%s'$$, + v_idx, + v_local_step->>'whenFailed', + v_db_step->>'whenFailed' + ) + ); + END IF; + END IF; + END LOOP; + + RETURN v_differences; +END; +$BODY$; +-- Create "add_step" function +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS 
"pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + result_step pgflow.steps; + next_idx int; +BEGIN + -- Validate map step constraints + -- Map steps can have either: + -- 0 dependencies (root map - maps over flow input array) + -- 1 dependency (dependent map - maps over dependency output array) + IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + add_step.step_slug, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + array_to_string(add_step.deps_slugs, ', '); + END IF; + + -- Get next step index + SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx + FROM pgflow.steps s + WHERE s.flow_slug = add_step.flow_slug; + + -- Create the step + INSERT INTO pgflow.steps ( + flow_slug, step_slug, step_type, step_index, deps_count, + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, condition_not_pattern, when_unmet, when_failed + ) + VALUES ( + add_step.flow_slug, + add_step.step_slug, + COALESCE(add_step.step_type, 'single'), + next_idx, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + add_step.max_attempts, + add_step.base_delay, + add_step.timeout, + add_step.start_delay, + add_step.condition_pattern, + add_step.condition_not_pattern, + add_step.when_unmet, + add_step.when_failed + ) + ON CONFLICT ON CONSTRAINT steps_pkey + DO UPDATE SET step_slug = EXCLUDED.step_slug + RETURNING * INTO result_step; + + -- Insert dependencies + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug + FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) + WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 + ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; + + RETURN result_step; +END; +$$; +-- Modify "_create_flow_from_shape" function +CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + v_step jsonb; + v_deps text[]; + v_flow_options jsonb; + v_step_options jsonb; +BEGIN + -- Extract flow-level options (may be null) + v_flow_options := p_shape->'options'; + + -- Create the flow with options (NULL = use default) + PERFORM pgflow.create_flow( + p_flow_slug, + (v_flow_options->>'maxAttempts')::int, + (v_flow_options->>'baseDelay')::int, + (v_flow_options->>'timeout')::int + ); + + -- Iterate over steps in order and add each one + FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') + LOOP + -- Convert dependencies jsonb array to text array + SELECT COALESCE(array_agg(dep), '{}') + INTO v_deps + FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; + + -- Extract step options (may be null) + v_step_options := v_step->'options'; + + -- Add the step with options (NULL = use default/inherit) + PERFORM pgflow.add_step( + flow_slug => p_flow_slug, + step_slug => v_step->>'slug', + deps_slugs => v_deps, + max_attempts => (v_step_options->>'maxAttempts')::int, + base_delay => (v_step_options->>'baseDelay')::int, + timeout => (v_step_options->>'timeout')::int, + start_delay => (v_step_options->>'startDelay')::int, + step_type => v_step->>'stepType', + when_unmet => v_step->>'whenUnmet', + when_failed => v_step->>'whenFailed' + ); + END LOOP; +END; +$$; +-- Modify "_get_flow_shape" function +CREATE OR REPLACE FUNCTION 
"pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$ +SELECT jsonb_build_object( + 'steps', + COALESCE( + jsonb_agg( + jsonb_build_object( + 'slug', step.step_slug, + 'stepType', step.step_type, + 'dependencies', COALESCE( + ( + SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug) + FROM pgflow.deps AS dep + WHERE dep.flow_slug = step.flow_slug + AND dep.step_slug = step.step_slug + ), + '[]'::jsonb + ), + 'whenUnmet', step.when_unmet, + 'whenFailed', step.when_failed + ) + ORDER BY step.step_index + ), + '[]'::jsonb + ) + ) + FROM pgflow.steps AS step + WHERE step.flow_slug = p_flow_slug; +$$; -- Create "_cascade_force_skip_steps" function CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ DECLARE @@ -1454,62 +1718,5 @@ with tasks as ( dep_out.run_id = st.run_id and dep_out.step_slug = st.step_slug $$; --- Create "add_step" function -CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ -DECLARE - result_step pgflow.steps; - next_idx int; -BEGIN - -- Validate map step constraints - -- Map steps can have either: - -- 0 dependencies (root map - maps over flow input array) - -- 1 dependency (dependent map - maps over dependency output array) - IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN - RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', - add_step.step_slug, - COALESCE(array_length(add_step.deps_slugs, 1), 0), - array_to_string(add_step.deps_slugs, ', '); - END IF; - - -- Get next step index - SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx - FROM pgflow.steps s - WHERE s.flow_slug = add_step.flow_slug; - - -- Create the step - INSERT INTO pgflow.steps ( - flow_slug, step_slug, step_type, step_index, deps_count, - opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, - condition_pattern, condition_not_pattern, when_unmet, when_failed - ) - VALUES ( - add_step.flow_slug, - add_step.step_slug, - COALESCE(add_step.step_type, 'single'), - next_idx, - COALESCE(array_length(add_step.deps_slugs, 1), 0), - add_step.max_attempts, - add_step.base_delay, - add_step.timeout, - add_step.start_delay, - add_step.condition_pattern, - add_step.condition_not_pattern, - add_step.when_unmet, - add_step.when_failed - ) - ON CONFLICT ON CONSTRAINT steps_pkey - DO UPDATE SET step_slug = EXCLUDED.step_slug - RETURNING * INTO result_step; - - -- Insert dependencies - INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) - SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug - FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) - WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 - ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; - - RETURN result_step; -END; -$$; -- Drop "add_step" function DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text); diff --git 
a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 5151880b0..acade6fd5 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:UUZln51my4XRIQECtp1HayMW7tGjk5w8qLQhW0x7gEY= +h1:VPsRoEfaQqAEkEJMJL839iHbH9F6ZaPPa6kOgbWRoI4= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -16,4 +16,4 @@ h1:UUZln51my4XRIQECtp1HayMW7tGjk5w8qLQhW0x7gEY= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= -20260108131350_pgflow_step_conditions.sql h1:7YMszmTlExOtx9PyYLB7hIc3RiMmtB4ZOc2EOQVfuPs= +20260108131350_pgflow_step_conditions.sql h1:lSVGOZvKeW8Jw0j95es8CB87nJxZyog9TLQs0L9PNm8= diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/condition_mode_drift.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/condition_mode_drift.test.sql new file mode 100644 index 000000000..d280cc91d --- /dev/null +++ b/pkgs/core/supabase/tests/compare_flow_shapes/condition_mode_drift.test.sql @@ -0,0 +1,60 @@ +begin; +select plan(3); +select pgflow_tests.reset_db(); + +-- Setup: Create a flow with specific condition modes +select pgflow.create_flow('drift_test'); +select pgflow.add_step( + flow_slug => 'drift_test', + step_slug => 'step1', + when_unmet => 'skip', + when_failed => 'fail' +); + +-- Test: Detect whenUnmet drift +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip-cascade", "whenFailed": "fail"} + ] + }'::jsonb, + pgflow._get_flow_shape('drift_test') + ), + ARRAY[$$Step at index 0: whenUnmet differs 'skip-cascade' vs 'skip'$$], + 'Should detect whenUnmet mismatch' +); + +-- Test: Detect whenFailed drift +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "skip-cascade"} + ] + }'::jsonb, + pgflow._get_flow_shape('drift_test') + ), + ARRAY[$$Step at index 0: whenFailed differs 'skip-cascade' vs 'fail'$$], + 'Should detect whenFailed mismatch' +); + +-- Test: Detect both whenUnmet and whenFailed drift simultaneously +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "fail", "whenFailed": "skip"} + ] + }'::jsonb, + pgflow._get_flow_shape('drift_test') + ), + ARRAY[ + $$Step at index 0: whenUnmet differs 'fail' vs 'skip'$$, + $$Step at index 0: whenFailed differs 'skip' vs 'fail'$$ + ], + 'Should detect both condition mode mismatches' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql index ee059957b..6d9b08a9d 100644 --- a/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql +++ b/pkgs/core/supabase/tests/create_flow_from_shape/basic_compile.test.sql @@ -1,5 +1,5 @@ begin; -select plan(4); +select plan(5); select pgflow_tests.reset_db(); -- Test: Compile a simple sequential 
flow from shape @@ -7,9 +7,9 @@ select pgflow._create_flow_from_shape( 'test_flow', '{ "steps": [ - {"slug": "first", "stepType": "single", "dependencies": []}, - {"slug": "second", "stepType": "single", "dependencies": ["first"]}, - {"slug": "third", "stepType": "single", "dependencies": ["second"]} + {"slug": "first", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "second", "stepType": "single", "dependencies": ["first"], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "third", "stepType": "single", "dependencies": ["second"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb ); @@ -42,5 +42,18 @@ select results_eq( 'Dependencies should be created correctly' ); +-- Verify shape round-trips correctly +select is( + pgflow._get_flow_shape('test_flow'), + '{ + "steps": [ + {"slug": "first", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "second", "stepType": "single", "dependencies": ["first"], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "third", "stepType": "single", "dependencies": ["second"], "whenUnmet": "skip", "whenFailed": "fail"} + ] + }'::jsonb, + 'Shape should round-trip correctly' +); + select finish(); rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/condition_modes_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/condition_modes_compile.test.sql new file mode 100644 index 000000000..40350a315 --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/condition_modes_compile.test.sql @@ -0,0 +1,61 @@ +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Test: Compile flow with non-default whenUnmet/whenFailed values +select pgflow._create_flow_from_shape( + 'condition_flow', + '{ + "steps": [ + {"slug": "always_run", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "cascade_skip", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "skip-cascade", "whenFailed": "skip"}, + {"slug": "fail_on_unmet", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "fail", "whenFailed": "skip-cascade"} + ] + }'::jsonb +); + +-- Verify when_unmet values were stored correctly +select results_eq( + $$ SELECT step_slug, when_unmet FROM pgflow.steps WHERE flow_slug = 'condition_flow' ORDER BY step_index $$, + $$ VALUES ('always_run', 'skip'), ('cascade_skip', 'skip-cascade'), ('fail_on_unmet', 'fail') $$, + 'when_unmet values should be stored correctly' +); + +-- Verify when_failed values were stored correctly +select results_eq( + $$ SELECT step_slug, when_failed FROM pgflow.steps WHERE flow_slug = 'condition_flow' ORDER BY step_index $$, + $$ VALUES ('always_run', 'fail'), ('cascade_skip', 'skip'), ('fail_on_unmet', 'skip-cascade') $$, + 'when_failed values should be stored correctly' +); + +-- Verify shape round-trips correctly with all condition mode variants +select is( + pgflow._get_flow_shape('condition_flow'), + '{ + "steps": [ + {"slug": "always_run", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "cascade_skip", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "skip-cascade", "whenFailed": "skip"}, + {"slug": "fail_on_unmet", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "fail", "whenFailed": "skip-cascade"} + ] + }'::jsonb, + 'Shape with condition modes should round-trip correctly' +); + +-- Verify comparison detects no differences for matching 
shape +select is( + pgflow._compare_flow_shapes( + pgflow._get_flow_shape('condition_flow'), + '{ + "steps": [ + {"slug": "always_run", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "cascade_skip", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "skip-cascade", "whenFailed": "skip"}, + {"slug": "fail_on_unmet", "stepType": "single", "dependencies": ["always_run"], "whenUnmet": "fail", "whenFailed": "skip-cascade"} + ] + }'::jsonb + ), + '{}'::text[], + 'Matching shapes should have no differences' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql index a0e26bb41..c2396efd5 100644 --- a/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql +++ b/pkgs/core/supabase/tests/create_flow_from_shape/map_step_compile.test.sql @@ -7,8 +7,8 @@ select pgflow._create_flow_from_shape( 'map_flow', '{ "steps": [ - {"slug": "root_map", "stepType": "map", "dependencies": []}, - {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + {"slug": "root_map", "stepType": "map", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb ); @@ -25,11 +25,11 @@ select is( pgflow._get_flow_shape('map_flow'), '{ "steps": [ - {"slug": "root_map", "stepType": "map", "dependencies": []}, - {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + {"slug": "root_map", "stepType": "map", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb, 'Shape should round-trip correctly' ); select finish(); diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql index 3e57bdd89..72a1542b6 100644 --- a/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql +++ b/pkgs/core/supabase/tests/create_flow_from_shape/options_compile.test.sql @@ -7,7 +7,7 @@ select pgflow._create_flow_from_shape( 'flow_with_options', '{ "steps": [ - {"slug": "step1", "stepType": "single", "dependencies": []} + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"} ], "options": { "maxAttempts": 5, @@ -33,6 +33,8 @@ select pgflow._create_flow_from_shape( "slug": "step1", "stepType": "single", "dependencies": [], + "whenUnmet": "skip", + "whenFailed": "fail", "options": { "maxAttempts": 7, "baseDelay": 15, @@ -56,7 +58,7 @@ select pgflow._create_flow_from_shape( 'flow_no_options', '{ "steps": [ - {"slug": "step1", "stepType": "single", "dependencies": []} + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb ); @@ -84,6 +86,8 @@ select pgflow._create_flow_from_shape( "slug": "step1", "stepType": "single", "dependencies": [], + "whenUnmet": "skip", + "whenFailed": "fail", "options": { "timeout": 30 } diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/allow_data_loss_recompiles.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/allow_data_loss_recompiles.test.sql index d77bda21d..41cd12f81 100644 --- 
a/pkgs/core/supabase/tests/ensure_flow_compiled/allow_data_loss_recompiles.test.sql +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/allow_data_loss_recompiles.test.sql @@ -17,7 +17,7 @@ select is( 'allow_loss_flow', '{ "steps": [ - {"slug": "new_step", "stepType": "single", "dependencies": []} + {"slug": "new_step", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb, true -- allow_data_loss = true diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql index e354d4741..895dc70b6 100644 --- a/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/auto_recompiles_when_local.test.sql @@ -17,7 +17,7 @@ select is( 'local_flow', '{ "steps": [ - {"slug": "new_step", "stepType": "single", "dependencies": []} + {"slug": "new_step", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb ) as result diff --git a/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql b/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql index ac4f6cb91..1d083066c 100644 --- a/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql +++ b/pkgs/core/supabase/tests/ensure_flow_compiled/compiles_missing_flow.test.sql @@ -10,7 +10,7 @@ select is( 'new_flow', '{ "steps": [ - {"slug": "first", "stepType": "single", "dependencies": []} + {"slug": "first", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb ) as result diff --git a/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql b/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql index 573e276dc..41792e00a 100644 --- a/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql +++ b/pkgs/core/supabase/tests/get_flow_shape/basic_shape.test.sql @@ -13,9 +13,9 @@ select is( pgflow._get_flow_shape('test_flow'), '{ "steps": [ - {"slug": "first", "stepType": "single", "dependencies": []}, - {"slug": "second", "stepType": "single", "dependencies": ["first"]}, - {"slug": "third", "stepType": "single", "dependencies": ["second"]} + {"slug": "first", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "second", "stepType": "single", "dependencies": ["first"], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "third", "stepType": "single", "dependencies": ["second"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb, 'Should return correct shape for simple sequential flow' diff --git a/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql b/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql index 292bb9a3d..efcdad088 100644 --- a/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql +++ b/pkgs/core/supabase/tests/get_flow_shape/map_steps.test.sql @@ -20,8 +20,8 @@ select is( pgflow._get_flow_shape('map_flow'), '{ "steps": [ - {"slug": "root_map", "stepType": "map", "dependencies": []}, - {"slug": "process", "stepType": "single", "dependencies": ["root_map"]} + {"slug": "root_map", "stepType": "map", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "process", "stepType": "single", "dependencies": ["root_map"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb, 'Should correctly identify map step type' diff --git 
a/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql b/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql index a92436dd1..0a838016d 100644 --- a/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql +++ b/pkgs/core/supabase/tests/get_flow_shape/multiple_deps_sorted.test.sql @@ -16,10 +16,10 @@ select is( pgflow._get_flow_shape('multi_deps'), '{ "steps": [ - {"slug": "alpha", "stepType": "single", "dependencies": []}, - {"slug": "beta", "stepType": "single", "dependencies": []}, - {"slug": "gamma", "stepType": "single", "dependencies": []}, - {"slug": "final", "stepType": "single", "dependencies": ["alpha", "beta", "gamma"]} + {"slug": "alpha", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "beta", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "gamma", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenFailed": "fail"}, + {"slug": "final", "stepType": "single", "dependencies": ["alpha", "beta", "gamma"], "whenUnmet": "skip", "whenFailed": "fail"} ] }'::jsonb, 'Dependencies should be sorted alphabetically' diff --git a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts index 6b7971568..50e8781cf 100644 --- a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts +++ b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts @@ -61,6 +61,8 @@ describe('extractFlowShape', () => { slug: 'step1', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }); }); @@ -108,6 +110,8 @@ describe('extractFlowShape', () => { slug: 'step1', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', options: { maxAttempts: 3, baseDelay: 5, @@ -129,6 +133,8 @@ describe('extractFlowShape', () => { slug: 'step1', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }); expect('options' in shape.steps[0]).toBe(false); }); @@ -160,6 +166,8 @@ describe('extractFlowShape', () => { slug: 'process_items', stepType: 'map', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }); }); @@ -175,6 +183,8 @@ describe('extractFlowShape', () => { slug: 'process', stepType: 'map', dependencies: ['get_items'], + whenUnmet: 'skip', + whenFailed: 'fail', }); }); }); @@ -189,7 +199,12 @@ describe('extractFlowShape', () => { }) .step({ slug: 'website' }, (flowInput) => ({ content: flowInput.url })) .step( - { slug: 'sentiment', dependsOn: ['website'], maxAttempts: 5, timeout: 30 }, + { + slug: 'sentiment', + dependsOn: ['website'], + maxAttempts: 5, + timeout: 30, + }, () => ({ score: 0.8 }) ) .step({ slug: 'summary', dependsOn: ['website'] }, () => ({ @@ -209,11 +224,15 @@ describe('extractFlowShape', () => { slug: 'website', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }, { slug: 'sentiment', stepType: 'single', dependencies: ['website'], + whenUnmet: 'skip', + whenFailed: 'fail', options: { maxAttempts: 5, timeout: 30, @@ -223,11 +242,15 @@ describe('extractFlowShape', () => { slug: 'summary', stepType: 'single', dependencies: ['website'], + whenUnmet: 'skip', + whenFailed: 'fail', }, { slug: 'save_to_db', stepType: 'single', dependencies: ['sentiment', 'summary'], // sorted alphabetically + whenUnmet: 'skip', + whenFailed: 'fail', }, ], options: { @@ -263,6 +286,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -290,7 
+315,13 @@ describe('compareFlowShapes', () => { }; const b: FlowShape = { steps: [ - { slug: 'step1', stepType: 'single', dependencies: [] }, + { + slug: 'step1', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; @@ -305,7 +336,13 @@ describe('compareFlowShapes', () => { it('should detect extra step at end', () => { const a: FlowShape = { steps: [ - { slug: 'step1', stepType: 'single', dependencies: [] }, + { + slug: 'step1', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; const b: FlowShape = { steps: [] }; @@ -321,14 +358,38 @@ describe('compareFlowShapes', () => { it('should detect different steps at same positions', () => { const a: FlowShape = { steps: [ - { slug: 'step_a', stepType: 'single', dependencies: [] }, - { slug: 'step_b', stepType: 'single', dependencies: [] }, + { + slug: 'step_a', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, + { + slug: 'step_b', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; const b: FlowShape = { steps: [ - { slug: 'step_c', stepType: 'single', dependencies: [] }, - { slug: 'step_d', stepType: 'single', dependencies: [] }, + { + slug: 'step_c', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, + { + slug: 'step_d', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; @@ -347,14 +408,38 @@ describe('compareFlowShapes', () => { it('should detect steps in different order', () => { const a: FlowShape = { steps: [ - { slug: 'step_a', stepType: 'single', dependencies: [] }, - { slug: 'step_b', stepType: 'single', dependencies: [] }, + { + slug: 'step_a', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, + { + slug: 'step_b', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; const b: FlowShape = { steps: [ - { slug: 'step_b', stepType: 'single', dependencies: [] }, - { slug: 'step_a', stepType: 'single', dependencies: [] }, + { + slug: 'step_b', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, + { + slug: 'step_a', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; @@ -373,12 +458,24 @@ describe('compareFlowShapes', () => { it('should detect stepType difference', () => { const a: FlowShape = { steps: [ - { slug: 'step1', stepType: 'single', dependencies: [] }, + { + slug: 'step1', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; const b: FlowShape = { steps: [ - { slug: 'step1', stepType: 'map', dependencies: [] }, + { + slug: 'step1', + stepType: 'map', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; @@ -394,7 +491,13 @@ describe('compareFlowShapes', () => { it('should detect added dependency', () => { const a: FlowShape = { steps: [ - { slug: 'step1', stepType: 'single', dependencies: [] }, + { + slug: 'step1', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, ], }; const b: FlowShape = { @@ -403,6 +506,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: ['step0'], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -421,6 +526,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: ['dep1', 'dep2'], + whenUnmet: 'skip', + whenFailed: 
'fail', }, ], }; @@ -430,6 +537,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: ['dep1'], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -448,6 +557,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: ['old_dep'], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -457,6 +568,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: ['new_dep'], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -473,12 +586,15 @@ describe('compareFlowShapes', () => { it('should match flows with same structure but different DSL options', () => { // This is the key behavior: options are in shape for creation, // but don't affect shape matching (runtime tunable via SQL) - const flowA = new Flow({ slug: 'test_flow', maxAttempts: 3 }).step( - { slug: 'step1', timeout: 60 }, - (flowInput) => flowInput - ); + const flowA = new Flow({ + slug: 'test_flow', + maxAttempts: 3, + }).step({ slug: 'step1', timeout: 60 }, (flowInput) => flowInput); - const flowB = new Flow({ slug: 'test_flow', maxAttempts: 10 }).step( + const flowB = new Flow({ + slug: 'test_flow', + maxAttempts: 10, + }).step( { slug: 'step1', timeout: 300, startDelay: 100 }, (flowInput) => flowInput ); @@ -490,7 +606,10 @@ describe('compareFlowShapes', () => { expect(shapeA.options).toEqual({ maxAttempts: 3 }); expect(shapeB.options).toEqual({ maxAttempts: 10 }); expect(shapeA.steps[0].options).toEqual({ timeout: 60 }); - expect(shapeB.steps[0].options).toEqual({ timeout: 300, startDelay: 100 }); + expect(shapeB.steps[0].options).toEqual({ + timeout: 300, + startDelay: 100, + }); // But comparison ignores options - only structure matters const result = compareFlowShapes(shapeA, shapeB); @@ -507,6 +626,8 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; @@ -516,11 +637,15 @@ describe('compareFlowShapes', () => { slug: 'step1', stepType: 'map', dependencies: ['dep1'], + whenUnmet: 'skip', + whenFailed: 'fail', }, { slug: 'step2', stepType: 'single', dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', }, ], }; diff --git a/pkgs/dsl/src/flow-shape.ts b/pkgs/dsl/src/flow-shape.ts index d6ce295c7..d0fa85911 100644 --- a/pkgs/dsl/src/flow-shape.ts +++ b/pkgs/dsl/src/flow-shape.ts @@ -1,4 +1,4 @@ -import { AnyFlow } from './dsl.js'; +import { AnyFlow, WhenUnmetMode, RetriesExhaustedMode } from './dsl.js'; // ======================== // SHAPE TYPE DEFINITIONS @@ -31,11 +31,16 @@ export interface FlowShapeOptions { * The `options` field is included for flow creation but NOT compared during * shape comparison. Options can be tuned at runtime via SQL without * requiring recompilation. See: /deploy/tune-flow-config/ + * + * `whenUnmet` and `whenFailed` ARE structural - they affect DAG execution + * semantics and must match between worker and database. */ export interface StepShape { slug: string; stepType: 'single' | 'map'; dependencies: string[]; // sorted alphabetically for deterministic comparison + whenUnmet: WhenUnmetMode; + whenFailed: RetriesExhaustedMode; options?: StepShapeOptions; } @@ -107,6 +112,9 @@ export function extractFlowShape(flow: AnyFlow): FlowShape { stepType: stepDef.stepType ?? 
'single', // Sort dependencies alphabetically for deterministic comparison dependencies: [...stepDef.dependencies].sort(), + // Condition modes are structural - they affect DAG execution semantics + whenUnmet: stepDef.options.whenUnmet ?? 'skip', + whenFailed: stepDef.options.retriesExhausted ?? 'fail', }; // Only include options if at least one is defined @@ -175,9 +183,13 @@ export function compareFlowShapes( const bStep = b.steps[i]; if (!aStep) { - differences.push(`Step at index ${i}: missing in first shape (second has '${bStep.slug}')`); + differences.push( + `Step at index ${i}: missing in first shape (second has '${bStep.slug}')` + ); } else if (!bStep) { - differences.push(`Step at index ${i}: missing in second shape (first has '${aStep.slug}')`); + differences.push( + `Step at index ${i}: missing in second shape (first has '${aStep.slug}')` + ); } else { compareSteps(aStep, bStep, i, differences); } @@ -217,7 +229,22 @@ function compareSteps( const bDeps = b.dependencies.join(','); if (aDeps !== bDeps) { differences.push( - `Step at index ${index}: dependencies differ [${a.dependencies.join(', ')}] vs [${b.dependencies.join(', ')}]` + `Step at index ${index}: dependencies differ [${a.dependencies.join( + ', ' + )}] vs [${b.dependencies.join(', ')}]` + ); + } + + // Compare condition modes (structural - affects DAG execution semantics) + if (a.whenUnmet !== b.whenUnmet) { + differences.push( + `Step at index ${index}: whenUnmet differs '${a.whenUnmet}' vs '${b.whenUnmet}'` + ); + } + + if (a.whenFailed !== b.whenFailed) { + differences.push( + `Step at index ${index}: whenFailed differs '${a.whenFailed}' vs '${b.whenFailed}'` ); } } diff --git a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts index 3fa616090..ebd35bdd0 100644 --- a/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts @@ -9,11 +9,12 @@ import postgresLib from 'postgres'; import { integrationConfig } from '../../config.ts'; // Define a minimal test flow -const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilation_flow' }) - .step({ slug: 'double' }, async (flowInput) => { - await delay(1); - return flowInput.value * 2; - }); +const TestCompilationFlow = new Flow<{ value: number }>({ + slug: 'test_compilation_flow', +}).step({ slug: 'double' }, async (flowInput) => { + await delay(1); + return flowInput.value * 2; +}); const noop = () => {}; @@ -43,7 +44,9 @@ function createPlatformAdapterWithLocalEnv( return { ...baseAdapter, - get isLocalEnvironment() { return isLocal; }, + get isLocalEnvironment() { + return isLocal; + }, }; } @@ -56,7 +59,11 @@ Deno.test( const [flowBefore] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + assertEquals( + flowBefore, + undefined, + 'Flow should not exist before worker startup' + ); // Create worker (compilation happens during acknowledgeStart) const worker = createFlowWorker( @@ -86,7 +93,11 @@ Deno.test( const [flowAfter] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created'); + assertEquals( + flowAfter?.flow_slug, + 'test_compilation_flow', + 'Flow should be created' + ); // Verify step was created const steps = await sql` @@ -113,7 
+124,11 @@ Deno.test( const [flowBefore] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowBefore?.flow_slug, 'test_compilation_flow', 'Flow should exist'); + assertEquals( + flowBefore?.flow_slug, + 'test_compilation_flow', + 'Flow should exist' + ); // Create and start worker const worker = createFlowWorker( @@ -143,7 +158,11 @@ Deno.test( const [flowAfter] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should still exist'); + assertEquals( + flowAfter?.flow_slug, + 'test_compilation_flow', + 'Flow should still exist' + ); } finally { await worker.stop(); } @@ -196,9 +215,17 @@ Deno.test( await delay(200); // Verify error was thrown - assertEquals(caughtErrors.length > 0, true, 'Should have caught an error'); + assertEquals( + caughtErrors.length > 0, + true, + 'Should have caught an error' + ); const caughtError = caughtErrors[0]; - assertEquals(caughtError.name, 'FlowShapeMismatchError', 'Error should be FlowShapeMismatchError'); + assertEquals( + caughtError.name, + 'FlowShapeMismatchError', + 'Error should be FlowShapeMismatchError' + ); assertEquals( caughtError.message.includes('shape mismatch'), true, @@ -254,7 +281,11 @@ Deno.test( SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_compilation_flow' ORDER BY step_slug `; assertEquals(steps.length, 1, 'Should have 1 step after recompilation'); - assertEquals(steps[0].step_slug, 'double', 'Step should be "double" after recompilation'); + assertEquals( + steps[0].step_slug, + 'double', + 'Step should be "double" after recompilation' + ); } finally { await worker.stop(); } @@ -269,7 +300,15 @@ Deno.test( const CONCURRENT = 50; // 50 separate connections const flowSlug = `concurrent_test_${Date.now()}`; const shape = { - steps: [{ slug: 'step1', stepType: 'single', dependencies: [] }], + steps: [ + { + slug: 'step1', + stepType: 'single', + dependencies: [], + whenUnmet: 'skip', + whenFailed: 'fail', + }, + ], }; // Create N SEPARATE connections (critical for true concurrency) @@ -283,8 +322,9 @@ Deno.test( // Fire all compilations simultaneously on separate connections // Note: Must use conn.json() for proper jsonb parameter passing const results = await Promise.all( - connections.map((conn) => - conn`SELECT pgflow.ensure_flow_compiled( + connections.map( + (conn) => + conn`SELECT pgflow.ensure_flow_compiled( ${flowSlug}, ${conn.json(shape)} ) as result` @@ -314,7 +354,9 @@ Deno.test( assertEquals(stepCount.count, 1, 'Exactly 1 step should exist'); } finally { // Cleanup - await sql`SELECT pgflow.delete_flow_and_data(${flowSlug})`.catch(() => {}); + await sql`SELECT pgflow.delete_flow_and_data(${flowSlug})`.catch( + () => {} + ); await Promise.all(connections.map((c) => c.end())); } }) @@ -331,14 +373,18 @@ Deno.test( const [flowBefore] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + assertEquals( + flowBefore, + undefined, + 'Flow should not exist before worker startup' + ); // Create worker with compilation: false const worker = createFlowWorker( TestCompilationFlow, { sql, - compilation: false, // SKIP compilation + compilation: false, // SKIP compilation maxConcurrent: 1, batchSize: 10, maxPollSeconds: 1, @@ -359,7 +405,11 @@ Deno.test( const [flowAfter] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - 
assertEquals(flowAfter, undefined, 'Flow should NOT be created when compilation skipped'); + assertEquals( + flowAfter, + undefined, + 'Flow should NOT be created when compilation skipped' + ); } finally { await worker.stop(); } @@ -375,14 +425,18 @@ Deno.test( const [flowBefore] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowBefore, undefined, 'Flow should not exist before worker startup'); + assertEquals( + flowBefore, + undefined, + 'Flow should not exist before worker startup' + ); // Create worker with compilation: {} (explicit) const worker = createFlowWorker( TestCompilationFlow, { sql, - compilation: {}, // EXPLICIT empty object = enable compilation + compilation: {}, // EXPLICIT empty object = enable compilation maxConcurrent: 1, batchSize: 10, maxPollSeconds: 1, @@ -403,7 +457,11 @@ Deno.test( const [flowAfter] = await sql` SELECT * FROM pgflow.flows WHERE flow_slug = 'test_compilation_flow' `; - assertEquals(flowAfter?.flow_slug, 'test_compilation_flow', 'Flow should be created when compilation: {}'); + assertEquals( + flowAfter?.flow_slug, + 'test_compilation_flow', + 'Flow should be created when compilation: {}' + ); } finally { await worker.stop(); } @@ -426,7 +484,7 @@ Deno.test( TestCompilationFlow, { sql, - compilation: false, // Skip compilation check + compilation: false, // Skip compilation check maxConcurrent: 1, batchSize: 10, maxPollSeconds: 1, @@ -447,8 +505,16 @@ Deno.test( const workers = await sql` SELECT * FROM pgflow.workers WHERE worker_id = ${workerId} `; - assertEquals(workers.length, 1, 'Worker should be registered even when skipping compilation'); - assertEquals(workers[0].queue_name, 'test_compilation_flow', 'Worker should be registered for the correct queue'); + assertEquals( + workers.length, + 1, + 'Worker should be registered even when skipping compilation' + ); + assertEquals( + workers[0].queue_name, + 'test_compilation_flow', + 'Worker should be registered for the correct queue' + ); } finally { await worker.stop(); } @@ -474,7 +540,7 @@ Deno.test( TestCompilationFlow, // Has 'double' step, not 'old_step' { sql, - compilation: { allowDataLoss: true }, // Allow destructive recompile in production + compilation: { allowDataLoss: true }, // Allow destructive recompile in production maxConcurrent: 1, batchSize: 10, maxPollSeconds: 1, @@ -497,8 +563,16 @@ Deno.test( const steps = await sql` SELECT step_slug FROM pgflow.steps WHERE flow_slug = 'test_compilation_flow' ORDER BY step_slug `; - assertEquals(steps.length, 1, 'Should have 1 step after recompilation with allowDataLoss'); - assertEquals(steps[0].step_slug, 'double', 'Step should be "double" after recompilation'); + assertEquals( + steps.length, + 1, + 'Should have 1 step after recompilation with allowDataLoss' + ); + assertEquals( + steps[0].step_slug, + 'double', + 'Step should be "double" after recompilation' + ); } finally { await worker.stop(); } @@ -524,7 +598,7 @@ Deno.test( TestCompilationFlow, // Has only 'double' step { sql, - compilation: { allowDataLoss: false }, // Explicit false + compilation: { allowDataLoss: false }, // Explicit false maxConcurrent: 1, batchSize: 10, maxPollSeconds: 1, @@ -552,9 +626,17 @@ Deno.test( await delay(200); // Verify error was thrown - assertEquals(caughtErrors.length > 0, true, 'Should have caught an error'); + assertEquals( + caughtErrors.length > 0, + true, + 'Should have caught an error' + ); const caughtError = caughtErrors[0]; - assertEquals(caughtError.name, 'FlowShapeMismatchError', 'Error 
should be FlowShapeMismatchError'); + assertEquals( + caughtError.name, + 'FlowShapeMismatchError', + 'Error should be FlowShapeMismatchError' + ); assertEquals( caughtError.message.includes('shape mismatch'), true,