diff --git a/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json b/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json new file mode 100644 index 0000000..3c7a79b --- /dev/null +++ b/.sqlx/query-0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'complete',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "0404a682784ca66d2c778acb3b0368aa6e0fcbe476ca57841ce2c0e5e995d1e9" +} \ No newline at end of file diff --git a/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json b/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json deleted file mode 100644 index 0a8eb3b..0000000 --- a/.sqlx/query-20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET running_on = NULL\n WHERE state = 'ready'\n AND running_on = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "20d16d2914bc03e3bdd46f015697c90c9c71d40e32432cf173a7fd6fb77c5ce8" -} diff --git a/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json b/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json new file mode 100644 index 0000000..a6b33ae --- /dev/null +++ b/.sqlx/query-22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET running_on = NULL\n WHERE state = 'ready'\n AND running_on = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "22fd40e5440beb2915e95637404e32c48eef6bddbd5192c1677772cbf51dbe84" +} \ No newline at end of file diff --git a/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json b/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json new file mode 100644 index 0000000..9ad6465 --- /dev/null +++ b/.sqlx/query-2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready'\n WHERE state = 'suspended'\n AND EXISTS((\n SELECT task_id\n FROM durable.notification\n WHERE task_id = task.id\n AND created_at < NOW() - '10 minutes'::interval\n ))\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "2d3b7402df579487b699a1b50b5dfc75722e027524c753738323bea2f343fb56" +} \ No newline at end of file diff --git a/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json b/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json new file mode 100644 index 0000000..561818b --- /dev/null +++ b/.sqlx/query-3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'suspended',\n running_on = NULL\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "3402a2d91a23aa927386d4825768a50c5731c6b9bb4dfae298197349827113b6" +} \ No newline at end of file diff --git a/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json b/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json new file mode 100644 index 0000000..9d6798f --- /dev/null +++ b/.sqlx/query-3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.task\n WHERE task.ctid = ANY(ARRAY(\n SELECT ctid\n FROM durable.task\n WHERE completed_at < NOW() - $1::interval\n LIMIT $2\n FOR UPDATE\n ))\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Interval", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "3cd8e92d208177ffc7168ab4bf19451793d9f5f8b5af7be67c348d952216ac92" +} \ No newline at end of file diff --git a/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json b/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json deleted file mode 100644 index 7d88cba..0000000 --- a/.sqlx/query-3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'suspended',\n running_on = NULL\n WHERE id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "3e5ac013b44f79280645f5b047c769c2b49c2c84b8e4fc14ed7ec17c3619b1f0" -} diff --git a/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json b/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json new file mode 100644 index 0000000..ca425ff --- /dev/null +++ b/.sqlx/query-46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = $1\n AND running_on = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "46866c27afbd80cda4ce93a15dc9597bd10ac826027f2819e6f6f38fa8ad10be" +} \ No newline at end of file diff --git a/.sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json b/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json similarity index 53% rename from .sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json rename to .sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json index 03872b9..060d48b 100644 --- a/.sqlx/query-bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d.json +++ b/.sqlx/query-50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE durable.worker\n SET heartbeat_at = CURRENT_TIMESTAMP\n WHERE id = $1\n RETURNING id", + "query": "UPDATE durable.worker\n SET heartbeat_at = CURRENT_TIMESTAMP\n WHERE id = $1\n RETURNING id", "describe": { "columns": [ { @@ -18,5 +18,5 @@ false ] }, - "hash": "bf5f642e0af7d9c702337836160399a11915c903a691f57f5bf806a1ac72fc3d" -} + "hash": "50e935d0a6e5dd395bb86c7866c01ebdb7cc711e0c41b0f116b518b3b26156c9" +} \ No newline at end of file diff --git a/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json b/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json deleted file mode 100644 index c2943b1..0000000 --- a/.sqlx/query-5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE CURRENT_TIMESTAMP - heartbeat_at > $2\n AND NOT id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Interval" - ] - }, - "nullable": [] - }, - "hash": "5d6584f635dd607563d134d3d0f7fe4d597f323487706d6b6b0b52d2ff214339" -} diff --git a/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json b/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json new file mode 100644 index 0000000..0964212 --- /dev/null +++ b/.sqlx/query-6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n prev AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE id < $1\n ORDER BY id DESC\n LIMIT 1\n ),\n next AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE NOT id = $1\n ORDER BY id DESC\n LIMIT 1\n ),\n combined AS (\n SELECT * FROM prev\n UNION ALL\n SELECT * FROM next\n )\n SELECT\n id as \"id!\",\n heartbeat_at as \"heartbeat_at!\"\n FROM combined\n ORDER BY id ASC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "heartbeat_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "6586483f7d9bff87477cae679ad2c6894b1eb27718b2e01878138d05c9fdb0e7" +} \ No newline at end of file diff --git a/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json b/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json deleted file mode 100644 index c8f43a4..0000000 --- a/.sqlx/query-67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT wakeup_at as \"wakeup_at!\"\n FROM durable.task\n WHERE state = 'suspended'\n AND wakeup_at IS NOT NULL\n ORDER BY wakeup_at ASC\n LIMIT 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "wakeup_at!", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - true - ] - }, - "hash": "67f92dc0e6da8c29a559a979b460dfb15a51401a24b06c1359ab4899e7d968ce" -} diff --git a/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json b/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json new file mode 100644 index 0000000..9b84ab5 --- /dev/null +++ b/.sqlx/query-685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = ANY($1::bigint[])\n AND running_on = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "685f6bb7abf389c452400578793c41e2d3a82eb19f95d8e2a3cf721298c965d2" +} \ No newline at end of file diff --git a/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json b/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json new file mode 100644 index 0000000..8c732f3 --- /dev/null +++ b/.sqlx/query-6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND CURRENT_TIMESTAMP - heartbeat_at > $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Interval" + ] + }, + "nullable": [] + }, + "hash": "6a58fd11039e3da7feadcf625dca001a52fae5d290811ea953f2ad022d96dfed" +} \ No newline at end of file diff --git a/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json b/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json new file mode 100644 index 0000000..da681d2 --- /dev/null +++ b/.sqlx/query-6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT wakeup_at as \"wakeup_at!\"\n FROM durable.task\n WHERE state = 'suspended'\n AND wakeup_at IS NOT NULL\n ORDER BY wakeup_at ASC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "wakeup_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true + ] + }, + "hash": "6b9dd270eea86985822ded5a1fc4edc7f9aee0703ea1f5c1c84b221c0dd529b1" +} \ No newline at end of file diff --git a/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json b/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json new file mode 100644 index 0000000..72320bc --- /dev/null +++ b/.sqlx/query-7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.notification\n WHERE ctid IN (\n SELECT ctid\n FROM durable.notification\n WHERE task_id = $1\n ORDER BY created_at ASC\n LIMIT 1\n FOR UPDATE\n )\n RETURNING\n created_at,\n event,\n data as \"data: Json>\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "event", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "data: Json>", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "7079303b7c4017e46ef7bab339a683708fa4a03576b80a9a677732e5387865a2" +} \ No newline at end of file diff --git a/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json b/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json deleted file mode 100644 index 789e5b4..0000000 --- a/.sqlx/query-775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= (NOW() - $1::interval)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [] - }, - "hash": "775263f590071917b90b555d438e2f4b62d2ad6c37da62a6a00580100b06e2a1" -} diff --git a/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json b/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json deleted file mode 100644 index e5871ce..0000000 --- a/.sqlx/query-7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready'\n WHERE state = 'suspended'\n AND EXISTS((\n SELECT task_id\n FROM durable.notification\n WHERE task_id = task.id\n AND created_at < NOW() - '10 minutes'::interval\n ))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "7a204104ee904fe7004337a7e7f8c2210d9ff9eb79c1224f2ce1d6a33fcbf247" -} diff --git a/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json b/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json deleted file mode 100644 index 8e77d49..0000000 --- a/.sqlx/query-7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.worker\n WHERE id = $1\n AND CURRENT_TIMESTAMP - heartbeat_at > $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Interval" - ] - }, - "nullable": [] - }, - "hash": "7c8a61cb5415f825ae1498e222b99b5b99c6986016d7e0267e3787201a987c4a" -} diff --git a/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json b/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json deleted file mode 100644 index 1ad93a8..0000000 --- a/.sqlx/query-8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = $1\n AND running_on = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "8646787e97a2e9c59fa9a1f2f510a48574aa64161b0ccc37be8bb58deaff5783" -} diff --git a/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json b/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json deleted file mode 100644 index 0510ce4..0000000 --- a/.sqlx/query-8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO durable.notification(task_id, event, data)\n VALUES ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "8e086d78a75c54b63c96225597764e31d82a666249c8cb1bbca40760bd7b2724" -} diff --git a/.sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json b/.sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json similarity index 67% rename from .sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json rename to .sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json index 17168bc..af6e145 100644 --- a/.sqlx/query-eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422.json +++ b/.sqlx/query-9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT state as \"state!: TaskState\" \n FROM durable.task\n WHERE task.id = $1\n FOR UPDATE\n ", + "query": "\n SELECT state as \"state!: TaskState\"\n FROM durable.task\n WHERE task.id = $1\n FOR UPDATE\n ", "describe": { "columns": [ { @@ -31,5 +31,5 @@ false ] }, - "hash": "eda0e7e4a91f19b4098d3d5ed1160a3a997e7c7c05ce7adffa01d627e4f75422" -} + "hash": "9586a1de8c2d853dc4c2e33b5b930023315c5ce96b9b77b8d32e143f05d55f51" +} \ No newline at end of file diff --git a/.sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json b/.sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json similarity index 67% rename from .sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json rename to .sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json index a2d0df5..b504bfc 100644 --- a/.sqlx/query-df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7.json +++ b/.sqlx/query-a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", + "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", "describe": { "columns": [], "parameters": { @@ -12,5 +12,5 @@ }, "nullable": [] }, - "hash": "df8f78420eee9d8c6e56b7b349ba10235ff73813bda043108ec1d3a87793b3d7" -} + "hash": "a2e037bea4471d830a137c39dad6507b05f9333aedaa31aa3128fcd59124618c" +} \ No newline at end of file diff --git a/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json b/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json deleted file mode 100644 index 4b9e021..0000000 --- a/.sqlx/query-b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)\n ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE\n SET message = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "b2cb7dae793f63b5c4af04f55ced99b482e3540944f313d1652af9953c8a3e41" -} diff --git a/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json b/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json deleted file mode 100644 index abec5c1..0000000 --- a/.sqlx/query-b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int4", - "Text" - ] - }, - "nullable": [] - }, - "hash": "b7682529f96f192a7fd8d0aabd4ccde7f7b473d163b428b5ba542b04a0d61796" -} diff --git a/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json b/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json deleted file mode 100644 index 82daa3c..0000000 --- a/.sqlx/query-c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH\n prev AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE id < $1\n ORDER BY id DESC\n LIMIT 1\n ),\n next AS (\n SELECT id, heartbeat_at\n FROM durable.worker\n WHERE NOT id = $1\n ORDER BY id DESC\n LIMIT 1\n ),\n combined AS (\n SELECT * FROM prev\n UNION ALL\n SELECT * FROM next\n )\n SELECT\n id as \"id!\",\n heartbeat_at as \"heartbeat_at!\"\n FROM combined\n ORDER BY id ASC\n LIMIT 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id!", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "heartbeat_at!", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - null, - null - ] - }, - "hash": "c6857347c4dfab3aec1bdfc7323dab8715d8d407b5b7f2f73ba9a374d8b1eee6" -} diff --git a/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json b/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json deleted file mode 100644 index 9d48ca0..0000000 --- a/.sqlx/query-ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.notification\n WHERE ctid IN (\n SELECT ctid\n FROM durable.notification\n WHERE task_id = $1\n ORDER BY created_at ASC\n LIMIT 1\n FOR UPDATE\n )\n RETURNING\n created_at,\n event,\n data as \"data: Json>\"\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "created_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 1, - "name": "event", - "type_info": "Text" - }, - { - "ordinal": 2, - "name": "data: Json>", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "ca7ed3f1d6dad66c99c5ff82827bcc2f6b9c247e52e92e67cca74772d69b2de0" -} diff --git a/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json b/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json deleted file mode 100644 index a2bfd3a..0000000 --- a/.sqlx/query-cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE durable.task\n SET state = 'ready',\n running_on = NULL\n WHERE id = ANY($1::bigint[])\n AND running_on = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cf31973a833c40d4d43aa01102e38178092c2ba065b7c7f367ce1cd2a90d2dce" -} diff --git a/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json b/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json deleted file mode 100644 index 121240e..0000000 --- a/.sqlx/query-dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM durable.task\n WHERE task.ctid = ANY(ARRAY(\n SELECT ctid\n FROM durable.task\n WHERE completed_at < NOW() - $1::interval\n LIMIT $2\n FOR UPDATE\n ))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Interval", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "dadbe61077d006ce342f60e7c98d04204dee2e7fc10b5d4a7a2fd0ae0f89dd0f" -} diff --git a/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json b/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json new file mode 100644 index 0000000..49f225a --- /dev/null +++ b/.sqlx/query-e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT state = 'suspended' as \"state!\"\n FROM durable.task\n WHERE id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "state!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "e7a5f2b6c1fe528b50d8fad456f16d53ef5ec74e1e797250141dc2eaef7f493a" +} diff --git a/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json b/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json new file mode 100644 index 0000000..63c12b2 --- /dev/null +++ b/.sqlx/query-ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE durable.task\n SET state = 'failed',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "ed078b499703e5cc048730b948acf6b0a9e690fe51f5886b99cf6a5ee0ff5e75" +} \ No newline at end of file diff --git a/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json b/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json new file mode 100644 index 0000000..0541d8d --- /dev/null +++ b/.sqlx/query-f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE durable.task\n SET state = 'ready',\n wakeup_at = NULL,\n running_on = (\n SELECT id\n FROM durable.worker\n ORDER BY random() + task.id\n LIMIT 1\n )\n WHERE state = 'suspended'\n AND wakeup_at <= (NOW() - $1::interval)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [] + }, + "hash": "f0fd494f64d8b26d4d87690558f59f91d8efe0b28c7d014c8320e973ea552dcf" +} \ No newline at end of file diff --git a/.sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json b/.sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json similarity index 53% rename from .sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json rename to .sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json index ab1c443..3b6c3fc 100644 --- a/.sqlx/query-b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc.json +++ b/.sqlx/query-f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)", + "query": "INSERT INTO durable.log(task_id, index, message)\n VALUES ($1, $2, $3)\n ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE\n SET message = $3\n ", "describe": { "columns": [], "parameters": { @@ -12,5 +12,5 @@ }, "nullable": [] }, - "hash": "b1b5852c0b0f5767e296ba99a454abea66609b34002ecc94dd719504429e1ebc" -} + "hash": "f640d1e56e13001d184a4d13441dc5cd6bb5e3e24361bf22d896a412fecbbf9c" +} \ No newline at end of file diff --git a/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json b/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json deleted file mode 100644 index 1d1ca65..0000000 --- a/.sqlx/query-f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'complete',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "f93b02946393ce2e3b14f732b1daec69069e7abf7188f9b2dcb048fa02fe5651" -} diff --git a/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json b/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json deleted file mode 100644 index 00ca0fb..0000000 --- a/.sqlx/query-fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE durable.task\n SET state = 'failed',\n completed_at = CURRENT_TIMESTAMP,\n running_on = NULL,\n wasm = NULL\n WHERE id = $1", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [] - }, - "hash": "fa2b86c00cdbd5322bda9eef3e35a392857ece1061614b3cfb47c7c55aab7c7e" -} diff --git a/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json b/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json new file mode 100644 index 0000000..203980b --- /dev/null +++ b/.sqlx/query-fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM durable.worker\n WHERE CURRENT_TIMESTAMP - heartbeat_at > $2\n AND NOT id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Interval" + ] + }, + "nullable": [] + }, + "hash": "fef563a3e3a6e8c1a8386e17003626eb8dc9ebb07cd563b2681b106e6fa74596" +} \ No newline at end of file diff --git a/crates/durable-runtime/src/lib.rs b/crates/durable-runtime/src/lib.rs index 43b1148..469298f 100644 --- a/crates/durable-runtime/src/lib.rs +++ b/crates/durable-runtime/src/lib.rs @@ -14,6 +14,7 @@ pub mod migrate; pub mod plugin; mod resource; pub mod scheduler; +pub(crate) mod storage; pub mod task; pub mod util; mod worker; diff --git a/crates/durable-runtime/src/plugin/durable/notify.rs b/crates/durable-runtime/src/plugin/durable/notify.rs index eb488ff..8125f54 100644 --- a/crates/durable-runtime/src/plugin/durable/notify.rs +++ b/crates/durable-runtime/src/plugin/durable/notify.rs @@ -5,6 +5,7 @@ use tokio::sync::broadcast::error::RecvError; use tokio::time::Instant; use crate::bindings::durable::core::notify::{Event, Host, NotifyError}; +use crate::storage::TaskState; use crate::task::TransactionOptions; use crate::{Task, TaskStatus}; @@ -12,27 +13,17 @@ async fn poll_notification( task: &mut Task, tx: &mut sqlx::PgConnection, ) -> anyhow::Result> { - let data = sqlx::query_as!( - EventData, - r#" - DELETE FROM durable.notification - WHERE ctid IN ( - SELECT ctid - FROM durable.notification - WHERE task_id = $1 - ORDER BY created_at ASC - LIMIT 1 - FOR UPDATE - ) - RETURNING - created_at, - event, - data as "data: Json>" - "#, - task.state.task_id() - ) - .fetch_optional(&mut *tx) - .await?; + let task_id = task.state.task_id(); + let data = task + .state + .storage() + .poll_notification(&mut *tx, task_id) + .await? + .map(|n| EventData { + created_at: n.created_at, + event: n.event, + data: n.data, + }); Ok(data) } @@ -86,16 +77,10 @@ impl Host for Task { // The timer expired, so we need to attempt to suspend. let mut tx = self.state.pool().begin().await?; - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL - WHERE id = $1 - ", - self.task_id() - ) - .execute(&mut *tx) - .await?; + self.state + .storage() + .suspend_task_no_wakeup(&mut tx, self.task_id()) + .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { // A new notification barged in while we were updating. Roll back the @@ -206,16 +191,10 @@ impl Host for Task { Some(Expired::Suspend) => { let mut tx = self.state.pool().begin().await?; - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL - WHERE id = $1 - ", - self.task_id() - ) - .execute(&mut *tx) - .await?; + self.state + .storage() + .suspend_task_no_wakeup(&mut tx, self.task_id()) + .await?; if poll_notification(&mut *self, &mut tx).await?.is_some() { // A new notification barged in while we were updating. @@ -250,6 +229,7 @@ impl Host for Task { return Ok(result); } + let storage = self.state.shared.storage.clone(); let txn = self.state.transaction_mut().unwrap(); let tx = txn.conn().unwrap(); @@ -261,17 +241,7 @@ impl Host for Task { // Note: We lock the row here so that concurrent notification polls // cannot barge in here. - let state = sqlx::query_scalar!( - r#" - SELECT state as "state!: TaskState" - FROM durable.task - WHERE task.id = $1 - FOR UPDATE - "#, - task - ) - .fetch_optional(&mut **tx) - .await?; + let state = storage.fetch_task_state_locked(&mut *tx, task).await?; match state { Some(TaskState::Complete | TaskState::Failed) => { @@ -281,17 +251,9 @@ impl Host for Task { _ => (), } - let result = sqlx::query_scalar!( - r#" - INSERT INTO durable.notification(task_id, event, data) - VALUES ($1, $2, $3) - "#, - task, - event, - Json(json) as Json<&RawValue> - ) - .execute(&mut **tx) - .await; + let result = storage + .insert_notification(&mut *tx, task, &event, Json(json)) + .await; match result { Ok(_) => Ok(Ok(())), @@ -359,13 +321,3 @@ impl<'de> serde::Deserialize<'de> for NotifyError { RemoteNotifyError::deserialize(de) } } - -#[derive(Copy, Clone, Debug, Eq, PartialEq, sqlx::Type)] -#[sqlx(type_name = "durable.task_state", rename_all = "lowercase")] -enum TaskState { - Ready, - Active, - Suspended, - Complete, - Failed, -} diff --git a/crates/durable-runtime/src/storage/mod.rs b/crates/durable-runtime/src/storage/mod.rs new file mode 100644 index 0000000..1aedfee --- /dev/null +++ b/crates/durable-runtime/src/storage/mod.rs @@ -0,0 +1,245 @@ +//! Database storage abstraction for the durable runtime. +//! +//! The [`Storage`] trait collects every query the runtime issues against the +//! durable schema. Each method corresponds to one logical operation and its +//! body is a verbatim move of the SQL that previously lived inline at the +//! call site. Transaction lifecycle (`pool.begin()`, `pool.acquire()`, +//! `commit`/`rollback`) remains at the call sites — methods take a borrowed +//! [`sqlx::PgConnection`], which works for both pooled connections and +//! transactions via sqlx's `Executor` blanket impl. +//! +//! The current implementation, [`PgStorage`], is Postgres-specific and is +//! intended to be the only impl for now. The trait exists so that future +//! work can introduce alternative backends (e.g. SQLite with a polling +//! executor) without further restructuring of the runtime. + +mod pg; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::value::RawValue; +use sqlx::postgres::types::PgInterval; +use sqlx::postgres::PgQueryResult; +use sqlx::types::Json; +use sqlx::PgConnection; + +pub(crate) use self::pg::PgStorage; +use crate::task::RecordedEvent; +use crate::worker::TaskData; + +#[derive(Copy, Clone, Debug, Eq, PartialEq, sqlx::Type)] +#[sqlx(type_name = "durable.task_state", rename_all = "lowercase")] +pub(crate) enum TaskState { + Ready, + Active, + Suspended, + Complete, + Failed, +} + +#[derive(Debug)] +pub(crate) struct WorkerRecord { + pub id: i64, + pub heartbeat_at: DateTime, +} + +#[derive(Debug)] +pub(crate) struct StoredEvent { + pub label: String, + pub value: Json>, +} + +#[derive(Debug)] +pub(crate) struct PolledNotification { + pub created_at: DateTime, + pub event: String, + pub data: Json>, +} + +#[async_trait] +pub(crate) trait Storage: Send + Sync + 'static { + /// Currently unused — transaction lifecycle is driven through + /// `SharedState::pool` until phase 2 of the storage abstraction. + #[allow(dead_code)] + fn pool(&self) -> &sqlx::PgPool; + + async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result; + + async fn delete_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result; + + /// Returns `true` if the row still exists. + async fn heartbeat_worker(&self, conn: &mut PgConnection, worker_id: i64) + -> sqlx::Result; + + async fn delete_following_expired_worker( + &self, + conn: &mut PgConnection, + following: i64, + timeout: PgInterval, + ) -> sqlx::Result; + + async fn delete_other_expired_workers( + &self, + conn: &mut PgConnection, + worker_id: i64, + timeout: PgInterval, + ) -> sqlx::Result; + + /// See `Worker::validate_workers` for the algorithm. + async fn next_worker_in_sequence( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result>; + + /// The leader is the oldest worker. + async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result>; + + async fn wake_suspended_tasks( + &self, + conn: &mut PgConnection, + suspend_margin: PgInterval, + ) -> sqlx::Result; + + async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>>; + + async fn cleanup_old_tasks( + &self, + conn: &mut PgConnection, + cleanup_age: PgInterval, + limit: i64, + ) -> sqlx::Result; + + async fn unwedge_stuck_notifications( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result; + + async fn reset_failed_tasks( + &self, + conn: &mut PgConnection, + failed_ids: &[i64], + worker_id: i64, + ) -> sqlx::Result; + + async fn claim_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + allowed: i64, + ) -> sqlx::Result>; + + /// Used when a worker hits its capacity before committing a claim. + async fn release_owned_ready_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result; + + /// Only resets the task if it is still owned by `worker_id`. + async fn reset_task_for_retry( + &self, + conn: &mut PgConnection, + task_id: i64, + worker_id: i64, + ) -> sqlx::Result; + + async fn mark_task_complete( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + async fn mark_task_failed( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + async fn fetch_wasm_blob(&self, conn: &mut PgConnection, wasm_id: i64) + -> sqlx::Result>; + + async fn fetch_recorded_events( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + async fn fetch_event_at_index( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + ) -> sqlx::Result>; + + /// Inserts an event row, optionally inserts a log row, and returns the + /// task's `running_on` value in a single round-trip. + async fn commit_event_with_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + label: &str, + value: Json<&RawValue>, + message: Option<&str>, + ) -> sqlx::Result>; + + async fn suspend_task( + &self, + conn: &mut PgConnection, + task_id: i64, + wakeup_at: Option>, + ) -> sqlx::Result; + + /// Suspends without touching `wakeup_at`. Used by the notification + /// blocking path, which must not clobber an existing wakeup deadline. + async fn suspend_task_no_wakeup( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result; + + async fn insert_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result; + + /// Replaces the message if a row already exists at `(task_id, index)`. + /// Used to record the final error message for a task. + async fn upsert_log_error( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result; + + async fn poll_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + /// Locks the row for the duration of the surrounding transaction so a + /// concurrent notification poll cannot consume the task. + async fn fetch_task_state_locked( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result>; + + async fn insert_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + event: &str, + data: Json<&RawValue>, + ) -> sqlx::Result; +} diff --git a/crates/durable-runtime/src/storage/pg.rs b/crates/durable-runtime/src/storage/pg.rs new file mode 100644 index 0000000..1406b9c --- /dev/null +++ b/crates/durable-runtime/src/storage/pg.rs @@ -0,0 +1,657 @@ +//! Postgres implementation of the [`Storage`](super::Storage) trait. +//! +//! Each method body is a verbatim move of the SQL that previously lived +//! inline at the call site. No semantic changes — see `storage::mod` docs. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde_json::value::RawValue; +use sqlx::postgres::types::PgInterval; +use sqlx::postgres::PgQueryResult; +use sqlx::types::Json; +use sqlx::PgConnection; + +use super::{PolledNotification, Storage, StoredEvent, TaskState, WorkerRecord}; +use crate::task::RecordedEvent; +use crate::worker::TaskData; + +#[derive(Clone)] +pub(crate) struct PgStorage { + #[allow(dead_code)] + pool: sqlx::PgPool, +} + +impl PgStorage { + pub(crate) fn new(pool: sqlx::PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Storage for PgStorage { + fn pool(&self) -> &sqlx::PgPool { + &self.pool + } + + async fn insert_worker(&self, conn: &mut PgConnection) -> sqlx::Result { + let record = sqlx::query!( + " + INSERT INTO durable.worker(heartbeat_at) + VALUES (CURRENT_TIMESTAMP) + RETURNING id + " + ) + .fetch_one(&mut *conn) + .await?; + + Ok(record.id) + } + + async fn delete_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!("DELETE FROM durable.worker WHERE id = $1", worker_id) + .execute(&mut *conn) + .await + } + + async fn heartbeat_worker( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + let record = sqlx::query!( + "UPDATE durable.worker + SET heartbeat_at = CURRENT_TIMESTAMP + WHERE id = $1 + RETURNING id", + worker_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.is_some()) + } + + async fn delete_following_expired_worker( + &self, + conn: &mut PgConnection, + following: i64, + timeout: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + DELETE FROM durable.worker + WHERE id = $1 + AND CURRENT_TIMESTAMP - heartbeat_at > $2 + ", + following, + timeout + ) + .execute(&mut *conn) + .await + } + + async fn delete_other_expired_workers( + &self, + conn: &mut PgConnection, + worker_id: i64, + timeout: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + DELETE FROM durable.worker + WHERE CURRENT_TIMESTAMP - heartbeat_at > $2 + AND NOT id = $1 + ", + worker_id, + timeout + ) + .execute(&mut *conn) + .await + } + + async fn next_worker_in_sequence( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + WITH + prev AS ( + SELECT id, heartbeat_at + FROM durable.worker + WHERE id < $1 + ORDER BY id DESC + LIMIT 1 + ), + next AS ( + SELECT id, heartbeat_at + FROM durable.worker + WHERE NOT id = $1 + ORDER BY id DESC + LIMIT 1 + ), + combined AS ( + SELECT * FROM prev + UNION ALL + SELECT * FROM next + ) + SELECT + id as "id!", + heartbeat_at as "heartbeat_at!" + FROM combined + ORDER BY id ASC + LIMIT 1 + "#, + worker_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| WorkerRecord { + id: r.id, + heartbeat_at: r.heartbeat_at, + })) + } + + async fn load_leader_id(&self, conn: &mut PgConnection) -> sqlx::Result> { + let record = sqlx::query!( + " + SELECT id + FROM durable.worker + ORDER BY started_at ASC, id ASC + LIMIT 1 + " + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| r.id)) + } + + async fn wake_suspended_tasks( + &self, + conn: &mut PgConnection, + suspend_margin: PgInterval, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + wakeup_at = NULL, + running_on = ( + SELECT id + FROM durable.worker + ORDER BY random() + task.id + LIMIT 1 + ) + WHERE state = 'suspended' + AND wakeup_at <= (NOW() - $1::interval) + ", + suspend_margin + ) + .execute(&mut *conn) + .await + } + + async fn next_wakeup_at(&self, conn: &mut PgConnection) -> sqlx::Result>> { + let record = sqlx::query!( + r#" + SELECT wakeup_at as "wakeup_at!" + FROM durable.task + WHERE state = 'suspended' + AND wakeup_at IS NOT NULL + ORDER BY wakeup_at ASC + LIMIT 1 + "# + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| r.wakeup_at)) + } + + async fn cleanup_old_tasks( + &self, + conn: &mut PgConnection, + cleanup_age: PgInterval, + limit: i64, + ) -> sqlx::Result { + sqlx::query!( + r#" + DELETE FROM durable.task + WHERE task.ctid = ANY(ARRAY( + SELECT ctid + FROM durable.task + WHERE completed_at < NOW() - $1::interval + LIMIT $2 + FOR UPDATE + )) + "#, + cleanup_age, + limit + ) + .execute(&mut *conn) + .await + } + + async fn unwedge_stuck_notifications( + &self, + conn: &mut PgConnection, + ) -> sqlx::Result { + sqlx::query!( + r#" + UPDATE durable.task + SET state = 'ready' + WHERE state = 'suspended' + AND EXISTS(( + SELECT task_id + FROM durable.notification + WHERE task_id = task.id + AND created_at < NOW() - '10 minutes'::interval + )) + "# + ) + .execute(&mut *conn) + .await + } + + async fn reset_failed_tasks( + &self, + conn: &mut PgConnection, + failed_ids: &[i64], + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + running_on = NULL + WHERE id = ANY($1::bigint[]) + AND running_on = $2 + ", + failed_ids, + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn claim_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + allowed: i64, + ) -> sqlx::Result> { + sqlx::query_as!( + TaskData, + r#" + WITH selected AS ( + SELECT id + FROM durable.task + WHERE (state IN ('ready', 'active') AND running_on IS NULL) + OR (state = 'ready' AND running_on = $1) + ORDER BY id ASC + FOR NO KEY UPDATE SKIP LOCKED + LIMIT $2 + ) + UPDATE durable.task + SET running_on = $1, + state = 'active' + FROM selected + WHERE selected.id = task.id + RETURNING + task.id as id, + task.name as name, + task.created_at as created_at, + task.wasm as "wasm!", + task.data as "data!: Json>" + "#, + worker_id, + allowed + ) + .fetch_all(&mut *conn) + .await + } + + async fn release_owned_ready_tasks( + &self, + conn: &mut PgConnection, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET running_on = NULL + WHERE state = 'ready' + AND running_on = $1 + ", + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn reset_task_for_retry( + &self, + conn: &mut PgConnection, + task_id: i64, + worker_id: i64, + ) -> sqlx::Result { + sqlx::query!( + " + UPDATE durable.task + SET state = 'ready', + running_on = NULL + WHERE id = $1 + AND running_on = $2 + ", + task_id, + worker_id + ) + .execute(&mut *conn) + .await + } + + async fn mark_task_complete( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'complete', + completed_at = CURRENT_TIMESTAMP, + running_on = NULL, + wasm = NULL + WHERE id = $1 + ", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn mark_task_failed( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'failed', + completed_at = CURRENT_TIMESTAMP, + running_on = NULL, + wasm = NULL + WHERE id = $1", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn fetch_wasm_blob( + &self, + conn: &mut PgConnection, + wasm_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!("SELECT wasm FROM durable.wasm WHERE id = $1", wasm_id) + .fetch_one(&mut *conn) + .await?; + + Ok(record.wasm) + } + + async fn fetch_recorded_events( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + sqlx::query_as!( + RecordedEvent, + r#" + SELECT + index, + label, + value as "value!: Json>" + FROM durable.event + WHERE task_id = $1 + ORDER BY index ASC + LIMIT 1000 + "#, + task_id + ) + .fetch_all(&mut *conn) + .await + } + + async fn fetch_event_at_index( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + SELECT + label, + value as "value: Json>" + FROM durable.event + WHERE task_id = $1 + AND index = $2 + "#, + task_id, + index + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| StoredEvent { + label: r.label, + value: r.value, + })) + } + + async fn commit_event_with_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + label: &str, + value: Json<&RawValue>, + message: Option<&str>, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + WITH + current_task AS ( + SELECT id, running_on + FROM durable.task + WHERE id = $1 + LIMIT 1 + ), + insert_event AS ( + INSERT INTO durable.event(task_id, index, label, value) + SELECT + id as task_id, + $2 as index, + $3 as label, + $4 as value + FROM current_task + RETURNING task_id + ), + insert_log AS ( + INSERT INTO durable.log(task_id, index, message) + SELECT task_id, index, message + FROM (VALUES ($1, $2, $5)) as t(task_id, index, message) + JOIN current_task task ON task.id = task_id + WHERE message IS NOT NULL + RETURNING task_id + ) + SELECT running_on + FROM current_task + LEFT JOIN insert_event event ON event.task_id = id + LEFT JOIN insert_event log ON log.task_id = id + "#, + task_id, + index, + label, + value as Json<&RawValue>, + message + ) + .fetch_one(&mut *conn) + .await?; + + Ok(record.running_on) + } + + async fn suspend_task( + &self, + conn: &mut PgConnection, + task_id: i64, + wakeup_at: Option>, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'suspended', + running_on = NULL, + wakeup_at = $2 + WHERE id = $1", + task_id, + wakeup_at + ) + .execute(&mut *conn) + .await + } + + async fn suspend_task_no_wakeup( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result { + sqlx::query!( + "UPDATE durable.task + SET state = 'suspended', + running_on = NULL + WHERE id = $1 + ", + task_id + ) + .execute(&mut *conn) + .await + } + + async fn insert_log( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result { + sqlx::query!( + "INSERT INTO durable.log(task_id, index, message) + VALUES ($1, $2, $3)", + task_id, + index, + message + ) + .execute(&mut *conn) + .await + } + + async fn upsert_log_error( + &self, + conn: &mut PgConnection, + task_id: i64, + index: i32, + message: &str, + ) -> sqlx::Result { + sqlx::query!( + "INSERT INTO durable.log(task_id, index, message) + VALUES ($1, $2, $3) + ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE + SET message = $3 + ", + task_id, + index, + message + ) + .execute(&mut *conn) + .await + } + + async fn poll_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + let record = sqlx::query!( + r#" + DELETE FROM durable.notification + WHERE ctid IN ( + SELECT ctid + FROM durable.notification + WHERE task_id = $1 + ORDER BY created_at ASC + LIMIT 1 + FOR UPDATE + ) + RETURNING + created_at, + event, + data as "data: Json>" + "#, + task_id + ) + .fetch_optional(&mut *conn) + .await?; + + Ok(record.map(|r| PolledNotification { + created_at: r.created_at, + event: r.event, + data: r.data, + })) + } + + async fn fetch_task_state_locked( + &self, + conn: &mut PgConnection, + task_id: i64, + ) -> sqlx::Result> { + sqlx::query_scalar!( + r#" + SELECT state as "state!: TaskState" + FROM durable.task + WHERE task.id = $1 + FOR UPDATE + "#, + task_id + ) + .fetch_optional(&mut *conn) + .await + } + + async fn insert_notification( + &self, + conn: &mut PgConnection, + task_id: i64, + event: &str, + data: Json<&RawValue>, + ) -> sqlx::Result { + sqlx::query!( + r#" + INSERT INTO durable.notification(task_id, event, data) + VALUES ($1, $2, $3) + "#, + task_id, + event, + data as Json<&RawValue> + ) + .execute(&mut *conn) + .await + } +} diff --git a/crates/durable-runtime/src/task.rs b/crates/durable-runtime/src/task.rs index 65752ec..77c793d 100644 --- a/crates/durable-runtime/src/task.rs +++ b/crates/durable-runtime/src/task.rs @@ -202,7 +202,7 @@ pub struct Task { } pub struct TaskState { - shared: Arc, + pub(crate) shared: Arc, worker_id: i64, task: TaskData, @@ -275,6 +275,10 @@ impl TaskState { &self.shared.pool } + pub(crate) fn storage(&self) -> &dyn crate::storage::Storage { + &*self.shared.storage + } + /// Access the reqwest client for the worker. pub fn client(&self) -> &reqwest::Client { &self.shared.client @@ -444,20 +448,11 @@ impl TaskState { ); } - let record = sqlx::query!( - r#" - SELECT - label, - value as "value: Json>" - FROM durable.event - WHERE task_id = $1 - AND index = $2 - "#, - self.task_id(), - self.txn_index - ) - .fetch_optional(&mut *conn) - .await?; + let record = self + .shared + .storage + .fetch_event_at_index(&mut *conn, self.task_id(), self.txn_index) + .await?; if let Some(record) = record { if record.label != options.label { @@ -600,47 +595,19 @@ impl TaskState { // - We avoid multiple roundtrips to the database. // - Since all the statements are conditional on running_on = worker_id, we can // run this outside of a transaction with no issues. - let running_on = sqlx::query!( - r#" - WITH - current_task AS ( - SELECT id, running_on - FROM durable.task - WHERE id = $1 - LIMIT 1 - ), - insert_event AS ( - INSERT INTO durable.event(task_id, index, label, value) - SELECT - id as task_id, - $2 as index, - $3 as label, - $4 as value - FROM current_task - RETURNING task_id - ), - insert_log AS ( - INSERT INTO durable.log(task_id, index, message) - SELECT task_id, index, message - FROM (VALUES ($1, $2, $5)) as t(task_id, index, message) - JOIN current_task task ON task.id = task_id - WHERE message IS NOT NULL - RETURNING task_id - ) - SELECT running_on - FROM current_task - LEFT JOIN insert_event event ON event.task_id = id - LEFT JOIN insert_event log ON log.task_id = id - "#, - self.task_id(), - self.txn_index, - &*txn.label, - Json(data) as Json<&T>, - message - ) - .fetch_one(&mut *conn) - .await? - .running_on; + let value = serde_json::value::to_raw_value(data)?; + let running_on = self + .shared + .storage + .commit_event_with_log( + &mut *conn, + self.task_id(), + self.txn_index, + &txn.label, + Json(&*value), + message.as_deref(), + ) + .await?; tracing::trace!( target: "durable_runtime::task::transaction", @@ -673,17 +640,10 @@ impl TaskState { conn: &mut PgConnection, timeout: Option>, ) -> anyhow::Result { - sqlx::query!( - "UPDATE durable.task - SET state = 'suspended', - running_on = NULL, - wakeup_at = $2 - WHERE id = $1", - self.task_id(), - timeout - ) - .execute(&mut *conn) - .await?; + self.shared + .storage + .suspend_task(&mut *conn, self.task_id(), timeout) + .await?; Ok(TaskStatus::Suspend) } diff --git a/crates/durable-runtime/src/worker.rs b/crates/durable-runtime/src/worker.rs index d301e62..cab91d8 100644 --- a/crates/durable-runtime/src/worker.rs +++ b/crates/durable-runtime/src/worker.rs @@ -24,7 +24,7 @@ use crate::event::{self, Event, EventSource, Notification}; use crate::flag::{ShutdownFlag, ShutdownGuard}; use crate::plugin::{DurablePlugin, Plugin}; use crate::scheduler::{Component as SchedulerComponent, ScheduleEvent}; -use crate::task::{RecordedEvent, Task, TaskState}; +use crate::task::{Task, TaskState}; use crate::util::{IntoPgInterval, Mailbox, MetricSpan}; use crate::Config; @@ -34,6 +34,7 @@ const LOG_PANIC_INDEX: i32 = i32::MAX; pub(crate) struct SharedState { pub shutdown: ShutdownFlag, pub pool: sqlx::PgPool, + pub(crate) storage: Arc, pub client: reqwest::Client, pub notifications: broadcast::Sender, pub config: Config, @@ -255,6 +256,7 @@ impl WorkerBuilder { suspend: Notify::new(), cache: Mutex::new(uluru::LRUCache::new()), compile_sema: Semaphore::new(self.config.max_concurrent_compilations), + storage: Arc::new(crate::storage::PgStorage::new(self.pool.clone())), pool: self.pool, config: self.config, plugins: self.plugins, @@ -339,16 +341,9 @@ impl Worker { } pub async fn run(&mut self) -> anyhow::Result<()> { - self.worker_id = sqlx::query!( - " - INSERT INTO durable.worker(heartbeat_at) - VALUES (CURRENT_TIMESTAMP) - RETURNING id - " - ) - .fetch_one(&self.shared.pool) - .await? - .id; + let mut conn = self.shared.pool.acquire().await?; + self.worker_id = self.shared.storage.insert_worker(&mut conn).await?; + drop(conn); tracing::info!("durable worker id is {}", self.worker_id); @@ -390,10 +385,15 @@ impl Worker { span.in_scope(|| { tracing::info!("deleting worker database entry"); }); - let result = sqlx::query!("DELETE FROM durable.worker WHERE id = $1", self.worker_id) - .execute(&self.shared.pool) - .await - .context("failed to delete the worker entry from the database"); + let result = async { + let mut conn = self.shared.pool.acquire().await?; + self.shared + .storage + .delete_worker(&mut conn, self.worker_id) + .await + } + .await + .context("failed to delete the worker entry from the database"); self.shared.scheduler.notify(ScheduleEvent::WorkerDeleted { worker_id: self.worker_id, @@ -431,21 +431,18 @@ impl Worker { .acquire(SchedulerComponent::Heartbeat { worker_id }) .await; - let record = sqlx::query!( - "UPDATE durable.worker - SET heartbeat_at = CURRENT_TIMESTAMP - WHERE id = $1 - RETURNING id", - worker_id - ) - .fetch_optional(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + let alive = shared + .storage + .heartbeat_worker(&mut conn, worker_id) + .await?; + drop(conn); // Our record is gone from the database. This means that some other worker // determined that we were inactive. // // We should shutdown and then (optionally) restart with a new worker id. - if record.is_none() { + if !alive { shared.shutdown.raise(); anyhow::bail!("worker entry was deleted from the database"); } @@ -499,34 +496,20 @@ impl Worker { let timeout = shared.config.heartbeat_timeout.into_pg_interval(); let mut result = if let Some(following) = following.take() { - sqlx::query!( - " - DELETE FROM durable.worker - WHERE id = $1 - AND CURRENT_TIMESTAMP - heartbeat_at > $2 - ", - following, - timeout - ) - .execute(&mut *tx) - .await? + shared + .storage + .delete_following_expired_worker(&mut tx, following, timeout) + .await? } else { Default::default() }; if result.rows_affected() > 0 { result.extend(std::iter::once( - sqlx::query!( - " - DELETE FROM durable.worker - WHERE CURRENT_TIMESTAMP - heartbeat_at > $2 - AND NOT id = $1 - ", - worker_id, - timeout - ) - .execute(&mut *tx) - .await?, + shared + .storage + .delete_other_expired_workers(&mut tx, worker_id, timeout) + .await?, )); tracing::debug!( @@ -537,39 +520,10 @@ impl Worker { } // Select either the next worker in sequence, or the newest id in the sequence. - let record = sqlx::query!( - r#" - WITH - prev AS ( - SELECT id, heartbeat_at - FROM durable.worker - WHERE id < $1 - ORDER BY id DESC - LIMIT 1 - ), - next AS ( - SELECT id, heartbeat_at - FROM durable.worker - WHERE NOT id = $1 - ORDER BY id DESC - LIMIT 1 - ), - combined AS ( - SELECT * FROM prev - UNION ALL - SELECT * FROM next - ) - SELECT - id as "id!", - heartbeat_at as "heartbeat_at!" - FROM combined - ORDER BY id ASC - LIMIT 1 - "#, - worker_id - ) - .fetch_optional(&mut *tx) - .await?; + let record = shared + .storage + .next_worker_in_sequence(&mut tx, worker_id) + .await?; tx.commit().await?; @@ -641,24 +595,10 @@ impl Worker { // postgresql is forced to evaluate it for each row. // // If we don't do that all the rows here get the same random number. - let result = sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - wakeup_at = NULL, - running_on = ( - SELECT id - FROM durable.worker - ORDER BY random() + task.id - LIMIT 1 - ) - WHERE state = 'suspended' - AND wakeup_at <= (NOW() - $1::interval) - ", - shared.config.suspend_margin.into_pg_interval() - ) - .execute(&mut *conn) - .await?; + let result = shared + .storage + .wake_suspended_tasks(&mut conn, shared.config.suspend_margin.into_pg_interval()) + .await?; let count = result.rows_affected(); if count > 0 { @@ -666,19 +606,7 @@ impl Worker { shared.scheduler.notify(ScheduleEvent::TasksWoken { count }); } - let wakeup_at = sqlx::query!( - r#" - SELECT wakeup_at as "wakeup_at!" - FROM durable.task - WHERE state = 'suspended' - AND wakeup_at IS NOT NULL - ORDER BY wakeup_at ASC - LIMIT 1 - "# - ) - .fetch_optional(&mut *conn) - .await? - .map(|record| record.wakeup_at); + let wakeup_at = shared.storage.next_wakeup_at(&mut conn).await?; let now = shared.clock.now(); let delay = match wakeup_at { @@ -743,22 +671,10 @@ impl Worker { // We do cleanup loop { - let result = sqlx::query!( - r#" - DELETE FROM durable.task - WHERE task.ctid = ANY(ARRAY( - SELECT ctid - FROM durable.task - WHERE completed_at < NOW() - $1::interval - LIMIT $2 - FOR UPDATE - )) - "#, - interval, - limit - ) - .execute(&mut *conn) - .await; + let result = shared + .storage + .cleanup_old_tasks(&mut conn, interval, limit) + .await; match result { Ok(result) if result.rows_affected() < limit as u64 => break, @@ -810,21 +726,7 @@ impl Worker { } }; - let result = sqlx::query!( - r#" - UPDATE durable.task - SET state = 'ready' - WHERE state = 'suspended' - AND EXISTS(( - SELECT task_id - FROM durable.notification - WHERE task_id = task.id - AND created_at < NOW() - '10 minutes'::interval - )) - "# - ) - .execute(&mut *conn) - .await; + let result = shared.storage.unwedge_stuck_notifications(&mut conn).await; match result { Ok(res) if res.rows_affected() != 0 => { @@ -893,19 +795,11 @@ impl Worker { } } - sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - running_on = NULL - WHERE id = ANY($1::bigint[]) - AND running_on = $2 - ", - &failed, - self.worker_id - ) - .execute(&self.shared.pool) - .await?; + let mut conn = self.shared.pool.acquire().await?; + self.shared + .storage + .reset_failed_tasks(&mut conn, &failed, self.worker_id) + .await?; continue; } @@ -949,21 +843,11 @@ impl Worker { } async fn load_leader_id(&mut self) -> anyhow::Result<()> { - let record = sqlx::query!( - " - SELECT id - FROM durable.worker - ORDER BY started_at ASC, id ASC - LIMIT 1 - " - ) - .fetch_optional(&self.shared.pool) - .await?; - - let new_leader = match record { - Some(record) => record.id, - None => -1, - }; + let mut conn = self.shared.pool.acquire().await?; + let record = self.shared.storage.load_leader_id(&mut conn).await?; + drop(conn); + + let new_leader = record.unwrap_or(-1); self.shared.leader.store(new_leader); self.shared @@ -991,48 +875,17 @@ impl Worker { .await; let mut tx = self.shared.pool.begin().await?; - let tasks = sqlx::query_as!( - TaskData, - r#" - WITH selected AS ( - SELECT id - FROM durable.task - WHERE (state IN ('ready', 'active') AND running_on IS NULL) - OR (state = 'ready' AND running_on = $1) - ORDER BY id ASC - FOR NO KEY UPDATE SKIP LOCKED - LIMIT $2 - ) - UPDATE durable.task - SET running_on = $1, - state = 'active' - FROM selected - WHERE selected.id = task.id - RETURNING - task.id as id, - task.name as name, - task.created_at as created_at, - task.wasm as "wasm!", - task.data as "data!: Json>" - "#, - self.worker_id, - allowed as i64 - ) - .fetch_all(&mut *tx) - .await?; + let tasks = self + .shared + .storage + .claim_tasks(&mut tx, self.worker_id, allowed as i64) + .await?; if tasks.len() + self.tasks.len() >= max_tasks { - sqlx::query!( - " - UPDATE durable.task - SET running_on = NULL - WHERE state = 'ready' - AND running_on = $1 - ", - self.worker_id - ) - .execute(&mut *tx) - .await?; + self.shared + .storage + .release_owned_ready_tasks(&mut tx, self.worker_id) + .await?; self.blocked = true; } @@ -1128,19 +981,11 @@ impl Worker { // // If this fails then the task failure gets reported to the main event // loop which can ensure it gets retried. - sqlx::query!( - " - UPDATE durable.task - SET state = 'ready', - running_on = NULL - WHERE id = $1 - AND running_on = $2 - ", - task_id, - worker_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared + .storage + .reset_task_for_retry(&mut conn, task_id, worker_id) + .await?; break TaskStatus::Suspend; } @@ -1153,14 +998,13 @@ impl Worker { let message = format!("{error:?}\n"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - LOG_ERROR_INDEX, - message - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log(&mut conn, task_id, LOG_ERROR_INDEX, &message) + .await + } .await; if let Err(e) = result { @@ -1180,14 +1024,18 @@ impl Worker { tracing::error!("task {task_id} panicked: {message}"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - LOG_PANIC_INDEX, - format!("task panicked: {message}\n") - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log( + &mut conn, + task_id, + LOG_PANIC_INDEX, + &format!("task panicked: {message}\n"), + ) + .await + } .await; if let Err(e) = result { @@ -1216,18 +1064,11 @@ impl Worker { }); } TaskStatus::ExitSuccess => { - sqlx::query!( - "UPDATE durable.task - SET state = 'complete', - completed_at = CURRENT_TIMESTAMP, - running_on = NULL, - wasm = NULL - WHERE id = $1 - ", - task_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared + .storage + .mark_task_complete(&mut conn, task_id) + .await?; shared.metrics.task_complete.increment(1); shared.scheduler.notify(ScheduleEvent::TaskCompleted { @@ -1236,17 +1077,8 @@ impl Worker { }); } TaskStatus::ExitFailure => { - sqlx::query!( - "UPDATE durable.task - SET state = 'failed', - completed_at = CURRENT_TIMESTAMP, - running_on = NULL, - wasm = NULL - WHERE id = $1", - task_id - ) - .execute(&shared.pool) - .await?; + let mut conn = shared.pool.acquire().await?; + shared.storage.mark_task_failed(&mut conn, task_id).await?; shared.metrics.task_failed.increment(1); shared.scheduler.notify(ScheduleEvent::TaskCompleted { @@ -1296,15 +1128,17 @@ impl Worker { // once. Compiling one is an expensive operation, so if let component = component .get_or_compute(|| async { - let record = sqlx::query!("SELECT wasm FROM durable.wasm WHERE id = $1", task.wasm) - .fetch_one(&shared.pool) + let mut conn = shared.pool.acquire().await.map_err(anyhow::Error::from)?; + let wasm = shared + .storage + .fetch_wasm_blob(&mut conn, task.wasm) .await .map_err(anyhow::Error::from)?; + drop(conn); // If an error occurs then we just allow ourselves to proceed anyway. let _permit = shared.compile_sema.acquire().await; - let wasm = record.wasm; let start = Instant::now(); let component = tokio::task::spawn_blocking({ let engine = engine.clone(); @@ -1328,21 +1162,13 @@ impl Worker { }) .await?; - let events = sqlx::query_as!( - RecordedEvent, - r#" - SELECT - index, - label, - value as "value!: Json>" - FROM durable.event - WHERE task_id = $1 - ORDER BY index ASC - LIMIT 1000 - "#, - task.id - ) - .fetch_all(&shared.pool) + let events = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .fetch_recorded_events(&mut conn, task.id) + .await + } .await .unwrap_or_default(); @@ -1411,16 +1237,16 @@ impl Worker { logs.trim_end() ); - if let Err(e) = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3)", - task_id, - index, - logs - ) - .execute(&shared.pool) - .await - { + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .insert_log(&mut conn, task_id, index, &logs) + .await + } + .await; + + if let Err(e) = result { tracing::error!("failed to save remaining logs to the database: {e}"); } } @@ -1431,17 +1257,13 @@ impl Worker { tracing::warn!("task failed to execute with an error: {message}"); - let result = sqlx::query!( - "INSERT INTO durable.log(task_id, index, message) - VALUES ($1, $2, $3) - ON CONFLICT ON CONSTRAINT log_pkey DO UPDATE - SET message = $3 - ", - task_id, - LOG_ERROR_INDEX, - message - ) - .execute(&shared.pool) + let result = async { + let mut conn = shared.pool.acquire().await?; + shared + .storage + .upsert_log_error(&mut conn, task_id, LOG_ERROR_INDEX, &message) + .await + } .await; if let Err(e) = result {