Conversation
…state management and updating callback signatures. Move responsibility for saving final chat message from frontend client to backend
There was a problem hiding this comment.
Pull request overview
Implements an initial end-to-end chat system with Clerk authentication, a FastAPI streaming backend (including MCP tool execution), and Convex-backed persistence so assistant messages are saved server-side after streaming completes.
Changes:
- Added FastAPI
/api/chat/streamSSE endpoint with OpenRouter streaming + iterative MCP tool-call loop and Convex persistence. - Introduced Clerk JWT verification dependency in FastAPI and shared
httpx.AsyncClientlifecycle management. - Updated web chat UI to support concurrent per-conversation streams, tool call indicators, and backend-owned final message persistence.
Reviewed changes
Copilot reviewed 20 out of 24 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/fastapi/requirements.txt | Adds SSE + JWT crypto dependencies for streaming and Clerk token verification. |
| packages/fastapi/app/services/openrouter.py | Streams OpenRouter chat completions and yields parsed SSE chunks. |
| packages/fastapi/app/services/mcp_client.py | Lists/calls MCP tools via Junction Engine. |
| packages/fastapi/app/services/convex.py | Persists assistant messages to Convex via deploy key/internal mutation. |
| packages/fastapi/app/routes/chat.py | New SSE chat streaming endpoint with agentic tool loop and persistence. |
| packages/fastapi/app/models.py | Adds Pydantic request models for chat streaming payload. |
| packages/fastapi/app/main.py | Adds lifespan-managed shared httpx client, logging, CORS origins, and chat router. |
| packages/fastapi/app/dependencies.py | Adds shared HTTP client + authenticated user dependencies. |
| packages/fastapi/app/config.py | Tightens startup config validation and introduces OpenRouter model mapping. |
| packages/fastapi/app/auth.py | Adds Clerk JWT verification via JWKS lookup. |
| packages/convex-backend/convex/messages.ts | Adds internal mutation for backend-only assistant message persistence. |
| bun.lock | Lockfile updates reflecting removed Nitro and added deps. |
| apps/web/vite.config.ts | Removes Nitro plugin and adjusts dependency optimization. |
| apps/web/src/routes/chat/index.tsx | Adds concurrent streaming UI state, tool call indicators, and backend-synced message handling. |
| apps/web/src/routes/__root.tsx | Styles Clerk UI via appearance customization. |
| apps/web/src/lib/use-chat-stream.ts | Adds streaming hook that consumes SSE from FastAPI with per-conversation abort support. |
| apps/web/src/env.ts | Adds VITE_FASTAPI_URL env var. |
| apps/web/package.json | Removes Nitro and adds use-sync-external-store. |
| apps/web/.env.example | Documents VITE_FASTAPI_URL for local dev. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| }, | ||
| handler: async (ctx, args) => { | ||
| const convo = await ctx.db.get(args.conversationId); | ||
| if (!convo) throw new Error("Conversation not found"); |
There was a problem hiding this comment.
saveAssistantMessage is callable with a deploy key and currently only checks that the conversation exists. This means any backend caller can write assistant messages into any conversation ID (cross-user tampering) if they can hit the FastAPI endpoint. Include the authenticated userId in the args and enforce convo.userId === userId (or otherwise verify ownership) before inserting/patching.
| }, | |
| handler: async (ctx, args) => { | |
| const convo = await ctx.db.get(args.conversationId); | |
| if (!convo) throw new Error("Conversation not found"); | |
| userId: v.string(), | |
| }, | |
| handler: async (ctx, args) => { | |
| const convo = await ctx.db.get(args.conversationId); | |
| if (!convo || convo.userId !== args.userId) { | |
| throw new Error("Conversation not found"); | |
| } |
| json={ | ||
| "path": "messages:saveAssistantMessage", | ||
| "args": { | ||
| "conversationId": conversation_id, | ||
| "content": content, | ||
| }, | ||
| "format": "json", |
There was a problem hiding this comment.
save_assistant_message sends only conversationId + content to Convex. To prevent cross-user writes (especially since this uses a deploy key), include the authenticated userId in the mutation args and have the internal mutation validate the conversation belongs to that user before inserting.
| // Abort any existing stream for this conversation | ||
| abortControllers.current.get(convoId)?.abort(); | ||
|
|
||
| const controller = new AbortController(); | ||
| abortControllers.current.set(convoId, controller); | ||
| setStreamingConvoIds((prev) => new Set(prev).add(convoId)); |
There was a problem hiding this comment.
useChatStream.stream aborts any existing stream for the same conversation and immediately starts a new one, but the old stream’s finally block unconditionally deletes abortControllers.current[convoId] and removes convoId from streamingConvoIds. If a new stream has already replaced the controller, the old finally can delete the new controller and clear the streaming flag while the new stream is still running. Track a per-stream token/controller and only clean up if it still matches the latest controller for that convoId.
| abortControllers.current.delete(convoId); | ||
| setStreamingConvoIds((prev) => { | ||
| const next = new Set(prev); | ||
| next.delete(convoId); | ||
| return next; | ||
| }); |
There was a problem hiding this comment.
Related to the per-conversation stream replacement: the finally cleanup unconditionally runs abortControllers.current.delete(convoId) and removes the convoId from streamingConvoIds, which can race with a newly-started stream for the same convo. Gate this cleanup on abortControllers.current.get(convoId) === controller (or similar) to avoid clearing the new stream's state.
| abortControllers.current.delete(convoId); | |
| setStreamingConvoIds((prev) => { | |
| const next = new Set(prev); | |
| next.delete(convoId); | |
| return next; | |
| }); | |
| const currentController = abortControllers.current.get(convoId); | |
| if (currentController === controller) { | |
| abortControllers.current.delete(convoId); | |
| setStreamingConvoIds((prev) => { | |
| const next = new Set(prev); | |
| next.delete(convoId); | |
| return next; | |
| }); | |
| } |
| unverified_claims = jwt.decode(token, options={"verify_signature": False}) | ||
| issuer = unverified_claims.get("iss", "") | ||
| if not issuer: | ||
| raise HTTPException(status_code=401, detail="Token missing issuer") | ||
|
|
There was a problem hiding this comment.
JWT verification trusts the unverified iss claim from the incoming token to build the JWKS URL. This allows an attacker to supply their own issuer, host their own JWKS, and have the backend accept forged tokens (and can also become an SSRF vector). Store the expected Clerk issuer in config and reject tokens whose iss does not exactly match it (and build the JWKS URL from the configured issuer, not the token).
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Summary
What's included
Frontend (apps/web)
streamStatesRecord keyed by conversation ID)useChatStreamhook supporting concurrent streams with per-conversation abort controllersBackend (packages/fastapi)
/api/chat/streamSSE endpoint with agentic loop (up to 10 tool iterations)Database (packages/convex-backend)
internalMutationfor backend-only assistant message savescheckpoint chat implementation, implementation is polished to the point that multiple conversations can happen on a single tab at once and all be streamed in correctly. Crucially the final message is now delivered by the backend to convex via http action, rather than from the client. Redis Streams is to be revisited as a QoL thing in the future.