Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkgs/core/schemas/0100_function_compare_flow_shapes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,32 @@ BEGIN
)
);
END IF;

-- Compare whenUnmet (structural - affects DAG execution semantics)
IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenUnmet differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'whenUnmet',
v_db_step->>'whenUnmet'
)
);
END IF;

-- Compare whenFailed (structural - affects DAG execution semantics)
IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenFailed differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'whenFailed',
v_db_step->>'whenFailed'
)
);
END IF;
END IF;
END LOOP;

Expand Down
4 changes: 3 additions & 1 deletion pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ BEGIN
base_delay => (v_step_options->>'baseDelay')::int,
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType'
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed'
Comment on lines +51 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical NULL handling bug: When whenUnmet or whenFailed fields are missing from the JSON shape, v_step->>'whenUnmet' returns NULL. Explicitly passing NULL to add_step() will attempt to insert NULL into a NOT NULL column, causing a constraint violation.

-- Fix by using COALESCE to apply defaults:
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_failed => COALESCE(v_step->>'whenFailed', 'fail')

This will fail when processing old shapes or malformed data that don't include these new fields. SQL function defaults only apply when parameters are omitted, not when NULL is explicitly passed.

Suggested change
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed'
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_failed => COALESCE(v_step->>'whenFailed', 'fail')

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

);
END LOOP;
END;
Expand Down
4 changes: 3 additions & 1 deletion pkgs/core/schemas/0100_function_get_flow_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ as $$
AND dep.step_slug = step.step_slug
),
'[]'::jsonb
)
),
'whenUnmet', step.when_unmet,
'whenFailed', step.when_failed
)
ORDER BY step.step_index
),
Expand Down
321 changes: 264 additions & 57 deletions pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,270 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
-- Modify "steps" table
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
-- Modify "_compare_flow_shapes" function
CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
DECLARE
v_differences text[] := '{}';
v_local_steps jsonb;
v_db_steps jsonb;
v_local_count int;
v_db_count int;
v_max_count int;
v_idx int;
v_local_step jsonb;
v_db_step jsonb;
v_local_deps text;
v_db_deps text;
BEGIN
v_local_steps := p_local->'steps';
v_db_steps := p_db->'steps';
v_local_count := jsonb_array_length(COALESCE(v_local_steps, '[]'::jsonb));
v_db_count := jsonb_array_length(COALESCE(v_db_steps, '[]'::jsonb));

-- Compare step counts
IF v_local_count != v_db_count THEN
v_differences := array_append(
v_differences,
format('Step count differs: %s vs %s', v_local_count, v_db_count)
);
END IF;

-- Compare steps by index
v_max_count := GREATEST(v_local_count, v_db_count);

FOR v_idx IN 0..(v_max_count - 1) LOOP
v_local_step := v_local_steps->v_idx;
v_db_step := v_db_steps->v_idx;

IF v_local_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in first shape (second has '%s')$$,
v_idx,
v_db_step->>'slug'
)
);
ELSIF v_db_step IS NULL THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: missing in second shape (first has '%s')$$,
v_idx,
v_local_step->>'slug'
)
);
ELSE
-- Compare slug
IF v_local_step->>'slug' != v_db_step->>'slug' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: slug differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'slug',
v_db_step->>'slug'
)
);
END IF;

-- Compare step type
IF v_local_step->>'stepType' != v_db_step->>'stepType' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: type differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'stepType',
v_db_step->>'stepType'
)
);
END IF;

-- Compare dependencies (convert arrays to comma-separated strings)
SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_local_deps
FROM jsonb_array_elements_text(COALESCE(v_local_step->'dependencies', '[]'::jsonb)) AS dep;

SELECT string_agg(dep, ', ' ORDER BY dep)
INTO v_db_deps
FROM jsonb_array_elements_text(COALESCE(v_db_step->'dependencies', '[]'::jsonb)) AS dep;

IF COALESCE(v_local_deps, '') != COALESCE(v_db_deps, '') THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: dependencies differ [%s] vs [%s]$$,
v_idx,
COALESCE(v_local_deps, ''),
COALESCE(v_db_deps, '')
)
);
END IF;

-- Compare whenUnmet (structural - affects DAG execution semantics)
IF v_local_step->>'whenUnmet' != v_db_step->>'whenUnmet' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenUnmet differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'whenUnmet',
v_db_step->>'whenUnmet'
)
);
END IF;

-- Compare whenFailed (structural - affects DAG execution semantics)
IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenFailed differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'whenFailed',
v_db_step->>'whenFailed'
)
);
END IF;
END IF;
END LOOP;

RETURN v_differences;
END;
$BODY$;
-- Create "add_step" function
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
result_step pgflow.steps;
next_idx int;
BEGIN
-- Validate map step constraints
-- Map steps can have either:
-- 0 dependencies (root map - maps over flow input array)
-- 1 dependency (dependent map - maps over dependency output array)
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
add_step.step_slug,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
array_to_string(add_step.deps_slugs, ', ');
END IF;

