fix: standardize SSE streaming across frontend and backend#405
fix: standardize SSE streaming across frontend and backend#405Namraa310806 wants to merge 4 commits into
Conversation
|
@Namraa310806 is attempting to deploy a commit to the firefistisdead's projects Team on Vercel. A member of the Team first needs to authorize it. |
|
Caution Review failedFailed to post review comments 📝 WalkthroughWalkthroughThe PR standardizes /ask/stream end-to-end to Server-Sent Events: backend emits SSE frames for refusals, grounded answers, and tokens (with terminal ChangesSSE Streaming Standardization
Sequence DiagramsequenceDiagram
participant User as User/Client
participant Frontend as Frontend Parser
participant Gateway as Node Gateway
participant Backend as FastAPI Service
User->>Frontend: ask(question, onChunk, onDone)
Frontend->>Gateway: POST /ask/stream
Gateway->>Gateway: Create AbortController
Gateway->>Backend: Axios with signal
alt Evidence Gate Refused
Backend->>Gateway: SSE: data: refusal\n\n
Backend->>Gateway: SSE: data: [DONE]\n\n
else Grounded Answer
Backend->>Gateway: SSE: data: answer\n\n
Backend->>Gateway: SSE: data: [DONE]\n\n
else LLM Generation
loop Each Token
Backend->>Gateway: SSE: data: token\n\n
end
Backend->>Gateway: SSE: data: final_framed_answer\n\n
Backend->>Gateway: SSE: data: [DONE]\n\n
end
Gateway->>Frontend: Stream SSE events
loop Parse SSE Events
Frontend->>Frontend: Buffer & split on \n\n
Frontend->>Frontend: Parse event: and data: lines
alt Event Type: error
Frontend->>Frontend: onError()
else Data: [DONE]
Frontend->>Frontend: onDone()
else Other Data
Frontend->>Frontend: onChunk(text)
end
end
alt Client Disconnect
User->>Gateway: close
Gateway->>Gateway: cleanup()
Gateway->>Backend: abort()
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
| yield _sse_frame("Generation error. Please try again.", event="error") | ||
|
|
||
| return StreamingResponse(_generate_and_stream(), media_type="text/plain; charset=utf-8") | ||
| return StreamingResponse(_generate_and_stream(), media_type="text/event-stream; charset=utf-8") |
| ragResponse.data.on("error", (err) => { | ||
| if (upstreamAbort.signal.aborted || req.aborted) { | ||
| return; | ||
| } | ||
| console.error("Stream error from RAG service:", err.message); | ||
| if (!res.headersSent) { | ||
| res.status(502).json({ error: "Streaming response failed." }); | ||
| } else { | ||
| res.write("event: error\ndata: Streaming response failed.\n\n"); | ||
| res.end(); | ||
| } | ||
| }); | ||
|
|
||
| ragResponse.data.on("end", cleanup); |
| yield _sse_done() | ||
| except Exception: | ||
| logger.exception("Stream generation failed session_id=%s", session_id) | ||
| yield _sse_frame("Generation error. Please try again.", event="error") |
| buffer += decoder.decode(value, { stream: true }); | ||
|
|
||
| let separatorIndex = buffer.indexOf('\n\n'); | ||
| while (separatorIndex !== -1) { | ||
| const eventText = buffer.slice(0, separatorIndex); | ||
| buffer = buffer.slice(separatorIndex + 2); | ||
| handleEvent(eventText); | ||
| if (completed) { | ||
| return; | ||
| } | ||
| separatorIndex = buffer.indexOf('\n\n'); | ||
| } |
| handleEvent(eventText); | ||
| if (completed) { | ||
| return; | ||
| } |
Summary
This PR fixes a streaming protocol mismatch between the frontend, Express gateway, and FastAPI RAG service.
Previously, the frontend expected Server-Sent Events (SSE) formatted messages (
data: ...and[DONE]), while the backend streamed raw text chunks. This inconsistency could cause incomplete rendering, missed completion events, hanging loading states, and unreliable streaming behavior.Changes Made
Standardized
/ask/streamto use SSE-compatible framing end-to-end.Added SSE event formatting in the FastAPI streaming endpoint.
Added explicit completion signaling using:
data: [DONE]Preserved incremental token streaming behavior.
Updated frontend stream parsing to properly handle:
Improved stream completion handling to ensure
onDone()fires reliably.Preserved existing abort/cancellation behavior.
Kept retrieval, generation, and answer quality logic unchanged.
Related issue
Fixes: #360
Testing
Verified Scenarios
onDone().Checklist:
Screenshots / recordings
N/A – Streaming protocol and backend/frontend compatibility fix.
Notes
This PR is intentionally focused on resolving the
/ask/streamprotocol mismatch.No changes were made to:
The change only standardizes the transport protocol used for streaming responses and improves stream lifecycle reliability.
Security
Summary by CodeRabbit
New Features
Bug Fixes