From 58bf8dc08d50c656d6189bfbe4d37f634ebcf7b5 Mon Sep 17 00:00:00 2001 From: ArshVermaGit Date: Mon, 1 Jun 2026 04:28:42 +0530 Subject: [PATCH] fix(api): properly handle abort signals and controller state in SSE streams --- src/app/api/stream/route.ts | 14 +++++++++++--- test/sse-stream-route.test.ts | 11 +++++------ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/app/api/stream/route.ts b/src/app/api/stream/route.ts index 228d0a1b..d997f08e 100644 --- a/src/app/api/stream/route.ts +++ b/src/app/api/stream/route.ts @@ -52,9 +52,12 @@ export async function GET(req: NextRequest) { let lastCheckedSyncedAt: string | null = null; let lastCheckedUnreadCount: number | null = null; + let isClosed = false; + const stream = new ReadableStream({ start(controller) { const checkData = async () => { + if (isClosed) return; try { const { data: goals } = await supabaseAdmin .from("goals") @@ -89,7 +92,7 @@ export async function GET(req: NextRequest) { lastCheckedUnreadCount = currentUnreadCount; } - if (hasChanges) { + if (hasChanges && !isClosed) { controller.enqueue(`data: ${JSON.stringify(payload)}\n\n`); } } catch (error) { @@ -101,12 +104,17 @@ export async function GET(req: NextRequest) { // guaranteed to be in place before any async work begins. This prevents // a race where abort() fires before the listener is attached. const interval = setInterval(() => { - checkData(); + if (!isClosed) checkData(); }, POLL_INTERVAL_MS); req.signal.addEventListener("abort", () => { + isClosed = true; clearInterval(interval); - controller.close(); + try { + controller.close(); + } catch (e) { + // ignore already closed + } // Decrement the connection counter so the slot becomes available again. const remaining = activeStreamConnections.get(userId) ?? 1; diff --git a/test/sse-stream-route.test.ts b/test/sse-stream-route.test.ts index 3d245a09..93030aa8 100644 --- a/test/sse-stream-route.test.ts +++ b/test/sse-stream-route.test.ts @@ -21,16 +21,15 @@ vi.mock("@/lib/supabase", () => ({ function makeRequest(): NextRequest { const controller = new AbortController(); - return new NextRequest("http://localhost/api/stream", { - signal: controller.signal, - }); + const req = new NextRequest("http://localhost/api/stream"); + Object.defineProperty(req, 'signal', { value: controller.signal }); + return req; } function makeAbortableRequest(): { req: NextRequest; abort: () => void } { const controller = new AbortController(); - const req = new NextRequest("http://localhost/api/stream", { - signal: controller.signal, - }); + const req = new NextRequest("http://localhost/api/stream"); + Object.defineProperty(req, 'signal', { value: controller.signal }); return { req, abort: () => controller.abort() }; }