From f9943e169d09c5c6e6d188a89f2d20c26ae82f79 Mon Sep 17 00:00:00 2001 From: Parv Ahuja Date: Mon, 22 Jun 2026 22:10:55 -0700 Subject: [PATCH 1/4] Add Datadog metrics for MPP Discovery Service MCP --- .../workflows/mpp-discovery-service-mcp.yml | 45 --- package.json | 1 + .../provision-mpp-discovery-service-mcp.mjs | 143 ++++++++ workers/mcp-services/README.md | 61 ++++ workers/mcp-services/src/cache.ts | 10 +- workers/mcp-services/src/datadog.test.ts | 78 +++++ workers/mcp-services/src/datadog.ts | 107 ++++++ workers/mcp-services/src/health.test.ts | 142 ++++++++ workers/mcp-services/src/health.ts | 322 ++++++++++++++++++ workers/mcp-services/src/index.test.ts | 168 ++++++++- workers/mcp-services/src/index.ts | 157 +++++++-- workers/mcp-services/src/mcp.ts | 14 +- workers/mcp-services/src/request-metrics.ts | 195 +++++++++++ workers/mcp-services/src/types.ts | 14 +- workers/mcp-services/wrangler.jsonc | 6 +- 15 files changed, 1374 insertions(+), 89 deletions(-) delete mode 100644 .github/workflows/mpp-discovery-service-mcp.yml create mode 100644 scripts/datadog/provision-mpp-discovery-service-mcp.mjs create mode 100644 workers/mcp-services/src/datadog.test.ts create mode 100644 workers/mcp-services/src/datadog.ts create mode 100644 workers/mcp-services/src/health.test.ts create mode 100644 workers/mcp-services/src/health.ts create mode 100644 workers/mcp-services/src/request-metrics.ts diff --git a/.github/workflows/mpp-discovery-service-mcp.yml b/.github/workflows/mpp-discovery-service-mcp.yml deleted file mode 100644 index 58f1adcd..00000000 --- a/.github/workflows/mpp-discovery-service-mcp.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: MPP Discovery Service MCP - -on: - schedule: - - cron: "0 15 * * *" - workflow_dispatch: - -permissions: - contents: read - -concurrency: - group: mpp-discovery-service-mcp - cancel-in-progress: false - -env: - MPP_DISCOVERY_MCP_URL: https://mpp.dev/mcp/services - -jobs: - health: - name: Production health - runs-on: ubuntu-latest - timeout-minutes: 10 - - steps: - - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 - with: - persist-credentials: false - - - uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0 - with: - node-version: "24" - - - name: Check Slack webhook - run: | - test -n "$SLACK_WEBHOOK_URL" || { - echo "SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR is required" - exit 1 - } - env: - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR }} - - - name: Run health check - run: node scripts/check-mpp-discovery-service-mcp.mjs - env: - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_ENG_DEVELOPERS_MONITOR }} diff --git a/package.json b/package.json index 1b8fea74..b7ffaf79 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "dev": "node scripts/generate-discovery.ts && vite dev", "generate:discovery": "node scripts/generate-discovery.ts", "generate:pages": "node scripts/generate-pages.ts", + "mcp-services:datadog:provision": "node scripts/datadog/provision-mpp-discovery-service-mcp.mjs", "mcp-services:check": "pnpm --filter mpp-services-mcp run check", "mcp-services:deploy": "pnpm --filter mpp-services-mcp run deploy", "preinstall": "pnpx only-allow pnpm", diff --git a/scripts/datadog/provision-mpp-discovery-service-mcp.mjs b/scripts/datadog/provision-mpp-discovery-service-mcp.mjs new file mode 100644 index 00000000..d5a5a443 --- /dev/null +++ b/scripts/datadog/provision-mpp-discovery-service-mcp.mjs @@ -0,0 +1,143 @@ +const DEFAULT_SITE = "us5.datadoghq.com"; +const DEFAULT_SERVICE = "mpp-discovery-service-mcp"; +const DEFAULT_ENV = "production"; +const DEFAULT_ALERT_TARGET = "@slack-eng-developers-monitor"; + +const apiKey = requiredEnv("DATADOG_API_KEY"); +const appKey = requiredEnv("DATADOG_APP_KEY"); +const site = process.env.DATADOG_SITE || DEFAULT_SITE; +const service = process.env.DATADOG_SERVICE || DEFAULT_SERVICE; +const env = process.env.DATADOG_ENV || DEFAULT_ENV; +const alertTarget = process.env.DATADOG_ALERT_TARGET || DEFAULT_ALERT_TARGET; +const apiBase = datadogApiBase(site); +const metricFilter = `service:${service},env:${env}`; +const monitorTags = [`service:${service}`, `env:${env}`, "managed_by:repo"]; + +const monitors = [ + { + name: "MPP Discovery Service MCP health failing", + type: "metric alert", + query: `avg(last_3m):avg:mpp.discovery_mcp.health.ok{${metricFilter}} < 1`, + message: [ + `MPP Discovery Service MCP public health is failing. ${alertTarget}`, + "", + "The Cloudflare cron check calls https://mpp.dev/mcp/services and validates the MCP card, initialize, tools/list, catalog status, and search_services.", + ].join("\n"), + options: { + thresholds: { critical: 1 }, + notify_no_data: true, + no_data_timeframe: 5, + renotify_interval: 60, + include_tags: true, + }, + }, + { + name: "MPP Discovery Service MCP catalog cache stale", + type: "metric alert", + query: `avg(last_5m):max:mpp.discovery_mcp.catalog.cache_age_seconds{${metricFilter}} > 10800`, + message: `MPP Discovery Service MCP catalog cache is older than 3 hours. ${alertTarget}`, + options: { + thresholds: { critical: 10800 }, + notify_no_data: false, + renotify_interval: 60, + include_tags: true, + }, + }, + { + name: "MPP Discovery Service MCP service count low", + type: "metric alert", + query: `avg(last_15m):min:mpp.discovery_mcp.catalog.services{${metricFilter}} < 100`, + message: `MPP Discovery Service MCP catalog has fewer than 100 services. ${alertTarget}`, + options: { + thresholds: { critical: 100 }, + notify_no_data: false, + renotify_interval: 60, + include_tags: true, + }, + }, + { + name: "MPP Discovery Service MCP offer count low", + type: "metric alert", + query: `avg(last_15m):min:mpp.discovery_mcp.catalog.offers{${metricFilter}} < 1000`, + message: `MPP Discovery Service MCP catalog has fewer than 1000 payment offers. ${alertTarget}`, + options: { + thresholds: { critical: 1000 }, + notify_no_data: false, + renotify_interval: 60, + include_tags: true, + }, + }, + { + name: "MPP Discovery Service MCP server errors", + type: "metric alert", + query: `sum(last_5m):sum:mpp.discovery_mcp.mcp.error.count{${metricFilter},error_class:server}.as_count() > 0`, + message: `MPP Discovery Service MCP is returning server-side MCP errors. ${alertTarget}`, + options: { + thresholds: { critical: 0 }, + notify_no_data: false, + renotify_interval: 60, + include_tags: true, + }, + }, +]; + +const existing = await listManagedMonitors(); +for (const monitor of monitors) { + const match = existing.find((item) => item.name === monitor.name); + if (match) { + await datadogRequest(`/api/v1/monitor/${match.id}`, { + method: "PUT", + body: { ...monitor, tags: monitorTags }, + }); + console.log(`Updated monitor ${monitor.name} (${match.id})`); + } else { + const created = await datadogRequest("/api/v1/monitor", { + method: "POST", + body: { ...monitor, tags: monitorTags }, + }); + console.log(`Created monitor ${monitor.name} (${created.id})`); + } +} + +async function listManagedMonitors() { + const query = new URLSearchParams({ + monitor_tags: "managed_by:repo", + name: "MPP Discovery Service MCP", + }); + return datadogRequest(`/api/v1/monitor?${query.toString()}`, { + method: "GET", + }); +} + +async function datadogRequest(path, { method, body }) { + const response = await fetch(`${apiBase}${path}`, { + method, + headers: { + accept: "application/json", + "content-type": "application/json", + "DD-API-KEY": apiKey, + "DD-APPLICATION-KEY": appKey, + }, + body: body ? JSON.stringify(body) : undefined, + }); + const text = await response.text(); + const payload = text ? JSON.parse(text) : undefined; + if (!response.ok) { + throw new Error( + `Datadog API ${method} ${path} failed: ${response.status} ${text}`, + ); + } + return payload; +} + +function datadogApiBase(value) { + if (/^https?:\/\//i.test(value)) return value.replace(/\/+$/, ""); + if (value.startsWith("api.")) return `https://${value}`; + return `https://api.${value}`; +} + +function requiredEnv(name) { + const value = process.env[name]; + if (!value) throw new Error(`${name} is required`); + return value; +} diff --git a/workers/mcp-services/README.md b/workers/mcp-services/README.md index 628146a6..5f4a6e3e 100644 --- a/workers/mcp-services/README.md +++ b/workers/mcp-services/README.md @@ -39,6 +39,7 @@ authoritative. - KV binding: `MPP_CATALOG_CACHE` - Cache key: `mpp:services:v1` - Hourly cron: `0 * * * *` +- Health cron: `* * * * *` - Requests use fresh KV data when it is less than one hour old. - If KV data is stale, requests serve the last-good cached catalog and refresh in the background. @@ -46,6 +47,66 @@ authoritative. and keeps the last-good KV value. - There is no public write, sync, registration, payment, or auth path. +## Datadog monitoring + +The Worker emits custom Datadog metrics directly from production. Runtime +request metrics are emitted with `ctx.waitUntil()` so user-facing MCP responses +do not wait on Datadog ingestion. + +The one-minute health cron calls the public endpoint +`https://mpp.dev/mcp/services`, then checks: + +- `GET /mcp/services` +- `HEAD /mcp/services` +- JSON-RPC `initialize` +- JSON-RPC `tools/list` +- JSON-RPC `tools/call` for `get_catalog_status` +- JSON-RPC `tools/call` for `search_services` + +Metrics use the `mpp.discovery_mcp.*` namespace. Important metrics include: + +- `mpp.discovery_mcp.http.request.count` +- `mpp.discovery_mcp.http.response.duration_ms` +- `mpp.discovery_mcp.mcp.request.count` +- `mpp.discovery_mcp.mcp.response.duration_ms` +- `mpp.discovery_mcp.mcp.error.count` +- `mpp.discovery_mcp.health.ok` +- `mpp.discovery_mcp.health.check.ok` +- `mpp.discovery_mcp.health.check.duration_ms` +- `mpp.discovery_mcp.catalog.services` +- `mpp.discovery_mcp.catalog.offers` +- `mpp.discovery_mcp.catalog.cache_age_seconds` +- `mpp.discovery_mcp.catalog.refresh.ok` + +Production Worker vars: + +```text +DATADOG_ENABLED=true +DATADOG_ENV=production +DATADOG_SERVICE=mpp-discovery-service-mcp +DATADOG_SITE=us5.datadoghq.com +``` + +Production Worker secret: + +```text +DATADOG_API_KEY= +``` + +`DATADOG_APP_KEY` must not be configured on the Worker. It is only used outside +runtime to provision monitors: + +```bash +DATADOG_API_KEY= \ +DATADOG_APP_KEY= \ +DATADOG_ALERT_TARGET=@slack-eng-developers-monitor \ +pnpm mcp-services:datadog:provision +``` + +The provisioner creates or updates Datadog monitors for missing health metrics, +failed health checks, stale catalog cache, low service/offer counts, and +server-side MCP errors. Alerts route through Datadog's Slack integration. + ## Tools All tool responses include `structuredContent`, an `outputSchema`, and a text diff --git a/workers/mcp-services/src/cache.ts b/workers/mcp-services/src/cache.ts index 10f73f0b..76abd08e 100644 --- a/workers/mcp-services/src/cache.ts +++ b/workers/mcp-services/src/cache.ts @@ -1,4 +1,4 @@ -import type { ServicesResponse } from "./types.js"; +import type { ServicesResponse, WorkerEnv } from "./types.js"; const CACHE_KEY = "mpp:services:v1"; const CACHE_MAX_AGE_MS = 60 * 60 * 1000; @@ -15,7 +15,7 @@ export type CatalogSnapshot = CachedCatalog & { }; export async function getCatalog( - env: Env, + env: WorkerEnv, ctx?: ExecutionContext, ): Promise { const cached = await readCachedCatalog(env); @@ -39,7 +39,7 @@ export async function getCatalog( return refreshCatalog(env); } -export async function refreshCatalog(env: Env): Promise { +export async function refreshCatalog(env: WorkerEnv): Promise { const sourceUrl = env.MPP_SERVICES_URL || DEFAULT_SERVICES_URL; const catalog = await fetchCatalog(sourceUrl); const cached: CachedCatalog = { @@ -51,7 +51,9 @@ export async function refreshCatalog(env: Env): Promise { return { ...cached, cacheStatus: "refreshed" }; } -async function readCachedCatalog(env: Env): Promise { +async function readCachedCatalog( + env: WorkerEnv, +): Promise { const cached = await env.MPP_CATALOG_CACHE.get(CACHE_KEY, "json"); if (isCachedCatalog(cached)) return cached; return undefined; diff --git a/workers/mcp-services/src/datadog.test.ts b/workers/mcp-services/src/datadog.test.ts new file mode 100644 index 00000000..933ef1a3 --- /dev/null +++ b/workers/mcp-services/src/datadog.test.ts @@ -0,0 +1,78 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { datadogEnabled, metricName, sendDatadogMetrics } from "./datadog.js"; +import type { WorkerEnv } from "./types.js"; + +describe("datadog metrics", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("posts metrics to the configured Datadog site", async () => { + const requests: Request[] = []; + vi.stubGlobal( + "fetch", + vi.fn(async (input: RequestInfo, init?: RequestInit) => { + requests.push(new Request(input, init)); + return new Response("{}", { status: 202 }); + }), + ); + + await sendDatadogMetrics(env(), [ + { + metric: metricName("health.ok"), + type: "gauge", + value: 1, + tags: ["check:initialize"], + }, + ]); + + expect(requests).toHaveLength(1); + expect(requests[0].url).toBe("https://api.us5.datadoghq.com/api/v1/series"); + expect(requests[0].headers.get("DD-API-KEY")).toBe("dd-key"); + const body = (await requests[0].json()) as { + series: Array<{ + metric: string; + points: [[number, number]]; + tags: string[]; + }>; + }; + expect(body.series[0]).toEqual( + expect.objectContaining({ + metric: "mpp.discovery_mcp.health.ok", + points: [[expect.any(Number), 1]], + tags: [ + "service:mpp-discovery-service-mcp", + "env:production", + "check:initialize", + ], + }), + ); + }); + + it("skips posting when disabled", async () => { + const fetch = vi.fn(); + vi.stubGlobal("fetch", fetch); + + await sendDatadogMetrics({ ...env(), DATADOG_ENABLED: "false" }, [ + { metric: metricName("health.ok"), type: "gauge", value: 1 }, + ]); + + expect(fetch).not.toHaveBeenCalled(); + }); + + it("requires explicit enablement or an API key", () => { + expect(datadogEnabled({} as WorkerEnv)).toBe(false); + expect(datadogEnabled({ DATADOG_ENABLED: "true" } as WorkerEnv)).toBe(true); + expect(datadogEnabled({ DATADOG_API_KEY: "key" } as WorkerEnv)).toBe(true); + }); +}); + +function env(): WorkerEnv { + return { + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + DATADOG_ENV: "production", + DATADOG_SERVICE: "mpp-discovery-service-mcp", + DATADOG_SITE: "us5.datadoghq.com", + } as WorkerEnv; +} diff --git a/workers/mcp-services/src/datadog.ts b/workers/mcp-services/src/datadog.ts new file mode 100644 index 00000000..eccc391c --- /dev/null +++ b/workers/mcp-services/src/datadog.ts @@ -0,0 +1,107 @@ +import type { WorkerEnv } from "./types.js"; + +type MetricType = "count" | "gauge"; + +export type DatadogMetric = { + metric: string; + value: number; + type: MetricType; + tags?: string[]; +}; + +const DEFAULT_DATADOG_SITE = "us5.datadoghq.com"; +const METRIC_PREFIX = "mpp.discovery_mcp"; + +export function metricName(name: string): string { + return `${METRIC_PREFIX}.${name}`; +} + +export function emitDatadogMetrics( + env: WorkerEnv, + ctx: ExecutionContext, + metrics: DatadogMetric[], +): void { + if (metrics.length === 0 || !datadogEnabled(env)) return; + + ctx.waitUntil( + sendDatadogMetrics(env, metrics).catch((error) => { + console.error( + JSON.stringify({ + message: "datadog.metrics_failed", + error: errorMessage(error), + }), + ); + }), + ); +} + +export async function sendDatadogMetrics( + env: WorkerEnv, + metrics: DatadogMetric[], +): Promise { + if (metrics.length === 0 || !datadogEnabled(env)) return; + + const apiKey = env.DATADOG_API_KEY; + if (!apiKey) { + console.warn( + JSON.stringify({ + message: "datadog.metrics_skipped", + reason: "missing_api_key", + }), + ); + return; + } + + const timestamp = Math.floor(Date.now() / 1000); + const tags = baseTags(env); + const response = await fetch(`${datadogApiBase(env)}/api/v1/series`, { + method: "POST", + headers: { + "content-type": "application/json", + "DD-API-KEY": apiKey, + }, + body: JSON.stringify({ + series: metrics.map((metric) => ({ + metric: metric.metric, + type: metric.type, + points: [[timestamp, metric.value]], + tags: [...tags, ...(metric.tags ?? [])], + })), + }), + }); + + if (!response.ok) { + throw new Error(`Datadog metrics API failed: ${response.status}`); + } +} + +export function datadogEnabled(env: WorkerEnv): boolean { + const configured = normalized(env.DATADOG_ENABLED); + if (configured === "true") return true; + if (configured === "false") return false; + return Boolean(env.DATADOG_API_KEY); +} + +function datadogApiBase(env: WorkerEnv): string { + const site = env.DATADOG_SITE || DEFAULT_DATADOG_SITE; + if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); + if (site.startsWith("api.")) return `https://${site}`; + return `https://api.${site}`; +} + +function baseTags(env: WorkerEnv): string[] { + return [ + `service:${env.DATADOG_SERVICE || "mpp-discovery-service-mcp"}`, + `env:${env.DATADOG_ENV || "production"}`, + ]; +} + +function normalized(value: string | undefined): string { + return String(value ?? "") + .trim() + .toLowerCase(); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/health.test.ts b/workers/mcp-services/src/health.test.ts new file mode 100644 index 00000000..0e5d2f80 --- /dev/null +++ b/workers/mcp-services/src/health.test.ts @@ -0,0 +1,142 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { runPublicHealthCheck } from "./health.js"; +import type { WorkerEnv } from "./types.js"; + +const endpoint = "https://mpp.dev/mcp/services"; + +describe("public health check", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("emits healthy metrics for representative MCP checks", async () => { + vi.stubGlobal("fetch", vi.fn(healthyFetch)); + + const metrics = await runPublicHealthCheck(env()); + + expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(1); + expect(metricValue(metrics, "mpp.discovery_mcp.catalog.services")).toBe( + 137, + ); + expect(metricValue(metrics, "mpp.discovery_mcp.catalog.offers")).toBe(1200); + expect(metricValue(metrics, "mpp.discovery_mcp.tools.advertised")).toBe(11); + }); + + it("emits unhealthy metrics when a public check fails", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async (input: RequestInfo, init?: RequestInit) => { + const request = new Request(input, init); + if (request.method === "GET") { + return new Response("missing", { status: 503 }); + } + return healthyFetch(input, init); + }), + ); + + const metrics = await runPublicHealthCheck(env()); + + expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(0); + expect(metricValue(metrics, "mpp.discovery_mcp.health.failure.count")).toBe( + 1, + ); + expect( + metrics.find( + (metric) => + metric.metric === "mpp.discovery_mcp.health.check.ok" && + metric.tags?.includes("check:get_card"), + )?.value, + ).toBe(0); + }); +}); + +async function healthyFetch( + input: RequestInfo, + init?: RequestInit, +): Promise { + const request = new Request(input, init); + if (request.url !== endpoint) + throw new Error(`unexpected url ${request.url}`); + + if (request.method === "GET") { + return Response.json({ + serverInfo: { name: "mpp-services-mcp" }, + transport: { endpoint }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + + if (request.method === "HEAD") { + return new Response(null, { status: 200 }); + } + + const body = (await request.json()) as { + method: string; + params?: { name?: string }; + }; + if (body.method === "initialize") { + return rpcResult({ + serverInfo: { name: "mpp-services-mcp" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + if (body.method === "tools/list") { + return rpcResult({ + tools: [ + { name: "list_services" }, + { name: "search_services" }, + { name: "search_offers" }, + { name: "recommend_services" }, + { name: "get_usage_recipe" }, + { name: "get_facets" }, + { name: "get_services_by_recipient" }, + { name: "get_catalog_status" }, + { name: "get_service" }, + { name: "get_offers" }, + { name: "get_openapi" }, + ], + }); + } + if ( + body.method === "tools/call" && + body.params?.name === "get_catalog_status" + ) { + return rpcResult({ + structuredContent: { + serviceCount: 137, + offerCount: 1200, + cacheAgeSeconds: 60, + }, + }); + } + if (body.method === "tools/call" && body.params?.name === "search_services") { + return rpcResult({ + structuredContent: { + total: 27, + returned: 1, + }, + }); + } + + return Response.json( + { jsonrpc: "2.0", id: 1, error: { code: -32601, message: "missing" } }, + { status: 200 }, + ); +} + +function rpcResult(result: unknown): Response { + return Response.json({ jsonrpc: "2.0", id: 1, result }); +} + +function metricValue( + metrics: Array<{ metric: string; value: number }>, + name: string, +) { + return metrics.find((metric) => metric.metric === name)?.value; +} + +function env(): WorkerEnv { + return { + PUBLIC_MCP_ENDPOINT: endpoint, + } as WorkerEnv; +} diff --git a/workers/mcp-services/src/health.ts b/workers/mcp-services/src/health.ts new file mode 100644 index 00000000..dcd3516b --- /dev/null +++ b/workers/mcp-services/src/health.ts @@ -0,0 +1,322 @@ +import { type DatadogMetric, metricName } from "./datadog.js"; +import type { WorkerEnv } from "./types.js"; + +type JsonObject = Record; + +type HealthCheckResult = { + name: string; + ok: boolean; + durationMs: number; + error?: string; + metrics?: DatadogMetric[]; +}; + +const REQUEST_TIMEOUT_MS = 10000; +const MAX_CACHE_AGE_SECONDS = 3 * 60 * 60; +const MIN_SERVICE_COUNT = 100; +const MIN_OFFER_COUNT = 1000; + +export async function runPublicHealthCheck( + env: WorkerEnv, +): Promise { + const endpoint = env.PUBLIC_MCP_ENDPOINT || "https://mpp.dev/mcp/services"; + const results = await sequentialChecks([ + () => checkGetCard(endpoint), + () => checkHead(endpoint), + () => checkInitialize(endpoint), + () => checkToolsList(endpoint), + () => checkCatalogStatus(endpoint), + () => checkSearchServices(endpoint), + ]); + + const failures = results.filter((result) => !result.ok); + const metrics: DatadogMetric[] = [ + { + metric: metricName("health.ok"), + type: "gauge", + value: failures.length === 0 ? 1 : 0, + tags: ["endpoint:public"], + }, + { + metric: metricName("health.failure.count"), + type: "gauge", + value: failures.length, + tags: ["endpoint:public"], + }, + ]; + + for (const result of results) { + metrics.push( + { + metric: metricName("health.check.ok"), + type: "gauge", + value: result.ok ? 1 : 0, + tags: [`check:${result.name}`, "endpoint:public"], + }, + { + metric: metricName("health.check.duration_ms"), + type: "gauge", + value: result.durationMs, + tags: [`check:${result.name}`, "endpoint:public"], + }, + ...(result.metrics ?? []), + ); + } + + if (failures.length > 0) { + console.error( + JSON.stringify({ + message: "mcp.health_failed", + endpoint, + failures: failures.map((failure) => ({ + check: failure.name, + error: failure.error, + })), + }), + ); + } + + return metrics; +} + +async function sequentialChecks( + checks: Array<() => Promise>, +): Promise { + const results: HealthCheckResult[] = []; + for (const check of checks) { + results.push(await check()); + } + return results; +} + +async function checkGetCard(endpoint: string): Promise { + return measured("get_card", async () => { + const response = await fetchWithTimeout(endpoint, { + headers: { accept: "application/json" }, + }); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + + const body = await response.json(); + const card = object(body); + const transport = object(card.transport); + const instructions = stringValue(card.instructions); + if (stringValue(object(card.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("server card name mismatch"); + } + if (stringValue(transport.endpoint) !== endpoint) { + throw new Error("server card endpoint mismatch"); + } + if (!instructions.includes("402")) { + throw new Error("server card is missing payment authority instructions"); + } + }); +} + +async function checkHead(endpoint: string): Promise { + return measured("head", async () => { + const response = await fetchWithTimeout(endpoint, { method: "HEAD" }); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + }); +} + +async function checkInitialize(endpoint: string): Promise { + return measured("initialize", async () => { + const result = await rpc(endpoint, "initialize", {}); + if (stringValue(object(result.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("initialize serverInfo mismatch"); + } + if (!stringValue(result.instructions).includes("402")) { + throw new Error("initialize instructions missing 402 authority text"); + } + }); +} + +async function checkToolsList(endpoint: string): Promise { + return measured("tools_list", async () => { + const result = await rpc(endpoint, "tools/list", {}); + const tools = arrayValue(result.tools); + const names = new Set( + tools + .map((tool) => object(tool).name) + .filter((name): name is string => typeof name === "string"), + ); + for (const required of [ + "list_services", + "search_services", + "get_catalog_status", + ]) { + if (!names.has(required)) throw new Error(`missing tool ${required}`); + } + return [ + { + metric: metricName("tools.advertised"), + type: "gauge", + value: tools.length, + }, + ]; + }); +} + +async function checkCatalogStatus( + endpoint: string, +): Promise { + return measured("get_catalog_status", async () => { + const result = await callTool(endpoint, "get_catalog_status", {}); + const content = object(result.structuredContent); + const serviceCount = numberValue(content.serviceCount); + const offerCount = numberValue(content.offerCount); + const cacheAgeSeconds = numberValue(content.cacheAgeSeconds); + if (serviceCount < MIN_SERVICE_COUNT) { + throw new Error(`serviceCount below ${MIN_SERVICE_COUNT}`); + } + if (offerCount < MIN_OFFER_COUNT) { + throw new Error(`offerCount below ${MIN_OFFER_COUNT}`); + } + if (cacheAgeSeconds > MAX_CACHE_AGE_SECONDS) { + throw new Error(`cacheAgeSeconds above ${MAX_CACHE_AGE_SECONDS}`); + } + return [ + { + metric: metricName("catalog.services"), + type: "gauge", + value: serviceCount, + }, + { + metric: metricName("catalog.offers"), + type: "gauge", + value: offerCount, + }, + { + metric: metricName("catalog.cache_age_seconds"), + type: "gauge", + value: cacheAgeSeconds, + }, + ]; + }); +} + +async function checkSearchServices( + endpoint: string, +): Promise { + return measured("search_services", async () => { + const result = await callTool(endpoint, "search_services", { + category: "ai", + limit: 1, + }); + const content = object(result.structuredContent); + const total = numberValue(content.total); + const returned = numberValue(content.returned); + if (total <= 0) throw new Error("expected at least one ai service"); + if (returned <= 0) throw new Error("expected one returned ai service"); + }); +} + +async function measured( + name: string, + check: () => Promise, +): Promise { + const startedAt = Date.now(); + try { + const metrics = await check(); + return { + name, + ok: true, + durationMs: Date.now() - startedAt, + ...(metrics ? { metrics } : {}), + }; + } catch (error) { + return { + name, + ok: false, + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }; + } +} + +async function callTool( + endpoint: string, + name: string, + args: JsonObject, +): Promise { + const result = await rpc(endpoint, "tools/call", { + name, + arguments: args, + }); + if (result.isError === true) { + throw new Error(`tool returned isError: ${name}`); + } + return result; +} + +async function rpc( + endpoint: string, + method: string, + params: JsonObject, +): Promise { + const response = await fetchWithTimeout(endpoint, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json", + }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: 1, + method, + params, + }), + }); + + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + + const body = object(await response.json()); + if (body.error) { + throw new Error( + `json-rpc error: ${stringValue(object(body.error).message)}`, + ); + } + return object(body.result); +} + +async function fetchWithTimeout( + input: RequestInfo, + init?: RequestInit, +): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS); + try { + return await fetch(input, { ...init, signal: controller.signal }); + } finally { + clearTimeout(timeout); + } +} + +function object(value: unknown): JsonObject { + if (typeof value === "object" && value !== null && !Array.isArray(value)) { + return value as JsonObject; + } + return {}; +} + +function arrayValue(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +function stringValue(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +function numberValue(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) ? value : 0; +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/index.test.ts b/workers/mcp-services/src/index.test.ts index 72ed535f..999a821c 100644 --- a/workers/mcp-services/src/index.test.ts +++ b/workers/mcp-services/src/index.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { CachedCatalog } from "./cache.js"; import worker from "./index.js"; import type { Service, WorkerEnv } from "./types.js"; @@ -18,6 +18,10 @@ const services: Service[] = [ ]; describe("worker routes", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + it("serves the services MCP card from /mcp/services", async () => { const response = await worker.fetch( new Request("https://worker.example.com/mcp/services"), @@ -87,6 +91,66 @@ describe("worker routes", () => { expect(response.status).toBe(404); }); + + it("does not block MCP responses on Datadog request metrics", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => { + throw new Error("Datadog unavailable"); + }), + ); + + const response = await worker.fetch( + new Request("https://worker.example.com/mcp/services", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: 1, + method: "initialize", + params: {}, + }), + }), + { + ...envWithCatalog(), + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + }, + testContext(), + ); + + expect(response.status).toBe(200); + }); +}); + +describe("scheduled health", () => { + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("runs public health checks and posts Datadog metrics", async () => { + const datadogBodies: unknown[] = []; + vi.stubGlobal( + "fetch", + vi.fn(async (input: RequestInfo, init?: RequestInit) => { + const request = new Request(input, init); + if (request.url === "https://api.us5.datadoghq.com/api/v1/series") { + datadogBodies.push(await request.json()); + return new Response("{}", { status: 202 }); + } + return healthyPublicEndpoint(request); + }), + ); + + const ctx = collectingContext(); + await worker.scheduled?.(scheduled("* * * * *"), datadogEnv(), ctx); + await ctx.drain(); + + expect(datadogBodies).toHaveLength(1); + expect(JSON.stringify(datadogBodies[0])).toContain( + "mpp.discovery_mcp.health.ok", + ); + }); }); function envWithCatalog(): WorkerEnv { @@ -99,6 +163,7 @@ function envWithCatalog(): WorkerEnv { return { MPP_SERVICES_URL: "https://mpp.dev/api/services", PUBLIC_MCP_ENDPOINT: "https://mpp.dev/mcp/services", + DATADOG_ENABLED: "false", MPP_CATALOG_CACHE: { async get() { return catalog; @@ -115,3 +180,104 @@ function testContext(): ExecutionContext { props: {}, } as unknown as ExecutionContext; } + +function datadogEnv(): WorkerEnv { + return { + ...envWithCatalog(), + DATADOG_API_KEY: "dd-key", + DATADOG_ENABLED: "true", + DATADOG_ENV: "production", + DATADOG_SERVICE: "mpp-discovery-service-mcp", + DATADOG_SITE: "us5.datadoghq.com", + }; +} + +function collectingContext(): ExecutionContext & { drain(): Promise } { + const promises: Promise[] = []; + return { + waitUntil(promise: Promise) { + promises.push(promise); + }, + passThroughOnException() {}, + props: {}, + async drain() { + for (let index = 0; index < promises.length; index += 1) { + await promises[index]; + } + }, + } as unknown as ExecutionContext & { drain(): Promise }; +} + +function scheduled(cron: string): ScheduledController { + return { + cron, + scheduledTime: Date.now(), + noRetry() {}, + } as unknown as ScheduledController; +} + +async function healthyPublicEndpoint(request: Request): Promise { + if (request.url !== "https://mpp.dev/mcp/services") { + throw new Error(`unexpected url ${request.url}`); + } + + if (request.method === "GET") { + return Response.json({ + serverInfo: { name: "mpp-services-mcp" }, + transport: { endpoint: "https://mpp.dev/mcp/services" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + + if (request.method === "HEAD") { + return new Response(null, { status: 200 }); + } + + const body = (await request.json()) as { + method: string; + params?: { name?: string }; + }; + if (body.method === "initialize") { + return rpcResult({ + serverInfo: { name: "mpp-services-mcp" }, + instructions: "Runtime 402 Challenges are authoritative.", + }); + } + if (body.method === "tools/list") { + return rpcResult({ + tools: [ + { name: "list_services" }, + { name: "search_services" }, + { name: "get_catalog_status" }, + ], + }); + } + if ( + body.method === "tools/call" && + body.params?.name === "get_catalog_status" + ) { + return rpcResult({ + structuredContent: { + serviceCount: 137, + offerCount: 1200, + cacheAgeSeconds: 60, + }, + }); + } + if (body.method === "tools/call" && body.params?.name === "search_services") { + return rpcResult({ + structuredContent: { + total: 27, + returned: 1, + }, + }); + } + return Response.json( + { jsonrpc: "2.0", id: 1, error: { code: -32601, message: "missing" } }, + { status: 200 }, + ); +} + +function rpcResult(result: unknown): Response { + return Response.json({ jsonrpc: "2.0", id: 1, result }); +} diff --git a/workers/mcp-services/src/index.ts b/workers/mcp-services/src/index.ts index 0c50080a..c7c4bbab 100644 --- a/workers/mcp-services/src/index.ts +++ b/workers/mcp-services/src/index.ts @@ -1,5 +1,9 @@ import { refreshCatalog } from "./cache.js"; +import { emitDatadogMetrics, metricName } from "./datadog.js"; +import { countPaymentOffers } from "./discovery.js"; +import { runPublicHealthCheck } from "./health.js"; import { handleMcp, jsonHeaders, optionsResponse, serverCard } from "./mcp.js"; +import { withRequestMetrics } from "./request-metrics.js"; import type { WorkerEnv } from "./types.js"; export default { @@ -11,51 +15,95 @@ export default { const url = new URL(request.url); const publicEndpoint = publicMcpEndpoint(url, env); - if (request.method === "OPTIONS") return optionsResponse(); + return withRequestMetrics( + request, + env, + ctx, + routeName(url.pathname, request.method), + async () => { + if (request.method === "OPTIONS") return optionsResponse(); - if ( - (url.pathname === "/mcp/services" || - url.pathname === "/mcp" || - url.pathname === "/") && - request.method === "POST" - ) { - return handleMcp(request, env, ctx); - } + if ( + (url.pathname === "/mcp/services" || + url.pathname === "/mcp" || + url.pathname === "/") && + request.method === "POST" + ) { + return handleMcp(request, env, ctx); + } - if (url.pathname === "/" && request.method === "GET") { - return Response.json( - { - name: "mpp-services-mcp", - mcp: publicEndpoint, - serverCard: publicEndpoint, - description: - "Read-only MCP server for the MPP service discovery catalog.", - }, - { headers: jsonHeaders() }, - ); - } + if (url.pathname === "/" && request.method === "GET") { + return Response.json( + { + name: "mpp-services-mcp", + mcp: publicEndpoint, + serverCard: publicEndpoint, + description: + "Read-only MCP server for the MPP service discovery catalog.", + }, + { headers: jsonHeaders() }, + ); + } - if (url.pathname === "/mcp/services" && request.method === "GET") { - return Response.json(serverCard(publicEndpoint), { - headers: jsonHeaders(), - }); - } + if (url.pathname === "/mcp/services" && request.method === "GET") { + return Response.json(serverCard(publicEndpoint), { + headers: jsonHeaders(), + }); + } - if (url.pathname === "/mcp/services" && request.method === "HEAD") { - return new Response(null, { status: 200, headers: jsonHeaders() }); - } + if (url.pathname === "/mcp/services" && request.method === "HEAD") { + return new Response(null, { status: 200, headers: jsonHeaders() }); + } - return Response.json( - { error: "not found" }, - { status: 404, headers: jsonHeaders() }, + return Response.json( + { error: "not found" }, + { status: 404, headers: jsonHeaders() }, + ); + }, ); }, - async scheduled(event: ScheduledController, env: Env, ctx: ExecutionContext) { + async scheduled( + event: ScheduledController, + env: WorkerEnv, + ctx: ExecutionContext, + ) { + if (event.cron === "* * * * *") { + ctx.waitUntil(runScheduledHealthCheck(event, env, ctx)); + return; + } + const startedAt = Date.now(); ctx.waitUntil( refreshCatalog(env) .then((catalog) => { + emitDatadogMetrics(env, ctx, [ + { + metric: metricName("catalog.refresh.ok"), + type: "gauge", + value: 1, + }, + { + metric: metricName("catalog.refresh.duration_ms"), + type: "gauge", + value: Date.now() - startedAt, + }, + { + metric: metricName("catalog.services"), + type: "gauge", + value: catalog.services.length, + }, + { + metric: metricName("catalog.offers"), + type: "gauge", + value: countPaymentOffers(catalog.services), + }, + { + metric: metricName("catalog.cache_age_seconds"), + type: "gauge", + value: 0, + }, + ]); console.log( JSON.stringify({ message: "catalog.refresh_complete", @@ -69,6 +117,18 @@ export default { ); }) .catch((error) => { + emitDatadogMetrics(env, ctx, [ + { + metric: metricName("catalog.refresh.ok"), + type: "gauge", + value: 0, + }, + { + metric: metricName("catalog.refresh.duration_ms"), + type: "gauge", + value: Date.now() - startedAt, + }, + ]); console.error( JSON.stringify({ message: "catalog.refresh_failed", @@ -86,3 +146,34 @@ export default { function publicMcpEndpoint(url: URL, env: WorkerEnv): string { return env.PUBLIC_MCP_ENDPOINT || `${url.origin}/mcp/services`; } + +async function runScheduledHealthCheck( + event: ScheduledController, + env: WorkerEnv, + ctx: ExecutionContext, +): Promise { + const startedAt = Date.now(); + const metrics = await runPublicHealthCheck(env); + emitDatadogMetrics(env, ctx, metrics); + console.log( + JSON.stringify({ + message: "mcp.health_complete", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + }), + ); +} + +function routeName(pathname: string, method: string): string { + if (method === "OPTIONS") return "options"; + if ( + method === "POST" && + (pathname === "/mcp/services" || pathname === "/mcp" || pathname === "/") + ) { + return "mcp"; + } + if (pathname === "/mcp/services") return "mcp_services_card"; + if (pathname === "/") return "root"; + return "not_found"; +} diff --git a/workers/mcp-services/src/mcp.ts b/workers/mcp-services/src/mcp.ts index e9fe24d3..0ea51450 100644 --- a/workers/mcp-services/src/mcp.ts +++ b/workers/mcp-services/src/mcp.ts @@ -13,7 +13,13 @@ import { searchServices, servicesByRecipient, } from "./discovery.js"; -import { CATEGORIES, INTEGRATIONS, type Service, STATUSES } from "./types.js"; +import { + CATEGORIES, + INTEGRATIONS, + type Service, + STATUSES, + type WorkerEnv, +} from "./types.js"; const PROTOCOL_VERSION = "2025-06-18"; const SERVER_VERSION = "1.0.0"; @@ -126,7 +132,7 @@ type FetchedOpenApi = { export async function handleMcp( request: Request, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ): Promise { let payload: unknown; @@ -198,7 +204,7 @@ export function optionsResponse(): Response { async function handleMessage( request: JsonRpcRequest | undefined, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ): Promise { if (request?.jsonrpc !== "2.0" || typeof request.method !== "string") { @@ -239,7 +245,7 @@ async function handleMessage( async function handleToolCall( params: ToolCallParams, - env: Env, + env: WorkerEnv, ctx: ExecutionContext, ) { const name = typeof params.name === "string" ? params.name : ""; diff --git a/workers/mcp-services/src/request-metrics.ts b/workers/mcp-services/src/request-metrics.ts new file mode 100644 index 00000000..58e526cb --- /dev/null +++ b/workers/mcp-services/src/request-metrics.ts @@ -0,0 +1,195 @@ +import { + type DatadogMetric, + metricName, + sendDatadogMetrics, +} from "./datadog.js"; +import type { WorkerEnv } from "./types.js"; + +type McpRequestSummary = { + method?: string; + tool?: string; +}; + +const KNOWN_TOOLS = new Set([ + "list_services", + "search_services", + "search_offers", + "recommend_services", + "get_usage_recipe", + "get_facets", + "get_services_by_recipient", + "get_catalog_status", + "get_service", + "get_offers", + "get_openapi", +]); + +export async function withRequestMetrics( + request: Request, + env: WorkerEnv, + ctx: ExecutionContext, + route: string, + handler: () => Promise, +): Promise { + const startedAt = Date.now(); + const mcpSummaryPromise = + route === "mcp" ? summarizeMcpRequest(request) : Promise.resolve(undefined); + let response: Response; + try { + response = await handler(); + } catch (error) { + queueRequestMetrics( + env, + ctx, + mcpSummaryPromise, + request, + route, + 500, + Date.now() - startedAt, + true, + ); + throw error; + } + + queueRequestMetrics( + env, + ctx, + mcpSummaryPromise, + request, + route, + response.status, + Date.now() - startedAt, + false, + ); + return response; +} + +function queueRequestMetrics( + env: WorkerEnv, + ctx: ExecutionContext, + summaryPromise: Promise, + request: Request, + route: string, + status: number, + durationMs: number, + thrown: boolean, +): void { + ctx.waitUntil( + summaryPromise + .then((summary) => + sendDatadogMetrics( + env, + requestMetrics(request, route, status, durationMs, summary, thrown), + ), + ) + .catch((error) => { + console.error( + JSON.stringify({ + message: "datadog.request_metrics_failed", + error: errorMessage(error), + }), + ); + }), + ); +} + +function requestMetrics( + request: Request, + route: string, + status: number, + durationMs: number, + summary: McpRequestSummary | undefined, + thrown: boolean, +): DatadogMetric[] { + const tags = [ + `route:${route}`, + `method:${request.method}`, + `status_class:${statusClass(status)}`, + ]; + const metrics: DatadogMetric[] = [ + { + metric: metricName("http.request.count"), + type: "count", + value: 1, + tags, + }, + { + metric: metricName("http.response.duration_ms"), + type: "gauge", + value: durationMs, + tags, + }, + ]; + + if (summary?.method) { + const mcpTags = [ + `mcp_method:${summary.method}`, + ...(summary.tool ? [`tool:${summary.tool}`] : []), + `status:${thrown || status >= 500 ? "server_error" : "ok"}`, + ]; + metrics.push( + { + metric: metricName("mcp.request.count"), + type: "count", + value: 1, + tags: mcpTags, + }, + { + metric: metricName("mcp.response.duration_ms"), + type: "gauge", + value: durationMs, + tags: mcpTags, + }, + ); + if (thrown || status >= 500) { + metrics.push({ + metric: metricName("mcp.error.count"), + type: "count", + value: 1, + tags: [...mcpTags, "error_class:server"], + }); + } + } + + return metrics; +} + +async function summarizeMcpRequest( + request: Request, +): Promise { + try { + const body = await request.clone().json(); + const message = Array.isArray(body) ? body[0] : body; + if (typeof message !== "object" || message === null) return undefined; + const method = valueAsString((message as { method?: unknown }).method); + const params = (message as { params?: unknown }).params; + if ( + method !== "tools/call" || + typeof params !== "object" || + params === null + ) { + return method ? { method } : undefined; + } + const rawTool = valueAsString((params as { name?: unknown }).name); + const tool = rawTool && KNOWN_TOOLS.has(rawTool) ? rawTool : "unknown"; + return { method, tool }; + } catch { + return undefined; + } +} + +function valueAsString(value: unknown): string | undefined { + return typeof value === "string" && value.length > 0 ? value : undefined; +} + +function statusClass(status: number): string { + if (status >= 500) return "5xx"; + if (status >= 400) return "4xx"; + if (status >= 300) return "3xx"; + if (status >= 200) return "2xx"; + return "other"; +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/types.ts b/workers/mcp-services/src/types.ts index 9a888c46..ee45071f 100644 --- a/workers/mcp-services/src/types.ts +++ b/workers/mcp-services/src/types.ts @@ -72,6 +72,18 @@ export interface ServicesResponse { services: Service[]; } -export type WorkerEnv = Env & { +export type WorkerEnv = Omit< + Env, + | "DATADOG_ENABLED" + | "DATADOG_ENV" + | "DATADOG_SERVICE" + | "DATADOG_SITE" + | "PUBLIC_MCP_ENDPOINT" +> & { + DATADOG_API_KEY?: string; + DATADOG_ENABLED?: string; + DATADOG_ENV?: string; + DATADOG_SERVICE?: string; + DATADOG_SITE?: string; PUBLIC_MCP_ENDPOINT?: string; }; diff --git a/workers/mcp-services/wrangler.jsonc b/workers/mcp-services/wrangler.jsonc index 4f074255..06ca31ca 100644 --- a/workers/mcp-services/wrangler.jsonc +++ b/workers/mcp-services/wrangler.jsonc @@ -17,7 +17,7 @@ } }, "triggers": { - "crons": ["0 * * * *"] + "crons": ["* * * * *", "0 * * * *"] }, "kv_namespaces": [ { @@ -26,6 +26,10 @@ } ], "vars": { + "DATADOG_ENABLED": "true", + "DATADOG_ENV": "production", + "DATADOG_SERVICE": "mpp-discovery-service-mcp", + "DATADOG_SITE": "us5.datadoghq.com", "MPP_SERVICES_URL": "https://mpp.dev/api/services", "PUBLIC_MCP_ENDPOINT": "https://mpp.dev/mcp/services" } From 8f35eeaf1072fefa70574592bc778197dd83ccfa Mon Sep 17 00:00:00 2001 From: Parv Ahuja Date: Mon, 22 Jun 2026 22:23:55 -0700 Subject: [PATCH 2/4] Simplify Datadog metrics architecture --- package.json | 1 - .../provision-mpp-discovery-service-mcp.mjs | 143 ------- workers/mcp-services/README.md | 15 +- workers/mcp-services/src/datadog.test.ts | 29 +- workers/mcp-services/src/datadog.ts | 146 +++---- workers/mcp-services/src/health.test.ts | 13 +- workers/mcp-services/src/health.ts | 377 +++++++----------- workers/mcp-services/src/index.ts | 99 ++--- workers/mcp-services/src/mcp-client.ts | 104 +++++ workers/mcp-services/src/request-metrics.ts | 236 ++++++----- 10 files changed, 500 insertions(+), 663 deletions(-) delete mode 100644 scripts/datadog/provision-mpp-discovery-service-mcp.mjs create mode 100644 workers/mcp-services/src/mcp-client.ts diff --git a/package.json b/package.json index b7ffaf79..1b8fea74 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,6 @@ "dev": "node scripts/generate-discovery.ts && vite dev", "generate:discovery": "node scripts/generate-discovery.ts", "generate:pages": "node scripts/generate-pages.ts", - "mcp-services:datadog:provision": "node scripts/datadog/provision-mpp-discovery-service-mcp.mjs", "mcp-services:check": "pnpm --filter mpp-services-mcp run check", "mcp-services:deploy": "pnpm --filter mpp-services-mcp run deploy", "preinstall": "pnpx only-allow pnpm", diff --git a/scripts/datadog/provision-mpp-discovery-service-mcp.mjs b/scripts/datadog/provision-mpp-discovery-service-mcp.mjs deleted file mode 100644 index d5a5a443..00000000 --- a/scripts/datadog/provision-mpp-discovery-service-mcp.mjs +++ /dev/null @@ -1,143 +0,0 @@ -const DEFAULT_SITE = "us5.datadoghq.com"; -const DEFAULT_SERVICE = "mpp-discovery-service-mcp"; -const DEFAULT_ENV = "production"; -const DEFAULT_ALERT_TARGET = "@slack-eng-developers-monitor"; - -const apiKey = requiredEnv("DATADOG_API_KEY"); -const appKey = requiredEnv("DATADOG_APP_KEY"); -const site = process.env.DATADOG_SITE || DEFAULT_SITE; -const service = process.env.DATADOG_SERVICE || DEFAULT_SERVICE; -const env = process.env.DATADOG_ENV || DEFAULT_ENV; -const alertTarget = process.env.DATADOG_ALERT_TARGET || DEFAULT_ALERT_TARGET; -const apiBase = datadogApiBase(site); -const metricFilter = `service:${service},env:${env}`; -const monitorTags = [`service:${service}`, `env:${env}`, "managed_by:repo"]; - -const monitors = [ - { - name: "MPP Discovery Service MCP health failing", - type: "metric alert", - query: `avg(last_3m):avg:mpp.discovery_mcp.health.ok{${metricFilter}} < 1`, - message: [ - `MPP Discovery Service MCP public health is failing. ${alertTarget}`, - "", - "The Cloudflare cron check calls https://mpp.dev/mcp/services and validates the MCP card, initialize, tools/list, catalog status, and search_services.", - ].join("\n"), - options: { - thresholds: { critical: 1 }, - notify_no_data: true, - no_data_timeframe: 5, - renotify_interval: 60, - include_tags: true, - }, - }, - { - name: "MPP Discovery Service MCP catalog cache stale", - type: "metric alert", - query: `avg(last_5m):max:mpp.discovery_mcp.catalog.cache_age_seconds{${metricFilter}} > 10800`, - message: `MPP Discovery Service MCP catalog cache is older than 3 hours. ${alertTarget}`, - options: { - thresholds: { critical: 10800 }, - notify_no_data: false, - renotify_interval: 60, - include_tags: true, - }, - }, - { - name: "MPP Discovery Service MCP service count low", - type: "metric alert", - query: `avg(last_15m):min:mpp.discovery_mcp.catalog.services{${metricFilter}} < 100`, - message: `MPP Discovery Service MCP catalog has fewer than 100 services. ${alertTarget}`, - options: { - thresholds: { critical: 100 }, - notify_no_data: false, - renotify_interval: 60, - include_tags: true, - }, - }, - { - name: "MPP Discovery Service MCP offer count low", - type: "metric alert", - query: `avg(last_15m):min:mpp.discovery_mcp.catalog.offers{${metricFilter}} < 1000`, - message: `MPP Discovery Service MCP catalog has fewer than 1000 payment offers. ${alertTarget}`, - options: { - thresholds: { critical: 1000 }, - notify_no_data: false, - renotify_interval: 60, - include_tags: true, - }, - }, - { - name: "MPP Discovery Service MCP server errors", - type: "metric alert", - query: `sum(last_5m):sum:mpp.discovery_mcp.mcp.error.count{${metricFilter},error_class:server}.as_count() > 0`, - message: `MPP Discovery Service MCP is returning server-side MCP errors. ${alertTarget}`, - options: { - thresholds: { critical: 0 }, - notify_no_data: false, - renotify_interval: 60, - include_tags: true, - }, - }, -]; - -const existing = await listManagedMonitors(); -for (const monitor of monitors) { - const match = existing.find((item) => item.name === monitor.name); - if (match) { - await datadogRequest(`/api/v1/monitor/${match.id}`, { - method: "PUT", - body: { ...monitor, tags: monitorTags }, - }); - console.log(`Updated monitor ${monitor.name} (${match.id})`); - } else { - const created = await datadogRequest("/api/v1/monitor", { - method: "POST", - body: { ...monitor, tags: monitorTags }, - }); - console.log(`Created monitor ${monitor.name} (${created.id})`); - } -} - -async function listManagedMonitors() { - const query = new URLSearchParams({ - monitor_tags: "managed_by:repo", - name: "MPP Discovery Service MCP", - }); - return datadogRequest(`/api/v1/monitor?${query.toString()}`, { - method: "GET", - }); -} - -async function datadogRequest(path, { method, body }) { - const response = await fetch(`${apiBase}${path}`, { - method, - headers: { - accept: "application/json", - "content-type": "application/json", - "DD-API-KEY": apiKey, - "DD-APPLICATION-KEY": appKey, - }, - body: body ? JSON.stringify(body) : undefined, - }); - const text = await response.text(); - const payload = text ? JSON.parse(text) : undefined; - if (!response.ok) { - throw new Error( - `Datadog API ${method} ${path} failed: ${response.status} ${text}`, - ); - } - return payload; -} - -function datadogApiBase(value) { - if (/^https?:\/\//i.test(value)) return value.replace(/\/+$/, ""); - if (value.startsWith("api.")) return `https://${value}`; - return `https://api.${value}`; -} - -function requiredEnv(name) { - const value = process.env[name]; - if (!value) throw new Error(`${name} is required`); - return value; -} diff --git a/workers/mcp-services/README.md b/workers/mcp-services/README.md index 5f4a6e3e..2c397ba4 100644 --- a/workers/mcp-services/README.md +++ b/workers/mcp-services/README.md @@ -93,19 +93,8 @@ Production Worker secret: DATADOG_API_KEY= ``` -`DATADOG_APP_KEY` must not be configured on the Worker. It is only used outside -runtime to provision monitors: - -```bash -DATADOG_API_KEY= \ -DATADOG_APP_KEY= \ -DATADOG_ALERT_TARGET=@slack-eng-developers-monitor \ -pnpm mcp-services:datadog:provision -``` - -The provisioner creates or updates Datadog monitors for missing health metrics, -failed health checks, stale catalog cache, low service/offer counts, and -server-side MCP errors. Alerts route through Datadog's Slack integration. +Only the Worker metrics API key is required by this package. Configure Datadog +notifications manually from these emitted metrics. ## Tools diff --git a/workers/mcp-services/src/datadog.test.ts b/workers/mcp-services/src/datadog.test.ts index 933ef1a3..8b0bb93a 100644 --- a/workers/mcp-services/src/datadog.test.ts +++ b/workers/mcp-services/src/datadog.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { datadogEnabled, metricName, sendDatadogMetrics } from "./datadog.js"; +import { DatadogMetricsClient, gauge } from "./datadog.js"; import type { WorkerEnv } from "./types.js"; describe("datadog metrics", () => { @@ -17,13 +17,8 @@ describe("datadog metrics", () => { }), ); - await sendDatadogMetrics(env(), [ - { - metric: metricName("health.ok"), - type: "gauge", - value: 1, - tags: ["check:initialize"], - }, + await new DatadogMetricsClient(env()).send([ + gauge("health.ok", 1, ["check:initialize"]), ]); expect(requests).toHaveLength(1); @@ -53,17 +48,23 @@ describe("datadog metrics", () => { const fetch = vi.fn(); vi.stubGlobal("fetch", fetch); - await sendDatadogMetrics({ ...env(), DATADOG_ENABLED: "false" }, [ - { metric: metricName("health.ok"), type: "gauge", value: 1 }, - ]); + await new DatadogMetricsClient({ + ...env(), + DATADOG_ENABLED: "false", + }).send([gauge("health.ok", 1)]); expect(fetch).not.toHaveBeenCalled(); }); it("requires explicit enablement or an API key", () => { - expect(datadogEnabled({} as WorkerEnv)).toBe(false); - expect(datadogEnabled({ DATADOG_ENABLED: "true" } as WorkerEnv)).toBe(true); - expect(datadogEnabled({ DATADOG_API_KEY: "key" } as WorkerEnv)).toBe(true); + expect(new DatadogMetricsClient({} as WorkerEnv).enabled).toBe(false); + expect( + new DatadogMetricsClient({ DATADOG_ENABLED: "true" } as WorkerEnv) + .enabled, + ).toBe(true); + expect( + new DatadogMetricsClient({ DATADOG_API_KEY: "key" } as WorkerEnv).enabled, + ).toBe(true); }); }); diff --git a/workers/mcp-services/src/datadog.ts b/workers/mcp-services/src/datadog.ts index eccc391c..db2c41c9 100644 --- a/workers/mcp-services/src/datadog.ts +++ b/workers/mcp-services/src/datadog.ts @@ -2,7 +2,7 @@ import type { WorkerEnv } from "./types.js"; type MetricType = "count" | "gauge"; -export type DatadogMetric = { +export type MetricPoint = { metric: string; value: number; type: MetricType; @@ -11,89 +11,99 @@ export type DatadogMetric = { const DEFAULT_DATADOG_SITE = "us5.datadoghq.com"; const METRIC_PREFIX = "mpp.discovery_mcp"; +const DEFAULT_SERVICE = "mpp-discovery-service-mcp"; +const DEFAULT_ENV = "production"; export function metricName(name: string): string { return `${METRIC_PREFIX}.${name}`; } -export function emitDatadogMetrics( - env: WorkerEnv, - ctx: ExecutionContext, - metrics: DatadogMetric[], -): void { - if (metrics.length === 0 || !datadogEnabled(env)) return; +export function gauge( + name: string, + value: number, + tags?: string[], +): MetricPoint { + return { metric: metricName(name), type: "gauge", value, tags }; +} - ctx.waitUntil( - sendDatadogMetrics(env, metrics).catch((error) => { - console.error( - JSON.stringify({ - message: "datadog.metrics_failed", - error: errorMessage(error), - }), - ); - }), - ); +export function count(name: string, value = 1, tags?: string[]): MetricPoint { + return { metric: metricName(name), type: "count", value, tags }; } -export async function sendDatadogMetrics( - env: WorkerEnv, - metrics: DatadogMetric[], -): Promise { - if (metrics.length === 0 || !datadogEnabled(env)) return; - - const apiKey = env.DATADOG_API_KEY; - if (!apiKey) { - console.warn( - JSON.stringify({ - message: "datadog.metrics_skipped", - reason: "missing_api_key", +export class DatadogMetricsClient { + constructor(private readonly env: WorkerEnv) {} + + queue(ctx: ExecutionContext, metrics: MetricPoint[]): void { + if (!this.enabled || metrics.length === 0) return; + + ctx.waitUntil( + this.send(metrics).catch((error) => { + console.error( + JSON.stringify({ + message: "datadog.metrics_failed", + error: errorMessage(error), + }), + ); }), ); - return; } - const timestamp = Math.floor(Date.now() / 1000); - const tags = baseTags(env); - const response = await fetch(`${datadogApiBase(env)}/api/v1/series`, { - method: "POST", - headers: { - "content-type": "application/json", - "DD-API-KEY": apiKey, - }, - body: JSON.stringify({ - series: metrics.map((metric) => ({ - metric: metric.metric, - type: metric.type, - points: [[timestamp, metric.value]], - tags: [...tags, ...(metric.tags ?? [])], - })), - }), - }); - - if (!response.ok) { - throw new Error(`Datadog metrics API failed: ${response.status}`); + async send(metrics: MetricPoint[]): Promise { + if (!this.enabled || metrics.length === 0) return; + + const apiKey = this.env.DATADOG_API_KEY; + if (!apiKey) { + console.warn( + JSON.stringify({ + message: "datadog.metrics_skipped", + reason: "missing_api_key", + }), + ); + return; + } + + const timestamp = Math.floor(Date.now() / 1000); + const response = await fetch(`${this.apiBase}/api/v1/series`, { + method: "POST", + headers: { + "content-type": "application/json", + "DD-API-KEY": apiKey, + }, + body: JSON.stringify({ + series: metrics.map((metric) => ({ + metric: metric.metric, + type: metric.type, + points: [[timestamp, metric.value]], + tags: [...this.baseTags, ...(metric.tags ?? [])], + })), + }), + }); + + if (!response.ok) { + throw new Error(`Datadog metrics API failed: ${response.status}`); + } } -} -export function datadogEnabled(env: WorkerEnv): boolean { - const configured = normalized(env.DATADOG_ENABLED); - if (configured === "true") return true; - if (configured === "false") return false; - return Boolean(env.DATADOG_API_KEY); -} + get enabled(): boolean { + const configured = normalized(this.env.DATADOG_ENABLED); + if (configured === "true") return true; + if (configured === "false") return false; + return Boolean(this.env.DATADOG_API_KEY); + } -function datadogApiBase(env: WorkerEnv): string { - const site = env.DATADOG_SITE || DEFAULT_DATADOG_SITE; - if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); - if (site.startsWith("api.")) return `https://${site}`; - return `https://api.${site}`; -} + private get apiBase(): string { + const site = this.env.DATADOG_SITE || DEFAULT_DATADOG_SITE; + if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); + if (site.startsWith("api.")) return `https://${site}`; + return `https://api.${site}`; + } -function baseTags(env: WorkerEnv): string[] { - return [ - `service:${env.DATADOG_SERVICE || "mpp-discovery-service-mcp"}`, - `env:${env.DATADOG_ENV || "production"}`, - ]; + private get baseTags(): string[] { + return [ + `service:${this.env.DATADOG_SERVICE || DEFAULT_SERVICE}`, + `env:${this.env.DATADOG_ENV || DEFAULT_ENV}`, + ]; + } } function normalized(value: string | undefined): string { diff --git a/workers/mcp-services/src/health.test.ts b/workers/mcp-services/src/health.test.ts index 0e5d2f80..58ba9337 100644 --- a/workers/mcp-services/src/health.test.ts +++ b/workers/mcp-services/src/health.test.ts @@ -1,6 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { runPublicHealthCheck } from "./health.js"; -import type { WorkerEnv } from "./types.js"; +import { McpHealthChecker } from "./health.js"; const endpoint = "https://mpp.dev/mcp/services"; @@ -12,7 +11,7 @@ describe("public health check", () => { it("emits healthy metrics for representative MCP checks", async () => { vi.stubGlobal("fetch", vi.fn(healthyFetch)); - const metrics = await runPublicHealthCheck(env()); + const metrics = await new McpHealthChecker(endpoint).metrics(); expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(1); expect(metricValue(metrics, "mpp.discovery_mcp.catalog.services")).toBe( @@ -34,7 +33,7 @@ describe("public health check", () => { }), ); - const metrics = await runPublicHealthCheck(env()); + const metrics = await new McpHealthChecker(endpoint).metrics(); expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(0); expect(metricValue(metrics, "mpp.discovery_mcp.health.failure.count")).toBe( @@ -134,9 +133,3 @@ function metricValue( ) { return metrics.find((metric) => metric.metric === name)?.value; } - -function env(): WorkerEnv { - return { - PUBLIC_MCP_ENDPOINT: endpoint, - } as WorkerEnv; -} diff --git a/workers/mcp-services/src/health.ts b/workers/mcp-services/src/health.ts index dcd3516b..6d75be7f 100644 --- a/workers/mcp-services/src/health.ts +++ b/workers/mcp-services/src/health.ts @@ -1,175 +1,174 @@ -import { type DatadogMetric, metricName } from "./datadog.js"; +import { gauge, type MetricPoint } from "./datadog.js"; +import { + arrayValue, + HttpMcpClient, + numberValue, + object, + stringValue, +} from "./mcp-client.js"; import type { WorkerEnv } from "./types.js"; -type JsonObject = Record; +type CheckFn = () => Promise; type HealthCheckResult = { name: string; ok: boolean; durationMs: number; error?: string; - metrics?: DatadogMetric[]; + metrics?: MetricPoint[]; }; -const REQUEST_TIMEOUT_MS = 10000; const MAX_CACHE_AGE_SECONDS = 3 * 60 * 60; const MIN_SERVICE_COUNT = 100; const MIN_OFFER_COUNT = 1000; +const REQUIRED_TOOLS = [ + "list_services", + "search_services", + "get_catalog_status", +]; -export async function runPublicHealthCheck( - env: WorkerEnv, -): Promise { - const endpoint = env.PUBLIC_MCP_ENDPOINT || "https://mpp.dev/mcp/services"; - const results = await sequentialChecks([ - () => checkGetCard(endpoint), - () => checkHead(endpoint), - () => checkInitialize(endpoint), - () => checkToolsList(endpoint), - () => checkCatalogStatus(endpoint), - () => checkSearchServices(endpoint), - ]); +export class McpHealthChecker { + private readonly client: HttpMcpClient; - const failures = results.filter((result) => !result.ok); - const metrics: DatadogMetric[] = [ - { - metric: metricName("health.ok"), - type: "gauge", - value: failures.length === 0 ? 1 : 0, - tags: ["endpoint:public"], - }, - { - metric: metricName("health.failure.count"), - type: "gauge", - value: failures.length, - tags: ["endpoint:public"], - }, - ]; + constructor(private readonly endpoint: string) { + this.client = new HttpMcpClient(endpoint); + } - for (const result of results) { - metrics.push( - { - metric: metricName("health.check.ok"), - type: "gauge", - value: result.ok ? 1 : 0, - tags: [`check:${result.name}`, "endpoint:public"], - }, - { - metric: metricName("health.check.duration_ms"), - type: "gauge", - value: result.durationMs, - tags: [`check:${result.name}`, "endpoint:public"], - }, - ...(result.metrics ?? []), + static fromEnv(env: WorkerEnv): McpHealthChecker { + return new McpHealthChecker( + env.PUBLIC_MCP_ENDPOINT || "https://mpp.dev/mcp/services", ); } - if (failures.length > 0) { - console.error( - JSON.stringify({ - message: "mcp.health_failed", - endpoint, - failures: failures.map((failure) => ({ - check: failure.name, - error: failure.error, - })), - }), - ); + async metrics(): Promise { + const results = await this.runChecks(); + const failures = results.filter((result) => !result.ok); + const metrics = [ + gauge("health.ok", failures.length === 0 ? 1 : 0, ["endpoint:public"]), + gauge("health.failure.count", failures.length, ["endpoint:public"]), + ]; + + for (const result of results) { + const tags = [`check:${result.name}`, "endpoint:public"]; + metrics.push( + gauge("health.check.ok", result.ok ? 1 : 0, tags), + gauge("health.check.duration_ms", result.durationMs, tags), + ...(result.metrics ?? []), + ); + } + + if (failures.length > 0) { + console.error( + JSON.stringify({ + message: "mcp.health_failed", + endpoint: this.endpoint, + failures: failures.map((failure) => ({ + check: failure.name, + error: failure.error, + })), + }), + ); + } + + return metrics; } - return metrics; -} + private runChecks(): Promise { + return this.sequential([ + ["get_card", () => this.assertServerCard()], + ["head", () => this.assertHead()], + ["initialize", () => this.assertInitialize()], + ["tools_list", () => this.checkToolsList()], + ["get_catalog_status", () => this.checkCatalogStatus()], + ["search_services", () => this.assertSearchServices()], + ]); + } -async function sequentialChecks( - checks: Array<() => Promise>, -): Promise { - const results: HealthCheckResult[] = []; - for (const check of checks) { - results.push(await check()); + private async sequential( + checks: Array<[string, CheckFn]>, + ): Promise { + const results: HealthCheckResult[] = []; + for (const [name, check] of checks) { + results.push(await this.measured(name, check)); + } + return results; } - return results; -} -async function checkGetCard(endpoint: string): Promise { - return measured("get_card", async () => { - const response = await fetchWithTimeout(endpoint, { - headers: { accept: "application/json" }, - }); - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); + private async measured( + name: string, + check: CheckFn, + ): Promise { + const startedAt = Date.now(); + try { + const metrics = await check(); + return { + name, + ok: true, + durationMs: Date.now() - startedAt, + ...(metrics ? { metrics } : {}), + }; + } catch (error) { + return { + name, + ok: false, + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }; } + } - const body = await response.json(); - const card = object(body); + private async assertServerCard(): Promise { + const card = await this.client.serverCard(); const transport = object(card.transport); - const instructions = stringValue(card.instructions); if (stringValue(object(card.serverInfo).name) !== "mpp-services-mcp") { throw new Error("server card name mismatch"); } - if (stringValue(transport.endpoint) !== endpoint) { + if (stringValue(transport.endpoint) !== this.endpoint) { throw new Error("server card endpoint mismatch"); } - if (!instructions.includes("402")) { + if (!stringValue(card.instructions).includes("402")) { throw new Error("server card is missing payment authority instructions"); } - }); -} + return undefined; + } -async function checkHead(endpoint: string): Promise { - return measured("head", async () => { - const response = await fetchWithTimeout(endpoint, { method: "HEAD" }); - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); - } - }); -} + private async assertHead(): Promise { + await this.client.head(); + return undefined; + } -async function checkInitialize(endpoint: string): Promise { - return measured("initialize", async () => { - const result = await rpc(endpoint, "initialize", {}); + private async assertInitialize(): Promise { + const result = await this.client.initialize(); if (stringValue(object(result.serverInfo).name) !== "mpp-services-mcp") { throw new Error("initialize serverInfo mismatch"); } if (!stringValue(result.instructions).includes("402")) { throw new Error("initialize instructions missing 402 authority text"); } - }); -} + return undefined; + } -async function checkToolsList(endpoint: string): Promise { - return measured("tools_list", async () => { - const result = await rpc(endpoint, "tools/list", {}); + private async checkToolsList(): Promise { + const result = await this.client.listTools(); const tools = arrayValue(result.tools); const names = new Set( tools .map((tool) => object(tool).name) .filter((name): name is string => typeof name === "string"), ); - for (const required of [ - "list_services", - "search_services", - "get_catalog_status", - ]) { + for (const required of REQUIRED_TOOLS) { if (!names.has(required)) throw new Error(`missing tool ${required}`); } - return [ - { - metric: metricName("tools.advertised"), - type: "gauge", - value: tools.length, - }, - ]; - }); -} + return [gauge("tools.advertised", tools.length)]; + } -async function checkCatalogStatus( - endpoint: string, -): Promise { - return measured("get_catalog_status", async () => { - const result = await callTool(endpoint, "get_catalog_status", {}); + private async checkCatalogStatus(): Promise { + const result = await this.checkedTool("get_catalog_status", {}); const content = object(result.structuredContent); const serviceCount = numberValue(content.serviceCount); const offerCount = numberValue(content.offerCount); const cacheAgeSeconds = numberValue(content.cacheAgeSeconds); + if (serviceCount < MIN_SERVICE_COUNT) { throw new Error(`serviceCount below ${MIN_SERVICE_COUNT}`); } @@ -179,142 +178,36 @@ async function checkCatalogStatus( if (cacheAgeSeconds > MAX_CACHE_AGE_SECONDS) { throw new Error(`cacheAgeSeconds above ${MAX_CACHE_AGE_SECONDS}`); } + return [ - { - metric: metricName("catalog.services"), - type: "gauge", - value: serviceCount, - }, - { - metric: metricName("catalog.offers"), - type: "gauge", - value: offerCount, - }, - { - metric: metricName("catalog.cache_age_seconds"), - type: "gauge", - value: cacheAgeSeconds, - }, + gauge("catalog.services", serviceCount), + gauge("catalog.offers", offerCount), + gauge("catalog.cache_age_seconds", cacheAgeSeconds), ]; - }); -} + } -async function checkSearchServices( - endpoint: string, -): Promise { - return measured("search_services", async () => { - const result = await callTool(endpoint, "search_services", { + private async assertSearchServices(): Promise { + const result = await this.checkedTool("search_services", { category: "ai", limit: 1, }); const content = object(result.structuredContent); - const total = numberValue(content.total); - const returned = numberValue(content.returned); - if (total <= 0) throw new Error("expected at least one ai service"); - if (returned <= 0) throw new Error("expected one returned ai service"); - }); -} - -async function measured( - name: string, - check: () => Promise, -): Promise { - const startedAt = Date.now(); - try { - const metrics = await check(); - return { - name, - ok: true, - durationMs: Date.now() - startedAt, - ...(metrics ? { metrics } : {}), - }; - } catch (error) { - return { - name, - ok: false, - durationMs: Date.now() - startedAt, - error: errorMessage(error), - }; - } -} - -async function callTool( - endpoint: string, - name: string, - args: JsonObject, -): Promise { - const result = await rpc(endpoint, "tools/call", { - name, - arguments: args, - }); - if (result.isError === true) { - throw new Error(`tool returned isError: ${name}`); - } - return result; -} - -async function rpc( - endpoint: string, - method: string, - params: JsonObject, -): Promise { - const response = await fetchWithTimeout(endpoint, { - method: "POST", - headers: { - accept: "application/json", - "content-type": "application/json", - }, - body: JSON.stringify({ - jsonrpc: "2.0", - id: 1, - method, - params, - }), - }); - - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); - } - - const body = object(await response.json()); - if (body.error) { - throw new Error( - `json-rpc error: ${stringValue(object(body.error).message)}`, - ); - } - return object(body.result); -} - -async function fetchWithTimeout( - input: RequestInfo, - init?: RequestInit, -): Promise { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS); - try { - return await fetch(input, { ...init, signal: controller.signal }); - } finally { - clearTimeout(timeout); + if (numberValue(content.total) <= 0) { + throw new Error("expected at least one ai service"); + } + if (numberValue(content.returned) <= 0) { + throw new Error("expected one returned ai service"); + } + return undefined; } -} -function object(value: unknown): JsonObject { - if (typeof value === "object" && value !== null && !Array.isArray(value)) { - return value as JsonObject; + private async checkedTool(name: string, args: Record) { + const result = await this.client.callTool(name, args); + if (result.isError === true) { + throw new Error(`tool returned isError: ${name}`); + } + return result; } - return {}; -} - -function arrayValue(value: unknown): unknown[] { - return Array.isArray(value) ? value : []; -} - -function stringValue(value: unknown): string { - return typeof value === "string" ? value : ""; -} - -function numberValue(value: unknown): number { - return typeof value === "number" && Number.isFinite(value) ? value : 0; } function errorMessage(error: unknown): string { diff --git a/workers/mcp-services/src/index.ts b/workers/mcp-services/src/index.ts index c7c4bbab..47d0b24b 100644 --- a/workers/mcp-services/src/index.ts +++ b/workers/mcp-services/src/index.ts @@ -1,9 +1,9 @@ import { refreshCatalog } from "./cache.js"; -import { emitDatadogMetrics, metricName } from "./datadog.js"; +import { DatadogMetricsClient, gauge, type MetricPoint } from "./datadog.js"; import { countPaymentOffers } from "./discovery.js"; -import { runPublicHealthCheck } from "./health.js"; +import { McpHealthChecker } from "./health.js"; import { handleMcp, jsonHeaders, optionsResponse, serverCard } from "./mcp.js"; -import { withRequestMetrics } from "./request-metrics.js"; +import { RequestMetricsRecorder } from "./request-metrics.js"; import type { WorkerEnv } from "./types.js"; export default { @@ -14,10 +14,11 @@ export default { ): Promise { const url = new URL(request.url); const publicEndpoint = publicMcpEndpoint(url, env); + const metrics = new DatadogMetricsClient(env); + const requestMetrics = new RequestMetricsRecorder(metrics); - return withRequestMetrics( + return requestMetrics.trace( request, - env, ctx, routeName(url.pathname, request.method), async () => { @@ -68,8 +69,10 @@ export default { env: WorkerEnv, ctx: ExecutionContext, ) { + const metrics = new DatadogMetricsClient(env); + if (event.cron === "* * * * *") { - ctx.waitUntil(runScheduledHealthCheck(event, env, ctx)); + ctx.waitUntil(runScheduledHealthCheck(event, env, metrics, ctx)); return; } @@ -77,33 +80,15 @@ export default { ctx.waitUntil( refreshCatalog(env) .then((catalog) => { - emitDatadogMetrics(env, ctx, [ - { - metric: metricName("catalog.refresh.ok"), - type: "gauge", - value: 1, - }, - { - metric: metricName("catalog.refresh.duration_ms"), - type: "gauge", - value: Date.now() - startedAt, - }, - { - metric: metricName("catalog.services"), - type: "gauge", - value: catalog.services.length, - }, - { - metric: metricName("catalog.offers"), - type: "gauge", - value: countPaymentOffers(catalog.services), - }, - { - metric: metricName("catalog.cache_age_seconds"), - type: "gauge", - value: 0, - }, - ]); + metrics.queue( + ctx, + catalogRefreshMetrics({ + ok: true, + durationMs: Date.now() - startedAt, + services: catalog.services.length, + offers: countPaymentOffers(catalog.services), + }), + ); console.log( JSON.stringify({ message: "catalog.refresh_complete", @@ -117,18 +102,13 @@ export default { ); }) .catch((error) => { - emitDatadogMetrics(env, ctx, [ - { - metric: metricName("catalog.refresh.ok"), - type: "gauge", - value: 0, - }, - { - metric: metricName("catalog.refresh.duration_ms"), - type: "gauge", - value: Date.now() - startedAt, - }, - ]); + metrics.queue( + ctx, + catalogRefreshMetrics({ + ok: false, + durationMs: Date.now() - startedAt, + }), + ); console.error( JSON.stringify({ message: "catalog.refresh_failed", @@ -150,11 +130,12 @@ function publicMcpEndpoint(url: URL, env: WorkerEnv): string { async function runScheduledHealthCheck( event: ScheduledController, env: WorkerEnv, + metrics: DatadogMetricsClient, ctx: ExecutionContext, ): Promise { const startedAt = Date.now(); - const metrics = await runPublicHealthCheck(env); - emitDatadogMetrics(env, ctx, metrics); + const healthMetrics = await McpHealthChecker.fromEnv(env).metrics(); + metrics.queue(ctx, healthMetrics); console.log( JSON.stringify({ message: "mcp.health_complete", @@ -177,3 +158,27 @@ function routeName(pathname: string, method: string): string { if (pathname === "/") return "root"; return "not_found"; } + +function catalogRefreshMetrics({ + ok, + durationMs, + services, + offers, +}: { + ok: boolean; + durationMs: number; + services?: number; + offers?: number; +}): MetricPoint[] { + return [ + gauge("catalog.refresh.ok", ok ? 1 : 0), + gauge("catalog.refresh.duration_ms", durationMs), + ...(ok && services !== undefined && offers !== undefined + ? [ + gauge("catalog.services", services), + gauge("catalog.offers", offers), + gauge("catalog.cache_age_seconds", 0), + ] + : []), + ]; +} diff --git a/workers/mcp-services/src/mcp-client.ts b/workers/mcp-services/src/mcp-client.ts new file mode 100644 index 00000000..ed3a3c52 --- /dev/null +++ b/workers/mcp-services/src/mcp-client.ts @@ -0,0 +1,104 @@ +type JsonObject = Record; + +type RpcRequest = { + method: string; + params?: JsonObject; +}; + +const DEFAULT_TIMEOUT_MS = 10000; + +export class HttpMcpClient { + constructor( + private readonly endpoint: string, + private readonly timeoutMs = DEFAULT_TIMEOUT_MS, + ) {} + + async serverCard(): Promise { + const response = await this.fetch({ + headers: { accept: "application/json" }, + }); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + return object(await response.json()); + } + + async head(): Promise { + const response = await this.fetch({ method: "HEAD" }); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + } + + initialize(): Promise { + return this.rpc({ method: "initialize", params: {} }); + } + + listTools(): Promise { + return this.rpc({ method: "tools/list", params: {} }); + } + + callTool(name: string, args: JsonObject): Promise { + return this.rpc({ + method: "tools/call", + params: { name, arguments: args }, + }); + } + + private async rpc(request: RpcRequest): Promise { + const response = await this.fetch({ + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json", + }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: 1, + method: request.method, + params: request.params ?? {}, + }), + }); + + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + + const body = object(await response.json()); + if (body.error) { + throw new Error( + `json-rpc error: ${stringValue(object(body.error).message)}`, + ); + } + return object(body.result); + } + + private async fetch(init?: RequestInit): Promise { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), this.timeoutMs); + try { + return await fetch(this.endpoint, { ...init, signal: controller.signal }); + } finally { + clearTimeout(timeout); + } + } +} + +export function object(value: unknown): JsonObject { + if (typeof value === "object" && value !== null && !Array.isArray(value)) { + return value as JsonObject; + } + return {}; +} + +export function arrayValue(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +export function stringValue(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +export function numberValue(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) ? value : 0; +} diff --git a/workers/mcp-services/src/request-metrics.ts b/workers/mcp-services/src/request-metrics.ts index 58e526cb..546649ee 100644 --- a/workers/mcp-services/src/request-metrics.ts +++ b/workers/mcp-services/src/request-metrics.ts @@ -1,9 +1,9 @@ import { - type DatadogMetric, - metricName, - sendDatadogMetrics, + count, + type DatadogMetricsClient, + gauge, + type MetricPoint, } from "./datadog.js"; -import type { WorkerEnv } from "./types.js"; type McpRequestSummary = { method?: string; @@ -24,136 +24,120 @@ const KNOWN_TOOLS = new Set([ "get_openapi", ]); -export async function withRequestMetrics( - request: Request, - env: WorkerEnv, - ctx: ExecutionContext, - route: string, - handler: () => Promise, -): Promise { - const startedAt = Date.now(); - const mcpSummaryPromise = - route === "mcp" ? summarizeMcpRequest(request) : Promise.resolve(undefined); - let response: Response; - try { - response = await handler(); - } catch (error) { - queueRequestMetrics( - env, - ctx, - mcpSummaryPromise, - request, - route, - 500, - Date.now() - startedAt, - true, - ); - throw error; +export class RequestMetricsRecorder { + constructor(private readonly metrics: DatadogMetricsClient) {} + + async trace( + request: Request, + ctx: ExecutionContext, + route: string, + handler: () => Promise, + ): Promise { + const startedAt = Date.now(); + const summaryPromise = + route === "mcp" + ? summarizeMcpRequest(request) + : Promise.resolve(undefined); + + try { + const response = await handler(); + this.queue(ctx, { + request, + route, + status: response.status, + durationMs: Date.now() - startedAt, + summaryPromise, + thrown: false, + }); + return response; + } catch (error) { + this.queue(ctx, { + request, + route, + status: 500, + durationMs: Date.now() - startedAt, + summaryPromise, + thrown: true, + }); + throw error; + } } - queueRequestMetrics( - env, - ctx, - mcpSummaryPromise, - request, - route, - response.status, - Date.now() - startedAt, - false, - ); - return response; + private queue(ctx: ExecutionContext, sample: RequestMetricSample): void { + ctx.waitUntil( + sample.summaryPromise + .then((summary) => + this.metrics.send( + buildRequestMetrics({ + ...sample, + summary, + }), + ), + ) + .catch((error) => { + console.error( + JSON.stringify({ + message: "datadog.request_metrics_failed", + error: errorMessage(error), + }), + ); + }), + ); + } } -function queueRequestMetrics( - env: WorkerEnv, - ctx: ExecutionContext, - summaryPromise: Promise, - request: Request, - route: string, - status: number, - durationMs: number, - thrown: boolean, -): void { - ctx.waitUntil( - summaryPromise - .then((summary) => - sendDatadogMetrics( - env, - requestMetrics(request, route, status, durationMs, summary, thrown), - ), - ) - .catch((error) => { - console.error( - JSON.stringify({ - message: "datadog.request_metrics_failed", - error: errorMessage(error), - }), - ); - }), - ); -} +type RequestMetricSample = { + request: Request; + route: string; + status: number; + durationMs: number; + summaryPromise: Promise; + thrown: boolean; +}; -function requestMetrics( - request: Request, - route: string, - status: number, - durationMs: number, - summary: McpRequestSummary | undefined, - thrown: boolean, -): DatadogMetric[] { +type ResolvedRequestMetricSample = Omit< + RequestMetricSample, + "summaryPromise" +> & { + summary?: McpRequestSummary; +}; + +function buildRequestMetrics( + sample: ResolvedRequestMetricSample, +): MetricPoint[] { const tags = [ - `route:${route}`, - `method:${request.method}`, - `status_class:${statusClass(status)}`, + `route:${sample.route}`, + `method:${sample.request.method}`, + `status_class:${statusClass(sample.status)}`, ]; - const metrics: DatadogMetric[] = [ - { - metric: metricName("http.request.count"), - type: "count", - value: 1, - tags, - }, - { - metric: metricName("http.response.duration_ms"), - type: "gauge", - value: durationMs, - tags, - }, + const metrics = [ + count("http.request.count", 1, tags), + gauge("http.response.duration_ms", sample.durationMs, tags), ]; - if (summary?.method) { - const mcpTags = [ - `mcp_method:${summary.method}`, - ...(summary.tool ? [`tool:${summary.tool}`] : []), - `status:${thrown || status >= 500 ? "server_error" : "ok"}`, - ]; - metrics.push( - { - metric: metricName("mcp.request.count"), - type: "count", - value: 1, - tags: mcpTags, - }, - { - metric: metricName("mcp.response.duration_ms"), - type: "gauge", - value: durationMs, - tags: mcpTags, - }, - ); - if (thrown || status >= 500) { - metrics.push({ - metric: metricName("mcp.error.count"), - type: "count", - value: 1, - tags: [...mcpTags, "error_class:server"], - }); - } + if (sample.summary?.method) { + metrics.push(...mcpMetrics(sample)); } return metrics; } +function mcpMetrics(sample: ResolvedRequestMetricSample): MetricPoint[] { + const status = sample.thrown || sample.status >= 500 ? "server_error" : "ok"; + const tags = [ + `mcp_method:${sample.summary?.method}`, + ...(sample.summary?.tool ? [`tool:${sample.summary.tool}`] : []), + `status:${status}`, + ]; + return [ + count("mcp.request.count", 1, tags), + gauge("mcp.response.duration_ms", sample.durationMs, tags), + ...(status === "server_error" + ? [count("mcp.error.count", 1, [...tags, "error_class:server"])] + : []), + ]; +} + async function summarizeMcpRequest( request: Request, ): Promise { @@ -161,7 +145,7 @@ async function summarizeMcpRequest( const body = await request.clone().json(); const message = Array.isArray(body) ? body[0] : body; if (typeof message !== "object" || message === null) return undefined; - const method = valueAsString((message as { method?: unknown }).method); + const method = stringValue((message as { method?: unknown }).method); const params = (message as { params?: unknown }).params; if ( method !== "tools/call" || @@ -170,15 +154,17 @@ async function summarizeMcpRequest( ) { return method ? { method } : undefined; } - const rawTool = valueAsString((params as { name?: unknown }).name); - const tool = rawTool && KNOWN_TOOLS.has(rawTool) ? rawTool : "unknown"; - return { method, tool }; + const rawTool = stringValue((params as { name?: unknown }).name); + return { + method, + tool: rawTool && KNOWN_TOOLS.has(rawTool) ? rawTool : "unknown", + }; } catch { return undefined; } } -function valueAsString(value: unknown): string | undefined { +function stringValue(value: unknown): string | undefined { return typeof value === "string" && value.length > 0 ? value : undefined; } From 467ed7a903c3205a8ef2fcafb93be520cbdb47ac Mon Sep 17 00:00:00 2001 From: Parv Ahuja Date: Mon, 22 Jun 2026 22:27:15 -0700 Subject: [PATCH 3/4] Remove checker notification hooks --- scripts/check-mpp-discovery-service-mcp.mjs | 37 --------------------- 1 file changed, 37 deletions(-) diff --git a/scripts/check-mpp-discovery-service-mcp.mjs b/scripts/check-mpp-discovery-service-mcp.mjs index bf628fb3..79cc108d 100644 --- a/scripts/check-mpp-discovery-service-mcp.mjs +++ b/scripts/check-mpp-discovery-service-mcp.mjs @@ -6,7 +6,6 @@ const DEFAULT_ENDPOINT = "https://mpp.dev/mcp/services"; const endpoint = normalizeEndpoint( process.env.MPP_DISCOVERY_MCP_URL ?? DEFAULT_ENDPOINT, ); -const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL; const requestTimeoutMs = Number( process.env.MPP_DISCOVERY_MCP_TIMEOUT_MS ?? 30_000, ); @@ -317,7 +316,6 @@ async function main() { return "invalid enum rejected"; }); - await reportToSlack(); await writeStepSummary(); const failed = results.filter((result) => !result.ok); @@ -524,41 +522,6 @@ async function writeStepSummary() { await appendFile(process.env.GITHUB_STEP_SUMMARY, summary); } -async function reportToSlack() { - if (!slackWebhookUrl) { - console.log("SLACK_WEBHOOK_URL is not set; skipping Slack report."); - return; - } - const startedAt = Date.now(); - try { - const response = await fetchWithTimeout(slackWebhookUrl, { - method: "POST", - headers: { "content-type": "application/json" }, - body: JSON.stringify({ text: buildSummaryText() }), - }); - if (!response.ok) { - const text = await response.text(); - results.push({ - name: "Slack report", - ok: false, - detail: `Slack webhook returned HTTP ${response.status}: ${snippet(text)}`, - durationMs: Date.now() - startedAt, - }); - console.error(results.at(-1).detail); - return; - } - console.log("ok - Slack report"); - } catch (error) { - results.push({ - name: "Slack report", - ok: false, - detail: errorMessage(error), - durationMs: Date.now() - startedAt, - }); - console.error(results.at(-1).detail); - } -} - main().catch((error) => { console.error(errorMessage(error)); process.exitCode = 1; From 5b4cf97d65e928ab8b2f3cdd2851532160551227 Mon Sep 17 00:00:00 2001 From: Parv Ahuja Date: Tue, 23 Jun 2026 18:05:21 -0700 Subject: [PATCH 4/4] Simplify Datadog metrics plumbing --- workers/mcp-services/README.md | 5 +- workers/mcp-services/src/datadog.test.ts | 28 +- workers/mcp-services/src/datadog.ts | 133 ++++--- workers/mcp-services/src/health.test.ts | 13 +- workers/mcp-services/src/health.ts | 397 +++++++++++--------- workers/mcp-services/src/index.ts | 185 ++++++--- workers/mcp-services/src/mcp-client.ts | 104 ----- workers/mcp-services/src/request-metrics.ts | 181 --------- 8 files changed, 429 insertions(+), 617 deletions(-) delete mode 100644 workers/mcp-services/src/mcp-client.ts delete mode 100644 workers/mcp-services/src/request-metrics.ts diff --git a/workers/mcp-services/README.md b/workers/mcp-services/README.md index 2c397ba4..2cb781bd 100644 --- a/workers/mcp-services/README.md +++ b/workers/mcp-services/README.md @@ -67,9 +67,7 @@ Metrics use the `mpp.discovery_mcp.*` namespace. Important metrics include: - `mpp.discovery_mcp.http.request.count` - `mpp.discovery_mcp.http.response.duration_ms` -- `mpp.discovery_mcp.mcp.request.count` -- `mpp.discovery_mcp.mcp.response.duration_ms` -- `mpp.discovery_mcp.mcp.error.count` +- `mpp.discovery_mcp.http.error.count` - `mpp.discovery_mcp.health.ok` - `mpp.discovery_mcp.health.check.ok` - `mpp.discovery_mcp.health.check.duration_ms` @@ -77,6 +75,7 @@ Metrics use the `mpp.discovery_mcp.*` namespace. Important metrics include: - `mpp.discovery_mcp.catalog.offers` - `mpp.discovery_mcp.catalog.cache_age_seconds` - `mpp.discovery_mcp.catalog.refresh.ok` +- `mpp.discovery_mcp.catalog.refresh.duration_ms` Production Worker vars: diff --git a/workers/mcp-services/src/datadog.test.ts b/workers/mcp-services/src/datadog.test.ts index 8b0bb93a..b301cf8b 100644 --- a/workers/mcp-services/src/datadog.test.ts +++ b/workers/mcp-services/src/datadog.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { DatadogMetricsClient, gauge } from "./datadog.js"; +import { datadogEnabled, gauge, postMetrics } from "./datadog.js"; import type { WorkerEnv } from "./types.js"; describe("datadog metrics", () => { @@ -17,9 +17,7 @@ describe("datadog metrics", () => { }), ); - await new DatadogMetricsClient(env()).send([ - gauge("health.ok", 1, ["check:initialize"]), - ]); + await postMetrics(env(), [gauge("health.ok", 1, ["check:initialize"])]); expect(requests).toHaveLength(1); expect(requests[0].url).toBe("https://api.us5.datadoghq.com/api/v1/series"); @@ -48,23 +46,21 @@ describe("datadog metrics", () => { const fetch = vi.fn(); vi.stubGlobal("fetch", fetch); - await new DatadogMetricsClient({ - ...env(), - DATADOG_ENABLED: "false", - }).send([gauge("health.ok", 1)]); + await postMetrics( + { + ...env(), + DATADOG_ENABLED: "false", + }, + [gauge("health.ok", 1)], + ); expect(fetch).not.toHaveBeenCalled(); }); it("requires explicit enablement or an API key", () => { - expect(new DatadogMetricsClient({} as WorkerEnv).enabled).toBe(false); - expect( - new DatadogMetricsClient({ DATADOG_ENABLED: "true" } as WorkerEnv) - .enabled, - ).toBe(true); - expect( - new DatadogMetricsClient({ DATADOG_API_KEY: "key" } as WorkerEnv).enabled, - ).toBe(true); + expect(datadogEnabled({} as WorkerEnv)).toBe(false); + expect(datadogEnabled({ DATADOG_ENABLED: "true" } as WorkerEnv)).toBe(true); + expect(datadogEnabled({ DATADOG_API_KEY: "key" } as WorkerEnv)).toBe(true); }); }); diff --git a/workers/mcp-services/src/datadog.ts b/workers/mcp-services/src/datadog.ts index db2c41c9..8941a91a 100644 --- a/workers/mcp-services/src/datadog.ts +++ b/workers/mcp-services/src/datadog.ts @@ -14,96 +14,91 @@ const METRIC_PREFIX = "mpp.discovery_mcp"; const DEFAULT_SERVICE = "mpp-discovery-service-mcp"; const DEFAULT_ENV = "production"; -export function metricName(name: string): string { - return `${METRIC_PREFIX}.${name}`; -} - export function gauge( name: string, value: number, tags?: string[], ): MetricPoint { - return { metric: metricName(name), type: "gauge", value, tags }; + return { metric: `${METRIC_PREFIX}.${name}`, type: "gauge", value, tags }; } export function count(name: string, value = 1, tags?: string[]): MetricPoint { - return { metric: metricName(name), type: "count", value, tags }; + return { metric: `${METRIC_PREFIX}.${name}`, type: "count", value, tags }; } -export class DatadogMetricsClient { - constructor(private readonly env: WorkerEnv) {} - - queue(ctx: ExecutionContext, metrics: MetricPoint[]): void { - if (!this.enabled || metrics.length === 0) return; - - ctx.waitUntil( - this.send(metrics).catch((error) => { - console.error( - JSON.stringify({ - message: "datadog.metrics_failed", - error: errorMessage(error), - }), - ); - }), - ); - } - - async send(metrics: MetricPoint[]): Promise { - if (!this.enabled || metrics.length === 0) return; +export function queueMetrics( + ctx: ExecutionContext, + env: WorkerEnv, + metrics: MetricPoint[], +): void { + if (!datadogEnabled(env) || metrics.length === 0) return; - const apiKey = this.env.DATADOG_API_KEY; - if (!apiKey) { - console.warn( + ctx.waitUntil( + postMetrics(env, metrics).catch((error) => { + console.error( JSON.stringify({ - message: "datadog.metrics_skipped", - reason: "missing_api_key", + message: "datadog.metrics_failed", + error: errorMessage(error), }), ); - return; - } + }), + ); +} - const timestamp = Math.floor(Date.now() / 1000); - const response = await fetch(`${this.apiBase}/api/v1/series`, { - method: "POST", - headers: { - "content-type": "application/json", - "DD-API-KEY": apiKey, - }, - body: JSON.stringify({ - series: metrics.map((metric) => ({ - metric: metric.metric, - type: metric.type, - points: [[timestamp, metric.value]], - tags: [...this.baseTags, ...(metric.tags ?? [])], - })), +export async function postMetrics( + env: WorkerEnv, + metrics: MetricPoint[], +): Promise { + if (!datadogEnabled(env) || metrics.length === 0) return; + + if (!env.DATADOG_API_KEY) { + console.warn( + JSON.stringify({ + message: "datadog.metrics_skipped", + reason: "missing_api_key", }), - }); - - if (!response.ok) { - throw new Error(`Datadog metrics API failed: ${response.status}`); - } + ); + return; } - get enabled(): boolean { - const configured = normalized(this.env.DATADOG_ENABLED); - if (configured === "true") return true; - if (configured === "false") return false; - return Boolean(this.env.DATADOG_API_KEY); + const timestamp = Math.floor(Date.now() / 1000); + const baseTags = [ + `service:${env.DATADOG_SERVICE || DEFAULT_SERVICE}`, + `env:${env.DATADOG_ENV || DEFAULT_ENV}`, + ]; + const response = await fetch(`${apiBase(env)}/api/v1/series`, { + method: "POST", + headers: { + "content-type": "application/json", + "DD-API-KEY": env.DATADOG_API_KEY, + }, + body: JSON.stringify({ + series: metrics.map((metric) => ({ + metric: metric.metric, + type: metric.type, + points: [[timestamp, metric.value]], + tags: [...baseTags, ...(metric.tags ?? [])], + })), + }), + }); + + if (!response.ok) { + throw new Error(`Datadog metrics API failed: ${response.status}`); } +} - private get apiBase(): string { - const site = this.env.DATADOG_SITE || DEFAULT_DATADOG_SITE; - if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); - if (site.startsWith("api.")) return `https://${site}`; - return `https://api.${site}`; - } +export function datadogEnabled(env: WorkerEnv): boolean { + const configured = normalized(env.DATADOG_ENABLED); + if (configured === "true") return true; + if (configured === "false") return false; + return Boolean(env.DATADOG_API_KEY); +} - private get baseTags(): string[] { - return [ - `service:${this.env.DATADOG_SERVICE || DEFAULT_SERVICE}`, - `env:${this.env.DATADOG_ENV || DEFAULT_ENV}`, - ]; - } +function apiBase(env: WorkerEnv): string { + const site = env.DATADOG_SITE || DEFAULT_DATADOG_SITE; + if (/^https?:\/\//i.test(site)) return site.replace(/\/+$/, ""); + if (site.startsWith("api.")) return `https://${site}`; + return `https://api.${site}`; } function normalized(value: string | undefined): string { diff --git a/workers/mcp-services/src/health.test.ts b/workers/mcp-services/src/health.test.ts index 58ba9337..7e7f3c52 100644 --- a/workers/mcp-services/src/health.test.ts +++ b/workers/mcp-services/src/health.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { McpHealthChecker } from "./health.js"; +import { healthMetrics } from "./health.js"; +import type { WorkerEnv } from "./types.js"; const endpoint = "https://mpp.dev/mcp/services"; @@ -11,7 +12,7 @@ describe("public health check", () => { it("emits healthy metrics for representative MCP checks", async () => { vi.stubGlobal("fetch", vi.fn(healthyFetch)); - const metrics = await new McpHealthChecker(endpoint).metrics(); + const metrics = await healthMetrics(env()); expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(1); expect(metricValue(metrics, "mpp.discovery_mcp.catalog.services")).toBe( @@ -33,7 +34,7 @@ describe("public health check", () => { }), ); - const metrics = await new McpHealthChecker(endpoint).metrics(); + const metrics = await healthMetrics(env()); expect(metricValue(metrics, "mpp.discovery_mcp.health.ok")).toBe(0); expect(metricValue(metrics, "mpp.discovery_mcp.health.failure.count")).toBe( @@ -133,3 +134,9 @@ function metricValue( ) { return metrics.find((metric) => metric.metric === name)?.value; } + +function env(): WorkerEnv { + return { + PUBLIC_MCP_ENDPOINT: endpoint, + } as WorkerEnv; +} diff --git a/workers/mcp-services/src/health.ts b/workers/mcp-services/src/health.ts index 6d75be7f..958ca63a 100644 --- a/workers/mcp-services/src/health.ts +++ b/workers/mcp-services/src/health.ts @@ -1,23 +1,17 @@ import { gauge, type MetricPoint } from "./datadog.js"; -import { - arrayValue, - HttpMcpClient, - numberValue, - object, - stringValue, -} from "./mcp-client.js"; import type { WorkerEnv } from "./types.js"; -type CheckFn = () => Promise; - -type HealthCheckResult = { +type JsonObject = Record; +type HealthResult = { name: string; ok: boolean; durationMs: number; error?: string; metrics?: MetricPoint[]; }; +type CheckFn = () => Promise; +const DEFAULT_ENDPOINT = "https://mpp.dev/mcp/services"; const MAX_CACHE_AGE_SECONDS = 3 * 60 * 60; const MIN_SERVICE_COUNT = 100; const MIN_OFFER_COUNT = 1000; @@ -27,187 +21,222 @@ const REQUIRED_TOOLS = [ "get_catalog_status", ]; -export class McpHealthChecker { - private readonly client: HttpMcpClient; - - constructor(private readonly endpoint: string) { - this.client = new HttpMcpClient(endpoint); +export async function healthMetrics(env: WorkerEnv): Promise { + const endpoint = env.PUBLIC_MCP_ENDPOINT || DEFAULT_ENDPOINT; + const checks = await runChecks(endpoint); + const failures = checks.filter((check) => !check.ok); + const metrics = [ + gauge("health.ok", failures.length === 0 ? 1 : 0, ["endpoint:public"]), + gauge("health.failure.count", failures.length, ["endpoint:public"]), + ]; + + for (const check of checks) { + const tags = [`check:${check.name}`, "endpoint:public"]; + metrics.push( + gauge("health.check.ok", check.ok ? 1 : 0, tags), + gauge("health.check.duration_ms", check.durationMs, tags), + ...(check.metrics ?? []), + ); } - static fromEnv(env: WorkerEnv): McpHealthChecker { - return new McpHealthChecker( - env.PUBLIC_MCP_ENDPOINT || "https://mpp.dev/mcp/services", + if (failures.length > 0) { + console.error( + JSON.stringify({ + message: "mcp.health_failed", + endpoint, + failures: failures.map((failure) => ({ + check: failure.name, + error: failure.error, + })), + }), ); } - async metrics(): Promise { - const results = await this.runChecks(); - const failures = results.filter((result) => !result.ok); - const metrics = [ - gauge("health.ok", failures.length === 0 ? 1 : 0, ["endpoint:public"]), - gauge("health.failure.count", failures.length, ["endpoint:public"]), - ]; - - for (const result of results) { - const tags = [`check:${result.name}`, "endpoint:public"]; - metrics.push( - gauge("health.check.ok", result.ok ? 1 : 0, tags), - gauge("health.check.duration_ms", result.durationMs, tags), - ...(result.metrics ?? []), - ); - } - - if (failures.length > 0) { - console.error( - JSON.stringify({ - message: "mcp.health_failed", - endpoint: this.endpoint, - failures: failures.map((failure) => ({ - check: failure.name, - error: failure.error, - })), - }), - ); - } - - return metrics; - } - - private runChecks(): Promise { - return this.sequential([ - ["get_card", () => this.assertServerCard()], - ["head", () => this.assertHead()], - ["initialize", () => this.assertInitialize()], - ["tools_list", () => this.checkToolsList()], - ["get_catalog_status", () => this.checkCatalogStatus()], - ["search_services", () => this.assertSearchServices()], - ]); - } - - private async sequential( - checks: Array<[string, CheckFn]>, - ): Promise { - const results: HealthCheckResult[] = []; - for (const [name, check] of checks) { - results.push(await this.measured(name, check)); - } - return results; - } - - private async measured( - name: string, - check: CheckFn, - ): Promise { - const startedAt = Date.now(); - try { - const metrics = await check(); - return { - name, - ok: true, - durationMs: Date.now() - startedAt, - ...(metrics ? { metrics } : {}), - }; - } catch (error) { - return { - name, - ok: false, - durationMs: Date.now() - startedAt, - error: errorMessage(error), - }; - } - } - - private async assertServerCard(): Promise { - const card = await this.client.serverCard(); - const transport = object(card.transport); - if (stringValue(object(card.serverInfo).name) !== "mpp-services-mcp") { - throw new Error("server card name mismatch"); - } - if (stringValue(transport.endpoint) !== this.endpoint) { - throw new Error("server card endpoint mismatch"); - } - if (!stringValue(card.instructions).includes("402")) { - throw new Error("server card is missing payment authority instructions"); - } - return undefined; - } - - private async assertHead(): Promise { - await this.client.head(); - return undefined; - } - - private async assertInitialize(): Promise { - const result = await this.client.initialize(); - if (stringValue(object(result.serverInfo).name) !== "mpp-services-mcp") { - throw new Error("initialize serverInfo mismatch"); - } - if (!stringValue(result.instructions).includes("402")) { - throw new Error("initialize instructions missing 402 authority text"); - } - return undefined; - } - - private async checkToolsList(): Promise { - const result = await this.client.listTools(); - const tools = arrayValue(result.tools); - const names = new Set( - tools - .map((tool) => object(tool).name) - .filter((name): name is string => typeof name === "string"), + return metrics; +} + +async function runChecks(endpoint: string): Promise { + const checks: Array<[string, CheckFn]> = [ + ["get_card", () => assertServerCard(endpoint)], + ["head", () => assertHead(endpoint)], + ["initialize", () => assertInitialize(endpoint)], + ["tools_list", () => checkTools(endpoint)], + ["get_catalog_status", () => checkCatalog(endpoint)], + ["search_services", () => assertSearch(endpoint)], + ]; + const results: HealthResult[] = []; + for (const [name, fn] of checks) results.push(await measure(name, fn)); + return results; +} + +async function measure(name: string, fn: CheckFn): Promise { + const startedAt = Date.now(); + try { + const metrics = await fn(); + return { + name, + ok: true, + durationMs: Date.now() - startedAt, + ...(metrics ? { metrics } : {}), + }; + } catch (error) { + return { + name, + ok: false, + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }; + } +} + +async function assertServerCard(endpoint: string): Promise { + const card = await fetchJson(endpoint, { + headers: { accept: "application/json" }, + }); + if (stringValue(object(card.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("server card name mismatch"); + } + if (stringValue(object(card.transport).endpoint) !== endpoint) { + throw new Error("server card endpoint mismatch"); + } + if (!stringValue(card.instructions).includes("402")) { + throw new Error("server card missing 402 instructions"); + } + return undefined; +} + +async function assertHead(endpoint: string): Promise { + const response = await fetch(endpoint, { method: "HEAD" }); + if (response.status !== 200) { + throw new Error(`HEAD expected 200, received ${response.status}`); + } + return undefined; +} + +async function assertInitialize(endpoint: string): Promise { + const result = await rpc(endpoint, "initialize"); + if (stringValue(object(result.serverInfo).name) !== "mpp-services-mcp") { + throw new Error("initialize serverInfo mismatch"); + } + if (!stringValue(result.instructions).includes("402")) { + throw new Error("initialize missing 402 instructions"); + } + return undefined; +} + +async function checkTools(endpoint: string): Promise { + const result = await rpc(endpoint, "tools/list"); + const tools = arrayValue(result.tools); + const names = new Set( + tools + .map((tool) => object(tool).name) + .filter((name): name is string => typeof name === "string"), + ); + for (const tool of REQUIRED_TOOLS) { + if (!names.has(tool)) throw new Error(`missing tool ${tool}`); + } + return [gauge("tools.advertised", tools.length)]; +} + +async function checkCatalog(endpoint: string): Promise { + const content = object( + (await callTool(endpoint, "get_catalog_status", {})).structuredContent, + ); + const serviceCount = numberValue(content.serviceCount); + const offerCount = numberValue(content.offerCount); + const cacheAgeSeconds = numberValue(content.cacheAgeSeconds); + + if (serviceCount < MIN_SERVICE_COUNT) { + throw new Error(`serviceCount below ${MIN_SERVICE_COUNT}`); + } + if (offerCount < MIN_OFFER_COUNT) { + throw new Error(`offerCount below ${MIN_OFFER_COUNT}`); + } + if (cacheAgeSeconds > MAX_CACHE_AGE_SECONDS) { + throw new Error(`cacheAgeSeconds above ${MAX_CACHE_AGE_SECONDS}`); + } + + return [ + gauge("catalog.services", serviceCount), + gauge("catalog.offers", offerCount), + gauge("catalog.cache_age_seconds", cacheAgeSeconds), + ]; +} + +async function assertSearch(endpoint: string): Promise { + const content = object( + (await callTool(endpoint, "search_services", { category: "ai", limit: 1 })) + .structuredContent, + ); + if (numberValue(content.total) <= 0 || numberValue(content.returned) <= 0) { + throw new Error("expected at least one returned ai service"); + } + return undefined; +} + +async function callTool( + endpoint: string, + name: string, + args: JsonObject, +): Promise { + const result = await rpc(endpoint, "tools/call", { + name, + arguments: args, + }); + if (result.isError === true) + throw new Error(`tool returned isError: ${name}`); + return result; +} + +async function rpc( + endpoint: string, + method: string, + params: JsonObject = {}, +): Promise { + const body = await fetchJson(endpoint, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json", + }, + body: JSON.stringify({ jsonrpc: "2.0", id: 1, method, params }), + }); + if (body.error) { + throw new Error( + `json-rpc error: ${stringValue(object(body.error).message)}`, ); - for (const required of REQUIRED_TOOLS) { - if (!names.has(required)) throw new Error(`missing tool ${required}`); - } - return [gauge("tools.advertised", tools.length)]; - } - - private async checkCatalogStatus(): Promise { - const result = await this.checkedTool("get_catalog_status", {}); - const content = object(result.structuredContent); - const serviceCount = numberValue(content.serviceCount); - const offerCount = numberValue(content.offerCount); - const cacheAgeSeconds = numberValue(content.cacheAgeSeconds); - - if (serviceCount < MIN_SERVICE_COUNT) { - throw new Error(`serviceCount below ${MIN_SERVICE_COUNT}`); - } - if (offerCount < MIN_OFFER_COUNT) { - throw new Error(`offerCount below ${MIN_OFFER_COUNT}`); - } - if (cacheAgeSeconds > MAX_CACHE_AGE_SECONDS) { - throw new Error(`cacheAgeSeconds above ${MAX_CACHE_AGE_SECONDS}`); - } - - return [ - gauge("catalog.services", serviceCount), - gauge("catalog.offers", offerCount), - gauge("catalog.cache_age_seconds", cacheAgeSeconds), - ]; - } - - private async assertSearchServices(): Promise { - const result = await this.checkedTool("search_services", { - category: "ai", - limit: 1, - }); - const content = object(result.structuredContent); - if (numberValue(content.total) <= 0) { - throw new Error("expected at least one ai service"); - } - if (numberValue(content.returned) <= 0) { - throw new Error("expected one returned ai service"); - } - return undefined; - } - - private async checkedTool(name: string, args: Record) { - const result = await this.client.callTool(name, args); - if (result.isError === true) { - throw new Error(`tool returned isError: ${name}`); - } - return result; } + return object(body.result); +} + +async function fetchJson( + endpoint: string, + init?: RequestInit, +): Promise { + const response = await fetch(endpoint, init); + if (response.status !== 200) { + throw new Error(`expected 200, received ${response.status}`); + } + return object(await response.json()); +} + +function object(value: unknown): JsonObject { + if (typeof value === "object" && value !== null && !Array.isArray(value)) { + return value as JsonObject; + } + return {}; +} + +function arrayValue(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +function stringValue(value: unknown): string { + return typeof value === "string" ? value : ""; +} + +function numberValue(value: unknown): number { + return typeof value === "number" && Number.isFinite(value) ? value : 0; } function errorMessage(error: unknown): string { diff --git a/workers/mcp-services/src/index.ts b/workers/mcp-services/src/index.ts index 47d0b24b..83292b0d 100644 --- a/workers/mcp-services/src/index.ts +++ b/workers/mcp-services/src/index.ts @@ -1,9 +1,14 @@ import { refreshCatalog } from "./cache.js"; -import { DatadogMetricsClient, gauge, type MetricPoint } from "./datadog.js"; +import { + count, + gauge, + type MetricPoint, + postMetrics, + queueMetrics, +} from "./datadog.js"; import { countPaymentOffers } from "./discovery.js"; -import { McpHealthChecker } from "./health.js"; +import { healthMetrics } from "./health.js"; import { handleMcp, jsonHeaders, optionsResponse, serverCard } from "./mcp.js"; -import { RequestMetricsRecorder } from "./request-metrics.js"; import type { WorkerEnv } from "./types.js"; export default { @@ -14,11 +19,10 @@ export default { ): Promise { const url = new URL(request.url); const publicEndpoint = publicMcpEndpoint(url, env); - const metrics = new DatadogMetricsClient(env); - const requestMetrics = new RequestMetricsRecorder(metrics); - return requestMetrics.trace( + return trackRequest( request, + env, ctx, routeName(url.pathname, request.method), async () => { @@ -69,60 +73,41 @@ export default { env: WorkerEnv, ctx: ExecutionContext, ) { - const metrics = new DatadogMetricsClient(env); - if (event.cron === "* * * * *") { - ctx.waitUntil(runScheduledHealthCheck(event, env, metrics, ctx)); + ctx.waitUntil(runScheduledHealthCheck(event, env)); return; } - const startedAt = Date.now(); - ctx.waitUntil( - refreshCatalog(env) - .then((catalog) => { - metrics.queue( - ctx, - catalogRefreshMetrics({ - ok: true, - durationMs: Date.now() - startedAt, - services: catalog.services.length, - offers: countPaymentOffers(catalog.services), - }), - ); - console.log( - JSON.stringify({ - message: "catalog.refresh_complete", - cron: event.cron, - scheduledTime: new Date(event.scheduledTime).toISOString(), - durationMs: Date.now() - startedAt, - version: catalog.version, - services: catalog.services.length, - fetchedAt: catalog.fetchedAt, - }), - ); - }) - .catch((error) => { - metrics.queue( - ctx, - catalogRefreshMetrics({ - ok: false, - durationMs: Date.now() - startedAt, - }), - ); - console.error( - JSON.stringify({ - message: "catalog.refresh_failed", - cron: event.cron, - scheduledTime: new Date(event.scheduledTime).toISOString(), - durationMs: Date.now() - startedAt, - error: error instanceof Error ? error.message : String(error), - }), - ); - }), - ); + ctx.waitUntil(runCatalogRefresh(event, env)); }, } satisfies ExportedHandler; +async function trackRequest( + request: Request, + env: WorkerEnv, + ctx: ExecutionContext, + route: string, + handler: () => Promise, +): Promise { + const startedAt = Date.now(); + try { + const response = await handler(); + queueMetrics( + ctx, + env, + requestMetrics(request, route, response.status, Date.now() - startedAt), + ); + return response; + } catch (error) { + queueMetrics( + ctx, + env, + requestMetrics(request, route, 500, Date.now() - startedAt, true), + ); + throw error; + } +} + function publicMcpEndpoint(url: URL, env: WorkerEnv): string { return env.PUBLIC_MCP_ENDPOINT || `${url.origin}/mcp/services`; } @@ -130,12 +115,9 @@ function publicMcpEndpoint(url: URL, env: WorkerEnv): string { async function runScheduledHealthCheck( event: ScheduledController, env: WorkerEnv, - metrics: DatadogMetricsClient, - ctx: ExecutionContext, ): Promise { const startedAt = Date.now(); - const healthMetrics = await McpHealthChecker.fromEnv(env).metrics(); - metrics.queue(ctx, healthMetrics); + await postMetrics(env, await healthMetrics(env)).catch(logMetricsError); console.log( JSON.stringify({ message: "mcp.health_complete", @@ -146,6 +128,53 @@ async function runScheduledHealthCheck( ); } +async function runCatalogRefresh( + event: ScheduledController, + env: WorkerEnv, +): Promise { + const startedAt = Date.now(); + try { + const catalog = await refreshCatalog(env); + await postMetrics( + env, + catalogRefreshMetrics({ + ok: true, + durationMs: Date.now() - startedAt, + services: catalog.services.length, + offers: countPaymentOffers(catalog.services), + }), + ).catch(logMetricsError); + console.log( + JSON.stringify({ + message: "catalog.refresh_complete", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + version: catalog.version, + services: catalog.services.length, + fetchedAt: catalog.fetchedAt, + }), + ); + } catch (error) { + await postMetrics( + env, + catalogRefreshMetrics({ + ok: false, + durationMs: Date.now() - startedAt, + }), + ).catch(logMetricsError); + console.error( + JSON.stringify({ + message: "catalog.refresh_failed", + cron: event.cron, + scheduledTime: new Date(event.scheduledTime).toISOString(), + durationMs: Date.now() - startedAt, + error: errorMessage(error), + }), + ); + } +} + function routeName(pathname: string, method: string): string { if (method === "OPTIONS") return "options"; if ( @@ -159,6 +188,27 @@ function routeName(pathname: string, method: string): string { return "not_found"; } +function requestMetrics( + request: Request, + route: string, + status: number, + durationMs: number, + thrown = false, +): MetricPoint[] { + const tags = [ + `route:${route}`, + `method:${request.method}`, + `status_class:${statusClass(status)}`, + ]; + return [ + count("http.request.count", 1, tags), + gauge("http.response.duration_ms", durationMs, tags), + ...(thrown || status >= 500 + ? [count("http.error.count", 1, [...tags, "error_class:server"])] + : []), + ]; +} + function catalogRefreshMetrics({ ok, durationMs, @@ -182,3 +232,24 @@ function catalogRefreshMetrics({ : []), ]; } + +function statusClass(status: number): string { + if (status >= 500) return "5xx"; + if (status >= 400) return "4xx"; + if (status >= 300) return "3xx"; + if (status >= 200) return "2xx"; + return "other"; +} + +function logMetricsError(error: unknown): void { + console.error( + JSON.stringify({ + message: "datadog.metrics_failed", + error: errorMessage(error), + }), + ); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/workers/mcp-services/src/mcp-client.ts b/workers/mcp-services/src/mcp-client.ts deleted file mode 100644 index ed3a3c52..00000000 --- a/workers/mcp-services/src/mcp-client.ts +++ /dev/null @@ -1,104 +0,0 @@ -type JsonObject = Record; - -type RpcRequest = { - method: string; - params?: JsonObject; -}; - -const DEFAULT_TIMEOUT_MS = 10000; - -export class HttpMcpClient { - constructor( - private readonly endpoint: string, - private readonly timeoutMs = DEFAULT_TIMEOUT_MS, - ) {} - - async serverCard(): Promise { - const response = await this.fetch({ - headers: { accept: "application/json" }, - }); - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); - } - return object(await response.json()); - } - - async head(): Promise { - const response = await this.fetch({ method: "HEAD" }); - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); - } - } - - initialize(): Promise { - return this.rpc({ method: "initialize", params: {} }); - } - - listTools(): Promise { - return this.rpc({ method: "tools/list", params: {} }); - } - - callTool(name: string, args: JsonObject): Promise { - return this.rpc({ - method: "tools/call", - params: { name, arguments: args }, - }); - } - - private async rpc(request: RpcRequest): Promise { - const response = await this.fetch({ - method: "POST", - headers: { - accept: "application/json", - "content-type": "application/json", - }, - body: JSON.stringify({ - jsonrpc: "2.0", - id: 1, - method: request.method, - params: request.params ?? {}, - }), - }); - - if (response.status !== 200) { - throw new Error(`expected 200, received ${response.status}`); - } - - const body = object(await response.json()); - if (body.error) { - throw new Error( - `json-rpc error: ${stringValue(object(body.error).message)}`, - ); - } - return object(body.result); - } - - private async fetch(init?: RequestInit): Promise { - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), this.timeoutMs); - try { - return await fetch(this.endpoint, { ...init, signal: controller.signal }); - } finally { - clearTimeout(timeout); - } - } -} - -export function object(value: unknown): JsonObject { - if (typeof value === "object" && value !== null && !Array.isArray(value)) { - return value as JsonObject; - } - return {}; -} - -export function arrayValue(value: unknown): unknown[] { - return Array.isArray(value) ? value : []; -} - -export function stringValue(value: unknown): string { - return typeof value === "string" ? value : ""; -} - -export function numberValue(value: unknown): number { - return typeof value === "number" && Number.isFinite(value) ? value : 0; -} diff --git a/workers/mcp-services/src/request-metrics.ts b/workers/mcp-services/src/request-metrics.ts deleted file mode 100644 index 546649ee..00000000 --- a/workers/mcp-services/src/request-metrics.ts +++ /dev/null @@ -1,181 +0,0 @@ -import { - count, - type DatadogMetricsClient, - gauge, - type MetricPoint, -} from "./datadog.js"; - -type McpRequestSummary = { - method?: string; - tool?: string; -}; - -const KNOWN_TOOLS = new Set([ - "list_services", - "search_services", - "search_offers", - "recommend_services", - "get_usage_recipe", - "get_facets", - "get_services_by_recipient", - "get_catalog_status", - "get_service", - "get_offers", - "get_openapi", -]); - -export class RequestMetricsRecorder { - constructor(private readonly metrics: DatadogMetricsClient) {} - - async trace( - request: Request, - ctx: ExecutionContext, - route: string, - handler: () => Promise, - ): Promise { - const startedAt = Date.now(); - const summaryPromise = - route === "mcp" - ? summarizeMcpRequest(request) - : Promise.resolve(undefined); - - try { - const response = await handler(); - this.queue(ctx, { - request, - route, - status: response.status, - durationMs: Date.now() - startedAt, - summaryPromise, - thrown: false, - }); - return response; - } catch (error) { - this.queue(ctx, { - request, - route, - status: 500, - durationMs: Date.now() - startedAt, - summaryPromise, - thrown: true, - }); - throw error; - } - } - - private queue(ctx: ExecutionContext, sample: RequestMetricSample): void { - ctx.waitUntil( - sample.summaryPromise - .then((summary) => - this.metrics.send( - buildRequestMetrics({ - ...sample, - summary, - }), - ), - ) - .catch((error) => { - console.error( - JSON.stringify({ - message: "datadog.request_metrics_failed", - error: errorMessage(error), - }), - ); - }), - ); - } -} - -type RequestMetricSample = { - request: Request; - route: string; - status: number; - durationMs: number; - summaryPromise: Promise; - thrown: boolean; -}; - -type ResolvedRequestMetricSample = Omit< - RequestMetricSample, - "summaryPromise" -> & { - summary?: McpRequestSummary; -}; - -function buildRequestMetrics( - sample: ResolvedRequestMetricSample, -): MetricPoint[] { - const tags = [ - `route:${sample.route}`, - `method:${sample.request.method}`, - `status_class:${statusClass(sample.status)}`, - ]; - const metrics = [ - count("http.request.count", 1, tags), - gauge("http.response.duration_ms", sample.durationMs, tags), - ]; - - if (sample.summary?.method) { - metrics.push(...mcpMetrics(sample)); - } - - return metrics; -} - -function mcpMetrics(sample: ResolvedRequestMetricSample): MetricPoint[] { - const status = sample.thrown || sample.status >= 500 ? "server_error" : "ok"; - const tags = [ - `mcp_method:${sample.summary?.method}`, - ...(sample.summary?.tool ? [`tool:${sample.summary.tool}`] : []), - `status:${status}`, - ]; - return [ - count("mcp.request.count", 1, tags), - gauge("mcp.response.duration_ms", sample.durationMs, tags), - ...(status === "server_error" - ? [count("mcp.error.count", 1, [...tags, "error_class:server"])] - : []), - ]; -} - -async function summarizeMcpRequest( - request: Request, -): Promise { - try { - const body = await request.clone().json(); - const message = Array.isArray(body) ? body[0] : body; - if (typeof message !== "object" || message === null) return undefined; - const method = stringValue((message as { method?: unknown }).method); - const params = (message as { params?: unknown }).params; - if ( - method !== "tools/call" || - typeof params !== "object" || - params === null - ) { - return method ? { method } : undefined; - } - const rawTool = stringValue((params as { name?: unknown }).name); - return { - method, - tool: rawTool && KNOWN_TOOLS.has(rawTool) ? rawTool : "unknown", - }; - } catch { - return undefined; - } -} - -function stringValue(value: unknown): string | undefined { - return typeof value === "string" && value.length > 0 ? value : undefined; -} - -function statusClass(status: number): string { - if (status >= 500) return "5xx"; - if (status >= 400) return "4xx"; - if (status >= 300) return "3xx"; - if (status >= 200) return "2xx"; - return "other"; -} - -function errorMessage(error: unknown): string { - return error instanceof Error ? error.message : String(error); -}