diff --git a/CHANGES.md b/CHANGES.md index ace4583ef..7dfe26199 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -126,6 +126,18 @@ To be released. held activities call the outbox permanent failure handler with `reason: "circuit-breaker-ttl"`. [[#620], [#778]] + - Added `benchmarkMode` to `createFederation()` and + `FederationBuilder.build()` for cooperative federation benchmarking. + When enabled, Fedify exposes `GET /.well-known/fedify/bench/stats` + for in-process OpenTelemetry metric snapshots and + `POST /.well-known/fedify/bench/trigger` for driving `sendActivity()` + to server-configured benchmark sink recipients. Benchmark mode also + defaults `allowPrivateAddress` to `true` when built-in loaders are used, + defaults `signatureTimeWindow` to `false`, reports queue depth through + the new `fedify.queue.depth` gauge, and adds explicit low-latency + buckets to the signature verification duration histogram. + [[#744], [#782], [#787]] + - Added OpenTelemetry metrics for ActivityPub fanout and activity lifecycle events, complementing the per-recipient `activitypub.delivery.*` counters and the per-task @@ -248,6 +260,7 @@ To be released. [#740]: https://github.com/fedify-dev/fedify/issues/740 [#741]: https://github.com/fedify-dev/fedify/issues/741 [#742]: https://github.com/fedify-dev/fedify/issues/742 +[#744]: https://github.com/fedify-dev/fedify/issues/744 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 @@ -261,6 +274,8 @@ To be released. [#772]: https://github.com/fedify-dev/fedify/pull/772 [#777]: https://github.com/fedify-dev/fedify/pull/777 [#778]: https://github.com/fedify-dev/fedify/pull/778 +[#782]: https://github.com/fedify-dev/fedify/issues/782 +[#787]: https://github.com/fedify-dev/fedify/pull/787 ### @fedify/fixture diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 7fa2d3978..a71503133 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -154,6 +154,7 @@ const MANUAL = { { text: "Linting", link: "/manual/lint.md" }, { text: "Logging", link: "/manual/log.md" }, { text: "OpenTelemetry", link: "/manual/opentelemetry.md" }, + { text: "Benchmarking", link: "/manual/benchmarking.md" }, { text: "Deployment", link: "/manual/deploy.md" }, ], activeMatch: "/manual", diff --git a/docs/manual/benchmarking.md b/docs/manual/benchmarking.md new file mode 100644 index 000000000..022ba37a1 --- /dev/null +++ b/docs/manual/benchmarking.md @@ -0,0 +1,173 @@ +--- +description: >- + Fedify can expose cooperative benchmark endpoints for measuring federation + workloads without requiring an external metrics backend. +--- + +Benchmarking +============ + +*This API is available since Fedify 2.3.0.* + +Fedify can run as a cooperative benchmark target by enabling +`~FederationOptions.benchmarkMode`. This mode exposes local benchmark +endpoints under `/.well-known/fedify/bench/` and configures an in-process +OpenTelemetry metrics reader so benchmark clients can collect server-side +measurements without a separate metrics backend. + +> [!WARNING] +> Do not enable `benchmarkMode` in production. It is intended for benchmark +> targets that you control. + + +Enabling benchmark mode +----------------------- + +Enable `benchmarkMode` when creating the `Federation` object. If you use the +benchmark trigger endpoint, configure the sink inboxes on the server: + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { createFederation } from "@fedify/fedify"; + +const federation = createFederation({ +// ---cut-start--- + kv: null as unknown as KvStore, +// ---cut-end--- + benchmarkMode: { + triggerSinks: ["https://sink.example/inbox"], + }, +}); +~~~~ + +When enabled, Fedify changes only benchmark-target defaults: + + - `~FederationOptions.allowPrivateAddress` defaults to `true`, unless a + custom document loader factory is configured. + - `~FederationOptions.signatureTimeWindow` defaults to `false`. + - Explicit `allowPrivateAddress` and `signatureTimeWindow` values still win. + - Inbox idempotency is unchanged. Benchmark clients that need repeated + deliveries should mint unique activity IDs. + +If you provide `meterProvider` together with `benchmarkMode`, Fedify throws a +`TypeError`. OpenTelemetry metric readers have to be attached when a +`MeterProvider` is constructed, so benchmark mode owns its in-process provider. + +If the same application code sometimes runs with benchmark mode and sometimes +runs with your normal OpenTelemetry pipeline, pass your application +`meterProvider` only when benchmark mode is off: + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +import type { MeterProvider } from "@opentelemetry/api"; +// ---cut-start--- +declare const process: { env: Record }; +const kv = null as unknown as KvStore; +const meterProvider = null as unknown as MeterProvider; +// ---cut-end--- +import { createFederation } from "@fedify/fedify"; + +const benchmarkEnabled = process.env.FEDIFY_BENCHMARK === "1"; + +const federation = createFederation({ + kv, + benchmarkMode: benchmarkEnabled + ? { triggerSinks: ["https://sink.example/inbox"] } + : false, + meterProvider: benchmarkEnabled ? undefined : meterProvider, +}); +~~~~ + + +Benchmark stats endpoint +------------------------ + +`GET /.well-known/fedify/bench/stats` returns a versioned JSON snapshot of the +server-side metrics collected by the benchmark mode reader: + +~~~~ json +{ + "version": 1, + "source": "server", + "generatedAt": "2026-06-02T00:00:00.000Z", + "scopeMetrics": [], + "errors": [] +} +~~~~ + +The `scopeMetrics` field contains serialized OpenTelemetry scope metrics. +Observable queue depth is included when configured queues implement +`MessageQueue.getDepth()`. + + +Benchmark trigger endpoint +-------------------------- + +`POST /.well-known/fedify/bench/trigger` asks the target application to call +`Context.sendActivity()` with an explicit sender, recipients, and activity. +This exercises the target's normal outbox and queue path. + +The request body has this shape: + +~~~~ json +{ + "sender": { "identifier": "alice" }, + "recipients": [ + { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Service", + "id": "https://sink.example/actors/bob", + "inbox": "https://sink.example/inbox" + } + ], + "activity": { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Create", + "id": "https://example.com/activities/bench-1", + "actor": "https://example.com/users/alice", + "object": { + "type": "Note", + "id": "https://example.com/notes/bench-1", + "content": "benchmark" + } + } +} +~~~~ + +The `sender` must be either `{ "identifier": string }` or +`{ "username": string }`. Recipients are parsed as ActivityPub actors and must +have `id` and `inbox` properties. The activity is parsed as an ActivityPub +`Activity`. + +By default, every recipient inbox must appear in the server-configured +`~FederationBenchmarkOptions.triggerSinks` list. This keeps benchmark traffic +pointed at benchmark sink inboxes and prevents callers from choosing their own +allowlist. To bypass this guard for a controlled run, set +`~FederationBenchmarkOptions.allowUnsafeTriggerRecipients` to `true` in the +application configuration. + +A successful trigger returns `202 Accepted`: + +~~~~ json +{ + "version": 1, + "activityId": "https://example.com/activities/bench-1", + "queueCorrelationId": "https://example.com/activities/bench-1", + "recipientCount": 1, + "inboxCount": 1 +} +~~~~ + +The `queueCorrelationId` is the activity ID preserved on the queued fanout or +outbox work. + + +Metrics +------- + +Benchmark mode uses the same Fedify metrics documented in +[*OpenTelemetry*](./opentelemetry.md), including queue task metrics, queue +depth, HTTP server metrics, and signature verification histograms. The +benchmark endpoints themselves are classified as `fedify.endpoint=benchmark` +in `fedify.http.server.request.*` metrics. diff --git a/docs/manual/federation.md b/docs/manual/federation.md index cefea841e..42bb3c558 100644 --- a/docs/manual/federation.md +++ b/docs/manual/federation.md @@ -275,6 +275,27 @@ Turned off by default. [SSRF]: https://owasp.org/www-community/attacks/Server_Side_Request_Forgery +### `benchmarkMode` + +*This API is available since Fedify 2.3.0.* + +Whether to enable cooperative benchmark mode. When enabled, Fedify exposes +benchmark endpoints under `/.well-known/fedify/bench/` and configures an +in-process metrics reader for benchmark clients. + +This mode changes only benchmark-target defaults: + + - `allowPrivateAddress` defaults to `true`, unless a custom document loader + factory is configured. + - `signatureTimeWindow` defaults to `false`. + - Explicit option values still win. + +> [!WARNING] +> Do not enable `benchmarkMode` in production. + +See the [*Benchmarking* section](./benchmarking.md) for endpoint details and +safety rules. + ### `userAgent` *This API is available since Fedify 1.3.0.* diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index f2f51b931..b0d7476bf 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -370,6 +370,7 @@ Fedify records the following OpenTelemetry metrics: | `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | | `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | | `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | +| `fedify.queue.depth` | Gauge | `{message}` | Reports queued, ready, and delayed queue depth when the queue backend supports it. | ### Metric attributes @@ -841,6 +842,17 @@ Fedify records the following OpenTelemetry metrics: Fedify process*, not cross-process totals. Aggregate it across replicas in your metrics backend. +`fedify.queue.depth` +: `fedify.queue.depth.state` is always present and is one of `queued`, + `ready`, or `delayed`. `fedify.queue.role` is `inbox`, `outbox`, + `fanout`, or `shared`; `shared` means the same queue instance backs more + than one Fedify queue role, and `fedify.queue.roles` lists those roles as a + comma-separated string. `fedify.queue.backend` and + `fedify.queue.native_retrial` follow the same rules as the queue task + metrics. `fedify.federation.instance_id` is an opaque per-Federation + instance identifier that keeps queue depth series distinct when multiple + Federation instances share one [`MeterProvider`]. + The `fedify.queue.task.*` metrics describe what Fedify's workers do with queued messages. They complement the backend-side [`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which @@ -849,6 +861,10 @@ Reading both signals together (task throughput plus backlog depth) makes it possible to distinguish a small, slow queue from a large, fast one and to set alerting thresholds for delivery latency under load. +When [`benchmarkMode`](./benchmarking.md) is enabled, Fedify serves a +versioned snapshot of these in-process metrics from +`/.well-known/fedify/bench/stats`. + The `activitypub.inbox.activity`, `activitypub.outbox.activity`, and `activitypub.fanout.recipients` metrics describe what is happening at the *activity* level, complementing the per-recipient @@ -951,6 +967,7 @@ for ActivityPub: | `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | | `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | | `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | | `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | | `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | | `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | @@ -958,9 +975,11 @@ for ActivityPub: | `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | | `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | | `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | +| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `shared` for queue depth rows where one queue backs multiple roles. | `"outbox"` | | `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | | `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | +| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | | `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | | `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | | `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | diff --git a/packages/fedify/package.json b/packages/fedify/package.json index d5e13579a..216e72cd2 100644 --- a/packages/fedify/package.json +++ b/packages/fedify/package.json @@ -147,6 +147,7 @@ "@logtape/logtape": "catalog:", "@opentelemetry/api": "catalog:", "@opentelemetry/core": "catalog:", + "@opentelemetry/sdk-metrics": "catalog:", "@opentelemetry/sdk-trace-base": "catalog:", "@opentelemetry/semantic-conventions": "catalog:", "byte-encodings": "catalog:", @@ -159,7 +160,6 @@ "devDependencies": { "@fedify/fixture": "workspace:*", "@fedify/vocab-tools": "workspace:^", - "@opentelemetry/sdk-metrics": "catalog:", "@std/assert": "jsr:^0.226.0", "@std/path": "catalog:", "@types/node": "^24.2.1", diff --git a/packages/fedify/src/federation/bench.ts b/packages/fedify/src/federation/bench.ts new file mode 100644 index 000000000..7ab1910d8 --- /dev/null +++ b/packages/fedify/src/federation/bench.ts @@ -0,0 +1,386 @@ +import { Activity, Object as VocabObject, type Recipient } from "@fedify/vocab"; +import { + DataPointType, + type ExponentialHistogram, + type Histogram, + MeterProvider, + type MetricData, + MetricReader, + type ResourceMetrics, + type ScopeMetrics, +} from "@opentelemetry/sdk-metrics"; +import type { Context } from "./context.ts"; +import { extractInboxes } from "./send.ts"; + +/** + * Metric reader owned by `benchmarkMode`. + * @since 2.3.0 + */ +export class BenchmarkMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + +/** + * Creates the in-process OpenTelemetry meter provider used by benchmark mode. + * @returns The meter provider and the metric reader attached to it. + * @since 2.3.0 + */ +export function createBenchmarkMeterProvider(): { + readonly meterProvider: MeterProvider; + readonly reader: BenchmarkMetricReader; +} { + const reader = new BenchmarkMetricReader(); + return { + meterProvider: new MeterProvider({ readers: [reader] }), + reader, + }; +} + +/** + * A serialized snapshot of all benchmark-mode OpenTelemetry metrics. + * + * The `scopeMetrics` field contains the collected metrics grouped by + * instrumentation scope. The `errors` field contains stringified collection + * errors reported by the metric reader. + * @since 2.3.0 + */ +export interface BenchmarkMetricSnapshot { + /** The schema version of this snapshot shape. */ + readonly version: 1; + /** The snapshot source. Always `"server"` for Fedify benchmark targets. */ + readonly source: "server"; + /** The ISO 8601 time when the snapshot was generated. */ + readonly generatedAt: string; + /** Metrics grouped by OpenTelemetry instrumentation scope. */ + readonly scopeMetrics: readonly BenchmarkScopeMetrics[]; + /** Stringified metric collection errors, if any. */ + readonly errors: readonly string[]; +} + +/** + * Metrics collected from one OpenTelemetry instrumentation scope. + * @since 2.3.0 + */ +export interface BenchmarkScopeMetrics { + /** The OpenTelemetry instrumentation scope descriptor. */ + readonly scope: { + /** The instrumentation scope name. */ + readonly name: string; + /** The instrumentation scope version, if provided. */ + readonly version?: string; + }; + /** The metrics emitted by the scope. */ + readonly metrics: readonly BenchmarkMetric[]; +} + +/** + * A serialized OpenTelemetry metric in a benchmark snapshot. + * @since 2.3.0 + */ +export interface BenchmarkMetric { + /** The OpenTelemetry metric name. */ + readonly name: string; + /** The OpenTelemetry metric description. */ + readonly description: string; + /** The OpenTelemetry metric unit, such as `ms` or `{count}`. */ + readonly unit: string; + /** The metric data point kind. */ + readonly dataPointType: + | "histogram" + | "exponential_histogram" + | "gauge" + | "sum"; + /** The serialized data points for the metric. */ + readonly dataPoints: readonly BenchmarkDataPoint[]; +} + +/** + * A serialized OpenTelemetry metric data point. + * + * The timestamp fields use OpenTelemetry high-resolution time tuples. + * Histogram values preserve their SDK histogram shape, including bucket + * boundaries and counts. + * @since 2.3.0 + */ +export interface BenchmarkDataPoint { + /** The metric attributes attached to the data point. */ + readonly attributes: Record; + /** The OpenTelemetry data point start time. */ + readonly startTime: readonly [number, number]; + /** The OpenTelemetry data point end time. */ + readonly endTime: readonly [number, number]; + /** The data point value or histogram payload. */ + readonly value: + | number + | Histogram + | ExponentialHistogram; +} + +/** + * Collects and serializes benchmark-mode metrics from a benchmark reader. + * @param reader The benchmark metric reader to collect from. + * @returns A server metric snapshot with any collection errors stringified. + * @since 2.3.0 + */ +export async function collectBenchmarkMetrics( + reader: BenchmarkMetricReader, +): Promise { + const result = await reader.collect(); + return { + version: 1, + source: "server", + generatedAt: new Date().toISOString(), + scopeMetrics: serializeScopeMetrics(result.resourceMetrics), + errors: result.errors.map((error) => String(error)), + }; +} + +/** + * Handles `GET /.well-known/fedify/bench/stats`. + * @param request The HTTP request to handle. + * @param reader The benchmark metric reader to collect from. + * @returns A JSON metric snapshot response, or `405 Method Not Allowed`. + * @since 2.3.0 + */ +export async function handleBenchmarkStats( + request: Request, + reader: BenchmarkMetricReader, +): Promise { + if (request.method !== "GET") { + return new Response("Method not allowed", { + status: 405, + headers: { "Allow": "GET" }, + }); + } + return jsonResponse(await collectBenchmarkMetrics(reader)); +} + +/** + * Handles `POST /.well-known/fedify/bench/trigger`. + * + * The handler validates a benchmark trigger request, checks recipients against + * server-controlled trigger options, and calls `Context.sendActivity()` to use + * the target's normal outbox path. + * @param request The HTTP request to handle. + * @param context The Fedify context used to resolve actors and send activity. + * @param options Server-controlled benchmark trigger delivery options. + * @returns A JSON response describing the sent activity, or a validation error. + * @since 2.3.0 + */ +export async function handleBenchmarkTrigger( + request: Request, + context: Context, + options: BenchmarkTriggerOptions = {}, +): Promise { + if (request.method !== "POST") { + return new Response("Method not allowed", { + status: 405, + headers: { "Allow": "POST" }, + }); + } + let json: unknown; + try { + json = await request.json(); + } catch { + return jsonResponse({ error: "Invalid JSON request body." }, 400); + } + try { + const body = asRecord(json, "request body"); + const sender = parseSender(body.sender); + const recipients = await parseRecipients(body.recipients, context); + const activity = await parseActivity(body.activity, context); + if (activity.id == null) { + throw new BenchmarkTriggerError("activity must have an id."); + } + const activityId = activity.id.href; + const inboxes = extractInboxes({ recipients }); + const inboxUrls = Object.keys(inboxes); + if (inboxUrls.length < 1) { + throw new BenchmarkTriggerError( + "No valid recipient inboxes found. The recipients list must not be empty.", + ); + } + const unsafeInboxes = options.allowUnsafeRecipients + ? [] + : inboxUrls.filter((inbox) => !options.sinks?.has(inbox)); + if (unsafeInboxes.length > 0) { + return jsonResponse( + { + error: "unsafe_recipient", + unsafeInboxes, + }, + 403, + ); + } + await context.sendActivity(sender, recipients, activity); + return jsonResponse( + { + version: 1, + activityId, + queueCorrelationId: activityId, + recipientCount: recipients.length, + inboxCount: inboxUrls.length, + }, + 202, + ); + } catch (error) { + if (error instanceof BenchmarkTriggerError) { + return jsonResponse({ error: error.message }, error.status); + } + throw error; + } +} + +/** + * Server-controlled options for benchmark trigger delivery. + * @since 2.3.0 + */ +export interface BenchmarkTriggerOptions { + /** Inbox URLs that the trigger endpoint may deliver to. */ + readonly sinks?: ReadonlySet; + /** + * Whether recipients outside {@link BenchmarkTriggerOptions.sinks} may be + * used. + */ + readonly allowUnsafeRecipients?: boolean; +} + +class BenchmarkTriggerError extends Error { + constructor(message: string, readonly status = 400) { + super(message); + } +} + +type BenchmarkSender = { identifier: string } | { username: string }; + +function parseSender(value: unknown): BenchmarkSender { + const sender = asRecord(value, "sender"); + if (typeof sender.identifier === "string") { + return { identifier: sender.identifier }; + } + if (typeof sender.username === "string") { + return { username: sender.username }; + } + throw new BenchmarkTriggerError( + "sender must be { identifier } or { username }.", + ); +} + +async function parseRecipients( + value: unknown, + context: Context, +): Promise { + if (!Array.isArray(value)) { + throw new BenchmarkTriggerError("recipients must be an array."); + } + return await Promise.all(value.map(async (item) => { + let object: VocabObject; + try { + object = await VocabObject.fromJsonLd(item, { + documentLoader: context.documentLoader, + contextLoader: context.contextLoader, + }); + } catch (error) { + throw new BenchmarkTriggerError( + `Invalid ActivityPub recipient: ${error}`, + ); + } + if (!isRecipient(object)) { + throw new BenchmarkTriggerError( + "each recipient must be an ActivityPub actor.", + ); + } + const recipient: Recipient = object; + if (recipient.id == null || recipient.inboxId == null) { + throw new BenchmarkTriggerError( + "each recipient must have id and inbox properties.", + ); + } + return recipient; + })); +} + +function isRecipient(value: unknown): value is Recipient { + return value != null && typeof value === "object" && "id" in value && + "inboxId" in value; +} + +async function parseActivity( + value: unknown, + context: Context, +): Promise { + try { + return await Activity.fromJsonLd(value, { + documentLoader: context.documentLoader, + contextLoader: context.contextLoader, + }); + } catch (error) { + throw new BenchmarkTriggerError(`Invalid ActivityPub activity: ${error}`); + } +} + +function asRecord(value: unknown, name: string): Record { + if (value == null || typeof value !== "object" || Array.isArray(value)) { + throw new BenchmarkTriggerError(`${name} must be an object.`); + } + return value as Record; +} + +function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }); +} + +function serializeScopeMetrics( + resourceMetrics: ResourceMetrics, +): readonly BenchmarkScopeMetrics[] { + return resourceMetrics.scopeMetrics.map(serializeScope); +} + +function serializeScope(scopeMetrics: ScopeMetrics): BenchmarkScopeMetrics { + return { + scope: { + name: scopeMetrics.scope.name, + version: scopeMetrics.scope.version, + }, + metrics: scopeMetrics.metrics.map(serializeMetric), + }; +} + +function serializeMetric(metric: MetricData): BenchmarkMetric { + return { + name: metric.descriptor.name, + description: metric.descriptor.description, + unit: metric.descriptor.unit, + dataPointType: serializeDataPointType(metric.dataPointType), + dataPoints: metric.dataPoints.map((point) => ({ + attributes: { ...point.attributes }, + startTime: point.startTime, + endTime: point.endTime, + value: point.value, + })), + }; +} + +function serializeDataPointType( + dataPointType: DataPointType, +): BenchmarkMetric["dataPointType"] { + switch (dataPointType) { + case DataPointType.HISTOGRAM: + return "histogram"; + case DataPointType.EXPONENTIAL_HISTOGRAM: + return "exponential_histogram"; + case DataPointType.GAUGE: + return "gauge"; + case DataPointType.SUM: + return "sum"; + } +} diff --git a/packages/fedify/src/federation/builder.test.ts b/packages/fedify/src/federation/builder.test.ts index 038741f94..a2af0d6a8 100644 --- a/packages/fedify/src/federation/builder.test.ts +++ b/packages/fedify/src/federation/builder.test.ts @@ -166,6 +166,29 @@ test("FederationBuilder", async (t) => { }, ); + await t.step("passes benchmarkMode to the built federation", async () => { + const builder = createFederationBuilder(); + const federation = await builder.build({ + kv: new MemoryKvStore(), + benchmarkMode: true, + }); + const impl = federation as FederationImpl; + assertEquals(impl.benchmarkMode, true); + assertEquals(impl.allowPrivateAddress, true); + assertEquals(impl.signatureTimeWindow, false); + + const overridden = await builder.build({ + kv: new MemoryKvStore(), + benchmarkMode: true, + allowPrivateAddress: false, + signatureTimeWindow: { minutes: 10 }, + }); + const overriddenImpl = overridden as FederationImpl; + assertEquals(overriddenImpl.benchmarkMode, true); + assertEquals(overriddenImpl.allowPrivateAddress, false); + assertEquals(overriddenImpl.signatureTimeWindow, { minutes: 10 }); + }); + await t.step("should snapshot router state on build", async () => { const builder = createFederationBuilder(); const kv = new MemoryKvStore(); diff --git a/packages/fedify/src/federation/builder.ts b/packages/fedify/src/federation/builder.ts index 95c989a00..6aa15016b 100644 --- a/packages/fedify/src/federation/builder.ts +++ b/packages/fedify/src/federation/builder.ts @@ -195,6 +195,13 @@ export class FederationBuilderImpl this.collectionTypeIds = {}; } + /** + * Builds the federation object. + * @param options Parameters for initializing the federation object. + * @returns The federation object. + * @throws {TypeError} If benchmark mode and `meterProvider` are both + * specified. + */ async build( options: FederationOptions, ): Promise> { diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index 7d650650c..e1d5e9d40 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -802,7 +802,10 @@ export interface FederationBuilder extends Federatable { /** * Builds the federation object. + * @param options Parameters for initializing the federation object. * @returns The federation object. + * @throws {TypeError} If benchmark mode and `meterProvider` are both + * specified. */ build( options: FederationOptions, @@ -846,6 +849,27 @@ export interface InboxChallengePolicy { nonceTtlSeconds?: number; } +/** + * Options for cooperative benchmark mode. + * @since 2.3.0 + */ +export interface FederationBenchmarkOptions { + /** + * Server-controlled inbox URLs that the benchmark trigger endpoint may + * deliver to. + */ + triggerSinks?: readonly (string | URL)[]; + + /** + * Whether the benchmark trigger endpoint may deliver to recipients outside + * {@link FederationBenchmarkOptions.triggerSinks}. + * + * Do not enable this option unless the benchmark endpoint is only reachable + * by a trusted benchmark controller. + */ + allowUnsafeTriggerRecipients?: boolean; +} + /** * Options for creating a {@link Federation} object. * @template TContextData The context data to pass to the {@link Context}. @@ -934,6 +958,22 @@ export interface FederationOptions { */ allowPrivateAddress?: boolean; + /** + * Whether to enable cooperative benchmark mode. This mode exposes + * benchmark-only endpoints and relaxes selected defaults for benchmark + * targets. Pass an object to configure benchmark trigger delivery. + * Do not enable this option in production. + * + * When enabled, {@link FederationOptions.allowPrivateAddress} defaults to + * `true` unless {@link FederationOptions.documentLoaderFactory} or + * {@link FederationOptions.contextLoaderFactory} is configured, and + * {@link FederationOptions.signatureTimeWindow} defaults to `false`. + * + * Turned off by default. + * @since 2.3.0 + */ + benchmarkMode?: boolean | FederationBenchmarkOptions; + /** * Options for making `User-Agent` strings for HTTP requests. * If a string is provided, it is used as the `User-Agent` header. diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 0392fb7d5..c0bd54d6a 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -1,10 +1,17 @@ import { createTestMeterProvider, test } from "@fedify/fixture"; import { assertEquals, assertRejects } from "@std/assert"; +import { + DataPointType, + type HistogramMetricData, + MeterProvider, + MetricReader, +} from "@opentelemetry/sdk-metrics"; import type { DocumentLoader, RemoteDocument } from "@fedify/vocab-runtime"; import { FetchError } from "@fedify/vocab-runtime"; -import type { MessageQueue } from "./mq.ts"; +import type { MessageQueue, MessageQueueDepth } from "./mq.ts"; import { classifyFetchError, + getFederationMetrics, getRemoteHost, instrumentDocumentLoader, recordCircuitBreakerStateChange, @@ -20,8 +27,19 @@ import { recordOutboxActivity, recordOutboxEnqueue, recordWebFingerHandle, + registerQueueDepthGauge, } from "./metrics.ts"; +class TestMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + const noopQueue: MessageQueue = { enqueue() { return Promise.resolve(); @@ -79,6 +97,166 @@ test("recordFanoutRecipients() omits activity type when unknown", () => { ); }); +test("signature verification duration uses explicit low-latency buckets", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + getFederationMetrics(meterProvider).recordSignatureVerificationDuration( + 7, + "http", + "verified", + ); + + const result = await reader.collect(); + const metric = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => + metric.descriptor.name === + "activitypub.signature.verification.duration" + ); + assertEquals(metric?.dataPointType, DataPointType.HISTOGRAM); + const histogram = metric as HistogramMetricData | undefined; + assertEquals(histogram?.dataPoints[0].value.buckets.boundaries, [ + 0.1, + 0.25, + 0.5, + 1, + 2.5, + 5, + 10, + 25, + 50, + 100, + 250, + 500, + 1000, + ]); + } finally { + await meterProvider.shutdown(); + } +}); + +test("registerQueueDepthGauge() skips unavailable depth snapshots", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const throwingQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + throw new TypeError("backend unavailable"); + }, + }; + const nullDepthQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve(null as unknown as MessageQueueDepth); + }, + }; + const healthyQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 7 }); + }, + }; + + registerQueueDepthGauge(meterProvider, [ + { role: "inbox", queue: throwingQueue }, + { role: "outbox", queue: nullDepthQueue }, + { role: "fanout", queue: healthyQueue }, + ]); + + const result = await reader.collect(); + assertEquals(result.errors, []); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + assertEquals(queueDepth?.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth?.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })), + [ + { state: "queued", role: "fanout", value: 7 }, + ], + ); + } finally { + await meterProvider.shutdown(); + } +}); + +test("registerQueueDepthGauge() queries queue depths in parallel", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + let releaseSlowDepth: ((depth: MessageQueueDepth) => void) | undefined; + let fastDepthStarted = false; + try { + const slowQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return new Promise((resolve) => { + releaseSlowDepth = resolve; + }); + }, + }; + const fastQueue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + fastDepthStarted = true; + return Promise.resolve({ queued: 5 }); + }, + }; + + registerQueueDepthGauge(meterProvider, [ + { role: "inbox", queue: slowQueue }, + { role: "outbox", queue: fastQueue }, + ]); + + const collection = reader.collect(); + await Promise.resolve(); + assertEquals(fastDepthStarted, true); + releaseSlowDepth?.({ queued: 3 }); + + const result = await collection; + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + assertEquals( + queueDepth?.dataPoints.map((point) => point.value).sort(), + [3, 5], + ); + } finally { + releaseSlowDepth?.({ queued: 0 }); + await meterProvider.shutdown(); + } +}); + test("recordInboxActivity() records counter with result and activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); for ( diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index b69b32ac7..2c53ca86d 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -6,6 +6,7 @@ import { type Histogram, type MeterProvider, metrics, + type ObservableGauge, type UpDownCounter, } from "@opentelemetry/api"; import metadata from "../../deno.json" with { type: "json" }; @@ -92,6 +93,37 @@ export interface QueueTaskCommonAttributes { activityType?: string; } +/** + * An entry for observing one queue role in `fedify.queue.depth`. + * + * This public API is used by {@link registerQueueDepthGauge()} to associate a + * queue depth source with the task role it represents. + * @since 2.3.0 + */ +export interface QueueDepthGaugeEntry { + /** + * The task role whose queue depth is observed. + */ + role: QueueTaskRole; + + /** + * The message queue to observe, or `undefined` when the role has no queue. + */ + queue?: MessageQueue; +} + +/** + * Options for observing queue depth metrics. + * @since 2.3.0 + */ +export interface QueueDepthGaugeOptions { + /** + * An opaque source identifier to distinguish queue depth series registered on + * the same meter provider. + */ + sourceId?: string; +} + /** * The kind of ActivityPub signature verified, used as the * `activitypub.signature.kind` metric attribute. @@ -478,6 +510,7 @@ class FederationMetrics { readonly queueTaskFailed: Counter; readonly queueTaskDuration: Histogram; readonly queueTaskInFlight: UpDownCounter; + readonly queueDepth: ObservableGauge; readonly fanoutRecipients: Histogram; readonly inboxActivity: Counter; readonly outboxActivity: Counter; @@ -521,6 +554,23 @@ class FederationMetrics { "Duration of ActivityPub signature verification, including local " + "key lookup and remote key fetches.", unit: "ms", + advice: { + explicitBucketBoundaries: [ + 0.1, + 0.25, + 0.5, + 1, + 2.5, + 5, + 10, + 25, + 50, + 100, + 250, + 500, + 1000, + ], + }, }, ); this.signatureKeyFetchDuration = meter.createHistogram( @@ -635,6 +685,12 @@ class FederationMetrics { unit: "{task}", }, ); + this.queueDepth = meter.createObservableGauge("fedify.queue.depth", { + description: + "Messages waiting in configured Fedify queues, as reported by the " + + "queue backend.", + unit: "{message}", + }); this.fanoutRecipients = meter.createHistogram( "activitypub.fanout.recipients", { @@ -1167,6 +1223,85 @@ export function getQueueBackend(queue?: MessageQueue): string | undefined { return name; } +/** + * Registers a callback for observing queue backend depth. + * @since 2.3.0 + */ +export function registerQueueDepthGauge( + meterProvider: MeterProvider, + entries: readonly QueueDepthGaugeEntry[], + options: QueueDepthGaugeOptions = {}, +): void { + const uniqueQueues = new Map(); + for (const { role, queue } of entries) { + if (queue?.getDepth == null) continue; + const roles = uniqueQueues.get(queue); + if (roles == null) { + uniqueQueues.set(queue, [role]); + } else if (!roles.includes(role)) { + roles.push(role); + } + } + if (uniqueQueues.size < 1) return; + const queueEntries = Array.from(uniqueQueues.entries()); + const gauge = getFederationMetrics(meterProvider).queueDepth; + gauge.addCallback(async (observableResult) => { + await Promise.all(queueEntries.map(async ([queue, roles]) => { + let depth; + try { + depth = await queue.getDepth!(); + } catch { + return; + } + if (depth == null) return; + const attributes = buildQueueDepthAttributes(queue, roles, options); + observableResult.observe(depth.queued, { + ...attributes, + "fedify.queue.depth.state": "queued", + }); + if (depth.ready != null) { + observableResult.observe(depth.ready, { + ...attributes, + "fedify.queue.depth.state": "ready", + }); + } + if (depth.delayed != null) { + observableResult.observe(depth.delayed, { + ...attributes, + "fedify.queue.depth.state": "delayed", + }); + } + })); + }); +} + +function buildQueueDepthAttributes( + queue: MessageQueue, + roles: readonly QueueTaskRole[], + options: QueueDepthGaugeOptions, +): Attributes { + const sortedRoles = roles.toSorted(); + const role = sortedRoles.length === 1 ? sortedRoles[0] : "shared"; + const attributes: Attributes = { + "fedify.queue.role": role, + }; + if (options.sourceId != null) { + attributes["fedify.federation.instance_id"] = options.sourceId; + } + if (role === "shared") { + attributes["fedify.queue.roles"] = sortedRoles.join(","); + } + const backend = getQueueBackend(queue); + if (backend != null) { + attributes["fedify.queue.backend"] = backend; + } + const nativeRetrial = queue.nativeRetrial; + if (typeof nativeRetrial === "boolean") { + attributes["fedify.queue.native_retrial"] = nativeRetrial; + } + return attributes; +} + /** * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue and, * for the initial attempt, also records diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index d669daefd..ad073b5df 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -50,9 +50,15 @@ import { rsaPublicKey3, } from "../testing/keys.ts"; import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; -import { SpanStatusCode } from "@opentelemetry/api"; +import { metrics, SpanStatusCode } from "@opentelemetry/api"; +import { + DataPointType, + MeterProvider, + MetricReader, +} from "@opentelemetry/sdk-metrics"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; import { CircuitBreaker } from "./circuit-breaker.ts"; +import { handleBenchmarkTrigger } from "./bench.ts"; const documentLoader = getDocumentLoader(); import type { Context, GetActorOptions } from "./context.ts"; @@ -64,6 +70,7 @@ import { InboxContextImpl, KvSpecDeterminer, } from "./middleware.ts"; +import { recordInboxActivity } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage, Message, OutboxMessage } from "./queue.ts"; @@ -79,6 +86,16 @@ async function withLogtapeLock(fn: () => Promise): Promise { return await run; } +class TestMetricReader extends MetricReader { + protected onShutdown(): Promise { + return Promise.resolve(); + } + + protected onForceFlush(): Promise { + return Promise.resolve(); + } +} + test("createFederation()", async (t) => { const kv = new MemoryKvStore(); @@ -97,6 +114,131 @@ test("createFederation()", async (t) => { }), TypeError); }); + await t.step("benchmarkMode applies cooperative benchmark defaults", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, true); + assertEquals(federation.signatureTimeWindow, false); + }); + + await t.step("benchmarkMode preserves explicit option overrides", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + allowPrivateAddress: false, + signatureTimeWindow: { minutes: 10 }, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, false); + assertEquals(federation.signatureTimeWindow, { minutes: 10 }); + }); + + await t.step("benchmarkMode tolerates calendar time windows", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + signatureTimeWindow: { months: 1 }, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.signatureTimeWindow, { months: 1 }); + }); + + await t.step("benchmarkMode leaves custom loader factories alone", () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + documentLoaderFactory: () => mockDocumentLoader, + contextLoaderFactory: () => mockDocumentLoader, + authenticatedDocumentLoaderFactory: () => mockDocumentLoader, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, false); + }); + + await t.step( + "benchmarkMode keeps private-address default with auth loader only", + () => { + const federation = createFederation({ + kv, + benchmarkMode: true, + authenticatedDocumentLoaderFactory: () => mockDocumentLoader, + }); + assertInstanceOf(federation, FederationImpl); + assertEquals(federation.allowPrivateAddress, true); + }, + ); + + await t.step("benchmarkMode rejects an explicit meterProvider", () => { + const [meterProvider] = createTestMeterProvider(); + assertThrows( + () => + createFederation({ + kv, + benchmarkMode: true, + meterProvider, + }), + TypeError, + "benchmarkMode requires Fedify to own the meterProvider", + ); + }); + + await t.step( + "benchmarkMode warns that benchmark-only relaxations are on", + async () => { + await withLogtapeLock(async () => { + const records: LogRecord[] = []; + await reset(); + try { + await configure({ + sinks: { + test(record) { + records.push(record); + }, + }, + loggers: [ + { + category: ["fedify", "federation", "benchmark"], + lowestLevel: "warning", + sinks: ["test"], + }, + ], + }); + createFederation({ kv, benchmarkMode: true }); + assertEquals(records.length, 1); + assertEquals(records[0].level, "warning"); + assertEquals( + records[0].rawMessage, + "Fedify benchmarkMode is enabled; private address checks " + + "disabled (allowPrivateAddress=true); HTTP Signature time " + + "window disabled (signatureTimeWindow=false). Benchmark " + + "endpoints are active and must not be used in production.", + ); + assertEquals( + records[0].properties.relaxations, + [ + { + protection: "private_address_checks", + effect: "disabled", + effectiveValue: true, + }, + { + protection: "http_signature_time_window", + effect: "disabled", + effectiveValue: false, + secureDefaultSeconds: 3600, + }, + ], + ); + } finally { + await reset(); + } + }); + }, + ); + await t.step("origin", () => { const f = createFederation({ kv, origin: "http://example.com:8080" }); assertInstanceOf(f, FederationImpl); @@ -214,6 +356,444 @@ test("createFederation()", async (t) => { }); }); +test("benchmarkMode stats endpoint", async (t) => { + await t.step("is absent when benchmarkMode is off", async () => { + const federation = createFederation({ kv: new MemoryKvStore() }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/stats"), + { contextData: undefined }, + ); + assertEquals(response.status, 404); + }); + + await t.step("returns a v1 in-process metrics snapshot", async () => { + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 3, ready: 2, delayed: 1 }); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + benchmarkMode: true, + queue, + }); + recordInboxActivity( + (federation as FederationImpl).meterProvider, + "processed", + vocab.Create.typeId.href, + ); + + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/stats"), + { contextData: undefined }, + ); + + assertEquals(response.status, 200); + assertEquals(response.headers.get("Content-Type"), "application/json"); + const body = await response.json() as { + version: number; + source: string; + generatedAt: string; + scopeMetrics: { + metrics: { + name: string; + dataPointType: string; + dataPoints: { attributes: Record; value: unknown }[]; + }[]; + }[]; + }; + assertEquals(body.version, 1); + assertEquals(body.source, "server"); + assertEquals(Number.isNaN(Date.parse(body.generatedAt)), false); + const metrics = body.scopeMetrics.flatMap((scope) => scope.metrics); + assertExists( + metrics.find((metric) => metric.name === "activitypub.inbox.activity"), + ); + const queueDepth = metrics.find((metric) => + metric.name === "fedify.queue.depth" + ); + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, "gauge"); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })).sort((a, b) => String(a.state).localeCompare(String(b.state))), + [ + { state: "delayed", role: "shared", value: 1 }, + { state: "queued", role: "shared", value: 3 }, + { state: "ready", role: "shared", value: 2 }, + ], + ); + }); +}); + +test("createFederation() registers queue depth for regular metrics", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 5, ready: 4, delayed: 3 }); + }, + }; + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })).sort((a, b) => String(a.state).localeCompare(String(b.state))), + [ + { state: "delayed", role: "shared", value: 3 }, + { state: "queued", role: "shared", value: 5 }, + { state: "ready", role: "shared", value: 4 }, + ], + ); + } finally { + await meterProvider.shutdown(); + } +}); + +test("createFederation() registers queue depth after global meterProvider is set", async () => { + metrics.disable(); + const queue: MessageQueue = { + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued: 8 }); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + queue, + }); + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + metrics.setGlobalMeterProvider(meterProvider); + (federation as FederationImpl).meterProvider; + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + assertEquals(queueDepth.dataPointType, DataPointType.GAUGE); + assertEquals( + queueDepth.dataPoints.map((point) => ({ + state: point.attributes["fedify.queue.depth.state"], + role: point.attributes["fedify.queue.role"], + value: point.value, + })), + [ + { state: "queued", role: "shared", value: 8 }, + ], + ); + } finally { + metrics.disable(); + await meterProvider.shutdown(); + } +}); + +test("createFederation() distinguishes queue depth series per federation", async () => { + const reader = new TestMetricReader(); + const meterProvider = new MeterProvider({ readers: [reader] }); + try { + const createQueue = (queued: number): MessageQueue => ({ + enqueue() { + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + getDepth() { + return Promise.resolve({ queued }); + }, + }); + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue: createQueue(1), + }); + createFederation({ + kv: new MemoryKvStore(), + meterProvider, + queue: createQueue(2), + }); + + const result = await reader.collect(); + const queueDepth = result.resourceMetrics.scopeMetrics + .flatMap((scope) => scope.metrics) + .find((metric) => metric.descriptor.name === "fedify.queue.depth"); + + assertExists(queueDepth); + const queuedPoints = queueDepth.dataPoints.filter((point) => + point.attributes["fedify.queue.depth.state"] === "queued" + ); + assertEquals( + queuedPoints.map((point) => point.value).sort(), + [1, 2], + ); + const instanceIds = queuedPoints.map((point) => + point.attributes["fedify.federation.instance_id"] + ); + assertEquals( + instanceIds.every((id) => typeof id === "string"), + true, + ); + assertEquals(new Set(instanceIds).size, 2); + } finally { + await meterProvider.shutdown(); + } +}); + +test("benchmarkMode trigger endpoint", async (t) => { + const createTriggerTarget = ( + options: { allowUnsafeTriggerRecipients?: boolean } = {}, + ) => { + const messages: OutboxMessage[] = []; + const queue: MessageQueue = { + enqueue(message: OutboxMessage) { + messages.push(message); + return Promise.resolve(); + }, + listen() { + return Promise.resolve(); + }, + }; + const federation = createFederation({ + kv: new MemoryKvStore(), + benchmarkMode: { + triggerSinks: ["https://sink.example/inbox"], + allowUnsafeTriggerRecipients: options.allowUnsafeTriggerRecipients, + }, + contextLoaderFactory: () => mockDocumentLoader, + queue: { outbox: queue }, + }); + federation + .setActorDispatcher( + "/users/{identifier}", + (ctx, identifier) => + new vocab.Person({ + id: ctx.getActorUri(identifier), + inbox: ctx.getInboxUri(identifier), + }), + ) + .setKeyPairsDispatcher(() => [ + { privateKey: rsaPrivateKey2, publicKey: rsaPublicKey2.publicKey! }, + ]); + return { federation, messages }; + }; + + const createTriggerBody = async ( + options: { + recipientInbox?: string; + recipients?: unknown[]; + sinks?: string[]; + allowUnsafeRecipients?: boolean; + } = {}, + ) => ({ + sender: { identifier: "alice" }, + sinks: options.sinks, + recipients: options.recipients ?? [ + { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Service", + id: "https://sink.example/actors/bob", + inbox: options.recipientInbox ?? "https://sink.example/inbox", + }, + ], + activity: await new vocab.Create({ + id: new URL("https://example.com/activities/bench-1"), + actor: new URL("https://example.com/users/alice"), + object: new vocab.Note({ + id: new URL("https://example.com/notes/bench-1"), + attribution: new URL("https://example.com/users/alice"), + content: "benchmark", + }), + }).toJsonLd({ contextLoader: mockDocumentLoader }), + allowUnsafeRecipients: options.allowUnsafeRecipients, + }); + + await t.step("is absent when benchmarkMode is off", async () => { + const federation = createFederation({ kv: new MemoryKvStore() }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + }), + { contextData: undefined }, + ); + assertEquals(response.status, 404); + }); + + await t.step("rejects unreadable JSON request bodies", async () => { + const request = { + method: "POST", + json() { + throw new TypeError("body is unavailable"); + }, + } as unknown as Request; + const response = await handleBenchmarkTrigger( + request, + {} as Context, + ); + assertEquals(response.status, 400); + assertEquals(await response.json(), { + error: "Invalid JSON request body.", + }); + }); + + await t.step("rejects empty recipient lists", async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(await createTriggerBody({ recipients: [] })), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 400); + assertEquals(await response.json(), { + error: + "No valid recipient inboxes found. The recipients list must not be empty.", + }); + assertEquals(messages, []); + }); + + await t.step( + "rejects recipients outside configured trigger sinks", + async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 403); + assertEquals(messages, []); + }, + ); + + await t.step( + "does not trust request-provided trigger sinks or bypasses", + async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + sinks: ["https://not-a-sink.example/inbox"], + allowUnsafeRecipients: true, + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 403); + assertEquals(messages, []); + }, + ); + + await t.step( + "allows unsafe recipients only with a server override", + async () => { + const { federation, messages } = createTriggerTarget({ + allowUnsafeTriggerRecipients: true, + }); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify( + await createTriggerBody({ + recipientInbox: "https://not-a-sink.example/inbox", + }), + ), + }), + { contextData: undefined }, + ); + assertEquals(response.status, 202); + assertEquals(messages.length, 1); + assertEquals(messages[0].inbox, "https://not-a-sink.example/inbox"); + }, + ); + + await t.step("sends the activity to explicit sink recipients", async () => { + const { federation, messages } = createTriggerTarget(); + const response = await federation.fetch( + new Request("https://example.com/.well-known/fedify/bench/trigger", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(await createTriggerBody()), + }), + { contextData: undefined }, + ); + + assertEquals(response.status, 202); + const body = await response.json() as { + version: number; + activityId: string; + queueCorrelationId: string; + recipientCount: number; + inboxCount: number; + }; + assertEquals(body.version, 1); + assertEquals(body.activityId, "https://example.com/activities/bench-1"); + assertEquals( + body.queueCorrelationId, + "https://example.com/activities/bench-1", + ); + assertEquals(body.recipientCount, 1); + assertEquals(body.inboxCount, 1); + assertEquals(messages.length, 1); + assertEquals(messages[0].type, "outbox"); + assertEquals(messages[0].activityId, body.queueCorrelationId); + assertEquals(messages[0].inbox, "https://sink.example/inbox"); + }); +}); + test({ name: "Federation.createContext()", permissions: { env: true, read: true }, @@ -7617,13 +8197,20 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { await t.step("expired held activity is dropped", async () => { fetchMock.hardReset(); fetchMock.spyGlobal(); + const now = Temporal.Instant.from("2026-05-25T00:00:02Z"); let dropped: { remoteHost: string; heldSince: Temporal.Instant } | null = null; - const { federation, queued } = setup({ - failureThreshold: 1, - heldActivityTtl: { seconds: 1 }, - onActivityDrop(remoteHost, details) { - dropped = { remoteHost, heldSince: details.heldSince }; + const { federation, queued, kv } = setup(false); + federation.circuitBreaker = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 1, + heldActivityTtl: { seconds: 1 }, + onActivityDrop(remoteHost, details) { + dropped = { remoteHost, heldSince: details.heldSince }; + }, }, }); let permanentFailureReason: unknown; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 105608188..a29891a77 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -57,6 +57,13 @@ import type { ActivityTransformer } from "../compat/types.ts"; import { getNodeInfo, type GetNodeInfoOptions } from "../nodeinfo/client.ts"; import { handleNodeInfo, handleNodeInfoJrd } from "../nodeinfo/handler.ts"; import type { JsonValue, NodeInfo } from "../nodeinfo/types.ts"; +import { + type BenchmarkMetricReader, + type BenchmarkTriggerOptions, + createBenchmarkMeterProvider, + handleBenchmarkStats, + handleBenchmarkTrigger, +} from "./bench.ts"; import { type HttpMessageSignaturesSpec, type HttpMessageSignaturesSpecDeterminer, @@ -105,6 +112,7 @@ import type { import type { ConstructorWithTypeId, Federation, + FederationBenchmarkOptions, FederationFetchOptions, FederationOptions, FederationStartQueueOptions, @@ -131,6 +139,7 @@ import { getRemoteHost, instrumentDocumentLoader, isAbortError, + type QueueDepthGaugeEntry, type QueueTaskCommonAttributes, type QueueTaskResult, recordCircuitBreakerStateChange, @@ -139,6 +148,7 @@ import { recordInboxActivity, recordOutboxActivity, recordOutboxEnqueue, + registerQueueDepthGauge, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -160,6 +170,7 @@ import { handleWebFinger } from "./webfinger.ts"; import { hasMalformedKnownTemporalLiteral } from "./temporal.ts"; const circuitBreakerCasWarningKvStores = new WeakSet(); +let nextQueueDepthGaugeSourceId = 0; const retryAfterHttpDate = new RegExp( "^(?:" + "(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), \\d{2} " + @@ -213,6 +224,100 @@ function clampNegativeDelay(delay: Temporal.Duration): Temporal.Duration { return delay.sign < 0 ? Temporal.Duration.from({ seconds: 0 }) : delay; } +type BenchmarkRelaxation = + | { + readonly protection: "private_address_checks"; + readonly effect: "disabled"; + readonly effectiveValue: true; + } + | { + readonly protection: "http_signature_time_window"; + readonly effect: "disabled"; + readonly effectiveValue: false; + readonly secureDefaultSeconds: 3600; + } + | { + readonly protection: "http_signature_time_window"; + readonly effect: "changed"; + readonly effectiveSeconds: number; + readonly secureDefaultSeconds: 3600; + }; + +function getBenchmarkRelaxations( + allowPrivateAddress: boolean, + signatureTimeWindow: Temporal.Duration | Temporal.DurationLike | false, +): BenchmarkRelaxation[] { + const relaxations: BenchmarkRelaxation[] = []; + if (allowPrivateAddress) { + relaxations.push({ + protection: "private_address_checks", + effect: "disabled", + effectiveValue: true, + }); + } + if (signatureTimeWindow === false) { + relaxations.push({ + protection: "http_signature_time_window", + effect: "disabled", + effectiveValue: false, + secureDefaultSeconds: 3600, + }); + } else { + try { + const seconds = Temporal.Duration.from(signatureTimeWindow).total({ + unit: "seconds", + }); + if (seconds !== 3600) { + relaxations.push({ + protection: "http_signature_time_window", + effect: "changed", + effectiveSeconds: seconds, + secureDefaultSeconds: 3600, + }); + } + } catch { + // Keep benchmark warning formatting best-effort for unusual + // DurationLike values. + } + } + return relaxations; +} + +function formatBenchmarkRelaxations( + relaxations: readonly BenchmarkRelaxation[], +): string { + if (relaxations.length < 1) return "no benchmark-only protections relaxed"; + return relaxations.map((relaxation) => { + switch (relaxation.protection) { + case "private_address_checks": + return "private address checks disabled (allowPrivateAddress=true)"; + case "http_signature_time_window": + if (relaxation.effect === "disabled") { + return `HTTP Signature time window disabled (signatureTimeWindow=false)`; + } + return `HTTP Signature time window set to ${relaxation.effectiveSeconds}s ` + + `(secure default: ${relaxation.secureDefaultSeconds}s)`; + } + }).join("; "); +} + +function getBenchmarkTriggerOptions( + benchmarkOptions: FederationBenchmarkOptions, +): BenchmarkTriggerOptions { + const sinks = benchmarkOptions.triggerSinks?.map((sink) => { + try { + return new URL(sink).href; + } catch { + throw new TypeError("benchmarkMode.triggerSinks must contain only URLs."); + } + }); + return { + sinks: sinks == null ? undefined : new Set(sinks), + allowUnsafeRecipients: + benchmarkOptions.allowUnsafeTriggerRecipients === true, + }; +} + function maxDelay( first: Temporal.Duration, second: Temporal.Duration, @@ -417,8 +522,10 @@ export interface FederationOrigin { /** * Create a new {@link Federation} instance. - * @param parameters Parameters for initializing the instance. + * @param options Parameters for initializing the instance. * @returns A new {@link Federation} instance. + * @throws {TypeError} If benchmark mode and `meterProvider` are both + * specified. * @since 0.10.0 */ export function createFederation( @@ -457,9 +564,52 @@ export class FederationImpl _meterProvider: MeterProvider | undefined; firstKnock?: HttpMessageSignaturesSpec; inboxChallengePolicy?: InboxChallengePolicy; + benchmarkMode: boolean; + benchmarkMetricReader?: BenchmarkMetricReader; + benchmarkTriggerOptions: BenchmarkTriggerOptions; + readonly #queueDepthGaugeSourceId = `fedify-${ + (++nextQueueDepthGaugeSourceId).toString(36) + }`; + #queueDepthGaugeEntries: readonly QueueDepthGaugeEntry[] = []; + #queueDepthGaugeMeterProvider?: MeterProvider; constructor(options: FederationOptions) { super(); + const benchmarkMode = options.benchmarkMode != null && + options.benchmarkMode !== false; + const benchmarkOptions = typeof options.benchmarkMode === "object" + ? options.benchmarkMode + : {}; + const hasCustomLoaderFactory = options.documentLoaderFactory != null || + options.contextLoaderFactory != null; + const allowPrivateAddress = options.allowPrivateAddress ?? + (benchmarkMode && !hasCustomLoaderFactory ? true : false); + const signatureTimeWindow = options.signatureTimeWindow ?? + (benchmarkMode ? false : { hours: 1 }); + if (benchmarkMode && options.meterProvider != null) { + throw new TypeError( + "benchmarkMode requires Fedify to own the meterProvider; " + + "OpenTelemetry metric readers cannot be added after a " + + "MeterProvider is constructed.", + ); + } + if (benchmarkMode) { + const relaxations = getBenchmarkRelaxations( + allowPrivateAddress, + signatureTimeWindow, + ); + const relaxationSummary = formatBenchmarkRelaxations(relaxations); + getLogger(["fedify", "federation", "benchmark"]).warn( + `Fedify benchmarkMode is enabled; ${relaxationSummary}. Benchmark endpoints are active and must not be used in production.`, + { + relaxations, + }, + ); + } + this.benchmarkMode = benchmarkMode; + this.benchmarkTriggerOptions = benchmarkMode + ? getBenchmarkTriggerOptions(benchmarkOptions) + : {}; this.kv = options.kv; this.kvPrefixes = { ...({ @@ -566,7 +716,7 @@ export class FederationImpl this.router.trailingSlashInsensitive = options.trailingSlashInsensitive ?? false; this._initializeRouter(); - if (options.allowPrivateAddress || options.userAgent != null) { + if (options.allowPrivateAddress === true || options.userAgent != null) { if (options.documentLoaderFactory != null) { throw new TypeError( "Cannot set documentLoaderFactory with allowPrivateAddress or " + @@ -586,8 +736,8 @@ export class FederationImpl ); } } - const { allowPrivateAddress, userAgent } = options; - this.allowPrivateAddress = allowPrivateAddress ?? false; + const { userAgent } = options; + this.allowPrivateAddress = allowPrivateAddress; // The loader factory closures below read `this._meterProvider` at // call time, not when they are created. Factories are only invoked // after the constructor has assigned `_meterProvider` (see below), so @@ -685,7 +835,7 @@ export class FederationImpl this.onOutboxError = options.onOutboxError; this.permanentFailureStatusCodes = options.permanentFailureStatusCodes ?? [404, 410]; - this.signatureTimeWindow = options.signatureTimeWindow ?? { hours: 1 }; + this.signatureTimeWindow = signatureTimeWindow; this.skipSignatureVerification = options.skipSignatureVerification ?? false; this.inboxChallengePolicy = options.inboxChallengePolicy; this.outboxRetryPolicy = options.outboxRetryPolicy ?? @@ -695,7 +845,21 @@ export class FederationImpl this.activityTransformers = options.activityTransformers ?? getDefaultActivityTransformers(); this._tracerProvider = options.tracerProvider; - this._meterProvider = options.meterProvider; + if (benchmarkMode) { + const benchmarkMetrics = createBenchmarkMeterProvider(); + this._meterProvider = benchmarkMetrics.meterProvider; + this.benchmarkMetricReader = benchmarkMetrics.reader; + } else { + this._meterProvider = options.meterProvider; + } + this.#queueDepthGaugeEntries = [ + { role: "inbox", queue: this.inboxQueue }, + { role: "outbox", queue: this.outboxQueue }, + { role: "fanout", queue: this.fanoutQueue }, + ]; + this.#registerQueueDepthGauge( + this._meterProvider ?? metrics.getMeterProvider(), + ); this.firstKnock = options.firstKnock; } @@ -704,12 +868,26 @@ export class FederationImpl } get meterProvider(): MeterProvider { - return this._meterProvider ?? metrics.getMeterProvider(); + const meterProvider = this._meterProvider ?? metrics.getMeterProvider(); + this.#registerQueueDepthGauge(meterProvider); + return meterProvider; + } + + #registerQueueDepthGauge(meterProvider: MeterProvider): void { + if (meterProvider === this.#queueDepthGaugeMeterProvider) return; + registerQueueDepthGauge(meterProvider, this.#queueDepthGaugeEntries, { + sourceId: this.#queueDepthGaugeSourceId, + }); + this.#queueDepthGaugeMeterProvider = meterProvider; } _initializeRouter(): void { this.router.add("/.well-known/webfinger", "webfinger"); this.router.add("/.well-known/nodeinfo", "nodeInfoJrd"); + if (this.benchmarkMode) { + this.router.add("/.well-known/fedify/bench/stats", "benchmarkStats"); + this.router.add("/.well-known/fedify/bench/trigger", "benchmarkTrigger"); + } } override _getTracer(): Tracer { @@ -2322,6 +2500,14 @@ export class FederationImpl context, nodeInfoDispatcher: this.nodeInfoDispatcher!, }); + case "benchmarkStats": + return await handleBenchmarkStats(request, this.benchmarkMetricReader!); + case "benchmarkTrigger": + return await handleBenchmarkTrigger( + request, + context, + this.benchmarkTriggerOptions, + ); } // Routes that require JSON-LD Accepts header: @@ -2611,6 +2797,7 @@ type FedifyEndpoint = | "featured" | "featured_tags" | "collection" + | "benchmark" | "not_found" | "not_acceptable" | "error"; @@ -2653,6 +2840,9 @@ function getEndpointCategory(routeName: string): FedifyEndpoint { return "featured"; case "featuredTags": return "featured_tags"; + case "benchmarkStats": + case "benchmarkTrigger": + return "benchmark"; default: return "not_found"; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f3eba0e1b..f6f7bec59 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1173,6 +1173,9 @@ importers: '@opentelemetry/core': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) + '@opentelemetry/sdk-metrics': + specifier: 'catalog:' + version: 2.7.1(@opentelemetry/api@1.9.1) '@opentelemetry/sdk-trace-base': specifier: 'catalog:' version: 2.7.1(@opentelemetry/api@1.9.1) @@ -1204,9 +1207,6 @@ importers: '@fedify/vocab-tools': specifier: workspace:^ version: link:../vocab-tools - '@opentelemetry/sdk-metrics': - specifier: 'catalog:' - version: 2.7.1(@opentelemetry/api@1.9.1) '@std/assert': specifier: jsr:^0.226.0 version: '@jsr/std__assert@0.226.0'