-- Get next step index
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
FROM pgflow.steps s
WHERE s.flow_slug = add_step.flow_slug;

-- Create the step
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, condition_not_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
add_step.step_slug,
COALESCE(add_step.step_type, 'single'),
next_idx,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
add_step.max_attempts,
add_step.base_delay,
add_step.timeout,
add_step.start_delay,
add_step.condition_pattern,
add_step.condition_not_pattern,
add_step.when_unmet,
add_step.when_failed
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
RETURNING * INTO result_step;

-- Insert dependencies
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;

RETURN result_step;
END;
$$;
-- Modify "_create_flow_from_shape" function
CREATE OR REPLACE FUNCTION "pgflow"."_create_flow_from_shape" ("p_flow_slug" text, "p_shape" jsonb) RETURNS void LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
v_step jsonb;
v_deps text[];
v_flow_options jsonb;
v_step_options jsonb;
BEGIN
-- Extract flow-level options (may be null)
v_flow_options := p_shape->'options';

-- Create the flow with options (NULL = use default)
PERFORM pgflow.create_flow(
p_flow_slug,
(v_flow_options->>'maxAttempts')::int,
(v_flow_options->>'baseDelay')::int,
(v_flow_options->>'timeout')::int
);

-- Iterate over steps in order and add each one
FOR v_step IN SELECT * FROM jsonb_array_elements(p_shape->'steps')
LOOP
-- Convert dependencies jsonb array to text array
SELECT COALESCE(array_agg(dep), '{}')
INTO v_deps
FROM jsonb_array_elements_text(COALESCE(v_step->'dependencies', '[]'::jsonb)) AS dep;

-- Extract step options (may be null)
v_step_options := v_step->'options';

-- Add the step with options (NULL = use default/inherit)
PERFORM pgflow.add_step(
flow_slug => p_flow_slug,
step_slug => v_step->>'slug',
deps_slugs => v_deps,
max_attempts => (v_step_options->>'maxAttempts')::int,
base_delay => (v_step_options->>'baseDelay')::int,
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed'
Comment on lines +248 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical NULL handling bug: Same issue as in 0100_function_create_flow_from_shape.sql. When JSON fields are missing, v_step->>'whenUnmet' returns NULL. Explicitly passing NULL bypasses the parameter defaults and will violate the NOT NULL constraint on the when_unmet and when_failed columns.

-- Fix by using COALESCE:
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_failed => COALESCE(v_step->>'whenFailed', 'fail')

This creates a production risk during deployment or when processing legacy shapes.

Suggested change
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed'
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_failed => COALESCE(v_step->>'whenFailed', 'fail')

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

);
END LOOP;
END;
$$;
-- Modify "_get_flow_shape" function
CREATE OR REPLACE FUNCTION "pgflow"."_get_flow_shape" ("p_flow_slug" text) RETURNS jsonb LANGUAGE sql STABLE SET "search_path" = '' AS $$
SELECT jsonb_build_object(
'steps',
COALESCE(
jsonb_agg(
jsonb_build_object(
'slug', step.step_slug,
'stepType', step.step_type,
'dependencies', COALESCE(
(
SELECT jsonb_agg(dep.dep_slug ORDER BY dep.dep_slug)
FROM pgflow.deps AS dep
WHERE dep.flow_slug = step.flow_slug
AND dep.step_slug = step.step_slug
),
'[]'::jsonb
),
'whenUnmet', step.when_unmet,
'whenFailed', step.when_failed
)
ORDER BY step.step_index
),
'[]'::jsonb
)
)
FROM pgflow.steps AS step
WHERE step.flow_slug = p_flow_slug;
$$;
-- Create "_cascade_force_skip_steps" function
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
DECLARE
Expand Down Expand Up @@ -1454,62 +1718,5 @@ with tasks as (
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$;
-- Create "add_step" function
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
DECLARE
result_step pgflow.steps;
next_idx int;
BEGIN
-- Validate map step constraints
-- Map steps can have either:
-- 0 dependencies (root map - maps over flow input array)
-- 1 dependency (dependent map - maps over dependency output array)
IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN
RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %',
add_step.step_slug,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
array_to_string(add_step.deps_slugs, ', ');
END IF;

-- Get next step index
SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx
FROM pgflow.steps s
WHERE s.flow_slug = add_step.flow_slug;

-- Create the step
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, condition_not_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
add_step.step_slug,
COALESCE(add_step.step_type, 'single'),
next_idx,
COALESCE(array_length(add_step.deps_slugs, 1), 0),
add_step.max_attempts,
add_step.base_delay,
add_step.timeout,
add_step.start_delay,
add_step.condition_pattern,
add_step.condition_not_pattern,
add_step.when_unmet,
add_step.when_failed
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
RETURNING * INTO result_step;

-- Insert dependencies
INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug)
SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug
FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug)
WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0
ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING;

RETURN result_step;
END;
$$;
-- Drop "add_step" function
DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text);
Loading