diff --git a/.github/workflows/mpp-discovery-service-mcp.yml b/.github/workflows/mpp-discovery-service-mcp.yml deleted file mode 100644 index 58f1adcd..00000000 --- a/.github/workflows/mpp-discovery-service-mcp.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: MPP Discovery Service MCP - -on: - schedule: - - cron: "0 15 * * *" - workflow_dispatch: - -permissions: - contents: read - -concurrency: - group: mpp-discovery-service-mcp - cancel-in-progress: false - -env: - MPP_DISCOVERY_MCP_URL: https://mpp.dev/mcp/services - -jobs: - health: - name: Production health - runs-on: ubuntu-latest - timeout-minutes: 10 - - steps: - - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 - with: - persist-credentials: false - - - uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0 - with: - node-version: "24" - - - name: Check Slack webhook - run: | - test -n "$SLACK_WEBHOOK_URL" || { - echo "SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR is required" - exit 1 - } - env: - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR }} - - - name: Run health check - run: node scripts/check-mpp-discovery-service-mcp.mjs - env: - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR }} diff --git a/scripts/check-mpp-discovery-service-mcp.mjs b/scripts/check-mpp-discovery-service-mcp.mjs index bf628fb3..79cc108d 100644 --- a/scripts/check-mpp-discovery-service-mcp.mjs +++ b/scripts/check-mpp-discovery-service-mcp.mjs @@ -6,7 +6,6 @@ const DEFAULT_ENDPOINT = "https://mpp.dev/mcp/services"; const endpoint = normalizeEndpoint( process.env.MPP_DISCOVERY_MCP_URL ?? DEFAULT_ENDPOINT, ); -const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL; const requestTimeoutMs = Number( process.env.MPP_DISCOVERY_MCP_TIMEOUT_MS ?? 
30_000, ); @@ -317,7 +316,6 @@ async function main() { return "invalid enum rejected"; }); - await reportToSlack(); await writeStepSummary(); const failed = results.filter((result) => !result.ok); @@ -524,41 +522,6 @@ async function writeStepSummary() { await appendFile(process.env.GITHUB_STEP_SUMMARY, summary); } -async function reportToSlack() { - if (!slackWebhookUrl) { - console.log("SLACK_WEBHOOK_URL is not set; skipping Slack report."); - return; - } - const startedAt = Date.now(); - try { - const response = await fetchWithTimeout(slackWebhookUrl, { - method: "POST", - headers: { "content-type": "application/json" }, - body: JSON.stringify({ text: buildSummaryText() }), - }); - if (!response.ok) { - const text = await response.text(); - results.push({ - name: "Slack report", - ok: false, - detail: `Slack webhook returned HTTP ${response.status}: ${snippet(text)}`, - durationMs: Date.now() - startedAt, - }); - console.error(results.at(-1).detail); - return; - } - console.log("ok - Slack report"); - } catch (error) { - results.push({ - name: "Slack report", - ok: false, - detail: errorMessage(error), - durationMs: Date.now() - startedAt, - }); - console.error(results.at(-1).detail); - } -} - main().catch((error) => { console.error(errorMessage(error)); process.exitCode = 1; diff --git a/workers/mcp-services/README.md b/workers/mcp-services/README.md index 628146a6..2cb781bd 100644 --- a/workers/mcp-services/README.md +++ b/workers/mcp-services/README.md @@ -39,6 +39,7 @@ authoritative. - KV binding: `MPP_CATALOG_CACHE` - Cache key: `mpp:services:v1` - Hourly cron: `0 * * * *` +- Health cron: `* * * * *` - Requests use fresh KV data when it is less than one hour old. - If KV data is stale, requests serve the last-good cached catalog and refresh in the background. @@ -46,6 +47,54 @@ authoritative. and keeps the last-good KV value. - There is no public write, sync, registration, payment, or auth path. 
+## Datadog monitoring + +The Worker emits custom Datadog metrics directly from production. Runtime +request metrics are emitted with `ctx.waitUntil()` so user-facing MCP responses +do not wait on Datadog ingestion. + +The one-minute health cron sends each of the following requests to the public +endpoint `https://mpp.dev/mcp/services` and validates the response: + +- `GET /mcp/services` +- `HEAD /mcp/services` +- JSON-RPC `initialize` +- JSON-RPC `tools/list` +- JSON-RPC `tools/call` for `get_catalog_status` +- JSON-RPC `tools/call` for `search_services` + +Metrics use the `mpp.discovery_mcp.*` namespace. Important metrics include: + +- `mpp.discovery_mcp.http.request.count` +- `mpp.discovery_mcp.http.response.duration_ms` +- `mpp.discovery_mcp.http.error.count` +- `mpp.discovery_mcp.health.ok` +- `mpp.discovery_mcp.health.check.ok` +- `mpp.discovery_mcp.health.check.duration_ms` +- `mpp.discovery_mcp.catalog.services` +- `mpp.discovery_mcp.catalog.offers` +- `mpp.discovery_mcp.catalog.cache_age_seconds` +- `mpp.discovery_mcp.catalog.refresh.ok` +- `mpp.discovery_mcp.catalog.refresh.duration_ms` + +Production Worker vars: + +```text +DATADOG_ENABLED=true +DATADOG_ENV=production +DATADOG_SERVICE=mpp-discovery-service-mcp +DATADOG_SITE=us5.datadoghq.com +``` + +Production Worker secret: + +```text +DATADOG_API_KEY= +``` + +`DATADOG_API_KEY` is the only Datadog secret this package requires; it is used +to submit metrics. Configure Datadog notifications manually from the emitted metrics. 
+ ## Tools All tool responses include `structuredContent`, an `outputSchema`, and a text diff --git a/workers/mcp-services/src/cache.ts b/workers/mcp-services/src/cache.ts index 10f73f0b..76abd08e 100644 --- a/workers/mcp-services/src/cache.ts +++ b/workers/mcp-services/src/cache.ts @@ -1,4 +1,4 @@ -import type { ServicesResponse } from "./types.js"; +import type { ServicesResponse, WorkerEnv } from "./types.js"; const CACHE_KEY = "mpp:services:v1"; const CACHE_MAX_AGE_MS = 60 * 60 * 1000; @@ -15,7 +15,7 @@ export type CatalogSnapshot = CachedCatalog & { }; export async function getCatalog( - env: Env, + env: WorkerEnv, ctx?: ExecutionContext, ): Promise { const cached = await readCachedCatalog(env); @@ -39,7 +39,7 @@ export async function getCatalog( return refreshCatalog(env); } -export async function refreshCatalog(env: Env): Promise { +export async function refreshCatalog(env: WorkerEnv): Promise { const sourceUrl = env.MPP_SERVICES_URL || DEFAULT_SERVICES_URL; const catalog = await fetchCatalog(sourceUrl); const cached: CachedCatalog = { @@ -51,7 +51,9 @@ export async function refreshCatalog(env: Env): Promise { return { ...cached, cacheStatus: "refreshed" }; } -async function readCachedCatalog(env: Env): Promise { +async function readCachedCatalog( + env: WorkerEnv, +): Promise { const cached = await env.MPP_CATALOG_CACHE.get(CACHE_KEY, "json"); if (isCachedCatalog(cached)) return cached; return undefined; diff --git a/workers/mcp-services/src/datadog.test.ts b/workers/mcp-services/src/datadog.test.ts new file mode 100644 index 00000000..b301cf8b --- /dev/null +++ b/workers/mcp-services/src/datadog.test.ts @@ -0,0 +1,75 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { datadogEnabled, gauge, postMetrics } from "./datadog.js"; +import type { WorkerEnv } from "./types.js"; + +describe("datadog metrics", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("posts metrics to the configured Datadog site", async () => { + 
const requests: Request[] = []; + vi.stubGlobal( + "fetch", + vi.fn(async (input: RequestInfo, init?: RequestInit) => { + requests.push(new Request(input, init)); + return new Response("{}", { status: 202 }); + }), + ); + + await postMetrics(env(), [gauge("health.ok", 1, ["check:initialize"])]); + + expect(requests).toHaveLength(1); + expect(requests[0].url).toBe("https://api.us5.datadoghq.com/api/v1/series"); + expect(requests[0].headers.get("DD-API-KEY")).toBe("dd-key"); + const body = (await requests[0].json()) as { + series: Array<{ + metric: string; + points: [[number, number]]; + tags: string[]; + }>; + }; + expect(body.series[0]).toEqual( + expect.objectContaining({ + metric: "mpp.discovery_mcp.health.ok", + points: [[expect.any(Number), 1]], + tags: [ + "service:mpp-discovery-service-mcp", + "env:production", + "check:initialize", + ], + }), + ); + }); + + it("skips posting when disabled", async () => { + const fetch = vi.fn(); + vi.stubGlobal("fetch", fetch); + + await postMetrics( + { + ...env(), + DATADOG_ENABLED: "false", + }, + [gauge("health.ok", 1)], + ); + + expect(fetch).not.toHaveBeenCalled(); + }); + + it("requires explicit enablement or an API key", () => { + expect(datadogEnabled({} as WorkerEnv)).toBe(false); + expect(datadogEnabled({ DATADOG_ENABLED: "true" } as WorkerEnv)).toBe(true); + expect(datadogEnabled({ DATADOG_API_KEY: "key" } as WorkerEnv)).toBe(true); + }); +}); + +function env(): WorkerEnv { + return { + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + DATADOG_ENV: "production", + DATADOG_SERVICE: "mpp-discovery-service-mcp", + DATADOG_SITE: "us5.datadoghq.com", + } as WorkerEnv; +} diff --git a/workers/mcp-services/src/datadog.ts b/workers/mcp-services/src/datadog.ts new file mode 100644 index 00000000..8941a91a --- /dev/null +++ b/workers/mcp-services/src/datadog.ts @@ -0,0 +1,112 @@ +import type { WorkerEnv } from "./types.js"; + +type MetricType = "count" | "gauge"; + +export type MetricPoint = { + metric: string; + 
value: number; + type: MetricType; + tags?: string[]; +}; + +const DEFAULT_DATADOG_SITE = "us5.datadoghq.com"; +const METRIC_PREFIX = "mpp.discovery_mcp"; +const DEFAULT_SERVICE = "mpp-discovery-service-mcp"; +const DEFAULT_ENV = "production"; + +export function gauge( + name: string, + value: number, + tags?: string[], +): MetricPoint { + return { metric: `${METRIC_PREFIX}.${name}`, type: "gauge", value, tags }; +} + +export function count(name: string, value = 1, tags?: string[]): MetricPoint { + return { metric: `${METRIC_PREFIX}.${name}`, type: "count", value, tags }; +} + +export function queueMetrics( + ctx: ExecutionContext, + env: WorkerEnv, + metrics: MetricPoint[], +): void { + if (!datadogEnabled(env) || metrics.length === 0) return; + + ctx.waitUntil( + postMetrics(env, metrics).catch((error) => { + console.error( + JSON.stringify({ + message: "datadog.metrics_failed", + error: errorMessage(error), + }), + ); + }), + ); +} + +export async function postMetrics( + env: WorkerEnv, + metrics: MetricPoint[], +): Promise { + if (!datadogEnabled(env) || metrics.length === 0) return; + + if (!env.DATADOG_API_KEY) { + console.warn( + JSON.stringify({ + message: "datadog.metrics_skipped", + reason: "missing_api_key", + }), + ); + return; + } + + const timestamp = Math.floor(Date.now() / 1000); + const baseTags = [ + `service:${env.DATADOG_SERVICE || DEFAULT_SERVICE}`, + `env:${env.DATADOG_ENV || DEFAULT_ENV}`, + ]; + const response = await fetch(`${apiBase(env)}/api/v1/series`, { + method: "POST", + headers: { + "content-type": "application/json", + "DD-API-KEY": env.DATADOG_API_KEY, + }, + body: JSON.stringify({ + series: metrics.map((metric) => ({ + metric: metric.metric, + type: metric.type, + points: [[timestamp, metric.value]], + tags: [...baseTags, ...(metric.tags ?? 
[])], + })), + }), + }); + + if (!response.ok) { + throw new Error(`Datadog metrics API failed: ${response.status}`); + } +} + +export function datadogEnabled(env: WorkerEnv): boolean { + const configured = normalized(env.DATADOG_ENABLED); + if (configured === "true") return true; + if (configured === "false") return false; + return Boolean(env.DATADOG_API_KEY); +} + +function apiBase(env: WorkerEnv): string { + const site = env.DATADOG_SITE || DEFAULT_DATADOG_SITE; + if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); + if (site.startsWith("api.")) return `https://${site}`; + return `https://api.${site}`; +} + +function normalized(value: string | undefined): string { + return String(value ?? "") + .trim() + .toLowerCase(); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/health.test.ts b/workers/mcp-services/src/health.test.ts new file mode 100644 index 00000000..7e7f3c52 --- /dev/null +++ b/workers/mcp-services/src/health.test.ts @@ -0,0 +1,142 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { healthMetrics } from "./health.js"; +import type { WorkerEnv } from "./types.js"; + +const endpoint = "https://mpp.dev/mcp/services"; + +describe("public health check", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("emits healthy metrics for representative MCP checks", async () => { + vi.stubGlobal("fetch", vi.fn(healthyFetch)); + + const metrics = await healthMetrics(env()); + + expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(1); + expect(metricValue(metrics, "mpp.discovery_mcp.catalog.services")).toBe( + 137, + ); + expect(metricValue(metrics, "mpp.discovery_mcp.catalog.offers")).toBe(1200); + expect(metricValue(metrics, "mpp.discovery_mcp.tools.advertised")).toBe(11); + }); + + it("emits unhealthy metrics when a public check fails", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async 
(input: RequestInfo, init?: RequestInit) => { + const request = new Request(input, init); + if (request.method === "GET") { + return new Response("missing", { status: 503 }); + } + return healthyFetch(input, init); + }), + ); + + const metrics = await healthMetrics(env()); + + expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(0); + expect(metricValue(metrics, "mpp.discovery_mcp.health.failure.count")).toBe( + 1, + ); + expect( + metrics.find( + (metric) => + metric.metric === "mpp.discovery_mcp.health.check.ok" && + metric.tags?.includes("check:get_card"), + )?.value, + ).toBe(0); + }); +}); + +async function healthyFetch( + input: RequestInfo, + init?: RequestInit, +): Promise { + const request = new Request(input, init); + if (request.url !== endpoint) + throw new Error(`unexpected url ${request.url}`); + + if (request.method === "GET") { + return Response.json({ + serverInfo: { name: "mpp-services-mcp" }, + transport: { endpoint }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + + if (request.method === "HEAD") { + return new Response(null, { status: 200 }); + } + + const body = (await request.json()) as { + method: string; + params?: { name?: string }; + }; + if (body.method === "initialize") { + return rpcResult({ + serverInfo: { name: "mpp-services-mcp" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + if (body.method === "tools/list") { + return rpcResult({ + tools: [ + { name: "list_services" }, + { name: "search_services" }, + { name: "search_offers" }, + { name: "recommend_services" }, + { name: "get_usage_recipe" }, + { name: "get_facets" }, + { name: "get_services_by_recipient" }, + { name: "get_catalog_status" }, + { name: "get_service" }, + { name: "get_offers" }, + { name: "get_openapi" }, + ], + }); + } + if ( + body.method === "tools/call" && + body.params?.name === "get_catalog_status" + ) { + return rpcResult({ + structuredContent: { + serviceCount: 137, + offerCount: 1200, + 
cacheAgeSeconds: 60, + }, + }); + } + if (body.method === "tools/call" && body.params?.name === "search_services") { + return rpcResult({ + structuredContent: { + total: 27, + returned: 1, + }, + }); + } + + return Response.json( + { jsonrpc: "2.0", id: 1, error: { code: -32601, message: "missing" } }, + { status: 200 }, + ); +} + +function rpcResult(result: unknown): Response { + return Response.json({ jsonrpc: "2.0", id: 1, result }); +} + +function metricValue( + metrics: Array<{ metric: string; value: number }>, + name: string, +) { + return metrics.find((metric) => metric.metric === name)?.value; +} + +function env(): WorkerEnv { + return { + PUBLIC_MCP_ENDPOINT: endpoint, + } as WorkerEnv; +} diff --git a/workers/mcp-services/src/health.ts b/workers/mcp-services/src/health.ts new file mode 100644 index 00000000..958ca63a --- /dev/null +++ b/workers/mcp-services/src/health.ts @@ -0,0 +1,244 @@ +import { gauge, type MetricPoint } from "./datadog.js"; +import type { WorkerEnv } from "./types.js"; + +type JsonObject = Record; +type HealthResult = { + name: string; + ok: boolean; + durationMs: number; + error?: string; + metrics?: MetricPoint[]; +}; +type CheckFn = () => Promise; + +const DEFAULT_ENDPOINT = "https://mpp.dev/mcp/services"; +const MAX_CACHE_AGE_SECONDS = 3 * 60 * 60; +const MIN_SERVICE_COUNT = 100; +const MIN_OFFER_COUNT = 1000; +const REQUIRED_TOOLS = [ + "list_services", + "search_services", + "get_catalog_status", +]; + +export async function healthMetrics(env: WorkerEnv): Promise { + const endpoint = env.PUBLIC_MCP_ENDPOINT || DEFAULT_ENDPOINT; + const checks = await runChecks(endpoint); + const failures = checks.filter((check) => !check.ok); + const metrics = [ + gauge("health.ok", failures.length === 0 ? 
1 : 0, ["endpoint:public"]), + gauge("health.failure.count", failures.length, ["endpoint:public"]), + ]; + + for (const check of checks) { + const tags = [`check:${check.name}`, "endpoint:public"]; + metrics.push( + gauge("health.check.ok", check.ok ? 1 : 0, tags), + gauge("health.check.duration_ms", check.durationMs, tags), + ...(check.metrics ?? []), + ); + } + + if (failures.length > 0) { + console.error( + JSON.stringify({ + message: "mcp.health_failed", + endpoint, + failures: failures.map((failure) => ({ + check: failure.name, + error: failure.error, + })), + }), + ); + } + + return metrics; +} + +async function runChecks(endpoint: string): Promise { + const checks: Array<[string, CheckFn]> = [ + ["get_card", () => assertServerCard(endpoint)], + ["head", () => assertHead(endpoint)], + ["initialize", () => assertInitialize(endpoint)], + ["tools_list", () => checkTools(endpoint)], + ["get_catalog_status", () => checkCatalog(endpoint)], + ["search_services", () => assertSearch(endpoint)], + ]; + const results: HealthResult[] = []; + for (const [name, fn] of checks) results.push(await measure(name, fn)); + return results; +} + +async function measure(name: string, fn: CheckFn): Promise { + const startedAt = Date.now(); + try { + const metrics = await fn(); + return { + name, + ok: true, + durationMs: Date.now() - startedAt, + ...(metrics ? 
{ metrics } : {}), + }; + } catch (error) { + return { + name, + ok: false, + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }; + } +} + +async function assertServerCard(endpoint: string): Promise { + const card = await fetchJson(endpoint, { + headers: { accept: "application/json" }, + }); + if (stringValue(object(card.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("server card name mismatch"); + } + if (stringValue(object(card.transport).endpoint) !== endpoint) { + throw new Error("server card endpoint mismatch"); + } + if (!stringValue(card.instructions).includes("402")) { + throw new Error("server card missing 402 instructions"); + } + return undefined; +} + +async function assertHead(endpoint: string): Promise { + const response = await fetch(endpoint, { method: "HEAD" }); + if (response.status !== 200) { + throw new Error(`HEAD expected 200, received ${response.status}`); + } + return undefined; +} + +async function assertInitialize(endpoint: string): Promise { + const result = await rpc(endpoint, "initialize"); + if (stringValue(object(result.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("initialize serverInfo mismatch"); + } + if (!stringValue(result.instructions).includes("402")) { + throw new Error("initialize missing 402 instructions"); + } + return undefined; +} + +async function checkTools(endpoint: string): Promise { + const result = await rpc(endpoint, "tools/list"); + const tools = arrayValue(result.tools); + const names = new Set( + tools + .map((tool) => object(tool).name) + .filter((name): name is string => typeof name === "string"), + ); + for (const tool of REQUIRED_TOOLS) { + if (!names.has(tool)) throw new Error(`missing tool ${tool}`); + } + return [gauge("tools.advertised", tools.length)]; +} + +async function checkCatalog(endpoint: string): Promise { + const content = object( + (await callTool(endpoint, "get_catalog_status", {})).structuredContent, + ); + const serviceCount = 
numberValue(content.serviceCount); + const offerCount = numberValue(content.offerCount); + const cacheAgeSeconds = numberValue(content.cacheAgeSeconds); + + if (serviceCount < MIN_SERVICE_COUNT) { + throw new Error(`serviceCount below ${MIN_SERVICE_COUNT}`); + } + if (offerCount < MIN_OFFER_COUNT) { + throw new Error(`offerCount below ${MIN_OFFER_COUNT}`); + } + if (cacheAgeSeconds > MAX_CACHE_AGE_SECONDS) { + throw new Error(`cacheAgeSeconds above ${MAX_CACHE_AGE_SECONDS}`); + } + + return [ + gauge("catalog.services", serviceCount), + gauge("catalog.offers", offerCount), + gauge("catalog.cache_age_seconds", cacheAgeSeconds), + ]; +} + +async function assertSearch(endpoint: string): Promise { + const content = object( + (await callTool(endpoint, "search_services", { category: "ai", limit: 1 })) + .structuredContent, + ); + if (numberValue(content.total) <= 0 || numberValue(content.returned) <= 0) { + throw new Error("expected at least one returned ai service"); + } + return undefined; +} + +async function callTool( + endpoint: string, + name: string, + args: JsonObject, +): Promise { + const result = await rpc(endpoint, "tools/call", { + name, + arguments: args, + }); + if (result.isError === true) + throw new Error(`tool returned isError: ${name}`); + return result; +} + +async function rpc( + endpoint: string, + method: string, + params: JsonObject = {}, +): Promise { + const body = await fetchJson(endpoint, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json", + }, + body: JSON.stringify({ jsonrpc: "2.0", id: 1, method, params }), + }); + if (body.error) { + throw new Error( + `json-rpc error: ${stringValue(object(body.error).message)}`, + ); + } + return object(body.result); +} + +async function fetchJson( + endpoint: string, + init?: RequestInit, +): Promise { + const response = await fetch(endpoint, init); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + 
return object(await response.json()); +} + +function object(value: unknown): JsonObject { + if (typeof value === "object" && value !== null && !Array.isArray(value)) { + return value as JsonObject; + } + return {}; +} + +function arrayValue(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +function stringValue(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +function numberValue(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) ? value : 0; +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/index.test.ts b/workers/mcp-services/src/index.test.ts index 72ed535f..999a821c 100644 --- a/workers/mcp-services/src/index.test.ts +++ b/workers/mcp-services/src/index.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { CachedCatalog } from "./cache.js"; import worker from "./index.js"; import type { Service, WorkerEnv } from "./types.js"; @@ -18,6 +18,10 @@ const services: Service[] = [ ]; describe("worker routes", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + it("serves the services MCP card from /mcp/services", async () => { const response = await worker.fetch( new Request("https://worker.example.com/mcp/services"), @@ -87,6 +91,66 @@ describe("worker routes", () => { expect(response.status).toBe(404); }); + + it("does not block MCP responses on Datadog request metrics", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => { + throw new Error("Datadog unavailable"); + }), + ); + + const response = await worker.fetch( + new Request("https://worker.example.com/mcp/services", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: 1, + method: "initialize", + params: {}, + }), + }), + { + 
...envWithCatalog(), + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + }, + testContext(), + ); + + expect(response.status).toBe(200); + }); +}); + +describe("scheduled health", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("runs public health checks and posts Datadog metrics", async () => { + const datadogBodies: unknown[] = []; + vi.stubGlobal( + "fetch", + vi.fn(async (input: RequestInfo, init?: RequestInit) => { + const request = new Request(input, init); + if (request.url === "https://api.us5.datadoghq.com/api/v1/series") { + datadogBodies.push(await request.json()); + return new Response("{}", { status: 202 }); + } + return healthyPublicEndpoint(request); + }), + ); + + const ctx = collectingContext(); + await worker.scheduled?.(scheduled("* * * * *"), datadogEnv(), ctx); + await ctx.drain(); + + expect(datadogBodies).toHaveLength(1); + expect(JSON.stringify(datadogBodies[0])).toContain( + "mpp.discovery_mcp.health.ok", + ); + }); }); function envWithCatalog(): WorkerEnv { @@ -99,6 +163,7 @@ function envWithCatalog(): WorkerEnv { return { MPP_SERVICES_URL: "https://mpp.dev/api/services", PUBLIC_MCP_ENDPOINT: "https://mpp.dev/mcp/services", + DATADOG_ENABLED: "false", MPP_CATALOG_CACHE: { async get() { return catalog; @@ -115,3 +180,104 @@ function testContext(): ExecutionContext { props: {}, } as unknown as ExecutionContext; } + +function datadogEnv(): WorkerEnv { + return { + ...envWithCatalog(), + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + DATADOG_ENV: "production", + DATADOG_SERVICE: "mpp-discovery-service-mcp", + DATADOG_SITE: "us5.datadoghq.com", + }; +} + +function collectingContext(): ExecutionContext & { drain(): Promise } { + const promises: Promise[] = []; + return { + waitUntil(promise: Promise) { + promises.push(promise); + }, + passThroughOnException() {}, + props: {}, + async drain() { + for (let index = 0; index < promises.length; index += 1) { + await promises[index]; + } + }, + } as unknown as 
ExecutionContext & { drain(): Promise }; +} + +function scheduled(cron: string): ScheduledController { + return { + cron, + scheduledTime: Date.now(), + noRetry() {}, + } as unknown as ScheduledController; +} + +async function healthyPublicEndpoint(request: Request): Promise { + if (request.url !== "https://mpp.dev/mcp/services") { + throw new Error(`unexpected url ${request.url}`); + } + + if (request.method === "GET") { + return Response.json({ + serverInfo: { name: "mpp-services-mcp" }, + transport: { endpoint: "https://mpp.dev/mcp/services" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + + if (request.method === "HEAD") { + return new Response(null, { status: 200 }); + } + + const body = (await request.json()) as { + method: string; + params?: { name?: string }; + }; + if (body.method === "initialize") { + return rpcResult({ + serverInfo: { name: "mpp-services-mcp" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + if (body.method === "tools/list") { + return rpcResult({ + tools: [ + { name: "list_services" }, + { name: "search_services" }, + { name: "get_catalog_status" }, + ], + }); + } + if ( + body.method === "tools/call" && + body.params?.name === "get_catalog_status" + ) { + return rpcResult({ + structuredContent: { + serviceCount: 137, + offerCount: 1200, + cacheAgeSeconds: 60, + }, + }); + } + if (body.method === "tools/call" && body.params?.name === "search_services") { + return rpcResult({ + structuredContent: { + total: 27, + returned: 1, + }, + }); + } + return Response.json( + { jsonrpc: "2.0", id: 1, error: { code: -32601, message: "missing" } }, + { status: 200 }, + ); +} + +function rpcResult(result: unknown): Response { + return Response.json({ jsonrpc: "2.0", id: 1, result }); +} diff --git a/workers/mcp-services/src/index.ts b/workers/mcp-services/src/index.ts index 0c50080a..83292b0d 100644 --- a/workers/mcp-services/src/index.ts +++ b/workers/mcp-services/src/index.ts @@ -1,4 +1,13 @@ 
import { refreshCatalog } from "./cache.js"; +import { + count, + gauge, + type MetricPoint, + postMetrics, + queueMetrics, +} from "./datadog.js"; +import { countPaymentOffers } from "./discovery.js"; +import { healthMetrics } from "./health.js"; import { handleMcp, jsonHeaders, optionsResponse, serverCard } from "./mcp.js"; import type { WorkerEnv } from "./types.js"; @@ -11,78 +20,236 @@ export default { const url = new URL(request.url); const publicEndpoint = publicMcpEndpoint(url, env); - if (request.method === "OPTIONS") return optionsResponse(); + return trackRequest( + request, + env, + ctx, + routeName(url.pathname, request.method), + async () => { + if (request.method === "OPTIONS") return optionsResponse(); - if ( - (url.pathname === "/mcp/services" || - url.pathname === "/mcp" || - url.pathname === "/") && - request.method === "POST" - ) { - return handleMcp(request, env, ctx); - } + if ( + (url.pathname === "/mcp/services" || + url.pathname === "/mcp" || + url.pathname === "/") && + request.method === "POST" + ) { + return handleMcp(request, env, ctx); + } - if (url.pathname === "/" && request.method === "GET") { - return Response.json( - { - name: "mpp-services-mcp", - mcp: publicEndpoint, - serverCard: publicEndpoint, - description: - "Read-only MCP server for the MPP service discovery catalog.", - }, - { headers: jsonHeaders() }, - ); - } + if (url.pathname === "/" && request.method === "GET") { + return Response.json( + { + name: "mpp-services-mcp", + mcp: publicEndpoint, + serverCard: publicEndpoint, + description: + "Read-only MCP server for the MPP service discovery catalog.", + }, + { headers: jsonHeaders() }, + ); + } - if (url.pathname === "/mcp/services" && request.method === "GET") { - return Response.json(serverCard(publicEndpoint), { - headers: jsonHeaders(), - }); - } + if (url.pathname === "/mcp/services" && request.method === "GET") { + return Response.json(serverCard(publicEndpoint), { + headers: jsonHeaders(), + }); + } - if 
(url.pathname === "/mcp/services" && request.method === "HEAD") { - return new Response(null, { status: 200, headers: jsonHeaders() }); - } + if (url.pathname === "/mcp/services" && request.method === "HEAD") { + return new Response(null, { status: 200, headers: jsonHeaders() }); + } - return Response.json( - { error: "not found" }, - { status: 404, headers: jsonHeaders() }, + return Response.json( + { error: "not found" }, + { status: 404, headers: jsonHeaders() }, + ); + }, ); }, - async scheduled(event: ScheduledController, env: Env, ctx: ExecutionContext) { - const startedAt = Date.now(); - ctx.waitUntil( - refreshCatalog(env) - .then((catalog) => { - console.log( - JSON.stringify({ - message: "catalog.refresh_complete", - cron: event.cron, - scheduledTime: new Date(event.scheduledTime).toISOString(), - durationMs: Date.now() - startedAt, - version: catalog.version, - services: catalog.services.length, - fetchedAt: catalog.fetchedAt, - }), - ); - }) - .catch((error) => { - console.error( - JSON.stringify({ - message: "catalog.refresh_failed", - cron: event.cron, - scheduledTime: new Date(event.scheduledTime).toISOString(), - durationMs: Date.now() - startedAt, - error: error instanceof Error ? 
error.message : String(error), - }), - ); - }), - ); + async scheduled( + event: ScheduledController, + env: WorkerEnv, + ctx: ExecutionContext, + ) { + if (event.cron === "* * * * *") { + ctx.waitUntil(runScheduledHealthCheck(event, env)); + return; + } + + ctx.waitUntil(runCatalogRefresh(event, env)); }, } satisfies ExportedHandler; +async function trackRequest( + request: Request, + env: WorkerEnv, + ctx: ExecutionContext, + route: string, + handler: () => Promise, +): Promise { + const startedAt = Date.now(); + try { + const response = await handler(); + queueMetrics( + ctx, + env, + requestMetrics(request, route, response.status, Date.now() - startedAt), + ); + return response; + } catch (error) { + queueMetrics( + ctx, + env, + requestMetrics(request, route, 500, Date.now() - startedAt, true), + ); + throw error; + } +} + function publicMcpEndpoint(url: URL, env: WorkerEnv): string { return env.PUBLIC_MCP_ENDPOINT || `${url.origin}/mcp/services`; } + +async function runScheduledHealthCheck( + event: ScheduledController, + env: WorkerEnv, +): Promise { + const startedAt = Date.now(); + await postMetrics(env, await healthMetrics(env)).catch(logMetricsError); + console.log( + JSON.stringify({ + message: "mcp.health_complete", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + }), + ); +} + +async function runCatalogRefresh( + event: ScheduledController, + env: WorkerEnv, +): Promise { + const startedAt = Date.now(); + try { + const catalog = await refreshCatalog(env); + await postMetrics( + env, + catalogRefreshMetrics({ + ok: true, + durationMs: Date.now() - startedAt, + services: catalog.services.length, + offers: countPaymentOffers(catalog.services), + }), + ).catch(logMetricsError); + console.log( + JSON.stringify({ + message: "catalog.refresh_complete", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + version: 
catalog.version, + services: catalog.services.length, + fetchedAt: catalog.fetchedAt, + }), + ); + } catch (error) { + await postMetrics( + env, + catalogRefreshMetrics({ + ok: false, + durationMs: Date.now() - startedAt, + }), + ).catch(logMetricsError); + console.error( + JSON.stringify({ + message: "catalog.refresh_failed", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }), + ); + } +} + +function routeName(pathname: string, method: string): string { + if (method === "OPTIONS") return "options"; + if ( + method === "POST" && + (pathname === "/mcp/services" || pathname === "/mcp" || pathname === "/") + ) { + return "mcp"; + } + if (pathname === "/mcp/services") return "mcp_services_card"; + if (pathname === "/") return "root"; + return "not_found"; +} + +function requestMetrics( + request: Request, + route: string, + status: number, + durationMs: number, + thrown = false, +): MetricPoint[] { + const tags = [ + `route:${route}`, + `method:${request.method}`, + `status_class:${statusClass(status)}`, + ]; + return [ + count("http.request.count", 1, tags), + gauge("http.response.duration_ms", durationMs, tags), + ...(thrown || status >= 500 + ? [count("http.error.count", 1, [...tags, "error_class:server"])] + : []), + ]; +} + +function catalogRefreshMetrics({ + ok, + durationMs, + services, + offers, +}: { + ok: boolean; + durationMs: number; + services?: number; + offers?: number; +}): MetricPoint[] { + return [ + gauge("catalog.refresh.ok", ok ? 1 : 0), + gauge("catalog.refresh.duration_ms", durationMs), + ...(ok && services !== undefined && offers !== undefined + ? 
[ + gauge("catalog.services", services), + gauge("catalog.offers", offers), + gauge("catalog.cache_age_seconds", 0), + ] + : []), + ]; +} + +function statusClass(status: number): string { + if (status >= 500) return "5xx"; + if (status >= 400) return "4xx"; + if (status >= 300) return "3xx"; + if (status >= 200) return "2xx"; + return "other"; +} + +function logMetricsError(error: unknown): void { + console.error( + JSON.stringify({ + message: "datadog.metrics_failed", + error: errorMessage(error), + }), + ); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/mcp.ts b/workers/mcp-services/src/mcp.ts index e9fe24d3..0ea51450 100644 --- a/workers/mcp-services/src/mcp.ts +++ b/workers/mcp-services/src/mcp.ts @@ -13,7 +13,13 @@ import { searchServices, servicesByRecipient, } from "./discovery.js"; -import { CATEGORIES, INTEGRATIONS, type Service, STATUSES } from "./types.js"; +import { + CATEGORIES, + INTEGRATIONS, + type Service, + STATUSES, + type WorkerEnv, +} from "./types.js"; const PROTOCOL_VERSION = "2025-06-18"; const SERVER_VERSION = "1.0.0"; @@ -126,7 +132,7 @@ type FetchedOpenApi = { export async function handleMcp( request: Request, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ): Promise { let payload: unknown; @@ -198,7 +204,7 @@ export function optionsResponse(): Response { async function handleMessage( request: JsonRpcRequest | undefined, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ): Promise { if (request?.jsonrpc !== "2.0" || typeof request.method !== "string") { @@ -239,7 +245,7 @@ async function handleMessage( async function handleToolCall( params: ToolCallParams, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ) { const name = typeof params.name === "string" ? 
params.name : ""; diff --git a/workers/mcp-services/src/types.ts b/workers/mcp-services/src/types.ts index 9a888c46..ee45071f 100644 --- a/workers/mcp-services/src/types.ts +++ b/workers/mcp-services/src/types.ts @@ -72,6 +72,18 @@ export interface ServicesResponse { services: Service[]; } -export type WorkerEnv = Env & { +export type WorkerEnv = Omit< + Env, + | "DATADOG_ENABLED" + | "DATADOG_ENV" + | "DATADOG_SERVICE" + | "DATADOG_SITE" + | "PUBLIC_MCP_ENDPOINT" +> & { + DATADOG_API_KEY?: string; + DATADOG_ENABLED?: string; + DATADOG_ENV?: string; + DATADOG_SERVICE?: string; + DATADOG_SITE?: string; PUBLIC_MCP_ENDPOINT?: string; }; diff --git a/workers/mcp-services/wrangler.jsonc b/workers/mcp-services/wrangler.jsonc index 4f074255..06ca31ca 100644 --- a/workers/mcp-services/wrangler.jsonc +++ b/workers/mcp-services/wrangler.jsonc @@ -17,7 +17,7 @@ } }, "triggers": { - "crons": ["0 * * * *"] + "crons": ["* * * * *", "0 * * * *"] }, "kv_namespaces": [ { @@ -26,6 +26,10 @@ } ], "vars": { + "DATADOG_ENABLED": "true", + "DATADOG_ENV": "production", + "DATADOG_SERVICE": "mpp-discovery-service-mcp", + "DATADOG_SITE": "us5.datadoghq.com", "MPP_SERVICES_URL": "https://mpp.dev/api/services", "PUBLIC_MCP_ENDPOINT": "https://mpp.dev/mcp/services" }