Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkgs/core/schemas/0050_tables_definitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ create table pgflow.steps (
opt_base_delay int,
opt_timeout int,
opt_start_delay int,
condition_pattern jsonb, -- JSON pattern for @> containment check (if)
condition_not_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
required_input_pattern jsonb, -- JSON pattern for @> containment check (if)
forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
when_failed text not null default 'fail', -- What to do when handler fails after retries
created_at timestamptz not null default now(),
Expand Down
10 changes: 5 additions & 5 deletions pkgs/core/schemas/0100_function_add_step.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ create or replace function pgflow.add_step(
timeout int default null,
start_delay int default null,
step_type text default 'single',
condition_pattern jsonb default null,
condition_not_pattern jsonb default null,
required_input_pattern jsonb default null,
forbidden_input_pattern jsonb default null,
when_unmet text default 'skip',
when_failed text default 'fail'
)
Expand Down Expand Up @@ -41,7 +41,7 @@ BEGIN
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
condition_pattern, condition_not_pattern, when_unmet, when_failed
required_input_pattern, forbidden_input_pattern, when_unmet, when_failed
)
VALUES (
add_step.flow_slug,
Expand All @@ -53,8 +53,8 @@ BEGIN
add_step.base_delay,
add_step.timeout,
add_step.start_delay,
add_step.condition_pattern,
add_step.condition_not_pattern,
add_step.required_input_pattern,
add_step.forbidden_input_pattern,
add_step.when_unmet,
add_step.when_failed
)
Expand Down
44 changes: 22 additions & 22 deletions pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ BEGIN
-- ==========================================
-- Find first step (by topological order) with unmet condition and 'fail' mode.
-- Condition is unmet when:
-- (condition_pattern is set AND input does NOT contain it) OR
-- (condition_not_pattern is set AND input DOES contain it)
-- (required_input_pattern is set AND input does NOT contain it) OR
-- (forbidden_input_pattern is set AND input DOES contain it)
WITH steps_with_conditions AS (
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.required_input_pattern,
step.forbidden_input_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -65,7 +65,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -84,16 +84,16 @@ BEGIN
SELECT
swc.*,
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
(swc.required_input_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.required_input_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
(swc.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.forbidden_input_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
Expand Down Expand Up @@ -128,8 +128,8 @@ BEGIN
SELECT
step_state.flow_slug,
step_state.step_slug,
step.condition_pattern,
step.condition_not_pattern,
step.required_input_pattern,
step.forbidden_input_pattern,
step.when_unmet,
step.deps_count,
step.step_index
Expand All @@ -140,7 +140,7 @@ BEGIN
WHERE step_state.run_id = cascade_resolve_conditions.run_id
AND step_state.status = 'created'
AND step_state.remaining_deps = 0
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
),
step_deps_output AS (
SELECT
Expand All @@ -159,11 +159,11 @@ BEGIN
SELECT
swc.*,
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
(swc.condition_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
(swc.required_input_pattern IS NULL OR
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.required_input_pattern)
AND
(swc.condition_not_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
(swc.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.forbidden_input_pattern))
AS condition_met
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
Expand Down Expand Up @@ -244,15 +244,15 @@ BEGIN
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
AND ready_step.status = 'created'
AND ready_step.remaining_deps = 0
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
AND (step.required_input_pattern IS NOT NULL OR step.forbidden_input_pattern IS NOT NULL)
AND step.when_unmet = 'skip-cascade'
-- Condition is NOT met when: (if fails) OR (ifNot fails)
AND NOT (
(step.condition_pattern IS NULL OR
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
(step.required_input_pattern IS NULL OR
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.required_input_pattern)
AND
(step.condition_not_pattern IS NULL OR
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
(step.forbidden_input_pattern IS NULL OR
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.forbidden_input_pattern))
)
ORDER BY step.step_index;

Expand Down
28 changes: 28 additions & 0 deletions pkgs/core/schemas/0100_function_compare_flow_shapes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,34 @@ BEGIN
)
);
END IF;

-- Compare requiredInputPattern (structural - affects DAG execution semantics)
-- Uses -> (jsonb) not ->> (text) to properly compare wrapper objects
IF v_local_step->'requiredInputPattern' IS DISTINCT FROM v_db_step->'requiredInputPattern' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: requiredInputPattern differs '%s' vs '%s'$$,
v_idx,
v_local_step->'requiredInputPattern',
v_db_step->'requiredInputPattern'
)
);
END IF;

-- Compare forbiddenInputPattern (structural - affects DAG execution semantics)
-- Uses -> (jsonb) not ->> (text) to properly compare wrapper objects
IF v_local_step->'forbiddenInputPattern' IS DISTINCT FROM v_db_step->'forbiddenInputPattern' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: forbiddenInputPattern differs '%s' vs '%s'$$,
v_idx,
v_local_step->'forbiddenInputPattern',
v_db_step->'forbiddenInputPattern'
)
);
END IF;
END IF;
END LOOP;

