Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 80 additions & 5 deletions frontend/src/services/ragService.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,94 @@ export const askStream = (sessionId, sessionSecret, question, onChunk, onDone, o
throw new Error(data.error || `Ask failed (HTTP ${res.status})`);
}

if (!res.body) {
throw new Error('Streaming response is unavailable.');
}

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let completed = false;

const handleEvent = (eventText) => {
if (!eventText) return;

let eventName = 'message';
const dataLines = [];

for (const rawLine of eventText.split('\n')) {
const line = rawLine.replace(/\r$/, '');
if (!line || line.startsWith(':')) {
continue;
}
if (line.startsWith('event:')) {
eventName = line.slice(6).trim();
continue;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice(5).replace(/^ /, ''));
}
}

const data = dataLines.join('\n');

if (!data) {
return;
}

if (eventName === 'error') {
completed = true;
onError(data || 'Stream error');
return;
}

if (data === '[DONE]') {
completed = true;
onDone();
return;
}

onChunk(data);
};

while (true) {
const { done, value } = await reader.read();
if (done) break;
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });

const chunk = decoder.decode(value, { stream: true });
if (chunk) {
onChunk(chunk);
// Normalize CRLF line endings to LF so frames using CRLF are
// detected correctly (\r\n -> \n). This also tolerates mixed
// line endings from various servers.
buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n');

let separatorIndex = buffer.indexOf('\n\n');
while (separatorIndex !== -1) {
const eventText = buffer.slice(0, separatorIndex);
buffer = buffer.slice(separatorIndex + 2);
handleEvent(eventText);
if (completed) {
// Cancel the reader to release the connection promptly.
try {
await reader.cancel();
} catch (cancelErr) {
// ignore
}
return;
}
Comment on lines +155 to +164
separatorIndex = buffer.indexOf('\n\n');
}
Comment on lines +144 to 166
}
onDone();

buffer += decoder.decode();
if (buffer.trim()) {
handleEvent(buffer);
}

