Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 34 additions & 83 deletions frontend/app/api/chat/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { NextResponse } from 'next/server';
import { createServerClient } from '@/lib/langgraph-server';
import { retrievalAssistantStreamConfig } from '@/constants/graphConfigs';

export const runtime = 'edge';
Expand All @@ -9,105 +8,57 @@ export async function POST(req: Request) {
const { message, threadId } = await req.json();

if (!message) {
return new NextResponse(
JSON.stringify({ error: 'Message is required' }),
{
status: 400,
headers: { 'Content-Type': 'application/json' },
},
);
return NextResponse.json({ error: 'Message is required' }, { status: 400 });
}

if (!threadId) {
return new NextResponse(
JSON.stringify({ error: 'Thread ID is required' }),
{
status: 400,
headers: { 'Content-Type': 'application/json' },
},
);
return NextResponse.json({ error: 'Thread ID is required' }, { status: 400 });
}

if (!process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID) {
return new NextResponse(
JSON.stringify({
error: 'LANGGRAPH_RETRIEVAL_ASSISTANT_ID is not set',
}),
{ status: 500, headers: { 'Content-Type': 'application/json' } },
);
const assistantId = process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID;
if (!assistantId) {
return NextResponse.json({ error: 'LANGGRAPH_RETRIEVAL_ASSISTANT_ID is not set' }, { status: 500 });
}

try {
const assistantId = process.env.LANGGRAPH_RETRIEVAL_ASSISTANT_ID;
const serverClient = createServerClient();
const apiUrl = process.env.NEXT_PUBLIC_LANGGRAPH_API_URL;
if (!apiUrl) {
return NextResponse.json({ error: 'LANGGRAPH_API_URL is not set' }, { status: 500 });
}

const stream = await serverClient.client.runs.stream(
threadId,
assistantId,
{
const response = await fetch(
`${apiUrl}/threads/${threadId}/runs/stream`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Api-Key': process.env.LANGCHAIN_API_KEY || '',
},
body: JSON.stringify({
assistant_id: assistantId,
input: { query: message },
streamMode: ['messages', 'updates'],
stream_mode: ['messages', 'updates'],
config: {
configurable: {
...retrievalAssistantStreamConfig,
},
},
},
);

// Set up response as a stream
const encoder = new TextEncoder();
const customReadable = new ReadableStream({
async start(controller) {
try {
// Forward each chunk from the graph to the client
for await (const chunk of stream) {
// Only send relevant chunks
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`),
);
}
} catch (error) {
console.error('Streaming error:', error);
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: 'Streaming error occurred' })}\n\n`,
),
);
} finally {
controller.close();
}
},
});
}),
},
);

// Return the stream with appropriate headers
return new Response(customReadable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
} catch (error) {
// Handle streamRun errors
console.error('Stream initialization error:', error);
return new NextResponse(
JSON.stringify({ error: 'Internal server error' }),
{
status: 500,
headers: { 'Content-Type': 'application/json' },
},
);
if (!response.ok || !response.body) {
return NextResponse.json({ error: 'Failed to connect to LangGraph server' }, { status: 502 });
}

return new Response(response.body, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
} catch (error) {
// Handle JSON parsing errors
console.error('Route error:', error);
return new NextResponse(
JSON.stringify({ error: 'Internal server error' }),
{
status: 500,
headers: { 'Content-Type': 'application/json' },
},
);
return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
}
}
41 changes: 14 additions & 27 deletions frontend/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type React from 'react';

import { useToast } from '@/hooks/use-toast';
import { useRef, useState, useEffect } from 'react';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Paperclip, ArrowUp, Loader2 } from 'lucide-react';
Expand Down Expand Up @@ -104,39 +105,31 @@ export default function Home() {
throw new Error(`HTTP error! status: ${response.status}`);
}

const reader = response.body?.getReader();
if (!reader) throw new Error('No reader available');
if (!response.body) throw new Error('No response body');

const decoder = new TextDecoder();
const eventStream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());
const reader = eventStream.getReader();

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunkStr = decoder.decode(value);
const lines = chunkStr.split('\n').filter(Boolean);

for (const line of lines) {
if (!line.startsWith('data: ')) continue;

const sseString = line.slice('data: '.length);
let sseEvent: any;
try {
sseEvent = JSON.parse(sseString);
} catch (err) {
console.error('Error parsing SSE line:', err, line);
continue;
}

const { event, data } = sseEvent;
const event = value.event || '';
let data: any;
try {
data = JSON.parse(value.data);
} catch (err) {
continue;
}

if (event === 'messages/partial') {
if (event === 'messages/partial') {
if (Array.isArray(data)) {
const lastObj = data[data.length - 1];
if (lastObj?.type === 'ai') {
const partialContent = lastObj.content ?? '';

// Only display if content is a string message
if (
typeof partialContent === 'string' &&
!partialContent.startsWith('{')
Expand Down Expand Up @@ -168,17 +161,11 @@ export default function Home() {
const retrievedDocs = (data as RetrieveDocumentsNodeUpdates)
.retrieveDocuments.documents as PDFDocument[];

// // Handle documents here
lastRetrievedDocsRef.current = retrievedDocs;
console.log('Retrieved documents:', retrievedDocs);
} else {
// Clear the last retrieved documents if it's a direct answer
lastRetrievedDocsRef.current = [];
}
} else {
console.log('Unknown SSE event:', event, data);
}
}
}
} catch (error) {
console.error('Error sending message:', error);
Expand Down
1 change: 1 addition & 0 deletions frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"cmdk": "1.0.4",
"date-fns": "^3.0.0",
"embla-carousel-react": "8.5.1",
"eventsource-parser": "^3.0.6",
"formidable": "^3.5.2",
"geist": "latest",
"input-otp": "1.4.1",
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4408,6 +4408,11 @@ eventemitter3@^5.0.1:
resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-5.0.4.tgz#a86d66170433712dde814707ac52b5271ceb1feb"
integrity sha512-mlsTRyGaPBjPedk6Bvw+aqbsXDtoAyAzm5MO7JgU+yVRyMQ5O8bD4Kcci7BS85f93veegeCPkL8R4GLClnjLFw==

eventsource-parser@^3.0.6:
version "3.0.6"
resolved "https://registry.yarnpkg.com/eventsource-parser/-/eventsource-parser-3.0.6.tgz#292e165e34cacbc936c3c92719ef326d4aeb4e90"
integrity sha512-Vo1ab+QXPzZ4tCa8SwIHJFaSzy4R6SHf7BY79rFBDf0idraZWAkYrDjDj8uWaSm3S2TK+hJ7/t1CEmZ7jXw+pg==

execa@^5.0.0:
version "5.1.1"
resolved "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz"
Expand Down
Loading