Expand Down
12 changes: 11 additions & 1 deletion pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,17 @@ BEGIN
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed'
when_failed => v_step->>'whenFailed',
required_input_pattern => CASE
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
THEN v_step->'requiredInputPattern'->'value'
ELSE NULL
END,
forbidden_input_pattern => CASE
WHEN (v_step->'forbiddenInputPattern'->>'defined')::boolean
THEN v_step->'forbiddenInputPattern'->'value'
ELSE NULL
END
);
END LOOP;
END;
Expand Down
12 changes: 11 additions & 1 deletion pkgs/core/schemas/0100_function_get_flow_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ as $$
'[]'::jsonb
),
'whenUnmet', step.when_unmet,
'whenFailed', step.when_failed
'whenFailed', step.when_failed,
'requiredInputPattern', CASE
WHEN step.required_input_pattern IS NULL
THEN '{"defined": false}'::jsonb
ELSE jsonb_build_object('defined', true, 'value', step.required_input_pattern)
END,
'forbiddenInputPattern', CASE
WHEN step.forbidden_input_pattern IS NULL
THEN '{"defined": false}'::jsonb
ELSE jsonb_build_object('defined', true, 'value', step.forbidden_input_pattern)
END
)
ORDER BY step.step_index
),
Expand Down
20 changes: 10 additions & 10 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,47 +278,47 @@ export type Database = {
}
steps: {
Row: {
condition_not_pattern: Json | null
condition_pattern: Json | null
created_at: string
deps_count: number
flow_slug: string
forbidden_input_pattern: Json | null
opt_base_delay: number | null
opt_max_attempts: number | null
opt_start_delay: number | null
opt_timeout: number | null
required_input_pattern: Json | null
step_index: number
step_slug: string
step_type: string
when_failed: string
when_unmet: string
}
Insert: {
condition_not_pattern?: Json | null
condition_pattern?: Json | null
created_at?: string
deps_count?: number
flow_slug: string
forbidden_input_pattern?: Json | null
opt_base_delay?: number | null
opt_max_attempts?: number | null
opt_start_delay?: number | null
opt_timeout?: number | null
required_input_pattern?: Json | null
step_index?: number
step_slug: string
step_type?: string
when_failed?: string
when_unmet?: string
}
Update: {
condition_not_pattern?: Json | null
condition_pattern?: Json | null
created_at?: string
deps_count?: number
flow_slug?: string
forbidden_input_pattern?: Json | null
opt_base_delay?: number | null
opt_max_attempts?: number | null
opt_start_delay?: number | null
opt_timeout?: number | null
required_input_pattern?: Json | null
step_index?: number
step_slug?: string
step_type?: string
Expand Down Expand Up @@ -413,11 +413,11 @@ export type Database = {
add_step: {
Args: {
base_delay?: number
condition_not_pattern?: Json
condition_pattern?: Json
deps_slugs?: string[]
flow_slug: string
forbidden_input_pattern?: Json
max_attempts?: number
required_input_pattern?: Json
start_delay?: number
step_slug: string
step_type?: string
Expand All @@ -426,15 +426,15 @@ export type Database = {
when_unmet?: string
}
Returns: {
condition_not_pattern: Json | null
condition_pattern: Json | null
created_at: string
deps_count: number
flow_slug: string
forbidden_input_pattern: Json | null
opt_base_delay: number | null
opt_max_attempts: number | null
opt_start_delay: number | null
opt_timeout: number | null
required_input_pattern: Json | null
step_index: number
step_slug: string
step_type: string
Expand Down
Loading
Loading