From c5de8efe0081186c8516900136bbd076a76076a2 Mon Sep 17 00:00:00 2001 From: Edouard Maleix Date: Thu, 12 Feb 2026 19:58:13 +0100 Subject: [PATCH 1/2] feat(transport): add DELETE /mcp for session termination Clients can now explicitly terminate sessions via DELETE with the Mcp-Session-Id header, per the MCP transport spec. The handler force-closes active SSE streams, unsubscribes from the message broker, and deletes the session from the store. Returns 204 on success. Co-Authored-By: Claude Opus 4.6 --- src/routes/mcp.ts | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/routes/mcp.ts b/src/routes/mcp.ts index abcf82d..d78a4bb 100644 --- a/src/routes/mcp.ts +++ b/src/routes/mcp.ts @@ -329,6 +329,45 @@ const mcpPubSubRoutesPlugin: FastifyPluginAsync = async } }) + // DELETE endpoint for explicit session termination (MCP spec) + if (enableSSE) { + app.delete('/mcp', async (request: FastifyRequest, reply: FastifyReply) => { + const sessionId = request.headers['mcp-session-id'] as string + if (!sessionId) { + reply.code(400).send({ error: 'Missing Mcp-Session-Id header' }) + return + } + + const session = await sessionStore.get(sessionId) + if (!session) { + reply.code(404).send({ error: 'Session not found' }) + return + } + + // Force-close any active SSE streams for this session + const streams = localStreams.get(sessionId) + if (streams) { + for (const stream of streams) { + try { + stream.raw.end() + } catch { + // stream may already be closed + } + } + localStreams.delete(sessionId) + } + + // Unsubscribe from message broker + await messageBroker.unsubscribe(`mcp/session/${sessionId}/message`) + + // Delete session from store + await sessionStore.delete(sessionId) + + app.log.info({ sessionId }, 'Session terminated via DELETE') + reply.code(204).send() + }) + } + // Subscribe to broadcast notifications if (enableSSE) { messageBroker.subscribe('mcp/broadcast/notification', (notification: JSONRPCMessage) => { From acd2267949a2707055dcf810794b60cba543f89b Mon Sep 17 00:00:00 2001 From: getlarge Date: Wed, 8 Apr 2026 21:48:12 +0200 Subject: [PATCH 2/2] test(transport): add tests for DELETE /mcp session termination Covers: - 204 on successful deletion - 400 when Mcp-Session-Id header is missing - 404 when session does not exist - SSE stream is force-closed after DELETE - session is removed from the store - DELETE route is not registered when enableSSE is false Co-Authored-By: Claude Opus 4.6 --- test/session-delete.test.ts | 238 ++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 test/session-delete.test.ts diff --git a/test/session-delete.test.ts b/test/session-delete.test.ts new file mode 100644 index 0000000..86aa4b8 --- /dev/null +++ b/test/session-delete.test.ts @@ -0,0 +1,238 @@ +import { test, describe } from 'node:test' +import type { TestContext } from 'node:test' +import Fastify from 'fastify' +import { request, Agent, setGlobalDispatcher } from 'undici' +import mcpPlugin from '../src/index.ts' +import { JSONRPC_VERSION, LATEST_PROTOCOL_VERSION } from '../src/schema.ts' + +setGlobalDispatcher(new Agent({ + keepAliveTimeout: 10, + keepAliveMaxTimeout: 10 +})) + +describe('Session DELETE', () => { + test('returns 204 on successful session deletion', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: true + }) + + await app.listen({ port: 0 }) + const address = app.server.address() + const port = typeof address === 'object' && address ? address.port : 0 + const baseUrl = `http://localhost:${port}` + + // Create a session via POST initialize + const initResponse = await request(`${baseUrl}/mcp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + jsonrpc: JSONRPC_VERSION, + id: 1, + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: {}, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + }) + }) + + const sessionId = initResponse.headers['mcp-session-id'] as string + t.assert.ok(sessionId) + + // DELETE the session + const deleteResponse = await request(`${baseUrl}/mcp`, { + method: 'DELETE', + headers: { 'mcp-session-id': sessionId } + }) + + t.assert.strictEqual(deleteResponse.statusCode, 204) + }) + + test('returns 400 when mcp-session-id header is missing', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: true + }) + + await app.listen({ port: 0 }) + const address = app.server.address() + const port = typeof address === 'object' && address ? address.port : 0 + const baseUrl = `http://localhost:${port}` + + const deleteResponse = await request(`${baseUrl}/mcp`, { + method: 'DELETE' + }) + + t.assert.strictEqual(deleteResponse.statusCode, 400) + const body = await deleteResponse.body.json() as { error: string } + t.assert.ok(body.error.includes('Mcp-Session-Id')) + }) + + test('returns 404 when session does not exist', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: true + }) + + await app.listen({ port: 0 }) + const address = app.server.address() + const port = typeof address === 'object' && address ? address.port : 0 + const baseUrl = `http://localhost:${port}` + + const deleteResponse = await request(`${baseUrl}/mcp`, { + method: 'DELETE', + headers: { 'mcp-session-id': 'nonexistent-session-id' } + }) + + t.assert.strictEqual(deleteResponse.statusCode, 404) + }) + + test('SSE stream is closed after DELETE', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: true + }) + + await app.listen({ port: 0 }) + const address = app.server.address() + const port = typeof address === 'object' && address ? address.port : 0 + const baseUrl = `http://localhost:${port}` + + // Create a session via POST initialize + const initResponse = await request(`${baseUrl}/mcp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + jsonrpc: JSONRPC_VERSION, + id: 1, + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: {}, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + }) + }) + + const sessionId = initResponse.headers['mcp-session-id'] as string + t.assert.ok(sessionId) + + // Open SSE stream + const sseResponse = await request(`${baseUrl}/mcp`, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + 'mcp-session-id': sessionId + } + }) + + t.assert.strictEqual(sseResponse.statusCode, 200) + + // Consume the stream — it will resolve when the server closes it + const streamConsumed = (async () => { + const chunks: Buffer[] = [] + for await (const chunk of sseResponse.body) { + chunks.push(chunk as Buffer) + } + return Buffer.concat(chunks).toString() + })() + + // DELETE the session + const deleteResponse = await request(`${baseUrl}/mcp`, { + method: 'DELETE', + headers: { 'mcp-session-id': sessionId } + }) + + t.assert.strictEqual(deleteResponse.statusCode, 204) + + // Stream should complete (server closed it) within a reasonable time + const result = await Promise.race([ + streamConsumed.then(() => 'closed'), + new Promise((resolve) => setTimeout(() => resolve('timeout'), 5000)) + ]) + + t.assert.strictEqual(result, 'closed') + }) + + test('session is removed from store after DELETE', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: true + }) + + await app.listen({ port: 0 }) + const address = app.server.address() + const port = typeof address === 'object' && address ? address.port : 0 + const baseUrl = `http://localhost:${port}` + + // Create a session via POST initialize + const initResponse = await request(`${baseUrl}/mcp`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + jsonrpc: JSONRPC_VERSION, + id: 1, + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: {}, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + }) + }) + + const sessionId = initResponse.headers['mcp-session-id'] as string + t.assert.ok(sessionId) + + // DELETE the session + await request(`${baseUrl}/mcp`, { + method: 'DELETE', + headers: { 'mcp-session-id': sessionId } + }) + + // Try to DELETE again — should be 404 + const secondDelete = await request(`${baseUrl}/mcp`, { + method: 'DELETE', + headers: { 'mcp-session-id': sessionId } + }) + + t.assert.strictEqual(secondDelete.statusCode, 404) + }) + + test('DELETE route is not registered when SSE is disabled', async (t: TestContext) => { + const app = Fastify({ logger: false }) + t.after(() => app.close()) + + await app.register(mcpPlugin, { + serverInfo: { name: 'test-server', version: '1.0.0' }, + enableSSE: false + }) + await app.ready() + + const response = await app.inject({ + method: 'DELETE', + url: '/mcp', + headers: { 'mcp-session-id': 'some-session' } + }) + + // Fastify returns 404 for unregistered routes + t.assert.strictEqual(response.statusCode, 404) + }) +})