From c55551a4c0c6f9083557abde903367ccae89b768 Mon Sep 17 00:00:00 2001 From: aloewright Date: Fri, 8 May 2026 08:02:45 -0700 Subject: [PATCH 1/2] feat(uploads): resume-on-disconnect + idempotent chunk retry (ALO-121) Adds GET /api/videos/upload/:uploadId/status so the client can re-derive which chunks the server already has after a disconnect, plus server-side idempotency: re-sending an already-accepted chunk acks without paying for a redundant R2 multipart part. Frontend now retries each chunk with exponential backoff and persists uploadId in localStorage so a refresh mid-upload picks up where it left off. --- package-lock.json | 102 ++++++------------------------ src/frontend/pages/Upload.tsx | 113 ++++++++++++++++++++++++++-------- src/workers/videos.test.ts | 75 ++++++++++++++++++++++ src/workers/videos.ts | 70 +++++++++++++++++++-- 4 files changed, 247 insertions(+), 113 deletions(-) diff --git a/package-lock.json b/package-lock.json index c7c119a..30b4900 100644 --- a/package-lock.json +++ b/package-lock.json @@ -51,7 +51,7 @@ "version": "7.27.1", "resolved": "https://registry.npmjs.org/@babel/helper-string-parser/-/helper-string-parser-7.27.1.tgz", "integrity": "sha512-qMlSxKbpRlAridDExk92nSobyDdpPijUq2DW6oDnUqd0iOGxmQjyqhMIihI9+zv4LPyZdRje2cavWPbCbWm3eA==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=6.9.0" @@ -61,7 +61,7 @@ "version": "7.28.5", "resolved": "https://registry.npmjs.org/@babel/helper-validator-identifier/-/helper-validator-identifier-7.28.5.tgz", "integrity": "sha512-qSs4ifwzKJSV39ucNjsvc6WVHs6b7S03sOh2OcHF9UHfVPqWWALUsNUVzhSBiItjRZoLHx7nIarVjqKVusUZ1Q==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=6.9.0" @@ -71,7 +71,7 @@ "version": "7.29.2", "resolved": "https://registry.npmjs.org/@babel/parser/-/parser-7.29.2.tgz", "integrity": "sha512-4GgRzy/+fsBa72/RZVJmGKPmZu9Byn8o4MoLpmNe1m8ZfYnz5emHLQz3U4gLud6Zwl0RZIcgiLD7Uq7ySFuDLA==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@babel/types": "^7.29.0" @@ -96,7 +96,7 @@ "version": "7.29.0", "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.29.0.tgz", "integrity": "sha512-LwdZHpScM4Qz8Xw2iKSzS+cfglZzJGvofQICy7W7v4caru4EaAmyUuO6BGrbyQ2mYV11W0U8j5mBhd14dd3B0A==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@babel/helper-string-parser": "^7.27.1", @@ -163,7 +163,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/@bcoe/v8-coverage/-/v8-coverage-1.0.2.tgz", "integrity": "sha512-6zABk/ECA/QYSCQ1NGiVwwbQerUCZ+TQbp64Q3AgmfNvurHH0j8TtXa1qbShXA6qqkpAj4V5W8pP6mLe1mcMqA==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=18" @@ -483,7 +483,6 @@ "version": "1.10.0", "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.10.0.tgz", "integrity": "sha512-yq6OkJ4p82CAfPl0u9mQebQHKPJkY7WrIuk205cTYnYe+k2Z8YBh11FrbRG/H6ihirqcacOgl2BIO8oyMQLeXw==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -495,7 +494,6 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "dev": true, "license": "0BSD", "optional": true }, @@ -503,7 +501,6 @@ "version": "1.10.0", "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.10.0.tgz", "integrity": "sha512-ewvYlk86xUoGI0zQRNq/mC+16R1QeDlKQy21Ki3oSYXNgLb45GV1P6A0M+/s6nyCuNDqe5VpaY84BzXGwVbwFA==", - "dev": true, "license": "MIT", 
"optional": true, "dependencies": { @@ -514,7 +511,6 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "dev": true, "license": "0BSD", "optional": true }, @@ -522,7 +518,6 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.1.tgz", "integrity": "sha512-uTII7OYF+/Mes/MrcIOYp5yOtSMLBWSIoLPpcgwipoiKbli6k322tcoFsxoIIxPDqW01SQGAgko4EzZi2BNv2w==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -533,7 +528,6 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "dev": true, "license": "0BSD", "optional": true }, @@ -544,7 +538,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -561,7 +554,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -578,7 +570,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -595,7 +586,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -612,7 +602,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -629,7 +618,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -646,7 +634,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -663,7 +650,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -680,7 +666,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -697,7 +682,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -714,7 +698,6 @@ "cpu": [ "ia32" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -731,7 +714,6 @@ "cpu": [ "loong64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -748,7 +730,6 @@ "cpu": [ "mips64el" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -765,7 +746,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -782,7 +762,6 @@ "cpu": [ "riscv64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -799,7 +778,6 @@ "cpu": [ "s390x" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -816,7 +794,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -850,7 +827,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -884,7 +860,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -918,7 +893,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -935,7 +909,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -952,7 +925,6 @@ "cpu": [ "ia32" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -969,7 +941,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -1520,7 +1491,7 @@ "version": "3.1.2", "resolved": "https://registry.npmjs.org/@jridgewell/resolve-uri/-/resolve-uri-3.1.2.tgz", "integrity": "sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=6.0.0" @@ -1537,7 +1508,7 @@ "version": "0.3.31", "resolved": 
"https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.31.tgz", "integrity": "sha512-zzNR+SdQSDJzc8joaeP8QQoCQr8NuYx2dIIytl1QeBEZHJ9uW6hebsrYgbz8hJwUQao3TWCMtmfV8Nu1twOLAw==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@jridgewell/resolve-uri": "^3.1.0", @@ -1548,7 +1519,6 @@ "version": "1.1.4", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.1.4.tgz", "integrity": "sha512-3NQNNgA1YSlJb/kMH1ildASP9HW7/7kYnRI2szWJaofaS1hWmbGI4H+d3+22aGzXXN9IJ+n+GiFVcGipJP18ow==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -2333,7 +2303,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2350,7 +2319,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2367,7 +2335,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2384,7 +2351,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2401,7 +2367,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2418,7 +2383,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2435,7 +2399,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2452,7 +2415,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2469,7 +2431,6 @@ "cpu": [ "s390x" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2486,7 +2447,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2503,7 +2463,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2520,7 +2479,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2537,7 +2495,6 @@ "cpu": [ "wasm32" ], - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -2556,7 +2513,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2573,7 +2529,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2841,7 +2796,6 @@ "version": "0.10.1", "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz", "integrity": "sha512-9tTaPJLSiejZKx+Bmog4uSubteqTvFrVrURwkmHixBo0G4seD0zUxp98E1DzUBJxLQ3NPwXrGKDiVjwx/DpPsg==", - "dev": true, "license": "MIT", "optional": true, "dependencies": { @@ -2852,7 +2806,6 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "dev": true, "license": "0BSD", "optional": true }, @@ -2992,7 +2945,7 @@ "version": "4.1.5", "resolved": "https://registry.npmjs.org/@vitest/coverage-v8/-/coverage-v8-4.1.5.tgz", "integrity": "sha512-38C0/Ddb7HcRG0Z4/DUem8x57d2p9jYgp18mkaYswEOQBGsI1CG4f/hjm0ZCeaJfWhSZ4k7jgs29V1Zom7Ki9A==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@bcoe/v8-coverage": "^1.0.2", @@ -3172,7 +3125,7 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/ast-v8-to-istanbul/-/ast-v8-to-istanbul-1.0.0.tgz", "integrity": "sha512-1fSfIwuDICFA4LKkCzRPO7F0hzFf0B7+Xqrl27ynQaa+Rh0e1Es0v6kWHPott3lU10AyAr7oKHa65OppjLn3Rg==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@jridgewell/trace-mapping": "^0.3.31", @@ -3184,7 +3137,7 @@ "version": "10.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-10.0.0.tgz", "integrity": 
"sha512-lM/UBzQmfJRo9ABXbPWemivdCW8V2G8FHaHdypQaIy523snUjog0W71ayWXTjiR+ixeMyVHN2XcpnTd/liPg/Q==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/better-auth": { @@ -3685,7 +3638,7 @@ "version": "0.28.0", "resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.28.0.tgz", "integrity": "sha512-sNR9MHpXSUV/XB4zmsFKN+QgVG82Cc7+/aaxJ8Adi8hyOac+EXptIp45QBPaVyX3N70664wRbTcLTOemCAnyqw==", - "dev": true, + "devOptional": true, "hasInstallScript": true, "license": "MIT", "bin": { @@ -3730,7 +3683,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -3747,7 +3699,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -3764,7 +3715,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -3865,7 +3815,6 @@ "version": "2.3.3", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", "integrity": "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==", - "dev": true, "hasInstallScript": true, "license": "MIT", "optional": true, @@ -3948,7 +3897,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=8" @@ -4033,7 +3982,7 @@ "version": "2.0.2", "resolved": "https://registry.npmjs.org/html-escaper/-/html-escaper-2.0.2.tgz", "integrity": "sha512-H2iMtd0I4Mt5eYiapRdIDjp+XzelXQ0tFE4JS7YFwFevXXMmOp9myNrUvCg0D6ws8iqkRPBfKHgbwig1SmlLfg==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/html-void-elements": { @@ -4060,7 +4009,7 @@ "version": "3.2.2", "resolved": "https://registry.npmjs.org/istanbul-lib-coverage/-/istanbul-lib-coverage-3.2.2.tgz", "integrity": "sha512-O8dpsF+r0WV/8MNRKfnmrtCWhuKjxrq2w+jpzBL5UZKTi2LeVWnWOmWRxFlesJONmc+wLAGvKQZEOanko0LFTg==", - "dev": true, + "devOptional": true, "license": "BSD-3-Clause", "engines": { "node": ">=8" @@ -4070,7 +4019,7 @@ "version": "3.0.1", "resolved": "https://registry.npmjs.org/istanbul-lib-report/-/istanbul-lib-report-3.0.1.tgz", "integrity": "sha512-GCfE1mtsHGOELCU8e/Z7YWzpmybrx/+dSTfLrvY8qRmaY6zXTKWn6WQIjaAFw069icm6GVMNkgu0NzI4iPZUNw==", - "dev": true, + "devOptional": true, "license": "BSD-3-Clause", "dependencies": { "istanbul-lib-coverage": "^3.0.0", @@ -4085,7 +4034,7 @@ "version": "3.2.0", "resolved": "https://registry.npmjs.org/istanbul-reports/-/istanbul-reports-3.2.0.tgz", "integrity": "sha512-HGYWWS/ehqTV3xN10i23tkPkpH46MLCIMFNCaaKNavAXTF1RkqxawEPtnjnGZ6XKSInBKkiOA5BKS+aZiY3AvA==", - "dev": true, + "devOptional": true, "license": "BSD-3-Clause", "dependencies": { "html-escaper": "^2.0.0", @@ -4166,7 +4115,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4187,7 +4135,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4208,7 +4155,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4229,7 +4175,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4250,7 +4195,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4271,7 +4215,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4292,7 +4235,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4313,7 
+4255,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4334,7 +4275,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4355,7 +4295,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4376,7 +4315,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MPL-2.0", "optional": true, "os": [ @@ -4422,7 +4360,7 @@ "version": "0.5.2", "resolved": "https://registry.npmjs.org/magicast/-/magicast-0.5.2.tgz", "integrity": "sha512-E3ZJh4J3S9KfwdjZhe2afj6R9lGIN5Pher1pF39UGrXRqq/VDaGVIGN13BjHd2u8B61hArAGOnso7nBOouW3TQ==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@babel/parser": "^7.29.0", @@ -4434,7 +4372,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-4.0.0.tgz", "integrity": "sha512-hXdUTZYIVOt1Ex//jAQi+wTZZpUpwBj/0QsOzqegb3rGMMeJiSEu5xLHnYfBrRV4RH2+OCSOO95Is/7x1WJ4bw==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "semver": "^7.5.3" @@ -5146,7 +5084,7 @@ "version": "7.7.4", "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.4.tgz", "integrity": "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==", - "dev": true, + "devOptional": true, "license": "ISC", "bin": { "semver": "bin/semver.js" @@ -5410,7 +5348,7 @@ "version": "7.2.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "has-flag": "^4.0.0" diff --git a/src/frontend/pages/Upload.tsx b/src/frontend/pages/Upload.tsx index 126c066..141729f 100644 --- a/src/frontend/pages/Upload.tsx +++ b/src/frontend/pages/Upload.tsx @@ -25,6 +25,26 @@ function isAcceptedVideo(file: File): boolean { return ALLOWED_EXTENSIONS.has(file.name.slice(dot + 1).toLowerCase()); } +function resumeKey(file: File): string { + return `spooool:upload:${file.name}:${file.size}:${file.lastModified}`; +} + +async function fetchResumeState( + uploadId: string, +): Promise<{ received: number[]; chunkCount: number } | null> { + try { + const res = await fetch(`/api/videos/upload/${encodeURIComponent(uploadId)}/status`, { + credentials: 'same-origin', + }); + if (!res.ok) return null; + const data = (await res.json()) as { received?: number[]; chunkCount?: number }; + if (typeof data.chunkCount !== 'number' || !Array.isArray(data.received)) return null; + return { received: data.received, chunkCount: data.chunkCount }; + } catch { + return null; + } +} + async function uploadInChunks( file: File, title: string, @@ -34,50 +54,93 @@ async function uploadInChunks( const chunkCount = Math.ceil(file.size / CHUNK_SIZE); let lastResponse: Response | null = null; let uploadId: string | null = null; + const received = new Set(); + + // ALO-121: resume across page reloads / disconnects. Persist uploadId + // keyed by file identity so a refresh continues where the last attempt + // left off; the server is the source of truth for which chunks landed. + const persistKey = resumeKey(file); + const stored = typeof localStorage !== 'undefined' ? 
localStorage.getItem(persistKey) : null; + if (stored) { + const state = await fetchResumeState(stored); + if (state && state.chunkCount === chunkCount) { + uploadId = stored; + for (const i of state.received) received.add(i); + if (received.size > 0) { + onProgress(Math.round((received.size / chunkCount) * 100)); + } + } else { + try { localStorage.removeItem(persistKey); } catch { /* private mode */ } + } + } for (let index = 0; index < chunkCount; index += 1) { + if (received.has(index) && index < chunkCount - 1) { + // Final chunk must still be sent so the server can run completion; + // intermediate chunks the server already has can be skipped. + onProgress(Math.round(((index + 1) / chunkCount) * 100)); + continue; + } const start = index * CHUNK_SIZE; const end = Math.min(start + CHUNK_SIZE, file.size); - // Pass file.type so the resulting Blob keeps the parent's MIME — without it - // the chunk's type is '' and the multipart part is sent as - // application/octet-stream, which the upload validator then rejects. const chunk = file.slice(start, end, file.type); - const formData = new FormData(); - formData.set('title', title); - formData.set('description', description); - formData.set('file', chunk, file.name); - formData.set('chunkIndex', String(index)); - formData.set('chunkCount', String(chunkCount)); - if (uploadId) { - formData.set('uploadId', uploadId); - } - lastResponse = await fetch('/api/videos/upload', { - method: 'POST', - body: formData, - }); + let attempt = 0; + const maxAttempts = 4; + // Per-chunk retry with exponential backoff. Network blips and 5xx are + // transient; the server is idempotent on chunk re-submission so retries + // are safe. + while (true) { + const formData = new FormData(); + formData.set('title', title); + formData.set('description', description); + formData.set('file', chunk, file.name); + formData.set('chunkIndex', String(index)); + formData.set('chunkCount', String(chunkCount)); + if (uploadId) formData.set('uploadId', uploadId); - if (!lastResponse.ok) { - const body = await lastResponse.text(); + let res: Response; + try { + res = await fetch('/api/videos/upload', { method: 'POST', body: formData }); + } catch (err) { + if (++attempt >= maxAttempts) throw err; + await new Promise((r) => setTimeout(r, 500 * 2 ** (attempt - 1))); + continue; + } + + if (res.ok) { + lastResponse = res; + const responseData = (await res.json()) as { uploadId?: string }; + if (responseData.uploadId) { + uploadId = responseData.uploadId; + try { localStorage.setItem(persistKey, uploadId); } catch { /* private mode */ } + } + received.add(index); + break; + } + + if (res.status >= 500 && ++attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, 500 * 2 ** (attempt - 1))); + continue; + } + + const body = await res.text(); let detail = body; try { const parsed = JSON.parse(body) as { error?: string; code?: string }; detail = parsed.error ?? body; if (parsed.code) detail = `${detail} (${parsed.code})`; } catch { - // Non-JSON response — keep raw text. 
+ /* non-JSON */ } - throw new Error(`Upload failed (${lastResponse.status}): ${detail.slice(0, 300)}`); - } - - const responseData = (await lastResponse.json()) as { uploadId?: string }; - if (responseData.uploadId) { - uploadId = responseData.uploadId; + throw new Error(`Upload failed (${res.status}): ${detail.slice(0, 300)}`); } onProgress(Math.round(((index + 1) / chunkCount) * 100)); } + try { localStorage.removeItem(persistKey); } catch { /* private mode */ } + if (!lastResponse) { throw new Error('No upload response'); } diff --git a/src/workers/videos.test.ts b/src/workers/videos.test.ts index 5b6be91..182aba0 100644 --- a/src/workers/videos.test.ts +++ b/src/workers/videos.test.ts @@ -141,3 +141,78 @@ describe('upload storage-quota gate', () => { expect(res.status).toBe(201); }); }); + +// ALO-121: resume-on-disconnect endpoint surfaces server-known chunks. +describe('upload status endpoint', () => { + function envWithSessions(store: Record): VideoRoutesEnv { + const sessions = { + async get(key: string) { + return store[key] ?? null; + }, + async put() {}, + async delete() {}, + } as unknown as KVNamespace; + return { + DB: {} as unknown as D1Database, + VIDEOS: {} as unknown as R2Bucket, + CACHE: {} as unknown as KVNamespace, + SESSIONS: sessions, + VIDEO_ENCODING: { send: async () => {} } as unknown as Queue, + }; + } + + it('returns 401 when unauthenticated', async () => { + const fetcher = mountWithUser(envWithSessions({}), null); + const res = await fetcher('/api/videos/upload/abc/status'); + expect(res.status).toBe(401); + }); + + it('returns 404 when no session exists for the user/uploadId pair', async () => { + const fetcher = mountWithUser(envWithSessions({}), { + id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + }); + const res = await fetcher('/api/videos/upload/missing/status'); + expect(res.status).toBe(404); + }); + + it('reports received chunk indexes (0-based) from stored parts', async () => { + const base = 'upload:u1:abc'; + const store: Record = { + [`${base}:mpid`]: 'mpid-1', + [`${base}:meta`]: JSON.stringify({ + videoId: 'v1', r2Key: 'u1/v1/clip.mp4', title: 't', description: 'd', chunkCount: 3, + }), + [`${base}:parts`]: JSON.stringify({ + '1': { etag: 'e1', size: 10 }, + '3': { etag: 'e3', size: 7 }, + }), + }; + const fetcher = mountWithUser(envWithSessions(store), { + id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + }); + const res = await fetcher('/api/videos/upload/abc/status'); + expect(res.status).toBe(200); + const body = (await res.json()) as { + uploadId: string; chunkCount: number; received: number[]; bytesReceived: number; + }; + expect(body.uploadId).toBe('abc'); + expect(body.chunkCount).toBe(3); + expect(body.received).toEqual([0, 2]); + expect(body.bytesReceived).toBe(17); + }); + + it('scopes upload sessions per user (cannot read another user\'s upload)', async () => { + const base = 'upload:other:abc'; + const store: Record = { + [`${base}:mpid`]: 'mpid-1', + [`${base}:meta`]: JSON.stringify({ + videoId: 'v1', r2Key: 'other/v1/clip.mp4', title: 't', description: 'd', chunkCount: 1, + }), + }; + const fetcher = mountWithUser(envWithSessions(store), { + id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + }); + const res = await fetcher('/api/videos/upload/abc/status'); + expect(res.status).toBe(404); + }); +}); diff --git a/src/workers/videos.ts b/src/workers/videos.ts index 902f2cd..eb9f17b 100644 --- a/src/workers/videos.ts +++ b/src/workers/videos.ts @@ -80,11 +80,51 @@ const uploadMetaPersistedSchema = 
z.object({ chunkCount: z.number().int().positive(), }); +function uploadKvBase(userId: string, uploadId: string): string { + return `upload:${userId}:${uploadId}`; +} + export const videoRoutes = new Hono<{ Bindings: VideoRoutesEnv; Variables: VideoRoutesVariables; }>(); +// ALO-121: resume-on-disconnect. Lets the client re-derive which chunks the +// server already has after a network failure, so we don't re-upload bytes +// the multipart already accepted. Auth-gated by user-scoped KV key. +videoRoutes.get('/api/videos/upload/:uploadId/status', async (c) => { + const user = c.get('user'); + if (!user) return c.json({ error: 'Unauthorized' }, 401); + + const uploadId = c.req.param('uploadId'); + const baseKvKey = uploadKvBase(user.id, uploadId); + const [metaJson, partsJson, mpid] = await Promise.all([ + c.env.SESSIONS.get(`${baseKvKey}:meta`), + c.env.SESSIONS.get(`${baseKvKey}:parts`), + c.env.SESSIONS.get(`${baseKvKey}:mpid`), + ]); + if (!metaJson || !mpid) { + return c.json({ error: 'Upload session not found', code: 'upload_not_found' }, 404); + } + const meta = uploadMetaPersistedSchema.parse(JSON.parse(metaJson)); + const partsMap = partsJson + ? (JSON.parse(partsJson) as Record) + : {}; + // Stored part numbers are 1-indexed; client uses 0-indexed chunk indexes. + const received = Object.keys(partsMap) + .map((p) => Number(p) - 1) + .filter((i) => Number.isInteger(i) && i >= 0) + .sort((a, b) => a - b); + const bytesReceived = Object.values(partsMap).reduce((sum, p) => sum + p.size, 0); + return c.json({ + uploadId, + chunkCount: meta.chunkCount, + received, + bytesReceived, + complete: false, + }); +}); + videoRoutes.get('/api/videos/trending', async (c) => { const parsed = trendingQuerySchema.safeParse(c.req.query()); if (!parsed.success) { @@ -432,7 +472,7 @@ videoRoutes.post('/api/videos/upload', async (c) => { const resolvedUploadId = uploadId ?? crypto.randomUUID(); - const baseKvKey = `upload:${user.id}:${resolvedUploadId}`; + const baseKvKey = uploadKvBase(user.id, resolvedUploadId); const mpidKey = `${baseKvKey}:mpid`; const metaKey = `${baseKvKey}:meta`; const partsKey = `${baseKvKey}:parts`; @@ -485,6 +525,20 @@ videoRoutes.post('/api/videos/upload', async (c) => { ? (JSON.parse(partsJson) as Record) : {}; + // ALO-121: idempotent retry. If the client retries a chunk we already + // accepted (typical on flaky connections where the response was lost), + // ack without re-paying the multipart upload cost. + const partKey = String(chunkIndex + 1); + if (uploadedPartsMap[partKey]) { + if (chunkIndex < chunkCount - 1) { + return c.json( + { status: 'chunk_received', chunkIndex, chunkCount, idempotent: true }, + 202, + ); + } + // Final chunk re-send: fall through so completion logic still runs. 
+ } + const priorBytes = Object.values(uploadedPartsMap).reduce((sum, part) => sum + part.size, 0); if (priorBytes + rawFile.size > MAX_VIDEO_BYTES) { return c.json( @@ -494,10 +548,14 @@ videoRoutes.post('/api/videos/upload', async (c) => { } const multipart = env.VIDEOS.resumeMultipartUpload(uploadMeta.r2Key, multipartUploadId); - const uploadedPart = await multipart.uploadPart(chunkIndex + 1, rawFile.stream()); - - uploadedPartsMap[String(chunkIndex + 1)] = { etag: uploadedPart.etag, size: rawFile.size }; - await env.SESSIONS.put(partsKey, JSON.stringify(uploadedPartsMap), { expirationTtl: 86400 }); + const alreadyHavePart = Boolean(uploadedPartsMap[partKey]); + if (!alreadyHavePart) { + const uploadedPart = await multipart.uploadPart(chunkIndex + 1, rawFile.stream()); + uploadedPartsMap[partKey] = { etag: uploadedPart.etag, size: rawFile.size }; + await env.SESSIONS.put(partsKey, JSON.stringify(uploadedPartsMap), { + expirationTtl: 86400, + }); + } if (chunkIndex < chunkCount - 1) { return c.json({ status: 'chunk_received', chunkIndex, chunkCount }, 202); @@ -514,7 +572,7 @@ videoRoutes.post('/api/videos/upload', async (c) => { // ALO-139: authoritative quota check at completion. Catches the case // where the first-chunk precheck passed but a parallel upload (or a // very large total via many small chunks) would push the user over. - const totalBytes = priorBytes + rawFile.size; + const totalBytes = Object.values(uploadedPartsMap).reduce((sum, p) => sum + p.size, 0); const finalUsage = await getStorageUsage(env, user.id); if (!hasRoomFor(finalUsage, totalBytes)) { await multipart.abort().catch(() => {}); From b77cf2cb3c75940883f7022bb9c9a6ca84ee437a Mon Sep 17 00:00:00 2001 From: aloewright Date: Sat, 9 May 2026 00:22:21 -0700 Subject: [PATCH 2/2] feat(uploads): chunked upload resume + retry hardening (ALO-121) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lets a creator pause/disconnect mid-upload and pick up where they left off without re-sending bytes the server already has. Hardens chunk-0 retry so a swallowed 202 doesn't leak a parallel multipart in R2. Backend: - GET /api/videos/upload/:uploadId/status — uploadedChunks + chunkCount - DELETE /api/videos/upload/:uploadId — abort R2 multipart + clear KV - chunk-0 retry now reuses the existing R2 multipart instead of creating a duplicate when the same uploadId comes back - Session state extracted to src/workers/upload-session.ts (read/abort/ delete helpers + zod-validated meta with optional fileName/fileSize fingerprint) Frontend: - src/frontend/lib/upload-resume.ts — pure, testable upload pipeline with exponential-backoff retry on 5xx/408/429/network errors, online/offline awareness, AbortController support, and localStorage persistence (24h TTL aligned with KV) - Upload.tsx wires the new module: resume banner when a saved session matches the picked file's name/size/lastModified fingerprint, status chip ("Retrying…", "Waiting for connection…"), and a Cancel button that aborts the multipart server-side Tests: 60+ new unit tests covering resume helpers, status/cancel routes, chunk retry classification, backoff jitter, and offline-wait sequencing. 
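For reviewers, a rough sketch of how the frontend pieces are meant to compose
from the caller's side. This is illustrative only — resumeOrStart is a made-up
wrapper name, not code in this patch — but every imported helper is a real
export of the new src/frontend/lib/upload-resume.ts:

    import {
      chunkCountFor,
      clearResumeRecord,
      fetchUploadStatus,
      fingerprintFile,
      fingerprintsMatch,
      loadResumeRecord,
      saveResumeRecord,
      uploadFileInChunks,
    } from './upload-resume';

    // Hypothetical wrapper; Upload.tsx spreads this across state/effects.
    async function resumeOrStart(file: File, title: string, description: string) {
      const record = loadResumeRecord();
      let uploadId: string | undefined;
      let skipChunks: Set<number> | undefined;

      if (record && fingerprintsMatch(record.fingerprint, fingerprintFile(file))) {
        const status = await fetchUploadStatus(record.uploadId); // null => KV session expired
        if (status && status.chunkCount === chunkCountFor(file.size)) {
          uploadId = status.uploadId;
          // Skip chunks the server already has, but always re-send the final
          // chunk so the server runs multipart completion.
          skipChunks = new Set(
            status.uploadedChunks.filter((i) => i < status.chunkCount - 1),
          );
        } else {
          clearResumeRecord();
        }
      }

      const result = await uploadFileInChunks(
        file,
        { title, description },
        { uploadId, skipChunks },
        {
          // Fires once chunk 0 is acked; persist so a reload can resume.
          onUploadId: (id) =>
            saveResumeRecord({
              uploadId: id,
              chunkCount: chunkCountFor(file.size),
              fingerprint: fingerprintFile(file),
              title,
              description,
              createdAt: Date.now(),
            }),
        },
      );
      clearResumeRecord(); // success: don't offer a stale resume next time
      return result;
    }

The real wiring in Upload.tsx additionally surfaces the resume banner, status
chip, and cancel button described above; see the diff below.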
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 --- src/frontend/lib/upload-resume.test.ts | 460 +++++++++++++++++++++++++ src/frontend/lib/upload-resume.ts | 417 ++++++++++++++++++++++ src/frontend/pages/Upload.tsx | 317 +++++++++-------- src/workers/upload-session.test.ts | 241 +++++++++++++ src/workers/upload-session.ts | 164 +++++++++ src/workers/videos.test.ts | 233 ++++++++++--- src/workers/videos.ts | 213 ++++++------ 7 files changed, 1747 insertions(+), 298 deletions(-) create mode 100644 src/frontend/lib/upload-resume.test.ts create mode 100644 src/frontend/lib/upload-resume.ts create mode 100644 src/workers/upload-session.test.ts create mode 100644 src/workers/upload-session.ts diff --git a/src/frontend/lib/upload-resume.test.ts b/src/frontend/lib/upload-resume.test.ts new file mode 100644 index 0000000..adedfe8 --- /dev/null +++ b/src/frontend/lib/upload-resume.test.ts @@ -0,0 +1,460 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + backoffMs, + cancelUpload, + classifyChunkResponse, + chunkCountFor, + clearResumeRecord, + fetchUploadStatus, + fingerprintFile, + fingerprintsMatch, + loadResumeRecord, + RESUME_RECORD_TTL_MS, + saveResumeRecord, + uploadFileInChunks, + type FileSlice, + type ResumeRecord, +} from './upload-resume'; + +class MemoryStorage { + private store = new Map(); + getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + setItem(key: string, value: string): void { + this.store.set(key, value); + } + removeItem(key: string): void { + this.store.delete(key); + } + clear(): void { + this.store.clear(); + } +} + +function syntheticFile(size: number, name = 'clip.mp4', type = 'video/mp4'): FileSlice { + return { + size, + type, + name, + slice(start: number, end: number) { + const len = Math.max(0, end - start); + return new Blob([new Uint8Array(len)], { type }); + }, + }; +} + +describe('chunkCountFor', () => { + it('returns 1 for empty/zero-sized files', () => { + expect(chunkCountFor(0)).toBe(1); + }); + + it('rounds up to the next chunk', () => { + expect(chunkCountFor(1, 10)).toBe(1); + expect(chunkCountFor(10, 10)).toBe(1); + expect(chunkCountFor(11, 10)).toBe(2); + expect(chunkCountFor(25, 10)).toBe(3); + }); +}); + +describe('classifyChunkResponse', () => { + it('treats network errors and 5xx as retryable', () => { + expect(classifyChunkResponse(0)).toBe('retry'); + expect(classifyChunkResponse(500)).toBe('retry'); + expect(classifyChunkResponse(503)).toBe('retry'); + }); + + it('treats 408 / 429 as retryable', () => { + expect(classifyChunkResponse(408)).toBe('retry'); + expect(classifyChunkResponse(429)).toBe('retry'); + }); + + it('treats 4xx (other than 408/429) as fatal — re-uploading wont fix a 400', () => { + expect(classifyChunkResponse(400)).toBe('fail'); + expect(classifyChunkResponse(401)).toBe('fail'); + expect(classifyChunkResponse(403)).toBe('fail'); + expect(classifyChunkResponse(413)).toBe('fail'); + }); + + it('2xx is not classified as retry (caller short-circuits before reaching here)', () => { + expect(classifyChunkResponse(200)).toBe('fail'); + }); +}); + +describe('backoffMs', () => { + it('caps exponentially with jitter', () => { + const max = 10_000; + for (let i = 0; i < 8; i += 1) { + const v = backoffMs(i, 100, max); + expect(v).toBeGreaterThanOrEqual(0); + expect(v).toBeLessThanOrEqual(max); + } + }); +}); + +describe('fingerprint', () => { + it('matches identical fingerprints', () => { + const a = fingerprintFile({ name: 'a.mp4', size: 10, 
lastModified: 5 } as File); + const b = fingerprintFile({ name: 'a.mp4', size: 10, lastModified: 5 } as File); + expect(fingerprintsMatch(a, b)).toBe(true); + }); + + it('rejects when lastModified differs', () => { + const a = fingerprintFile({ name: 'a.mp4', size: 10, lastModified: 5 } as File); + const b = fingerprintFile({ name: 'a.mp4', size: 10, lastModified: 6 } as File); + expect(fingerprintsMatch(a, b)).toBe(false); + }); +}); + +describe('resume record persistence', () => { + function recordAt(t: number): ResumeRecord { + return { + uploadId: 'up-1', + chunkCount: 5, + title: 'hi', + description: '', + fingerprint: { name: 'a.mp4', size: 100, lastModified: 1 }, + createdAt: t, + }; + } + + it('round-trips a fresh record', () => { + const storage = new MemoryStorage(); + saveResumeRecord(recordAt(1000), storage); + const loaded = loadResumeRecord(storage, 1500); + expect(loaded?.uploadId).toBe('up-1'); + expect(loaded?.chunkCount).toBe(5); + }); + + it('returns null when stored record is older than TTL', () => { + const storage = new MemoryStorage(); + saveResumeRecord(recordAt(0), storage); + const loaded = loadResumeRecord(storage, RESUME_RECORD_TTL_MS + 1); + expect(loaded).toBeNull(); + }); + + it('clearResumeRecord removes the entry', () => { + const storage = new MemoryStorage(); + saveResumeRecord(recordAt(0), storage); + clearResumeRecord(storage); + expect(loadResumeRecord(storage, 0)).toBeNull(); + }); + + it('returns null when stored payload is malformed', () => { + const storage = new MemoryStorage(); + storage.setItem('spooool.upload.resume.v1', '{not-json'); + expect(loadResumeRecord(storage, 0)).toBeNull(); + }); + + it('returns null when stored payload is missing required fields', () => { + const storage = new MemoryStorage(); + storage.setItem( + 'spooool.upload.resume.v1', + JSON.stringify({ uploadId: 'x' }), + ); + expect(loadResumeRecord(storage, 0)).toBeNull(); + }); +}); + +describe('uploadFileInChunks', () => { + it('uploads each chunk in order, captures uploadId, reports progress', async () => { + const calls: Array<{ chunkIndex: string; uploadId: string | null }> = []; + const fetchImpl: typeof fetch = vi.fn(async (_url, init) => { + const fd = (init as RequestInit).body as FormData; + const chunkIndex = fd.get('chunkIndex') as string; + const uploadIdSent = fd.get('uploadId') as string | null; + calls.push({ chunkIndex, uploadId: uploadIdSent }); + const total = Number(fd.get('chunkCount')); + const idx = Number(chunkIndex); + if (idx === 0) { + return new Response( + JSON.stringify({ status: 'chunk_received', chunkIndex: 0, chunkCount: total, uploadId: 'up-server' }), + { status: 202 }, + ); + } + if (idx === total - 1) { + return new Response(JSON.stringify({ id: 'video-final', status: 'queued' }), { + status: 201, + }); + } + return new Response( + JSON.stringify({ status: 'chunk_received', chunkIndex: idx, chunkCount: total }), + { status: 202 }, + ); + }) as unknown as typeof fetch; + + const progressFractions: number[] = []; + const statuses: string[] = []; + let captured: string | null = null; + const result = await uploadFileInChunks( + syntheticFile(25), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + }, + { + onProgress: (p) => progressFractions.push(p.fraction), + onStatus: (s) => statuses.push(s), + onUploadId: (id) => { + captured = id; + }, + }, + ); + + expect(result).toEqual({ videoId: 'video-final', status: 'queued' }); + expect(calls.map((c) => c.chunkIndex)).toEqual(['0', '1', '2']); + 
expect(calls[0].uploadId).toBeNull(); + expect(calls[1].uploadId).toBe('up-server'); + expect(captured).toBe('up-server'); + // Initial 0/3, then 1/3, 2/3, 3/3. + expect(progressFractions[0]).toBe(0); + expect(progressFractions[progressFractions.length - 1]).toBeCloseTo(1, 5); + expect(statuses).toContain('uploading'); + }); + + it('skips already-uploaded chunks via skipChunks (resume path)', async () => { + const calls: string[] = []; + const fetchImpl: typeof fetch = vi.fn(async (_url, init) => { + const fd = (init as RequestInit).body as FormData; + const idx = Number(fd.get('chunkIndex')); + const total = Number(fd.get('chunkCount')); + calls.push(String(idx)); + if (idx === total - 1) { + return new Response(JSON.stringify({ id: 'v-resumed', status: 'queued' }), { + status: 201, + }); + } + return new Response(JSON.stringify({ status: 'chunk_received' }), { status: 202 }); + }) as unknown as typeof fetch; + + const result = await uploadFileInChunks( + syntheticFile(25), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + uploadId: 'up-resume', + skipChunks: new Set([0, 1]), + }, + ); + + expect(calls).toEqual(['2']); + expect(result.videoId).toBe('v-resumed'); + }); + + it('retries a chunk on 503 with backoff, then succeeds', async () => { + let count = 0; + const fetchImpl: typeof fetch = vi.fn(async () => { + count += 1; + if (count <= 2) { + return new Response('{"error":"unavailable"}', { status: 503 }); + } + return new Response(JSON.stringify({ id: 'v-ok', status: 'queued' }), { status: 201 }); + }) as unknown as typeof fetch; + + const delays: number[] = []; + const result = await uploadFileInChunks( + syntheticFile(5), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async (ms) => { + delays.push(ms); + }, + }, + ); + + expect(result.videoId).toBe('v-ok'); + expect(count).toBe(3); + expect(delays.length).toBe(2); // 2 retries between 3 attempts + }); + + it('retries network errors (fetch throws) as transient', async () => { + let count = 0; + const fetchImpl: typeof fetch = vi.fn(async () => { + count += 1; + if (count === 1) throw new TypeError('Network error'); + return new Response(JSON.stringify({ id: 'v-net', status: 'queued' }), { status: 201 }); + }) as unknown as typeof fetch; + + const result = await uploadFileInChunks( + syntheticFile(5), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + }, + ); + + expect(result.videoId).toBe('v-net'); + expect(count).toBe(2); + }); + + it('stops retrying after maxRetries on persistent 5xx', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => new Response('{"error":"down"}', { status: 503 }), + ) as unknown as typeof fetch; + + await expect( + uploadFileInChunks( + syntheticFile(5), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + maxRetries: 2, + }, + ), + ).rejects.toThrow(/Upload failed \(503\)/); + + expect(fetchImpl).toHaveBeenCalledTimes(3); // initial + 2 retries + }); + + it('does not retry a 4xx — those are deterministic', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => + new Response( + JSON.stringify({ error: 'Storage quota exceeded.', code: 'storage_quota_exceeded' }), + { status: 413 }, + ), + ) as unknown as typeof fetch; + + await expect( + uploadFileInChunks( + syntheticFile(5), + { title: 't', description: '' }, + { fetchImpl, chunkSize: 10, delay: async () => {} }, + ), + ).rejects.toThrow(/Storage quota 
exceeded/); + + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); + + it('waits for online before retrying when offline', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => new Response(JSON.stringify({ id: 'v', status: 'queued' }), { status: 201 }), + ) as unknown as typeof fetch; + + let nowOffline = true; + let resolveOnline = (): void => {}; + const waitForOnlinePromise = new Promise((resolve) => { + resolveOnline = () => { + nowOffline = false; + resolve(); + }; + }); + + const statuses: string[] = []; + const promise = uploadFileInChunks( + syntheticFile(5), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + isOffline: () => nowOffline, + waitForOnline: () => waitForOnlinePromise, + }, + { + onStatus: (s) => statuses.push(s), + }, + ); + + // Allow the upload to enter the offline-wait branch. + await new Promise((r) => setTimeout(r, 5)); + expect(statuses).toContain('offline'); + expect(fetchImpl).not.toHaveBeenCalled(); + + resolveOnline(); + const result = await promise; + expect(result.videoId).toBe('v'); + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); + + it('throws UploadAbortedError when signal is aborted before next chunk', async () => { + const controller = new AbortController(); + const fetchImpl: typeof fetch = vi.fn(async () => { + controller.abort(); + return new Response(JSON.stringify({ status: 'chunk_received', uploadId: 'x' }), { + status: 202, + }); + }) as unknown as typeof fetch; + + await expect( + uploadFileInChunks( + syntheticFile(15), + { title: 't', description: '' }, + { + fetchImpl, + chunkSize: 10, + delay: async () => {}, + signal: controller.signal, + }, + ), + ).rejects.toThrow(/Upload aborted/); + }); +}); + +describe('fetchUploadStatus', () => { + it('returns the parsed body on 200', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => + new Response( + JSON.stringify({ + uploadId: 'up-1', + chunkCount: 4, + uploadedChunks: [0, 1], + fileName: 'clip.mp4', + fileSize: 100, + title: 'hi', + description: '', + }), + { status: 200 }, + ), + ) as unknown as typeof fetch; + const result = await fetchUploadStatus('up-1', fetchImpl); + expect(result?.uploadedChunks).toEqual([0, 1]); + }); + + it('returns null on 404 (session expired or never existed)', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => new Response('{}', { status: 404 }), + ) as unknown as typeof fetch; + expect(await fetchUploadStatus('up-1', fetchImpl)).toBeNull(); + }); + + it('throws on other non-2xx responses', async () => { + const fetchImpl: typeof fetch = vi.fn( + async () => new Response('oops', { status: 500 }), + ) as unknown as typeof fetch; + await expect(fetchUploadStatus('up-1', fetchImpl)).rejects.toThrow(); + }); +}); + +describe('cancelUpload', () => { + it('issues DELETE to the upload endpoint', async () => { + const calls: Array<{ url: string; method: string }> = []; + const fetchImpl: typeof fetch = vi.fn(async (url, init) => { + calls.push({ url: String(url), method: (init as RequestInit).method ?? 
'GET' }); + return new Response(null, { status: 204 }); + }) as unknown as typeof fetch; + + await cancelUpload('up xy', fetchImpl); + expect(calls).toEqual([ + { url: '/api/videos/upload/up%20xy', method: 'DELETE' }, + ]); + }); + + it('swallows network errors so cancel is best-effort', async () => { + const fetchImpl: typeof fetch = vi.fn(async () => { + throw new Error('boom'); + }) as unknown as typeof fetch; + await expect(cancelUpload('up-1', fetchImpl)).resolves.toBeUndefined(); + }); +}); diff --git a/src/frontend/lib/upload-resume.ts b/src/frontend/lib/upload-resume.ts new file mode 100644 index 0000000..c285129 --- /dev/null +++ b/src/frontend/lib/upload-resume.ts @@ -0,0 +1,417 @@ +// ALO-121: chunked upload with retry + resume. +// +// The backend stores per-upload state in KV under uploadId for 24h. This +// module is responsible for: +// +// - Slicing the file into 10MB chunks (matching MAX_CHUNK_BYTES on the +// server's upload-validation). +// - Retrying each chunk POST with exponential backoff on transient +// failures (5xx, 408, 429, network errors). 4xx with a deterministic +// payload (e.g. validation failure) aborts immediately. +// - Pausing while the browser is offline and resuming when 'online' fires. +// - Persisting `{ uploadId, chunkCount, fingerprint }` to localStorage so +// a page reload after a disconnect can rejoin the same upload by +// calling GET /api/videos/upload/:uploadId/status and skipping +// already-uploaded chunks. +// +// The server expects `chunkIndex=0` to come first when no `uploadId` is +// supplied, but accepts an explicit `uploadId` for retry. The resumption +// path skips chunk-0 if it's already on the server, so we never need to +// re-init the multipart from the client side. + +export const CHUNK_SIZE = 10 * 1024 * 1024; +export const MAX_VIDEO_BYTES = 30 * 1024 * 1024 * 1024; +const RESUME_STORAGE_KEY = 'spooool.upload.resume.v1'; +const DEFAULT_MAX_RETRIES = 4; +const BASE_BACKOFF_MS = 500; +const MAX_BACKOFF_MS = 15_000; + +export interface FileFingerprint { + name: string; + size: number; + lastModified: number; +} + +export interface ResumeRecord { + uploadId: string; + chunkCount: number; + fingerprint: FileFingerprint; + title: string; + description: string; + // Wall-clock time the record was written. Used to expire local records + // older than the server's 24h KV TTL (see UPLOAD_SESSION_TTL_SECONDS). + createdAt: number; +} + +export const RESUME_RECORD_TTL_MS = 23 * 60 * 60 * 1000; + +export function fingerprintFile(file: File): FileFingerprint { + return { + name: file.name, + size: file.size, + lastModified: file.lastModified, + }; +} + +export function fingerprintsMatch(a: FileFingerprint, b: FileFingerprint): boolean { + return a.name === b.name && a.size === b.size && a.lastModified === b.lastModified; +} + +export function chunkCountFor(fileSize: number, chunkSize: number = CHUNK_SIZE): number { + if (fileSize <= 0) return 1; + return Math.ceil(fileSize / chunkSize); +} + +export function loadResumeRecord( + storage: Pick | null = typeof localStorage === 'undefined' + ? 
null + : localStorage, + now: number = Date.now(), +): ResumeRecord | null { + if (!storage) return null; + const raw = storage.getItem(RESUME_STORAGE_KEY); + if (!raw) return null; + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch { + return null; + } + if (!isResumeRecord(parsed)) return null; + if (now - parsed.createdAt > RESUME_RECORD_TTL_MS) return null; + return parsed; +} + +export function saveResumeRecord( + record: ResumeRecord, + storage: Pick | null = typeof localStorage === 'undefined' + ? null + : localStorage, +): void { + if (!storage) return; + try { + storage.setItem(RESUME_STORAGE_KEY, JSON.stringify(record)); + } catch { + // Quota exceeded / private mode — non-fatal; resume just won't survive a reload. + } +} + +export function clearResumeRecord( + storage: Pick | null = typeof localStorage === 'undefined' + ? null + : localStorage, +): void { + if (!storage) return; + try { + storage.removeItem(RESUME_STORAGE_KEY); + } catch { + // Same rationale as saveResumeRecord — best-effort. + } +} + +function isResumeRecord(v: unknown): v is ResumeRecord { + if (!v || typeof v !== 'object') return false; + const r = v as Record; + if (typeof r.uploadId !== 'string') return false; + if (typeof r.chunkCount !== 'number' || r.chunkCount < 1) return false; + if (typeof r.title !== 'string') return false; + if (typeof r.description !== 'string') return false; + if (typeof r.createdAt !== 'number') return false; + const fp = r.fingerprint as Record | undefined; + if (!fp || typeof fp !== 'object') return false; + return ( + typeof fp.name === 'string' && + typeof fp.size === 'number' && + typeof fp.lastModified === 'number' + ); +} + +export interface UploadStatusResponse { + uploadId: string; + chunkCount: number; + uploadedChunks: number[]; + fileName: string | null; + fileSize: number | null; + title: string; + description: string; +} + +export type RetryClassification = 'retry' | 'fail'; + +// Pure, testable: decide if a chunk POST should be retried or abandoned. +// 5xx, 408 Request Timeout, 429 Too Many Requests are transient. +// 0 (network error: client-side fetch threw) is also transient. +export function classifyChunkResponse(status: number): RetryClassification { + if (status === 0) return 'retry'; + if (status === 408 || status === 429) return 'retry'; + if (status >= 500 && status < 600) return 'retry'; + return 'fail'; +} + +export function backoffMs( + attempt: number, + base: number = BASE_BACKOFF_MS, + max: number = MAX_BACKOFF_MS, +): number { + // Exponential, capped, with full jitter so a thundering herd of failed + // chunks doesn't all retry on the same tick. + const exp = Math.min(max, base * 2 ** attempt); + return Math.floor(Math.random() * exp); +} + +export interface UploadProgress { + /** Fraction in [0, 1]. */ + fraction: number; + /** 0-indexed chunk count completed (across resume + new). */ + completed: number; + total: number; +} + +export interface UploadCallbacks { + onProgress?: (progress: UploadProgress) => void; + /** Fired after chunk-0 succeeds with an uploadId so the caller can persist a resume record. */ + onUploadId?: (uploadId: string) => void; + /** Fired when the uploader transitions between online/offline / retrying. */ + onStatus?: (status: 'uploading' | 'retrying' | 'offline' | 'paused') => void; +} + +export interface UploadOptions { + fetchImpl?: typeof fetch; + signal?: AbortSignal; + maxRetries?: number; + /** Override for tests so backoff doesn't actually delay. 
*/ + delay?: (ms: number) => Promise; + /** True when window.navigator.onLine is false. Falls back to true. */ + isOffline?: () => boolean; + /** Resolves when the network is back. In production this is a 'online' event subscription. */ + waitForOnline?: () => Promise; + /** Pre-existing uploadId to resume against. */ + uploadId?: string; + /** Indices that the server has already received — these chunks are skipped. */ + skipChunks?: ReadonlySet; + /** Override chunk size for tests. */ + chunkSize?: number; +} + +export interface UploadResult { + videoId: string; + status: string; +} + +export class UploadAbortedError extends Error { + constructor() { + super('Upload aborted'); + this.name = 'UploadAbortedError'; + } +} + +async function defaultDelay(ms: number): Promise { + if (ms <= 0) return; + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +function defaultIsOffline(): boolean { + return typeof navigator !== 'undefined' && navigator.onLine === false; +} + +function defaultWaitForOnline(): Promise { + if (typeof window === 'undefined') return Promise.resolve(); + if (navigator.onLine) return Promise.resolve(); + return new Promise((resolve) => { + const handler = (): void => { + window.removeEventListener('online', handler); + resolve(); + }; + window.addEventListener('online', handler, { once: true }); + }); +} + +interface ChunkAttemptOutcome { + ok: true; + status: number; + body: { uploadId?: string; id?: string; status?: string }; +} + +interface ChunkAttemptFailure { + ok: false; + status: number; + errorMessage: string; + retryable: boolean; +} + +async function postChunkOnce( + url: string, + formData: FormData, + fetchImpl: typeof fetch, + signal: AbortSignal | undefined, +): Promise { + let res: Response; + try { + res = await fetchImpl(url, { method: 'POST', body: formData, signal }); + } catch (err) { + if ((err as { name?: string }).name === 'AbortError') { + throw new UploadAbortedError(); + } + return { + ok: false, + status: 0, + errorMessage: err instanceof Error ? err.message : 'Network error', + retryable: true, + }; + } + const text = await res.text(); + let parsed: { uploadId?: string; id?: string; status?: string; error?: string; code?: string } = {}; + try { + parsed = text ? JSON.parse(text) : {}; + } catch { + parsed = {}; + } + if (!res.ok) { + const errorMessage = + (parsed.error ? `${parsed.error}${parsed.code ? ` (${parsed.code})` : ''}` : text) + || `HTTP ${res.status}`; + return { + ok: false, + status: res.status, + errorMessage, + retryable: classifyChunkResponse(res.status) === 'retry', + }; + } + return { ok: true, status: res.status, body: parsed }; +} + +// Sliceable so tests can substitute a synthetic blob source. +export interface FileSlice { + size: number; + type: string; + name: string; + slice(start: number, end: number, contentType?: string): Blob; +} + +export async function uploadFileInChunks( + file: FileSlice, + metadata: { title: string; description: string }, + options: UploadOptions = {}, + callbacks: UploadCallbacks = {}, +): Promise { + const fetchImpl = options.fetchImpl ?? fetch; + const delay = options.delay ?? defaultDelay; + const isOffline = options.isOffline ?? defaultIsOffline; + const waitForOnline = options.waitForOnline ?? defaultWaitForOnline; + const maxRetries = options.maxRetries ?? DEFAULT_MAX_RETRIES; + const chunkSize = options.chunkSize ?? CHUNK_SIZE; + const chunkCount = chunkCountFor(file.size, chunkSize); + const skip = options.skipChunks ?? 
new Set(); + + let uploadId: string | null = options.uploadId ?? null; + let lastBody: { uploadId?: string; id?: string; status?: string } | null = null; + let completed = skip.size; + + const emitProgress = (): void => { + callbacks.onProgress?.({ + fraction: chunkCount === 0 ? 1 : completed / chunkCount, + completed, + total: chunkCount, + }); + }; + + // Emit initial progress so the UI reflects already-uploaded chunks + // immediately on resume. + emitProgress(); + + for (let index = 0; index < chunkCount; index += 1) { + if (options.signal?.aborted) throw new UploadAbortedError(); + if (skip.has(index)) continue; + + const start = index * chunkSize; + const end = Math.min(start + chunkSize, file.size); + const chunk = file.slice(start, end, file.type); + + let attempt = 0; + let backoff = 0; + while (true) { + if (options.signal?.aborted) throw new UploadAbortedError(); + if (isOffline()) { + callbacks.onStatus?.('offline'); + await waitForOnline(); + } + callbacks.onStatus?.(attempt === 0 ? 'uploading' : 'retrying'); + + const formData = new FormData(); + formData.set('title', metadata.title); + formData.set('description', metadata.description); + formData.set('file', chunk, file.name); + formData.set('chunkIndex', String(index)); + formData.set('chunkCount', String(chunkCount)); + if (uploadId) formData.set('uploadId', uploadId); + + const outcome = await postChunkOnce('/api/videos/upload', formData, fetchImpl, options.signal); + if (outcome.ok) { + lastBody = outcome.body; + if (!uploadId && outcome.body.uploadId) { + uploadId = outcome.body.uploadId; + callbacks.onUploadId?.(uploadId); + } + break; + } + if (!outcome.retryable || attempt >= maxRetries) { + throw new Error(`Upload failed (${outcome.status}): ${outcome.errorMessage}`); + } + backoff = backoffMs(attempt); + attempt += 1; + callbacks.onStatus?.('retrying'); + await delay(backoff); + } + + completed += 1; + emitProgress(); + } + + if (!lastBody) { + // Edge case: every chunk was skipped (already uploaded) but the server + // still owes us a completion. The backend completes the multipart on + // the final chunk's POST, so if every chunk was already on the server + // the multipart is already complete and the videoId is in the resume + // status response — but we don't carry that here. The caller is + // expected to have driven the final chunk through this function. + throw new Error('Upload finished without a completion response'); + } + return { + videoId: lastBody.id ?? '', + status: lastBody.status ?? 'queued', + }; +} + +// Calls GET /api/videos/upload/:uploadId/status. Returns null on 404 so the +// caller can transparently fall through to a fresh upload when the server's +// session has expired. +export async function fetchUploadStatus( + uploadId: string, + fetchImpl: typeof fetch = fetch, +): Promise { + const res = await fetchImpl( + `/api/videos/upload/${encodeURIComponent(uploadId)}/status`, + { method: 'GET' }, + ); + if (res.status === 404) return null; + if (!res.ok) { + throw new Error(`Resume status failed (${res.status})`); + } + return (await res.json()) as UploadStatusResponse; +} + +// Best-effort. The DELETE endpoint is idempotent, so a swallowed network +// error here at most leaves the multipart sitting in R2 until R2's lifecycle +// rules clean it up. 
+
+// Best-effort. The DELETE endpoint is idempotent, so a swallowed network
+// error here at most leaves the multipart sitting in R2 until R2's lifecycle
+// rules clean it up.
+export async function cancelUpload(
+  uploadId: string,
+  fetchImpl: typeof fetch = fetch,
+): Promise<void> {
+  try {
+    await fetchImpl(`/api/videos/upload/${encodeURIComponent(uploadId)}`, {
+      method: 'DELETE',
+    });
+  } catch {
+    // Network error mid-cancel: best-effort.
+  }
+}
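+
+// Teardown order used by the UI (sketch; mirrors Upload.tsx's onCancel,
+// with `controller` / `storedId` standing in for the caller's state):
+//
+//   controller.abort();           // stop the in-flight chunk loop first
+//   await cancelUpload(storedId); // then drop the server-side session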
diff --git a/src/frontend/pages/Upload.tsx b/src/frontend/pages/Upload.tsx
index 141729f..1c7abcc 100644
--- a/src/frontend/pages/Upload.tsx
+++ b/src/frontend/pages/Upload.tsx
@@ -1,7 +1,19 @@
-import { FormEvent, useMemo, useState } from 'react';
+import { FormEvent, useEffect, useMemo, useRef, useState } from 'react';
 import { useSession } from '../lib/auth-client';
+import {
+  cancelUpload,
+  chunkCountFor,
+  clearResumeRecord,
+  fetchUploadStatus,
+  fingerprintFile,
+  fingerprintsMatch,
+  loadResumeRecord,
+  saveResumeRecord,
+  uploadFileInChunks,
+  UploadAbortedError,
+  type ResumeRecord,
+} from '../lib/upload-resume';
 
-const CHUNK_SIZE = 10 * 1024 * 1024;
 const MAX_SIZE = 30 * 1024 * 1024 * 1024;
 const ALLOWED_EXTENSIONS = new Set([
   'mp4',
@@ -25,128 +37,6 @@ function isAcceptedVideo(file: File): boolean {
   return ALLOWED_EXTENSIONS.has(file.name.slice(dot + 1).toLowerCase());
 }
 
-function resumeKey(file: File): string {
-  return `spooool:upload:${file.name}:${file.size}:${file.lastModified}`;
-}
-
-async function fetchResumeState(
-  uploadId: string,
-): Promise<{ received: number[]; chunkCount: number } | null> {
-  try {
-    const res = await fetch(`/api/videos/upload/${encodeURIComponent(uploadId)}/status`, {
-      credentials: 'same-origin',
-    });
-    if (!res.ok) return null;
-    const data = (await res.json()) as { received?: number[]; chunkCount?: number };
-    if (typeof data.chunkCount !== 'number' || !Array.isArray(data.received)) return null;
-    return { received: data.received, chunkCount: data.chunkCount };
-  } catch {
-    return null;
-  }
-}
-
-async function uploadInChunks(
-  file: File,
-  title: string,
-  description: string,
-  onProgress: (value: number) => void,
-): Promise<Response> {
-  const chunkCount = Math.ceil(file.size / CHUNK_SIZE);
-  let lastResponse: Response | null = null;
-  let uploadId: string | null = null;
-  const received = new Set<number>();
-
-  // ALO-121: resume across page reloads / disconnects. Persist uploadId
-  // keyed by file identity so a refresh continues where the last attempt
-  // left off; the server is the source of truth for which chunks landed.
-  const persistKey = resumeKey(file);
-  const stored = typeof localStorage !== 'undefined' ? localStorage.getItem(persistKey) : null;
-  if (stored) {
-    const state = await fetchResumeState(stored);
-    if (state && state.chunkCount === chunkCount) {
-      uploadId = stored;
-      for (const i of state.received) received.add(i);
-      if (received.size > 0) {
-        onProgress(Math.round((received.size / chunkCount) * 100));
-      }
-    } else {
-      try { localStorage.removeItem(persistKey); } catch { /* private mode */ }
-    }
-  }
-
-  for (let index = 0; index < chunkCount; index += 1) {
-    if (received.has(index) && index < chunkCount - 1) {
-      // Final chunk must still be sent so the server can run completion;
-      // intermediate chunks the server already has can be skipped.
-      onProgress(Math.round(((index + 1) / chunkCount) * 100));
-      continue;
-    }
-    const start = index * CHUNK_SIZE;
-    const end = Math.min(start + CHUNK_SIZE, file.size);
-    const chunk = file.slice(start, end, file.type);
-
-    let attempt = 0;
-    const maxAttempts = 4;
-    // Per-chunk retry with exponential backoff. Network blips and 5xx are
-    // transient; the server is idempotent on chunk re-submission so retries
-    // are safe.
-    while (true) {
-      const formData = new FormData();
-      formData.set('title', title);
-      formData.set('description', description);
-      formData.set('file', chunk, file.name);
-      formData.set('chunkIndex', String(index));
-      formData.set('chunkCount', String(chunkCount));
-      if (uploadId) formData.set('uploadId', uploadId);
-
-      let res: Response;
-      try {
-        res = await fetch('/api/videos/upload', { method: 'POST', body: formData });
-      } catch (err) {
-        if (++attempt >= maxAttempts) throw err;
-        await new Promise((r) => setTimeout(r, 500 * 2 ** (attempt - 1)));
-        continue;
-      }
-
-      if (res.ok) {
-        lastResponse = res;
-        const responseData = (await res.json()) as { uploadId?: string };
-        if (responseData.uploadId) {
-          uploadId = responseData.uploadId;
-          try { localStorage.setItem(persistKey, uploadId); } catch { /* private mode */ }
-        }
-        received.add(index);
-        break;
-      }
-
-      if (res.status >= 500 && ++attempt < maxAttempts) {
-        await new Promise((r) => setTimeout(r, 500 * 2 ** (attempt - 1)));
-        continue;
-      }
-
-      const body = await res.text();
-      let detail = body;
-      try {
-        const parsed = JSON.parse(body) as { error?: string; code?: string };
-        detail = parsed.error ?? body;
-        if (parsed.code) detail = `${detail} (${parsed.code})`;
-      } catch {
-        /* non-JSON */
-      }
-      throw new Error(`Upload failed (${res.status}): ${detail.slice(0, 300)}`);
-    }
-
-    onProgress(Math.round(((index + 1) / chunkCount) * 100));
-  }
-
-  try { localStorage.removeItem(persistKey); } catch { /* private mode */ }
-
-  if (!lastResponse) {
-    throw new Error('No upload response');
-  }
-  return lastResponse;
-}
-
 async function resendVerification(): Promise<{ ok: boolean; error: string | null }> {
   // ALO-128: ask better-auth to re-issue the verification email. The session
   // cookie identifies the user, so the body is empty.
@@ -172,6 +62,8 @@ async function resendVerification(): Promise<{ ok: boolean; error: string | null }> {
   return { ok: false, error: message };
 }
 
+type UploadStatus = 'idle' | 'uploading' | 'retrying' | 'offline' | 'paused' | 'done' | 'error';
+
 export function Upload(): JSX.Element {
   const { data: session } = useSession();
   const [file, setFile] = useState<File | null>(null);
@@ -179,8 +71,10 @@ export function Upload(): JSX.Element {
   const [description, setDescription] = useState('');
   const [progress, setProgress] = useState(0);
   const [error, setError] = useState<string | null>(null);
-  const [status, setStatus] = useState<string | null>(null);
+  const [status, setStatus] = useState<UploadStatus>('idle');
   const [resendStatus, setResendStatus] = useState(null);
+  const [pendingResume, setPendingResume] = useState<ResumeRecord | null>(null);
+  const abortRef = useRef<AbortController | null>(null);
 
   const isEmailVerified = session?.user?.emailVerified !== false;
   const isValidFile = useMemo(() => {
@@ -190,10 +84,105 @@
     return file.size <= MAX_SIZE && isAcceptedVideo(file);
   }, [file]);
 
+
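+  // For reference, the persisted resume record looks like this (defined in
+  // upload-resume.ts; `lastModified` is an assumption, the other fields are
+  // the ones this component reads and writes):
+  //
+  //   interface ResumeRecord {
+  //     uploadId: string;
+  //     chunkCount: number;
+  //     fingerprint: { name: string; size: number; lastModified: number };
+  //     title: string;
+  //     description: string;
+  //     createdAt: number;
+  //   }
+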
+  // ALO-121: surface a saved resume offer on mount. The user has to re-pick
+  // the same file — the File API doesn't let us hold a handle across reloads —
+  // and we verify the fingerprint matches before offering resume.
+  useEffect(() => {
+    const stored = loadResumeRecord();
+    if (stored) {
+      setPendingResume(stored);
+      setTitle(stored.title);
+      setDescription(stored.description);
+    }
+  }, []);
+
+  const canResumeWithFile = useMemo(() => {
+    if (!file || !pendingResume) return false;
+    return fingerprintsMatch(fingerprintFile(file), pendingResume.fingerprint);
+  }, [file, pendingResume]);
+
+  async function runUpload(resume: ResumeRecord | null): Promise<void> {
+    if (!file) return;
+    setError(null);
+    setStatus('uploading');
+    setProgress(0);
+
+    const controller = new AbortController();
+    abortRef.current = controller;
+
+    let uploadIdForResume = resume?.uploadId ?? null;
+    let skipChunks = new Set<number>();
+
+    if (resume) {
+      try {
+        const remote = await fetchUploadStatus(resume.uploadId);
+        if (remote && remote.chunkCount === resume.chunkCount) {
+          skipChunks = new Set(remote.uploadedChunks);
+        } else {
+          // Server forgot the session — start fresh.
+          uploadIdForResume = null;
+          clearResumeRecord();
+          setPendingResume(null);
+        }
+      } catch (err) {
+        // If the status probe fails, fall back to a fresh upload rather
+        // than getting stuck. The previous multipart in R2 will be cleaned
+        // up by R2's lifecycle policy.
+        uploadIdForResume = null;
+        clearResumeRecord();
+        setPendingResume(null);
+        setError(err instanceof Error ? err.message : 'Could not check resume state');
+      }
+    }
+
+    const chunkCount = chunkCountFor(file.size);
+    const fingerprint = fingerprintFile(file);
+
+    try {
+      await uploadFileInChunks(
+        file,
+        { title, description },
+        {
+          uploadId: uploadIdForResume ?? undefined,
+          skipChunks,
+          signal: controller.signal,
+        },
+        {
+          onProgress: (p) => setProgress(Math.round(p.fraction * 100)),
+          onStatus: (s) => setStatus(s),
+          onUploadId: (uploadId) => {
+            const record: ResumeRecord = {
+              uploadId,
+              chunkCount,
+              fingerprint,
+              title,
+              description,
+              createdAt: Date.now(),
+            };
+            saveResumeRecord(record);
+            setPendingResume(record);
+          },
+        },
+      );
+      clearResumeRecord();
+      setPendingResume(null);
+      setStatus('done');
+      setProgress(100);
+    } catch (err: unknown) {
+      if (err instanceof UploadAbortedError) {
+        setStatus('paused');
+        return;
+      }
+      setStatus('error');
+      setError(err instanceof Error ? err.message : 'Upload failed');
+    } finally {
+      abortRef.current = null;
+    }
+  }
+
   async function onSubmit(event: FormEvent): Promise<void> {
     event.preventDefault();
-    setError(null);
-    setStatus(null);
 
     if (!file) {
       setError('Please choose a file');
@@ -208,12 +197,29 @@
       return;
     }
 
-    try {
-      await uploadInChunks(file, title, description, setProgress);
-      setStatus('Upload complete');
-    } catch (err: unknown) {
-      setError(err instanceof Error ? err.message : 'Upload failed');
+    const resume = canResumeWithFile ? pendingResume : null;
+    await runUpload(resume);
+  }
+
+  async function onCancel(): Promise<void> {
+    abortRef.current?.abort();
+    if (pendingResume) {
+      await cancelUpload(pendingResume.uploadId);
+      clearResumeRecord();
+      setPendingResume(null);
+    }
+    setStatus('idle');
+    setProgress(0);
+  }
+
+  function onDiscardResume(): void {
+    if (pendingResume) {
+      void cancelUpload(pendingResume.uploadId);
     }
+    clearResumeRecord();
+    setPendingResume(null);
+    setTitle('');
+    setDescription('');
   }
 
   return (
@@ -247,6 +253,26 @@
         ) : null}
 
+        {pendingResume ? (
+          <div className="card stack">
+            <p>
+              You have an unfinished upload.{' '}
+              {pendingResume.fingerprint.name} ({Math.round(
+                pendingResume.fingerprint.size / (1024 * 1024),
+              )}{' '}
+              MB).{' '}
+              {canResumeWithFile
+                ? 'Re-selecting the same file will resume where you left off.'
+                : 'Pick the same file to resume, or discard to start over.'}
+            </p>
+            <button type="button" onClick={onDiscardResume}>
+              Discard upload
+            </button>
+          </div>
+        ) : null}
+
       <form onSubmit={(event) => void onSubmit(event)} className="card stack">
@@ -293,7 +319,15 @@
-            <span>Upload progress</span>
+            <span>
+              {status === 'offline'
+                ? 'Waiting for connection…'
+                : status === 'retrying'
+                  ? 'Retrying…'
+                  : status === 'paused'
+                    ? 'Paused'
+                    : 'Upload progress'}
+            </span>
             <span>{progress}%</span>
           </div>
+          {status === 'uploading' || status === 'retrying' || status === 'offline' ? (
+            <button type="button" onClick={() => void onCancel()}>
+              Cancel
+            </button>
+          ) : null}
         {error ?
           <p className="error">
             {error}
           </p>
         : null}
-        {status ?
-          <p className="status">
-            {status}
-          </p>
-        : null}
+        {status === 'done' ?
+          <p className="status">
+            Upload complete
+          </p>
+        : null}
       </form>
   );
 }
diff --git a/src/workers/upload-session.test.ts b/src/workers/upload-session.test.ts
new file mode 100644
index 0000000..6bedf7a
--- /dev/null
+++ b/src/workers/upload-session.test.ts
@@ -0,0 +1,241 @@
+import { describe, expect, it } from 'vitest';
+import {
+  abortUploadSession,
+  deleteUploadSession,
+  readUploadSession,
+  uploadedChunkIndices,
+  uploadMetaPersistedSchema,
+  uploadSessionKeys,
+} from './upload-session';
+
+class FakeKV {
+  private store = new Map<string, string>();
+
+  async get(key: string): Promise<string | null> {
+    return this.store.get(key) ?? null;
+  }
+
+  async put(key: string, value: string): Promise<void> {
+    this.store.set(key, value);
+  }
+
+  async delete(key: string): Promise<void> {
+    this.store.delete(key);
+  }
+
+  has(key: string): boolean {
+    return this.store.has(key);
+  }
+}
+
+interface FakeMultipartUpload {
+  abort(): Promise<void>;
+}
+
+class FakeR2 {
+  abortCalls: Array<{ key: string; uploadId: string }> = [];
+  resumeMultipartUpload(key: string, uploadId: string): FakeMultipartUpload {
+    return {
+      abort: async () => {
+        this.abortCalls.push({ key, uploadId });
+      },
+    };
+  }
+}
+
+function envFor(): { SESSIONS: KVNamespace; VIDEOS: R2Bucket; kv: FakeKV; r2: FakeR2 } {
+  const kv = new FakeKV();
+  const r2 = new FakeR2();
+  return {
+    SESSIONS: kv as unknown as KVNamespace,
+    VIDEOS: r2 as unknown as R2Bucket,
+    kv,
+    r2,
+  };
+}
+
+describe('uploadSessionKeys', () => {
+  it('namespaces by user id and upload id', () => {
+    const keys = uploadSessionKeys('u1', 'up-abc');
+    expect(keys.base).toBe('upload:u1:up-abc');
+    expect(keys.mpid).toBe('upload:u1:up-abc:mpid');
+    expect(keys.meta).toBe('upload:u1:up-abc:meta');
+    expect(keys.parts).toBe('upload:u1:up-abc:parts');
+  });
+});
+
+describe('uploadMetaPersistedSchema', () => {
+  it('parses the legacy shape (no fingerprint fields)', () => {
+    const parsed = uploadMetaPersistedSchema.safeParse({
+      videoId: 'v1',
+      r2Key: 'u1/v1/clip.mp4',
+      title: 'hi',
+      description: '',
+      chunkCount: 3,
+    });
+    expect(parsed.success).toBe(true);
+  });
+
+  it('parses with fingerprint fields', () => {
+    const parsed = uploadMetaPersistedSchema.safeParse({
+      videoId: 'v1',
+      r2Key: 'u1/v1/clip.mp4',
+      title: 'hi',
+      description: '',
+      chunkCount: 3,
+      fileName: 'clip.mp4',
+      fileSize: 1024,
+    });
+    expect(parsed.success).toBe(true);
+  });
+
+  it('rejects a non-positive chunkCount', () => {
+    const parsed = uploadMetaPersistedSchema.safeParse({
+      videoId: 'v1',
+      r2Key: 'k',
+      title: 't',
+      description: '',
+      chunkCount: 0,
+    });
+    expect(parsed.success).toBe(false);
+  });
+});
+
+describe('readUploadSession', () => {
+  it('returns null when the session is missing', async () => {
+    const env = envFor();
+    expect(await readUploadSession(env, 'u1', 'missing')).toBeNull();
+  });
+
+  it('reads back a session with parts manifest', async () => {
+    const env = envFor();
+    const keys = uploadSessionKeys('u1', 'up-1');
+    await env.kv.put(keys.mpid, 'mpid-xyz');
+    await env.kv.put(
+      keys.meta,
+      JSON.stringify({
+        videoId: 'v1',
+        r2Key: 'u1/v1/clip.mp4',
+        title: 'hi',
+        description: 'd',
+        chunkCount: 3,
+        fileName: 'clip.mp4',
+        fileSize: 999,
+      }),
+    );
+    await env.kv.put(
+      keys.parts,
+      JSON.stringify({
+        '1': { etag: 'e1', size: 10 },
+        '2': { etag: 'e2', size: 20 },
+      }),
+    );
+
+    const session = await readUploadSession(env, 'u1', 'up-1');
+    expect(session).not.toBeNull();
+    expect(session?.multipartUploadId).toBe('mpid-xyz');
+    expect(session?.meta.fileName).toBe('clip.mp4');
+    expect(session?.parts['1'].etag).toBe('e1');
+
expect(session?.parts['2'].size).toBe(20); + }); + + it('returns null when the meta JSON is malformed', async () => { + const env = envFor(); + const keys = uploadSessionKeys('u1', 'up-bad'); + await env.kv.put(keys.mpid, 'm'); + await env.kv.put(keys.meta, '{not-json'); + expect(await readUploadSession(env, 'u1', 'up-bad')).toBeNull(); + }); + + it('drops malformed individual part entries but keeps valid ones', async () => { + const env = envFor(); + const keys = uploadSessionKeys('u1', 'up-mix'); + await env.kv.put(keys.mpid, 'm'); + await env.kv.put( + keys.meta, + JSON.stringify({ + videoId: 'v1', + r2Key: 'k', + title: 't', + description: '', + chunkCount: 2, + }), + ); + await env.kv.put( + keys.parts, + JSON.stringify({ + '1': { etag: 'good', size: 5 }, + '2': { etag: 'bad' }, + }), + ); + const session = await readUploadSession(env, 'u1', 'up-mix'); + expect(session?.parts['1']).toEqual({ etag: 'good', size: 5 }); + expect(session?.parts['2']).toBeUndefined(); + }); +}); + +describe('uploadedChunkIndices', () => { + it('converts 1-indexed part numbers to 0-indexed chunk indices, sorted', () => { + const out = uploadedChunkIndices({ + '3': { etag: 'e3', size: 1 }, + '1': { etag: 'e1', size: 1 }, + '5': { etag: 'e5', size: 1 }, + }); + expect(out).toEqual([0, 2, 4]); + }); + + it('ignores non-integer part keys defensively', () => { + const out = uploadedChunkIndices({ + foo: { etag: 'e', size: 1 }, + '1': { etag: 'e', size: 1 }, + }); + expect(out).toEqual([0]); + }); +}); + +describe('deleteUploadSession', () => { + it('removes all three keys', async () => { + const env = envFor(); + const keys = uploadSessionKeys('u1', 'up-d'); + await env.kv.put(keys.mpid, 'm'); + await env.kv.put(keys.meta, '{}'); + await env.kv.put(keys.parts, '{}'); + await deleteUploadSession(env, 'u1', 'up-d'); + expect(env.kv.has(keys.mpid)).toBe(false); + expect(env.kv.has(keys.meta)).toBe(false); + expect(env.kv.has(keys.parts)).toBe(false); + }); +}); + +describe('abortUploadSession', () => { + it('aborts the R2 multipart and clears KV when the session exists', async () => { + const env = envFor(); + const keys = uploadSessionKeys('u1', 'up-a'); + await env.kv.put(keys.mpid, 'mp-1'); + await env.kv.put( + keys.meta, + JSON.stringify({ + videoId: 'v1', + r2Key: 'u1/v1/clip.mp4', + title: 't', + description: '', + chunkCount: 2, + }), + ); + await env.kv.put(keys.parts, JSON.stringify({ '1': { etag: 'e1', size: 1 } })); + + const result = await abortUploadSession(env, 'u1', 'up-a'); + expect(result.aborted).toBe(true); + expect(env.r2.abortCalls).toEqual([{ key: 'u1/v1/clip.mp4', uploadId: 'mp-1' }]); + expect(env.kv.has(keys.mpid)).toBe(false); + expect(env.kv.has(keys.meta)).toBe(false); + expect(env.kv.has(keys.parts)).toBe(false); + }); + + it('returns aborted:false when the session is already gone, and still clears (idempotent)', async () => { + const env = envFor(); + const result = await abortUploadSession(env, 'u1', 'never-existed'); + expect(result.aborted).toBe(false); + expect(env.r2.abortCalls).toHaveLength(0); + }); +}); diff --git a/src/workers/upload-session.ts b/src/workers/upload-session.ts new file mode 100644 index 0000000..eb29940 --- /dev/null +++ b/src/workers/upload-session.ts @@ -0,0 +1,164 @@ +// ALO-121: chunked upload session state. 
+//
+// State for an in-flight multipart upload lives in the SESSIONS KV under
+// three keys scoped by user id + upload id, with a 24h TTL:
+//
+//   upload:{userId}:{uploadId}:mpid  — R2 multipart uploadId string
+//   upload:{userId}:{uploadId}:meta  — JSON: { videoId, r2Key, title, description, chunkCount, fileName?, fileSize? }
+//   upload:{userId}:{uploadId}:parts — JSON: Record<string, { etag: string; size: number }>
+//
+// Splitting metadata/parts/uploadId across keys keeps the parts manifest
+// small enough to update on every chunk write without the cost of
+// re-serialising the whole session document on each PUT (KV value cap is
+// 25MB but write amplification still costs us).
+
+import { z } from 'zod';
+
+export const UPLOAD_SESSION_TTL_SECONDS = 24 * 60 * 60;
+
+export const uploadMetaPersistedSchema = z.object({
+  videoId: z.string(),
+  r2Key: z.string(),
+  title: z.string(),
+  description: z.string(),
+  chunkCount: z.number().int().positive(),
+  // Optional fingerprint fields — used by the resume status endpoint so the
+  // frontend can verify the user picked the same file before resuming.
+  // Older sessions without these still parse and just lack the fingerprint
+  // in the status response.
+  fileName: z.string().optional(),
+  fileSize: z.number().int().nonnegative().optional(),
+});
+
+export type UploadMetaPersisted = z.infer<typeof uploadMetaPersistedSchema>;
+
+export const uploadPartSchema = z.object({
+  etag: z.string(),
+  size: z.number().int().nonnegative(),
+});
+
+export type UploadPart = z.infer<typeof uploadPartSchema>;
+
+export type UploadPartsMap = Record<string, UploadPart>;
+
+export interface UploadSessionEnv {
+  SESSIONS: KVNamespace;
+  VIDEOS: R2Bucket;
+}
+
+export interface UploadSessionKeys {
+  base: string;
+  mpid: string;
+  meta: string;
+  parts: string;
+}
+
+export function uploadSessionKeys(userId: string, uploadId: string): UploadSessionKeys {
+  const base = `upload:${userId}:${uploadId}`;
+  return {
+    base,
+    mpid: `${base}:mpid`,
+    meta: `${base}:meta`,
+    parts: `${base}:parts`,
+  };
+}
+
+export interface UploadSession {
+  uploadId: string;
+  multipartUploadId: string;
+  meta: UploadMetaPersisted;
+  parts: UploadPartsMap;
+}
+
+export async function readUploadSession(
+  env: UploadSessionEnv,
+  userId: string,
+  uploadId: string,
+): Promise<UploadSession | null> {
+  const keys = uploadSessionKeys(userId, uploadId);
+  const [multipartUploadId, metaJson, partsJson] = await Promise.all([
+    env.SESSIONS.get(keys.mpid),
+    env.SESSIONS.get(keys.meta),
+    env.SESSIONS.get(keys.parts),
+  ]);
+  if (!multipartUploadId || !metaJson) return null;
+
+  let meta: UploadMetaPersisted;
+  try {
+    meta = uploadMetaPersistedSchema.parse(JSON.parse(metaJson));
+  } catch {
+    return null;
+  }
+
+  let parts: UploadPartsMap = {};
+  if (partsJson) {
+    try {
+      const raw = JSON.parse(partsJson) as unknown;
+      if (raw && typeof raw === 'object') {
+        for (const [k, v] of Object.entries(raw as Record<string, unknown>)) {
+          const parsed = uploadPartSchema.safeParse(v);
+          if (parsed.success) parts[k] = parsed.data;
+        }
+      }
+    } catch {
+      parts = {};
+    }
+  }
+
+  return { uploadId, multipartUploadId, meta, parts };
+}
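+
+// Concretely, a three-chunk session for user u1 / upload up-1 might sit in
+// KV as (values abridged; shapes taken from the tests above):
+//
+//   upload:u1:up-1:mpid  -> "mpid-xyz"
+//   upload:u1:up-1:meta  -> {"videoId":"v1","r2Key":"u1/v1/clip.mp4","title":"hi","description":"","chunkCount":3}
+//   upload:u1:up-1:parts -> {"1":{"etag":"e1","size":10},"2":{"etag":"e2","size":20}}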
+
+// Translate the parts manifest into a 0-indexed list of chunk indices the
+// server has already received, sorted ascending. The frontend uses this to
+// skip already-uploaded chunks when resuming. Part numbers in R2 are
+// 1-indexed; chunkIndex is 0-indexed.
+export function uploadedChunkIndices(parts: UploadPartsMap): number[] {
+  const indices: number[] = [];
+  for (const partNumber of Object.keys(parts)) {
+    const n = Number(partNumber);
+    if (Number.isInteger(n) && n >= 1) {
+      indices.push(n - 1);
+    }
+  }
+  indices.sort((a, b) => a - b);
+  return indices;
+}
+
+export async function deleteUploadSession(
+  env: UploadSessionEnv,
+  userId: string,
+  uploadId: string,
+): Promise<void> {
+  const keys = uploadSessionKeys(userId, uploadId);
+  await Promise.all([
+    env.SESSIONS.delete(keys.mpid),
+    env.SESSIONS.delete(keys.meta),
+    env.SESSIONS.delete(keys.parts),
+  ]);
+}
+
+// Best-effort R2 multipart abort + KV cleanup. Used by the explicit DELETE
+// endpoint and the storage-quota safety net in videos.ts.
+export async function abortUploadSession(
+  env: UploadSessionEnv,
+  userId: string,
+  uploadId: string,
+): Promise<{ aborted: boolean }> {
+  const session = await readUploadSession(env, userId, uploadId);
+  if (!session) {
+    await deleteUploadSession(env, userId, uploadId);
+    return { aborted: false };
+  }
+  try {
+    const multipart = env.VIDEOS.resumeMultipartUpload(
+      session.meta.r2Key,
+      session.multipartUploadId,
+    );
+    await multipart.abort();
+  } catch {
+    // R2 abort is best-effort — the part data is uncommitted and R2's
+    // lifecycle eventually reclaims it. Always clear the KV state.
+  }
+  await deleteUploadSession(env, userId, uploadId);
+  return { aborted: true };
+}
diff --git a/src/workers/videos.test.ts b/src/workers/videos.test.ts
index 182aba0..7f9e3b5 100644
--- a/src/workers/videos.test.ts
+++ b/src/workers/videos.test.ts
@@ -1,13 +1,33 @@
 import { describe, expect, it } from 'vitest';
 import { Hono } from 'hono';
 import { videoRoutes, type VideoRoutesEnv } from './videos';
+import { uploadSessionKeys } from './upload-session';
 
 interface QuotaState {
   used: number;
   quota: number;
 }
 
-function fakeEnv(quota: QuotaState): VideoRoutesEnv {
+class FakeKV {
+  private store = new Map<string, string>();
+  async get(key: string): Promise<string | null> {
+    return this.store.get(key) ?? null;
+  }
+  async put(key: string, value: string): Promise<void> {
+    this.store.set(key, value);
+  }
+  async delete(key: string): Promise<void> {
+    this.store.delete(key);
+  }
+  has(key: string): boolean {
+    return this.store.has(key);
+  }
+}
+
+function fakeEnv(
+  quota: QuotaState,
+  overrides: Partial<VideoRoutesEnv> = {},
+): VideoRoutesEnv {
   // Minimal D1 stub — only the storage-quota SELECTs are exercised by the
   // tests below. The 413 path returns before any other DB / R2 / KV /
   // queue work, so we don't have to fake those bindings.
@@ -46,6 +66,7 @@
     CACHE: {} as unknown as KVNamespace,
     SESSIONS: {} as unknown as KVNamespace,
     VIDEO_ENCODING: { send: async () => {} } as unknown as Queue,
+    ...overrides,
   };
 }
 
@@ -142,77 +163,181 @@ describe('upload storage-quota gate', () => {
   });
 });
 
-// ALO-121: resume-on-disconnect endpoint surfaces server-known chunks.
-describe('upload status endpoint', () => {
-  function envWithSessions(store: Record<string, string>): VideoRoutesEnv {
-    const sessions = {
-      async get(key: string) {
-        return store[key] ??
null; - }, - async put() {}, - async delete() {}, - } as unknown as KVNamespace; - return { - DB: {} as unknown as D1Database, - VIDEOS: {} as unknown as R2Bucket, - CACHE: {} as unknown as KVNamespace, - SESSIONS: sessions, - VIDEO_ENCODING: { send: async () => {} } as unknown as Queue, - }; - } - - it('returns 401 when unauthenticated', async () => { - const fetcher = mountWithUser(envWithSessions({}), null); - const res = await fetcher('/api/videos/upload/abc/status'); +// ALO-121: resume status endpoint. The frontend uses it to learn which +// chunks are already on the server before retrying so a 1GB upload +// resumed after a disconnect doesn't re-send any megabyte twice. +describe('GET /api/videos/upload/:uploadId/status', () => { + it('401 when no session', async () => { + const fetcher = mountWithUser(fakeEnv({ used: 0, quota: 1024 }), null); + const res = await fetcher('/api/videos/upload/up-1/status'); expect(res.status).toBe(401); }); - it('returns 404 when no session exists for the user/uploadId pair', async () => { - const fetcher = mountWithUser(envWithSessions({}), { - id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + it('404 when the upload session is missing or expired', async () => { + const sessions = new FakeKV(); + const env = fakeEnv( + { used: 0, quota: 1024 }, + { SESSIONS: sessions as unknown as KVNamespace }, + ); + const fetcher = mountWithUser(env, { + id: 'u1', + email: 'a@b.com', + name: 'A', + emailVerified: true, }); - const res = await fetcher('/api/videos/upload/missing/status'); + const res = await fetcher('/api/videos/upload/never-existed/status'); expect(res.status).toBe(404); }); - it('reports received chunk indexes (0-based) from stored parts', async () => { - const base = 'upload:u1:abc'; - const store: Record = { - [`${base}:mpid`]: 'mpid-1', - [`${base}:meta`]: JSON.stringify({ - videoId: 'v1', r2Key: 'u1/v1/clip.mp4', title: 't', description: 'd', chunkCount: 3, + it('returns chunkCount + uploadedChunks for the caller’s own session', async () => { + const sessions = new FakeKV(); + const keys = uploadSessionKeys('u1', 'up-resume'); + await sessions.put(keys.mpid, 'mpid-xyz'); + await sessions.put( + keys.meta, + JSON.stringify({ + videoId: 'v1', + r2Key: 'u1/v1/clip.mp4', + title: 'My clip', + description: 'd', + chunkCount: 5, + fileName: 'clip.mp4', + fileSize: 12345, }), - [`${base}:parts`]: JSON.stringify({ + ); + await sessions.put( + keys.parts, + JSON.stringify({ '1': { etag: 'e1', size: 10 }, - '3': { etag: 'e3', size: 7 }, + '3': { etag: 'e3', size: 10 }, }), - }; - const fetcher = mountWithUser(envWithSessions(store), { - id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + ); + const env = fakeEnv( + { used: 0, quota: 1024 }, + { SESSIONS: sessions as unknown as KVNamespace }, + ); + const fetcher = mountWithUser(env, { + id: 'u1', + email: 'a@b.com', + name: 'A', + emailVerified: true, }); - const res = await fetcher('/api/videos/upload/abc/status'); + + const res = await fetcher('/api/videos/upload/up-resume/status'); expect(res.status).toBe(200); const body = (await res.json()) as { - uploadId: string; chunkCount: number; received: number[]; bytesReceived: number; + uploadId: string; + chunkCount: number; + uploadedChunks: number[]; + fileName: string | null; + fileSize: number | null; + title: string; }; - expect(body.uploadId).toBe('abc'); - expect(body.chunkCount).toBe(3); - expect(body.received).toEqual([0, 2]); - expect(body.bytesReceived).toBe(17); + expect(body.uploadId).toBe('up-resume'); + 
expect(body.chunkCount).toBe(5); + expect(body.uploadedChunks).toEqual([0, 2]); + expect(body.fileName).toBe('clip.mp4'); + expect(body.fileSize).toBe(12345); + expect(body.title).toBe('My clip'); }); - it('scopes upload sessions per user (cannot read another user\'s upload)', async () => { - const base = 'upload:other:abc'; - const store: Record = { - [`${base}:mpid`]: 'mpid-1', - [`${base}:meta`]: JSON.stringify({ - videoId: 'v1', r2Key: 'other/v1/clip.mp4', title: 't', description: 'd', chunkCount: 1, + it('cannot read another user’s upload (different KV namespace path)', async () => { + const sessions = new FakeKV(); + // Session belongs to u2. + const keys = uploadSessionKeys('u2', 'up-private'); + await sessions.put(keys.mpid, 'mpid'); + await sessions.put( + keys.meta, + JSON.stringify({ + videoId: 'v1', + r2Key: 'u2/v1/clip.mp4', + title: 't', + description: '', + chunkCount: 2, }), - }; - const fetcher = mountWithUser(envWithSessions(store), { - id: 'u1', email: 'a@b.com', name: 'A', emailVerified: true, + ); + const env = fakeEnv( + { used: 0, quota: 1024 }, + { SESSIONS: sessions as unknown as KVNamespace }, + ); + // ...but u1 calls the endpoint. + const fetcher = mountWithUser(env, { + id: 'u1', + email: 'a@b.com', + name: 'A', + emailVerified: true, }); - const res = await fetcher('/api/videos/upload/abc/status'); + const res = await fetcher('/api/videos/upload/up-private/status'); expect(res.status).toBe(404); }); }); + +describe('DELETE /api/videos/upload/:uploadId', () => { + it('401 when no session', async () => { + const fetcher = mountWithUser(fakeEnv({ used: 0, quota: 1024 }), null); + const res = await fetcher('/api/videos/upload/up-1', { method: 'DELETE' }); + expect(res.status).toBe(401); + }); + + it('clears the KV session and aborts the R2 multipart', async () => { + const sessions = new FakeKV(); + const keys = uploadSessionKeys('u1', 'up-cancel'); + await sessions.put(keys.mpid, 'mpid-xyz'); + await sessions.put( + keys.meta, + JSON.stringify({ + videoId: 'v1', + r2Key: 'u1/v1/clip.mp4', + title: 't', + description: '', + chunkCount: 3, + }), + ); + await sessions.put(keys.parts, JSON.stringify({ '1': { etag: 'e1', size: 1 } })); + + const aborts: Array<{ key: string; uploadId: string }> = []; + const r2 = { + put: async () => {}, + resumeMultipartUpload: (key: string, mpid: string) => ({ + async abort() { + aborts.push({ key, uploadId: mpid }); + }, + }), + } as unknown as R2Bucket; + + const env = fakeEnv( + { used: 0, quota: 1024 }, + { SESSIONS: sessions as unknown as KVNamespace, VIDEOS: r2 }, + ); + const fetcher = mountWithUser(env, { + id: 'u1', + email: 'a@b.com', + name: 'A', + emailVerified: true, + }); + + const res = await fetcher('/api/videos/upload/up-cancel', { method: 'DELETE' }); + expect(res.status).toBe(204); + expect(aborts).toEqual([{ key: 'u1/v1/clip.mp4', uploadId: 'mpid-xyz' }]); + expect(sessions.has(keys.mpid)).toBe(false); + expect(sessions.has(keys.meta)).toBe(false); + expect(sessions.has(keys.parts)).toBe(false); + }); + + it('returns 204 even when the session does not exist (idempotent)', async () => { + const sessions = new FakeKV(); + const env = fakeEnv( + { used: 0, quota: 1024 }, + { SESSIONS: sessions as unknown as KVNamespace }, + ); + const fetcher = mountWithUser(env, { + id: 'u1', + email: 'a@b.com', + name: 'A', + emailVerified: true, + }); + + const res = await fetcher('/api/videos/upload/never-existed', { method: 'DELETE' }); + expect(res.status).toBe(204); + }); +}); diff --git a/src/workers/videos.ts 
b/src/workers/videos.ts
index eb9f17b..4184b87 100644
--- a/src/workers/videos.ts
+++ b/src/workers/videos.ts
@@ -7,6 +7,15 @@ import {
   rateLimit,
   rateLimitHeaders,
 } from './rate-limit';
+import {
+  abortUploadSession,
+  deleteUploadSession,
+  readUploadSession,
+  uploadedChunkIndices,
+  UPLOAD_SESSION_TTL_SECONDS,
+  uploadSessionKeys,
+  type UploadPartsMap,
+} from './upload-session';
 import {
   MAX_VIDEO_BYTES,
   parseChunkMetadataFromFormData,
@@ -72,59 +81,11 @@ const uploadMetadataSchema = z.object({
   description: z.string().max(5000).optional().default(''),
 });
 
-const uploadMetaPersistedSchema = z.object({
-  videoId: z.string(),
-  r2Key: z.string(),
-  title: z.string(),
-  description: z.string(),
-  chunkCount: z.number().int().positive(),
-});
-
-function uploadKvBase(userId: string, uploadId: string): string {
-  return `upload:${userId}:${uploadId}`;
-}
-
 export const videoRoutes = new Hono<{
   Bindings: VideoRoutesEnv;
   Variables: VideoRoutesVariables;
 }>();
 
-// ALO-121: resume-on-disconnect. Lets the client re-derive which chunks the
-// server already has after a network failure, so we don't re-upload bytes
-// the multipart already accepted. Auth-gated by user-scoped KV key.
-videoRoutes.get('/api/videos/upload/:uploadId/status', async (c) => {
-  const user = c.get('user');
-  if (!user) return c.json({ error: 'Unauthorized' }, 401);
-
-  const uploadId = c.req.param('uploadId');
-  const baseKvKey = uploadKvBase(user.id, uploadId);
-  const [metaJson, partsJson, mpid] = await Promise.all([
-    c.env.SESSIONS.get(`${baseKvKey}:meta`),
-    c.env.SESSIONS.get(`${baseKvKey}:parts`),
-    c.env.SESSIONS.get(`${baseKvKey}:mpid`),
-  ]);
-  if (!metaJson || !mpid) {
-    return c.json({ error: 'Upload session not found', code: 'upload_not_found' }, 404);
-  }
-  const meta = uploadMetaPersistedSchema.parse(JSON.parse(metaJson));
-  const partsMap = partsJson
-    ? (JSON.parse(partsJson) as Record<string, { etag: string; size: number }>)
-    : {};
-  // Stored part numbers are 1-indexed; client uses 0-indexed chunk indexes.
-  const received = Object.keys(partsMap)
-    .map((p) => Number(p) - 1)
-    .filter((i) => Number.isInteger(i) && i >= 0)
-    .sort((a, b) => a - b);
-  const bytesReceived = Object.values(partsMap).reduce((sum, p) => sum + p.size, 0);
-  return c.json({
-    uploadId,
-    chunkCount: meta.chunkCount,
-    received,
-    bytesReceived,
-    complete: false,
-  });
-});
-
 videoRoutes.get('/api/videos/trending', async (c) => {
   const parsed = trendingQuerySchema.safeParse(c.req.query());
   if (!parsed.success) {
@@ -471,13 +432,34 @@ videoRoutes.post('/api/videos/upload', async (c) => {
   }
 
   const resolvedUploadId = uploadId ?? crypto.randomUUID();
-
-  const baseKvKey = uploadKvBase(user.id, resolvedUploadId);
-  const mpidKey = `${baseKvKey}:mpid`;
-  const metaKey = `${baseKvKey}:meta`;
-  const partsKey = `${baseKvKey}:parts`;
+  const keys = uploadSessionKeys(user.id, resolvedUploadId);
 
   if (chunkIndex === 0) {
+    // ALO-121: idempotent chunk-0 retry. If the client already opened a
+    // multipart for this uploadId (network blip swallowed our 202 and the
+    // client retried with the same uploadId), reuse the existing R2
+    // multipart instead of creating a second one — otherwise we'd leak an
+    // orphaned multipart and the parts manifest would be split across two
+    // uploads.
+    const existing = uploadId ?
await readUploadSession(env, user.id, uploadId) : null; + if (existing) { + const multipart = env.VIDEOS.resumeMultipartUpload( + existing.meta.r2Key, + existing.multipartUploadId, + ); + const firstPart = await multipart.uploadPart(1, rawFile.stream()); + const updatedParts: UploadPartsMap = { + ...existing.parts, + '1': { etag: firstPart.etag, size: rawFile.size }, + }; + await env.SESSIONS.put(keys.parts, JSON.stringify(updatedParts), { + expirationTtl: UPLOAD_SESSION_TTL_SECONDS, + }); + return c.json( + { status: 'chunk_received', chunkIndex, chunkCount, uploadId: resolvedUploadId }, + 202, + ); + } + const videoId = crypto.randomUUID(); const r2Key = `${user.id}/${videoId}/${rawFile.name}`; const multipart = await env.VIDEOS.createMultipartUpload(r2Key, { @@ -486,60 +468,44 @@ videoRoutes.post('/api/videos/upload', async (c) => { const firstPart = await multipart.uploadPart(1, rawFile.stream()); - await env.SESSIONS.put(mpidKey, multipart.uploadId, { expirationTtl: 86400 }); + await env.SESSIONS.put(keys.mpid, multipart.uploadId, { + expirationTtl: UPLOAD_SESSION_TTL_SECONDS, + }); await env.SESSIONS.put( - metaKey, + keys.meta, JSON.stringify({ videoId, r2Key, title: metadataParsed.data.title, description: metadataParsed.data.description, chunkCount, + fileName: rawFile.name, + fileSize: rawFile.size, }), - { expirationTtl: 86400 }, - ); - await env.SESSIONS.put( - partsKey, - JSON.stringify({ '1': { etag: firstPart.etag, size: rawFile.size } } as Record< - string, - { etag: string; size: number } - >), - { expirationTtl: 86400 }, + { expirationTtl: UPLOAD_SESSION_TTL_SECONDS }, ); + const initialParts: UploadPartsMap = { + '1': { etag: firstPart.etag, size: rawFile.size }, + }; + await env.SESSIONS.put(keys.parts, JSON.stringify(initialParts), { + expirationTtl: UPLOAD_SESSION_TTL_SECONDS, + }); return c.json({ status: 'chunk_received', chunkIndex, chunkCount, uploadId: resolvedUploadId }, 202); } - const [multipartUploadId, uploadMetaJson, partsJson] = await Promise.all([ - env.SESSIONS.get(mpidKey), - env.SESSIONS.get(metaKey), - env.SESSIONS.get(partsKey), - ]); - - if (!multipartUploadId || !uploadMetaJson) { + const session = await readUploadSession(env, user.id, resolvedUploadId); + if (!session) { return c.json({ error: 'Missing upload session. Start with chunkIndex=0.' }, 400); } - const uploadMeta = uploadMetaPersistedSchema.parse(JSON.parse(uploadMetaJson)); + const uploadMeta = session.meta; + const uploadedPartsMap: UploadPartsMap = { ...session.parts }; - const uploadedPartsMap = partsJson - ? (JSON.parse(partsJson) as Record) - : {}; - - // ALO-121: idempotent retry. If the client retries a chunk we already - // accepted (typical on flaky connections where the response was lost), - // ack without re-paying the multipart upload cost. - const partKey = String(chunkIndex + 1); - if (uploadedPartsMap[partKey]) { - if (chunkIndex < chunkCount - 1) { - return c.json( - { status: 'chunk_received', chunkIndex, chunkCount, idempotent: true }, - 202, - ); - } - // Final chunk re-send: fall through so completion logic still runs. - } - - const priorBytes = Object.values(uploadedPartsMap).reduce((sum, part) => sum + part.size, 0); + // chunk-N retries are idempotent: re-uploading a part overwrites the + // previous one in R2, and the parts manifest just gets the new etag/size. 
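+  // Worked example (illustrative): with parts {'1': 10 MB, '2': 10 MB} and a
+  // retry of chunkIndex 1 (part '2'), priorBytes counts only part '1', so
+  // the size check below doesn't count the retried part's bytes twice.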
+ const priorBytes = Object.entries(uploadedPartsMap) + .filter(([partNumber]) => Number(partNumber) !== chunkIndex + 1) + .reduce((sum, [, part]) => sum + part.size, 0); if (priorBytes + rawFile.size > MAX_VIDEO_BYTES) { return c.json( { error: `Upload exceeds ${MAX_VIDEO_BYTES} bytes`, code: 'file_too_large' }, @@ -547,15 +513,13 @@ videoRoutes.post('/api/videos/upload', async (c) => { ); } - const multipart = env.VIDEOS.resumeMultipartUpload(uploadMeta.r2Key, multipartUploadId); - const alreadyHavePart = Boolean(uploadedPartsMap[partKey]); - if (!alreadyHavePart) { - const uploadedPart = await multipart.uploadPart(chunkIndex + 1, rawFile.stream()); - uploadedPartsMap[partKey] = { etag: uploadedPart.etag, size: rawFile.size }; - await env.SESSIONS.put(partsKey, JSON.stringify(uploadedPartsMap), { - expirationTtl: 86400, - }); - } + const multipart = env.VIDEOS.resumeMultipartUpload(uploadMeta.r2Key, session.multipartUploadId); + const uploadedPart = await multipart.uploadPart(chunkIndex + 1, rawFile.stream()); + + uploadedPartsMap[String(chunkIndex + 1)] = { etag: uploadedPart.etag, size: rawFile.size }; + await env.SESSIONS.put(keys.parts, JSON.stringify(uploadedPartsMap), { + expirationTtl: UPLOAD_SESSION_TTL_SECONDS, + }); if (chunkIndex < chunkCount - 1) { return c.json({ status: 'chunk_received', chunkIndex, chunkCount }, 202); @@ -572,15 +536,11 @@ videoRoutes.post('/api/videos/upload', async (c) => { // ALO-139: authoritative quota check at completion. Catches the case // where the first-chunk precheck passed but a parallel upload (or a // very large total via many small chunks) would push the user over. - const totalBytes = Object.values(uploadedPartsMap).reduce((sum, p) => sum + p.size, 0); + const totalBytes = priorBytes + rawFile.size; const finalUsage = await getStorageUsage(env, user.id); if (!hasRoomFor(finalUsage, totalBytes)) { await multipart.abort().catch(() => {}); - await Promise.all([ - env.SESSIONS.delete(mpidKey), - env.SESSIONS.delete(metaKey), - env.SESSIONS.delete(partsKey), - ]); + await deleteUploadSession(env, user.id, resolvedUploadId); return c.json( { error: 'Storage quota exceeded.', @@ -615,11 +575,50 @@ videoRoutes.post('/api/videos/upload', async (c) => { }); await bumpTrendingCacheVersion(env.CACHE); - await Promise.all([env.SESSIONS.delete(mpidKey), env.SESSIONS.delete(metaKey), env.SESSIONS.delete(partsKey)]); + await deleteUploadSession(env, user.id, resolvedUploadId); return c.json({ id: uploadMeta.videoId, status: 'queued' }, 201); }); +// ALO-121 resume support. The frontend persists `uploadId` to localStorage +// after chunk-0; on remount/reload it calls this endpoint to learn which +// chunks (0-indexed) are already on the server, then re-uploads only the +// missing ones. The session row in KV doubles as the source of truth for +// `chunkCount`/`r2Key`/file fingerprint, so a stale localStorage entry +// (different file picked, session expired) is detected here. +videoRoutes.get('/api/videos/upload/:uploadId/status', async (c) => { + const user = c.get('user'); + if (!user) return c.json({ error: 'Unauthorized' }, 401); + + const uploadId = c.req.param('uploadId'); + const session = await readUploadSession(c.env, user.id, uploadId); + if (!session) { + return c.json({ error: 'Upload session not found' }, 404); + } + + return c.json({ + uploadId, + chunkCount: session.meta.chunkCount, + uploadedChunks: uploadedChunkIndices(session.parts), + fileName: session.meta.fileName ?? null, + fileSize: session.meta.fileSize ?? 
null, + title: session.meta.title, + description: session.meta.description, + }); +}); + +// ALO-121 user-initiated cancel. Aborts the R2 multipart so storage +// isn't billed for orphaned parts and clears the KV session keys. +// Idempotent: returns 204 whether the session existed or not. +videoRoutes.delete('/api/videos/upload/:uploadId', async (c) => { + const user = c.get('user'); + if (!user) return c.json({ error: 'Unauthorized' }, 401); + + const uploadId = c.req.param('uploadId'); + await abortUploadSession(c.env, user.id, uploadId); + return c.body(null, 204); +}); + videoRoutes.delete('/api/videos/:id', async (c) => { const user = c.get('user'); if (!user) {