Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/requeue-stalled-tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@pgflow/core': patch
'@pgflow/edge-worker': patch
---

Add automatic requeue for stalled tasks via cron job - tasks stuck beyond timeout+30s are requeued up to 3 times, then archived with status left as 'started' for easy identification (closes #586)
3 changes: 3 additions & 0 deletions pkgs/core/schemas/0060_tables_runtime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ create table pgflow.step_tasks (
completed_at timestamptz,
failed_at timestamptz,
last_worker_id uuid references pgflow.workers (worker_id) on delete set null,
-- Requeue tracking columns
requeued_count int not null default 0,
last_requeued_at timestamptz,
constraint step_tasks_pkey primary key (run_id, step_slug, task_index),
foreign key (run_id, step_slug)
references pgflow.step_states (run_id, step_slug),
Expand Down
119 changes: 119 additions & 0 deletions pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
-- Requeue stalled tasks that have been in 'started' status longer than their timeout + 30s buffer
-- This handles tasks that got stuck when workers crashed without completing them
create or replace function pgflow.requeue_stalled_tasks()
returns int
language plpgsql
security definer
set search_path = ''
as $$
declare
result_count int := 0;
max_requeues constant int := 3;
begin
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
-- Tasks with requeued_count >= max_requeues will have their message archived
-- but status left as 'started' for easy identification via requeued_count column
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;

-- Cron setup function for automatic requeue monitoring
create or replace function pgflow.setup_requeue_stalled_tasks_cron(
cron_interval text default '15 seconds'
)
returns text
language plpgsql
security definer
set search_path = pgflow, cron, pg_temp
as $$
declare
job_id bigint;
begin
-- Remove existing job if any
begin
perform cron.unschedule('pgflow_requeue_stalled_tasks');
exception
when others then null;
end;

-- Schedule the new job
job_id := cron.schedule(
job_name => 'pgflow_requeue_stalled_tasks',
schedule => setup_requeue_stalled_tasks_cron.cron_interval,
command => 'select pgflow.requeue_stalled_tasks()'
);

return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
setup_requeue_stalled_tasks_cron.cron_interval, job_id);
end;
$$;

comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is
'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-- Modify "step_tasks" table
ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL;
-- Create "requeue_stalled_tasks" function
CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
declare
result_count int := 0;
max_requeues constant int := 3;
begin
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
-- Tasks with requeued_count >= max_requeues will have their message archived
-- but status left as 'started' for easy identification via requeued_count column
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;
-- Create "setup_requeue_stalled_tasks_cron" function
CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
declare
job_id bigint;
begin
-- Remove existing job if any
begin
perform cron.unschedule('pgflow_requeue_stalled_tasks');
exception
when others then null;
end;

-- Schedule the new job
job_id := cron.schedule(
job_name => 'pgflow_requeue_stalled_tasks',
schedule => setup_requeue_stalled_tasks_cron.cron_interval,
command => 'select pgflow.requeue_stalled_tasks()'
);

return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
setup_requeue_stalled_tasks_cron.cron_interval, job_id);
end;
$$;
-- Set comment to function: "setup_requeue_stalled_tasks_cron"
COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';

-- Setup cron job to automatically requeue stalled tasks (every 15 seconds)
SELECT pgflow.setup_requeue_stalled_tasks_cron('15 seconds');
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
h1:DGjqEwXpbrpLbFLXmBEHQabXBHvwznaMtUYaXEdYQ9k=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260112094903_pgflow_requeue_stalled_tasks.sql h1:xDkh3LSMke9gG7Gd37D1EpNmzmjFuPzeY1VNWmEOlz4=
109 changes: 109 additions & 0 deletions pkgs/core/supabase/tests/requeue_stalled_tasks/basic_requeue.test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-- Test: Basic requeue functionality for stalled tasks
begin;
select plan(12);

select pgflow_tests.reset_db();

-- Create a flow with timeout of 5 seconds
select pgflow.create_flow('test_flow', null, null, 5);
select pgflow.add_step('test_flow', 'step_a');

-- Start a flow run
select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb);

-- Ensure worker and read+start a task
select pgflow_tests.ensure_worker('test_flow');
select pgflow_tests.read_and_start('test_flow', 30, 1);

-- Test 1: Task is initially in 'started' status
select is(
(select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
'started',
'Task should be in started status initially'
);

-- Test 2: requeue_stalled_tasks returns 0 when no tasks are stalled (within timeout)
select is(
pgflow.requeue_stalled_tasks(),
0,
'Should return 0 when no tasks are stalled yet'
);

-- Test 3: Task still started (not stalled yet - within timeout + 30s buffer)
select is(
(select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
'started',
'Task should remain started when within timeout window'
);

-- Simulate a stalled task by backdating timestamps to timeout + 31 seconds ago
-- Must also backdate queued_at to satisfy started_at >= queued_at constraint
update pgflow.step_tasks
set
queued_at = now() - interval '40 seconds',
started_at = now() - interval '36 seconds'
where step_slug = 'step_a';

-- Test 4: requeue_stalled_tasks returns 1 when task is stalled
select is(
pgflow.requeue_stalled_tasks(),
1,
'Should return 1 when one task is stalled'
);

-- Test 5: Task is now back to 'queued' status
select is(
(select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
'queued',
'Stalled task should be requeued to queued status'
);

-- Test 6: requeued_count is incremented
select is(
(select requeued_count from pgflow.step_tasks where step_slug = 'step_a' limit 1),
1,
'requeued_count should be 1 after first requeue'
);

-- Test 7: last_requeued_at is set
select ok(
(select last_requeued_at is not null from pgflow.step_tasks where step_slug = 'step_a' limit 1),
'last_requeued_at should be set after requeue'
);

-- Test 8: started_at is cleared
select is(
(select started_at from pgflow.step_tasks where step_slug = 'step_a' limit 1),
null,
'started_at should be cleared after requeue'
);

-- Test 9: attempts_count is NOT reset (task will retry)
select is(
(select attempts_count from pgflow.step_tasks where step_slug = 'step_a' limit 1),
1,
'attempts_count should remain unchanged after requeue'
);

-- Test 10: Calling again returns 0 (task no longer stalled)
select is(
pgflow.requeue_stalled_tasks(),
0,
'Should return 0 when called again with no stalled tasks'
);

-- Test 11: Message is visible in queue again (can be read)
select ok(
(select count(*) > 0 from pgmq.read('test_flow', 0, 1)),
'Message should be readable from the queue after requeue'
);

-- Test 12: last_worker_id is cleared
select is(
(select last_worker_id from pgflow.step_tasks where step_slug = 'step_a' limit 1),
null,
'last_worker_id should be cleared after requeue'
);

select finish();
rollback;
Loading
Loading