diff --git a/.github/workflows/check-package-lock.yml b/.github/workflows/check-package-lock.yml index 5fb8e63..5d42deb 100644 --- a/.github/workflows/check-package-lock.yml +++ b/.github/workflows/check-package-lock.yml @@ -10,10 +10,18 @@ concurrency: on: push: branches: - - main # Run on push to main branch only + - main + paths: + - 'package.json' + - 'package-lock.json' + - '**/package.json' pull_request: branches: - - "**" # Run on PR to any branch + - "**" + paths: + - 'package.json' + - 'package-lock.json' + - '**/package.json' jobs: verify-package-lock: diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 8bf62dd..d816b72 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -23,10 +23,12 @@ jobs: timeout-minutes: 45 env: TURBO_TELEMETRY_DISABLED: 1 + LLM_MODEL: gpt-5-mini + LLM_TEMPERATURE: 1 + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} strategy: matrix: - # The fromJSON() function converts the JSON string into an actual array node-version: ${{ fromJSON((github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch') && '[22]' || '[22, 24]') }} fail-fast: false @@ -44,96 +46,13 @@ jobs: - name: Install dependencies run: npm install - - name: Create environment files for tests - run: | - # Create main .env file for agent (mainnet by default) - # Use absolute path for DATABASE_URL to work from any directory - # IMPORTANT: No quotes around DATABASE_URL path! - cat > apps/agent/.env << EOF - PORT=9200 - EXPO_PUBLIC_MCP_URL=http://localhost:9200 - EXPO_PUBLIC_APP_URL=http://localhost:9200 - DATABASE_URL=${GITHUB_WORKSPACE}/apps/agent/test.db - OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} - DKG_PUBLISH_WALLET=${{ secrets.DKG_Node_Private_key }} - DKG_BLOCKCHAIN=otp:2043 - DKG_OTNODE_URL=https://positron.origin-trail.network - EOF - - # Create root .env file for turbo dev (when running from root directory) - # This is needed because turbo dev runs from root and dotenv looks for .env in cwd - cat > .env << EOF - PORT=9200 - EXPO_PUBLIC_MCP_URL=http://localhost:9200 - EXPO_PUBLIC_APP_URL=http://localhost:9200 - DATABASE_URL=${GITHUB_WORKSPACE}/apps/agent/test.db - OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} - DKG_PUBLISH_WALLET=${{ secrets.DKG_Node_Private_key }} - DKG_BLOCKCHAIN=otp:2043 - DKG_OTNODE_URL=https://positron.origin-trail.network - EOF - - # Create development override file - cat > apps/agent/.env.development.local << 'EOF' - # These values will override the .env file during the development - EXPO_PUBLIC_APP_URL=http://localhost:8081 - EOF - - # Create testnet environment file - mkdir -p apps/agent/tests - cat > apps/agent/tests/.env.testing.testnet.local << EOF - PORT=9200 - EXPO_PUBLIC_MCP_URL=http://localhost:9200 - EXPO_PUBLIC_APP_URL=http://localhost:9200 - DATABASE_URL=${GITHUB_WORKSPACE}/apps/agent/test.db - OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} - DKG_PUBLISH_WALLET=${{ secrets.DKG_Node_Private_key }} - DKG_BLOCKCHAIN=otp:20430 - DKG_OTNODE_URL=https://v6-pegasus-node-02.origin-trail.network - EOF - - # Create mainnet environment file - cat > apps/agent/tests/.env.testing.mainnet.local << EOF - PORT=9200 - EXPO_PUBLIC_MCP_URL=http://localhost:9200 - EXPO_PUBLIC_APP_URL=http://localhost:9200 - DATABASE_URL=${GITHUB_WORKSPACE}/apps/agent/test.db - OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} - DKG_PUBLISH_WALLET=${{ secrets.DKG_Node_Private_key }} - DKG_BLOCKCHAIN=otp:2043 - DKG_OTNODE_URL=https://positron.origin-trail.network - EOF - - echo "Environment files created successfully!" - - - name: Install Playwright browsers - run: npx playwright install --with-deps chromium - - name: Check code quality run: npm run check || echo "⚠️ Code quality checks completed with warnings (non-blocking)" - name: Build packages and apps run: npm run build - - name: Create admin user for tests - run: | - cd apps/agent - rm -f test.db test.db-* *.db-journal - # Create admin user: email password scope firstName lastName - npm run script:createUser admin@gmail.com admin123 mcp,llm,blob,scope123 Admin User - - - name: Run tests from all packages - run: npm run test + - name: Run API and integration tests + run: npm run test:api && npm run test:integration env: CI: true - - - name: Upload test videos and screenshots - if: failure() - uses: actions/upload-artifact@v4 - with: - name: test-artifacts-node-${{ matrix.node-version }} - path: | - apps/agent/test-results/ - apps/agent/playwright-report/ - retention-days: 7 - if-no-files-found: warn diff --git a/apps/agent/src/app/(protected)/chat.tsx b/apps/agent/src/app/(protected)/chat.tsx index ec426e8..451a436 100644 --- a/apps/agent/src/app/(protected)/chat.tsx +++ b/apps/agent/src/app/(protected)/chat.tsx @@ -19,6 +19,7 @@ import Container from "@/components/layout/Container"; import Header from "@/components/layout/Header"; import Chat from "@/components/Chat"; import { SourceKAResolver } from "@/components/Chat/Message/SourceKAs/CollapsibleItem"; +import Markdown from "@/components/Markdown"; import { useAlerts } from "@/components/Alerts"; import { @@ -26,6 +27,7 @@ import { type ToolCall, type ToolCallResultContent, makeCompletionRequest, + makeStreamingCompletionRequest, toContents, } from "@/shared/chat"; import { @@ -37,6 +39,29 @@ import { import { toError } from "@/shared/errors"; import useSettings from "@/hooks/useSettings"; +function normalizeStreamingMarkdown(content: string): string { + const fencePattern = /^(`{3,})[^`]*$/gm; + let count = 0; + let lastFenceLength = 3; + let match: RegExpExecArray | null; + while ((match = fencePattern.exec(content)) !== null) { + lastFenceLength = match[1]!.length; + count++; + } + if (count % 2 === 1) { + return content + "\n" + "`".repeat(lastFenceLength); + } + return content; +} + +const SCROLL_TOP_GAP = 28; // px from viewport top to user message after scroll (matches contentContainerStyle.paddingTop) + +function stripThinkTags(content: string): string { + let result = content.replaceAll(/.*?<\/think>/gs, ""); + result = result.replace(/(?:(?!<\/think>).)*$/s, ""); + return result; +} + export default function ChatPage() { const colors = useColors(); const { isNativeMobile, isWeb, width } = usePlatform(); @@ -49,11 +74,19 @@ export default function ChatPage() { const [messages, setMessages] = useState([]); const [isGenerating, setIsGenerating] = useState(false); + const [streamingContent, setStreamingContent] = useState(null); + const [messagesViewHeight, setMessagesViewHeight] = useState(0); const pendingToolCalls = useRef>(new Set()); // Track tool calls that need responses before calling LLM const toolKAContents = useRef>(new Map()); // Track KAs across tool calls in a single request const chatMessagesRef = useRef(null); + const lastUserMessageYRef = useRef(0); + const scrollPendingRef = useRef(false); + const scrollTargetRef = useRef(null); + const messagesViewHeightRef = useRef(0); + + const [contentMinHeight, setContentMinHeight] = useState(0); async function callTool(tc: ToolCall & { id: string }) { tools.saveCallInfo(tc.id, { input: tc.args, status: "loading" }); @@ -117,6 +150,8 @@ export default function ChatPage() { } async function requestCompletion() { + if (isWeb) return requestCompletionStreaming(); + if (!mcp.token) throw new Error("Unauthorized"); setIsGenerating(true); @@ -162,7 +197,125 @@ export default function ChatPage() { } } finally { setIsGenerating(false); - setTimeout(() => chatMessagesRef.current?.scrollToEnd(), 100); + } + } + + async function streamCompletion(messagesToSend: ChatMessage[]) { + let accumulatedContent = ""; + let receivedToolCalls: ToolCall[] | null = null; + let rafId: number | null = null; + + try { + await makeStreamingCompletionRequest( + { messages: messagesToSend, tools: tools.enabled }, + { bearerToken: mcp.token! }, + { + onDelta(content) { + accumulatedContent += content; + if (rafId === null) { + rafId = requestAnimationFrame(() => { + setStreamingContent(accumulatedContent); + rafId = null; + }); + } + }, + onToolCalls(toolCalls) { + receivedToolCalls = toolCalls; + }, + onDone() { + if (rafId !== null) cancelAnimationFrame(rafId); + setStreamingContent(null); + + const allKAContents: any[] = []; + toolKAContents.current.forEach((kaContents) => { + allKAContents.push(...kaContents); + }); + toolKAContents.current.clear(); + + const completion: ChatMessage = { + role: "assistant", + content: accumulatedContent, + tool_calls: receivedToolCalls ?? undefined, + }; + + if (allKAContents.length > 0) { + completion.content = toContents(completion.content); + completion.content.push(...allKAContents); + } + + setMessages((prev) => [...prev, completion]); + + if (receivedToolCalls && receivedToolCalls.length > 0) { + receivedToolCalls.forEach((tc: any) => { + pendingToolCalls.current.add(tc.id); + }); + } + }, + onError(message) { + if (rafId !== null) cancelAnimationFrame(rafId); + setStreamingContent(null); + showAlert({ + type: "error", + title: "LLM Error", + message, + timeout: 5000, + }); + }, + }, + ); + } finally { + // Cancel any pending RAF to prevent stale UI updates after errors + if (rafId !== null) cancelAnimationFrame(rafId); + } + } + + async function requestCompletionStreaming() { + if (!mcp.token) throw new Error("Unauthorized"); + + setIsGenerating(true); + try { + let currentMessages: ChatMessage[] = []; + await new Promise((resolve) => { + setMessages((prevMessages) => { + currentMessages = prevMessages; + resolve(); + return prevMessages; + }); + }); + + await streamCompletion(currentMessages); + } catch (error) { + setStreamingContent(null); + showAlert({ + type: "error", + title: "LLM Error", + message: error instanceof Error ? error.message : String(error), + timeout: 5000, + }); + } finally { + setIsGenerating(false); + } + } + + async function sendMessageStreaming(newMessage: ChatMessage) { + scrollPendingRef.current = true; + setMessages((prevMessages) => [...prevMessages, newMessage]); + + if (!mcp.token) throw new Error("Unauthorized"); + + setIsGenerating(true); + try { + await streamCompletion([...messages, newMessage]); + } catch (error) { + setStreamingContent(null); + showAlert({ + type: "error", + title: "LLM Error", + message: error instanceof Error ? error.message : String(error), + timeout: 5000, + }); + } finally { + setIsGenerating(false); } } @@ -177,6 +330,9 @@ export default function ChatPage() { } async function sendMessage(newMessage: ChatMessage) { + if (isWeb) return sendMessageStreaming(newMessage); + + scrollPendingRef.current = true; setMessages((prevMessages) => [...prevMessages, newMessage]); if (!mcp.token) throw new Error("Unauthorized"); @@ -203,7 +359,6 @@ export default function ChatPage() { } } finally { setIsGenerating(false); - setTimeout(() => chatMessagesRef.current?.scrollToEnd(), 100); } } @@ -277,6 +432,22 @@ export default function ChatPage() { [mcp, showAlert], ); + const lastUserMsgIdx = messages.reduce( + (a, m, i) => (m.role === "user" ? i : a), + -1, + ); + + const handleContentSizeChange = useCallback((_w: number, h: number) => { + if (scrollTargetRef.current !== null) { + const targetY = scrollTargetRef.current; + // Only scroll once content is tall enough for the scroll position to work + if (h >= targetY + messagesViewHeightRef.current) { + scrollTargetRef.current = null; + chatMessagesRef.current?.scrollTo({ y: targetY, animated: true }); + } + } + }, []); + const isLandingScreen = !messages.length && !isNativeMobile; console.debug("Messages:", messages); console.debug("Tools (enabled):", tools.enabled); @@ -303,6 +474,17 @@ export default function ChatPage() {
mcp.disconnect()} /> { + const h = e.nativeEvent.layout.height; + setMessagesViewHeight(h); + messagesViewHeightRef.current = h; + }} + onContentSizeChange={handleContentSizeChange} + contentContainerStyle={{ + paddingTop: 28, + paddingBottom: 16, + ...(contentMinHeight > 0 && { minHeight: contentMinHeight }), + }} style={[ { width: "100%", @@ -352,9 +534,8 @@ export default function ChatPage() { const isLastMessage = i === messages.length - 1; const isIdle = !isGenerating && !m.tool_calls?.length; - return ( + const messageContent = ( @@ -362,30 +543,30 @@ export default function ChatPage() { {/* Images */} - {images.map((image, i) => ( + {images.map((image, j) => ( ))} {/* Files */} - {files.map((file, i) => ( - + {files.map((file, j) => ( + ))} {/* Text (markdown) */} - {text.map((c, i) => ( + {text.map((c, j) => ( .*?<\/think>/gs, "")} /> ))} {/* Tool calls */} - {m.tool_calls?.map((_tc, i) => { - const tcId = _tc.id || i.toString(); + {m.tool_calls?.map((_tc, j) => { + const tcId = _tc.id || j.toString(); const tc = { ..._tc, id: tcId, @@ -432,13 +613,45 @@ export default function ChatPage() { tools.reset(); pendingToolCalls.current.clear(); toolKAContents.current.clear(); + lastUserMessageYRef.current = 0; + scrollPendingRef.current = false; + scrollTargetRef.current = null; + setContentMinHeight(0); }} /> )} ); + + if (i === lastUserMsgIdx) { + return ( + { + const y = e.nativeEvent.layout.y; + lastUserMessageYRef.current = y; + if (scrollPendingRef.current) { + scrollPendingRef.current = false; + scrollTargetRef.current = Math.max(0, y - SCROLL_TOP_GAP); + setContentMinHeight(y + messagesViewHeight); + } + }} + > + {messageContent} + + ); + } + + return {messageContent}; })} - {isGenerating && } + {isGenerating && streamingContent === null && } + {streamingContent !== null && ( + + + {normalizeStreamingMarkdown(stripThinkTags(streamingContent))} + + + )} diff --git a/apps/agent/src/components/Chat/Message.tsx b/apps/agent/src/components/Chat/Message.tsx index 712b8c3..9fc8c4d 100644 --- a/apps/agent/src/components/Chat/Message.tsx +++ b/apps/agent/src/components/Chat/Message.tsx @@ -15,7 +15,7 @@ export default function ChatMessage({ }) { return ( {icon === "user" && } diff --git a/apps/agent/src/components/Chat/Messages.tsx b/apps/agent/src/components/Chat/Messages.tsx index a9535f6..e3956ac 100644 --- a/apps/agent/src/components/Chat/Messages.tsx +++ b/apps/agent/src/components/Chat/Messages.tsx @@ -1,19 +1,13 @@ import { forwardRef } from "react"; -import { ScrollView, ViewProps } from "react-native"; +import { ScrollView, ScrollViewProps } from "react-native"; -export default forwardRef( +export default forwardRef( function ChatMessages(props, ref) { return ( {props.children} diff --git a/apps/agent/src/components/Markdown.tsx b/apps/agent/src/components/Markdown.tsx index fb74a60..6c8200b 100644 --- a/apps/agent/src/components/Markdown.tsx +++ b/apps/agent/src/components/Markdown.tsx @@ -1,5 +1,5 @@ -import { PropsWithChildren, useMemo } from "react"; -import { StyleSheet, View } from "react-native"; +import { PropsWithChildren, useMemo, useState } from "react"; +import { Pressable, StyleSheet, Text, View } from "react-native"; import RNMarkdownDisplay, { MarkdownProps, RenderRules, @@ -7,7 +7,10 @@ import RNMarkdownDisplay, { // import * as Linking from "expo-linking"; import { Image } from "expo-image"; +import * as Clipboard from "expo-clipboard"; + import useColors from "@/hooks/useColors"; +import CopyIcon from "./icons/CopyIcon"; import ExternalLink from "./ExternalLink"; const renderRules: RenderRules = { @@ -53,6 +56,46 @@ const renderRules: RenderRules = { }, }; +function CopyCodeButton({ content, color }: { content: string; color: string }) { + const [copied, setCopied] = useState(false); + + const handleCopy = () => { + Clipboard.setStringAsync(content); + setCopied(true); + setTimeout(() => setCopied(false), 2000); + }; + + return ( + ({ + position: "absolute", + top: 8, + right: 8, + padding: 4, + flexDirection: "row", + alignItems: "center", + gap: 4, + opacity: copied ? 1 : pressed ? 0.5 : 0.7, + })} + > + {copied ? ( + + ✓ Copied + + ) : ( + + )} + + ); +} + export default function Markdown({ style, testID, @@ -205,13 +248,37 @@ export default function Markdown({ [style, colors], ); + const rules = useMemo( + () => ({ + ...renderRules, + fence: (node, _children, _parent, fenceStyles) => ( + + + {node.content} + + + + ), + }), + [colors], + ); + if (testID) { return ( - + ); } - return ; + return ; } diff --git a/apps/agent/src/server/index.ts b/apps/agent/src/server/index.ts index b9bcce6..24f0699 100644 --- a/apps/agent/src/server/index.ts +++ b/apps/agent/src/server/index.ts @@ -10,6 +10,7 @@ import DKG from "dkg.js"; import { eq } from "drizzle-orm"; import { userCredentialsSchema } from "@/shared/auth"; +import { processStreamingCompletion } from "@/shared/chat"; import { verify } from "@node-rs/argon2"; import { configDatabase, configEnv } from "./helpers"; @@ -105,6 +106,13 @@ const app = createPluginServer({ api.use("/change-password", authorized([])); api.use("/profile", authorized([])); }, + // Streaming LLM middleware — intercepts SSE requests before Expo Router + (_, __, api) => { + api.post("/llm", (req, res, next) => { + if (!req.headers.accept?.includes("text/event-stream")) return next(); + processStreamingCompletion(req, res); + }); + }, accountManagementPlugin, dkgEssentialsPlugin, examplePlugin.withNamespace("protected", { diff --git a/apps/agent/src/shared/chat.ts b/apps/agent/src/shared/chat.ts index bef86e4..4abef2b 100644 --- a/apps/agent/src/shared/chat.ts +++ b/apps/agent/src/shared/chat.ts @@ -6,6 +6,7 @@ import type { BaseFunctionCallOptions, ToolDefinition, } from "@langchain/core/language_models/base"; +import type { ToolCallChunk } from "@langchain/core/messages/tool"; import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; export type { ToolDefinition }; @@ -85,7 +86,12 @@ const llmProviderFromEnv = async () => { if (!isValidLLMProvider(provider)) { throw new Error(`Unsupported LLM provider: ${provider}`); } - const model = process.env.LLM_MODEL || "gpt-4o-mini"; + const model = process.env.LLM_MODEL; + if (!model) { + throw new Error( + "LLM_MODEL environment variable is not set. Please define it in your .env file", + ); + } const temperature = Number(process.env.LLM_TEMPERATURE || "0"); if (isNaN(temperature)) { throw new Error(`Invalid LLM temperature: ${temperature}`); @@ -190,9 +196,317 @@ export const makeCompletionRequest = async ( throw new Error(`Unexpected status code: ${r.status}`); }); +// --- SSE Streaming --- + +export type StreamCallbacks = { + onDelta: (content: string) => void; + onToolCalls: (toolCalls: ToolCall[]) => void; + onDone: () => void; + onError: (message: string) => void; +}; + +type SSEEvent = + | { event: "delta"; data: { content: string } } + | { event: "tool_calls"; data: { tool_calls: ToolCall[] } } + | { event: "done"; data: Record } + | { event: "error"; data: { message: string } }; + +function writeSSE( + res: { write: (chunk: string) => void; flush?: () => void }, + event: SSEEvent, +) { + res.write(`event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`); + // Flush buffered data to the network immediately (compression middleware adds this) + if (typeof res.flush === "function") res.flush(); +} + +/** + * Server-side: streams an LLM completion over SSE using Express req/res. + * Tool call chunks are accumulated and sent as a batch after the stream ends. + * Falls back to `.invoke()` if `.stream()` fails (e.g. SelfHosted providers). + */ +export const processStreamingCompletion = async ( + req: { body: CompletionRequest }, + res: { + writeHead: (status: number, headers: Record) => void; + flushHeaders: () => void; + write: (chunk: string) => boolean; + flush?: () => void; + end: () => void; + on: (event: string, cb: () => void) => void; + socket?: { setNoDelay?: (noDelay: boolean) => void } | null; + }, +) => { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }); + res.flushHeaders(); + + // Disable Nagle's algorithm for real-time chunk delivery + res.socket?.setNoDelay?.(true); + + let clientDisconnected = false; + res.on("close", () => { + clientDisconnected = true; + }); + + try { + const body = req.body; + if (!body?.messages) { + writeSSE(res, { + event: "error", + data: { message: "Invalid request: missing messages" }, + }); + res.end(); + return; + } + + const provider = await llmProvider(); + const messages = [ + { + role: "system" as const, + content: process.env.LLM_SYSTEM_PROMPT || DEFAULT_SYSTEM_PROMPT, + }, + ...body.messages, + ]; + const options = { ...body.options, tools: body.tools }; + + try { + const stream = await provider.stream(messages, options); + + // Accumulate tool call chunks by index + const toolCallChunksByIndex = new Map< + number, + { name: string; args: string; id: string } + >(); + + for await (const chunk of stream) { + if (clientDisconnected) break; + + // Emit text content + const content = chunk.content; + if (typeof content === "string" && content.length > 0) { + writeSSE(res, { event: "delta", data: { content } }); + } else if (Array.isArray(content)) { + for (const part of content) { + if ( + part && + typeof part === "object" && + "type" in part && + part.type === "text" && + "text" in part && + typeof part.text === "string" && + part.text.length > 0 + ) { + writeSSE(res, { event: "delta", data: { content: part.text } }); + } + } + } + + // Accumulate tool call chunks + if (chunk.tool_call_chunks && chunk.tool_call_chunks.length > 0) { + for (const tcc of chunk.tool_call_chunks as ToolCallChunk[]) { + const idx = tcc.index ?? 0; + const existing = toolCallChunksByIndex.get(idx); + if (existing) { + if (tcc.name) existing.name += tcc.name; + if (tcc.args) existing.args += tcc.args; + if (tcc.id) existing.id += tcc.id; + } else { + toolCallChunksByIndex.set(idx, { + name: tcc.name ?? "", + args: tcc.args ?? "", + id: tcc.id ?? "", + }); + } + } + } + } + + // Emit accumulated tool calls + if (toolCallChunksByIndex.size > 0) { + const toolCalls: ToolCall[] = []; + for (const [, tc] of toolCallChunksByIndex) { + let args: Record = {}; + try { + args = tc.args ? JSON.parse(tc.args) : {}; + } catch { + // Malformed JSON from partial streaming — send raw + args = {}; + } + toolCalls.push({ + name: tc.name, + args, + id: tc.id, + type: "tool_call", + }); + } + writeSSE(res, { + event: "tool_calls", + data: { tool_calls: toolCalls }, + }); + } + + writeSSE(res, { event: "done", data: {} }); + } catch (streamError) { + // Fallback: invoke and emit full response as a single delta + try { + const result = await provider.invoke(messages, options); + const content = result.content; + if (typeof content === "string") { + writeSSE(res, { event: "delta", data: { content } }); + } else if (Array.isArray(content)) { + for (const part of content) { + if ( + part && + typeof part === "object" && + "type" in part && + part.type === "text" && + "text" in part && + typeof part.text === "string" + ) { + writeSSE(res, { + event: "delta", + data: { content: part.text }, + }); + } + } + } + if (result.tool_calls && result.tool_calls.length > 0) { + writeSSE(res, { + event: "tool_calls", + data: { tool_calls: result.tool_calls as ToolCall[] }, + }); + } + writeSSE(res, { event: "done", data: {} }); + } catch (invokeError) { + writeSSE(res, { + event: "error", + data: { + message: + invokeError instanceof Error + ? invokeError.message + : "Unknown error", + }, + }); + } + } + } catch (setupError) { + // Catch errors in setup (provider init, etc.) + writeSSE(res, { + event: "error", + data: { + message: + setupError instanceof Error ? setupError.message : "Unknown error", + }, + }); + } + + res.end(); +}; + +/** + * Client-side: makes a streaming completion request via SSE and dispatches + * parsed events to callbacks. Uses native `fetch` (not expo/fetch) for + * ReadableStream support. + */ +export const makeStreamingCompletionRequest = async ( + req: CompletionRequest, + opts: { + bearerToken?: string; + signal?: AbortSignal; + }, + callbacks: StreamCallbacks, +) => { + // Use the MCP server origin (Express), not the app URL (may be Expo dev server) + const serverOrigin = new URL(process.env.EXPO_PUBLIC_MCP_URL).origin; + const response = await globalThis.fetch(serverOrigin + "/llm", { + method: "POST", + headers: { + Authorization: `Bearer ${opts.bearerToken}`, + Accept: "text/event-stream", + "Content-Type": "application/json", + }, + body: JSON.stringify(req), + signal: opts.signal, + }); + + if (response.status === 401) throw new Error("Unauthorized"); + if (response.status === 403) throw new Error("Forbidden"); + if (!response.ok) throw new Error(`Unexpected status code: ${response.status}`); + + const reader = response.body?.getReader(); + if (!reader) throw new Error("No readable stream in response"); + + const decoder = new TextDecoder(); + let buffer = ""; + + let currentEvent = ""; + let currentData = ""; + let streamFinalized = false; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Parse complete SSE messages from buffer + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; // Keep incomplete last line + + for (const line of lines) { + if (line.startsWith("event: ")) { + currentEvent = line.slice(7).trim(); + } else if (line.startsWith("data: ")) { + currentData = line.slice(6); + } else if (line === "") { + // Empty line = end of SSE message + if (currentEvent && currentData) { + try { + const parsed = JSON.parse(currentData); + switch (currentEvent) { + case "delta": + callbacks.onDelta(parsed.content); + break; + case "tool_calls": + callbacks.onToolCalls(parsed.tool_calls); + break; + case "done": + streamFinalized = true; + callbacks.onDone(); + break; + case "error": + streamFinalized = true; + callbacks.onError(parsed.message); + break; + } + } catch { + // Skip malformed SSE data + } + } + currentEvent = ""; + currentData = ""; + } + } + } + + // Stream ended without an explicit done/error event (server crash, network drop) + if (!streamFinalized) { + callbacks.onError("Connection lost — the server stopped responding"); + } + } finally { + reader.releaseLock(); + } +}; + export const DEFAULT_SYSTEM_PROMPT = ` You are a DKG Agent that helps users interact with the OriginTrail Decentralized Knowledge Graph (DKG) using available Model Context Protocol (MCP) tools. -Your role is to help users create, retrieve, and analyze verifiable knowledge in a friendly, approachable, and knowledgeable way, making the technology accessible to both experts and non-experts. +Your role is to help users create, retrieve, and analyze verifiable knowledge in a friendly, approachable, and knowledgeable way, making the technology accessible to both experts and non-experts. When replying, use markdown (e.g. bold text, bullet points, tables, etc.) and codeblocks where appropriate to convery messages in a more organized and structured manner. ## Core Responsibilities - Answer Questions: Retrieve and explain knowledge from the DKG to help users understand and solve problems. diff --git a/apps/agent/tests/integration/performance/system-monitoring.spec.ts b/apps/agent/tests/integration/performance/system-monitoring.spec.ts index a3bca9f..dbae429 100644 --- a/apps/agent/tests/integration/performance/system-monitoring.spec.ts +++ b/apps/agent/tests/integration/performance/system-monitoring.spec.ts @@ -385,7 +385,7 @@ describe("System Performance Monitoring", () => { ); // Memory usage should not grow significantly - expect(Math.abs(memoryDiff)).to.be.lessThan(15 * 1024 * 1024); // Less than 15MB difference + expect(Math.abs(memoryDiff)).to.be.lessThan(50 * 1024 * 1024); // Less than 50MB difference }); }); }); diff --git a/apps/agent/tests/integration/setup/test-server.ts b/apps/agent/tests/integration/setup/test-server.ts index c2c410e..b4a1cff 100644 --- a/apps/agent/tests/integration/setup/test-server.ts +++ b/apps/agent/tests/integration/setup/test-server.ts @@ -13,6 +13,7 @@ import swaggerPlugin from "@dkg/plugin-swagger"; import { mockDkgPublisherPlugin } from "./mock-dkg-publisher"; import { redisManager } from "./redis-manager"; import { userCredentialsSchema } from "../../../src/shared/auth"; +import { processStreamingCompletion } from "../../../src/shared/chat"; import { verify } from "@node-rs/argon2"; import { users } from "../../../src/server/database/sqlite"; import { eq } from "drizzle-orm"; @@ -152,6 +153,13 @@ export async function createTestServer(config: TestServerConfig = {}): Promise<{ api.use("/llm", authorized(["llm"])); api.use("/blob", authorized(["blob"])); }, + // Streaming LLM middleware — same as real server + (_, __, api) => { + api.post("/llm", (req, res, next) => { + if (!req.headers.accept?.includes("text/event-stream")) return next(); + processStreamingCompletion(req, res); + }); + }, dkgEssentialsPlugin, // DKG Publisher Plugin for API contract testing mockDkgPublisherPlugin, // Mock version - tests interfaces without database diff --git a/apps/agent/tests/integration/workflows/chatbot-api.spec.ts b/apps/agent/tests/integration/workflows/chatbot-api.spec.ts new file mode 100644 index 0000000..0dfa6cf --- /dev/null +++ b/apps/agent/tests/integration/workflows/chatbot-api.spec.ts @@ -0,0 +1,124 @@ +import dotenv from "dotenv"; +dotenv.config(); // Load .env so LLM_MODEL and OPENAI_API_KEY are available + +import { describe, it, beforeEach, afterEach } from "mocha"; +import { expect } from "chai"; +import { startTestServer } from "../setup/test-server"; +import { createTestToken } from "../setup/test-helpers"; + +/** + * Chatbot API Integration Test + * + * Tests that the LLM integration works end-to-end by: + * 1. Starting the test server on a real port + * 2. Sending a simple math question via the /llm API + * 3. Verifying the response contains the correct answer + */ +describe("Chatbot API - LLM Integration", () => { + let testServer: Awaited>; + let accessToken: string; + + beforeEach(async function () { + this.timeout(30000); + testServer = await startTestServer(); + accessToken = await createTestToken(testServer, ["llm"]); + }); + + afterEach(async () => { + if (testServer?.cleanup) { + await testServer.cleanup(); + } + }); + + it("should have required environment variables", function () { + expect( + process.env.LLM_MODEL, + "LLM_MODEL env var must be set (check .env locally or job env in CI)", + ).to.be.a("string").and.not.be.empty; + + expect( + process.env.OPENAI_API_KEY, + "OPENAI_API_KEY env var must be set", + ).to.be.a("string").and.not.be.empty; + + console.log(`LLM_MODEL = ${process.env.LLM_MODEL}`); + console.log(`OPENAI_API_KEY is set: ${!!process.env.OPENAI_API_KEY}`); + }); + + it("should answer a simple math question (3+7=10) via the /llm API", async function () { + this.timeout(60000); // 60s timeout for OpenAI API call + + const response = await fetch(`${testServer.url}/llm`, { + method: "POST", + headers: { + Authorization: `Bearer ${accessToken}`, + Accept: "text/event-stream", + "Content-Type": "application/json", + }, + body: JSON.stringify({ + messages: [ + { + role: "user", + content: "What is 3+7? Reply with just the number, nothing else.", + }, + ], + }), + }); + + expect(response.status).to.equal(200); + + const sseText = await response.text(); + console.log(`Raw SSE response:\n${sseText}`); + + // Parse SSE events (format: "event: \ndata: \n\n") + const lines = sseText.split("\n"); + let fullContent = ""; + let sseErrors: string[] = []; + + let currentEvent = ""; + for (const line of lines) { + if (line.startsWith("event: ")) { + currentEvent = line.substring(7).trim(); + } else if (line.startsWith("data: ")) { + try { + const data = JSON.parse(line.substring(6)); + if (currentEvent === "error" && data.message) { + sseErrors.push(data.message); + } + if (currentEvent === "delta" && data.content) { + fullContent += data.content; + } + } catch { + // Skip non-JSON data lines + } + } + } + + // If we got SSE errors, fail with a clear message + if (sseErrors.length > 0) { + throw new Error(`LLM returned errors: ${sseErrors.join("; ")}`); + } + + console.log(`LLM Response: "${fullContent.trim()}"`); + + // The response should contain "10" + expect(fullContent).to.include("10"); + }); + + it("should reject unauthenticated requests", async function () { + this.timeout(15000); + + const response = await fetch(`${testServer.url}/llm`, { + method: "POST", + headers: { + Accept: "text/event-stream", + "Content-Type": "application/json", + }, + body: JSON.stringify({ + messages: [{ role: "user", content: "Hello" }], + }), + }); + + expect(response.status).to.equal(401); + }); +}); diff --git a/package-lock.json b/package-lock.json index f36ecf6..150c7fe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8628,6 +8628,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/sparqljs": { + "version": "3.1.12", + "resolved": "https://registry.npmjs.org/@types/sparqljs/-/sparqljs-3.1.12.tgz", + "integrity": "sha512-zg/sdKKtYI0845wKPSuSgunyU1o/+7tRzMw85lHsf4p/0UbA6+65MXAyEtv1nkaqSqrq/bXm7+bqXas+Xo5dpQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@rdfjs/types": ">=1.0.0" + } + }, "node_modules/@types/stack-utils": { "version": "2.0.3", "license": "MIT" @@ -21826,6 +21836,24 @@ "node": ">=18" } }, + "node_modules/rdf-data-factory": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/rdf-data-factory/-/rdf-data-factory-1.1.3.tgz", + "integrity": "sha512-ny6CI7m2bq4lfQQmDYvcb2l1F9KtGwz9chipX4oWu2aAtVoXjb7k3d8J1EsgAsEbMXnBipB/iuRen5H2fwRWWQ==", + "license": "MIT", + "dependencies": { + "@rdfjs/types": "^1.0.0" + } + }, + "node_modules/rdf-data-factory/node_modules/@rdfjs/types": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@rdfjs/types/-/types-1.1.2.tgz", + "integrity": "sha512-wqpOJK1QCbmsGNtyzYnojPU8gRDPid2JO0Q0kMtb4j65xhCK880cnKAfEOwC+dX85VJcCByQx5zOwyyfCjDJsg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/react": { "version": "19.0.0", "license": "MIT", @@ -23775,6 +23803,21 @@ "source-map": "^0.6.0" } }, + "node_modules/sparqljs": { + "version": "3.7.3", + "resolved": "https://registry.npmjs.org/sparqljs/-/sparqljs-3.7.3.tgz", + "integrity": "sha512-FQfHUhfwn5PD9WH6xPU7DhFfXMgqK/XoDrYDVxz/grhw66Il0OjRg3JBgwuEvwHnQt7oSTiKWEiCZCPNaUbqgg==", + "license": "MIT", + "dependencies": { + "rdf-data-factory": "^1.1.2" + }, + "bin": { + "sparqljs": "bin/sparql-to-json" + }, + "engines": { + "node": "^12.22.0 || ^14.17.0 || >=16.0.0" + } + }, "node_modules/spawn-wrap": { "version": "2.0.0", "dev": true, @@ -27890,12 +27933,14 @@ "dependencies": { "@dkg/plugin-swagger": "^0.0.2", "@dkg/plugins": "^0.0.2", - "busboy": "^1.6.0" + "busboy": "^1.6.0", + "sparqljs": "^3.7.3" }, "devDependencies": { "@dkg/eslint-config": "*", "@dkg/typescript-config": "*", "@types/busboy": "^1.5.4", + "@types/sparqljs": "^3.1.12", "tsup": "^8.5.0" } },