if (!completed) {
onDone();
}
} catch (err) {
if (err.name !== 'AbortError') {
onError(err.message || 'Stream error');
Expand Down
93 changes: 58 additions & 35 deletions rag-service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3281,8 +3281,8 @@
@app.post("/ask/stream")
def ask_question_stream(data: Question):
"""
Streaming variant of /ask. Returns the generated answer as a plain-text
chunked response so the frontend can render tokens progressively.
Streaming variant of /ask. Returns the generated answer as SSE so the
frontend can render tokens progressively.

Retrieval and evidence-gating are identical to /ask. Generation is run in
a background thread using TextIteratorStreamer so the HTTP response can
Expand All @@ -3292,6 +3292,18 @@
in both `protected_paths` (exact match) and under the `/ask/` prefix guard,
so it cannot be reached without a valid X-Internal-Token.
"""
def _sse_frame(text: str, event: str | None = None) -> str:
parts = []
if event:
parts.append(f"event: {event}")
normalized_text = str(text).replace("\r\n", "\n").replace("\r", "\n")
for line in normalized_text.split("\n"):
parts.append(f"data: {line}")
return "\n".join(parts) + "\n\n"

def _sse_done() -> str:
return "data: [DONE]\n\n"

question = (data.question or "").strip()
if not question:
raise HTTPException(status_code=400, detail="Question is required.")
Expand Down Expand Up @@ -3405,9 +3417,10 @@
_mark_session_dirty(session_id)

def _refuse_stream():
yield INSUFFICIENT_CONTEXT_MESSAGE
yield _sse_frame(INSUFFICIENT_CONTEXT_MESSAGE)
yield _sse_done()

return StreamingResponse(_refuse_stream(), media_type="text/plain; charset=utf-8")
return StreamingResponse(_refuse_stream(), media_type="text/event-stream; charset=utf-8")

context = format_context(docs)

Expand Down Expand Up @@ -3437,9 +3450,10 @@
_mark_session_dirty(session_id)

def _grounded_stream():
yield framed
yield _sse_frame(framed)
yield _sse_done()

return StreamingResponse(_grounded_stream(), media_type="text/plain; charset=utf-8")
return StreamingResponse(_grounded_stream(), media_type="text/event-stream; charset=utf-8")

# LLM generation path — run in a background thread so we can stream tokens
# back to the caller as they are produced rather than waiting for the full
Expand Down Expand Up @@ -3504,7 +3518,7 @@
token = data['choices'][0]['delta'].get('content', '')
if token:
full_answer_parts.append(token)
yield token
yield _sse_frame(token)
except Exception:
pass
except Exception as e:
Expand All @@ -3514,40 +3528,49 @@

full_answer = "".join(full_answer_parts).strip()

framed = apply_mode_framing(
full_answer,
question,
mode,
docs,
context,
)
try:
# Streamed tokens were already yielded above as they arrived.
# Now produce the final framed answer, persist the chat exchange,
# and send the final framed answer + done event.
framed = apply_mode_framing(full_answer, question, mode, docs, context)

if ASK_REQUIRE_CITATIONS and not answer_contains_citation(framed, len(docs)):
framed = full_answer
if ASK_REQUIRE_CITATIONS and not answer_contains_citation(framed, len(docs)):
framed = full_answer

citation_sources = [
citation_source_for_document(doc, idx)
for idx, doc in enumerate(docs)
]
citation_sources = [
citation_source_for_document(doc, idx)
for idx, doc in enumerate(docs)
]

# stream the final framed answer once at the end
yield framed
# stream the final framed answer once at the end
yield _sse_frame(framed)

with sessions_lock:
current_session = sessions.get(session_id)
if current_session:
ensure_retrieval_cache(current_session)
append_chat_exchange(
current_session,
question,
framed,
citation_sources,
mode,
)
_mark_session_dirty(session_id)
with sessions_lock:
current_session = sessions.get(session_id)
if current_session:
ensure_retrieval_cache(current_session)
append_chat_exchange(
current_session,
question,
framed,
citation_sources,
mode,
)
_mark_session_dirty(session_id)

yield _sse_done()
except Exception:
logger.exception("Stream generation failed session_id=%s", session_id)
yield _sse_frame("Generation error. Please try again.", event="error")
Comment on lines +3561 to +3564
# Emit an explicit done marker after the error so SSE clients
# that rely on an in-band completion token can handle the

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
# terminal state deterministically.
try:
yield _sse_done()
except Exception:
pass

return StreamingResponse(_generate_and_stream(), media_type="text/plain; charset=utf-8")
return StreamingResponse(_generate_and_stream(), media_type="text/event-stream; charset=utf-8")


def _run_generation_locked(model, generate_kwargs):
Expand Down
41 changes: 40 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,18 @@ app.post("/ask/stream", inferenceSlowDown, inferenceLimiter, async (req, res) =>

const { question, session_id, mode } = validation.data;
const session_secret = validation.data.session_secret;
const upstreamAbort = new AbortController();
let upstreamStream = null;
let cleanedUp = false;

const cleanup = () => {
if (cleanedUp) return;
cleanedUp = true;
upstreamAbort.abort();
if (upstreamStream && typeof upstreamStream.destroy === "function") {
upstreamStream.destroy();
}
};

try {
const ragResponse = await axios.post(
Expand All @@ -1001,26 +1013,53 @@ app.post("/ask/stream", inferenceSlowDown, inferenceLimiter, async (req, res) =>
headers: ragAuthHeaders(),
responseType: "stream",
timeout: 120000,
signal: upstreamAbort.signal,
}
);

res.setHeader("Content-Type", "text/plain; charset=utf-8");
upstreamStream = ragResponse.data;

res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Transfer-Encoding", "chunked");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.flushHeaders();

req.on("close", cleanup);
res.on("close", cleanup);

ragResponse.data.pipe(res);

ragResponse.data.on("error", (err) => {
// Ensure we always clean up the upstream stream/abort state so the
// upstream request is destroyed promptly and background resources
// are released even when headers have already been sent.
try {
cleanup();
} catch (cleanupErr) {
// Ignore cleanup errors; still proceed to notify the client.
}

if (upstreamAbort.signal.aborted || req.aborted) {
return;
}

console.error("Stream error from RAG service:", err.message);
if (!res.headersSent) {
res.status(502).json({ error: "Streaming response failed." });
} else {
res.write("event: error\ndata: Streaming response failed.\n\n");
// End the response after signalling the error to the client.
res.end();
}
});

ragResponse.data.on("end", cleanup);
Comment on lines 1034 to +1058
} catch (err) {
if (upstreamAbort.signal.aborted || req.aborted || err.code === "ERR_CANCELED" || err.name === "CanceledError") {
return;
}
const statusCode = err.response?.status || (err.code === "ECONNREFUSED" ? 502 : 500);
const details = extractServiceDetails(err, "Error answering question");
console.error("Streaming question answering failed:", details);
Expand Down
Loading