diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml
index 5c2a99f..23284f2 100644
--- a/.github/workflows/node.js.yml
+++ b/.github/workflows/node.js.yml
@@ -5,7 +5,7 @@ name: Test
on:
pull_request:
- branches: ["main"]
+ branches: ["main", "test"]
push:
branches: ["test"]
diff --git a/components/dashboard/QueuePanel.js b/components/dashboard/QueuePanel.js
index f8e875d..0dfda45 100644
--- a/components/dashboard/QueuePanel.js
+++ b/components/dashboard/QueuePanel.js
@@ -13,9 +13,29 @@ import {
import { Download } from "lucide-react";
import { auth } from "../../lib/firebase";
+function friendlyReason(reason) {
+ if (!reason) return null;
+ if (reason.includes("interrupted upload") || reason.includes("memory limit")) {
+ return "Upload was interrupted by a server restart or memory limit.";
+ }
+ if (reason.includes("Upload exception") || reason.includes("fetch failed")) {
+ return "Could not connect to OSF.";
+ }
+ if (reason.includes("OSF error 503") || reason.includes("OSF error 502")) {
+ return "OSF was temporarily unavailable.";
+ }
+ if (reason.includes("OSF error 429")) {
+ return "OSF rate-limited the request.";
+ }
+ if (reason.includes("OSF error 401") || reason.includes("OSF error 403")) {
+ return "Authentication error. Your OSF token may need to be refreshed.";
+ }
+ return reason;
+}
+
function statusBadge(status) {
const labels = {
- pending: { color: "orange", text: "Waiting to retry" },
+ pending: { color: "orange", text: "Retrying" },
processing: { color: "blue", text: "Retrying now" },
failed: { color: "red", text: "Failed" },
};
@@ -27,6 +47,20 @@ function statusBadge(status) {
);
}
+function nextRetryText(nextRetryAt) {
+ if (!nextRetryAt) return null;
+ const t = nextRetryAt.toDate ? nextRetryAt.toDate() : new Date(nextRetryAt);
+ const msUntil = t.getTime() - Date.now();
+ if (msUntil <= 0) return "soon";
+ const minUntil = Math.ceil(msUntil / (60 * 1000));
+ if (minUntil >= 60) {
+ const hours = Math.floor(minUntil / 60);
+ const mins = minUntil % 60;
+ return `in ${hours}h${mins > 0 ? ` ${mins}m` : ""}`;
+ }
+ return `in ${minUntil}m`;
+}
+
function timeRemaining(createdAt) {
if (!createdAt) return null;
const created = createdAt.toDate ? createdAt.toDate() : new Date(createdAt);
@@ -36,9 +70,9 @@ function timeRemaining(createdAt) {
const hoursLeft = Math.floor(msLeft / (60 * 60 * 1000));
if (hoursLeft >= 24) {
const days = Math.floor(hoursLeft / 24);
- return `${days}d ${hoursLeft % 24}h remaining`;
+ return `${days}d ${hoursLeft % 24}h`;
}
- return `${hoursLeft}h remaining`;
+ return `${hoursLeft}h`;
}
async function fetchFile(experimentId, entryId) {
@@ -51,7 +85,12 @@ async function fetchFile(experimentId, entryId) {
);
}
-export default function QueuePanel({ entries, experimentId, errorLog }) {
+/**
+ * QueuePanel — shows all queued uploads (pending + failed) with immediate
+ * download access. Pending items are being retried automatically but the
+ * researcher can download them right away without waiting.
+ */
+export default function QueuePanel({ entries, experimentId }) {
const [downloading, setDownloading] = useState(null);
const [downloadingAll, setDownloadingAll] = useState(false);
@@ -113,13 +152,18 @@ export default function QueuePanel({ entries, experimentId, errorLog }) {
const plural = (n, word) => `${n} ${word}${n !== 1 ? "s" : ""}`;
- let alertTitle = "";
- if (pendingCount > 0 && failedCount > 0) {
- alertTitle = `${plural(pendingCount, "file")} waiting to upload, ${plural(failedCount, "file")} failed.`;
- } else if (pendingCount > 0) {
- alertTitle = `${plural(pendingCount, "file")} waiting to upload to OSF.`;
- } else {
+ let alertTitle;
+ let alertDescription;
+
+ if (allFailed) {
alertTitle = `${plural(failedCount, "file")} could not be uploaded to OSF.`;
+ alertDescription = "All retries were exhausted. Download these files and upload them to your OSF project manually to prevent data loss.";
+ } else if (failedCount > 0) {
+ alertTitle = `${plural(entries.length, "file")} did not upload to OSF.`;
+ alertDescription = `${plural(pendingCount, "file")} still being retried. ${plural(failedCount, "file")} failed permanently. You can download all files below.`;
+ } else {
+ alertTitle = `${plural(pendingCount, "file")} did not upload to OSF.`;
+ alertDescription = "DataPipe is retrying automatically. You can also download the files now.";
}
return (
@@ -127,30 +171,38 @@ export default function QueuePanel({ entries, experimentId, errorLog }) {
{alertTitle}
-
- {allFailed
- ? "These files could not be delivered after multiple attempts. Download them to avoid data loss."
- : "DataPipe will keep retrying automatically. Files are stored for up to 1 week. You can also download them below."}
-
+ {alertDescription}
- Why am I seeing this?
+ Why did these uploads fail?
When a participant submits data, DataPipe tries to upload it to
- your OSF project immediately. If that transfer fails — for
- example, because OSF is temporarily unavailable, rate-limiting
- requests, or there is a configuration issue with your project —
- DataPipe saves a copy of the data and retries automatically over
- the next several days. The files listed here are those saved
- copies. Once a retry succeeds the file will disappear from this
- list. If all retries are exhausted, you can still download the
- data and upload it to OSF manually.
+ your OSF project immediately. If that fails, DataPipe saves a
+ copy and retries automatically. Common reasons include:
+
+
+
+ Server memory limit — Large data submissions
+ can occasionally exceed the server's memory capacity.
+
+
+ OSF unavailable — OSF may be temporarily
+ down or rate-limiting requests.
+
+
+ Configuration issue — There may be a problem
+ with your OSF project settings or authentication token.
+
+
+
+ Files are stored for up to 7 days. If retries don't succeed,
+ download the files and upload them to OSF manually.
@@ -167,78 +219,68 @@ export default function QueuePanel({ entries, experimentId, errorLog }) {
Download all as ZIP
-
-
-
-
- View file details
-
-
-
-
-
-
-
- FILENAME
- STATUS
- EXPIRES
- ATTEMPTS
- DOWNLOAD
-
-
-
- {entries.map((entry) => (
-
-
- {entry.filename}
- {entry.failureReason && (
-
- {entry.failureReason}
-
- )}
-
- {statusBadge(entry.status)}
- {timeRemaining(entry.createdAt)}
-
- {entry.retryCount}/{entry.maxRetries}
-
-
- handleDownload(entry)}
- >
-
-
-
-
- ))}
- {errorLog && errorLog.map((error, index) => (
-
-
- {error.error}
-
- {error.time}
-
-
-
-
- Error
-
-
- -
- -
- -
-
- ))}
-
-
-
-
-
+
+
+
+ FILENAME
+ STATUS
+ REASON
+ STORED FOR
+
+
+
+
+ {entries.map((entry) => (
+
+ {entry.filename}
+
+ {statusBadge(entry.status)}
+ {(entry.status === "pending" || entry.status === "processing") &&
+ entry.nextRetryAt && (
+
+ Next retry {nextRetryText(entry.nextRetryAt)}
+
+ )}
+
+
+
+ {friendlyReason(entry.failureReason) || "\u2014"}
+
+
+
+
+ {timeRemaining(entry.createdAt) || "\u2014"}
+
+
+
+ handleDownload(entry)}
+ >
+
+
+
+
+ ))}
+
+
);
}
+
+/**
+ * UploadsResolvedNotice — brief success confirmation shown when
+ * previously pending/failed uploads have all been resolved.
+ */
+export function UploadsResolvedNotice() {
+ return (
+
+
+ All queued uploads completed successfully.
+
+ );
+}
diff --git a/firebase.json b/firebase.json
index 06c22ff..d5122af 100644
--- a/firebase.json
+++ b/firebase.json
@@ -73,6 +73,10 @@
"port": 5000,
"host": "localhost"
},
+ "storage": {
+ "port": 9199,
+ "host": "localhost"
+ },
"ui": {
"enabled": true
}
diff --git a/functions/src/__tests__/data-emulator.test.js b/functions/src/__tests__/data-emulator.test.js
index 729e14d..9a88130 100644
--- a/functions/src/__tests__/data-emulator.test.js
+++ b/functions/src/__tests__/data-emulator.test.js
@@ -30,17 +30,6 @@ const config = {
jest.setTimeout(30000);
-async function waitForLog(db, docId, field, expectedValue, timeoutMs = 10000) {
- const start = Date.now();
- while (Date.now() - start < timeoutMs) {
- const doc = await db.collection("logs").doc(docId).get();
- if (doc.exists && doc.data()?.[field] === expectedValue) {
- return doc;
- }
- await new Promise((resolve) => setTimeout(resolve, 250));
- }
- return db.collection("logs").doc(docId).get();
-}
beforeAll(async () => {
initializeApp(config);
@@ -78,12 +67,17 @@ describe("apiData", () => {
it("should increment the write request log for the experiment when there is a complete request", async () => {
const db = getFirestore();
await db.collection("logs").doc("testlog").delete();
+ // writeLog is awaited inside apiData before the response is sent,
+ // so the log document should exist by the time we get the response.
await saveData({
experimentID: "testlog",
data: "test",
filename: "test",
});
- let doc = await waitForLog(db, "testlog", "saveData", 1);
+ // Small delay to allow Firestore emulator to sync
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ let doc = await db.collection("logs").doc("testlog").get();
+ expect(doc.exists).toBe(true);
expect(doc.data().saveData).toBe(1);
await saveData({
@@ -91,7 +85,8 @@ describe("apiData", () => {
data: "test",
filename: "test",
});
- doc = await waitForLog(db, "testlog", "saveData", 2);
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ doc = await db.collection("logs").doc("testlog").get();
expect(doc.data().saveData).toBe(2);
});
@@ -106,7 +101,9 @@ describe("apiData", () => {
filename: "test",
});
- let doc = await waitForLog(db, "data-testexp", "logError", 1);
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ let doc = await db.collection("logs").doc("data-testexp").get();
+ expect(doc.exists).toBe(true);
expect(doc.data().logError).toBe(1);
await db.collection("experiments").doc("data-testexp").set(
@@ -124,7 +121,8 @@ describe("apiData", () => {
filename: "test",
});
- doc = await waitForLog(db, "data-testexp", "logError", 2);
+ await new Promise((resolve) => setTimeout(resolve, 500));
+ doc = await db.collection("logs").doc("data-testexp").get();
expect(doc.data().logError).toBe(2);
});
diff --git a/functions/src/__tests__/early-persist-emulator.test.js b/functions/src/__tests__/early-persist-emulator.test.js
new file mode 100644
index 0000000..9c2ad09
--- /dev/null
+++ b/functions/src/__tests__/early-persist-emulator.test.js
@@ -0,0 +1,161 @@
+/**
+ * @jest-environment node
+ */
+
+import { initializeApp } from "firebase-admin/app";
+import { getFirestore } from "firebase-admin/firestore";
+import { getStorage } from "firebase-admin/storage";
+import express from "express";
+import MESSAGES from "../api-messages";
+
+process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080";
+process.env.FIREBASE_STORAGE_EMULATOR_HOST = "localhost:9199";
+
+jest.setTimeout(30000);
+
+const config = {
+ projectId: "datapipe-test",
+ storageBucket: "datapipe-test.appspot.com",
+};
+
+async function saveData(body) {
+ const response = await fetch(
+ "http://localhost:5001/datapipe-test/us-central1/apidata",
+ {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "*/*",
+ },
+ body: JSON.stringify(body),
+ }
+ );
+ const message = await response.json();
+ return { status: response.status, body: message };
+}
+
+async function listPendingFiles(bucket, experimentID) {
+ const [files] = await bucket.getFiles({
+ prefix: `pending-data/${experimentID}/`,
+ });
+ return files;
+}
+
+const sampleData = `[{
+ "trial_type": "html-keyboard-response",
+ "trial_index": 1,
+ "time_elapsed": 776
+}]`;
+
+let db;
+let bucket;
+let mockServerInstance;
+let mockServerPort;
+
+function createMockOSFServer() {
+ const app = express();
+ app.put("/endpoint", (req, res) => {
+ res.status(201).json({ data: { attributes: { name: req.query.name || "uploaded.json" } } });
+ });
+ return new Promise((resolve) => {
+ const server = app.listen(0, () => {
+ resolve(server);
+ });
+ });
+}
+
+beforeAll(async () => {
+ mockServerInstance = await createMockOSFServer();
+ mockServerPort = mockServerInstance.address().port;
+
+ const app = initializeApp(config);
+ db = getFirestore();
+ bucket = getStorage(app).bucket();
+
+ await db.collection("users").doc("persist-test-user").set({
+ osfTokenValid: true,
+ osfToken: "valid",
+ usingPersonalToken: true,
+ });
+
+ await db.collection("experiments").doc("persist-test-exp").set({
+ active: true,
+ metadataActive: false,
+ owner: "persist-test-user",
+ osfFilesLink: `http://localhost:${mockServerPort}/endpoint`,
+ });
+
+ await db.collection("experiments").doc("persist-test-inactive").set({
+ active: false,
+ owner: "persist-test-user",
+ });
+});
+
+afterAll(async () => {
+ mockServerInstance.close();
+});
+
+describe("early persist data loss prevention", () => {
+ it("should clean up pending data after successful OSF upload", async () => {
+ const result = await saveData({
+ experimentID: "persist-test-exp",
+ data: sampleData,
+ filename: "test-persist-cleanup.json",
+ });
+
+ // The request should succeed
+ expect(result.body.message).toEqual("Success");
+
+ // After success, there should be no pending files for this experiment
+ const pendingFiles = await listPendingFiles(bucket, "persist-test-exp");
+ const matchingFiles = pendingFiles.filter((f) =>
+ f.name.includes("test-persist-cleanup")
+ );
+ expect(matchingFiles.length).toBe(0);
+ });
+
+ it("should not persist data when validation fails before persist step", async () => {
+ // Missing parameters should fail before the persist step
+ const result = await saveData({
+ experimentID: "persist-test-exp",
+ });
+
+ expect(result.body).toEqual(MESSAGES.MISSING_PARAMETER);
+
+ // No pending files should exist since we failed before persist
+ const pendingFiles = await listPendingFiles(bucket, "persist-test-exp");
+ expect(pendingFiles.length).toBe(0);
+ });
+
+ it("should not persist data when experiment is inactive", async () => {
+ const result = await saveData({
+ experimentID: "persist-test-inactive",
+ data: sampleData,
+ filename: "test-inactive.json",
+ });
+
+ expect(result.body).toEqual(MESSAGES.DATA_COLLECTION_NOT_ACTIVE);
+
+ // No pending files since we fail before persist step
+ const pendingFiles = await listPendingFiles(
+ bucket,
+ "persist-test-inactive"
+ );
+ expect(pendingFiles.length).toBe(0);
+ });
+
+ it("should handle multiple submissions without leaving pending files", async () => {
+ // Submit multiple requests
+ for (let i = 0; i < 3; i++) {
+ await saveData({
+ experimentID: "persist-test-exp",
+ data: sampleData,
+ filename: `test-multi-${i}.json`,
+ });
+ }
+
+ // All pending files should be cleaned up
+ const pendingFiles = await listPendingFiles(bucket, "persist-test-exp");
+ expect(pendingFiles.length).toBe(0);
+ });
+});
diff --git a/functions/src/__tests__/skip-metadata-emulator.test.js b/functions/src/__tests__/skip-metadata-emulator.test.js
new file mode 100644
index 0000000..2bb7dd0
--- /dev/null
+++ b/functions/src/__tests__/skip-metadata-emulator.test.js
@@ -0,0 +1,112 @@
+/**
+ * @jest-environment node
+ */
+
+import { initializeApp } from "firebase-admin/app";
+import { getFirestore } from "firebase-admin/firestore";
+import MESSAGES from "../api-messages";
+
+process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080";
+
+jest.setTimeout(30000);
+
+const config = {
+ projectId: "datapipe-test",
+};
+
+async function saveData(body) {
+ const response = await fetch(
+ "http://localhost:5001/datapipe-test/us-central1/apidata",
+ {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "*/*",
+ },
+ body: JSON.stringify(body),
+ }
+ );
+ const message = await response.json();
+ return message;
+}
+
+const sampleData = `[{
+ "trial_type": "html-keyboard-response",
+ "trial_index": 1,
+ "time_elapsed": 776
+}]`;
+
+beforeAll(async () => {
+ initializeApp(config);
+ const db = getFirestore();
+
+ await db.collection("users").doc("skip-metadata-user").set({
+ osfTokenValid: true,
+ osfToken: "valid",
+ usingPersonalToken: true,
+ });
+
+ // Experiment with metadata disabled
+ await db.collection("experiments").doc("skip-metadata-exp").set({
+ active: true,
+ metadataActive: false,
+ owner: "skip-metadata-user",
+ osfFilesLink: "http://localhost:3000/endpoint",
+ });
+
+ // Experiment with metadata enabled (for comparison)
+ await db.collection("experiments").doc("skip-metadata-exp-active").set({
+ active: true,
+ metadataActive: true,
+ owner: "skip-metadata-user",
+ osfFilesLink: "http://localhost:3000/endpoint",
+ });
+});
+
+describe("skip metadata when inactive", () => {
+ it("should not produce metadata when metadataActive is false", async () => {
+ const response = await saveData({
+ experimentID: "skip-metadata-exp",
+ data: sampleData,
+ filename: "test-skip-metadata.json",
+ });
+
+ // When metadata is skipped, metadataMessage should be empty
+ // and the response should not contain metadata error info
+ expect(response.metadataMessage).toBeFalsy();
+ });
+
+ it("should not create a metadata document in Firestore when metadataActive is false", async () => {
+ const db = getFirestore();
+
+ // Clear any existing metadata doc
+ await db.collection("metadata").doc("skip-metadata-exp").delete();
+
+ await saveData({
+ experimentID: "skip-metadata-exp",
+ data: sampleData,
+ filename: "test-skip-metadata-2.json",
+ });
+
+ const metadataDoc = await db
+ .collection("metadata")
+ .doc("skip-metadata-exp")
+ .get();
+
+ // No metadata document should have been created
+ expect(metadataDoc.exists).toBe(false);
+ });
+
+ it("should still attempt metadata processing when metadataActive is true", async () => {
+ const response = await saveData({
+ experimentID: "skip-metadata-exp-active",
+ data: sampleData,
+ filename: "test-with-metadata.json",
+ });
+
+ // When metadata is active, the function will attempt to process metadata.
+ // The response should have the metadataMessage key present, confirming
+ // the metadata code path was entered (unlike when metadataActive is false).
+ expect(response).toHaveProperty("metadataMessage");
+ });
+});
diff --git a/functions/src/api-base64.ts b/functions/src/api-base64.ts
index e4279b2..46173d7 100644
--- a/functions/src/api-base64.ts
+++ b/functions/src/api-base64.ts
@@ -7,9 +7,10 @@ import isBase64 from "is-base64";
import MESSAGES from "./api-messages.js";
import resolveToken from "./resolve-token.js";
import queueUpload from "./queue-upload.js";
+import { persistPending, cleanupPending } from "./persist-pending.js";
import { ExperimentData, UserData, OSFResult } from './interfaces';
-export const apiBase64 = onRequest({ cors: true }, async (req, res) => {
+export const apiBase64 = onRequest({ cors: true, memory: "512MiB", concurrency: 1 }, async (req, res) => {
const { experimentID, data, filename } = req.body;
if (!experimentID || !data || !filename) {
@@ -64,6 +65,19 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => {
return;
}
+ // Persist data to Cloud Storage immediately after validation.
+ // This ensures the data survives even if the function OOM-crashes
+ // during heavy processing (OSF upload).
+ let pendingPath: string;
+ try {
+ pendingPath = await persistPending(experimentID, filename, data);
+ } catch (e) {
+ const detail = e instanceof Error ? e.message : "Unknown error";
+ res.status(500).json(MESSAGES.DATA_PERSIST_ERROR);
+ await writeLog(experimentID, "logError", {...MESSAGES.DATA_PERSIST_ERROR, detail});
+ return;
+ }
+
const user_doc = await db.doc(`users/${exp_data.owner}`).get();
if (!user_doc.exists) {
res.status(400).json(MESSAGES.INVALID_OWNER);
@@ -116,6 +130,7 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => {
errorCode: 0, sessionIncremented: false,
failureReason: `Upload exception: ${detail}`,
});
+ await cleanupPending(pendingPath); // queue-upload has its own copy
res.status(202).json(MESSAGES.OSF_UPLOAD_QUEUED);
await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_EXCEPTION, detail});
return;
@@ -140,6 +155,7 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => {
errorCode: result.errorCode || 0, sessionIncremented: false,
failureReason: `OSF error ${result.errorCode}: ${result.errorText}`,
});
+ await cleanupPending(pendingPath); // queue-upload has its own copy
res.status(202).json(MESSAGES.OSF_UPLOAD_QUEUED);
await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_ERROR, osfStatus: result.errorCode, osfStatusText: result.errorText});
return;
@@ -150,5 +166,8 @@ export const apiBase64 = onRequest({ cors: true }, async (req, res) => {
}
}
+ // Data successfully uploaded to OSF — clean up the pending copy.
+ await cleanupPending(pendingPath);
+
res.status(201).json(MESSAGES.SUCCESS);
});
diff --git a/functions/src/api-data.ts b/functions/src/api-data.ts
index 9c0b46b..3c14c3a 100644
--- a/functions/src/api-data.ts
+++ b/functions/src/api-data.ts
@@ -9,9 +9,10 @@ import MESSAGES from "./api-messages.js";
import blockMetadata from "./metadata-block.js";
import resolveToken from "./resolve-token.js";
import queueUpload from "./queue-upload.js";
+import { persistPending, cleanupPending } from "./persist-pending.js";
import { ExperimentData, UserData, MetadataResponse, OSFResult, RequestBody } from './interfaces';
-export const apiData = onRequest({ cors: true }, async (req, res) => {
+export const apiData = onRequest({ cors: true, memory: "512MiB", concurrency: 1 }, async (req, res) => {
const { experimentID, data, filename, metadataOptions }: RequestBody = req.body;
if (!experimentID || !data || !filename) {
@@ -74,6 +75,19 @@ export const apiData = onRequest({ cors: true }, async (req, res) => {
}
}
+ // Persist data to Cloud Storage immediately after validation.
+ // This ensures the data survives even if the function OOM-crashes
+ // during heavy processing (metadata, OSF upload).
+ let pendingPath: string;
+ try {
+ pendingPath = await persistPending(experimentID, filename, data, metadataOptions);
+ } catch (e) {
+ const detail = e instanceof Error ? e.message : "Unknown error";
+ res.status(500).json(MESSAGES.DATA_PERSIST_ERROR);
+ await writeLog(experimentID, "logError", {...MESSAGES.DATA_PERSIST_ERROR, detail});
+ return;
+ }
+
const user_doc: DocumentSnapshot = await db.doc(`users/${exp_data.owner}`).get();
if (!user_doc.exists) {
@@ -111,18 +125,24 @@ export const apiData = onRequest({ cors: true }, async (req, res) => {
//METADATA BLOCK START
- //Creates or references a document containing the metadata for the experiment in the metdata collection on Firestore.
- const metadata_doc_ref: DocumentReference = db.collection("metadata").doc(experimentID);
+ let metadataMessage: string = '';
- const metadataResponse: MetadataResponse = await blockMetadata(exp_data, user_data, metadata_doc_ref, data, metadataOptions);
+ if (exp_data.metadataActive) {
+ //Creates or references a document containing the metadata for the experiment in the metdata collection on Firestore.
+ const metadata_doc_ref: DocumentReference = db.collection("metadata").doc(experimentID);
- if (metadataResponse.success === false) {
- res.status(400).json(metadataResponse);
- await writeLog(experimentID, "logError", {...MESSAGES.METADATA_ERROR, detail: metadataResponse.message});
- return;
+ const metadataResponse: MetadataResponse = await blockMetadata(exp_data, user_data, metadata_doc_ref, data, metadataOptions);
+
+ if (metadataResponse.success === false) {
+ await cleanupPending(pendingPath);
+ res.status(400).json(metadataResponse);
+ await writeLog(experimentID, "logError", {...MESSAGES.METADATA_ERROR, detail: metadataResponse.message});
+ return;
+ }
+
+ metadataMessage = metadataResponse.metadataMessage;
}
- const metadataMessage: string = metadataResponse.metadataMessage;
//METADATA BLOCK END
let result: OSFResult;
@@ -144,6 +164,7 @@ export const apiData = onRequest({ cors: true }, async (req, res) => {
failureReason: `Upload exception: ${detail}`,
});
await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true });
+ await cleanupPending(pendingPath); // queue-upload has its own copy
res.status(202).json({...MESSAGES.OSF_UPLOAD_QUEUED, metadataMessage});
await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_EXCEPTION, detail});
return;
@@ -169,6 +190,7 @@ export const apiData = onRequest({ cors: true }, async (req, res) => {
failureReason: `OSF error ${result.errorCode}: ${result.errorText}`,
});
await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true });
+ await cleanupPending(pendingPath); // queue-upload has its own copy
res.status(202).json({...MESSAGES.OSF_UPLOAD_QUEUED, metadataMessage});
await writeLog(experimentID, "logError", {...MESSAGES.OSF_UPLOAD_ERROR, osfStatus: result.errorCode, osfStatusText: result.errorText});
return;
@@ -181,5 +203,8 @@ export const apiData = onRequest({ cors: true }, async (req, res) => {
await exp_doc_ref.set({ sessions: FieldValue.increment(1) }, { merge: true });
+ // Data successfully uploaded to OSF — clean up the pending copy.
+ await cleanupPending(pendingPath);
+
res.status(201).json({...MESSAGES.SUCCESS, metadataMessage});
});
diff --git a/functions/src/api-messages.ts b/functions/src/api-messages.ts
index da3de58..cb9bec1 100644
--- a/functions/src/api-messages.ts
+++ b/functions/src/api-messages.ts
@@ -109,6 +109,10 @@ const MESSAGES = {
METADATA_IN_OSF_AND_FIRESTORE: {
metadataMessage : "Metadata is in OSF and in Firestore",
},
+ DATA_PERSIST_ERROR: {
+ error: "DATA_PERSIST_ERROR",
+ message: "Failed to save data. The data was not stored. If this is from a live experiment, participants may need to resubmit.",
+ },
OSF_UPLOAD_QUEUED: {
error: null,
message: "Data received. OSF upload will be retried automatically.",
diff --git a/functions/src/index.ts b/functions/src/index.ts
index 9219688..ecbb6c1 100644
--- a/functions/src/index.ts
+++ b/functions/src/index.ts
@@ -8,6 +8,7 @@ import { oauth2Regenerate } from "./oauth2-regenerate.js";
import { checkEmailConflict } from "./check-email-conflict.js";
import { scheduledTokenRefresh } from "./scheduled-token-refresh.js";
import { scheduledUploadRetry } from "./scheduled-upload-retry.js";
+import { scheduledPendingRecovery } from "./scheduled-pending-recovery.js";
import { apiQueueStatus } from "./api-queue-status.js";
import { generateOAuthState } from "./generate-oauth-state.js";
import { saveOsfToken } from "./save-osf-token.js";
@@ -27,6 +28,7 @@ export {
checkEmailConflict as checkemailconflict,
scheduledTokenRefresh as scheduledtokenrefresh,
scheduledUploadRetry as scheduleduploadretry,
+ scheduledPendingRecovery as scheduledpendingrecovery,
apiQueueStatus as apiqueuestatus,
generateOAuthState as generateoauthstate,
saveOsfToken as saveosftoken,
diff --git a/functions/src/persist-pending.ts b/functions/src/persist-pending.ts
new file mode 100644
index 0000000..69b8caf
--- /dev/null
+++ b/functions/src/persist-pending.ts
@@ -0,0 +1,59 @@
+import { storage } from "./app.js";
+
+const PENDING_PREFIX = "pending-data";
+
+interface PendingEnvelope {
+ experimentID: string;
+ filename: string;
+ data: string;
+ metadataOptions?: object;
+}
+
+/**
+ * Persist incoming request data to Cloud Storage immediately after validation,
+ * before any heavy processing. This ensures data survives OOM crashes.
+ * Stores the full request envelope (data + metadataOptions) so the recovery
+ * function can replay the complete processing pipeline including metadata.
+ * Returns the storage path for later cleanup.
+ */
+export async function persistPending(
+ experimentID: string,
+ filename: string,
+ data: string,
+ metadataOptions?: object
+): Promise {
+ const timestamp = Date.now();
+ const safeName = filename.replace(/[/\\]/g, "_");
+ const storagePath = `${PENDING_PREFIX}/${experimentID}/${safeName}_${timestamp}`;
+
+ const envelope: PendingEnvelope = { experimentID, filename, data, metadataOptions };
+
+ const bucket = storage.bucket();
+ const file = bucket.file(storagePath);
+ await file.save(JSON.stringify(envelope), { contentType: "application/json" });
+ return storagePath;
+}
+
+/**
+ * Read a pending envelope from Cloud Storage.
+ */
+export async function readPendingEnvelope(storagePath: string): Promise {
+ const bucket = storage.bucket();
+ const file = bucket.file(storagePath);
+ const [contents] = await file.download();
+ return JSON.parse(contents.toString("utf-8")) as PendingEnvelope;
+}
+
+/**
+ * Remove the pending data file after successful processing.
+ */
+export async function cleanupPending(storagePath: string): Promise {
+ try {
+ const bucket = storage.bucket();
+ const file = bucket.file(storagePath);
+ await file.delete();
+ } catch {
+ // Non-critical: if cleanup fails, the file will remain in storage
+ // but won't cause any issues. A scheduled cleanup can handle stragglers.
+ }
+}
diff --git a/functions/src/scheduled-pending-recovery.ts b/functions/src/scheduled-pending-recovery.ts
new file mode 100644
index 0000000..15c9800
--- /dev/null
+++ b/functions/src/scheduled-pending-recovery.ts
@@ -0,0 +1,180 @@
+import { onSchedule } from "firebase-functions/v2/scheduler";
+import { Timestamp } from "firebase-admin/firestore";
+import { db, storage } from "./app.js";
+import { readPendingEnvelope, cleanupPending } from "./persist-pending.js";
+import { ExperimentData } from "./interfaces.js";
+
+const PENDING_PREFIX = "pending-data/";
+
+// Files older than this are considered orphaned (the original request
+// either OOM-crashed or timed out without cleaning up).
+const STALE_THRESHOLD_MS = 15 * 60 * 1000; // 15 minutes
+
+// Process at most this many files per run to stay within time/memory limits.
+const MAX_FILES_PER_RUN = 10;
+
+const MAX_RETRIES = 5;
+
+/**
+ * Scheduled function that runs every 15 minutes to recover data that was
+ * persisted to Cloud Storage but never uploaded to OSF (e.g., because the
+ * original api-data function OOM-crashed).
+ *
+ * Instead of attempting the OSF upload directly, this function promotes
+ * orphaned pending files into the existing uploadQueue system. This means:
+ * - The data immediately appears in the researcher's dashboard QueuePanel
+ * - The existing scheduled-upload-retry handles retries with exponential backoff
+ * - The researcher can download the data manually if all retries fail
+ * - No duplicate retry infrastructure is needed
+ */
+export const scheduledPendingRecovery = onSchedule(
+ { schedule: "*/15 * * * *", memory: "256MiB" },
+ async () => {
+ await recoverPendingUploads();
+ }
+);
+
+async function recoverPendingUploads() {
+ const bucket = storage.bucket();
+ const cutoffTime = new Date(Date.now() - STALE_THRESHOLD_MS);
+
+ // List files under pending-data/ prefix
+ const [files] = await bucket.getFiles({
+ prefix: PENDING_PREFIX,
+ maxResults: MAX_FILES_PER_RUN * 2, // fetch extra in case some are too recent
+ });
+
+ if (files.length === 0) {
+ return;
+ }
+
+ let processed = 0;
+
+ for (const file of files) {
+ if (processed >= MAX_FILES_PER_RUN) break;
+
+ // Check file age via metadata
+ const [metadata] = await file.getMetadata();
+ const createdAt = new Date(metadata.timeCreated as string);
+
+ if (createdAt > cutoffTime) {
+ // File is recent — the original request may still be processing
+ continue;
+ }
+
+ console.log(`Recovering pending data: ${file.name}`);
+
+ try {
+ await promoteToQueue(file);
+ processed++;
+ } catch (e) {
+ const detail = e instanceof Error ? e.message : "Unknown error";
+ console.error(`Failed to recover ${file.name}: ${detail}`);
+ }
+ }
+
+ if (processed > 0) {
+ console.log(`Promoted ${processed} pending file(s) to upload queue.`);
+ }
+}
+
+/**
+ * Promote an orphaned pending file into the uploadQueue system.
+ *
+ * 1. Read the pending envelope to get experiment/filename/data
+ * 2. Look up the experiment to get the owner and osfFilesLink
+ * 3. Copy the data to upload-queue/ storage (where queue-status API expects it)
+ * 4. Create an uploadQueue Firestore document
+ * 5. Clean up the pending-data/ file
+ */
+async function promoteToQueue(
+ file: ReturnType["file"]>
+) {
+ // Read the envelope
+ let envelope;
+ try {
+ envelope = await readPendingEnvelope(file.name);
+ } catch (e) {
+ const detail = e instanceof Error ? e.message : "Unknown error";
+ console.error(`Failed to read pending envelope ${file.name}: ${detail}. Deleting corrupt file.`);
+ await file.delete();
+ return;
+ }
+
+ const { experimentID, filename, data } = envelope;
+
+ // Look up the experiment to get owner and osfFilesLink
+ const expDoc = await db.collection("experiments").doc(experimentID).get();
+ if (!expDoc.exists) {
+ console.warn(`Experiment ${experimentID} not found. Deleting orphaned file ${file.name}.`);
+ await cleanupPending(file.name);
+ return;
+ }
+
+ const expData = expDoc.data() as ExperimentData;
+
+ if (!expData.owner) {
+ console.warn(`Experiment ${experimentID} has no owner. Deleting orphaned file ${file.name}.`);
+ await cleanupPending(file.name);
+ return;
+ }
+
+ // Check for deduplication and atomically create the queue entry via transaction.
+ // This prevents duplicate entries if two recovery runs overlap.
+ const deduplicationKey = `${experimentID}:${filename}`;
+ const docId = deduplicationKey.replace(/[/\\]/g, "_");
+ const docRef = db.collection("uploadQueue").doc(docId);
+
+ const storagePath = `upload-queue/${docId}`;
+
+ const created = await db.runTransaction(async (transaction) => {
+ const existingDoc = await transaction.get(docRef);
+ if (existingDoc.exists) {
+ const status = existingDoc.data()?.status;
+ if (status === "pending" || status === "processing") {
+ return false; // Already queued
+ }
+ }
+
+ const now = Timestamp.now();
+ const nextRetryAt = Timestamp.fromMillis(now.toMillis() + 60 * 1000); // 1 minute — retry soon
+
+ transaction.set(docRef, {
+ experimentID,
+ owner: expData.owner,
+ filename,
+ storagePath,
+ dataType: "data",
+ osfFilesLink: expData.osfFilesLink,
+ status: "pending",
+ errorCode: 0,
+ retryCount: 0,
+ maxRetries: MAX_RETRIES,
+ createdAt: now,
+ lastAttemptAt: null,
+ nextRetryAt,
+ completedAt: null,
+ failureReason: "Recovered from interrupted upload (server restart or memory limit)",
+ deduplicationKey,
+ sessionIncremented: false,
+ });
+
+ return true;
+ });
+
+ if (!created) {
+ console.log(`Queue entry already exists for ${deduplicationKey}. Cleaning up pending file.`);
+ await cleanupPending(file.name);
+ return;
+ }
+
+ // Write data to upload-queue/ storage (where the queue-status API expects it)
+ const bucket = storage.bucket();
+ const queueFile = bucket.file(storagePath);
+ await queueFile.save(data, { contentType: "text/plain" });
+
+ // Clean up the pending-data/ file
+ await cleanupPending(file.name);
+
+ console.log(`Promoted ${filename} (experiment ${experimentID}) to upload queue.`);
+}
diff --git a/pages/admin/[experiment_id].js b/pages/admin/[experiment_id].js
index c824b56..fb83efc 100644
--- a/pages/admin/[experiment_id].js
+++ b/pages/admin/[experiment_id].js
@@ -1,3 +1,4 @@
+import { useState, useEffect, useRef } from "react";
import AuthCheck from "../../components/AuthCheck";
import { useRouter } from "next/router";
import { useDocumentData, useCollectionData } from "react-firebase-hooks/firestore";
@@ -13,7 +14,7 @@ import ExperimentValidation from "../../components/dashboard/ExperimentValidatio
import MetadataControl from "../../components/dashboard/MetadataControl";
import CodeHints from "../../components/dashboard/CodeHints";
import ErrorPanel from "../../components/dashboard/ErrorPanel";
-import QueuePanel from "../../components/dashboard/QueuePanel";
+import QueuePanel, { UploadsResolvedNotice } from "../../components/dashboard/QueuePanel";
export async function getServerSideProps() {
return { props: {} };
@@ -48,10 +49,23 @@ function ExperimentPageDashboard({ experiment_id }) {
const [, , , queueSnapshot] = useCollectionData(queueRef);
const queueEntries = queueSnapshot?.docs.map(d => ({ id: d.id, ...d.data() })) || [];
- const pendingUploads = queueEntries.filter(e => e.status === "pending" || e.status === "processing").length;
const uploadError = logs?.logError;
const errorLog = logs?.errors;
+ // Track resolved state: show success notice when queue goes from non-empty to empty
+ const [showResolved, setShowResolved] = useState(false);
+ const prevQueueCount = useRef(0);
+
+ useEffect(() => {
+ const currentCount = queueEntries.length;
+ if (prevQueueCount.current > 0 && currentCount === 0) {
+ setShowResolved(true);
+ const timer = setTimeout(() => setShowResolved(false), 8000);
+ return () => clearTimeout(timer);
+ }
+ prevQueueCount.current = currentCount;
+ }, [queueEntries.length]);
+
return (
<>
{loading && }
@@ -66,24 +80,27 @@ function ExperimentPageDashboard({ experiment_id }) {
{data.sessions || 0} session{data.sessions !== 1 ? "s" : ""}
- {uploadError && queueEntries.length === 0 && (
+ {uploadError && queueEntries.length === 0 && !showResolved && (
Data upload errors
)}
- {pendingUploads > 0 && (
-
- {pendingUploads} upload{pendingUploads !== 1 ? "s" : ""} waiting
-
- )}
- {pendingUploads === 0 && queueEntries.length > 0 && (
-
- {queueEntries.length} failed upload{queueEntries.length !== 1 ? "s" : ""}
+ {queueEntries.length > 0 && (
+ e.status === "failed") ? "red" : "orange"}
+ variant="solid"
+ px={2}
+ py={1}
+ >
+ {queueEntries.length} upload{queueEntries.length !== 1 ? "s" : ""} queued
)}
- {uploadError && queueEntries.length === 0 && }
- {queueEntries.length > 0 && }
+ {showResolved && }
+ {uploadError && queueEntries.length === 0 && !showResolved && }
+ {queueEntries.length > 0 && (
+
+ )}