From 9083fa2b2078af81e1f93c1b747b94471ddc769a Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 12 Jan 2026 09:51:22 +0100 Subject: [PATCH] feat: add requeue tracking columns and implement stalled task requeue logic - Introduced requeued_count and last_requeued_at columns to step_tasks table - Developed requeue_stalled_tasks function to requeue or fail stalled tasks based on max requeues - Created setup_requeue_stalled_tasks_cron function to schedule automatic requeue checks - Updated migration scripts to include new columns and functions - Added comprehensive tests for requeue behavior, max requeue limit, and cron setup --- .changeset/requeue-stalled-tasks.md | 6 + pkgs/core/schemas/0060_tables_runtime.sql | 3 + .../0062_function_requeue_stalled_tasks.sql | 119 ++++++++++++++++++ ...112094903_pgflow_requeue_stalled_tasks.sql | 109 ++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../basic_requeue.test.sql | 109 ++++++++++++++++ .../requeue_stalled_tasks/cron_setup.test.sql | 54 ++++++++ .../max_requeue_limit.test.sql | 104 +++++++++++++++ .../multiple_flows.test.sql | 88 +++++++++++++ pkgs/edge-worker/src/flow/createFlowWorker.ts | 35 ++++-- 10 files changed, 619 insertions(+), 11 deletions(-) create mode 100644 .changeset/requeue-stalled-tasks.md create mode 100644 pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql create mode 100644 pkgs/core/supabase/migrations/20260112094903_pgflow_requeue_stalled_tasks.sql create mode 100644 pkgs/core/supabase/tests/requeue_stalled_tasks/basic_requeue.test.sql create mode 100644 pkgs/core/supabase/tests/requeue_stalled_tasks/cron_setup.test.sql create mode 100644 pkgs/core/supabase/tests/requeue_stalled_tasks/max_requeue_limit.test.sql create mode 100644 pkgs/core/supabase/tests/requeue_stalled_tasks/multiple_flows.test.sql diff --git a/.changeset/requeue-stalled-tasks.md b/.changeset/requeue-stalled-tasks.md new file mode 100644 index 000000000..8dc48612d --- /dev/null +++ b/.changeset/requeue-stalled-tasks.md @@ -0,0 +1,6 @@ +--- +'@pgflow/core': patch +'@pgflow/edge-worker': patch +--- + +Add automatic requeue for stalled tasks via cron job - tasks stuck beyond timeout+30s are requeued up to 3 times, then archived with status left as 'started' for easy identification (closes #586) diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 7a408410c..8dea60f89 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -81,6 +81,9 @@ create table pgflow.step_tasks ( completed_at timestamptz, failed_at timestamptz, last_worker_id uuid references pgflow.workers (worker_id) on delete set null, + -- Requeue tracking columns + requeued_count int not null default 0, + last_requeued_at timestamptz, constraint step_tasks_pkey primary key (run_id, step_slug, task_index), foreign key (run_id, step_slug) references pgflow.step_states (run_id, step_slug), diff --git a/pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql b/pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql new file mode 100644 index 000000000..feca61373 --- /dev/null +++ b/pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql @@ -0,0 +1,119 @@ +-- Requeue stalled tasks that have been in 'started' status longer than their timeout + 30s buffer +-- This handles tasks that got stuck when workers crashed without completing them +create or replace function pgflow.requeue_stalled_tasks() +returns int +language plpgsql +security definer +set search_path = '' +as $$ 
+declare + result_count int := 0; + max_requeues constant int := 3; +begin + -- Find and requeue stalled tasks (where started_at > timeout + 30s buffer) + -- Tasks with requeued_count >= max_requeues will have their message archived + -- but status left as 'started' for easy identification via requeued_count column + with stalled_tasks as ( + select + st.run_id, + st.step_slug, + st.task_index, + st.message_id, + r.flow_slug, + st.requeued_count, + f.opt_timeout + from pgflow.step_tasks st + join pgflow.runs r on r.run_id = st.run_id + join pgflow.flows f on f.flow_slug = r.flow_slug + where st.status = 'started' + and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds' + for update of st skip locked + ), + -- Separate tasks that can be requeued from those that exceeded max requeues + to_requeue as ( + select * from stalled_tasks where requeued_count < max_requeues + ), + to_archive as ( + select * from stalled_tasks where requeued_count >= max_requeues + ), + -- Update tasks that will be requeued + requeued as ( + update pgflow.step_tasks st + set + status = 'queued', + started_at = null, + last_worker_id = null, + requeued_count = st.requeued_count + 1, + last_requeued_at = now() + from to_requeue tr + where st.run_id = tr.run_id + and st.step_slug = tr.step_slug + and st.task_index = tr.task_index + returning tr.flow_slug as queue_name, tr.message_id + ), + -- Make requeued messages visible immediately (batched per queue) + visibility_reset as ( + select pgflow.set_vt_batch( + r.queue_name, + array_agg(r.message_id), + array_agg(0) -- all offsets are 0 (immediate visibility) + ) + from requeued r + where r.message_id is not null + group by r.queue_name + ), + -- Archive messages for tasks that exceeded max requeues (batched per queue) + -- Task status remains 'started' with requeued_count >= 3 for easy identification + archived as ( + select pgmq.archive(ta.flow_slug, array_agg(ta.message_id)) + from to_archive ta + where ta.message_id is not null + group by ta.flow_slug + ), + -- Force execution of visibility_reset CTE + _vr as (select count(*) from visibility_reset), + -- Force execution of archived CTE + _ar as (select count(*) from archived) + select count(*) into result_count + from requeued, _vr, _ar; + + return result_count; +end; +$$; + +-- Cron setup function for automatic requeue monitoring +create or replace function pgflow.setup_requeue_stalled_tasks_cron( + cron_interval text default '15 seconds' +) +returns text +language plpgsql +security definer +set search_path = pgflow, cron, pg_temp +as $$ +declare + job_id bigint; +begin + -- Remove existing job if any + begin + perform cron.unschedule('pgflow_requeue_stalled_tasks'); + exception + when others then null; + end; + + -- Schedule the new job + job_id := cron.schedule( + job_name => 'pgflow_requeue_stalled_tasks', + schedule => setup_requeue_stalled_tasks_cron.cron_interval, + command => 'select pgflow.requeue_stalled_tasks()' + ); + + return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)', + setup_requeue_stalled_tasks_cron.cron_interval, job_id); +end; +$$; + +comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is +'Sets up cron job to automatically requeue stalled tasks. +Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds). +Replaces existing job if it exists (idempotent). 
+Returns a confirmation message with job ID.'; diff --git a/pkgs/core/supabase/migrations/20260112094903_pgflow_requeue_stalled_tasks.sql b/pkgs/core/supabase/migrations/20260112094903_pgflow_requeue_stalled_tasks.sql new file mode 100644 index 000000000..154173026 --- /dev/null +++ b/pkgs/core/supabase/migrations/20260112094903_pgflow_requeue_stalled_tasks.sql @@ -0,0 +1,109 @@ +-- Modify "step_tasks" table +ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL; +-- Create "requeue_stalled_tasks" function +CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$ +declare + result_count int := 0; + max_requeues constant int := 3; +begin + -- Find and requeue stalled tasks (where started_at > timeout + 30s buffer) + -- Tasks with requeued_count >= max_requeues will have their message archived + -- but status left as 'started' for easy identification via requeued_count column + with stalled_tasks as ( + select + st.run_id, + st.step_slug, + st.task_index, + st.message_id, + r.flow_slug, + st.requeued_count, + f.opt_timeout + from pgflow.step_tasks st + join pgflow.runs r on r.run_id = st.run_id + join pgflow.flows f on f.flow_slug = r.flow_slug + where st.status = 'started' + and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds' + for update of st skip locked + ), + -- Separate tasks that can be requeued from those that exceeded max requeues + to_requeue as ( + select * from stalled_tasks where requeued_count < max_requeues + ), + to_archive as ( + select * from stalled_tasks where requeued_count >= max_requeues + ), + -- Update tasks that will be requeued + requeued as ( + update pgflow.step_tasks st + set + status = 'queued', + started_at = null, + last_worker_id = null, + requeued_count = st.requeued_count + 1, + last_requeued_at = now() + from to_requeue tr + where st.run_id = tr.run_id + and st.step_slug = tr.step_slug + and st.task_index = tr.task_index + returning tr.flow_slug as queue_name, tr.message_id + ), + -- Make requeued messages visible immediately (batched per queue) + visibility_reset as ( + select pgflow.set_vt_batch( + r.queue_name, + array_agg(r.message_id), + array_agg(0) -- all offsets are 0 (immediate visibility) + ) + from requeued r + where r.message_id is not null + group by r.queue_name + ), + -- Archive messages for tasks that exceeded max requeues (batched per queue) + -- Task status remains 'started' with requeued_count >= 3 for easy identification + archived as ( + select pgmq.archive(ta.flow_slug, array_agg(ta.message_id)) + from to_archive ta + where ta.message_id is not null + group by ta.flow_slug + ), + -- Force execution of visibility_reset CTE + _vr as (select count(*) from visibility_reset), + -- Force execution of archived CTE + _ar as (select count(*) from archived) + select count(*) into result_count + from requeued, _vr, _ar; + + return result_count; +end; +$$; +-- Create "setup_requeue_stalled_tasks_cron" function +CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$ +declare + job_id bigint; +begin + -- Remove existing job if any + begin + perform cron.unschedule('pgflow_requeue_stalled_tasks'); + exception + when others then null; + end; + + -- Schedule the new job + job_id := cron.schedule( + job_name => 
'pgflow_requeue_stalled_tasks', + schedule => setup_requeue_stalled_tasks_cron.cron_interval, + command => 'select pgflow.requeue_stalled_tasks()' + ); + + return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)', + setup_requeue_stalled_tasks_cron.cron_interval, job_id); +end; +$$; +-- Set comment to function: "setup_requeue_stalled_tasks_cron" +COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks. +Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds). +Replaces existing job if it exists (idempotent). +Returns a confirmation message with job ID.'; + +-- Setup cron job to automatically requeue stalled tasks (every 15 seconds) +SELECT pgflow.setup_requeue_stalled_tasks_cron('15 seconds'); diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index c0881d482..41fe40a41 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= +h1:DGjqEwXpbrpLbFLXmBEHQabXBHvwznaMtUYaXEdYQ9k= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= +20260112094903_pgflow_requeue_stalled_tasks.sql h1:xDkh3LSMke9gG7Gd37D1EpNmzmjFuPzeY1VNWmEOlz4= diff --git a/pkgs/core/supabase/tests/requeue_stalled_tasks/basic_requeue.test.sql b/pkgs/core/supabase/tests/requeue_stalled_tasks/basic_requeue.test.sql new file mode 100644 index 000000000..be0cb745f --- /dev/null +++ b/pkgs/core/supabase/tests/requeue_stalled_tasks/basic_requeue.test.sql @@ -0,0 +1,109 @@ +-- Test: Basic requeue functionality for stalled tasks +begin; +select plan(12); + +select pgflow_tests.reset_db(); + +-- Create a flow with timeout of 5 seconds +select pgflow.create_flow('test_flow', null, null, 5); +select pgflow.add_step('test_flow', 'step_a'); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb); + +-- Ensure worker and read+start a task +select pgflow_tests.ensure_worker('test_flow'); +select pgflow_tests.read_and_start('test_flow', 30, 1); + +-- Test 1: Task is initially in 'started' status +select is( + (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 'started', + 'Task should be in started status initially' +); + +-- Test 2: requeue_stalled_tasks returns 0 when no tasks are stalled (within timeout) +select is( + pgflow.requeue_stalled_tasks(), + 0, + 'Should return 0 when no tasks are stalled yet' +); + +-- Test 3: Task still started (not stalled yet - within timeout + 30s buffer) +select is( + (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 'started', + 'Task should remain started when within timeout window' +); + +-- Simulate a stalled task by backdating timestamps to timeout + 31 seconds ago +-- Must also backdate queued_at to satisfy started_at >= queued_at 
constraint +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where step_slug = 'step_a'; + +-- Test 4: requeue_stalled_tasks returns 1 when task is stalled +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'Should return 1 when one task is stalled' +); + +-- Test 5: Task is now back to 'queued' status +select is( + (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 'queued', + 'Stalled task should be requeued to queued status' +); + +-- Test 6: requeued_count is incremented +select is( + (select requeued_count from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 1, + 'requeued_count should be 1 after first requeue' +); + +-- Test 7: last_requeued_at is set +select ok( + (select last_requeued_at is not null from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 'last_requeued_at should be set after requeue' +); + +-- Test 8: started_at is cleared +select is( + (select started_at from pgflow.step_tasks where step_slug = 'step_a' limit 1), + null, + 'started_at should be cleared after requeue' +); + +-- Test 9: attempts_count is NOT reset (task will retry) +select is( + (select attempts_count from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 1, + 'attempts_count should remain unchanged after requeue' +); + +-- Test 10: Calling again returns 0 (task no longer stalled) +select is( + pgflow.requeue_stalled_tasks(), + 0, + 'Should return 0 when called again with no stalled tasks' +); + +-- Test 11: Message is visible in queue again (can be read) +select ok( + (select count(*) > 0 from pgmq.read('test_flow', 0, 1)), + 'Message should be readable from the queue after requeue' +); + +-- Test 12: last_worker_id is cleared +select is( + (select last_worker_id from pgflow.step_tasks where step_slug = 'step_a' limit 1), + null, + 'last_worker_id should be cleared after requeue' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/requeue_stalled_tasks/cron_setup.test.sql b/pkgs/core/supabase/tests/requeue_stalled_tasks/cron_setup.test.sql new file mode 100644 index 000000000..c469ae01e --- /dev/null +++ b/pkgs/core/supabase/tests/requeue_stalled_tasks/cron_setup.test.sql @@ -0,0 +1,54 @@ +-- Test: setup_requeue_stalled_tasks_cron() function +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +-- Test 1: Function exists +select has_function( + 'pgflow', + 'setup_requeue_stalled_tasks_cron', + array['text'], + 'setup_requeue_stalled_tasks_cron(text) should exist' +); + +-- Test 2: Default call with no arguments creates cron job +select ok( + pgflow.setup_requeue_stalled_tasks_cron() is not null, + 'setup_requeue_stalled_tasks_cron() should return confirmation message' +); + +-- Test 3: Cron job is created with correct name +select ok( + exists( + select 1 from cron.job + where jobname = 'pgflow_requeue_stalled_tasks' + ), + 'Cron job pgflow_requeue_stalled_tasks should be created' +); + +-- Test 4: Default schedule is every 15 seconds +select is( + (select schedule from cron.job where jobname = 'pgflow_requeue_stalled_tasks'), + '15 seconds', + 'Default schedule should be 15 seconds' +); + +-- Test 5: Calling with custom interval updates the schedule +select pgflow.setup_requeue_stalled_tasks_cron('30 seconds'); + +select is( + (select schedule from cron.job where jobname = 'pgflow_requeue_stalled_tasks'), + '30 seconds', + 'Schedule should be updated to 30 seconds' +); + +-- Test 6: Job command calls requeue_stalled_tasks +select ok( + (select 
command from cron.job where jobname = 'pgflow_requeue_stalled_tasks') + like '%requeue_stalled_tasks%', + 'Cron job should call requeue_stalled_tasks' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/requeue_stalled_tasks/max_requeue_limit.test.sql b/pkgs/core/supabase/tests/requeue_stalled_tasks/max_requeue_limit.test.sql new file mode 100644 index 000000000..048eb479c --- /dev/null +++ b/pkgs/core/supabase/tests/requeue_stalled_tasks/max_requeue_limit.test.sql @@ -0,0 +1,104 @@ +-- Test: Max requeue limit (3 requeues then archive) +begin; +select plan(8); + +select pgflow_tests.reset_db(); + +-- Create a flow with timeout of 5 seconds +select pgflow.create_flow('test_flow', null, null, 5); +select pgflow.add_step('test_flow', 'step_a'); + +-- Start a flow run +select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb); + +-- Ensure worker and read+start a task +select pgflow_tests.ensure_worker('test_flow'); +select pgflow_tests.read_and_start('test_flow', 30, 1); + +-- Test 1-3: Requeue 3 times successfully +-- First requeue +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where step_slug = 'step_a'; + +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'First requeue should succeed' +); + +-- Start again and make it stalled for second requeue +select pgflow_tests.read_and_start('test_flow', 30, 1); +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where step_slug = 'step_a'; + +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'Second requeue should succeed' +); + +-- Start again and make it stalled for third requeue +select pgflow_tests.read_and_start('test_flow', 30, 1); +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where step_slug = 'step_a'; + +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'Third requeue should succeed' +); + +-- Test 4: requeued_count is now 3 +select is( + (select requeued_count from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 3, + 'requeued_count should be 3 after three requeues' +); + +-- Start again and make it stalled for fourth attempt +select pgflow_tests.read_and_start('test_flow', 30, 1); +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where step_slug = 'step_a'; + +-- Test 5: Fourth requeue attempt should archive instead (returns 0 requeued) +select is( + pgflow.requeue_stalled_tasks(), + 0, + 'Fourth requeue attempt should return 0 (task archived, not requeued)' +); + +-- Test 6: Task should still be in 'started' status (not failed) +select is( + (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 'started', + 'Task should remain started after max requeues (easy to identify via requeued_count)' +); + +-- Test 7: requeued_count should still be 3 (not incremented on archive) +select is( + (select requeued_count from pgflow.step_tasks where step_slug = 'step_a' limit 1), + 3, + 'requeued_count should remain 3 after archive' +); + +-- Test 8: Message should be archived from queue (not readable) +select is( + (select count(*)::int from pgmq.read('test_flow', 0, 10)), + 0, + 'Message should be archived from the queue' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/requeue_stalled_tasks/multiple_flows.test.sql 
b/pkgs/core/supabase/tests/requeue_stalled_tasks/multiple_flows.test.sql new file mode 100644 index 000000000..84100ff1a --- /dev/null +++ b/pkgs/core/supabase/tests/requeue_stalled_tasks/multiple_flows.test.sql @@ -0,0 +1,88 @@ +-- Test: Requeue handles multiple flows with different timeouts +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +-- Create two flows with different timeouts +select pgflow.create_flow('fast_flow', null, null, 5); -- 5 second timeout +select pgflow.add_step('fast_flow', 'step_a'); + +select pgflow.create_flow('slow_flow', null, null, 60); -- 60 second timeout +select pgflow.add_step('slow_flow', 'step_a'); + +-- Start runs for both flows +select pgflow.start_flow('fast_flow', '{"input": "fast"}'::jsonb); +select pgflow.start_flow('slow_flow', '{"input": "slow"}'::jsonb); + +-- Ensure workers and read+start tasks for both flows +select pgflow_tests.ensure_worker('fast_flow'); +select pgflow_tests.ensure_worker('slow_flow'); +select pgflow_tests.read_and_start('fast_flow', 30, 1); +select pgflow_tests.read_and_start('slow_flow', 30, 1); + +-- Test 1: Both tasks are in 'started' status +select is( + (select count(*)::int from pgflow.step_tasks where status = 'started'), + 2, + 'Both tasks should be in started status' +); + +-- Test 2: No stalled tasks yet +select is( + pgflow.requeue_stalled_tasks(), + 0, + 'No tasks stalled yet' +); + +-- Backdate fast_flow task to be stalled (timeout 5s + 30s buffer = 35s, so 36s is stalled) +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where flow_slug = 'fast_flow'; + +-- Backdate slow_flow task to 36s ago (not stalled: timeout 60s + 30s = 90s) +update pgflow.step_tasks +set + queued_at = now() - interval '40 seconds', + started_at = now() - interval '36 seconds' +where flow_slug = 'slow_flow'; + +-- Test 3: Only fast_flow task is requeued +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'Only fast_flow task should be requeued' +); + +-- Test 4: fast_flow task is queued +select is( + (select status from pgflow.step_tasks where flow_slug = 'fast_flow' limit 1), + 'queued', + 'fast_flow task should be requeued' +); + +-- Test 5: slow_flow task is still started +select is( + (select status from pgflow.step_tasks where flow_slug = 'slow_flow' limit 1), + 'started', + 'slow_flow task should still be started (not past its timeout)' +); + +-- Now backdate slow_flow to exceed its timeout (60s + 30s = 90s, so 91s is stalled) +update pgflow.step_tasks +set + queued_at = now() - interval '95 seconds', + started_at = now() - interval '91 seconds' +where flow_slug = 'slow_flow'; + +-- Test 6: slow_flow task is now requeued +select is( + pgflow.requeue_stalled_tasks(), + 1, + 'slow_flow task should now be requeued' +); + +select finish(); +rollback; diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index 950ad2c2d..9f53b9572 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -7,13 +7,19 @@ import { PgflowSqlClient } from '@pgflow/core'; import { Queries } from '../core/Queries.js'; import type { IExecutor } from '../core/types.js'; import type { Logger, PlatformAdapter } from '../platform/types.js'; -import type { StepTaskWithMessage, StepTaskHandlerContext } from '../core/context.js'; +import type { + StepTaskWithMessage, + StepTaskHandlerContext, +} from '../core/context.js'; import { createContextSafeConfig } from 
'../core/context.js'; import { Worker } from '../core/Worker.js'; import postgres from 'postgres'; import { FlowWorkerLifecycle } from './FlowWorkerLifecycle.js'; import { BatchProcessor } from '../core/BatchProcessor.js'; -import type { FlowWorkerConfig, ResolvedFlowWorkerConfig } from '../core/workerConfigTypes.js'; +import type { + FlowWorkerConfig, + ResolvedFlowWorkerConfig, +} from '../core/workerConfigTypes.js'; // Re-export type from workerConfigTypes to maintain backward compatibility export type { FlowWorkerConfig } from '../core/workerConfigTypes.js'; @@ -23,7 +29,7 @@ const DEFAULT_FLOW_CONFIG = { maxConcurrent: 10, maxPgConnections: 4, batchSize: 10, - visibilityTimeout: 2, + visibilityTimeout: 5, maxPollSeconds: 2, pollIntervalMs: 100, } as const; @@ -31,13 +37,17 @@ const DEFAULT_FLOW_CONFIG = { /** * Normalizes flow worker configuration by applying all defaults */ -function normalizeFlowConfig(config: FlowWorkerConfig, sql: postgres.Sql, platformEnv: Record): ResolvedFlowWorkerConfig { +function normalizeFlowConfig( + config: FlowWorkerConfig, + sql: postgres.Sql, + platformEnv: Record +): ResolvedFlowWorkerConfig { return { ...DEFAULT_FLOW_CONFIG, ...config, sql, env: platformEnv, - connectionString: config.connectionString + connectionString: config.connectionString, }; } @@ -51,7 +61,10 @@ function normalizeFlowConfig(config: FlowWorkerConfig, sql: postgres.Sql, platfo * @param platformAdapter - Platform adapter for creating contexts * @returns A configured Worker instance ready to be started */ -export function createFlowWorker>( +export function createFlowWorker< + TFlow extends AnyFlow, + TResources extends Record +>( flow: TFlow, config: FlowWorkerConfig, createLogger: (module: string) => Logger, @@ -92,7 +105,7 @@ export function createFlowWorker>( + const executionController = new ExecutionController< + StepTaskWithMessage + >( executorFactory, abortSignal, { @@ -190,4 +205,4 @@ export function createFlowWorker
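
The new objects are self-contained, so the feature can be exercised directly from SQL once the migration above is applied. A minimal operational sketch, assuming pg_cron is available (as on Supabase); these statements are illustrative and not part of the patch:

    -- Schedule the sweep (idempotent; replaces an existing job), or trigger one sweep by hand.
    select pgflow.setup_requeue_stalled_tasks_cron('15 seconds');
    select pgflow.requeue_stalled_tasks();  -- returns the number of tasks requeued

    -- Tasks that exhausted the 3-requeue limit keep status = 'started';
    -- they are identified by requeued_count, as the changeset describes.
    select run_id, step_slug, task_index, requeued_count, last_requeued_at
    from pgflow.step_tasks
    where status = 'started'
      and requeued_count >= 3;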
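Inside pgflow.requeue_stalled_tasks() a task counts as stalled once it has sat in 'started' longer than its flow's opt_timeout plus a fixed 30-second buffer. The read-only query below previews what the next sweep would pick up, mirroring that predicate; it is a sketch only, since the function itself additionally locks the rows with FOR UPDATE ... SKIP LOCKED before mutating them:

    select
      st.run_id,
      st.step_slug,
      st.task_index,
      st.requeued_count,
      f.opt_timeout,
      now() - st.started_at as running_for
    from pgflow.step_tasks st
    join pgflow.runs r on r.run_id = st.run_id
    join pgflow.flows f on f.flow_slug = r.flow_slug
    where st.status = 'started'
      and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds';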
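The tests simulate a crashed worker by backdating queued_at and started_at rather than waiting out the timeout. The same trick gives a quick local check against a flow with a 5-second timeout, like the test_flow used in the tests above; the slug and intervals here are taken from those tests and would need adjusting for other flows:

    update pgflow.step_tasks
    set
      queued_at  = now() - interval '40 seconds',
      started_at = now() - interval '36 seconds'
    where step_slug = 'step_a'
      and status = 'started';

    select pgflow.requeue_stalled_tasks();  -- expected to return 1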