From 622901ddb1fd5a6df82c3a695f534b24f17f0fb9 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 06:17:32 +0000 Subject: [PATCH 1/6] Introduce Storage trait abstracting durable-runtime DB queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move every sqlx query against the durable schema in `worker.rs`, `task.rs`, and `plugin/durable/notify.rs` behind a new `Storage` trait, with a `PgStorage` impl that holds the existing query bodies verbatim. Transaction lifecycle (`pool.begin()`, `pool.acquire()`, commit/rollback) stays at the call sites — methods take a borrowed `&mut sqlx::PgConnection`, which works for both pooled connections and transactions via sqlx's `Executor` blanket impl. This is a mechanical refactor with no functional or behavioural changes. The trait is the scaffolding for a future swappable backend (e.g. SQLite with a polling executor); for now it is `pub(crate)` and Postgres-typed. The new `.sqlx/query-*.json` files are regenerated cache entries for the queries whose surrounding code indentation changed when they moved into trait-method bodies. Their describe info is identical to the previous entries — only the literal whitespace in the SQL string differs. --- ...368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json | 14 + ...e32c48eef6bddbd5192c1677772cbf51dbe84.json | 14 + ...dfc75722e027524c753738323bea2f343fb56.json | 12 + ...8a50c5731c6b9bb4dfae298197349827113b6.json | 14 + ...9451793d9f5f8b5af7be67c348d952216ac92.json | 15 + ...9597bd10ac826027f2819e6f6f38fa8ad10be.json | 15 + ...01ebdb7cc711e0c41b0f116b518b3b26156c9.json | 22 + ...2c6894b1eb27718b2e01878138d05c9fdb0e7.json | 28 + ...c41e2d3a82eb19f95d8e2a3cf721298c965d2.json | 15 + ...a001a52fae5d290811ea953f2ad022d96dfed.json | 15 + ...4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json | 20 + ...683708fa4a03576b80a9a677732e5387865a2.json | 34 + ...30023315c5ce96b9b77b8d32e143f05d55f51.json | 35 + ...6507b05f9333aedaa31aa3128fcd59124618c.json | 16 + ...cf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json | 14 + ...59f91d8efe0b28c7d014c8320e973ea552dcf.json | 14 + ...dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json | 16 + ...626eb8dc9ebb07cd563b2681b106e6fa74596.json | 15 + crates/durable-runtime/src/lib.rs | 1 + .../src/plugin/durable/notify.rs | 97 +-- crates/durable-runtime/src/storage/mod.rs | 315 +++++++++ crates/durable-runtime/src/storage/pg.rs | 661 ++++++++++++++++++ crates/durable-runtime/src/task.rs | 94 +-- crates/durable-runtime/src/worker.rs | 413 +++-------- 24 files changed, 1473 insertions(+), 436 deletions(-) create mode 100644 .sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json create mode 100644 .sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json create mode 100644 .sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json create mode 100644 .sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json create mode 100644 .sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json create mode 100644 .sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json create mode 100644 .sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json create mode 100644 .sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json create mode 100644 .sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json create mode 100644 .sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json create mode 100644 .sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json create mode 100644 .sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json create mode 100644 .sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json create mode 100644 .sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json create mode 100644 .sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json create mode 100644 .sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json create mode 100644 .sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json create mode 100644 .sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json create mode 100644 crates/durable-runtime/src/storage/mod.rs create mode 100644 crates/durable-runtime/src/storage/pg.rs diff --git a/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json b/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json new file mode 100644 index 0000000..3c7a79b --- /dev/null +++ b/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'complete',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9" +} \ No newline at end of file diff --git a/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json b/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json new file mode 100644 index 0000000..a6b33ae --- /dev/null +++ b/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET running_on = NULL\n WHERE state = 'ready'\n AND running_on = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84" +} \ No newline at end of file diff --git a/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json b/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json new file mode 100644 index 0000000..9ad6465 --- /dev/null +++ b/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready'\n WHERE state = 'suspended'\n AND EXISTS((\n SELECT task_id\n FROM durable.notification\n WHERE task_id = task.id\n AND created_at < NOW() - '10 minutes'::interval\n ))\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56" +} \ No newline at end of file diff --git a/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json b/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json new file mode 100644 index 0000000..561818b --- /dev/null +++ b/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'suspended',\n running_on = NULL\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6" +} \ No newline at end of file diff --git a/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json b/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json new file mode 100644 index 0000000..9d6798f --- /dev/null +++ b/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.task\n WHERE task.ctid = ANY(ARRAY(\n SELECT ctid\n FROM durable.task\n WHERE completed_at < NOW() - $1::interval\n LIMIT $2\n FOR UPDATE\n ))\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Interval", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92" +} \ No newline at end of file diff --git a/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json b/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json new file mode 100644 index 0000000..ca425ff --- /dev/null +++ b/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = $1\n AND running_on = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be" +} \ No newline at end of file diff --git a/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json b/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json new file mode 100644 index 0000000..060d48b --- /dev/null +++ b/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.worker\n SET heartbeat_at = CURRENT_TIMESTAMP\n WHERE id = $1\n RETURNING id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9" +} \ No newline at end of file diff --git a/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json b/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json new file mode 100644 index 0000000..0964212 --- /dev/null +++ b/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n prev AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE id < $1\n ORDER BY id DESC\n LIMIT 1\n ),\n next AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE NOT id = $1\n ORDER BY id DESC\n LIMIT 1\n ),\n combined AS (\n SELECT * FROM prev\n UNION ALL\n SELECT * FROM next\n )\n SELECT\n id as \"id!\",\n heartbeat_at as \"heartbeat_at!\"\n FROM combined\n ORDER BY id ASC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "heartbeat_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7" +} \ No newline at end of file diff --git a/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json b/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json new file mode 100644 index 0000000..9b84ab5 --- /dev/null +++ b/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = ANY($1::bigint[])\n AND running_on = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2" +} \ No newline at end of file diff --git a/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json b/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json new file mode 100644 index 0000000..8c732f3 --- /dev/null +++ b/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND CURRENT_TIMESTAMP - heartbeat_at > $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Interval" + ] + }, + "nullable": [] + }, + "hash": "6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed" +} \ No newline at end of file diff --git a/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json b/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json new file mode 100644 index 0000000..da681d2 --- /dev/null +++ b/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT wakeup_at as \"wakeup_at!\"\n FROM durable.task\n WHERE state = 'suspended'\n AND wakeup_at IS NOT NULL\n ORDER BY wakeup_at ASC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "wakeup_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true + ] + }, + "hash": "6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1" +} \ No newline at end of file diff --git a/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json b/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json new file mode 100644 index 0000000..72320bc --- /dev/null +++ b/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.notification\n WHERE ctid IN (\n SELECT ctid\n FROM durable.notification\n WHERE task_id = $1\n ORDER BY created_at ASC\n LIMIT 1\n FOR UPDATE\n )\n RETURNING\n created_at,\n event,\n data as \"data: Json>\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "event", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "data: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2" +} \ No newline at end of file diff --git a/.sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json b/.sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json new file mode 100644 index 0000000..af6e145 --- /dev/null +++ b/.sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json @@ -0,0 +1,35 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT state as \"state!: TaskState\"\n FROM durable.task\n WHERE task.id = $1\n FOR UPDATE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "state!: TaskState", + "type_info": { + "Custom": { + "name": "durable.task_state", + "kind": { + "Enum": [ + "ready", + "active", + "suspended", + "complete", + "failed" + ] + } + } + } + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51" +} \ No newline at end of file diff --git a/.sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json b/.sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json new file mode 100644 index 0000000..b504bfc --- /dev/null +++ b/.sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Text" + ] + }, + "nullable": [] + }, + "hash": "a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c" +} \ No newline at end of file diff --git a/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json b/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json new file mode 100644 index 0000000..63c12b2 --- /dev/null +++ b/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'failed',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75" +} \ No newline at end of file diff --git a/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json b/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json new file mode 100644 index 0000000..0541d8d --- /dev/null +++ b/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= (NOW() - $1::interval)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [] + }, + "hash": "f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf" +} \ No newline at end of file diff --git a/.sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json b/.sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json new file mode 100644 index 0000000..3b6c3fc --- /dev/null +++ b/.sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)\n ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE\n SET message = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Text" + ] + }, + "nullable": [] + }, + "hash": "f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c" +} \ No newline at end of file diff --git a/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json b/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json new file mode 100644 index 0000000..203980b --- /dev/null +++ b/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.worker\n WHERE CURRENT_TIMESTAMP - heartbeat_at > $2\n AND NOT id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Interval" + ] + }, + "nullable": [] + }, + "hash": "fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596" +} \ No newline at end of file diff --git a/crates/durable-runtime/src/lib.rs b/crates/durable-runtime/src/lib.rs index 43b1148..469298f 100644 --- a/crates/durable-runtime/src/lib.rs +++ b/crates/durable-runtime/src/lib.rs @@ -14,6 +14,7 @@ pub mod migrate; pub mod plugin; mod resource; pub mod scheduler; +pub(crate) mod storage; pub mod task; pub mod util; mod worker; diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index eb488ff..90c1327 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -5,6 +5,7 @@ use tokio::sync::broadcast::error::RecvError; use tokio::time::Instant; use crate::bindings::durable::core::notify::{Event, Host, NotifyError}; +use crate::storage::TaskState; use crate::task::TransactionOptions; use crate::{Task, TaskStatus}; @@ -12,27 +13,17 @@ async fn poll_notification( task: &mut Task, tx: &mut sqlx::PgConnection, ) -> anyhow::Result> { - let data = sqlx::query_as!( - EventData, - r#" - DELETE FROM durable.notification - WHERE ctid IN ( - SELECT ctid - FROM durable.notification - WHERE task_id = $1 - ORDER BY created_at ASC - LIMIT 1 - FOR UPDATE - ) - RETURNING - created_at, - event, - data as "data: Json>" - "#, - task.state.task_id() - ) - .fetch_optional(&mut *tx) - .await?; + let task_id = task.state.task_id(); + let data = task + .state + .storage() + .poll_notification(&mut *tx, task_id) + .await? + .map(|n| EventData { + created_at: n.created_at, + event: n.event, + data: n.data, + }); Ok(data) } @@ -86,16 +77,10 @@ impl Host for Task { // The timer expired, so we need to attempt to suspend. let mut tx = self.state.pool().begin().await?; - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL - WHERE id = $1 - ", - self.task_id() - ) - .execute(&mut *tx) - .await?; + self.state + .storage() + .suspend_task_no_wakeup(&mut *tx, self.task_id()) + .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { // A new notification barged in while we were updating. Roll back the @@ -206,16 +191,10 @@ impl Host for Task { Some(Expired::Suspend) => { let mut tx = self.state.pool().begin().await?; - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL - WHERE id = $1 - ", - self.task_id() - ) - .execute(&mut *tx) - .await?; + self.state + .storage() + .suspend_task_no_wakeup(&mut *tx, self.task_id()) + .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { // A new notification barged in while we were updating. @@ -250,6 +229,7 @@ impl Host for Task { return Ok(result); } + let storage = self.state.shared.storage.clone(); let txn = self.state.transaction_mut().unwrap(); let tx = txn.conn().unwrap(); @@ -261,17 +241,7 @@ impl Host for Task { // Note: We lock the row here so that concurrent notification polls // cannot barge in here. - let state = sqlx::query_scalar!( - r#" - SELECT state as "state!: TaskState" - FROM durable.task - WHERE task.id = $1 - FOR UPDATE - "#, - task - ) - .fetch_optional(&mut **tx) - .await?; + let state = storage.fetch_task_state_locked(&mut **tx, task).await?; match state { Some(TaskState::Complete | TaskState::Failed) => { @@ -281,17 +251,9 @@ impl Host for Task { _ => (), } - let result = sqlx::query_scalar!( - r#" - INSERT INTO durable.notification(task_id, event, data) - VALUES ($1, $2, $3) - "#, - task, - event, - Json(json) as Json<&RawValue> - ) - .execute(&mut **tx) - .await; + let result = storage + .insert_notification(&mut **tx, task, &event, Json(json)) + .await; match result { Ok(_) => Ok(Ok(())), @@ -360,12 +322,3 @@ impl<'de> serde::Deserialize<'de> for NotifyError { } } -#[derive(Copy, Clone, Debug, Eq, PartialEq, sqlx::Type)] -#[sqlx(type_name = "durable.task_state", rename_all = "lowercase")] -enum TaskState { - Ready, - Active, - Suspended, - Complete, - Failed, -} diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs new file mode 100644 index 0000000..f014882 --- /dev/null +++ b/crates/durable-runtime/src/storage/mod.rs @@ -0,0 +1,315 @@ +//! Database storage abstraction for the durable runtime. +//! +//! The [`Storage`] trait collects every query the runtime issues against the +//! durable schema. Each method corresponds to one logical operation and its +//! body is a verbatim move of the SQL that previously lived inline at the +//! call site. Transaction lifecycle (`pool.begin()`, `pool.acquire()`, +//! `commit`/`rollback`) remains at the call sites — methods take a borrowed +//! [`sqlx::PgConnection`], which works for both pooled connections and +//! transactions via sqlx's `Executor` blanket impl. +//! +//! The current implementation, [`PgStorage`], is Postgres-specific and is +//! intended to be the only impl for now. The trait exists so that future +//! work can introduce alternative backends (e.g. SQLite with a polling +//! executor) without further restructuring of the runtime. + +mod pg; + +pub(crate) use self::pg::PgStorage; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::value::RawValue; +use sqlx::postgres::types::PgInterval; +use sqlx::postgres::PgQueryResult; +use sqlx::types::Json; +use sqlx::PgConnection; + +use crate::task::RecordedEvent; +use crate::worker::TaskData; + +/// The state of a row in `durable.task`. +#[derive(Copy, Clone, Debug, Eq, PartialEq, sqlx::Type)] +#[sqlx(type_name = "durable.task_state", rename_all = "lowercase")] +pub(crate) enum TaskState { + Ready, + Active, + Suspended, + Complete, + Failed, +} + +/// A worker row, as returned by the validate-workers sequencing query. +#[derive(Debug)] +pub(crate) struct WorkerRecord { + pub id: i64, + pub heartbeat_at: DateTime, +} + +/// A row from `durable.event`, as returned when looking up a previously +/// recorded transaction by index. +#[derive(Debug)] +pub(crate) struct StoredEvent { + pub label: String, + pub value: Json>, +} + +/// A row from `durable.notification`, as returned by `poll_notification`. +#[derive(Debug)] +pub(crate) struct PolledNotification { + pub created_at: DateTime, + pub event: String, + pub data: Json>, +} + +/// Database operations issued by the durable runtime. +/// +/// All methods accept a borrowed [`PgConnection`] and leave transaction +/// lifecycle to the caller. See module-level docs for the rationale. +#[async_trait] +pub(crate) trait Storage: Send + Sync + 'static { + /// Access the underlying connection pool. + /// + /// Used by callers that need to acquire a connection or open a + /// transaction directly. Currently unused — transaction lifecycle is + /// driven through `SharedState::pool` until phase 2 of the storage + /// abstraction. + #[allow(dead_code)] + fn pool(&self) -> &sqlx::PgPool; + + // --------------------------------------------------------------------- + // Worker lifecycle + // --------------------------------------------------------------------- + + /// Insert a new worker row and return its id. + async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result; + + /// Delete the worker row with the given id. + async fn delete_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result; + + /// Refresh the heartbeat for the given worker. Returns `true` if the row + /// still exists. + async fn heartbeat_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result; + + /// Delete the worker we are following if its heartbeat has expired. + async fn delete_following_expired_worker( + &self, + conn: &mut PgConnection, + following: i64, + timeout: PgInterval, + ) -> sqlx::Result; + + /// Delete every other expired worker, leaving the current one alone. + async fn delete_other_expired_workers( + &self, + conn: &mut PgConnection, + worker_id: i64, + timeout: PgInterval, + ) -> sqlx::Result; + + /// Return the next worker in the validation sequence relative to + /// `worker_id`. See `Worker::validate_workers` for the algorithm. + async fn next_worker_in_sequence( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result>; + + /// Return the id of the current cluster leader (the oldest worker), if + /// any. + async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result>; + + // --------------------------------------------------------------------- + // Task scheduling + // --------------------------------------------------------------------- + + /// Wake any suspended tasks whose `wakeup_at` has elapsed past the + /// configured suspend margin. + async fn wake_suspended_tasks( + &self, + conn: &mut PgConnection, + suspend_margin: PgInterval, + ) -> sqlx::Result; + + /// Return the earliest `wakeup_at` of any currently suspended task. + async fn next_wakeup_at( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result>>; + + /// Delete a batch of completed tasks older than `cleanup_age`. + async fn cleanup_old_tasks( + &self, + conn: &mut PgConnection, + cleanup_age: PgInterval, + limit: i64, + ) -> sqlx::Result; + + /// Force-resume any tasks stuck waiting on notifications older than 10 + /// minutes. + async fn unwedge_stuck_notifications( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result; + + /// Reset a batch of failed tasks back to the `ready` state. + async fn reset_failed_tasks( + &self, + conn: &mut PgConnection, + failed_ids: &[i64], + worker_id: i64, + ) -> sqlx::Result; + + /// Atomically claim up to `allowed` new tasks for `worker_id`. + async fn claim_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + allowed: i64, + ) -> sqlx::Result>; + + /// Release any ready tasks currently assigned to `worker_id`. Used when + /// a worker hits its capacity before committing a claim. + async fn release_owned_ready_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result; + + /// Reset a single task back to `ready` so it can be picked up again, + /// only if it is still owned by the given worker. + async fn reset_task_for_retry( + &self, + conn: &mut PgConnection, + task_id: i64, + worker_id: i64, + ) -> sqlx::Result; + + /// Mark a task as successfully completed. + async fn mark_task_complete( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + /// Mark a task as failed. + async fn mark_task_failed( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + // --------------------------------------------------------------------- + // Task execution + // --------------------------------------------------------------------- + + /// Fetch the wasm bytecode for a stored program. + async fn fetch_wasm_blob( + &self, + conn: &mut PgConnection, + wasm_id: i64, + ) -> sqlx::Result>; + + /// Load up to 1000 recorded events for a task, used for replay. + async fn fetch_recorded_events( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + /// Look up a single recorded event for a task by its index. + async fn fetch_event_at_index( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + ) -> sqlx::Result>; + + /// Insert an event row, optionally insert a log row, and return the + /// task's `running_on` value — all in one round-trip. + async fn commit_event_with_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + label: &str, + value: Json<&RawValue>, + message: Option<&str>, + ) -> sqlx::Result>; + + /// Suspend a task and set an optional wakeup timestamp. + async fn suspend_task( + &self, + conn: &mut PgConnection, + task_id: i64, + wakeup_at: Option>, + ) -> sqlx::Result; + + /// Suspend a task without touching `wakeup_at`. Used by the notification + /// blocking path, which must not clobber an existing wakeup deadline. + async fn suspend_task_no_wakeup( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + // --------------------------------------------------------------------- + // Logs + // --------------------------------------------------------------------- + + /// Append a row to `durable.log`. + async fn insert_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result; + + /// Insert a log row, replacing the message if a row already exists at + /// the same `(task_id, index)`. Used to record the final error message + /// for a task. + async fn upsert_log_error( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result; + + // --------------------------------------------------------------------- + // Notifications + // --------------------------------------------------------------------- + + /// Pop the oldest notification for a task, with row-level locking. + async fn poll_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + /// Look up a task's state, locking the row for the duration of the + /// surrounding transaction. Used to safely insert a notification. + async fn fetch_task_state_locked( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + /// Insert a notification row. + async fn insert_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + event: &str, + data: Json<&RawValue>, + ) -> sqlx::Result; +} diff --git a/crates/durable-runtime/src/storage/pg.rs b/crates/durable-runtime/src/storage/pg.rs new file mode 100644 index 0000000..446f047 --- /dev/null +++ b/crates/durable-runtime/src/storage/pg.rs @@ -0,0 +1,661 @@ +//! Postgres implementation of the [`Storage`](super::Storage) trait. +//! +//! Each method body is a verbatim move of the SQL that previously lived +//! inline at the call site. No semantic changes — see `storage::mod` docs. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::value::RawValue; +use sqlx::postgres::types::PgInterval; +use sqlx::postgres::PgQueryResult; +use sqlx::types::Json; +use sqlx::PgConnection; + +use super::{PolledNotification, Storage, StoredEvent, TaskState, WorkerRecord}; +use crate::task::RecordedEvent; +use crate::worker::TaskData; + +/// Postgres-backed storage. +#[derive(Clone)] +pub(crate) struct PgStorage { + #[allow(dead_code)] + pool: sqlx::PgPool, +} + +impl PgStorage { + pub(crate) fn new(pool: sqlx::PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Storage for PgStorage { + fn pool(&self) -> &sqlx::PgPool { + &self.pool + } + + async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result { + let record = sqlx::query!( + " + INSERT INTO durable.worker(heartbeat_at) + VALUES (CURRENT_TIMESTAMP) + RETURNING id + " + ) + .fetch_one(&mut *conn) + .await?; + + Ok(record.id) + } + + async fn delete_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!("DELETE FROM durable.worker WHERE id = $1", worker_id) + .execute(&mut *conn) + .await + } + + async fn heartbeat_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + let record = sqlx::query!( + "UPDATE durable.worker + SET heartbeat_at = CURRENT_TIMESTAMP + WHERE id = $1 + RETURNING id", + worker_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.is_some()) + } + + async fn delete_following_expired_worker( + &self, + conn: &mut PgConnection, + following: i64, + timeout: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + DELETE FROM durable.worker + WHERE id = $1 + AND CURRENT_TIMESTAMP - heartbeat_at > $2 + ", + following, + timeout + ) + .execute(&mut *conn) + .await + } + + async fn delete_other_expired_workers( + &self, + conn: &mut PgConnection, + worker_id: i64, + timeout: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + DELETE FROM durable.worker + WHERE CURRENT_TIMESTAMP - heartbeat_at > $2 + AND NOT id = $1 + ", + worker_id, + timeout + ) + .execute(&mut *conn) + .await + } + + async fn next_worker_in_sequence( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + WITH + prev AS ( + SELECT id, heartbeat_at + FROM durable.worker + WHERE id < $1 + ORDER BY id DESC + LIMIT 1 + ), + next AS ( + SELECT id, heartbeat_at + FROM durable.worker + WHERE NOT id = $1 + ORDER BY id DESC + LIMIT 1 + ), + combined AS ( + SELECT * FROM prev + UNION ALL + SELECT * FROM next + ) + SELECT + id as "id!", + heartbeat_at as "heartbeat_at!" + FROM combined + ORDER BY id ASC + LIMIT 1 + "#, + worker_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| WorkerRecord { + id: r.id, + heartbeat_at: r.heartbeat_at, + })) + } + + async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result> { + let record = sqlx::query!( + " + SELECT id + FROM durable.worker + ORDER BY started_at ASC, id ASC + LIMIT 1 + " + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| r.id)) + } + + async fn wake_suspended_tasks( + &self, + conn: &mut PgConnection, + suspend_margin: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + wakeup_at = NULL, + running_on = ( + SELECT id + FROM durable.worker + ORDER BY random() + task.id + LIMIT 1 + ) + WHERE state = 'suspended' + AND wakeup_at <= (NOW() - $1::interval) + ", + suspend_margin + ) + .execute(&mut *conn) + .await + } + + async fn next_wakeup_at( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result>> { + let record = sqlx::query!( + r#" + SELECT wakeup_at as "wakeup_at!" + FROM durable.task + WHERE state = 'suspended' + AND wakeup_at IS NOT NULL + ORDER BY wakeup_at ASC + LIMIT 1 + "# + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| r.wakeup_at)) + } + + async fn cleanup_old_tasks( + &self, + conn: &mut PgConnection, + cleanup_age: PgInterval, + limit: i64, + ) -> sqlx::Result { + sqlx::query!( + r#" + DELETE FROM durable.task + WHERE task.ctid = ANY(ARRAY( + SELECT ctid + FROM durable.task + WHERE completed_at < NOW() - $1::interval + LIMIT $2 + FOR UPDATE + )) + "#, + cleanup_age, + limit + ) + .execute(&mut *conn) + .await + } + + async fn unwedge_stuck_notifications( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result { + sqlx::query!( + r#" + UPDATE durable.task + SET state = 'ready' + WHERE state = 'suspended' + AND EXISTS(( + SELECT task_id + FROM durable.notification + WHERE task_id = task.id + AND created_at < NOW() - '10 minutes'::interval + )) + "# + ) + .execute(&mut *conn) + .await + } + + async fn reset_failed_tasks( + &self, + conn: &mut PgConnection, + failed_ids: &[i64], + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + running_on = NULL + WHERE id = ANY($1::bigint[]) + AND running_on = $2 + ", + failed_ids, + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn claim_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + allowed: i64, + ) -> sqlx::Result> { + sqlx::query_as!( + TaskData, + r#" + WITH selected AS ( + SELECT id + FROM durable.task + WHERE (state IN ('ready', 'active') AND running_on IS NULL) + OR (state = 'ready' AND running_on = $1) + ORDER BY id ASC + FOR NO KEY UPDATE SKIP LOCKED + LIMIT $2 + ) + UPDATE durable.task + SET running_on = $1, + state = 'active' + FROM selected + WHERE selected.id = task.id + RETURNING + task.id as id, + task.name as name, + task.created_at as created_at, + task.wasm as "wasm!", + task.data as "data!: Json>" + "#, + worker_id, + allowed + ) + .fetch_all(&mut *conn) + .await + } + + async fn release_owned_ready_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET running_on = NULL + WHERE state = 'ready' + AND running_on = $1 + ", + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn reset_task_for_retry( + &self, + conn: &mut PgConnection, + task_id: i64, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + running_on = NULL + WHERE id = $1 + AND running_on = $2 + ", + task_id, + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn mark_task_complete( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'complete', + completed_at = CURRENT_TIMESTAMP, + running_on = NULL, + wasm = NULL + WHERE id = $1 + ", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn mark_task_failed( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'failed', + completed_at = CURRENT_TIMESTAMP, + running_on = NULL, + wasm = NULL + WHERE id = $1", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn fetch_wasm_blob( + &self, + conn: &mut PgConnection, + wasm_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!("SELECT wasm FROM durable.wasm WHERE id = $1", wasm_id) + .fetch_one(&mut *conn) + .await?; + + Ok(record.wasm) + } + + async fn fetch_recorded_events( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + sqlx::query_as!( + RecordedEvent, + r#" + SELECT + index, + label, + value as "value!: Json>" + FROM durable.event + WHERE task_id = $1 + ORDER BY index ASC + LIMIT 1000 + "#, + task_id + ) + .fetch_all(&mut *conn) + .await + } + + async fn fetch_event_at_index( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + SELECT + label, + value as "value: Json>" + FROM durable.event + WHERE task_id = $1 + AND index = $2 + "#, + task_id, + index + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| StoredEvent { + label: r.label, + value: r.value, + })) + } + + async fn commit_event_with_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + label: &str, + value: Json<&RawValue>, + message: Option<&str>, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + WITH + current_task AS ( + SELECT id, running_on + FROM durable.task + WHERE id = $1 + LIMIT 1 + ), + insert_event AS ( + INSERT INTO durable.event(task_id, index, label, value) + SELECT + id as task_id, + $2 as index, + $3 as label, + $4 as value + FROM current_task + RETURNING task_id + ), + insert_log AS ( + INSERT INTO durable.log(task_id, index, message) + SELECT task_id, index, message + FROM (VALUES ($1, $2, $5)) as t(task_id, index, message) + JOIN current_task task ON task.id = task_id + WHERE message IS NOT NULL + RETURNING task_id + ) + SELECT running_on + FROM current_task + LEFT JOIN insert_event event ON event.task_id = id + LEFT JOIN insert_event log ON log.task_id = id + "#, + task_id, + index, + label, + value as Json<&RawValue>, + message + ) + .fetch_one(&mut *conn) + .await?; + + Ok(record.running_on) + } + + async fn suspend_task( + &self, + conn: &mut PgConnection, + task_id: i64, + wakeup_at: Option>, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'suspended', + running_on = NULL, + wakeup_at = $2 + WHERE id = $1", + task_id, + wakeup_at + ) + .execute(&mut *conn) + .await + } + + async fn suspend_task_no_wakeup( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'suspended', + running_on = NULL + WHERE id = $1 + ", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn insert_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result { + sqlx::query!( + "INSERT INTO durable.log(task_id, index, message) + VALUES ($1, $2, $3)", + task_id, + index, + message + ) + .execute(&mut *conn) + .await + } + + async fn upsert_log_error( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result { + sqlx::query!( + "INSERT INTO durable.log(task_id, index, message) + VALUES ($1, $2, $3) + ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE + SET message = $3 + ", + task_id, + index, + message + ) + .execute(&mut *conn) + .await + } + + async fn poll_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + DELETE FROM durable.notification + WHERE ctid IN ( + SELECT ctid + FROM durable.notification + WHERE task_id = $1 + ORDER BY created_at ASC + LIMIT 1 + FOR UPDATE + ) + RETURNING + created_at, + event, + data as "data: Json>" + "#, + task_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| PolledNotification { + created_at: r.created_at, + event: r.event, + data: r.data, + })) + } + + async fn fetch_task_state_locked( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + sqlx::query_scalar!( + r#" + SELECT state as "state!: TaskState" + FROM durable.task + WHERE task.id = $1 + FOR UPDATE + "#, + task_id + ) + .fetch_optional(&mut *conn) + .await + } + + async fn insert_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + event: &str, + data: Json<&RawValue>, + ) -> sqlx::Result { + sqlx::query!( + r#" + INSERT INTO durable.notification(task_id, event, data) + VALUES ($1, $2, $3) + "#, + task_id, + event, + data as Json<&RawValue> + ) + .execute(&mut *conn) + .await + } +} diff --git a/crates/durable-runtime/src/task.rs b/crates/durable-runtime/src/task.rs index 65752ec..77c793d 100644 --- a/crates/durable-runtime/src/task.rs +++ b/crates/durable-runtime/src/task.rs @@ -202,7 +202,7 @@ pub struct Task { } pub struct TaskState { - shared: Arc, + pub(crate) shared: Arc, worker_id: i64, task: TaskData, @@ -275,6 +275,10 @@ impl TaskState { &self.shared.pool } + pub(crate) fn storage(&self) -> &dyn crate::storage::Storage { + &*self.shared.storage + } + /// Access the reqwest client for the worker. pub fn client(&self) -> &reqwest::Client { &self.shared.client @@ -444,20 +448,11 @@ impl TaskState { ); } - let record = sqlx::query!( - r#" - SELECT - label, - value as "value: Json>" - FROM durable.event - WHERE task_id = $1 - AND index = $2 - "#, - self.task_id(), - self.txn_index - ) - .fetch_optional(&mut *conn) - .await?; + let record = self + .shared + .storage + .fetch_event_at_index(&mut *conn, self.task_id(), self.txn_index) + .await?; if let Some(record) = record { if record.label != options.label { @@ -600,47 +595,19 @@ impl TaskState { // - We avoid multiple roundtrips to the database. // - Since all the statements are conditional on running_on = worker_id, we can // run this outside of a transaction with no issues. - let running_on = sqlx::query!( - r#" - WITH - current_task AS ( - SELECT id, running_on - FROM durable.task - WHERE id = $1 - LIMIT 1 - ), - insert_event AS ( - INSERT INTO durable.event(task_id, index, label, value) - SELECT - id as task_id, - $2 as index, - $3 as label, - $4 as value - FROM current_task - RETURNING task_id - ), - insert_log AS ( - INSERT INTO durable.log(task_id, index, message) - SELECT task_id, index, message - FROM (VALUES ($1, $2, $5)) as t(task_id, index, message) - JOIN current_task task ON task.id = task_id - WHERE message IS NOT NULL - RETURNING task_id - ) - SELECT running_on - FROM current_task - LEFT JOIN insert_event event ON event.task_id = id - LEFT JOIN insert_event log ON log.task_id = id - "#, - self.task_id(), - self.txn_index, - &*txn.label, - Json(data) as Json<&T>, - message - ) - .fetch_one(&mut *conn) - .await? - .running_on; + let value = serde_json::value::to_raw_value(data)?; + let running_on = self + .shared + .storage + .commit_event_with_log( + &mut *conn, + self.task_id(), + self.txn_index, + &txn.label, + Json(&*value), + message.as_deref(), + ) + .await?; tracing::trace!( target: "durable_runtime::task::transaction", @@ -673,17 +640,10 @@ impl TaskState { conn: &mut PgConnection, timeout: Option>, ) -> anyhow::Result { - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL, - wakeup_at = $2 - WHERE id = $1", - self.task_id(), - timeout - ) - .execute(&mut *conn) - .await?; + self.shared + .storage + .suspend_task(&mut *conn, self.task_id(), timeout) + .await?; Ok(TaskStatus::Suspend) } diff --git a/crates/durable-runtime/src/worker.rs b/crates/durable-runtime/src/worker.rs index d301e62..5f9bfe0 100644 --- a/crates/durable-runtime/src/worker.rs +++ b/crates/durable-runtime/src/worker.rs @@ -24,7 +24,7 @@ use crate::event::{self, Event, EventSource, Notification}; use crate::flag::{ShutdownFlag, ShutdownGuard}; use crate::plugin::{DurablePlugin, Plugin}; use crate::scheduler::{Component as SchedulerComponent, ScheduleEvent}; -use crate::task::{RecordedEvent, Task, TaskState}; +use crate::task::{Task, TaskState}; use crate::util::{IntoPgInterval, Mailbox, MetricSpan}; use crate::Config; @@ -34,6 +34,7 @@ const LOG_PANIC_INDEX: i32 = i32::MAX; pub(crate) struct SharedState { pub shutdown: ShutdownFlag, pub pool: sqlx::PgPool, + pub(crate) storage: Arc, pub client: reqwest::Client, pub notifications: broadcast::Sender, pub config: Config, @@ -255,6 +256,7 @@ impl WorkerBuilder { suspend: Notify::new(), cache: Mutex::new(uluru::LRUCache::new()), compile_sema: Semaphore::new(self.config.max_concurrent_compilations), + storage: Arc::new(crate::storage::PgStorage::new(self.pool.clone())), pool: self.pool, config: self.config, plugins: self.plugins, @@ -339,16 +341,9 @@ impl Worker { } pub async fn run(&mut self) -> anyhow::Result<()> { - self.worker_id = sqlx::query!( - " - INSERT INTO durable.worker(heartbeat_at) - VALUES (CURRENT_TIMESTAMP) - RETURNING id - " - ) - .fetch_one(&self.shared.pool) - .await? - .id; + let mut conn = self.shared.pool.acquire().await?; + self.worker_id = self.shared.storage.insert_worker(&mut conn).await?; + drop(conn); tracing::info!("durable worker id is {}", self.worker_id); @@ -390,10 +385,15 @@ impl Worker { span.in_scope(|| { tracing::info!("deleting worker database entry"); }); - let result = sqlx::query!("DELETE FROM durable.worker WHERE id = $1", self.worker_id) - .execute(&self.shared.pool) - .await - .context("failed to delete the worker entry from the database"); + let result = async { + let mut conn = self.shared.pool.acquire().await?; + self.shared + .storage + .delete_worker(&mut conn, self.worker_id) + .await + } + .await + .context("failed to delete the worker entry from the database"); self.shared.scheduler.notify(ScheduleEvent::WorkerDeleted { worker_id: self.worker_id, @@ -431,21 +431,15 @@ impl Worker { .acquire(SchedulerComponent::Heartbeat { worker_id }) .await; - let record = sqlx::query!( - "UPDATE durable.worker - SET heartbeat_at = CURRENT_TIMESTAMP - WHERE id = $1 - RETURNING id", - worker_id - ) - .fetch_optional(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + let alive = shared.storage.heartbeat_worker(&mut conn, worker_id).await?; + drop(conn); // Our record is gone from the database. This means that some other worker // determined that we were inactive. // // We should shutdown and then (optionally) restart with a new worker id. - if record.is_none() { + if !alive { shared.shutdown.raise(); anyhow::bail!("worker entry was deleted from the database"); } @@ -499,34 +493,20 @@ impl Worker { let timeout = shared.config.heartbeat_timeout.into_pg_interval(); let mut result = if let Some(following) = following.take() { - sqlx::query!( - " - DELETE FROM durable.worker - WHERE id = $1 - AND CURRENT_TIMESTAMP - heartbeat_at > $2 - ", - following, - timeout - ) - .execute(&mut *tx) - .await? + shared + .storage + .delete_following_expired_worker(&mut tx, following, timeout) + .await? } else { Default::default() }; if result.rows_affected() > 0 { result.extend(std::iter::once( - sqlx::query!( - " - DELETE FROM durable.worker - WHERE CURRENT_TIMESTAMP - heartbeat_at > $2 - AND NOT id = $1 - ", - worker_id, - timeout - ) - .execute(&mut *tx) - .await?, + shared + .storage + .delete_other_expired_workers(&mut tx, worker_id, timeout) + .await?, )); tracing::debug!( @@ -537,39 +517,10 @@ impl Worker { } // Select either the next worker in sequence, or the newest id in the sequence. - let record = sqlx::query!( - r#" - WITH - prev AS ( - SELECT id, heartbeat_at - FROM durable.worker - WHERE id < $1 - ORDER BY id DESC - LIMIT 1 - ), - next AS ( - SELECT id, heartbeat_at - FROM durable.worker - WHERE NOT id = $1 - ORDER BY id DESC - LIMIT 1 - ), - combined AS ( - SELECT * FROM prev - UNION ALL - SELECT * FROM next - ) - SELECT - id as "id!", - heartbeat_at as "heartbeat_at!" - FROM combined - ORDER BY id ASC - LIMIT 1 - "#, - worker_id - ) - .fetch_optional(&mut *tx) - .await?; + let record = shared + .storage + .next_worker_in_sequence(&mut tx, worker_id) + .await?; tx.commit().await?; @@ -641,24 +592,13 @@ impl Worker { // postgresql is forced to evaluate it for each row. // // If we don't do that all the rows here get the same random number. - let result = sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - wakeup_at = NULL, - running_on = ( - SELECT id - FROM durable.worker - ORDER BY random() + task.id - LIMIT 1 - ) - WHERE state = 'suspended' - AND wakeup_at <= (NOW() - $1::interval) - ", - shared.config.suspend_margin.into_pg_interval() - ) - .execute(&mut *conn) - .await?; + let result = shared + .storage + .wake_suspended_tasks( + &mut conn, + shared.config.suspend_margin.into_pg_interval(), + ) + .await?; let count = result.rows_affected(); if count > 0 { @@ -666,19 +606,7 @@ impl Worker { shared.scheduler.notify(ScheduleEvent::TasksWoken { count }); } - let wakeup_at = sqlx::query!( - r#" - SELECT wakeup_at as "wakeup_at!" - FROM durable.task - WHERE state = 'suspended' - AND wakeup_at IS NOT NULL - ORDER BY wakeup_at ASC - LIMIT 1 - "# - ) - .fetch_optional(&mut *conn) - .await? - .map(|record| record.wakeup_at); + let wakeup_at = shared.storage.next_wakeup_at(&mut conn).await?; let now = shared.clock.now(); let delay = match wakeup_at { @@ -743,22 +671,10 @@ impl Worker { // We do cleanup loop { - let result = sqlx::query!( - r#" - DELETE FROM durable.task - WHERE task.ctid = ANY(ARRAY( - SELECT ctid - FROM durable.task - WHERE completed_at < NOW() - $1::interval - LIMIT $2 - FOR UPDATE - )) - "#, - interval, - limit - ) - .execute(&mut *conn) - .await; + let result = shared + .storage + .cleanup_old_tasks(&mut conn, interval, limit) + .await; match result { Ok(result) if result.rows_affected() < limit as u64 => break, @@ -810,21 +726,7 @@ impl Worker { } }; - let result = sqlx::query!( - r#" - UPDATE durable.task - SET state = 'ready' - WHERE state = 'suspended' - AND EXISTS(( - SELECT task_id - FROM durable.notification - WHERE task_id = task.id - AND created_at < NOW() - '10 minutes'::interval - )) - "# - ) - .execute(&mut *conn) - .await; + let result = shared.storage.unwedge_stuck_notifications(&mut conn).await; match result { Ok(res) if res.rows_affected() != 0 => { @@ -893,19 +795,11 @@ impl Worker { } } - sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - running_on = NULL - WHERE id = ANY($1::bigint[]) - AND running_on = $2 - ", - &failed, - self.worker_id - ) - .execute(&self.shared.pool) - .await?; + let mut conn = self.shared.pool.acquire().await?; + self.shared + .storage + .reset_failed_tasks(&mut conn, &failed, self.worker_id) + .await?; continue; } @@ -949,19 +843,12 @@ impl Worker { } async fn load_leader_id(&mut self) -> anyhow::Result<()> { - let record = sqlx::query!( - " - SELECT id - FROM durable.worker - ORDER BY started_at ASC, id ASC - LIMIT 1 - " - ) - .fetch_optional(&self.shared.pool) - .await?; + let mut conn = self.shared.pool.acquire().await?; + let record = self.shared.storage.load_leader_id(&mut conn).await?; + drop(conn); let new_leader = match record { - Some(record) => record.id, + Some(id) => id, None => -1, }; @@ -991,48 +878,17 @@ impl Worker { .await; let mut tx = self.shared.pool.begin().await?; - let tasks = sqlx::query_as!( - TaskData, - r#" - WITH selected AS ( - SELECT id - FROM durable.task - WHERE (state IN ('ready', 'active') AND running_on IS NULL) - OR (state = 'ready' AND running_on = $1) - ORDER BY id ASC - FOR NO KEY UPDATE SKIP LOCKED - LIMIT $2 - ) - UPDATE durable.task - SET running_on = $1, - state = 'active' - FROM selected - WHERE selected.id = task.id - RETURNING - task.id as id, - task.name as name, - task.created_at as created_at, - task.wasm as "wasm!", - task.data as "data!: Json>" - "#, - self.worker_id, - allowed as i64 - ) - .fetch_all(&mut *tx) - .await?; + let tasks = self + .shared + .storage + .claim_tasks(&mut tx, self.worker_id, allowed as i64) + .await?; if tasks.len() + self.tasks.len() >= max_tasks { - sqlx::query!( - " - UPDATE durable.task - SET running_on = NULL - WHERE state = 'ready' - AND running_on = $1 - ", - self.worker_id - ) - .execute(&mut *tx) - .await?; + self.shared + .storage + .release_owned_ready_tasks(&mut tx, self.worker_id) + .await?; self.blocked = true; } @@ -1128,19 +984,11 @@ impl Worker { // // If this fails then the task failure gets reported to the main event // loop which can ensure it gets retried. - sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - running_on = NULL - WHERE id = $1 - AND running_on = $2 - ", - task_id, - worker_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared + .storage + .reset_task_for_retry(&mut conn, task_id, worker_id) + .await?; break TaskStatus::Suspend; } @@ -1153,14 +1001,13 @@ impl Worker { let message = format!("{error:?}\n"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - LOG_ERROR_INDEX, - message - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log(&mut conn, task_id, LOG_ERROR_INDEX, &message) + .await + } .await; if let Err(e) = result { @@ -1180,14 +1027,18 @@ impl Worker { tracing::error!("task {task_id} panicked: {message}"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - LOG_PANIC_INDEX, - format!("task panicked: {message}\n") - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log( + &mut conn, + task_id, + LOG_PANIC_INDEX, + &format!("task panicked: {message}\n"), + ) + .await + } .await; if let Err(e) = result { @@ -1216,18 +1067,8 @@ impl Worker { }); } TaskStatus::ExitSuccess => { - sqlx::query!( - "UPDATE durable.task - SET state = 'complete', - completed_at = CURRENT_TIMESTAMP, - running_on = NULL, - wasm = NULL - WHERE id = $1 - ", - task_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared.storage.mark_task_complete(&mut conn, task_id).await?; shared.metrics.task_complete.increment(1); shared.scheduler.notify(ScheduleEvent::TaskCompleted { @@ -1236,17 +1077,8 @@ impl Worker { }); } TaskStatus::ExitFailure => { - sqlx::query!( - "UPDATE durable.task - SET state = 'failed', - completed_at = CURRENT_TIMESTAMP, - running_on = NULL, - wasm = NULL - WHERE id = $1", - task_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared.storage.mark_task_failed(&mut conn, task_id).await?; shared.metrics.task_failed.increment(1); shared.scheduler.notify(ScheduleEvent::TaskCompleted { @@ -1296,15 +1128,17 @@ impl Worker { // once. Compiling one is an expensive operation, so if let component = component .get_or_compute(|| async { - let record = sqlx::query!("SELECT wasm FROM durable.wasm WHERE id = $1", task.wasm) - .fetch_one(&shared.pool) + let mut conn = shared.pool.acquire().await.map_err(anyhow::Error::from)?; + let wasm = shared + .storage + .fetch_wasm_blob(&mut conn, task.wasm) .await .map_err(anyhow::Error::from)?; + drop(conn); // If an error occurs then we just allow ourselves to proceed anyway. let _permit = shared.compile_sema.acquire().await; - let wasm = record.wasm; let start = Instant::now(); let component = tokio::task::spawn_blocking({ let engine = engine.clone(); @@ -1328,21 +1162,10 @@ impl Worker { }) .await?; - let events = sqlx::query_as!( - RecordedEvent, - r#" - SELECT - index, - label, - value as "value!: Json>" - FROM durable.event - WHERE task_id = $1 - ORDER BY index ASC - LIMIT 1000 - "#, - task.id - ) - .fetch_all(&shared.pool) + let events = async { + let mut conn = shared.pool.acquire().await?; + shared.storage.fetch_recorded_events(&mut conn, task.id).await + } .await .unwrap_or_default(); @@ -1411,16 +1234,16 @@ impl Worker { logs.trim_end() ); - if let Err(e) = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - index, - logs - ) - .execute(&shared.pool) - .await - { + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log(&mut conn, task_id, index, &logs) + .await + } + .await; + + if let Err(e) = result { tracing::error!("failed to save remaining logs to the database: {e}"); } } @@ -1431,17 +1254,13 @@ impl Worker { tracing::warn!("task failed to execute with an error: {message}"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3) - ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE - SET message = $3 - ", - task_id, - LOG_ERROR_INDEX, - message - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .upsert_log_error(&mut conn, task_id, LOG_ERROR_INDEX, &message) + .await + } .await; if let Err(e) = result { From cbcab1a960614e27ab6ba8fbf0b267cb061a6797 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 06:19:21 +0000 Subject: [PATCH 2/6] Apply rustfmt --- .../src/plugin/durable/notify.rs | 1 - crates/durable-runtime/src/storage/mod.rs | 22 +++++-------------- crates/durable-runtime/src/storage/pg.rs | 5 +---- crates/durable-runtime/src/worker.rs | 20 +++++++++++------ 4 files changed, 20 insertions(+), 28 deletions(-) diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index 90c1327..556a1b2 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -321,4 +321,3 @@ impl<'de> serde::Deserialize<'de> for NotifyError { RemoteNotifyError::deserialize(de) } } - diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs index f014882..a8b1b1b 100644 --- a/crates/durable-runtime/src/storage/mod.rs +++ b/crates/durable-runtime/src/storage/mod.rs @@ -15,8 +15,6 @@ mod pg; -pub(crate) use self::pg::PgStorage; - use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde_json::value::RawValue; @@ -25,6 +23,7 @@ use sqlx::postgres::PgQueryResult; use sqlx::types::Json; use sqlx::PgConnection; +pub(crate) use self::pg::PgStorage; use crate::task::RecordedEvent; use crate::worker::TaskData; @@ -93,11 +92,8 @@ pub(crate) trait Storage: Send + Sync + 'static { /// Refresh the heartbeat for the given worker. Returns `true` if the row /// still exists. - async fn heartbeat_worker( - &self, - conn: &mut PgConnection, - worker_id: i64, - ) -> sqlx::Result; + async fn heartbeat_worker(&self, conn: &mut PgConnection, worker_id: i64) + -> sqlx::Result; /// Delete the worker we are following if its heartbeat has expired. async fn delete_following_expired_worker( @@ -140,10 +136,7 @@ pub(crate) trait Storage: Send + Sync + 'static { ) -> sqlx::Result; /// Return the earliest `wakeup_at` of any currently suspended task. - async fn next_wakeup_at( - &self, - conn: &mut PgConnection, - ) -> sqlx::Result>>; + async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>>; /// Delete a batch of completed tasks older than `cleanup_age`. async fn cleanup_old_tasks( @@ -212,11 +205,8 @@ pub(crate) trait Storage: Send + Sync + 'static { // --------------------------------------------------------------------- /// Fetch the wasm bytecode for a stored program. - async fn fetch_wasm_blob( - &self, - conn: &mut PgConnection, - wasm_id: i64, - ) -> sqlx::Result>; + async fn fetch_wasm_blob(&self, conn: &mut PgConnection, wasm_id: i64) + -> sqlx::Result>; /// Load up to 1000 recorded events for a task, used for replay. async fn fetch_recorded_events( diff --git a/crates/durable-runtime/src/storage/pg.rs b/crates/durable-runtime/src/storage/pg.rs index 446f047..4f6c240 100644 --- a/crates/durable-runtime/src/storage/pg.rs +++ b/crates/durable-runtime/src/storage/pg.rs @@ -199,10 +199,7 @@ impl Storage for PgStorage { .await } - async fn next_wakeup_at( - &self, - conn: &mut PgConnection, - ) -> sqlx::Result>> { + async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>> { let record = sqlx::query!( r#" SELECT wakeup_at as "wakeup_at!" diff --git a/crates/durable-runtime/src/worker.rs b/crates/durable-runtime/src/worker.rs index 5f9bfe0..fe80422 100644 --- a/crates/durable-runtime/src/worker.rs +++ b/crates/durable-runtime/src/worker.rs @@ -432,7 +432,10 @@ impl Worker { .await; let mut conn = shared.pool.acquire().await?; - let alive = shared.storage.heartbeat_worker(&mut conn, worker_id).await?; + let alive = shared + .storage + .heartbeat_worker(&mut conn, worker_id) + .await?; drop(conn); // Our record is gone from the database. This means that some other worker @@ -594,10 +597,7 @@ impl Worker { // If we don't do that all the rows here get the same random number. let result = shared .storage - .wake_suspended_tasks( - &mut conn, - shared.config.suspend_margin.into_pg_interval(), - ) + .wake_suspended_tasks(&mut conn, shared.config.suspend_margin.into_pg_interval()) .await?; let count = result.rows_affected(); @@ -1068,7 +1068,10 @@ impl Worker { } TaskStatus::ExitSuccess => { let mut conn = shared.pool.acquire().await?; - shared.storage.mark_task_complete(&mut conn, task_id).await?; + shared + .storage + .mark_task_complete(&mut conn, task_id) + .await?; shared.metrics.task_complete.increment(1); shared.scheduler.notify(ScheduleEvent::TaskCompleted { @@ -1164,7 +1167,10 @@ impl Worker { let events = async { let mut conn = shared.pool.acquire().await?; - shared.storage.fetch_recorded_events(&mut conn, task.id).await + shared + .storage + .fetch_recorded_events(&mut conn, task.id) + .await } .await .unwrap_or_default(); From 53ee1292d01e14381356ad9110f624d6fcef5af3 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 06:30:12 +0000 Subject: [PATCH 3/6] Address clippy lints Use auto-deref for &mut Transaction at four call sites in plugin/durable/notify.rs (clippy::explicit_auto_deref) and replace a match with Option::unwrap_or in worker.rs::load_leader_id (clippy::manual_unwrap_or). --- crates/durable-runtime/src/plugin/durable/notify.rs | 8 ++++---- crates/durable-runtime/src/worker.rs | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index 556a1b2..8125f54 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -79,7 +79,7 @@ impl Host for Task { self.state .storage() - .suspend_task_no_wakeup(&mut *tx, self.task_id()) + .suspend_task_no_wakeup(&mut tx, self.task_id()) .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { @@ -193,7 +193,7 @@ impl Host for Task { self.state .storage() - .suspend_task_no_wakeup(&mut *tx, self.task_id()) + .suspend_task_no_wakeup(&mut tx, self.task_id()) .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { @@ -241,7 +241,7 @@ impl Host for Task { // Note: We lock the row here so that concurrent notification polls // cannot barge in here. - let state = storage.fetch_task_state_locked(&mut **tx, task).await?; + let state = storage.fetch_task_state_locked(&mut *tx, task).await?; match state { Some(TaskState::Complete | TaskState::Failed) => { @@ -252,7 +252,7 @@ impl Host for Task { } let result = storage - .insert_notification(&mut **tx, task, &event, Json(json)) + .insert_notification(&mut *tx, task, &event, Json(json)) .await; match result { diff --git a/crates/durable-runtime/src/worker.rs b/crates/durable-runtime/src/worker.rs index fe80422..cab91d8 100644 --- a/crates/durable-runtime/src/worker.rs +++ b/crates/durable-runtime/src/worker.rs @@ -847,10 +847,7 @@ impl Worker { let record = self.shared.storage.load_leader_id(&mut conn).await?; drop(conn); - let new_leader = match record { - Some(id) => id, - None => -1, - }; + let new_leader = record.unwrap_or(-1); self.shared.leader.store(new_leader); self.shared From 59a599f0499550fd4327bb53766afb253df0264a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 16:55:40 +0000 Subject: [PATCH 4/6] Clean up sqlx offline cache * Add the missing cache entry for `SELECT state = 'suspended' as "state!"`, used at 5 call sites in `durable-test/tests/it/{notify,dst_notify}.rs`. Without it, those tests failed to compile under `SQLX_OFFLINE=true`. * Delete 21 unused cache entries left over from the storage refactor. Most were entries for the old SQL strings whose surrounding indentation changed when the queries moved into trait-method bodies in `crates/durable-runtime/src/storage/pg.rs`; the rest were pre-existing orphans. After this commit, every entry under `.sqlx/` matches a query somewhere in the workspace and every macro-checked query has a matching entry. --- ...7c90c9c71d40e32432cf173a7fd6fb77c5ce8.json | 14 -------- ...769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json | 14 -------- ...7fe4d597f323487706d6b6b0b52d2ff214339.json | 15 -------- ...0dfb15a51401a24b06c1359ab4899e7d968ce.json | 20 ----------- ...e2f4b62d2ad6c37da62a6a00580100b06e2a1.json | 14 -------- ...8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json | 12 ------- ...99b5b99c6986016d7e0267e3787201a987c4a.json | 15 -------- ...0a48574aa64161b0ccc37be8bb58deaff5783.json | 15 -------- ...64e31d82a666249c8cb1bbca40760bd7b2724.json | 16 --------- ...4abea66609b34002ecc94dd719504429e1ebc.json | 16 --------- ...d99b482e3540944f313d1652af9953c8a3e41.json | 16 --------- ...ccde7f7b473d163b428b5ba542b04a0d61796.json | 16 --------- ...399a11915c903a691f57f5bf806a1ac72fc3d.json | 22 ------------ ...dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json | 28 --------------- ...bcc2f6b9c247e52e92e67cca74772d69b2de0.json | 34 ------------------ ...38178092c2ba065b7c7f367ce1cd2a90d2dce.json | 15 -------- ...d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json | 15 -------- ...a10235ff73813bda043108ec1d3a87793b3d7.json | 16 --------- ...16d53ef5ec74e1e797250141dc2eaef7f493a.json | 22 ++++++++++++ ...60a3a997e7c7c05ce7adffa01d627e4f75422.json | 35 ------------------- ...aec69069e7abf7188f9b2dcb048fa02fe5651.json | 14 -------- ...5a392857ece1061614b3cfb47c7c55aab7c7e.json | 14 -------- 22 files changed, 22 insertions(+), 376 deletions(-) delete mode 100644 .sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json delete mode 100644 .sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json delete mode 100644 .sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json delete mode 100644 .sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json delete mode 100644 .sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json delete mode 100644 .sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json delete mode 100644 .sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json delete mode 100644 .sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json delete mode 100644 .sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json delete mode 100644 .sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json delete mode 100644 .sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json delete mode 100644 .sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json delete mode 100644 .sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json delete mode 100644 .sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json delete mode 100644 .sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json delete mode 100644 .sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json delete mode 100644 .sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json delete mode 100644 .sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json create mode 100644 .sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json delete mode 100644 .sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json delete mode 100644 .sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json delete mode 100644 .sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json diff --git a/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json b/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json deleted file mode 100644 index 0a8eb3b..0000000 --- a/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET running_on = NULL\n WHERE state = 'ready'\n AND running_on = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8" -} diff --git a/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json b/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json deleted file mode 100644 index 7d88cba..0000000 --- a/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'suspended',\n running_on = NULL\n WHERE id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0" -} diff --git a/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json b/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json deleted file mode 100644 index c2943b1..0000000 --- a/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE CURRENT_TIMESTAMP - heartbeat_at > $2\n AND NOT id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Interval" - ] - }, - "nullable": [] - }, - "hash": "5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339" -} diff --git a/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json b/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json deleted file mode 100644 index c8f43a4..0000000 --- a/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT wakeup_at as \"wakeup_at!\"\n FROM durable.task\n WHERE state = 'suspended'\n AND wakeup_at IS NOT NULL\n ORDER BY wakeup_at ASC\n LIMIT 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "wakeup_at!", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - true - ] - }, - "hash": "67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce" -} diff --git a/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json b/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json deleted file mode 100644 index 789e5b4..0000000 --- a/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= (NOW() - $1::interval)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [] - }, - "hash": "775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1" -} diff --git a/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json b/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json deleted file mode 100644 index e5871ce..0000000 --- a/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready'\n WHERE state = 'suspended'\n AND EXISTS((\n SELECT task_id\n FROM durable.notification\n WHERE task_id = task.id\n AND created_at < NOW() - '10 minutes'::interval\n ))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247" -} diff --git a/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json b/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json deleted file mode 100644 index 8e77d49..0000000 --- a/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND CURRENT_TIMESTAMP - heartbeat_at > $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Interval" - ] - }, - "nullable": [] - }, - "hash": "7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a" -} diff --git a/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json b/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json deleted file mode 100644 index 1ad93a8..0000000 --- a/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = $1\n AND running_on = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783" -} diff --git a/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json b/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json deleted file mode 100644 index 0510ce4..0000000 --- a/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO durable.notification(task_id, event, data)\n VALUES ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724" -} diff --git a/.sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json b/.sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json deleted file mode 100644 index ab1c443..0000000 --- a/.sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc" -} diff --git a/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json b/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json deleted file mode 100644 index 4b9e021..0000000 --- a/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)\n ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE\n SET message = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41" -} diff --git a/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json b/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json deleted file mode 100644 index abec5c1..0000000 --- a/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796" -} diff --git a/.sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json b/.sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json deleted file mode 100644 index 03872b9..0000000 --- a/.sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.worker\n SET heartbeat_at = CURRENT_TIMESTAMP\n WHERE id = $1\n RETURNING id", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d" -} diff --git a/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json b/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json deleted file mode 100644 index 82daa3c..0000000 --- a/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH\n prev AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE id < $1\n ORDER BY id DESC\n LIMIT 1\n ),\n next AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE NOT id = $1\n ORDER BY id DESC\n LIMIT 1\n ),\n combined AS (\n SELECT * FROM prev\n UNION ALL\n SELECT * FROM next\n )\n SELECT\n id as \"id!\",\n heartbeat_at as \"heartbeat_at!\"\n FROM combined\n ORDER BY id ASC\n LIMIT 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id!", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "heartbeat_at!", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - null, - null - ] - }, - "hash": "c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6" -} diff --git a/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json b/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json deleted file mode 100644 index 9d48ca0..0000000 --- a/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.notification\n WHERE ctid IN (\n SELECT ctid\n FROM durable.notification\n WHERE task_id = $1\n ORDER BY created_at ASC\n LIMIT 1\n FOR UPDATE\n )\n RETURNING\n created_at,\n event,\n data as \"data: Json>\"\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "created_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 1, - "name": "event", - "type_info": "Text" - }, - { - "ordinal": 2, - "name": "data: Json>", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0" -} diff --git a/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json b/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json deleted file mode 100644 index a2bfd3a..0000000 --- a/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = ANY($1::bigint[])\n AND running_on = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce" -} diff --git a/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json b/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json deleted file mode 100644 index 121240e..0000000 --- a/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.task\n WHERE task.ctid = ANY(ARRAY(\n SELECT ctid\n FROM durable.task\n WHERE completed_at < NOW() - $1::interval\n LIMIT $2\n FOR UPDATE\n ))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Interval", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f" -} diff --git a/.sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json b/.sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json deleted file mode 100644 index a2d0df5..0000000 --- a/.sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7" -} diff --git a/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json b/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json new file mode 100644 index 0000000..49f225a --- /dev/null +++ b/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT state = 'suspended' as \"state!\"\n FROM durable.task\n WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "state!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a" +} diff --git a/.sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json b/.sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json deleted file mode 100644 index 17168bc..0000000 --- a/.sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT state as \"state!: TaskState\" \n FROM durable.task\n WHERE task.id = $1\n FOR UPDATE\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "state!: TaskState", - "type_info": { - "Custom": { - "name": "durable.task_state", - "kind": { - "Enum": [ - "ready", - "active", - "suspended", - "complete", - "failed" - ] - } - } - } - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422" -} diff --git a/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json b/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json deleted file mode 100644 index 1d1ca65..0000000 --- a/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'complete',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651" -} diff --git a/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json b/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json deleted file mode 100644 index 00ca0fb..0000000 --- a/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'failed',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e" -} From e1dac0602e666679e1ebe91c329313c1b77b3775 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 18:11:32 +0000 Subject: [PATCH 5/6] Drop redundant doc comments in storage trait Per the project's commenting policy, remove doc comments that just restate the method or type name. Keep ones that explain why behaviour exists, document constraints, or describe non-obvious return semantics. --- crates/durable-runtime/src/storage/mod.rs | 89 ++++------------------- crates/durable-runtime/src/storage/pg.rs | 1 - 2 files changed, 15 insertions(+), 75 deletions(-) diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs index a8b1b1b..cc61433 100644 --- a/crates/durable-runtime/src/storage/mod.rs +++ b/crates/durable-runtime/src/storage/mod.rs @@ -27,7 +27,6 @@ pub(crate) use self::pg::PgStorage; use crate::task::RecordedEvent; use crate::worker::TaskData; -/// The state of a row in `durable.task`. #[derive(Copy, Clone, Debug, Eq, PartialEq, sqlx::Type)] #[sqlx(type_name = "durable.task_state", rename_all = "lowercase")] pub(crate) enum TaskState { @@ -38,22 +37,18 @@ pub(crate) enum TaskState { Failed, } -/// A worker row, as returned by the validate-workers sequencing query. #[derive(Debug)] pub(crate) struct WorkerRecord { pub id: i64, pub heartbeat_at: DateTime, } -/// A row from `durable.event`, as returned when looking up a previously -/// recorded transaction by index. #[derive(Debug)] pub(crate) struct StoredEvent { pub label: String, pub value: Json>, } -/// A row from `durable.notification`, as returned by `poll_notification`. #[derive(Debug)] pub(crate) struct PolledNotification { pub created_at: DateTime, @@ -61,41 +56,25 @@ pub(crate) struct PolledNotification { pub data: Json>, } -/// Database operations issued by the durable runtime. -/// -/// All methods accept a borrowed [`PgConnection`] and leave transaction -/// lifecycle to the caller. See module-level docs for the rationale. #[async_trait] pub(crate) trait Storage: Send + Sync + 'static { - /// Access the underlying connection pool. - /// - /// Used by callers that need to acquire a connection or open a - /// transaction directly. Currently unused — transaction lifecycle is - /// driven through `SharedState::pool` until phase 2 of the storage - /// abstraction. + /// Currently unused — transaction lifecycle is driven through + /// `SharedState::pool` until phase 2 of the storage abstraction. #[allow(dead_code)] fn pool(&self) -> &sqlx::PgPool; - // --------------------------------------------------------------------- - // Worker lifecycle - // --------------------------------------------------------------------- - - /// Insert a new worker row and return its id. async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result; - /// Delete the worker row with the given id. async fn delete_worker( &self, conn: &mut PgConnection, worker_id: i64, ) -> sqlx::Result; - /// Refresh the heartbeat for the given worker. Returns `true` if the row - /// still exists. + /// Returns `true` if the row still exists. async fn heartbeat_worker(&self, conn: &mut PgConnection, worker_id: i64) -> sqlx::Result; - /// Delete the worker we are following if its heartbeat has expired. async fn delete_following_expired_worker( &self, conn: &mut PgConnection, @@ -103,7 +82,6 @@ pub(crate) trait Storage: Send + Sync + 'static { timeout: PgInterval, ) -> sqlx::Result; - /// Delete every other expired worker, leaving the current one alone. async fn delete_other_expired_workers( &self, conn: &mut PgConnection, @@ -111,34 +89,24 @@ pub(crate) trait Storage: Send + Sync + 'static { timeout: PgInterval, ) -> sqlx::Result; - /// Return the next worker in the validation sequence relative to - /// `worker_id`. See `Worker::validate_workers` for the algorithm. + /// See `Worker::validate_workers` for the algorithm. async fn next_worker_in_sequence( &self, conn: &mut PgConnection, worker_id: i64, ) -> sqlx::Result>; - /// Return the id of the current cluster leader (the oldest worker), if - /// any. + /// The leader is the oldest worker. async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result>; - // --------------------------------------------------------------------- - // Task scheduling - // --------------------------------------------------------------------- - - /// Wake any suspended tasks whose `wakeup_at` has elapsed past the - /// configured suspend margin. async fn wake_suspended_tasks( &self, conn: &mut PgConnection, suspend_margin: PgInterval, ) -> sqlx::Result; - /// Return the earliest `wakeup_at` of any currently suspended task. async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>>; - /// Delete a batch of completed tasks older than `cleanup_age`. async fn cleanup_old_tasks( &self, conn: &mut PgConnection, @@ -146,14 +114,11 @@ pub(crate) trait Storage: Send + Sync + 'static { limit: i64, ) -> sqlx::Result; - /// Force-resume any tasks stuck waiting on notifications older than 10 - /// minutes. async fn unwedge_stuck_notifications( &self, conn: &mut PgConnection, ) -> sqlx::Result; - /// Reset a batch of failed tasks back to the `ready` state. async fn reset_failed_tasks( &self, conn: &mut PgConnection, @@ -161,7 +126,6 @@ pub(crate) trait Storage: Send + Sync + 'static { worker_id: i64, ) -> sqlx::Result; - /// Atomically claim up to `allowed` new tasks for `worker_id`. async fn claim_tasks( &self, conn: &mut PgConnection, @@ -169,16 +133,14 @@ pub(crate) trait Storage: Send + Sync + 'static { allowed: i64, ) -> sqlx::Result>; - /// Release any ready tasks currently assigned to `worker_id`. Used when - /// a worker hits its capacity before committing a claim. + /// Used when a worker hits its capacity before committing a claim. async fn release_owned_ready_tasks( &self, conn: &mut PgConnection, worker_id: i64, ) -> sqlx::Result; - /// Reset a single task back to `ready` so it can be picked up again, - /// only if it is still owned by the given worker. + /// Only resets the task if it is still owned by `worker_id`. async fn reset_task_for_retry( &self, conn: &mut PgConnection, @@ -186,36 +148,28 @@ pub(crate) trait Storage: Send + Sync + 'static { worker_id: i64, ) -> sqlx::Result; - /// Mark a task as successfully completed. async fn mark_task_complete( &self, conn: &mut PgConnection, task_id: i64, ) -> sqlx::Result; - /// Mark a task as failed. async fn mark_task_failed( &self, conn: &mut PgConnection, task_id: i64, ) -> sqlx::Result; - // --------------------------------------------------------------------- - // Task execution - // --------------------------------------------------------------------- - - /// Fetch the wasm bytecode for a stored program. async fn fetch_wasm_blob(&self, conn: &mut PgConnection, wasm_id: i64) -> sqlx::Result>; - /// Load up to 1000 recorded events for a task, used for replay. + /// Loads up to 1000 events; used to replay completed transactions. async fn fetch_recorded_events( &self, conn: &mut PgConnection, task_id: i64, ) -> sqlx::Result>; - /// Look up a single recorded event for a task by its index. async fn fetch_event_at_index( &self, conn: &mut PgConnection, @@ -223,8 +177,8 @@ pub(crate) trait Storage: Send + Sync + 'static { index: i32, ) -> sqlx::Result>; - /// Insert an event row, optionally insert a log row, and return the - /// task's `running_on` value — all in one round-trip. + /// Inserts an event row, optionally inserts a log row, and returns the + /// task's `running_on` value in a single round-trip. async fn commit_event_with_log( &self, conn: &mut PgConnection, @@ -235,7 +189,6 @@ pub(crate) trait Storage: Send + Sync + 'static { message: Option<&str>, ) -> sqlx::Result>; - /// Suspend a task and set an optional wakeup timestamp. async fn suspend_task( &self, conn: &mut PgConnection, @@ -243,7 +196,7 @@ pub(crate) trait Storage: Send + Sync + 'static { wakeup_at: Option>, ) -> sqlx::Result; - /// Suspend a task without touching `wakeup_at`. Used by the notification + /// Suspends without touching `wakeup_at`. Used by the notification /// blocking path, which must not clobber an existing wakeup deadline. async fn suspend_task_no_wakeup( &self, @@ -251,11 +204,6 @@ pub(crate) trait Storage: Send + Sync + 'static { task_id: i64, ) -> sqlx::Result; - // --------------------------------------------------------------------- - // Logs - // --------------------------------------------------------------------- - - /// Append a row to `durable.log`. async fn insert_log( &self, conn: &mut PgConnection, @@ -264,9 +212,8 @@ pub(crate) trait Storage: Send + Sync + 'static { message: &str, ) -> sqlx::Result; - /// Insert a log row, replacing the message if a row already exists at - /// the same `(task_id, index)`. Used to record the final error message - /// for a task. + /// Replaces the message if a row already exists at `(task_id, index)`. + /// Used to record the final error message for a task. async fn upsert_log_error( &self, conn: &mut PgConnection, @@ -275,26 +222,20 @@ pub(crate) trait Storage: Send + Sync + 'static { message: &str, ) -> sqlx::Result; - // --------------------------------------------------------------------- - // Notifications - // --------------------------------------------------------------------- - - /// Pop the oldest notification for a task, with row-level locking. async fn poll_notification( &self, conn: &mut PgConnection, task_id: i64, ) -> sqlx::Result>; - /// Look up a task's state, locking the row for the duration of the - /// surrounding transaction. Used to safely insert a notification. + /// Locks the row for the duration of the surrounding transaction so a + /// concurrent notification poll cannot consume the task. async fn fetch_task_state_locked( &self, conn: &mut PgConnection, task_id: i64, ) -> sqlx::Result>; - /// Insert a notification row. async fn insert_notification( &self, conn: &mut PgConnection, diff --git a/crates/durable-runtime/src/storage/pg.rs b/crates/durable-runtime/src/storage/pg.rs index 4f6c240..1406b9c 100644 --- a/crates/durable-runtime/src/storage/pg.rs +++ b/crates/durable-runtime/src/storage/pg.rs @@ -15,7 +15,6 @@ use super::{PolledNotification, Storage, StoredEvent, TaskState, WorkerRecord}; use crate::task::RecordedEvent; use crate::worker::TaskData; -/// Postgres-backed storage. #[derive(Clone)] pub(crate) struct PgStorage { #[allow(dead_code)] From 74bc085ea2bc40789e660b350a6ced9523cb6e1d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 18:15:12 +0000 Subject: [PATCH 6/6] Drop impl-detail constant from fetch_recorded_events doc --- crates/durable-runtime/src/storage/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs index cc61433..1aedfee 100644 --- a/crates/durable-runtime/src/storage/mod.rs +++ b/crates/durable-runtime/src/storage/mod.rs @@ -163,7 +163,6 @@ pub(crate) trait Storage: Send + Sync + 'static { async fn fetch_wasm_blob(&self, conn: &mut PgConnection, wasm_id: i64) -> sqlx::Result>; - /// Loads up to 1000 events; used to replay completed transactions. async fn fetch_recorded_events( &self, conn: &mut PgConnection,