-
Notifications
You must be signed in to change notification settings - Fork 15
feat: add whenUnmet and whenFailed condition modes to flow shapes #585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 01-08-implement_ifnot_negative_condition_pattern
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,270 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp | |||||||||
| CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); | ||||||||||
| -- Modify "steps" table | ||||||||||
| ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail'; | ||||||||||
| -- Modify "_compare_flow_shapes" function | ||||||||||
| CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$ | ||||||||||
| DECLARE | ||||||||||
| v_differences text[] := '{}'; | ||||||||||
| v_local_steps jsonb; | ||||||||||
| v_db_steps jsonb; | ||||||||||
| v_local_count int; | ||||||||||
| v_db_count int; | ||||||||||
| v_max_count int; | ||||||||||
| v_idx int; | ||||||||||
| v_local_step jsonb; | ||||||||||
| v_db_step jsonb; | ||||||||||
| v_local_deps text; | ||||||||||
| v_db_deps text; | ||||||||||
| BEGIN | ||||||||||
| v_local_steps := p_local->'steps'; | ||||||||||
| v_db_steps := p_db->'steps'; | ||||||||||
| v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb)); | ||||||||||
| v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb)); | ||||||||||
|
|
||||||||||
| -- Compare step counts | ||||||||||
| IF v_local_count != v_db_count THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format('Step count differs: %s vs %s', v_local_count, v_db_count) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Compare steps by index | ||||||||||
| v_max_count := GREATEST(v_local_count, v_db_count); | ||||||||||
|
|
||||||||||
| FOR v_idx IN 0..(v_max_count - 1) LOOP | ||||||||||
| v_local_step := v_local_steps->v_idx; | ||||||||||
| v_db_step := v_db_steps->v_idx; | ||||||||||
|
|
||||||||||
| IF v_local_step IS NULL THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: missing in first shape (second has '%s')$$, | ||||||||||
| v_idx, | ||||||||||
| v_db_step->>'slug' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| ELSIF v_db_step IS NULL THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: missing in second shape (first has '%s')$$, | ||||||||||
| v_idx, | ||||||||||
| v_local_step->>'slug' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| ELSE | ||||||||||
| -- Compare slug | ||||||||||
| IF v_local_step->>'slug' != v_db_step->>'slug' THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: slug differs '%s' vs '%s'$$, | ||||||||||
| v_idx, | ||||||||||
| v_local_step->>'slug', | ||||||||||
| v_db_step->>'slug' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Compare step type | ||||||||||
| IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: type differs '%s' vs '%s'$$, | ||||||||||
| v_idx, | ||||||||||
| v_local_step->>'stepType', | ||||||||||
| v_db_step->>'stepType' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Compare dependencies (convert arrays to comma-separated strings) | ||||||||||
| SELECT string_agg(dep, ', ' ORDER BY dep) | ||||||||||
| INTO v_local_deps | ||||||||||
| FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep; | ||||||||||
|
|
||||||||||
| SELECT string_agg(dep, ', ' ORDER BY dep) | ||||||||||
| INTO v_db_deps | ||||||||||
| FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep; | ||||||||||
|
|
||||||||||
| IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: dependencies differ [%s] vs [%s]$$, | ||||||||||
| v_idx, | ||||||||||
| COALESCE(v_local_deps, ''), | ||||||||||
| COALESCE(v_db_deps, '') | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Compare whenUnmet (structural - affects DAG execution semantics) | ||||||||||
| IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: whenUnmet differs '%s' vs '%s'$$, | ||||||||||
| v_idx, | ||||||||||
| v_local_step->>'whenUnmet', | ||||||||||
| v_db_step->>'whenUnmet' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Compare whenFailed (structural - affects DAG execution semantics) | ||||||||||
| IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN | ||||||||||
| v_differences := array_append( | ||||||||||
| v_differences, | ||||||||||
| format( | ||||||||||
| $$Step at index %s: whenFailed differs '%s' vs '%s'$$, | ||||||||||
| v_idx, | ||||||||||
| v_local_step->>'whenFailed', | ||||||||||
| v_db_step->>'whenFailed' | ||||||||||
| ) | ||||||||||
| ); | ||||||||||
| END IF; | ||||||||||
| END IF; | ||||||||||
| END LOOP; | ||||||||||
|
|
||||||||||
| RETURN v_differences; | ||||||||||
| END; | ||||||||||
| $BODY$; | ||||||||||
| -- Create "add_step" function | ||||||||||
| CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ | ||||||||||
| DECLARE | ||||||||||
| result_step pgflow.steps; | ||||||||||
| next_idx int; | ||||||||||
| BEGIN | ||||||||||
| -- Validate map step constraints | ||||||||||
| -- Map steps can have either: | ||||||||||
| -- 0 dependencies (root map - maps over flow input array) | ||||||||||
| -- 1 dependency (dependent map - maps over dependency output array) | ||||||||||
| IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN | ||||||||||
| RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', | ||||||||||
| add_step.step_slug, | ||||||||||
| COALESCE(array_length(add_step.deps_slugs, 1), 0), | ||||||||||
| array_to_string(add_step.deps_slugs, ', '); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Get next step index | ||||||||||
| SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx | ||||||||||
| FROM pgflow.steps s | ||||||||||
| WHERE s.flow_slug = add_step.flow_slug; | ||||||||||
|
|
||||||||||
| -- Create the step | ||||||||||
| INSERT INTO pgflow.steps ( | ||||||||||
| flow_slug, step_slug, step_type, step_index, deps_count, | ||||||||||
| opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, | ||||||||||
| condition_pattern, condition_not_pattern, when_unmet, when_failed | ||||||||||
| ) | ||||||||||
| VALUES ( | ||||||||||
| add_step.flow_slug, | ||||||||||
| add_step.step_slug, | ||||||||||
| COALESCE(add_step.step_type, 'single'), | ||||||||||
| next_idx, | ||||||||||
| COALESCE(array_length(add_step.deps_slugs, 1), 0), | ||||||||||
| add_step.max_attempts, | ||||||||||
| add_step.base_delay, | ||||||||||
| add_step.timeout, | ||||||||||
| add_step.start_delay, | ||||||||||
| add_step.condition_pattern, | ||||||||||
| add_step.condition_not_pattern, | ||||||||||
| add_step.when_unmet, | ||||||||||
| add_step.when_failed | ||||||||||
| ) | ||||||||||
| ON CONFLICT ON CONSTRAINT steps_pkey | ||||||||||
| DO UPDATE SET step_slug = EXCLUDED.step_slug | ||||||||||
| RETURNING * INTO result_step; | ||||||||||
|
|
||||||||||
| -- Insert dependencies | ||||||||||
| INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) | ||||||||||
| SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug | ||||||||||
| FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) | ||||||||||
| WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 | ||||||||||
| ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; | ||||||||||
|
|
||||||||||
| RETURN result_step; | ||||||||||
| END; | ||||||||||
| $$; | ||||||||||
| -- Modify "_create_flow_from_shape" function | ||||||||||
| CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$ | ||||||||||
| DECLARE | ||||||||||
| v_step jsonb; | ||||||||||
| v_deps text[]; | ||||||||||
| v_flow_options jsonb; | ||||||||||
| v_step_options jsonb; | ||||||||||
| BEGIN | ||||||||||
| -- Extract flow-level options (may be null) | ||||||||||
| v_flow_options := p_shape->'options'; | ||||||||||
|
|
||||||||||
| -- Create the flow with options (NULL = use default) | ||||||||||
| PERFORM pgflow.create_flow( | ||||||||||
| p_flow_slug, | ||||||||||
| (v_flow_options->>'maxAttempts')::int, | ||||||||||
| (v_flow_options->>'baseDelay')::int, | ||||||||||
| (v_flow_options->>'timeout')::int | ||||||||||
| ); | ||||||||||
|
|
||||||||||
| -- Iterate over steps in order and add each one | ||||||||||
| FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps') | ||||||||||
| LOOP | ||||||||||
| -- Convert dependencies jsonb array to text array | ||||||||||
| SELECT COALESCE(array_agg(dep), '{}') | ||||||||||
| INTO v_deps | ||||||||||
| FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep; | ||||||||||
|
|
||||||||||
| -- Extract step options (may be null) | ||||||||||
| v_step_options := v_step->'options'; | ||||||||||
|
|
||||||||||
| -- Add the step with options (NULL = use default/inherit) | ||||||||||
| PERFORM pgflow.add_step( | ||||||||||
| flow_slug => p_flow_slug, | ||||||||||
| step_slug => v_step->>'slug', | ||||||||||
| deps_slugs => v_deps, | ||||||||||
| max_attempts => (v_step_options->>'maxAttempts')::int, | ||||||||||
| base_delay => (v_step_options->>'baseDelay')::int, | ||||||||||
| timeout => (v_step_options->>'timeout')::int, | ||||||||||
| start_delay => (v_step_options->>'startDelay')::int, | ||||||||||
| step_type => v_step->>'stepType', | ||||||||||
| when_unmet => v_step->>'whenUnmet', | ||||||||||
| when_failed => v_step->>'whenFailed' | ||||||||||
|
Comment on lines
+248
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical NULL handling bug: Same issue as in -- Fix by using COALESCE:
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_failed => COALESCE(v_step->>'whenFailed', 'fail')This creates a production risk during deployment or when processing legacy shapes.
Suggested change
Spotted by Graphite Agent |
||||||||||
| ); | ||||||||||
| END LOOP; | ||||||||||
| END; | ||||||||||
| $$; | ||||||||||
| -- Modify "_get_flow_shape" function | ||||||||||
| CREATE OR REPLACE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$ | ||||||||||
| SELECT jsonb_build_object( | ||||||||||
| 'steps', | ||||||||||
| COALESCE( | ||||||||||
| jsonb_agg( | ||||||||||
| jsonb_build_object( | ||||||||||
| 'slug', step.step_slug, | ||||||||||
| 'stepType', step.step_type, | ||||||||||
| 'dependencies', COALESCE( | ||||||||||
| ( | ||||||||||
| SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug) | ||||||||||
| FROM pgflow.deps AS dep | ||||||||||
| WHERE dep.flow_slug = step.flow_slug | ||||||||||
| AND dep.step_slug = step.step_slug | ||||||||||
| ), | ||||||||||
| '[]'::jsonb | ||||||||||
| ), | ||||||||||
| 'whenUnmet', step.when_unmet, | ||||||||||
| 'whenFailed', step.when_failed | ||||||||||
| ) | ||||||||||
| ORDER BY step.step_index | ||||||||||
| ), | ||||||||||
| '[]'::jsonb | ||||||||||
| ) | ||||||||||
| ) | ||||||||||
| FROM pgflow.steps AS step | ||||||||||
| WHERE step.flow_slug = p_flow_slug; | ||||||||||
| $$; | ||||||||||
| -- Create "_cascade_force_skip_steps" function | ||||||||||
| CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ | ||||||||||
| DECLARE | ||||||||||
|
|
@@ -1454,62 +1718,5 @@ with tasks as ( | |||||||||
| dep_out.run_id = st.run_id and | ||||||||||
| dep_out.step_slug = st.step_slug | ||||||||||
| $$; | ||||||||||
| -- Create "add_step" function | ||||||||||
| CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ | ||||||||||
| DECLARE | ||||||||||
| result_step pgflow.steps; | ||||||||||
| next_idx int; | ||||||||||
| BEGIN | ||||||||||
| -- Validate map step constraints | ||||||||||
| -- Map steps can have either: | ||||||||||
| -- 0 dependencies (root map - maps over flow input array) | ||||||||||
| -- 1 dependency (dependent map - maps over dependency output array) | ||||||||||
| IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN | ||||||||||
| RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', | ||||||||||
| add_step.step_slug, | ||||||||||
| COALESCE(array_length(add_step.deps_slugs, 1), 0), | ||||||||||
| array_to_string(add_step.deps_slugs, ', '); | ||||||||||
| END IF; | ||||||||||
|
|
||||||||||
| -- Get next step index | ||||||||||
| SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx | ||||||||||
| FROM pgflow.steps s | ||||||||||
| WHERE s.flow_slug = add_step.flow_slug; | ||||||||||
|
|
||||||||||
| -- Create the step | ||||||||||
| INSERT INTO pgflow.steps ( | ||||||||||
| flow_slug, step_slug, step_type, step_index, deps_count, | ||||||||||
| opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, | ||||||||||
| condition_pattern, condition_not_pattern, when_unmet, when_failed | ||||||||||
| ) | ||||||||||
| VALUES ( | ||||||||||
| add_step.flow_slug, | ||||||||||
| add_step.step_slug, | ||||||||||
| COALESCE(add_step.step_type, 'single'), | ||||||||||
| next_idx, | ||||||||||
| COALESCE(array_length(add_step.deps_slugs, 1), 0), | ||||||||||
| add_step.max_attempts, | ||||||||||
| add_step.base_delay, | ||||||||||
| add_step.timeout, | ||||||||||
| add_step.start_delay, | ||||||||||
| add_step.condition_pattern, | ||||||||||
| add_step.condition_not_pattern, | ||||||||||
| add_step.when_unmet, | ||||||||||
| add_step.when_failed | ||||||||||
| ) | ||||||||||
| ON CONFLICT ON CONSTRAINT steps_pkey | ||||||||||
| DO UPDATE SET step_slug = EXCLUDED.step_slug | ||||||||||
| RETURNING * INTO result_step; | ||||||||||
|
|
||||||||||
| -- Insert dependencies | ||||||||||
| INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) | ||||||||||
| SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug | ||||||||||
| FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) | ||||||||||
| WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 | ||||||||||
| ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; | ||||||||||
|
|
||||||||||
| RETURN result_step; | ||||||||||
| END; | ||||||||||
| $$; | ||||||||||
| -- Drop "add_step" function | ||||||||||
| DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text); | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical NULL handling bug: When
whenUnmetorwhenFailedfields are missing from the JSON shape,v_step->>'whenUnmet'returns NULL. Explicitly passing NULL toadd_step()will attempt to insert NULL into a NOT NULL column, causing a constraint violation.This will fail when processing old shapes or malformed data that don't include these new fields. SQL function defaults only apply when parameters are omitted, not when NULL is explicitly passed.
Spotted by Graphite Agent

Is this helpful? React 👍 or 👎 to let us know.