From a1c42c67b5e05aeb53e1d7fa4eefb988f2ef684d Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Sat, 8 Nov 2025 22:37:02 +0100 Subject: [PATCH] Refactor: Use async LabelerServer.create() for initializing the server instance This allows us to correctly initialize fastify before creating the LabelerServer instance, which allows a public API where the server can be extended, closing #8. --- src/LabelerServer.ts | 161 +++++++++++++++++++++++-------------------- src/index.ts | 3 - 2 files changed, 88 insertions(+), 76 deletions(-) diff --git a/src/LabelerServer.ts b/src/LabelerServer.ts index 68d2881..85aa1ee 100644 --- a/src/LabelerServer.ts +++ b/src/LabelerServer.ts @@ -4,7 +4,7 @@ import type { ToolsOzoneModerationEmitEvent } from "@atcute/ozone"; import { XRPCError } from "@atcute/xrpc-server"; import { fastifyWebsocket } from "@fastify/websocket"; -import { Client, createClient } from "@libsql/client"; +import { Client, Config, createClient } from "@libsql/client"; import fastify, { type FastifyInstance, type FastifyListenOptions, @@ -71,15 +71,23 @@ export interface LabelerOptions { dbToken?: string; } -export class LabelerServer { - /** The Fastify application instance. */ +interface LabelerContext { + auth: (did: string) => boolean | Promise; + databaseConfig: Config; app: FastifyInstance; + did: Did; + signingKey: Uint8Array; +} - /** The SQLite database instance. */ - db: Client; +export class LabelerServer { + /** The Fastify application instance. */ + readonly app: FastifyInstance; /** The DID of the labeler account. */ - did: Did; + readonly did: Did; + + /** The SQLite database instance. */ + private db: Client; /** A function that returns whether a DID is authorized to create labels. */ private auth: (did: string) => boolean | Promise; @@ -88,59 +96,78 @@ export class LabelerServer { private connections = new Map>(); /** The signing key used for the labeler. */ - #signingKey: Uint8Array; + readonly #signingKey: Uint8Array; + + /** + * Promise that resolves when database initialization is complete. This should + * be awaited before any database operations. It is automatically awaited + * before starting the server. + */ + readonly #dbInitLock?: Promise; /** - * Promise that resolves when database initialization is complete. - * This should be awaited before any database operations. + * @private */ - private readonly dbInitLock?: Promise; + constructor(context: LabelerContext) { + this.app = context.app; + this.did = context.did; + this.auth = context.auth; + this.#signingKey = context.signingKey; + + this.db = createClient(context.databaseConfig); + this.#dbInitLock = this.initializeDatabase(); + + // Register the routes: + this.app.get("/xrpc/com.atproto.label.queryLabels", this.queryLabelsHandler); + this.app.post("/xrpc/tools.ozone.moderation.emitEvent", this.emitEventHandler); + this.app.get( + "/xrpc/com.atproto.label.subscribeLabels", + { websocket: true }, + this.subscribeLabelsHandler, + ); + this.app.get("/xrpc/_health", this.healthHandler); + this.app.get("/xrpc/*", this.unknownMethodHandler); + this.app.setErrorHandler(this.errorHandler); + } /** * Create a labeler server. * @param options Configuration options. */ - constructor(options: LabelerOptions) { - this.did = options.did as Did; - this.auth = options.auth ?? ((did) => did === this.did); - - if (!isDid(this.did)) { + static async create(options: LabelerOptions): Promise { + if (!isDid(options.did)) { throw new Error(INVALID_DID_ERROR); } + let signingKey; try { if (options.signingKey.startsWith("did:key:")) throw 0; - this.#signingKey = parsePrivateKey(options.signingKey); - if (this.#signingKey.byteLength !== 32) throw 0; + signingKey = parsePrivateKey(options.signingKey); + if (signingKey.byteLength !== 32) throw 0; } catch { throw new Error(INVALID_SIGNING_KEY_ERROR); } + let databaseConfig: Config = { url: "file:" + (options.dbPath ?? "labels.db") }; + if (options.dbUrl) { if (!options.dbToken) { throw new Error( "The `dbToken` option is required when using a remote database URL.", ); } - this.db = createClient({ url: options.dbUrl, authToken: options.dbToken }); - } else { - this.db = createClient({ url: "file:" + (options.dbPath ?? "labels.db") }); + databaseConfig = { url: options.dbUrl, authToken: options.dbToken }; } - this.dbInitLock = this.initializeDatabase(); + const app = fastify(); + await app.register(fastifyWebsocket); - this.app = fastify(); - void this.app.register(fastifyWebsocket).then(() => { - this.app.get("/xrpc/com.atproto.label.queryLabels", this.queryLabelsHandler); - this.app.post("/xrpc/tools.ozone.moderation.emitEvent", this.emitEventHandler); - this.app.get( - "/xrpc/com.atproto.label.subscribeLabels", - { websocket: true }, - this.subscribeLabelsHandler, - ); - this.app.get("/xrpc/_health", this.healthHandler); - this.app.get("/xrpc/*", this.unknownMethodHandler); - this.app.setErrorHandler(this.errorHandler); + return new LabelerServer({ + databaseConfig, + app, + signingKey, + did: options.did, + auth: typeof options.auth === "function" ? options.auth : (did) => did === options.did, }); } @@ -176,43 +203,30 @@ export class LabelerServer { /** * Start the server. * @param port The port to listen on. - * @param callback A callback to run when the server is started. + * @returns address of the server */ - start(port: number, callback: (error: Error | null, address: string) => void): void; + async start(port: number): Promise; /** * Start the server. * @param options Options for the server. - * @param callback A callback to run when the server is started. + * @returns address of the server */ - start( - options: FastifyListenOptions, - callback: (error: Error | null, address: string) => void, - ): void; - start( - portOrOptions: number | FastifyListenOptions, - callback: (error: Error | null, address: string) => void = () => {}, - ) { - if (typeof portOrOptions === "number") { - this.app.listen({ port: portOrOptions }, callback); - } else { - this.app.listen(portOrOptions, callback); - } - } + async start(options: FastifyListenOptions): Promise; + async start(portOrOptions: number | FastifyListenOptions) { + // ensure the database is setup prior to listening: + await this.#dbInitLock; - /** - * Stop the server. - * @param callback A callback to run when the server is stopped. - */ - close(callback: () => void = () => {}) { - this.app.close(callback); + const options = typeof portOrOptions === "number" ? { port: portOrOptions } : portOrOptions; + + return await this.app.listen(options); } /** - * Alias for {@link LabelerServer#close}. - * @param callback A callback to run when the server is stopped. + * Stop the server & closes the database: */ - stop(callback: () => void = () => {}) { - this.close(callback); + async stop(): Promise { + await this.app.close(); + this.db.close(); } /** @@ -221,8 +235,6 @@ export class LabelerServer { * @returns The inserted label. */ private async saveLabel(label: UnsignedLabel): Promise { - await this.dbInitLock; - const signed = toSignedLabel(label, this.#signingKey); const { src, uri, cid, val, neg, cts, exp, sig } = signed; @@ -249,6 +261,9 @@ export class LabelerServer { * @returns The created label. */ async createLabel(label: CreateLabelData): Promise { + // Ensure the database is setup prior to creating any labels: + await this.#dbInitLock; + return await this.saveLabel( excludeNullish({ ...label, @@ -268,7 +283,8 @@ export class LabelerServer { subject: LabelSubject, labels: { create?: Array; negate?: Array }, ): Promise> { - await this.dbInitLock; + // Ensure the database is setup prior to creating any labels: + await this.#dbInitLock; const { create, negate } = labels; @@ -336,9 +352,10 @@ export class LabelerServer { /** * Handler for [com.atproto.label.queryLabels](https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/label/queryLabels.json). */ - queryLabelsHandler: QueryHandler = async (req, res) => { - await this.dbInitLock; - + private queryLabelsHandler: QueryHandler = async ( + req, + res, + ) => { let uriPatterns: Array; if (!req.query.uriPatterns) { uriPatterns = []; @@ -443,9 +460,7 @@ export class LabelerServer { /** * Handler for [com.atproto.label.subscribeLabels](https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/label/subscribeLabels.json). */ - subscribeLabelsHandler: SubscriptionHandler<{ cursor?: string }> = async (ws, req) => { - await this.dbInitLock; - + private subscribeLabelsHandler: SubscriptionHandler<{ cursor?: string }> = async (ws, req) => { const cursor = parseInt(req.query.cursor ?? "NaN", 10); if (!Number.isNaN(cursor)) { @@ -511,7 +526,7 @@ export class LabelerServer { /** * Handler for [tools.ozone.moderation.emitEvent](https://github.com/bluesky-social/atproto/blob/main/lexicons/tools/ozone/moderation/emitEvent.json). */ - emitEventHandler: ProcedureHandler = async ( + private emitEventHandler: ProcedureHandler = async ( req, res, ) => { @@ -594,7 +609,7 @@ export class LabelerServer { /** * Handler for the health check endpoint. */ - healthHandler: QueryHandler = async (_req, res) => { + private healthHandler: QueryHandler = async (_req, res) => { const VERSION = "0.2.0"; try { await this.db.execute({ sql: "SELECT 1", args: [] }); @@ -607,13 +622,13 @@ export class LabelerServer { /** * Catch-all handler for unknown XRPC methods. */ - unknownMethodHandler: QueryHandler = async (_req, res) => + private unknownMethodHandler: QueryHandler = async (_req, res) => res.status(501).send({ error: "MethodNotImplemented", message: "Method Not Implemented" }); /** * Default error handler. */ - errorHandler: typeof this.app.errorHandler = async (err, _req, res) => { + private errorHandler: typeof this.app.errorHandler = async (err, _req, res) => { if (err instanceof XRPCError) { return res.status(err.status).send({ error: err.error, message: err.description }); } else { diff --git a/src/index.ts b/src/index.ts index 0da6fd9..f96e8f8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,10 +3,7 @@ export { formatLabel, labelIsSigned, signLabel } from "./util/labels.js"; export type { CreateLabelData, FormattedLabel, - ProcedureHandler, - QueryHandler, SavedLabel, SignedLabel, - SubscriptionHandler, UnsignedLabel, } from "./util/types.js";