diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 5c2a99f..23284f2 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -5,7 +5,7 @@ name: Test on: pull_request: - branches: ["main"] + branches: ["main", "test"] push: branches: ["test"] diff --git a/components/dashboard/QueuePanel.js b/components/dashboard/QueuePanel.js index f8e875d..0dfda45 100644 --- a/components/dashboard/QueuePanel.js +++ b/components/dashboard/QueuePanel.js @@ -13,9 +13,29 @@ import { import { Download } from "lucide-react"; import { auth } from "../../lib/firebase"; +function friendlyReason(reason) { + if (!reason) return null; + if (reason.includes("interrupted upload") || reason.includes("memory limit")) { + return "Upload was interrupted by a server restart or memory limit."; + } + if (reason.includes("Upload exception") || reason.includes("fetch failed")) { + return "Could not connect to OSF."; + } + if (reason.includes("OSF error 503") || reason.includes("OSF error 502")) { + return "OSF was temporarily unavailable."; + } + if (reason.includes("OSF error 429")) { + return "OSF rate-limited the request."; + } + if (reason.includes("OSF error 401") || reason.includes("OSF error 403")) { + return "Authentication error. Your OSF token may need to be refreshed."; + } + return reason; +} + function statusBadge(status) { const labels = { - pending: { color: "orange", text: "Waiting to retry" }, + pending: { color: "orange", text: "Retrying" }, processing: { color: "blue", text: "Retrying now" }, failed: { color: "red", text: "Failed" }, }; @@ -27,6 +47,20 @@ function statusBadge(status) { ); } +function nextRetryText(nextRetryAt) { + if (!nextRetryAt) return null; + const t = nextRetryAt.toDate ? nextRetryAt.toDate() : new Date(nextRetryAt); + const msUntil = t.getTime() - Date.now(); + if (msUntil <= 0) return "soon"; + const minUntil = Math.ceil(msUntil / (60 * 1000)); + if (minUntil >= 60) { + const hours = Math.floor(minUntil / 60); + const mins = minUntil % 60; + return `in ${hours}h${mins > 0 ? ` ${mins}m` : ""}`; + } + return `in ${minUntil}m`; +} + function timeRemaining(createdAt) { if (!createdAt) return null; const created = createdAt.toDate ? createdAt.toDate() : new Date(createdAt); @@ -36,9 +70,9 @@ function timeRemaining(createdAt) { const hoursLeft = Math.floor(msLeft / (60 * 60 * 1000)); if (hoursLeft >= 24) { const days = Math.floor(hoursLeft / 24); - return `${days}d ${hoursLeft % 24}h remaining`; + return `${days}d ${hoursLeft % 24}h`; } - return `${hoursLeft}h remaining`; + return `${hoursLeft}h`; } async function fetchFile(experimentId, entryId) { @@ -51,7 +85,12 @@ async function fetchFile(experimentId, entryId) { ); } -export default function QueuePanel({ entries, experimentId, errorLog }) { +/** + * QueuePanel — shows all queued uploads (pending + failed) with immediate + * download access. Pending items are being retried automatically but the + * researcher can download them right away without waiting. + */ +export default function QueuePanel({ entries, experimentId }) { const [downloading, setDownloading] = useState(null); const [downloadingAll, setDownloadingAll] = useState(false); @@ -113,13 +152,18 @@ export default function QueuePanel({ entries, experimentId, errorLog }) { const plural = (n, word) => `${n} ${word}${n !== 1 ? "s" : ""}`; - let alertTitle = ""; - if (pendingCount > 0 && failedCount > 0) { - alertTitle = `${plural(pendingCount, "file")} waiting to upload, ${plural(failedCount, "file")} failed.`; - } else if (pendingCount > 0) { - alertTitle = `${plural(pendingCount, "file")} waiting to upload to OSF.`; - } else { + let alertTitle; + let alertDescription; + + if (allFailed) { alertTitle = `${plural(failedCount, "file")} could not be uploaded to OSF.`; + alertDescription = "All retries were exhausted. Download these files and upload them to your OSF project manually to prevent data loss."; + } else if (failedCount > 0) { + alertTitle = `${plural(entries.length, "file")} did not upload to OSF.`; + alertDescription = `${plural(pendingCount, "file")} still being retried. ${plural(failedCount, "file")} failed permanently. You can download all files below.`; + } else { + alertTitle = `${plural(pendingCount, "file")} did not upload to OSF.`; + alertDescription = "DataPipe is retrying automatically. You can also download the files now."; } return ( @@ -127,30 +171,38 @@ export default function QueuePanel({ entries, experimentId, errorLog }) { {alertTitle} - - {allFailed - ? "These files could not be delivered after multiple attempts. Download them to avoid data loss." - : "DataPipe will keep retrying automatically. Files are stored for up to 1 week. You can also download them below."} - + {alertDescription} - Why am I seeing this? + Why did these uploads fail? When a participant submits data, DataPipe tries to upload it to - your OSF project immediately. If that transfer fails — for - example, because OSF is temporarily unavailable, rate-limiting - requests, or there is a configuration issue with your project — - DataPipe saves a copy of the data and retries automatically over - the next several days. The files listed here are those saved - copies. Once a retry succeeds the file will disappear from this - list. If all retries are exhausted, you can still download the - data and upload it to OSF manually. + your OSF project immediately. If that fails, DataPipe saves a + copy and retries automatically. Common reasons include: + + + + Server memory limit — Large data submissions + can occasionally exceed the server's memory capacity. + + + OSF unavailable — OSF may be temporarily + down or rate-limiting requests. + + + Configuration issue — There may be a problem + with your OSF project settings or authentication token. + + + + Files are stored for up to 7 days. If retries don't succeed, + download the files and upload them to OSF manually. @@ -167,78 +219,68 @@ export default function QueuePanel({ entries, experimentId, errorLog }) { Download all as ZIP - - - - - View file details - - - - - - - - FILENAME - STATUS - EXPIRES - ATTEMPTS - DOWNLOAD - - - - {entries.map((entry) => ( - - - {entry.filename} - {entry.failureReason && ( - - {entry.failureReason} - - )} - - {statusBadge(entry.status)} - {timeRemaining(entry.createdAt)} - - {entry.retryCount}/{entry.maxRetries} - - - handleDownload(entry)} - > - - - - - ))} - {errorLog && errorLog.map((error, index) => ( - - - {error.error} - - {error.time} - - - - - Error - - - - - - - - - - ))} - - - - - + + + + FILENAME + STATUS + REASON + STORED FOR + + + + + {entries.map((entry) => ( + + {entry.filename} + + {statusBadge(entry.status)} + {(entry.status === "pending" || entry.status === "processing") && + entry.nextRetryAt && ( + + Next retry {nextRetryText(entry.nextRetryAt)} + + )} + + + + {friendlyReason(entry.failureReason) || "\u2014"} + + + + + {timeRemaining(entry.createdAt) || "\u2014"} + + + + handleDownload(entry)} + > + + + + + ))} + + ); } + +/** + * UploadsResolvedNotice — brief success confirmation shown when + * previously pending/failed uploads have all been resolved. + */ +export function UploadsResolvedNotice() { + return ( + + + All queued uploads completed successfully. + + ); +} diff --git a/firebase.json b/firebase.json index 06c22ff..d5122af 100644 --- a/firebase.json +++ b/firebase.json @@ -73,6 +73,10 @@ "port": 5000, "host": "localhost" }, + "storage": { + "port": 9199, + "host": "localhost" + }, "ui": { "enabled": true } diff --git a/functions/src/__tests__/data-emulator.test.js b/functions/src/__tests__/data-emulator.test.js index 729e14d..9a88130 100644 --- a/functions/src/__tests__/data-emulator.test.js +++ b/functions/src/__tests__/data-emulator.test.js @@ -30,17 +30,6 @@ const config = { jest.setTimeout(30000); -async function waitForLog(db, docId, field, expectedValue, timeoutMs = 10000) { - const start = Date.now(); - while (Date.now() - start < timeoutMs) { - const doc = await db.collection("logs").doc(docId).get(); - if (doc.exists && doc.data()?.[field] === expectedValue) { - return doc; - } - await new Promise((resolve) => setTimeout(resolve, 250)); - } - return db.collection("logs").doc(docId).get(); -} beforeAll(async () => { initializeApp(config); @@ -78,12 +67,17 @@ describe("apiData", () => { it("should increment the write request log for the experiment when there is a complete request", async () => { const db = getFirestore(); await db.collection("logs").doc("testlog").delete(); + // writeLog is awaited inside apiData before the response is sent, + // so the log document should exist by the time we get the response. await saveData({ experimentID: "testlog", data: "test", filename: "test", }); - let doc = await waitForLog(db, "testlog", "saveData", 1); + // Small delay to allow Firestore emulator to sync + await new Promise((resolve) => setTimeout(resolve, 500)); + let doc = await db.collection("logs").doc("testlog").get(); + expect(doc.exists).toBe(true); expect(doc.data().saveData).toBe(1); await saveData({ @@ -91,7 +85,8 @@ describe("apiData", () => { data: "test", filename: "test", }); - doc = await waitForLog(db, "testlog", "saveData", 2); + await new Promise((resolve) => setTimeout(resolve, 500)); + doc = await db.collection("logs").doc("testlog").get(); expect(doc.data().saveData).toBe(2); }); @@ -106,7 +101,9 @@ describe("apiData", () => { filename: "test", }); - let doc = await waitForLog(db, "data-testexp", "logError", 1); + await new Promise((resolve) => setTimeout(resolve, 500)); + let doc = await db.collection("logs").doc("data-testexp").get(); + expect(doc.exists).toBe(true); expect(doc.data().logError).toBe(1); await db.collection("experiments").doc("data-testexp").set( @@ -124,7 +121,8 @@ describe("apiData", () => { filename: "test", }); - doc = await waitForLog(db, "data-testexp", "logError", 2); + await new Promise((resolve) => setTimeout(resolve, 500)); + doc = await db.collection("logs").doc("data-testexp").get(); expect(doc.data().logError).toBe(2); }); diff --git a/functions/src/__tests__/early-persist-emulator.test.js b/functions/src/__tests__/early-persist-emulator.test.js new file mode 100644 index 0000000..9c2ad09 --- /dev/null +++ b/functions/src/__tests__/early-persist-emulator.test.js @@ -0,0 +1,161 @@ +/** + * @jest-environment node + */ + +import { initializeApp } from "firebase-admin/app"; +import { getFirestore } from "firebase-admin/firestore"; +import { getStorage } from "firebase-admin/storage"; +import express from "express"; +import MESSAGES from "../api-messages"; + +process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; +process.env.FIREBASE_STORAGE_EMULATOR_HOST = "localhost:9199"; + +jest.setTimeout(30000); + +const config = { + projectId: "datapipe-test", + storageBucket: "datapipe-test.appspot.com", +}; + +async function saveData(body) { + const response = await fetch( + "http://localhost:5001/datapipe-test/us-central1/apidata", + { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "*/*", + }, + body: JSON.stringify(body), + } + ); + const message = await response.json(); + return { status: response.status, body: message }; +} + +async function listPendingFiles(bucket, experimentID) { + const [files] = await bucket.getFiles({ + prefix: `pending-data/${experimentID}/`, + }); + return files; +} + +const sampleData = `[{ + "trial_type": "html-keyboard-response", + "trial_index": 1, + "time_elapsed": 776 +}]`; + +let db; +let bucket; +let mockServerInstance; +let mockServerPort; + +function createMockOSFServer() { + const app = express(); + app.put("/endpoint", (req, res) => { + res.status(201).json({ data: { attributes: { name: req.query.name || "uploaded.json" } } }); + }); + return new Promise((resolve) => { + const server = app.listen(0, () => { + resolve(server); + }); + }); +} + +beforeAll(async () => { + mockServerInstance = await createMockOSFServer(); + mockServerPort = mockServerInstance.address().port; + + const app = initializeApp(config); + db = getFirestore(); + bucket = getStorage(app).bucket(); + + await db.collection("users").doc("persist-test-user").set({ + osfTokenValid: true, + osfToken: "valid", + usingPersonalToken: true, + }); + + await db.collection("experiments").doc("persist-test-exp").set({ + active: true, + metadataActive: false, + owner: "persist-test-user", + osfFilesLink: `http://localhost:${mockServerPort}/endpoint`, + }); + + await db.collection("experiments").doc("persist-test-inactive").set({ + active: false, + owner: "persist-test-user", + }); +}); + +afterAll(async () => { + mockServerInstance.close(); +}); + +describe("early persist data loss prevention", () => { + it("should clean up pending data after successful OSF upload", async () => { + const result = await saveData({ + experimentID: "persist-test-exp", + data: sampleData, + filename: "test-persist-cleanup.json", + }); + + // The request should succeed + expect(result.body.message).toEqual("Success"); + + // After success, there should be no pending files for this experiment + const pendingFiles = await listPendingFiles(bucket, "persist-test-exp"); + const matchingFiles = pendingFiles.filter((f) => + f.name.includes("test-persist-cleanup") + ); + expect(matchingFiles.length).toBe(0); + }); + + it("should not persist data when validation fails before persist step", async () => { + // Missing parameters should fail before the persist step + const result = await saveData({ + experimentID: "persist-test-exp", + }); + + expect(result.body).toEqual(MESSAGES.MISSING_PARAMETER); + + // No pending files should exist since we failed before persist + const pendingFiles = await listPendingFiles(bucket, "persist-test-exp"); + expect(pendingFiles.length).toBe(0); + }); + + it("should not persist data when experiment is inactive", async () => { + const result = await saveData({ + experimentID: "persist-test-inactive", + data: sampleData, + filename: "test-inactive.json", + }); + + expect(result.body).toEqual(MESSAGES.DATA_COLLECTION_NOT_ACTIVE); + + // No pending files since we fail before persist step + const pendingFiles = await listPendingFiles( + bucket, + "persist-test-inactive" + ); + expect(pendingFiles.length).toBe(0); + }); + + it("should handle multiple submissions without leaving pending files", async () => { + // Submit multiple requests + for (let i = 0; i < 3; i++) { + await saveData({ + experimentID: "persist-test-exp", + data: sampleData, + filename: `test-multi-${i}.json`, + }); + } + + // All pending files should be cleaned up + const pendingFiles = await listPendingFiles(bucket, "persist-test-exp"); + expect(pendingFiles.length).toBe(0); + }); +}); diff --git a/functions/src/__tests__/skip-metadata-emulator.test.js b/functions/src/__tests__/skip-metadata-emulator.test.js new file mode 100644 index 0000000..2bb7dd0 --- /dev/null +++ b/functions/src/__tests__/skip-metadata-emulator.test.js @@ -0,0 +1,112 @@ +/** + * @jest-environment node + */ + +import { initializeApp } from "firebase-admin/app"; +import { getFirestore } from "firebase-admin/firestore"; +import MESSAGES from "../api-messages"; + +process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + +jest.setTimeout(30000); + +const config = { + projectId: "datapipe-test", +}; + +async function saveData(body) { + const response = await fetch( + "http://localhost:5001/datapipe-test/us-central1/apidata", + { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "*/*", + }, + body: JSON.stringify(body), + } + ); + const message = await response.json(); + return message; +} + +const sampleData = `[{ + "trial_type": "html-keyboard-response", + "trial_index": 1, + "time_elapsed": 776 +}]`; + +beforeAll(async () => { + initializeApp(config); + const db = getFirestore(); + + await db.collection("users").doc("skip-metadata-user").set({ + osfTokenValid: true, + osfToken: "valid", + usingPersonalToken: true, + }); + + // Experiment with metadata disabled + await db.collection("experiments").doc("skip-metadata-exp").set({ + active: true, + metadataActive: false, + owner: "skip-metadata-user", + osfFilesLink: "http://localhost:3000/endpoint", + }); + + // Experiment with metadata enabled (for comparison) + await db.collection("experiments").doc("skip-metadata-exp-active").set({ + active: true, + metadataActive: true, + owner: "skip-metadata-user", + osfFilesLink: "http://localhost:3000/endpoint", + }); +}); + +describe("skip metadata when inactive", () => { + it("should not produce metadata when metadataActive is false", async () => { + const response = await saveData({ + experimentID: "skip-metadata-exp", + data: sampleData, + filename: "test-skip-metadata.json", + }); + + // When metadata is skipped, metadataMessage should be empty + // and the response should not contain metadata error info + expect(response.metadataMessage).toBeFalsy(); + }); + + it("should not create a metadata document in Firestore when metadataActive is false", async () => { + const db = getFirestore(); + + // Clear any existing metadata doc + await db.collection("metadata").doc("skip-metadata-exp").delete(); + + await saveData({ + experimentID: "skip-metadata-exp", + data: sampleData, + filename: "test-skip-metadata-2.json", + }); + + const metadataDoc = await db + .collection("metadata") + .doc("skip-metadata-exp") + .get(); + + // No metadata document should have been created + expect(metadataDoc.exists).toBe(false); + }); + + it("should still attempt metadata processing when metadataActive is true", async () => { + const response = await saveData({ + experimentID: "skip-metadata-exp-active", + data: sampleData, + filename: "test-with-metadata.json", + }); + + // When metadata is active, the function will attempt to process metadata. + // The response should have the metadataMessage key present, confirming + // the metadata code path was entered (unlike when metadataActive is false). + expect(response).toHaveProperty("metadataMessage"); + }); +}); diff --git a/functions/src/api-base64.ts b/functions/src/api-base64.ts index e4279b2..46173d7 100644 --- a/functions/src/api-base64.ts +++ b/functions/src/api-base64.ts @@ -7,9 +7,10 @@ import isBase64 from "is-base64"; import MESSAGES from "./api-messages.js"; import resolveToken from "./resolve-token.js"; import queueUpload from "./queue-upload.js"; +import { persistPending, cleanupPending } from "./persist-pending.js"; import { ExperimentData, UserData, OSFResult } from './interfaces'; -export const apiBase64 = onRequest({ cors: true }, async (req, res) => { +export const apiBase64 = onRequest({ cors: true, memory: "512MiB", concurrency: 1 }, async (req, res) => { const { experimentID, data, filename } = req.body; if (!experimentID || !data || !filename) { @@ -64,6 +65,19 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => { return; } + // Persist data to Cloud Storage immediately after validation. + // This ensures the data survives even if the function OOM-crashes + // during heavy processing (OSF upload). + let pendingPath: string; + try { + pendingPath = await persistPending(experimentID, filename, data); + } catch (e) { + const detail = e instanceof Error ? e.message : "Unknown error"; + res.status(500).json(MESSAGES.DATA_PERSIST_ERROR); + await writeLog(experimentID, "logError", {...MESSAGES.DATA_PERSIST_ERROR, detail}); + return; + } + const user_doc = await db.doc(`users/${exp_data.owner}`).get(); if (!user_doc.exists) { res.status(400).json(MESSAGES.INVALID_OWNER); @@ -116,6 +130,7 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => { errorCode: 0, sessionIncremented: false, failureReason: `Upload exception: ${detail}`, }); + await cleanupPending(pendingPath); // queue-upload has its own copy res.status(202).json(MESSAGES.OSF_UPLOAD_QUEUED); await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_EXCEPTION, detail}); return; @@ -140,6 +155,7 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => { errorCode: result.errorCode || 0, sessionIncremented: false, failureReason: `OSF error ${result.errorCode}: ${result.errorText}`, }); + await cleanupPending(pendingPath); // queue-upload has its own copy res.status(202).json(MESSAGES.OSF_UPLOAD_QUEUED); await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_ERROR, osfStatus: result.errorCode, osfStatusText: result.errorText}); return; @@ -150,5 +166,8 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => { } } + // Data successfully uploaded to OSF — clean up the pending copy. + await cleanupPending(pendingPath); + res.status(201).json(MESSAGES.SUCCESS); }); diff --git a/functions/src/api-data.ts b/functions/src/api-data.ts index 9c0b46b..3c14c3a 100644 --- a/functions/src/api-data.ts +++ b/functions/src/api-data.ts @@ -9,9 +9,10 @@ import MESSAGES from "./api-messages.js"; import blockMetadata from "./metadata-block.js"; import resolveToken from "./resolve-token.js"; import queueUpload from "./queue-upload.js"; +import { persistPending, cleanupPending } from "./persist-pending.js"; import { ExperimentData, UserData, MetadataResponse, OSFResult, RequestBody } from './interfaces'; -export const apiData = onRequest({ cors: true }, async (req, res) => { +export const apiData = onRequest({ cors: true, memory: "512MiB", concurrency: 1 }, async (req, res) => { const { experimentID, data, filename, metadataOptions }: RequestBody = req.body; if (!experimentID || !data || !filename) { @@ -74,6 +75,19 @@ export const apiData = onRequest({ cors: true }, async (req, res) => { } } + // Persist data to Cloud Storage immediately after validation. + // This ensures the data survives even if the function OOM-crashes + // during heavy processing (metadata, OSF upload). + let pendingPath: string; + try { + pendingPath = await persistPending(experimentID, filename, data, metadataOptions); + } catch (e) { + const detail = e instanceof Error ? e.message : "Unknown error"; + res.status(500).json(MESSAGES.DATA_PERSIST_ERROR); + await writeLog(experimentID, "logError", {...MESSAGES.DATA_PERSIST_ERROR, detail}); + return; + } + const user_doc: DocumentSnapshot = await db.doc(`users/${exp_data.owner}`).get(); if (!user_doc.exists) { @@ -111,18 +125,24 @@ export const apiData = onRequest({ cors: true }, async (req, res) => { //METADATA BLOCK START - //Creates or references a document containing the metadata for the experiment in the metdata collection on Firestore. - const metadata_doc_ref: DocumentReference = db.collection("metadata").doc(experimentID); + let metadataMessage: string = ''; - const metadataResponse: MetadataResponse = await blockMetadata(exp_data, user_data, metadata_doc_ref, data, metadataOptions); + if (exp_data.metadataActive) { + //Creates or references a document containing the metadata for the experiment in the metdata collection on Firestore. + const metadata_doc_ref: DocumentReference = db.collection("metadata").doc(experimentID); - if (metadataResponse.success === false) { - res.status(400).json(metadataResponse); - await writeLog(experimentID, "logError", {...MESSAGES.METADATA_ERROR, detail: metadataResponse.message}); - return; + const metadataResponse: MetadataResponse = await blockMetadata(exp_data, user_data, metadata_doc_ref, data, metadataOptions); + + if (metadataResponse.success === false) { + await cleanupPending(pendingPath); + res.status(400).json(metadataResponse); + await writeLog(experimentID, "logError", {...MESSAGES.METADATA_ERROR, detail: metadataResponse.message}); + return; + } + + metadataMessage = metadataResponse.metadataMessage; } - const metadataMessage: string = metadataResponse.metadataMessage; //METADATA BLOCK END let result: OSFResult; @@ -144,6 +164,7 @@ export const apiData = onRequest({ cors: true }, async (req, res) => { failureReason: `Upload exception: ${detail}`, }); await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true }); + await cleanupPending(pendingPath); // queue-upload has its own copy res.status(202).json({...MESSAGES.OSF_UPLOAD_QUEUED, metadataMessage}); await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_EXCEPTION, detail}); return; @@ -169,6 +190,7 @@ export const apiData = onRequest({ cors: true }, async (req, res) => { failureReason: `OSF error ${result.errorCode}: ${result.errorText}`, }); await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true }); + await cleanupPending(pendingPath); // queue-upload has its own copy res.status(202).json({...MESSAGES.OSF_UPLOAD_QUEUED, metadataMessage}); await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_ERROR, osfStatus: result.errorCode, osfStatusText: result.errorText}); return; @@ -181,5 +203,8 @@ export const apiData = onRequest({ cors: true }, async (req, res) => { await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true }); + // Data successfully uploaded to OSF — clean up the pending copy. + await cleanupPending(pendingPath); + res.status(201).json({...MESSAGES.SUCCESS, metadataMessage}); }); diff --git a/functions/src/api-messages.ts b/functions/src/api-messages.ts index da3de58..cb9bec1 100644 --- a/functions/src/api-messages.ts +++ b/functions/src/api-messages.ts @@ -109,6 +109,10 @@ const MESSAGES = { METADATA_IN_OSF_AND_FIRESTORE: { metadataMessage : "Metadata is in OSF and in Firestore", }, + DATA_PERSIST_ERROR: { + error: "DATA_PERSIST_ERROR", + message: "Failed to save data. The data was not stored. If this is from a live experiment, participants may need to resubmit.", + }, OSF_UPLOAD_QUEUED: { error: null, message: "Data received. OSF upload will be retried automatically.", diff --git a/functions/src/index.ts b/functions/src/index.ts index 9219688..ecbb6c1 100644 --- a/functions/src/index.ts +++ b/functions/src/index.ts @@ -8,6 +8,7 @@ import { oauth2Regenerate } from "./oauth2-regenerate.js"; import { checkEmailConflict } from "./check-email-conflict.js"; import { scheduledTokenRefresh } from "./scheduled-token-refresh.js"; import { scheduledUploadRetry } from "./scheduled-upload-retry.js"; +import { scheduledPendingRecovery } from "./scheduled-pending-recovery.js"; import { apiQueueStatus } from "./api-queue-status.js"; import { generateOAuthState } from "./generate-oauth-state.js"; import { saveOsfToken } from "./save-osf-token.js"; @@ -27,6 +28,7 @@ export { checkEmailConflict as checkemailconflict, scheduledTokenRefresh as scheduledtokenrefresh, scheduledUploadRetry as scheduleduploadretry, + scheduledPendingRecovery as scheduledpendingrecovery, apiQueueStatus as apiqueuestatus, generateOAuthState as generateoauthstate, saveOsfToken as saveosftoken, diff --git a/functions/src/persist-pending.ts b/functions/src/persist-pending.ts new file mode 100644 index 0000000..69b8caf --- /dev/null +++ b/functions/src/persist-pending.ts @@ -0,0 +1,59 @@ +import { storage } from "./app.js"; + +const PENDING_PREFIX = "pending-data"; + +interface PendingEnvelope { + experimentID: string; + filename: string; + data: string; + metadataOptions?: object; +} + +/** + * Persist incoming request data to Cloud Storage immediately after validation, + * before any heavy processing. This ensures data survives OOM crashes. + * Stores the full request envelope (data + metadataOptions) so the recovery + * function can replay the complete processing pipeline including metadata. + * Returns the storage path for later cleanup. + */ +export async function persistPending( + experimentID: string, + filename: string, + data: string, + metadataOptions?: object +): Promise { + const timestamp = Date.now(); + const safeName = filename.replace(/[/\\]/g, "_"); + const storagePath = `${PENDING_PREFIX}/${experimentID}/${safeName}_${timestamp}`; + + const envelope: PendingEnvelope = { experimentID, filename, data, metadataOptions }; + + const bucket = storage.bucket(); + const file = bucket.file(storagePath); + await file.save(JSON.stringify(envelope), { contentType: "application/json" }); + return storagePath; +} + +/** + * Read a pending envelope from Cloud Storage. + */ +export async function readPendingEnvelope(storagePath: string): Promise { + const bucket = storage.bucket(); + const file = bucket.file(storagePath); + const [contents] = await file.download(); + return JSON.parse(contents.toString("utf-8")) as PendingEnvelope; +} + +/** + * Remove the pending data file after successful processing. + */ +export async function cleanupPending(storagePath: string): Promise { + try { + const bucket = storage.bucket(); + const file = bucket.file(storagePath); + await file.delete(); + } catch { + // Non-critical: if cleanup fails, the file will remain in storage + // but won't cause any issues. A scheduled cleanup can handle stragglers. + } +} diff --git a/functions/src/scheduled-pending-recovery.ts b/functions/src/scheduled-pending-recovery.ts new file mode 100644 index 0000000..15c9800 --- /dev/null +++ b/functions/src/scheduled-pending-recovery.ts @@ -0,0 +1,180 @@ +import { onSchedule } from "firebase-functions/v2/scheduler"; +import { Timestamp } from "firebase-admin/firestore"; +import { db, storage } from "./app.js"; +import { readPendingEnvelope, cleanupPending } from "./persist-pending.js"; +import { ExperimentData } from "./interfaces.js"; + +const PENDING_PREFIX = "pending-data/"; + +// Files older than this are considered orphaned (the original request +// either OOM-crashed or timed out without cleaning up). +const STALE_THRESHOLD_MS = 15 * 60 * 1000; // 15 minutes + +// Process at most this many files per run to stay within time/memory limits. +const MAX_FILES_PER_RUN = 10; + +const MAX_RETRIES = 5; + +/** + * Scheduled function that runs every 15 minutes to recover data that was + * persisted to Cloud Storage but never uploaded to OSF (e.g., because the + * original api-data function OOM-crashed). + * + * Instead of attempting the OSF upload directly, this function promotes + * orphaned pending files into the existing uploadQueue system. This means: + * - The data immediately appears in the researcher's dashboard QueuePanel + * - The existing scheduled-upload-retry handles retries with exponential backoff + * - The researcher can download the data manually if all retries fail + * - No duplicate retry infrastructure is needed + */ +export const scheduledPendingRecovery = onSchedule( + { schedule: "*/15 * * * *", memory: "256MiB" }, + async () => { + await recoverPendingUploads(); + } +); + +async function recoverPendingUploads() { + const bucket = storage.bucket(); + const cutoffTime = new Date(Date.now() - STALE_THRESHOLD_MS); + + // List files under pending-data/ prefix + const [files] = await bucket.getFiles({ + prefix: PENDING_PREFIX, + maxResults: MAX_FILES_PER_RUN * 2, // fetch extra in case some are too recent + }); + + if (files.length === 0) { + return; + } + + let processed = 0; + + for (const file of files) { + if (processed >= MAX_FILES_PER_RUN) break; + + // Check file age via metadata + const [metadata] = await file.getMetadata(); + const createdAt = new Date(metadata.timeCreated as string); + + if (createdAt > cutoffTime) { + // File is recent — the original request may still be processing + continue; + } + + console.log(`Recovering pending data: ${file.name}`); + + try { + await promoteToQueue(file); + processed++; + } catch (e) { + const detail = e instanceof Error ? e.message : "Unknown error"; + console.error(`Failed to recover ${file.name}: ${detail}`); + } + } + + if (processed > 0) { + console.log(`Promoted ${processed} pending file(s) to upload queue.`); + } +} + +/** + * Promote an orphaned pending file into the uploadQueue system. + * + * 1. Read the pending envelope to get experiment/filename/data + * 2. Look up the experiment to get the owner and osfFilesLink + * 3. Copy the data to upload-queue/ storage (where queue-status API expects it) + * 4. Create an uploadQueue Firestore document + * 5. Clean up the pending-data/ file + */ +async function promoteToQueue( + file: ReturnType["file"]> +) { + // Read the envelope + let envelope; + try { + envelope = await readPendingEnvelope(file.name); + } catch (e) { + const detail = e instanceof Error ? e.message : "Unknown error"; + console.error(`Failed to read pending envelope ${file.name}: ${detail}. Deleting corrupt file.`); + await file.delete(); + return; + } + + const { experimentID, filename, data } = envelope; + + // Look up the experiment to get owner and osfFilesLink + const expDoc = await db.collection("experiments").doc(experimentID).get(); + if (!expDoc.exists) { + console.warn(`Experiment ${experimentID} not found. Deleting orphaned file ${file.name}.`); + await cleanupPending(file.name); + return; + } + + const expData = expDoc.data() as ExperimentData; + + if (!expData.owner) { + console.warn(`Experiment ${experimentID} has no owner. Deleting orphaned file ${file.name}.`); + await cleanupPending(file.name); + return; + } + + // Check for deduplication and atomically create the queue entry via transaction. + // This prevents duplicate entries if two recovery runs overlap. + const deduplicationKey = `${experimentID}:${filename}`; + const docId = deduplicationKey.replace(/[/\\]/g, "_"); + const docRef = db.collection("uploadQueue").doc(docId); + + const storagePath = `upload-queue/${docId}`; + + const created = await db.runTransaction(async (transaction) => { + const existingDoc = await transaction.get(docRef); + if (existingDoc.exists) { + const status = existingDoc.data()?.status; + if (status === "pending" || status === "processing") { + return false; // Already queued + } + } + + const now = Timestamp.now(); + const nextRetryAt = Timestamp.fromMillis(now.toMillis() + 60 * 1000); // 1 minute — retry soon + + transaction.set(docRef, { + experimentID, + owner: expData.owner, + filename, + storagePath, + dataType: "data", + osfFilesLink: expData.osfFilesLink, + status: "pending", + errorCode: 0, + retryCount: 0, + maxRetries: MAX_RETRIES, + createdAt: now, + lastAttemptAt: null, + nextRetryAt, + completedAt: null, + failureReason: "Recovered from interrupted upload (server restart or memory limit)", + deduplicationKey, + sessionIncremented: false, + }); + + return true; + }); + + if (!created) { + console.log(`Queue entry already exists for ${deduplicationKey}. Cleaning up pending file.`); + await cleanupPending(file.name); + return; + } + + // Write data to upload-queue/ storage (where the queue-status API expects it) + const bucket = storage.bucket(); + const queueFile = bucket.file(storagePath); + await queueFile.save(data, { contentType: "text/plain" }); + + // Clean up the pending-data/ file + await cleanupPending(file.name); + + console.log(`Promoted ${filename} (experiment ${experimentID}) to upload queue.`); +} diff --git a/pages/admin/[experiment_id].js b/pages/admin/[experiment_id].js index c824b56..fb83efc 100644 --- a/pages/admin/[experiment_id].js +++ b/pages/admin/[experiment_id].js @@ -1,3 +1,4 @@ +import { useState, useEffect, useRef } from "react"; import AuthCheck from "../../components/AuthCheck"; import { useRouter } from "next/router"; import { useDocumentData, useCollectionData } from "react-firebase-hooks/firestore"; @@ -13,7 +14,7 @@ import ExperimentValidation from "../../components/dashboard/ExperimentValidatio import MetadataControl from "../../components/dashboard/MetadataControl"; import CodeHints from "../../components/dashboard/CodeHints"; import ErrorPanel from "../../components/dashboard/ErrorPanel"; -import QueuePanel from "../../components/dashboard/QueuePanel"; +import QueuePanel, { UploadsResolvedNotice } from "../../components/dashboard/QueuePanel"; export async function getServerSideProps() { return { props: {} }; @@ -48,10 +49,23 @@ function ExperimentPageDashboard({ experiment_id }) { const [, , , queueSnapshot] = useCollectionData(queueRef); const queueEntries = queueSnapshot?.docs.map(d => ({ id: d.id, ...d.data() })) || []; - const pendingUploads = queueEntries.filter(e => e.status === "pending" || e.status === "processing").length; const uploadError = logs?.logError; const errorLog = logs?.errors; + // Track resolved state: show success notice when queue goes from non-empty to empty + const [showResolved, setShowResolved] = useState(false); + const prevQueueCount = useRef(0); + + useEffect(() => { + const currentCount = queueEntries.length; + if (prevQueueCount.current > 0 && currentCount === 0) { + setShowResolved(true); + const timer = setTimeout(() => setShowResolved(false), 8000); + return () => clearTimeout(timer); + } + prevQueueCount.current = currentCount; + }, [queueEntries.length]); + return ( <> {loading && } @@ -66,24 +80,27 @@ function ExperimentPageDashboard({ experiment_id }) { {data.sessions || 0} session{data.sessions !== 1 ? "s" : ""} - {uploadError && queueEntries.length === 0 && ( + {uploadError && queueEntries.length === 0 && !showResolved && ( Data upload errors )} - {pendingUploads > 0 && ( - - {pendingUploads} upload{pendingUploads !== 1 ? "s" : ""} waiting - - )} - {pendingUploads === 0 && queueEntries.length > 0 && ( - - {queueEntries.length} failed upload{queueEntries.length !== 1 ? "s" : ""} + {queueEntries.length > 0 && ( + e.status === "failed") ? "red" : "orange"} + variant="solid" + px={2} + py={1} + > + {queueEntries.length} upload{queueEntries.length !== 1 ? "s" : ""} queued )} - {uploadError && queueEntries.length === 0 && } - {queueEntries.length > 0 && } + {showResolved && } + {uploadError && queueEntries.length === 0 && !showResolved && } + {queueEntries.length > 0 && ( + + )}