diff --git a/frontend/app/api/chat/route.ts b/frontend/app/api/chat/route.ts index 9ce70cd..e17442d 100644 --- a/frontend/app/api/chat/route.ts +++ b/frontend/app/api/chat/route.ts @@ -1,5 +1,4 @@ import { NextResponse } from 'next/server'; -import { createServerClient } from '@/lib/langgraph-server'; import { retrievalAssistantStreamConfig } from '@/constants/graphConfigs'; export const runtime = 'edge'; @@ -9,105 +8,57 @@ export async function POST(req: Request) { const { message, threadId } = await req.json(); if (!message) { - return new NextResponse( - JSON.stringify({ error: 'Message is required' }), - { - status: 400, - headers: { 'Content-Type': 'application/json' }, - }, - ); + return NextResponse.json({ error: 'Message is required' }, { status: 400 }); } if (!threadId) { - return new NextResponse( - JSON.stringify({ error: 'Thread ID is required' }), - { - status: 400, - headers: { 'Content-Type': 'application/json' }, - }, - ); + return NextResponse.json({ error: 'Thread ID is required' }, { status: 400 }); } - if (!process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID) { - return new NextResponse( - JSON.stringify({ - error: 'LANGGRAPH_RETRIEVAL_ASSISTANT_ID is not set', - }), - { status: 500, headers: { 'Content-Type': 'application/json' } }, - ); + const assistantId = process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID; + if (!assistantId) { + return NextResponse.json({ error: 'LANGGRAPH_RETRIEVAL_ASSISTANT_ID is not set' }, { status: 500 }); } - try { - const assistantId = process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID; - const serverClient = createServerClient(); + const apiUrl = process.env.NEXT_PUBLIC_LANGGRAPH_API_URL; + if (!apiUrl) { + return NextResponse.json({ error: 'LANGGRAPH_API_URL is not set' }, { status: 500 }); + } - const stream = await serverClient.client.runs.stream( - threadId, - assistantId, - { + const response = await fetch( + `${apiUrl}/threads/${threadId}/runs/stream`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Api-Key': process.env.LANGCHAIN_API_KEY || '', + }, + body: JSON.stringify({ + assistant_id: assistantId, input: { query: message }, - streamMode: ['messages', 'updates'], + stream_mode: ['messages', 'updates'], config: { configurable: { ...retrievalAssistantStreamConfig, }, }, - }, - ); - - // Set up response as a stream - const encoder = new TextEncoder(); - const customReadable = new ReadableStream({ - async start(controller) { - try { - // Forward each chunk from the graph to the client - for await (const chunk of stream) { - // Only send relevant chunks - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`), - ); - } - } catch (error) { - console.error('Streaming error:', error); - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ error: 'Streaming error occurred' })}\n\n`, - ), - ); - } finally { - controller.close(); - } - }, - }); + }), + }, + ); - // Return the stream with appropriate headers - return new Response(customReadable, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - }, - }); - } catch (error) { - // Handle streamRun errors - console.error('Stream initialization error:', error); - return new NextResponse( - JSON.stringify({ error: 'Internal server error' }), - { - status: 500, - headers: { 'Content-Type': 'application/json' }, - }, - ); + if (!response.ok || !response.body) { + return NextResponse.json({ error: 'Failed to connect to LangGraph server' }, { status: 502 }); } + + return new Response(response.body, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); } catch (error) { - // Handle JSON parsing errors console.error('Route error:', error); - return new NextResponse( - JSON.stringify({ error: 'Internal server error' }), - { - status: 500, - headers: { 'Content-Type': 'application/json' }, - }, - ); + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }); } } diff --git a/frontend/app/page.tsx b/frontend/app/page.tsx index 4feeb74..93b47a5 100644 --- a/frontend/app/page.tsx +++ b/frontend/app/page.tsx @@ -4,6 +4,7 @@ import type React from 'react'; import { useToast } from '@/hooks/use-toast'; import { useRef, useState, useEffect } from 'react'; +import { EventSourceParserStream } from 'eventsource-parser/stream'; import { Button } from '@/components/ui/button'; import { Input } from '@/components/ui/input'; import { Paperclip, ArrowUp, Loader2 } from 'lucide-react'; @@ -104,39 +105,31 @@ export default function Home() { throw new Error(`HTTP error! status: ${response.status}`); } - const reader = response.body?.getReader(); - if (!reader) throw new Error('No reader available'); + if (!response.body) throw new Error('No response body'); - const decoder = new TextDecoder(); + const eventStream = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()); + const reader = eventStream.getReader(); while (true) { const { done, value } = await reader.read(); if (done) break; - const chunkStr = decoder.decode(value); - const lines = chunkStr.split('\n').filter(Boolean); - - for (const line of lines) { - if (!line.startsWith('data: ')) continue; - - const sseString = line.slice('data: '.length); - let sseEvent: any; - try { - sseEvent = JSON.parse(sseString); - } catch (err) { - console.error('Error parsing SSE line:', err, line); - continue; - } - - const { event, data } = sseEvent; + const event = value.event || ''; + let data: any; + try { + data = JSON.parse(value.data); + } catch (err) { + continue; + } - if (event === 'messages/partial') { + if (event === 'messages/partial') { if (Array.isArray(data)) { const lastObj = data[data.length - 1]; if (lastObj?.type === 'ai') { const partialContent = lastObj.content ?? ''; - // Only display if content is a string message if ( typeof partialContent === 'string' && !partialContent.startsWith('{') @@ -168,17 +161,11 @@ export default function Home() { const retrievedDocs = (data as RetrieveDocumentsNodeUpdates) .retrieveDocuments.documents as PDFDocument[]; - // // Handle documents here lastRetrievedDocsRef.current = retrievedDocs; - console.log('Retrieved documents:', retrievedDocs); } else { - // Clear the last retrieved documents if it's a direct answer lastRetrievedDocsRef.current = []; } - } else { - console.log('Unknown SSE event:', event, data); } - } } } catch (error) { console.error('Error sending message:', error); diff --git a/frontend/package.json b/frontend/package.json index cb3a91b..1e37928 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -50,6 +50,7 @@ "cmdk": "1.0.4", "date-fns": "^3.0.0", "embla-carousel-react": "8.5.1", + "eventsource-parser": "^3.0.6", "formidable": "^3.5.2", "geist": "latest", "input-otp": "1.4.1", diff --git a/yarn.lock b/yarn.lock index 7c03826..1cafcdf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4408,6 +4408,11 @@ eventemitter3@^5.0.1: resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-5.0.4.tgz#a86d66170433712dde814707ac52b5271ceb1feb" integrity sha512-mlsTRyGaPBjPedk6Bvw+aqbsXDtoAyAzm5MO7JgU+yVRyMQ5O8bD4Kcci7BS85f93veegeCPkL8R4GLClnjLFw== +eventsource-parser@^3.0.6: + version "3.0.6" + resolved "https://registry.yarnpkg.com/eventsource-parser/-/eventsource-parser-3.0.6.tgz#292e165e34cacbc936c3c92719ef326d4aeb4e90" + integrity sha512-Vo1ab+QXPzZ4tCa8SwIHJFaSzy4R6SHf7BY79rFBDf0idraZWAkYrDjDj8uWaSm3S2TK+hJ7/t1CEmZ7jXw+pg== + execa@^5.0.0: version "5.1.1" resolved "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz"