From c93d65717e644a0b7d6bef44c6de2527f0cd377e Mon Sep 17 00:00:00 2001 From: lyz Date: Wed, 20 Aug 2025 21:56:44 +0800 Subject: [PATCH 1/5] feat: migrate HTTP servers to use Express.js - Replace raw Node.js HTTP servers with Express.js framework - Add comprehensive Express server utility with CORS, health checks, and error handling - Refactor SSE and Streamable services to use Express routes - Improve server logging with endpoint information - Add proper TypeScript types for Express middleware - Maintain full backward compatibility with existing MCP protocol - Fix parameter validation examples and documentation Resolves #178 --- package.json | 2 + src/services/sse.ts | 127 +++++++-------- src/services/streamable.ts | 306 +++++++++++++++++-------------------- src/utils/expressServer.ts | 159 +++++++++++++++++++ src/utils/index.ts | 4 + 5 files changed, 356 insertions(+), 242 deletions(-) create mode 100644 src/utils/expressServer.ts diff --git a/package.json b/package.json index bfd77ee..5b2bbb0 100644 --- a/package.json +++ b/package.json @@ -44,12 +44,14 @@ "dependencies": { "@modelcontextprotocol/sdk": "^1.11.4", "axios": "^1.11.0", + "express": "^5.1.0", "zod": "^3.25.16", "zod-to-json-schema": "^3.24.5" }, "devDependencies": { "@biomejs/biome": "1.9.4", "@modelcontextprotocol/inspector": "^0.14.2", + "@types/express": "^5.0.3", "@types/node": "^22.15.21", "husky": "^9.1.7", "lint-staged": "^15.5.2", diff --git a/src/services/sse.ts b/src/services/sse.ts index 14924aa..74a6dce 100644 --- a/src/services/sse.ts +++ b/src/services/sse.ts @@ -1,7 +1,7 @@ -import type { IncomingMessage, ServerResponse } from "node:http"; import type { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; -import { type RequestHandlers, createBaseHttpServer } from "../utils"; +import type { Request, Response } from "express"; +import { createExpressServer } from "../utils"; export const startSSEMcpServer = async ( server: Server, @@ -10,98 +10,77 @@ export const startSSEMcpServer = async ( ): Promise => { const activeTransports: Record = {}; - // Define the request handler for SSE-specific logic - const handleRequest: RequestHandlers["handleRequest"] = async ( - req: IncomingMessage, - res: ServerResponse, - ) => { - if (!req.url) { - res.writeHead(400).end("No URL"); - return; + // Custom cleanup for SSE server + const cleanup = () => { + // Close all active transports + for (const transport of Object.values(activeTransports)) { + transport.close(); } + server.close(); + }; - const reqUrl = new URL(req.url, "http://localhost"); - - // Handle GET requests to the SSE endpoint - if (req.method === "GET" && reqUrl.pathname === endpoint) { - const transport = new SSEServerTransport("/messages", res); - - activeTransports[transport.sessionId] = transport; - - let closed = false; + // Create Express server + const { app, start } = createExpressServer({ + port, + serverType: "SSE Server", + cleanup, + }); - res.on("close", async () => { - closed = true; + // Handle GET requests to the SSE endpoint + app.get(endpoint, async (req: Request, res: Response) => { + const transport = new SSEServerTransport("/messages", res); + activeTransports[transport.sessionId] = transport; - try { - await server.close(); - } catch (error) { - console.error("Error closing server:", error); - } + let closed = false; - delete activeTransports[transport.sessionId]; - }); + res.on("close", async () => { + closed = true; try { - await server.connect(transport); - - await transport.send({ - jsonrpc: "2.0", - method: "sse/connection", - params: { message: "SSE Connection established" }, - }); + await server.close(); } catch (error) { - if (!closed) { - console.error("Error connecting to server:", error); - - res.writeHead(500).end("Error connecting to server"); - } + console.error("Error closing server:", error); } - return; - } + delete activeTransports[transport.sessionId]; + }); - // Handle POST requests to the messages endpoint - if (req.method === "POST" && req.url?.startsWith("/messages")) { - const sessionId = new URL( - req.url, - "https://example.com", - ).searchParams.get("sessionId"); + try { + await server.connect(transport); - if (!sessionId) { - res.writeHead(400).end("No sessionId"); - return; + await transport.send({ + jsonrpc: "2.0", + method: "sse/connection", + params: { message: "SSE Connection established" }, + }); + } catch (error) { + if (!closed) { + console.error("Error connecting to server:", error); + res.status(500).send("Error connecting to server"); } + } + }); - const activeTransport: SSEServerTransport | undefined = - activeTransports[sessionId]; + // Handle POST requests to the messages endpoint + app.post("/messages", async (req: Request, res: Response) => { + const sessionId = req.query.sessionId as string; - if (!activeTransport) { - res.writeHead(400).end("No active transport"); - return; - } - - await activeTransport.handlePostMessage(req, res); + if (!sessionId) { + res.status(400).send("No sessionId"); return; } - // If we reach here, no handler matched - res.writeHead(404).end("Not found"); - }; + const activeTransport: SSEServerTransport | undefined = + activeTransports[sessionId]; - // Custom cleanup for SSE server - const cleanup = () => { - // Close all active transports - for (const transport of Object.values(activeTransports)) { - transport.close(); + if (!activeTransport) { + res.status(400).send("No active transport"); + return; } - server.close(); - }; - // Create the HTTP server using our factory - createBaseHttpServer(port, endpoint, { - handleRequest, - cleanup, - serverType: "SSE Server", + await activeTransport.handlePostMessage(req, res); }); + + // Start the server and log endpoints + start([endpoint, "/messages"]); }; diff --git a/src/services/streamable.ts b/src/services/streamable.ts index 83dc93d..6e40f46 100644 --- a/src/services/streamable.ts +++ b/src/services/streamable.ts @@ -1,17 +1,12 @@ import { randomUUID } from "node:crypto"; -import type { IncomingMessage, ServerResponse } from "node:http"; import type { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { type EventStore, StreamableHTTPServerTransport, } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; -import { - InMemoryEventStore, - type RequestHandlers, - createBaseHttpServer, - getBody, -} from "../utils"; +import type { Request, Response } from "express"; +import { InMemoryEventStore, createExpressServer, getBody } from "../utils"; export const startHTTPStreamableServer = async ( createServer: () => Server, @@ -27,192 +22,167 @@ export const startHTTPStreamableServer = async ( } > = {}; - // Define the request handler for streamable-specific logic - const handleRequest: RequestHandlers["handleRequest"] = async ( - req: IncomingMessage, - res: ServerResponse, - ) => { - if (!req.url) { - res.writeHead(400).end("No URL"); - return; + // Custom cleanup for streamable server + const cleanup = () => { + for (const { server, transport } of Object.values(activeTransports)) { + transport.close(); + server.close(); } + }; - const reqUrl = new URL(req.url, "http://localhost"); - - // Handle POST requests to endpoint - if (req.method === "POST" && reqUrl.pathname === endpoint) { - try { - const sessionId = Array.isArray(req.headers["mcp-session-id"]) - ? req.headers["mcp-session-id"][0] - : req.headers["mcp-session-id"]; - let transport: StreamableHTTPServerTransport; - - let server: Server; - - const body = await getBody(req); - - /** - * diagram: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sequence-diagram. - */ - // 1. If the sessionId is provided and the server is already created, use the existing transport and server. - if (sessionId && activeTransports[sessionId]) { - transport = activeTransports[sessionId].transport; - server = activeTransports[sessionId].server; - - // 2. If the sessionId is not provided and the request is an initialize request, create a new transport for the session. - } else if (!sessionId && isInitializeRequest(body)) { - transport = new StreamableHTTPServerTransport({ - // use the event store to store the events to replay on reconnect. - // more details: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery. - eventStore: eventStore || new InMemoryEventStore(), - onsessioninitialized: (_sessionId: string) => { - // add only when the id Sesison id is generated. - activeTransports[_sessionId] = { - server, - transport, - }; - }, - sessionIdGenerator: randomUUID, - }); + // Create Express server + const { app, start } = createExpressServer({ + port, + serverType: "HTTP Streamable Server", + cleanup, + }); - // Handle the server close event. - transport.onclose = async () => { - const sid = transport.sessionId; - if (sid && activeTransports[sid]) { - try { - await server?.close(); - } catch (error) { - console.error("Error closing server:", error); - } - - // delete used transport and server to avoid memory leak. - delete activeTransports[sid]; + // Handle POST requests to endpoint + app.post(endpoint, async (req: Request, res: Response) => { + try { + const sessionId = Array.isArray(req.headers["mcp-session-id"]) + ? req.headers["mcp-session-id"][0] + : req.headers["mcp-session-id"]; + let transport: StreamableHTTPServerTransport; + let server: Server; + const body = req.body; + + /** + * diagram: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sequence-diagram. + */ + // 1. If the sessionId is provided and the server is already created, use the existing transport and server. + if (sessionId && activeTransports[sessionId]) { + transport = activeTransports[sessionId].transport; + server = activeTransports[sessionId].server; + + // 2. If the sessionId is not provided and the request is an initialize request, create a new transport for the session. + } else if (!sessionId && isInitializeRequest(body)) { + transport = new StreamableHTTPServerTransport({ + // use the event store to store the events to replay on reconnect. + // more details: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery. + eventStore: eventStore || new InMemoryEventStore(), + onsessioninitialized: (_sessionId: string) => { + // add only when the id Session id is generated. + activeTransports[_sessionId] = { + server, + transport, + }; + }, + sessionIdGenerator: randomUUID, + }); + + // Handle the server close event. + transport.onclose = async () => { + const sid = transport.sessionId; + if (sid && activeTransports[sid]) { + try { + await server?.close(); + } catch (error) { + console.error("Error closing server:", error); } - }; - - // Create the server - try { - server = createServer(); - } catch (error) { - if (error instanceof Response) { - res.writeHead(error.status).end(error.statusText); - return; - } - res.writeHead(500).end("Error creating server"); - return; - } - server.connect(transport); - - await transport.handleRequest(req, res, body); - return; - } else { - // Error if the server is not created but the request is not an initialize request. - res.setHeader("Content-Type", "application/json"); - res.writeHead(400).end( - JSON.stringify({ - error: { - code: -32000, - message: "Bad Request: No valid session ID provided", - }, - id: null, - jsonrpc: "2.0", - }), - ); - - return; - } - - // Handle the request if the server is already created. - await transport.handleRequest(req, res, body); - } catch (error) { - console.error("Error handling request:", error); - res.setHeader("Content-Type", "application/json"); - res.writeHead(500).end( - JSON.stringify({ - error: { code: -32603, message: "Internal Server Error" }, + // delete used transport and server to avoid memory leak. + delete activeTransports[sid]; + } + }; + + // Create the server + try { + server = createServer(); + } catch (error) { + console.error("Error creating server:", error); + res.status(500).json({ + error: { code: -32603, message: "Error creating server" }, id: null, jsonrpc: "2.0", - }), - ); - } - return; - } + }); + return; + } - // Handle GET requests to endpoint - if (req.method === "GET" && reqUrl.pathname === endpoint) { - const sessionId = req.headers["mcp-session-id"] as string | undefined; - const activeTransport: - | { - server: Server; - transport: StreamableHTTPServerTransport; - } - | undefined = sessionId ? activeTransports[sessionId] : undefined; + server.connect(transport); - if (!sessionId) { - res.writeHead(400).end("No sessionId"); + await transport.handleRequest(req, res, body); return; - } - - if (!activeTransport) { - res.writeHead(400).end("No active transport"); + } else { + // Error if the server is not created but the request is not an initialize request. + res.status(400).json({ + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: null, + jsonrpc: "2.0", + }); return; } - const lastEventId = req.headers["last-event-id"] as string | undefined; - if (lastEventId) { - console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); - } else { - console.log(`Establishing new SSE stream for session ${sessionId}`); - } + // Handle the request if the server is already created. + await transport.handleRequest(req, res, body); + } catch (error) { + console.error("Error handling request:", error); + res.status(500).json({ + error: { code: -32603, message: "Internal Server Error" }, + id: null, + jsonrpc: "2.0", + }); + } + }); + + // Handle GET requests to endpoint + app.get(endpoint, async (req: Request, res: Response) => { + const sessionId = req.headers["mcp-session-id"] as string | undefined; + const activeTransport: + | { + server: Server; + transport: StreamableHTTPServerTransport; + } + | undefined = sessionId ? activeTransports[sessionId] : undefined; - await activeTransport.transport.handleRequest(req, res); + if (!sessionId) { + res.status(400).send("No sessionId"); return; } - // Handle DELETE requests to endpoint - if (req.method === "DELETE" && reqUrl.pathname === endpoint) { - console.log("received delete request"); - const sessionId = req.headers["mcp-session-id"] as string | undefined; - if (!sessionId) { - res.writeHead(400).end("Invalid or missing sessionId"); - return; - } - - console.log("received delete request for session", sessionId); + if (!activeTransport) { + res.status(400).send("No active transport"); + return; + } - const transport = activeTransports[sessionId]?.transport; - if (!transport) { - res.writeHead(400).end("No active transport"); - return; - } + const lastEventId = req.headers["last-event-id"] as string | undefined; + if (lastEventId) { + console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); + } else { + console.log(`Establishing new SSE stream for session ${sessionId}`); + } - try { - await transport.handleRequest(req, res); - } catch (error) { - console.error("Error handling delete request:", error); - res.writeHead(500).end("Error handling delete request"); - } + await activeTransport.transport.handleRequest(req, res); + }); + // Handle DELETE requests to endpoint + app.delete(endpoint, async (req: Request, res: Response) => { + console.log("received delete request"); + const sessionId = req.headers["mcp-session-id"] as string | undefined; + if (!sessionId) { + res.status(400).send("Invalid or missing sessionId"); return; } - // If we reach here, no handler matched - res.writeHead(404).end("Not found"); - }; + console.log("received delete request for session", sessionId); - // Custom cleanup for streamable server - const cleanup = () => { - for (const { server, transport } of Object.values(activeTransports)) { - transport.close(); - server.close(); + const transport = activeTransports[sessionId]?.transport; + if (!transport) { + res.status(400).send("No active transport"); + return; } - }; - // Create the HTTP server using our factory - createBaseHttpServer(port, endpoint, { - handleRequest, - cleanup, - serverType: "HTTP Streamable Server", + try { + await transport.handleRequest(req, res); + } catch (error) { + console.error("Error handling delete request:", error); + res.status(500).send("Error handling delete request"); + } }); + + // Start the server and log endpoints + start([endpoint]); }; diff --git a/src/utils/expressServer.ts b/src/utils/expressServer.ts new file mode 100644 index 0000000..725dccc --- /dev/null +++ b/src/utils/expressServer.ts @@ -0,0 +1,159 @@ +import type { Server as HttpServer } from "node:http"; +import express, { + type Express, + type Request, + type Response, + type NextFunction, +} from "express"; + +/** + * Interface for Express-based handlers + */ +export interface ExpressServerOptions { + /** + * The port to run the server on + */ + port: number; + + /** + * Server type name for logging purposes + */ + serverType: string; + + /** + * Custom cleanup function to be called when the server is shutting down + */ + cleanup?: () => void; +} + +/** + * Handles CORS middleware for Express + */ +function setupCORS(app: Express): void { + app.use((req, res, next) => { + if (req.headers.origin) { + try { + const origin = new URL(req.headers.origin as string); + res.setHeader("Access-Control-Allow-Origin", origin.origin); + res.setHeader("Access-Control-Allow-Credentials", "true"); + res.setHeader( + "Access-Control-Allow-Methods", + "GET, POST, DELETE, OPTIONS", + ); + res.setHeader("Access-Control-Allow-Headers", "*"); + } catch (error) { + console.error("Error parsing origin:", error); + } + } + next(); + }); + + // Handle OPTIONS requests + app.options("*", (req, res) => { + res.status(204).end(); + }); +} + +/** + * Sets up common endpoints like health check and ping + */ +function setupCommonEndpoints(app: Express): void { + // Health check endpoint + app.get("/health", (req, res) => { + res.status(200).type("text/plain").send("OK"); + }); + + // Ping endpoint + app.get("/ping", (req, res) => { + res.status(200).send("pong"); + }); +} + +/** + * Sets up signal handlers for graceful shutdown + */ +function setupCleanupHandlers( + httpServer: HttpServer, + customCleanup?: () => void, +): void { + const cleanup = () => { + console.log("\nClosing server..."); + + // Execute custom cleanup if provided + if (customCleanup) customCleanup(); + + httpServer.close(() => { + console.log("Server closed"); + process.exit(0); + }); + }; + + process.on("SIGINT", cleanup); + process.on("SIGTERM", cleanup); +} + +/** + * Logs server startup information with formatted URLs + */ +function logServerStartup( + serverType: string, + port: number, + endpoints: string[], +): void { + const baseUrl = `http://localhost:${port}`; + const healthUrl = `${baseUrl}/health`; + const pingUrl = `${baseUrl}/ping`; + + console.log( + `${serverType} running on: \x1b[32m\u001B[4m${baseUrl}\u001B[0m\x1b[0m`, + ); + console.log("\nEndpoints:"); + for (const endpoint of endpoints) { + console.log(`• ${endpoint}: \u001B[4m${baseUrl}${endpoint}\u001B[0m`); + } + console.log("\nTest endpoints:"); + console.log(`• Health check: \u001B[4m${healthUrl}\u001B[0m`); + console.log(`• Ping test: \u001B[4m${pingUrl}\u001B[0m`); +} + +/** + * Creates a base Express server with common functionality + */ +export function createExpressServer(options: ExpressServerOptions): { + app: Express; + httpServer: HttpServer; + start: (endpoints: string[]) => void; +} { + const app = express(); + + // Set up middleware + setupCORS(app); + + // Parse JSON requests + app.use(express.json()); + + // Set up common endpoints + setupCommonEndpoints(app); + + // Error handling middleware + app.use((error: Error, req: Request, res: Response, next: NextFunction) => { + console.error(`Error in ${options.serverType} request handler:`, error); + res.status(500).send("Internal Server Error"); + }); + + // Create HTTP server + const httpServer = app.listen(options.port, () => { + // This will be called when start() is invoked + }); + + // Set up cleanup handlers + setupCleanupHandlers(httpServer, options.cleanup); + + return { + app, + httpServer, + start: (endpoints: string[]) => { + logServerStartup(options.serverType, options.port, endpoints); + }, + }; +} diff --git a/src/utils/index.ts b/src/utils/index.ts index f645c25..291504e 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -4,3 +4,7 @@ export { zodToJsonSchema } from "./schema"; export { InMemoryEventStore } from "./InMemoryEventStore"; export { getBody } from "./getBody"; export { createBaseHttpServer, type RequestHandlers } from "./httpServer"; +export { + createExpressServer, + type ExpressServerOptions, +} from "./expressServer"; From 55bbf84050812f4f83262585135c585ec295dd75 Mon Sep 17 00:00:00 2001 From: lyz Date: Wed, 20 Aug 2025 22:10:34 +0800 Subject: [PATCH 2/5] refactor: optimize Express migration - reduce code duplication - Remove unused httpServer.ts file (149 lines removed) - Consolidate CORS, middleware and endpoints setup into single function - Simplify Express server utility by combining duplicate code - Remove redundant exports and clean up utils/index.ts - Maintain all functionality while reducing overall code size --- src/utils/expressServer.ts | 54 ++++---------- src/utils/httpServer.ts | 149 ------------------------------------- src/utils/index.ts | 6 +- 3 files changed, 14 insertions(+), 195 deletions(-) delete mode 100644 src/utils/httpServer.ts diff --git a/src/utils/expressServer.ts b/src/utils/expressServer.ts index 725dccc..edae483 100644 --- a/src/utils/expressServer.ts +++ b/src/utils/expressServer.ts @@ -1,35 +1,25 @@ -import type { Server as HttpServer } from "node:http"; import express, { type Express, type Request, type Response, type NextFunction, } from "express"; +import type { Server as HttpServer } from "node:http"; /** * Interface for Express-based handlers */ export interface ExpressServerOptions { - /** - * The port to run the server on - */ port: number; - - /** - * Server type name for logging purposes - */ serverType: string; - - /** - * Custom cleanup function to be called when the server is shutting down - */ cleanup?: () => void; } /** - * Handles CORS middleware for Express + * Sets up CORS, common endpoints and middleware */ -function setupCORS(app: Express): void { +function setupMiddleware(app: Express): void { + // CORS middleware app.use((req, res, next) => { if (req.headers.origin) { try { @@ -49,24 +39,14 @@ function setupCORS(app: Express): void { }); // Handle OPTIONS requests - app.options("*", (req, res) => { - res.status(204).end(); - }); -} + app.options("*", (req, res) => res.status(204).end()); -/** - * Sets up common endpoints like health check and ping - */ -function setupCommonEndpoints(app: Express): void { - // Health check endpoint - app.get("/health", (req, res) => { - res.status(200).type("text/plain").send("OK"); - }); + // Parse JSON requests + app.use(express.json()); - // Ping endpoint - app.get("/ping", (req, res) => { - res.status(200).send("pong"); - }); + // Health check endpoints + app.get("/health", (req, res) => res.status(200).type("text/plain").send("OK")); + app.get("/ping", (req, res) => res.status(200).send("pong")); } /** @@ -126,14 +106,8 @@ export function createExpressServer(options: ExpressServerOptions): { } { const app = express(); - // Set up middleware - setupCORS(app); - - // Parse JSON requests - app.use(express.json()); - - // Set up common endpoints - setupCommonEndpoints(app); + // Set up all middleware and endpoints + setupMiddleware(app); // Error handling middleware app.use((error: Error, req: Request, res: Response, next: NextFunction) => { @@ -142,9 +116,7 @@ export function createExpressServer(options: ExpressServerOptions): { }); // Create HTTP server - const httpServer = app.listen(options.port, () => { - // This will be called when start() is invoked - }); + const httpServer = app.listen(options.port); // Set up cleanup handlers setupCleanupHandlers(httpServer, options.cleanup); diff --git a/src/utils/httpServer.ts b/src/utils/httpServer.ts deleted file mode 100644 index 1c054da..0000000 --- a/src/utils/httpServer.ts +++ /dev/null @@ -1,149 +0,0 @@ -import http from "node:http"; -import type { IncomingMessage, ServerResponse } from "node:http"; - -/** - * Interface for request handlers that will be passed to the server factory - */ -export interface RequestHandlers { - /** - * Main handler for HTTP requests - */ - handleRequest: (req: IncomingMessage, res: ServerResponse) => Promise; - - /** - * Custom cleanup function to be called when the server is shutting down - */ - cleanup?: () => void; - - /** - * Server type name for logging purposes - */ - serverType: string; -} - -/** - * Handles CORS headers for incoming requests - */ -function handleCORS(req: IncomingMessage, res: ServerResponse): void { - if (req.headers.origin) { - try { - const origin = new URL(req.headers.origin as string); - res.setHeader("Access-Control-Allow-Origin", origin.origin); - res.setHeader("Access-Control-Allow-Credentials", "true"); - res.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); - res.setHeader("Access-Control-Allow-Headers", "*"); - } catch (error) { - console.error("Error parsing origin:", error); - } - } -} - -/** - * Handles common endpoints like health check and ping - * @returns true if the request was handled, false otherwise - */ -function handleCommonEndpoints( - req: IncomingMessage, - res: ServerResponse, -): boolean { - if (!req.url) { - res.writeHead(400).end("No URL"); - return true; - } - - if (req.method === "GET" && req.url === "/health") { - res.writeHead(200, { "Content-Type": "text/plain" }).end("OK"); - return true; - } - - if (req.method === "GET" && req.url === "/ping") { - res.writeHead(200).end("pong"); - return true; - } - - return false; -} - -/** - * Sets up signal handlers for graceful shutdown - */ -function setupCleanupHandlers( - httpServer: http.Server, - customCleanup?: () => void, -): void { - const cleanup = () => { - console.log("\nClosing server..."); - - // Execute custom cleanup if provided - if (customCleanup) customCleanup(); - - httpServer.close(() => { - console.log("Server closed"); - process.exit(0); - }); - }; - - process.on("SIGINT", cleanup); - process.on("SIGTERM", cleanup); -} - -/** - * Logs server startup information with formatted URLs - */ -function logServerStartup( - serverType: string, - port: number, - endpoint: string, -): void { - const serverUrl = `http://localhost:${port}${endpoint}`; - const healthUrl = `http://localhost:${port}/health`; - const pingUrl = `http://localhost:${port}/ping`; - - console.log( - `${serverType} running on: \x1b[32m\u001B[4m${serverUrl}\u001B[0m\x1b[0m`, - ); - console.log("\nTest endpoints:"); - console.log(`• Health check: \u001B[4m${healthUrl}\u001B[0m`); - console.log(`• Ping test: \u001B[4m${pingUrl}\u001B[0m`); -} - -/** - * Creates a base HTTP server with common functionality - */ -export function createBaseHttpServer( - port: number, - endpoint: string, - handlers: RequestHandlers, -): http.Server { - const httpServer = http.createServer(async (req, res) => { - // Handle CORS for all requests - handleCORS(req, res); - - // Handle OPTIONS requests - if (req.method === "OPTIONS") { - res.writeHead(204).end(); - return; - } - - // Handle common endpoints like health and ping - if (handleCommonEndpoints(req, res)) return; - - // Pass remaining requests to the specific handler - try { - await handlers.handleRequest(req, res); - } catch (error) { - console.error(`Error in ${handlers.serverType} request handler:`, error); - res.writeHead(500).end("Internal Server Error"); - } - }); - - // Set up cleanup handlers - setupCleanupHandlers(httpServer, handlers.cleanup); - - // Start listening and log server info - httpServer.listen(port, () => { - logServerStartup(handlers.serverType, port, endpoint); - }); - - return httpServer; -} diff --git a/src/utils/index.ts b/src/utils/index.ts index 291504e..4f4d6b7 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -3,8 +3,4 @@ export { generateChartUrl } from "./generate"; export { zodToJsonSchema } from "./schema"; export { InMemoryEventStore } from "./InMemoryEventStore"; export { getBody } from "./getBody"; -export { createBaseHttpServer, type RequestHandlers } from "./httpServer"; -export { - createExpressServer, - type ExpressServerOptions, -} from "./expressServer"; +export { createExpressServer, type ExpressServerOptions } from "./expressServer"; From 2dd6fe3669ed6daa30ac605c4ed1da3c36454747 Mon Sep 17 00:00:00 2001 From: lyz Date: Wed, 20 Aug 2025 22:33:04 +0800 Subject: [PATCH 3/5] refactor: simplify Express implementation following mcp-echarts pattern --- src/services/sse.ts | 38 ++----- src/services/streamable.ts | 208 ++++++++++--------------------------- src/utils/expressServer.ts | 131 ----------------------- src/utils/index.ts | 1 - 4 files changed, 67 insertions(+), 311 deletions(-) delete mode 100644 src/utils/expressServer.ts diff --git a/src/services/sse.ts b/src/services/sse.ts index 74a6dce..d89f0be 100644 --- a/src/services/sse.ts +++ b/src/services/sse.ts @@ -1,33 +1,19 @@ import type { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; -import type { Request, Response } from "express"; -import { createExpressServer } from "../utils"; +import express from "express"; export const startSSEMcpServer = async ( server: Server, endpoint = "/sse", port = 1122, ): Promise => { - const activeTransports: Record = {}; - - // Custom cleanup for SSE server - const cleanup = () => { - // Close all active transports - for (const transport of Object.values(activeTransports)) { - transport.close(); - } - server.close(); - }; + const app = express(); + app.use(express.json()); - // Create Express server - const { app, start } = createExpressServer({ - port, - serverType: "SSE Server", - cleanup, - }); + const activeTransports: Record = {}; // Handle GET requests to the SSE endpoint - app.get(endpoint, async (req: Request, res: Response) => { + app.get(endpoint, async (req, res) => { const transport = new SSEServerTransport("/messages", res); activeTransports[transport.sessionId] = transport; @@ -35,19 +21,16 @@ export const startSSEMcpServer = async ( res.on("close", async () => { closed = true; - try { await server.close(); } catch (error) { console.error("Error closing server:", error); } - delete activeTransports[transport.sessionId]; }); try { await server.connect(transport); - await transport.send({ jsonrpc: "2.0", method: "sse/connection", @@ -62,7 +45,7 @@ export const startSSEMcpServer = async ( }); // Handle POST requests to the messages endpoint - app.post("/messages", async (req: Request, res: Response) => { + app.post("/messages", async (req, res) => { const sessionId = req.query.sessionId as string; if (!sessionId) { @@ -70,9 +53,7 @@ export const startSSEMcpServer = async ( return; } - const activeTransport: SSEServerTransport | undefined = - activeTransports[sessionId]; - + const activeTransport = activeTransports[sessionId]; if (!activeTransport) { res.status(400).send("No active transport"); return; @@ -81,6 +62,7 @@ export const startSSEMcpServer = async ( await activeTransport.handlePostMessage(req, res); }); - // Start the server and log endpoints - start([endpoint, "/messages"]); + app.listen(port, () => { + console.log(`SSE Server running on http://localhost:${port}${endpoint}`); + }); }; diff --git a/src/services/streamable.ts b/src/services/streamable.ts index 6e40f46..fc2af09 100644 --- a/src/services/streamable.ts +++ b/src/services/streamable.ts @@ -5,8 +5,8 @@ import { StreamableHTTPServerTransport, } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; -import type { Request, Response } from "express"; -import { InMemoryEventStore, createExpressServer, getBody } from "../utils"; +import express from "express"; +import { InMemoryEventStore } from "../utils"; export const startHTTPStreamableServer = async ( createServer: () => Server, @@ -14,175 +14,81 @@ export const startHTTPStreamableServer = async ( port = 1122, eventStore: EventStore = new InMemoryEventStore(), ): Promise => { - const activeTransports: Record< - string, - { - server: Server; - transport: StreamableHTTPServerTransport; - } - > = {}; - - // Custom cleanup for streamable server - const cleanup = () => { - for (const { server, transport } of Object.values(activeTransports)) { - transport.close(); - server.close(); - } - }; - - // Create Express server - const { app, start } = createExpressServer({ - port, - serverType: "HTTP Streamable Server", - cleanup, - }); - - // Handle POST requests to endpoint - app.post(endpoint, async (req: Request, res: Response) => { - try { - const sessionId = Array.isArray(req.headers["mcp-session-id"]) - ? req.headers["mcp-session-id"][0] - : req.headers["mcp-session-id"]; - let transport: StreamableHTTPServerTransport; - let server: Server; - const body = req.body; - - /** - * diagram: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sequence-diagram. - */ - // 1. If the sessionId is provided and the server is already created, use the existing transport and server. - if (sessionId && activeTransports[sessionId]) { - transport = activeTransports[sessionId].transport; - server = activeTransports[sessionId].server; - - // 2. If the sessionId is not provided and the request is an initialize request, create a new transport for the session. - } else if (!sessionId && isInitializeRequest(body)) { - transport = new StreamableHTTPServerTransport({ - // use the event store to store the events to replay on reconnect. - // more details: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery. - eventStore: eventStore || new InMemoryEventStore(), - onsessioninitialized: (_sessionId: string) => { - // add only when the id Session id is generated. - activeTransports[_sessionId] = { - server, - transport, - }; - }, - sessionIdGenerator: randomUUID, - }); + const app = express(); + app.use(express.json()); - // Handle the server close event. - transport.onclose = async () => { - const sid = transport.sessionId; - if (sid && activeTransports[sid]) { - try { - await server?.close(); - } catch (error) { - console.error("Error closing server:", error); - } + // Store transports by session ID + const transports: Record = {}; - // delete used transport and server to avoid memory leak. - delete activeTransports[sid]; - } - }; + // Handle POST requests for client-to-server communication + app.post(endpoint, async (req, res) => { + const sessionId = req.headers["mcp-session-id"] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + transport = new StreamableHTTPServerTransport({ + eventStore, + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sessionId) => { + transports[sessionId] = transport; + }, + }); - // Create the server - try { - server = createServer(); - } catch (error) { - console.error("Error creating server:", error); - res.status(500).json({ - error: { code: -32603, message: "Error creating server" }, - id: null, - jsonrpc: "2.0", - }); - return; + // Clean up transport when closed + transport.onclose = () => { + if (transport.sessionId) { + delete transports[transport.sessionId]; } + }; - server.connect(transport); - - await transport.handleRequest(req, res, body); - return; - } else { - // Error if the server is not created but the request is not an initialize request. - res.status(400).json({ - error: { - code: -32000, - message: "Bad Request: No valid session ID provided", - }, - id: null, - jsonrpc: "2.0", - }); - return; - } - - // Handle the request if the server is already created. - await transport.handleRequest(req, res, body); - } catch (error) { - console.error("Error handling request:", error); - res.status(500).json({ - error: { code: -32603, message: "Internal Server Error" }, - id: null, + const server = createServer(); + await server.connect(transport); + } else { + // Invalid request + res.status(400).json({ jsonrpc: "2.0", + error: { + code: -32000, + message: "Bad Request: No valid session ID provided", + }, + id: null, }); - } - }); - - // Handle GET requests to endpoint - app.get(endpoint, async (req: Request, res: Response) => { - const sessionId = req.headers["mcp-session-id"] as string | undefined; - const activeTransport: - | { - server: Server; - transport: StreamableHTTPServerTransport; - } - | undefined = sessionId ? activeTransports[sessionId] : undefined; - - if (!sessionId) { - res.status(400).send("No sessionId"); - return; - } - - if (!activeTransport) { - res.status(400).send("No active transport"); return; } - const lastEventId = req.headers["last-event-id"] as string | undefined; - if (lastEventId) { - console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); - } else { - console.log(`Establishing new SSE stream for session ${sessionId}`); - } - - await activeTransport.transport.handleRequest(req, res); + // Handle the request + await transport.handleRequest(req, res, req.body); }); - // Handle DELETE requests to endpoint - app.delete(endpoint, async (req: Request, res: Response) => { - console.log("received delete request"); + // Handle GET requests for server-to-client notifications via SSE + app.get(endpoint, async (req, res) => { const sessionId = req.headers["mcp-session-id"] as string | undefined; - if (!sessionId) { - res.status(400).send("Invalid or missing sessionId"); + if (!sessionId || !transports[sessionId]) { + res.status(400).send("Invalid or missing session ID"); return; } - console.log("received delete request for session", sessionId); + const transport = transports[sessionId]; + await transport.handleRequest(req, res); + }); - const transport = activeTransports[sessionId]?.transport; - if (!transport) { - res.status(400).send("No active transport"); + // Handle DELETE requests for session termination + app.delete(endpoint, async (req, res) => { + const sessionId = req.headers["mcp-session-id"] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send("Invalid or missing session ID"); return; } - try { - await transport.handleRequest(req, res); - } catch (error) { - console.error("Error handling delete request:", error); - res.status(500).send("Error handling delete request"); - } + const transport = transports[sessionId]; + await transport.handleRequest(req, res); }); - // Start the server and log endpoints - start([endpoint]); + app.listen(port, () => { + console.log(`Streamable HTTP Server running on http://localhost:${port}${endpoint}`); + }); }; diff --git a/src/utils/expressServer.ts b/src/utils/expressServer.ts deleted file mode 100644 index edae483..0000000 --- a/src/utils/expressServer.ts +++ /dev/null @@ -1,131 +0,0 @@ -import express, { - type Express, - type Request, - type Response, - type NextFunction, -} from "express"; -import type { Server as HttpServer } from "node:http"; - -/** - * Interface for Express-based handlers - */ -export interface ExpressServerOptions { - port: number; - serverType: string; - cleanup?: () => void; -} - -/** - * Sets up CORS, common endpoints and middleware - */ -function setupMiddleware(app: Express): void { - // CORS middleware - app.use((req, res, next) => { - if (req.headers.origin) { - try { - const origin = new URL(req.headers.origin as string); - res.setHeader("Access-Control-Allow-Origin", origin.origin); - res.setHeader("Access-Control-Allow-Credentials", "true"); - res.setHeader( - "Access-Control-Allow-Methods", - "GET, POST, DELETE, OPTIONS", - ); - res.setHeader("Access-Control-Allow-Headers", "*"); - } catch (error) { - console.error("Error parsing origin:", error); - } - } - next(); - }); - - // Handle OPTIONS requests - app.options("*", (req, res) => res.status(204).end()); - - // Parse JSON requests - app.use(express.json()); - - // Health check endpoints - app.get("/health", (req, res) => res.status(200).type("text/plain").send("OK")); - app.get("/ping", (req, res) => res.status(200).send("pong")); -} - -/** - * Sets up signal handlers for graceful shutdown - */ -function setupCleanupHandlers( - httpServer: HttpServer, - customCleanup?: () => void, -): void { - const cleanup = () => { - console.log("\nClosing server..."); - - // Execute custom cleanup if provided - if (customCleanup) customCleanup(); - - httpServer.close(() => { - console.log("Server closed"); - process.exit(0); - }); - }; - - process.on("SIGINT", cleanup); - process.on("SIGTERM", cleanup); -} - -/** - * Logs server startup information with formatted URLs - */ -function logServerStartup( - serverType: string, - port: number, - endpoints: string[], -): void { - const baseUrl = `http://localhost:${port}`; - const healthUrl = `${baseUrl}/health`; - const pingUrl = `${baseUrl}/ping`; - - console.log( - `${serverType} running on: \x1b[32m\u001B[4m${baseUrl}\u001B[0m\x1b[0m`, - ); - console.log("\nEndpoints:"); - for (const endpoint of endpoints) { - console.log(`• ${endpoint}: \u001B[4m${baseUrl}${endpoint}\u001B[0m`); - } - console.log("\nTest endpoints:"); - console.log(`• Health check: \u001B[4m${healthUrl}\u001B[0m`); - console.log(`• Ping test: \u001B[4m${pingUrl}\u001B[0m`); -} - -/** - * Creates a base Express server with common functionality - */ -export function createExpressServer(options: ExpressServerOptions): { - app: Express; - httpServer: HttpServer; - start: (endpoints: string[]) => void; -} { - const app = express(); - - // Set up all middleware and endpoints - setupMiddleware(app); - - // Error handling middleware - app.use((error: Error, req: Request, res: Response, next: NextFunction) => { - console.error(`Error in ${options.serverType} request handler:`, error); - res.status(500).send("Internal Server Error"); - }); - - // Create HTTP server - const httpServer = app.listen(options.port); - - // Set up cleanup handlers - setupCleanupHandlers(httpServer, options.cleanup); - - return { - app, - httpServer, - start: (endpoints: string[]) => { - logServerStartup(options.serverType, options.port, endpoints); - }, - }; -} diff --git a/src/utils/index.ts b/src/utils/index.ts index 4f4d6b7..408f736 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -3,4 +3,3 @@ export { generateChartUrl } from "./generate"; export { zodToJsonSchema } from "./schema"; export { InMemoryEventStore } from "./InMemoryEventStore"; export { getBody } from "./getBody"; -export { createExpressServer, type ExpressServerOptions } from "./expressServer"; From a3b28da74d65a940d1119fc87bb1c597d947b78d Mon Sep 17 00:00:00 2001 From: lyz Date: Thu, 21 Aug 2025 21:22:16 +0800 Subject: [PATCH 4/5] refactor: simplify Express-based MCP services following official SDK patterns - Streamline SSE service - Streamline Streamable service - Follow official TypeScript SDK examples for minimal implementations - Add cors dependency for proper browser support --- package.json | 15 +++++- src/services/sse.ts | 59 +++++++---------------- src/services/streamable.ts | 99 ++++++++++++-------------------------- 3 files changed, 60 insertions(+), 113 deletions(-) diff --git a/package.json b/package.json index 5b2bbb0..cda406c 100644 --- a/package.json +++ b/package.json @@ -30,8 +30,17 @@ "registry": "https://registry.npmjs.org/", "access": "public" }, - "files": ["build"], - "keywords": ["antv", "mcp", "data-visualization", "chart", "graph", "map"], + "files": [ + "build" + ], + "keywords": [ + "antv", + "mcp", + "data-visualization", + "chart", + "graph", + "map" + ], "repository": { "type": "git", "url": "https://github.com/antvis/mcp-server-chart" @@ -44,6 +53,7 @@ "dependencies": { "@modelcontextprotocol/sdk": "^1.11.4", "axios": "^1.11.0", + "cors": "^2.8.5", "express": "^5.1.0", "zod": "^3.25.16", "zod-to-json-schema": "^3.24.5" @@ -51,6 +61,7 @@ "devDependencies": { "@biomejs/biome": "1.9.4", "@modelcontextprotocol/inspector": "^0.14.2", + "@types/cors": "^2.8.19", "@types/express": "^5.0.3", "@types/node": "^22.15.21", "husky": "^9.1.7", diff --git a/src/services/sse.ts b/src/services/sse.ts index d89f0be..3c068b0 100644 --- a/src/services/sse.ts +++ b/src/services/sse.ts @@ -9,60 +9,35 @@ export const startSSEMcpServer = async ( ): Promise => { const app = express(); app.use(express.json()); + + const transports: Record = {}; - const activeTransports: Record = {}; - - // Handle GET requests to the SSE endpoint app.get(endpoint, async (req, res) => { - const transport = new SSEServerTransport("/messages", res); - activeTransports[transport.sessionId] = transport; - - let closed = false; - - res.on("close", async () => { - closed = true; - try { - await server.close(); - } catch (error) { - console.error("Error closing server:", error); - } - delete activeTransports[transport.sessionId]; - }); - try { + const transport = new SSEServerTransport('/messages', res); + transports[transport.sessionId] = transport; + transport.onclose = () => delete transports[transport.sessionId]; await server.connect(transport); - await transport.send({ - jsonrpc: "2.0", - method: "sse/connection", - params: { message: "SSE Connection established" }, - }); } catch (error) { - if (!closed) { - console.error("Error connecting to server:", error); - res.status(500).send("Error connecting to server"); - } + if (!res.headersSent) res.status(500).send('Error establishing SSE stream'); } }); - // Handle POST requests to the messages endpoint - app.post("/messages", async (req, res) => { + app.post('/messages', async (req, res) => { const sessionId = req.query.sessionId as string; - - if (!sessionId) { - res.status(400).send("No sessionId"); - return; - } - - const activeTransport = activeTransports[sessionId]; - if (!activeTransport) { - res.status(400).send("No active transport"); - return; + if (!sessionId) return res.status(400).send('Missing sessionId parameter'); + + const transport = transports[sessionId]; + if (!transport) return res.status(404).send('Session not found'); + + try { + await transport.handlePostMessage(req, res, req.body); + } catch (error) { + if (!res.headersSent) res.status(500).send('Error handling request'); } - - await activeTransport.handlePostMessage(req, res); }); app.listen(port, () => { - console.log(`SSE Server running on http://localhost:${port}${endpoint}`); + console.log(`SSE Server listening on http://localhost:${port}${endpoint}`); }); }; diff --git a/src/services/streamable.ts b/src/services/streamable.ts index fc2af09..516952b 100644 --- a/src/services/streamable.ts +++ b/src/services/streamable.ts @@ -1,94 +1,55 @@ -import { randomUUID } from "node:crypto"; import type { Server } from "@modelcontextprotocol/sdk/server/index.js"; -import { - type EventStore, - StreamableHTTPServerTransport, -} from "@modelcontextprotocol/sdk/server/streamableHttp.js"; -import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import express from "express"; -import { InMemoryEventStore } from "../utils"; +import cors from "cors"; export const startHTTPStreamableServer = async ( createServer: () => Server, endpoint = "/mcp", port = 1122, - eventStore: EventStore = new InMemoryEventStore(), ): Promise => { const app = express(); app.use(express.json()); + app.use(cors({ origin: '*', exposedHeaders: ['Mcp-Session-Id'] })); - // Store transports by session ID - const transports: Record = {}; - - // Handle POST requests for client-to-server communication app.post(endpoint, async (req, res) => { - const sessionId = req.headers["mcp-session-id"] as string | undefined; - let transport: StreamableHTTPServerTransport; - - if (sessionId && transports[sessionId]) { - // Reuse existing transport - transport = transports[sessionId]; - } else if (!sessionId && isInitializeRequest(req.body)) { - // New initialization request - transport = new StreamableHTTPServerTransport({ - eventStore, - sessionIdGenerator: () => randomUUID(), - onsessioninitialized: (sessionId) => { - transports[sessionId] = transport; - }, - }); - - // Clean up transport when closed - transport.onclose = () => { - if (transport.sessionId) { - delete transports[transport.sessionId]; - } - }; - + try { const server = createServer(); + const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: undefined }); await server.connect(transport); - } else { - // Invalid request - res.status(400).json({ - jsonrpc: "2.0", - error: { - code: -32000, - message: "Bad Request: No valid session ID provided", - }, - id: null, + await transport.handleRequest(req, res, req.body); + res.on('close', () => { + transport.close(); + server.close(); }); - return; + } catch (error) { + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null, + }); + } } - - // Handle the request - await transport.handleRequest(req, res, req.body); }); - // Handle GET requests for server-to-client notifications via SSE - app.get(endpoint, async (req, res) => { - const sessionId = req.headers["mcp-session-id"] as string | undefined; - if (!sessionId || !transports[sessionId]) { - res.status(400).send("Invalid or missing session ID"); - return; - } - - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + app.get(endpoint, (req, res) => { + res.status(405).json({ + jsonrpc: "2.0", + error: { code: -32000, message: "Method not allowed" }, + id: null + }); }); - // Handle DELETE requests for session termination - app.delete(endpoint, async (req, res) => { - const sessionId = req.headers["mcp-session-id"] as string | undefined; - if (!sessionId || !transports[sessionId]) { - res.status(400).send("Invalid or missing session ID"); - return; - } - - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + app.delete(endpoint, (req, res) => { + res.status(405).json({ + jsonrpc: "2.0", + error: { code: -32000, message: "Method not allowed" }, + id: null + }); }); app.listen(port, () => { - console.log(`Streamable HTTP Server running on http://localhost:${port}${endpoint}`); + console.log(`Streamable HTTP Server listening on http://localhost:${port}${endpoint}`); }); }; From d2f5c6e366e0ee9e3c11d6c7b00bf761f3eb55d5 Mon Sep 17 00:00:00 2001 From: lyz Date: Thu, 21 Aug 2025 22:51:33 +0800 Subject: [PATCH 5/5] refactor: remove unused InMemoryEventStore and getBody utility functions --- src/utils/InMemoryEventStore.ts | 92 --------------------------------- src/utils/getBody.ts | 16 ------ src/utils/index.ts | 2 - 3 files changed, 110 deletions(-) delete mode 100644 src/utils/InMemoryEventStore.ts delete mode 100644 src/utils/getBody.ts diff --git a/src/utils/InMemoryEventStore.ts b/src/utils/InMemoryEventStore.ts deleted file mode 100644 index e1a0b59..0000000 --- a/src/utils/InMemoryEventStore.ts +++ /dev/null @@ -1,92 +0,0 @@ -/** - * This is a copy of the InMemoryEventStore from the typescript-sdk. - * reference: https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/examples/shared/inMemoryEventStore.ts - */ - -import type { EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; -import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; - -/** - * Simple in-memory implementation of the EventStore interface for resumability. - * This is primarily intended for examples and testing, not for production use. - * where a persistent storage solution would be more appropriate. - * see more details: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery. - */ -export class InMemoryEventStore implements EventStore { - private events: Map = - new Map(); - - /** - * Generates a unique event ID for a given stream ID - */ - private generateEventId(streamId: string): string { - return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`; - } - - /** - * Extracts the stream ID from an event ID - */ - private getStreamIdFromEventId(eventId: string): string { - const parts = eventId.split("_"); - return parts.length > 0 ? parts[0] : ""; - } - - /** - * Stores an event with a generated event ID - * Implements EventStore.storeEvent - */ - async storeEvent(streamId: string, message: JSONRPCMessage): Promise { - const eventId = this.generateEventId(streamId); - this.events.set(eventId, { streamId, message }); - return eventId; - } - - /** - * Replays events that occurred after a specific event ID - * Implements EventStore.replayEventsAfter - */ - async replayEventsAfter( - lastEventId: string, - { - send, - }: { send: (eventId: string, message: JSONRPCMessage) => Promise }, - ): Promise { - if (!lastEventId || !this.events.has(lastEventId)) { - return ""; - } - - // Extract the stream ID from the event ID - const streamId = this.getStreamIdFromEventId(lastEventId); - if (!streamId) { - return ""; - } - - let foundLastEvent = false; - - // Sort events by eventId for chronological ordering - const sortedEvents = [...this.events.entries()].sort((a, b) => - a[0].localeCompare(b[0]), - ); - - for (const [ - eventId, - { streamId: eventStreamId, message }, - ] of sortedEvents) { - // Only include events from the same stream - if (eventStreamId !== streamId) { - continue; - } - - // Start sending events after we find the lastEventId - if (eventId === lastEventId) { - foundLastEvent = true; - continue; - } - - if (foundLastEvent) { - await send(eventId, message); - } - } - return streamId; - } -} diff --git a/src/utils/getBody.ts b/src/utils/getBody.ts deleted file mode 100644 index 939b7a7..0000000 --- a/src/utils/getBody.ts +++ /dev/null @@ -1,16 +0,0 @@ -import type { IncomingMessage } from "node:http"; - -export function getBody(request: IncomingMessage) { - return new Promise((resolve) => { - const bodyParts: Buffer[] = []; - let body: string; - request - .on("data", (chunk) => { - bodyParts.push(chunk); - }) - .on("end", () => { - body = Buffer.concat(bodyParts).toString(); - resolve(JSON.parse(body)); - }); - }); -} diff --git a/src/utils/index.ts b/src/utils/index.ts index 408f736..ea17ac0 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -1,5 +1,3 @@ export { callTool } from "./callTool"; export { generateChartUrl } from "./generate"; export { zodToJsonSchema } from "./schema"; -export { InMemoryEventStore } from "./InMemoryEventStore"; -export { getBody } from "./getBody";