From 389b36dd12585c391f9a71dfba6f60122323efa2 Mon Sep 17 00:00:00 2001 From: ochafik Date: Mon, 30 Mar 2026 17:01:19 +0100 Subject: [PATCH] pdf-server: add CommandQueue, Vercel deployment, improved tool description MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add CommandQueue with pluggable backends (in-memory for stdio, Redis/Upstash REST for serverless). Supports long-polling with wake-on-enqueue, TTL pruning, and AbortSignal cancellation. - Replace ~130 lines of hand-rolled queue infrastructure (Maps, pollWaiters, pruneStaleQueues) with CommandQueue. Same behavior, fewer moving parts. - Add Vercel deployment support: stateless HTTP handler (http.ts), DOMMatrix/ImageData/Path2D polyfills for pdfjs-dist on serverless, CORS headers, vercel.json config. - Gate interact tool on Redis availability for HTTP deployments — without Redis, stateless handlers lose command queue state between requests, so only read-only tools are exposed. - Update display_pdf description to tell the model that the viewer's widget context automatically reports current page, total pages, text selection, search results, and annotation details. - 29 new tests for CommandQueue (memory backend, Redis mock, Upstash REST fetch mock, MCP tool roundtrip simulation). Co-Authored-By: Claude Opus 4.6 (1M context) --- examples/pdf-server/.gitignore | 2 + examples/pdf-server/.vercelignore | 5 + examples/pdf-server/api/mcp.mjs | 70 ++ examples/pdf-server/http.ts | 94 +++ examples/pdf-server/package.json | 2 +- examples/pdf-server/server.ts | 197 ++--- examples/pdf-server/serverless-polyfills.ts | 77 ++ examples/pdf-server/src/command-queue.test.ts | 574 +++++++++++++++ examples/pdf-server/src/command-queue.ts | 694 ++++++++++++++++++ examples/pdf-server/vercel.json | 12 + package-lock.json | 39 + 11 files changed, 1630 insertions(+), 136 deletions(-) create mode 100644 examples/pdf-server/.vercelignore create mode 100644 examples/pdf-server/api/mcp.mjs create mode 100644 examples/pdf-server/http.ts create mode 100644 examples/pdf-server/serverless-polyfills.ts create mode 100644 examples/pdf-server/src/command-queue.test.ts create mode 100644 examples/pdf-server/src/command-queue.ts create mode 100644 examples/pdf-server/vercel.json diff --git a/examples/pdf-server/.gitignore b/examples/pdf-server/.gitignore index a856bb2b..fb2fdce5 100644 --- a/examples/pdf-server/.gitignore +++ b/examples/pdf-server/.gitignore @@ -1,2 +1,4 @@ dist/ *.jsonl +.vercel +.env*.local diff --git a/examples/pdf-server/.vercelignore b/examples/pdf-server/.vercelignore new file mode 100644 index 00000000..fcaaa3e0 --- /dev/null +++ b/examples/pdf-server/.vercelignore @@ -0,0 +1,5 @@ +node_modules +src +*.jsonl +*.test.ts +.vercel diff --git a/examples/pdf-server/api/mcp.mjs b/examples/pdf-server/api/mcp.mjs new file mode 100644 index 00000000..06f71b25 --- /dev/null +++ b/examples/pdf-server/api/mcp.mjs @@ -0,0 +1,70 @@ +// Polyfill DOMMatrix/ImageData/Path2D before pdfjs-dist loads. +// Uses dynamic import() so polyfills execute before pdfjs-dist initializes. +if (typeof globalThis.DOMMatrix === "undefined") { + globalThis.DOMMatrix = class DOMMatrix { + constructor(init) { + this.a = 1; + this.b = 0; + this.c = 0; + this.d = 1; + this.e = 0; + this.f = 0; + if (Array.isArray(init)) + [this.a, this.b, this.c, this.d, this.e, this.f] = init; + } + get isIdentity() { + return ( + this.a === 1 && + this.b === 0 && + this.c === 0 && + this.d === 1 && + this.e === 0 && + this.f === 0 + ); + } + translate() { + return new DOMMatrix(); + } + scale() { + return new DOMMatrix(); + } + inverse() { + return new DOMMatrix(); + } + multiply() { + return new DOMMatrix(); + } + transformPoint(p) { + return p ?? { x: 0, y: 0 }; + } + static fromMatrix() { + return new DOMMatrix(); + } + }; +} +if (typeof globalThis.ImageData === "undefined") { + globalThis.ImageData = class ImageData { + constructor(w, h) { + this.width = w; + this.height = h; + this.data = new Uint8ClampedArray(w * h * 4); + } + }; +} +if (typeof globalThis.Path2D === "undefined") { + globalThis.Path2D = class Path2D { + moveTo() {} + lineTo() {} + bezierCurveTo() {} + quadraticCurveTo() {} + arc() {} + arcTo() {} + ellipse() {} + rect() {} + closePath() {} + }; +} + +// Dynamic import so polyfills above execute first. +const { default: handler } = await import("../dist/http.js"); +export default handler; diff --git a/examples/pdf-server/http.ts b/examples/pdf-server/http.ts new file mode 100644 index 00000000..4b54ceaf --- /dev/null +++ b/examples/pdf-server/http.ts @@ -0,0 +1,94 @@ +/** + * Vercel serverless handler for the PDF MCP server. + * + * Stateless: each request creates a fresh MCP server instance. + * The CommandQueue persists state across requests via Redis (Upstash). + * + * Deploy: vercel deploy --prod + * Env vars: UPSTASH_REDIS_REST_URL + TOKEN, or KV_REST_API_URL + TOKEN + */ + +// Must be first import — pdfjs-dist checks for DOMMatrix at module init. +import "./serverless-polyfills.js"; + +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import { createServer } from "./server.js"; + +type Req = IncomingMessage & { body?: unknown }; +type Res = ServerResponse; + +function setCors(res: Res): void { + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS"); + res.setHeader( + "Access-Control-Allow-Headers", + "Content-Type, Accept, Mcp-Session-Id", + ); + res.setHeader("Access-Control-Expose-Headers", "Mcp-Session-Id"); +} + +export default async function handler(req: Req, res: Res): Promise { + setCors(res); + + if (req.method === "OPTIONS") { + res.writeHead(204); + res.end(); + return; + } + + if (req.method === "DELETE") { + res.writeHead(200); + res.end(); + return; + } + + const url = new URL( + req.url ?? "/", + `http://${req.headers.host ?? "localhost"}`, + ); + + if (url.pathname !== "/mcp" && url.pathname !== "/api/mcp") { + const redisUrl = + process.env.UPSTASH_REDIS_REST_URL ?? process.env.KV_REST_API_URL; + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end( + `PDF MCP Server\n\nMCP endpoint: ${url.origin}/mcp\n` + + `Redis: ${redisUrl ? "configured" : "not configured (in-memory)"}`, + ); + return; + } + + // Stateless: fresh server + transport per request. + // The interact tool + command queue require Redis for cross-request state. + // Without Redis, only read-only tools (list_pdfs, display_pdf) are exposed. + const hasRedis = !!( + process.env.UPSTASH_REDIS_REST_URL ?? process.env.KV_REST_API_URL + ); + const server = createServer({ enableInteract: hasRedis }); + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, // stateless — no sessions needed + }); + + res.on("close", () => { + transport.close().catch(() => {}); + server.close().catch(() => {}); + }); + + try { + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error("MCP error:", error); + if (!res.headersSent) { + res.writeHead(500, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32603, message: "Internal server error" }, + id: null, + }), + ); + } + } +} diff --git a/examples/pdf-server/package.json b/examples/pdf-server/package.json index 520e2342..8b684e83 100644 --- a/examples/pdf-server/package.json +++ b/examples/pdf-server/package.json @@ -14,7 +14,7 @@ "dist" ], "scripts": { - "build": "tsc --noEmit && cross-env INPUT=mcp-app.html vite build && tsc -p tsconfig.server.json && bun build server.ts --outdir dist --target node --external pdfjs-dist && bun build main.ts --outfile dist/index.js --target node --external \"./server.js\" --external pdfjs-dist --banner \"#!/usr/bin/env node\"", + "build": "tsc --noEmit && cross-env INPUT=mcp-app.html vite build && tsc -p tsconfig.server.json && bun build server.ts --outdir dist --target node --external pdfjs-dist && bun build main.ts --outfile dist/index.js --target node --external \"./server.js\" --external pdfjs-dist --banner \"#!/usr/bin/env node\" && bun build http.ts --outfile dist/http.js --target node --external pdfjs-dist", "watch": "cross-env INPUT=mcp-app.html vite build --watch", "serve": "bun --watch main.ts --enable-interact", "serve:stdio": "bun main.ts --stdio", diff --git a/examples/pdf-server/server.ts b/examples/pdf-server/server.ts index 5bc2502f..9cb3d799 100644 --- a/examples/pdf-server/server.ts +++ b/examples/pdf-server/server.ts @@ -20,6 +20,8 @@ import { registerAppTool, RESOURCE_MIME_TYPE, } from "@modelcontextprotocol/ext-apps/server"; +import { CommandQueue, RedisCommandStore } from "./src/command-queue.js"; +import type { PdfCommand } from "./src/commands.js"; import { RootsListChangedNotificationSchema, type CallToolResult, @@ -146,18 +148,26 @@ const DIST_DIR = import.meta.filename.endsWith(".ts") : import.meta.dirname; // ============================================================================= -// Command Queue (shared across stateless server instances) +// Command Queue (backed by CommandQueue from ext-apps/server) // ============================================================================= -/** Commands expire after this many ms if never polled */ -const COMMAND_TTL_MS = 60_000; // 60 seconds - -/** Periodic sweep interval to drop stale queues */ -const SWEEP_INTERVAL_MS = 30_000; // 30 seconds +const redisUrl = + process.env.UPSTASH_REDIS_REST_URL ?? process.env.KV_REST_API_URL; +const redisToken = + process.env.UPSTASH_REDIS_REST_TOKEN ?? process.env.KV_REST_API_TOKEN; + +const commandQueue = new CommandQueue( + redisUrl + ? { + store: new RedisCommandStore({ + url: redisUrl, + token: redisToken!, + }), + } + : undefined, +); -/** Fixed batch window: when commands are present, wait this long before returning to let more accumulate */ -const POLL_BATCH_WAIT_MS = 200; -const LONG_POLL_TIMEOUT_MS = 30_000; // Max time to hold a long-poll request open +export type { PdfCommand }; // ============================================================================= // Interact Tool Input Schemas (runtime validators) @@ -178,16 +188,6 @@ const PageInterval = z.object({ end: z.number().min(1).optional(), }); -// ============================================================================= -// Command Queue — wire protocol shared with the viewer -// ============================================================================= - -// PdfCommand is the single source of truth for what flows through the -// poll queue. Defined once in src/commands.ts; both sides import it. -// (`import type` → no pdf-lib bundled into the server.) -import type { PdfCommand } from "./src/commands.js"; -export type { PdfCommand }; - // ============================================================================= // Pending get_pages Requests (request-response bridge via client) // ============================================================================= @@ -232,17 +232,6 @@ function waitForPageData( }); } -interface QueueEntry { - commands: PdfCommand[]; - /** Timestamp of the most recent enqueue or dequeue */ - lastActivity: number; -} - -const commandQueues = new Map(); - -/** Waiters for long-poll: resolve callback wakes up a blocked poll_pdf_commands */ -const pollWaiters = new Map void>(); - /** Valid form field names per viewer UUID (populated during display_pdf) */ const viewFieldNames = new Map>(); @@ -262,68 +251,14 @@ interface ViewFileWatch { } const viewFileWatches = new Map(); -/** - * Per-view heartbeat. THIS is what the sweep iterates — not commandQueues. - * - * Why not commandQueues: display_pdf populates viewFieldNames/viewFieldInfo/ - * viewFileWatches but never touches commandQueues (only enqueueCommand does, - * and it's triply gated). And dequeueCommands deletes the entry on every poll, - * so even when it exists the sweep's TTL window is ~200ms wide. Net effect: - * the sweep found nothing and the aux maps leaked every display_pdf call. - * viewFileWatches entries hold an fs.StatWatcher (FD + timer) — slow FD - * exhaustion on HTTP --enable-interact. - */ -const viewLastActivity = new Map(); - -/** Register or refresh the heartbeat for a view. */ -function touchView(uuid: string): void { - viewLastActivity.set(uuid, Date.now()); -} - -function pruneStaleQueues(): void { - const now = Date.now(); - for (const [uuid, lastActivity] of viewLastActivity) { - if (now - lastActivity > COMMAND_TTL_MS) { - viewLastActivity.delete(uuid); - commandQueues.delete(uuid); - viewFieldNames.delete(uuid); - viewFieldInfo.delete(uuid); - stopFileWatch(uuid); - } +// Clean up per-view auxiliary state when the command queue prunes a view. +commandQueue.onPrune((ids) => { + for (const uuid of ids) { + viewFieldNames.delete(uuid); + viewFieldInfo.delete(uuid); + stopFileWatch(uuid); } -} - -// Periodic sweep so abandoned views don't leak -setInterval(pruneStaleQueues, SWEEP_INTERVAL_MS).unref(); - -function enqueueCommand(viewUUID: string, command: PdfCommand): void { - let entry = commandQueues.get(viewUUID); - if (!entry) { - entry = { commands: [], lastActivity: Date.now() }; - commandQueues.set(viewUUID, entry); - } - entry.commands.push(command); - entry.lastActivity = Date.now(); - touchView(viewUUID); - - // Wake up any long-polling request waiting for this viewUUID - const waiter = pollWaiters.get(viewUUID); - if (waiter) { - pollWaiters.delete(viewUUID); - waiter(); - } -} - -function dequeueCommands(viewUUID: string): PdfCommand[] { - // Poll is activity — keep the view alive even when the queue is empty - // (the common case: viewer polls every ~30s with nothing to receive). - touchView(viewUUID); - const entry = commandQueues.get(viewUUID); - if (!entry) return []; - const commands = entry.commands; - commandQueues.delete(viewUUID); - return commands; -} +}); // ============================================================================= // File Watching (local files, stdio only) @@ -362,7 +297,10 @@ export function startFileWatch(viewUUID: string, filePath: string): void { } if (s.mtimeMs === entry.lastMtimeMs) return; // spurious / already sent entry.lastMtimeMs = s.mtimeMs; - enqueueCommand(viewUUID, { type: "file_changed", mtimeMs: s.mtimeMs }); + void commandQueue.enqueue(viewUUID, { + type: "file_changed", + mtimeMs: s.mtimeMs, + }); }, FILE_WATCH_DEBOUNCE_MS); // Atomic saves replace the inode — old watcher stops firing. Re-attach. @@ -375,7 +313,7 @@ export function startFileWatch(viewUUID: string, filePath: string): void { try { entry.watcher = fs.watch(resolved, onEvent); } catch { - // File removed, not replaced. Leave closed; pruneStaleQueues cleans up. + // File removed, not replaced. Leave closed; commandQueue.sweep() cleans up. } } }; @@ -838,10 +776,11 @@ async function extractFormFieldInfo( verbosity: VerbosityLevel.ERRORS, }); const pdfDoc = await loadingTask.promise; + const pageCount = pdfDoc.numPages; const fields: FormFieldInfo[] = []; try { - for (let i = 1; i <= pdfDoc.numPages; i++) { + for (let i = 1; i <= pageCount; i++) { const page = await pdfDoc.getPage(i); const pageHeight = page.getViewport({ scale: 1.0 }).height; const annotations = await page.getAnnotations(); @@ -1005,8 +944,12 @@ async function extractFormSchema( export interface CreateServerOptions { /** * Enable the `interact` tool and related command-queue infrastructure - * (in-memory command queue, `poll_pdf_commands`, `submit_page_data`). - * Only suitable for single-instance deployments (e.g. stdio transport). + * (`poll_pdf_commands`, `submit_page_data`). + * + * Safe for: stdio transport, single-instance HTTP, or stateless HTTP + * with Redis configured (UPSTASH_REDIS_REST_URL / KV_REST_API_URL). + * Without Redis on stateless HTTP, commands are lost between requests. + * * Defaults to false — server exposes only `list_pdfs` and `display_pdf` (read-only). */ enableInteract?: boolean; @@ -1238,6 +1181,8 @@ Returns a viewUUID in structuredContent. Pass it to \`interact\`: - navigate, search, find, search_navigate, zoom - get_text, get_screenshot (extract content) +The viewer's widget context automatically updates with the current page number, total page count, any text selection, search results, and annotation details — check it before deciding what to do next. + Accepts local files (use list_pdfs), client MCP root directories, or any HTTPS URL. Set \`elicit_form_inputs\` to true to prompt the user to fill form fields before display.`, inputSchema: { @@ -1307,7 +1252,7 @@ Set \`elicit_form_inputs\` to true to prompt the user to fill form fields before const uuid = randomUUID(); // Start the heartbeat now so the sweep can clean up viewFieldNames/ // viewFieldInfo/viewFileWatches even if no interact calls ever happen. - if (!disableInteract) touchView(uuid); + if (!disableInteract) void commandQueue.touch(uuid); // Check writability (governs save button; see isWritablePath doc). // Also requires OS-level W_OK so we don't lie on read-only mounts. @@ -1350,7 +1295,6 @@ Set \`elicit_form_inputs\` to true to prompt the user to fill form fields before fieldInfo = await extractFormFieldInfo(normalized, readPdfRange); if (fieldInfo.length > 0) { viewFieldInfo.set(uuid, fieldInfo); - // Also populate viewFieldNames from field info if not already set if (!viewFieldNames.has(uuid)) { viewFieldNames.set( uuid, @@ -1381,7 +1325,7 @@ Set \`elicit_form_inputs\` to true to prompt the user to fill form fields before } } // Queue fill_form command so the viewer picks it up - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "fill_form", fields: Object.entries(formFieldValues).map( ([name, value]) => ({ name, value }), @@ -1724,7 +1668,7 @@ URL: ${normalized}`, content: [{ type: "text", text: "navigate requires `page`" }], isError: true, }; - enqueueCommand(uuid, { type: "navigate", page }); + await commandQueue.enqueue(uuid, { type: "navigate", page }); description = `navigate to page ${page}`; break; case "search": @@ -1733,7 +1677,7 @@ URL: ${normalized}`, content: [{ type: "text", text: "search requires `query`" }], isError: true, }; - enqueueCommand(uuid, { type: "search", query }); + await commandQueue.enqueue(uuid, { type: "search", query }); description = `search for "${query}"`; break; case "find": @@ -1742,7 +1686,7 @@ URL: ${normalized}`, content: [{ type: "text", text: "find requires `query`" }], isError: true, }; - enqueueCommand(uuid, { type: "find", query }); + await commandQueue.enqueue(uuid, { type: "find", query }); description = `find "${query}" (silent)`; break; case "search_navigate": @@ -1756,7 +1700,10 @@ URL: ${normalized}`, ], isError: true, }; - enqueueCommand(uuid, { type: "search_navigate", matchIndex }); + await commandQueue.enqueue(uuid, { + type: "search_navigate", + matchIndex, + }); description = `go to match #${matchIndex}`; break; case "zoom": @@ -1765,7 +1712,7 @@ URL: ${normalized}`, content: [{ type: "text", text: "zoom requires `scale`" }], isError: true, }; - enqueueCommand(uuid, { type: "zoom", scale }); + await commandQueue.enqueue(uuid, { type: "zoom", scale }); description = `zoom to ${Math.round(scale * 100)}%`; break; case "add_annotations": @@ -1799,7 +1746,7 @@ URL: ${normalized}`, isError: true, }; } - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "add_annotations", // resolveImageAnnotation populates optional x/y/width/height; // input is validated as Record[] so this cast is @@ -1822,7 +1769,7 @@ URL: ${normalized}`, ], isError: true, }; - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "update_annotations", annotations: annotations as Extract< PdfCommand, @@ -1842,7 +1789,7 @@ URL: ${normalized}`, ], isError: true, }; - enqueueCommand(uuid, { type: "remove_annotations", ids }); + await commandQueue.enqueue(uuid, { type: "remove_annotations", ids }); description = `remove ${ids.length} annotation(s)`; break; case "highlight_text": { @@ -1854,7 +1801,7 @@ URL: ${normalized}`, isError: true, }; const id = `ht_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "highlight_text", id, query, @@ -1884,7 +1831,10 @@ URL: ${normalized}`, } } if (validFields.length > 0) { - enqueueCommand(uuid, { type: "fill_form", fields: validFields }); + await commandQueue.enqueue(uuid, { + type: "fill_form", + fields: validFields, + }); } const parts: string[] = []; if (validFields.length > 0) { @@ -1915,7 +1865,7 @@ URL: ${normalized}`, const requestId = randomUUID(); - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "get_pages", requestId, intervals: resolvedIntervals, @@ -1963,7 +1913,7 @@ URL: ${normalized}`, const requestId = randomUUID(); - enqueueCommand(uuid, { + await commandQueue.enqueue(uuid, { type: "get_pages", requestId, intervals: [{ start: page, end: page }], @@ -2280,30 +2230,7 @@ Example — add a signature image and a stamp, then screenshot to verify: _meta: { ui: { visibility: ["app"] } }, }, async ({ viewUUID: uuid }): Promise => { - // If commands are already queued, wait briefly to let more accumulate - if (commandQueues.has(uuid)) { - await new Promise((r) => setTimeout(r, POLL_BATCH_WAIT_MS)); - } else { - // Long-poll: wait for commands to arrive or timeout - await new Promise((resolve) => { - const timer = setTimeout(() => { - pollWaiters.delete(uuid); - resolve(); - }, LONG_POLL_TIMEOUT_MS); - // Cancel any existing waiter for this uuid - const prev = pollWaiters.get(uuid); - if (prev) prev(); - pollWaiters.set(uuid, () => { - clearTimeout(timer); - resolve(); - }); - }); - // After waking, wait briefly for batching - if (commandQueues.has(uuid)) { - await new Promise((r) => setTimeout(r, POLL_BATCH_WAIT_MS)); - } - } - const commands = dequeueCommands(uuid); + const commands = await commandQueue.poll(uuid); return { content: [{ type: "text", text: `${commands.length} command(s)` }], structuredContent: { commands }, diff --git a/examples/pdf-server/serverless-polyfills.ts b/examples/pdf-server/serverless-polyfills.ts new file mode 100644 index 00000000..935fe85b --- /dev/null +++ b/examples/pdf-server/serverless-polyfills.ts @@ -0,0 +1,77 @@ +/** + * Minimal DOM polyfills for pdfjs-dist on serverless runtimes (Vercel, etc.) + * that don't provide browser globals. + * + * Must be imported BEFORE any pdfjs-dist import. + */ + +if (typeof globalThis.DOMMatrix === "undefined") { + globalThis.DOMMatrix = class DOMMatrix { + a = 1; + b = 0; + c = 0; + d = 1; + e = 0; + f = 0; + constructor(init?: number[] | string) { + if (Array.isArray(init)) { + [this.a, this.b, this.c, this.d, this.e, this.f] = init; + } + } + get isIdentity() { + return ( + this.a === 1 && + this.b === 0 && + this.c === 0 && + this.d === 1 && + this.e === 0 && + this.f === 0 + ); + } + translate() { + return new DOMMatrix(); + } + scale() { + return new DOMMatrix(); + } + inverse() { + return new DOMMatrix(); + } + multiply() { + return new DOMMatrix(); + } + transformPoint(p?: { x: number; y: number }) { + return p ?? { x: 0, y: 0 }; + } + static fromMatrix() { + return new DOMMatrix(); + } + } as unknown as typeof DOMMatrix; +} + +if (typeof globalThis.ImageData === "undefined") { + globalThis.ImageData = class ImageData { + readonly width: number; + readonly height: number; + readonly data: Uint8ClampedArray; + constructor(w: number, h: number) { + this.width = w; + this.height = h; + this.data = new Uint8ClampedArray(w * h * 4); + } + } as unknown as typeof ImageData; +} + +if (typeof globalThis.Path2D === "undefined") { + globalThis.Path2D = class Path2D { + moveTo() {} + lineTo() {} + bezierCurveTo() {} + quadraticCurveTo() {} + arc() {} + arcTo() {} + ellipse() {} + rect() {} + closePath() {} + } as unknown as typeof Path2D; +} diff --git a/examples/pdf-server/src/command-queue.test.ts b/examples/pdf-server/src/command-queue.test.ts new file mode 100644 index 00000000..2da1b8af --- /dev/null +++ b/examples/pdf-server/src/command-queue.test.ts @@ -0,0 +1,574 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test"; +import { + CommandQueue, + MemoryCommandStore, + MemoryCommandNotifier, + RedisCommandStore, + RedisCommandNotifier, + type RedisLike, + type CommandStore, + type CommandNotifier, +} from "./command-queue"; + +// ─── In-memory mock of RedisLike ──────────────────────────────────────────── + +class MockRedis implements RedisLike { + private store = new Map(); + private lists = new Map(); + private subs = new Map void>>(); + + async lpush(key: string, ...values: string[]): Promise { + let list = this.lists.get(key); + if (!list) { + list = []; + this.lists.set(key, list); + } + // LPUSH prepends; push to front + list.unshift(...values); + return list.length; + } + + async lrange(key: string, start: number, stop: number): Promise { + const list = this.lists.get(key) ?? []; + if (stop === -1) return list.slice(start); + return list.slice(start, stop + 1); + } + + async del(...keys: string[]): Promise { + let count = 0; + for (const k of keys) { + if (this.store.delete(k)) count++; + if (this.lists.delete(k)) count++; + } + return count; + } + + async set(key: string, value: string): Promise { + this.store.set(key, value); + return "OK"; + } + + async get(key: string): Promise { + return this.store.get(key) ?? null; + } + + async keys(pattern: string): Promise { + const prefix = pattern.replace("*", ""); + return [...this.store.keys()].filter((k) => k.startsWith(prefix)); + } + + async expire(): Promise { + return 1; + } + + async publish(channel: string, message: string): Promise { + const set = this.subs.get(channel); + if (set) for (const cb of set) cb(message); + return set?.size ?? 0; + } + + async subscribe( + channel: string, + callback: (message: string) => void, + ): Promise<() => Promise> { + let set = this.subs.get(channel); + if (!set) { + set = new Set(); + this.subs.set(channel, set); + } + set.add(callback); + return async () => { + set!.delete(callback); + if (set!.size === 0) this.subs.delete(channel); + }; + } +} + +// ─── Shared test suite (runs for both backends) ───────────────────────────── + +type Command = { type: string; page?: number }; + +function suiteFor( + name: string, + makeQueue: () => { + queue: CommandQueue; + store: CommandStore; + notifier: CommandNotifier; + }, +) { + describe(name, () => { + let queue: CommandQueue; + + beforeEach(() => { + const ctx = makeQueue(); + queue = ctx.queue; + }); + + afterEach(async () => { + await queue.close(); + }); + + it("enqueue then poll returns commands in FIFO order", async () => { + await queue.enqueue("v1", { type: "navigate", page: 1 }); + await queue.enqueue("v1", { type: "navigate", page: 2 }); + + const cmds = await queue.poll("v1", { + timeoutMs: 100, + batchWaitMs: 10, + }); + + expect(cmds).toEqual([ + { type: "navigate", page: 1 }, + { type: "navigate", page: 2 }, + ]); + }); + + it("poll returns empty array on timeout when no commands", async () => { + const cmds = await queue.poll("v1", { + timeoutMs: 50, + batchWaitMs: 10, + }); + expect(cmds).toEqual([]); + }); + + it("poll wakes up when command is enqueued during wait", async () => { + const t0 = Date.now(); + + // Start polling in background + const pollPromise = queue.poll("v1", { + timeoutMs: 5000, + batchWaitMs: 10, + }); + + // Enqueue after a short delay + await sleep(30); + await queue.enqueue("v1", { type: "navigate", page: 3 }); + + const cmds = await pollPromise; + const elapsed = Date.now() - t0; + + expect(cmds).toEqual([{ type: "navigate", page: 3 }]); + // Should have woken up well before the 5s timeout + expect(elapsed).toBeLessThan(2000); + }); + + it("poll drains queue — second poll gets nothing", async () => { + await queue.enqueue("v1", { type: "navigate", page: 1 }); + + const first = await queue.poll("v1", { + timeoutMs: 100, + batchWaitMs: 10, + }); + expect(first).toHaveLength(1); + + const second = await queue.poll("v1", { + timeoutMs: 50, + batchWaitMs: 10, + }); + expect(second).toEqual([]); + }); + + it("different queueIds are independent", async () => { + await queue.enqueue("v1", { type: "navigate", page: 1 }); + await queue.enqueue("v2", { type: "navigate", page: 2 }); + + const cmds1 = await queue.poll("v1", { + timeoutMs: 100, + batchWaitMs: 10, + }); + const cmds2 = await queue.poll("v2", { + timeoutMs: 100, + batchWaitMs: 10, + }); + + expect(cmds1).toEqual([{ type: "navigate", page: 1 }]); + expect(cmds2).toEqual([{ type: "navigate", page: 2 }]); + }); + + it("enqueueBatch adds multiple commands at once", async () => { + await queue.enqueueBatch("v1", [ + { type: "navigate", page: 1 }, + { type: "navigate", page: 2 }, + { type: "navigate", page: 3 }, + ]); + + const cmds = await queue.poll("v1", { + timeoutMs: 100, + batchWaitMs: 10, + }); + expect(cmds).toHaveLength(3); + expect(cmds[0]).toEqual({ type: "navigate", page: 1 }); + expect(cmds[2]).toEqual({ type: "navigate", page: 3 }); + }); + + it("poll respects AbortSignal", async () => { + const controller = new AbortController(); + + // Abort quickly + setTimeout(() => controller.abort(), 30); + + const t0 = Date.now(); + const cmds = await queue.poll("v1", { + timeoutMs: 5000, + signal: controller.signal, + }); + const elapsed = Date.now() - t0; + + expect(cmds).toEqual([]); + expect(elapsed).toBeLessThan(2000); + }); + + it("sweep prunes stale queues", async () => { + // Create a queue with very short TTL + await queue.close(); + const ctx = makeQueue(); + queue = new CommandQueue({ + store: ctx.store, + notifier: ctx.notifier, + ttlMs: 10, + sweepIntervalMs: 999_999, // manual sweep + }); + + await queue.enqueue("v1", { type: "navigate", page: 1 }); + await sleep(50); + + const pruned = await queue.sweep(); + expect(pruned).toContain("v1"); + + // Queue should be empty after prune + const cmds = await queue.poll("v1", { + timeoutMs: 50, + batchWaitMs: 10, + }); + expect(cmds).toEqual([]); + }); + + it("touch prevents pruning", async () => { + await queue.close(); + const ctx = makeQueue(); + queue = new CommandQueue({ + store: ctx.store, + notifier: ctx.notifier, + ttlMs: 100, + sweepIntervalMs: 999_999, + }); + + await queue.enqueue("v1", { type: "navigate", page: 1 }); + await sleep(60); + await queue.touch("v1"); + await sleep(60); + + const pruned = await queue.sweep(); + expect(pruned).not.toContain("v1"); + }); + + it("onPrune callback fires with pruned IDs", async () => { + await queue.close(); + const ctx = makeQueue(); + queue = new CommandQueue({ + store: ctx.store, + notifier: ctx.notifier, + ttlMs: 10, + sweepIntervalMs: 999_999, + }); + + await queue.enqueue("v1", { type: "navigate", page: 1 }); + await queue.enqueue("v2", { type: "navigate", page: 2 }); + await sleep(50); + + const prunedIds: string[][] = []; + queue.onPrune((ids) => prunedIds.push(ids)); + + await queue.sweep(); + expect(prunedIds).toHaveLength(1); + expect(prunedIds[0]).toContain("v1"); + expect(prunedIds[0]).toContain("v2"); + }); + }); +} + +// ─── Run suite for Memory backend ─────────────────────────────────────────── + +suiteFor("CommandQueue (memory)", () => { + const store = new MemoryCommandStore(); + const notifier = new MemoryCommandNotifier(); + return { + store, + notifier, + queue: new CommandQueue({ + store, + notifier, + sweepIntervalMs: 999_999, // manual sweep in tests + }), + }; +}); + +// ─── Run suite for Redis backend ──────────────────────────────────────────── + +suiteFor("CommandQueue (redis mock)", () => { + const redis = new MockRedis(); + const store = new RedisCommandStore({ redis }); + const notifier = new RedisCommandNotifier(store, redis, { + pollIntervalMs: 50, + }); + return { + store, + notifier, + queue: new CommandQueue({ + store, + notifier, + sweepIntervalMs: 999_999, + }), + }; +}); + +// ─── Redis-specific tests ─────────────────────────────────────────────────── + +describe("RedisCommandNotifier pub/sub wake", () => { + it("wakes poll via publish when subscribe is available", async () => { + const redis = new MockRedis(); + const store = new RedisCommandStore({ redis }); + const notifier = new RedisCommandNotifier(store, redis); + const queue = new CommandQueue({ + store, + notifier, + sweepIntervalMs: 999_999, + }); + + const t0 = Date.now(); + const pollPromise = queue.poll("v1", { + timeoutMs: 5000, + batchWaitMs: 10, + }); + + await sleep(30); + await queue.enqueue("v1", { type: "navigate", page: 42 }); + + const cmds = await pollPromise; + const elapsed = Date.now() - t0; + + expect(cmds).toEqual([{ type: "navigate", page: 42 }]); + expect(elapsed).toBeLessThan(2000); + + await queue.close(); + }); +}); + +describe("RedisCommandNotifier polling fallback", () => { + it("wakes poll via polling when subscribe is not available", async () => { + const redis = new MockRedis(); + // Remove subscribe/publish to force polling mode + const limitedRedis: RedisLike = { + lpush: redis.lpush.bind(redis), + lrange: redis.lrange.bind(redis), + del: redis.del.bind(redis), + set: redis.set.bind(redis), + get: redis.get.bind(redis), + keys: redis.keys.bind(redis), + expire: redis.expire.bind(redis), + // No publish, no subscribe + }; + + const store = new RedisCommandStore({ redis: limitedRedis }); + const notifier = new RedisCommandNotifier(store, limitedRedis, { + pollIntervalMs: 50, + }); + const queue = new CommandQueue({ + store, + notifier, + sweepIntervalMs: 999_999, + }); + + const t0 = Date.now(); + const pollPromise = queue.poll("v1", { + timeoutMs: 5000, + batchWaitMs: 10, + }); + + await sleep(30); + await queue.enqueue("v1", { type: "navigate", page: 99 }); + + const cmds = await pollPromise; + const elapsed = Date.now() - t0; + + expect(cmds).toEqual([{ type: "navigate", page: 99 }]); + // Polling at 50ms intervals means ~80ms worst case + expect(elapsed).toBeLessThan(2000); + + await queue.close(); + }); +}); + +// ─── Default constructor (no options) ─────────────────────────────────────── + +describe("CommandQueue defaults", () => { + it("works with zero-arg constructor (memory backend)", async () => { + const queue = new CommandQueue(); + await queue.enqueue("v1", { type: "navigate", page: 1 }); + const cmds = await queue.poll("v1", { + timeoutMs: 100, + batchWaitMs: 10, + }); + expect(cmds).toEqual([{ type: "navigate", page: 1 }]); + await queue.close(); + }); +}); + +// ─── UpstashRestClient tests (via RedisCommandStore + fetch mock) ──────────── + +describe("RedisCommandStore with Upstash credentials", () => { + const calls: { url: string; init: RequestInit }[] = []; + let originalFetch: typeof globalThis.fetch; + let mockResponses: Array<{ result: unknown }>; + + beforeEach(() => { + calls.length = 0; + mockResponses = []; + originalFetch = globalThis.fetch; + globalThis.fetch = (async ( + input: string | URL | Request, + init?: RequestInit, + ) => { + const url = typeof input === "string" ? input : String(input); + calls.push({ url, init: init ?? {} }); + const body = mockResponses.shift() ?? { result: null }; + return new Response(JSON.stringify(body), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + }) as unknown as typeof fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it("sends correct Authorization header and URL encoding", async () => { + mockResponses.push({ result: 1 }); // LPUSH response + mockResponses.push({ result: "OK" }); // SET response + + const store = new RedisCommandStore({ + url: "https://my-redis.upstash.io", + token: "my-secret-token", + }); + await store.push("view/1", ['{"type":"test"}']); + + expect(calls.length).toBeGreaterThanOrEqual(1); + expect(calls[0].url).toContain("https://my-redis.upstash.io/"); + expect(calls[0].url).toContain("LPUSH"); + expect(calls[0].init.headers).toEqual({ + Authorization: "Bearer my-secret-token", + }); + // Queue ID with "/" should be encoded + expect(calls[0].url).toContain("view%2F1"); + }); + + it("handles Upstash error responses", async () => { + globalThis.fetch = (async () => { + return new Response("Unauthorized", { status: 401 }); + }) as unknown as typeof fetch; + + const store = new RedisCommandStore({ + url: "https://my-redis.upstash.io", + token: "bad-token", + }); + + await expect(store.push("v1", ["cmd"])).rejects.toThrow( + "Upstash LPUSH: 401", + ); + }); + + it("popAll reverses LRANGE result for FIFO order", async () => { + // LRANGE returns newest-first (LPUSH order) + mockResponses.push({ + result: ['{"type":"b"}', '{"type":"a"}'], + }); + mockResponses.push({ result: 1 }); // DEL + + const store = new RedisCommandStore({ + url: "https://test.upstash.io", + token: "tok", + }); + const items = await store.popAll("v1"); + + // Should be reversed to FIFO: a first, then b + expect(items).toEqual(['{"type":"a"}', '{"type":"b"}']); + }); + + it("set with EX passes TTL arguments", async () => { + mockResponses.push({ result: "OK" }); + + const store = new RedisCommandStore({ + url: "https://test.upstash.io", + token: "tok", + }); + await store.touch("v1"); + + const setCall = calls.find((c) => c.url.includes("SET")); + expect(setCall).toBeDefined(); + expect(setCall!.url).toContain("EX"); + expect(setCall!.url).toContain("120"); // default TTL + }); +}); + +// ─── MCP tool integration test ────────────────────────────────────────────── + +describe("CommandQueue through MCP tool roundtrip", () => { + it("enqueue via one tool, poll via another (simulated)", async () => { + const queue = new CommandQueue(); + + // Simulate show_tool creating a view + const viewId = "test-view-123"; + await queue.touch(viewId); + + // Simulate interact_tool enqueuing commands + await queue.enqueue(viewId, { type: "navigate", page: 10 }); + await queue.enqueue(viewId, { type: "navigate", page: 20 }); + + // Simulate poll_tool draining + const cmds = await queue.poll(viewId, { + timeoutMs: 100, + batchWaitMs: 10, + }); + expect(cmds).toEqual([ + { type: "navigate", page: 10 }, + { type: "navigate", page: 20 }, + ]); + + // Second poll should be empty + const cmds2 = await queue.poll(viewId, { + timeoutMs: 50, + batchWaitMs: 10, + }); + expect(cmds2).toEqual([]); + + await queue.close(); + }); + + it("concurrent enqueue during poll wakes immediately", async () => { + const queue = new CommandQueue(); + const viewId = "concurrent-test"; + + const t0 = Date.now(); + + // Start poll (will block) + const pollPromise = queue.poll(viewId, { + timeoutMs: 5000, + batchWaitMs: 10, + }); + + // Enqueue from "another tool" after delay + await sleep(20); + await queue.enqueue(viewId, { type: "navigate", page: 42 }); + + const cmds = await pollPromise; + expect(cmds).toEqual([{ type: "navigate", page: 42 }]); + expect(Date.now() - t0).toBeLessThan(2000); + + await queue.close(); + }); +}); + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} diff --git a/examples/pdf-server/src/command-queue.ts b/examples/pdf-server/src/command-queue.ts new file mode 100644 index 00000000..6931a6b5 --- /dev/null +++ b/examples/pdf-server/src/command-queue.ts @@ -0,0 +1,694 @@ +/** + * A generic command queue with long-polling support, for server↔viewer + * communication in MCP Apps. + * + * The server enqueues commands (e.g. "navigate to page 5", "fill form"); the + * viewer polls via {@link app!App.callServerTool `callServerTool`} and receives batches. + * + * Two built-in storage backends: + * + * - **Memory** (default) — zero-latency wake-on-enqueue, ideal for stdio / + * single-instance deployments. + * - **Redis** — cross-instance, works with Upstash REST on Vercel or any + * ioredis-compatible client. Enqueue notifications use a pub/sub channel + * when available, otherwise falls back to polling. + * + * @example + * ```ts + * import { CommandQueue } from "@modelcontextprotocol/ext-apps/server"; + * + * // In-memory (stdio / single-instance) + * const queue = new CommandQueue(); + * + * // Redis-backed (remote / serverless) + * const queue = new CommandQueue({ + * store: new RedisCommandStore({ + * url: process.env.UPSTASH_REDIS_REST_URL!, + * token: process.env.UPSTASH_REDIS_REST_TOKEN!, + * }), + * }); + * + * // Enqueue from a tool handler + * await queue.enqueue(viewUUID, { type: "navigate", page: 5 }); + * + * // Long-poll from a poll tool (blocks until commands arrive or timeout) + * const commands = await queue.poll(viewUUID); + * ``` + * + * @module command-queue + */ + +// ─── Store interface ───────────────────────────────────────────────────────── + +/** + * Pluggable storage backend for {@link CommandQueue}. + * + * Implementations must be safe to call concurrently from multiple async + * contexts (but not necessarily from multiple processes — that's what the + * Redis store is for). + */ +export interface CommandStore { + /** Append serialised commands to the queue for `queueId`. */ + push(queueId: string, items: string[]): Promise; + + /** + * Atomically drain all commands for `queueId`, returning them in FIFO + * order. Returns `[]` when empty. + */ + popAll(queueId: string): Promise; + + /** Return `true` if the queue has at least one command. */ + hasItems(queueId: string): Promise; + + /** Record activity for `queueId` (prevents TTL pruning). */ + touch(queueId: string): Promise; + + /** + * Remove queues whose last activity is older than `maxAgeMs`. + * Returns the IDs of pruned queues. + */ + prune(maxAgeMs: number): Promise; + + /** Release resources (timers, connections). */ + close(): Promise; +} + +// ─── Notifier interface ────────────────────────────────────────────────────── + +/** + * Optional real-time notification layer. When a notifier is present, + * {@link CommandQueue.poll} wakes immediately on enqueue instead of + * polling at intervals. + */ +export interface CommandNotifier { + /** Signal that `queueId` has new items. */ + notify(queueId: string): void; + + /** + * Block until `queueId` is notified or `timeoutMs` elapses. + * Returns `true` if woken by a notification, `false` on timeout. + * Must respect `signal` for cancellation. + */ + wait( + queueId: string, + timeoutMs: number, + signal?: AbortSignal, + ): Promise; + + /** Release resources. */ + close(): void; +} + +// ─── Memory implementations ───────────────────────────────────────────────── + +/** In-memory store — zero overhead, single-process only. */ +export class MemoryCommandStore implements CommandStore { + private queues = new Map(); + private activity = new Map(); + + async push(queueId: string, items: string[]): Promise { + let queue = this.queues.get(queueId); + if (!queue) { + queue = []; + this.queues.set(queueId, queue); + } + queue.push(...items); + this.activity.set(queueId, Date.now()); + } + + async popAll(queueId: string): Promise { + const queue = this.queues.get(queueId); + if (!queue || queue.length === 0) return []; + const items = queue.splice(0); + this.queues.delete(queueId); + return items; + } + + async hasItems(queueId: string): Promise { + const q = this.queues.get(queueId); + return !!q && q.length > 0; + } + + async touch(queueId: string): Promise { + this.activity.set(queueId, Date.now()); + } + + async prune(maxAgeMs: number): Promise { + const now = Date.now(); + const pruned: string[] = []; + for (const [id, ts] of this.activity) { + if (now - ts > maxAgeMs) { + this.activity.delete(id); + this.queues.delete(id); + pruned.push(id); + } + } + return pruned; + } + + async close(): Promise { + this.queues.clear(); + this.activity.clear(); + } +} + +/** + * In-memory notifier — resolves waiters synchronously on + * {@link MemoryCommandNotifier.notify notify()}, giving zero-latency + * wake-on-enqueue for single-process deployments. + * + * Only one waiter is active per queue ID at a time — a new `wait()` + * cancels the previous one (returns `false`). This matches the + * expected single-consumer-per-view pattern. + */ +export class MemoryCommandNotifier implements CommandNotifier { + /** Maps queueId → { wake(), cancel() } for the active waiter. */ + private waiters = new Map void; cancel: () => void }>(); + + notify(queueId: string): void { + const w = this.waiters.get(queueId); + if (w) { + this.waiters.delete(queueId); + w.wake(); + } + } + + wait( + queueId: string, + timeoutMs: number, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve) => { + if (signal?.aborted) { + resolve(false); + return; + } + + // Cancel any existing waiter for this queue — only the latest + // poll should drain the queue. + const prev = this.waiters.get(queueId); + if (prev) prev.cancel(); + + const cleanup = () => { + clearTimeout(timer); + signal?.removeEventListener("abort", onAbort); + if (this.waiters.get(queueId)?.wake === wake) { + this.waiters.delete(queueId); + } + }; + + const wake = () => { + cleanup(); + resolve(true); + }; + + const cancel = () => { + cleanup(); + resolve(false); + }; + + const onAbort = () => cancel(); + + const timer = setTimeout(() => cancel(), timeoutMs); + + signal?.addEventListener("abort", onAbort); + this.waiters.set(queueId, { wake, cancel }); + }); + } + + close(): void { + for (const w of this.waiters.values()) w.cancel(); + this.waiters.clear(); + } +} + +// ─── Redis implementations ────────────────────────────────────────────────── + +/** + * Minimal interface for the Redis operations used by {@link RedisCommandStore}. + * + * Compatible with both Upstash REST (`@upstash/redis`) and ioredis-style + * clients. If you use a different Redis library, adapt it to this interface. + */ +export interface RedisLike { + /** LPUSH — prepend values to list (newest first). */ + lpush(key: string, ...values: string[]): Promise; + /** LRANGE — read range from list. */ + lrange(key: string, start: number, stop: number): Promise; + /** DEL — delete key(s). */ + del(...keys: string[]): Promise; + /** SET with optional EX (TTL in seconds). */ + set(key: string, value: string, options?: { ex?: number }): Promise; + /** GET — read string value. */ + get(key: string): Promise; + /** KEYS — pattern match (only used for pruning, not hot path). */ + keys(pattern: string): Promise; + /** EXPIRE — set TTL. */ + expire(key: string, seconds: number): Promise; + /** + * PUBLISH — optional, for real-time wake notifications. + * If not provided, {@link RedisCommandNotifier} falls back to polling. + */ + publish?(channel: string, message: string): Promise; + /** + * SUBSCRIBE — optional, for real-time wake notifications. + * Should return a cleanup function. + */ + subscribe?( + channel: string, + callback: (message: string) => void, + ): Promise<() => Promise>; +} + +/** + * Options for creating a {@link RedisCommandStore} from Upstash REST + * credentials (no persistent connection required — Vercel-safe). + */ +export interface UpstashCredentials { + url: string; + token: string; +} + +/** + * Minimal Upstash REST adapter implementing {@link RedisLike}. + * Uses `fetch` only — no persistent connections, safe for serverless. + */ +class UpstashRestClient implements RedisLike { + constructor(private creds: UpstashCredentials) {} + + private async cmd(...args: (string | number)[]): Promise { + const res = await fetch( + `${this.creds.url}/${args.map(encodeURIComponent).join("/")}`, + { + headers: { Authorization: `Bearer ${this.creds.token}` }, + method: args.length > 1 ? undefined : "GET", + }, + ); + if (!res.ok) { + throw new Error(`Upstash ${args[0]}: ${res.status} ${await res.text()}`); + } + const json = (await res.json()) as { result: T }; + return json.result; + } + + async lpush(key: string, ...values: string[]): Promise { + return this.cmd("LPUSH", key, ...values); + } + async lrange(key: string, start: number, stop: number): Promise { + return this.cmd("LRANGE", key, String(start), String(stop)); + } + async del(...keys: string[]): Promise { + return this.cmd("DEL", ...keys); + } + async set( + key: string, + value: string, + options?: { ex?: number }, + ): Promise { + if (options?.ex) { + return this.cmd("SET", key, value, "EX", options.ex); + } + return this.cmd("SET", key, value); + } + async get(key: string): Promise { + return this.cmd("GET", key); + } + async keys(pattern: string): Promise { + return this.cmd("KEYS", pattern); + } + async expire(key: string, seconds: number): Promise { + return this.cmd("EXPIRE", key, seconds); + } +} + +const REDIS_PREFIX = "mcp:cmdq:"; + +/** Redis-backed command store for cross-instance deployments. */ +export class RedisCommandStore implements CommandStore { + readonly redis: RedisLike; + + constructor(options: { redis: RedisLike } | UpstashCredentials) { + if ("redis" in options) { + this.redis = options.redis; + } else { + this.redis = new UpstashRestClient(options); + } + } + + private listKey(queueId: string): string { + return `${REDIS_PREFIX}list:${queueId}`; + } + private activityKey(queueId: string): string { + return `${REDIS_PREFIX}activity:${queueId}`; + } + + async push(queueId: string, items: string[]): Promise { + // LPUSH adds to head; we reverse so LRANGE 0 -1 returns FIFO order. + await this.redis.lpush(this.listKey(queueId), ...[...items].reverse()); + await this.redis.set(this.activityKey(queueId), String(Date.now()), { + ex: 120, + }); + } + + async popAll(queueId: string): Promise { + const key = this.listKey(queueId); + // Read all, then delete. Not truly atomic, but acceptable: + // worst case a concurrent push between LRANGE and DEL is lost, + // and the viewer will re-poll immediately. + const items = await this.redis.lrange(key, 0, -1); + if (items.length > 0) { + await this.redis.del(key); + } + // LRANGE returns newest-first (LPUSH order); reverse for FIFO. + return items.reverse(); + } + + async hasItems(queueId: string): Promise { + const items = await this.redis.lrange(this.listKey(queueId), 0, 0); + return items.length > 0; + } + + async touch(queueId: string): Promise { + await this.redis.set(this.activityKey(queueId), String(Date.now()), { + ex: 120, + }); + } + + async prune(maxAgeMs: number): Promise { + const now = Date.now(); + const keys = await this.redis.keys(`${REDIS_PREFIX}activity:*`); + const pruned: string[] = []; + for (const key of keys) { + const val = await this.redis.get(key); + if (val && now - Number(val) > maxAgeMs) { + const queueId = key.slice(`${REDIS_PREFIX}activity:`.length); + await this.redis.del(key, this.listKey(queueId)); + pruned.push(queueId); + } + } + return pruned; + } + + async close(): Promise { + // No persistent connections to close for REST-based clients. + } +} + +/** + * Redis-backed notifier. If the {@link RedisLike} client supports + * `publish` + `subscribe`, uses real-time pub/sub. Otherwise falls + * back to polling `hasItems` on the store at `pollIntervalMs`. + */ +export class RedisCommandNotifier implements CommandNotifier { + private store: CommandStore; + private redis: RedisLike; + private pollIntervalMs: number; + private cleanups: (() => Promise)[] = []; + + constructor( + store: CommandStore, + redis: RedisLike, + options?: { pollIntervalMs?: number }, + ) { + this.store = store; + this.redis = redis; + this.pollIntervalMs = options?.pollIntervalMs ?? 300; + } + + private channelKey(queueId: string): string { + return `${REDIS_PREFIX}notify:${queueId}`; + } + + notify(queueId: string): void { + if (this.redis.publish) { + void this.redis.publish(this.channelKey(queueId), "1"); + } + // If no publish, poll-based waiters will pick it up on next tick. + } + + async wait( + queueId: string, + timeoutMs: number, + signal?: AbortSignal, + ): Promise { + if (signal?.aborted) return false; + + // Try pub/sub first. + if (this.redis.subscribe) { + return this.waitPubSub(queueId, timeoutMs, signal); + } + // Fallback: poll store. + return this.waitPoll(queueId, timeoutMs, signal); + } + + private waitPubSub( + queueId: string, + timeoutMs: number, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve) => { + let settled = false; + const settle = (val: boolean) => { + if (settled) return; + settled = true; + clearTimeout(timer); + signal?.removeEventListener("abort", onAbort); + void cleanupSub?.(); + resolve(val); + }; + + const timer = setTimeout(() => settle(false), timeoutMs); + const onAbort = () => settle(false); + signal?.addEventListener("abort", onAbort); + + let cleanupSub: (() => Promise) | undefined; + this.redis.subscribe!(this.channelKey(queueId), () => settle(true)) + .then((unsub) => { + cleanupSub = unsub; + if (settled) void unsub(); + }) + .catch(() => { + // Subscription failed — fall back to polling. + this.waitPoll(queueId, timeoutMs, signal).then(resolve); + }); + }); + } + + private waitPoll( + queueId: string, + timeoutMs: number, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve) => { + const deadline = Date.now() + timeoutMs; + const check = () => { + if (signal?.aborted || Date.now() >= deadline) { + resolve(false); + return; + } + void this.store.hasItems(queueId).then((has) => { + if (has) { + resolve(true); + } else { + setTimeout(check, this.pollIntervalMs); + } + }); + }; + check(); + }); + } + + close(): void { + for (const fn of this.cleanups) void fn(); + this.cleanups = []; + } +} + +// ─── CommandQueue ──────────────────────────────────────────────────────────── + +/** Configuration for {@link CommandQueue}. */ +export interface CommandQueueOptions { + /** + * Storage backend. Defaults to {@link MemoryCommandStore}. + * Pass a {@link RedisCommandStore} (or any {@link CommandStore}) for + * cross-instance deployments. + */ + store?: CommandStore; + + /** + * Real-time notification layer. Defaults to + * {@link MemoryCommandNotifier} when `store` is memory, + * or {@link RedisCommandNotifier} when `store` is + * {@link RedisCommandStore}. + */ + notifier?: CommandNotifier; + + /** Max time (ms) a queue can be idle before pruning. Default: 60 000. */ + ttlMs?: number; + + /** Sweep interval (ms) for pruning stale queues. Default: 30 000. */ + sweepIntervalMs?: number; + + /** + * After commands are detected, wait this long (ms) for more to + * accumulate before returning the batch. Default: 200. + */ + batchWaitMs?: number; + + /** Max time (ms) to hold a long-poll open. Default: 30 000. */ + pollTimeoutMs?: number; +} + +/** + * Generic command queue with long-polling for MCP Apps server↔viewer + * communication. + * + * @typeParam T - The command type (e.g. `PdfCommand`). + */ +export class CommandQueue { + readonly store: CommandStore; + readonly notifier: CommandNotifier; + readonly ttlMs: number; + readonly batchWaitMs: number; + readonly pollTimeoutMs: number; + + private sweepTimer: ReturnType | undefined; + private onPruneCallbacks = new Set<(queueIds: string[]) => void>(); + + constructor(options?: CommandQueueOptions) { + const store = options?.store ?? new MemoryCommandStore(); + this.store = store; + + // Auto-create a matching notifier if not provided. + if (options?.notifier) { + this.notifier = options.notifier; + } else if (store instanceof RedisCommandStore) { + this.notifier = new RedisCommandNotifier(store, store.redis); + } else { + this.notifier = new MemoryCommandNotifier(); + } + + this.ttlMs = options?.ttlMs ?? 60_000; + this.batchWaitMs = options?.batchWaitMs ?? 200; + this.pollTimeoutMs = options?.pollTimeoutMs ?? 30_000; + + const sweepMs = options?.sweepIntervalMs ?? 30_000; + this.sweepTimer = setInterval(() => this.sweep(), sweepMs); + // Allow the process to exit without waiting for the sweep timer. + if (typeof this.sweepTimer === "object" && "unref" in this.sweepTimer) { + this.sweepTimer.unref(); + } + } + + /** + * Register a callback that fires when queues are pruned due to + * inactivity. Useful for cleaning up associated resources (e.g. + * file watchers, form field caches). + */ + onPrune(callback: (queueIds: string[]) => void): () => void { + this.onPruneCallbacks.add(callback); + return () => this.onPruneCallbacks.delete(callback); + } + + /** Enqueue a command for a given view/queue. */ + async enqueue(queueId: string, command: T): Promise { + const pushed = this.store.push(queueId, [JSON.stringify(command)]); + // Notify synchronously so in-memory waiters wake in the same tick. + this.notifier.notify(queueId); + await pushed; + } + + /** Enqueue multiple commands at once. */ + async enqueueBatch(queueId: string, commands: T[]): Promise { + if (commands.length === 0) return; + const pushed = this.store.push( + queueId, + commands.map((c) => JSON.stringify(c)), + ); + this.notifier.notify(queueId); + await pushed; + } + + /** + * Long-poll for commands. Blocks until commands are available or + * `pollTimeoutMs` elapses, then returns all accumulated commands. + * + * This is the method you call inside your `poll_*_commands` tool handler. + */ + async poll( + queueId: string, + options?: { + timeoutMs?: number; + batchWaitMs?: number; + signal?: AbortSignal; + }, + ): Promise { + const timeoutMs = options?.timeoutMs ?? this.pollTimeoutMs; + const batchWaitMs = options?.batchWaitMs ?? this.batchWaitMs; + + const signal = options?.signal; + + // If already aborted, return immediately without draining. + if (signal?.aborted) return []; + + // Touch on every poll to keep the queue alive. + await this.store.touch(queueId); + + const hasItems = await this.store.hasItems(queueId); + + if (hasItems) { + // Commands already queued — wait briefly to let more accumulate. + await sleep(batchWaitMs); + } else { + // Long-poll: wait for notification or timeout. + const woken = await this.notifier.wait(queueId, timeoutMs, signal); + if (signal?.aborted) return []; + if (!woken) { + // Timed out or cancelled — don't drain, just return empty. + return []; + } + // Woken: batch-wait for more commands to accumulate. + await sleep(batchWaitMs); + } + + const raw = await this.store.popAll(queueId); + return raw.map((s) => JSON.parse(s) as T); + } + + /** + * Record activity for a queue (prevents TTL pruning). + * Call this when a view is first created or on any interaction. + */ + async touch(queueId: string): Promise { + await this.store.touch(queueId); + } + + /** Run a pruning sweep (also runs automatically on the sweep timer). */ + async sweep(): Promise { + const pruned = await this.store.prune(this.ttlMs); + if (pruned.length > 0) { + for (const cb of this.onPruneCallbacks) { + try { + cb(pruned); + } catch { + // Don't let a bad callback break the sweep. + } + } + } + return pruned; + } + + /** Shut down: stop sweep timer, close store and notifier. */ + async close(): Promise { + if (this.sweepTimer !== undefined) { + clearInterval(this.sweepTimer); + this.sweepTimer = undefined; + } + this.notifier.close(); + await this.store.close(); + } +} + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} diff --git a/examples/pdf-server/vercel.json b/examples/pdf-server/vercel.json new file mode 100644 index 00000000..6f5b699f --- /dev/null +++ b/examples/pdf-server/vercel.json @@ -0,0 +1,12 @@ +{ + "$schema": "https://openapi.vercel.sh/vercel.json", + "version": 2, + "buildCommand": "echo 'Using pre-built dist/'", + "functions": { + "api/mcp.mjs": { + "maxDuration": 60, + "includeFiles": "dist/**" + } + }, + "rewrites": [{ "source": "/(.*)", "destination": "/api/mcp" }] +} diff --git a/package-lock.json b/package-lock.json index cec8c63a..e740bc44 100644 --- a/package-lock.json +++ b/package-lock.json @@ -854,6 +854,41 @@ "dev": true, "license": "MIT" }, + "examples/stateful-server": { + "name": "@modelcontextprotocol/server-stateful-counter", + "version": "1.3.2", + "license": "MIT", + "dependencies": { + "@modelcontextprotocol/ext-apps": "^1.0.0", + "@modelcontextprotocol/sdk": "^1.24.0", + "zod": "^4.1.13" + }, + "bin": { + "mcp-server-stateful-counter": "dist/index.js" + }, + "devDependencies": { + "@types/node": "22.10.0", + "cross-env": "^10.1.0", + "typescript": "^5.9.3" + } + }, + "examples/stateful-server/node_modules/@types/node": { + "version": "22.10.0", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.0.tgz", + "integrity": "sha512-XC70cRZVElFHfIUB40FgZOBbgJYFKKMa5nb9lxcwYstFG/Mi+/Y0bGS+rs6Dmhmkpq4pnNiLiuZAbc02YCOnmA==", + "dev": true, + "license": "MIT", + "dependencies": { + "undici-types": "~6.20.0" + } + }, + "examples/stateful-server/node_modules/undici-types": { + "version": "6.20.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.20.0.tgz", + "integrity": "sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==", + "dev": true, + "license": "MIT" + }, "examples/system-monitor-server": { "name": "@modelcontextprotocol/server-system-monitor", "version": "1.3.2", @@ -2673,6 +2708,10 @@ "resolved": "examples/sheet-music-server", "link": true }, + "node_modules/@modelcontextprotocol/server-stateful-counter": { + "resolved": "examples/stateful-server", + "link": true + }, "node_modules/@modelcontextprotocol/server-system-monitor": { "resolved": "examples/system-monitor-server", "link": true