Skip to content
This repository was archived by the owner on Feb 18, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 88 additions & 73 deletions src/LabelerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { ToolsOzoneModerationEmitEvent } from "@atcute/ozone";
import { XRPCError } from "@atcute/xrpc-server";

import { fastifyWebsocket } from "@fastify/websocket";
import { Client, createClient } from "@libsql/client";
import { Client, Config, createClient } from "@libsql/client";
import fastify, {
type FastifyInstance,
type FastifyListenOptions,
Expand Down Expand Up @@ -71,15 +71,23 @@ export interface LabelerOptions {
dbToken?: string;
}

export class LabelerServer {
/** The Fastify application instance. */
interface LabelerContext {
auth: (did: string) => boolean | Promise<boolean>;
databaseConfig: Config;
app: FastifyInstance;
did: Did;
signingKey: Uint8Array;
}

/** The SQLite database instance. */
db: Client;
export class LabelerServer {
/** The Fastify application instance. */
readonly app: FastifyInstance;

/** The DID of the labeler account. */
did: Did;
readonly did: Did;

/** The SQLite database instance. */
private db: Client;

/** A function that returns whether a DID is authorized to create labels. */
private auth: (did: string) => boolean | Promise<boolean>;
Expand All @@ -88,59 +96,78 @@ export class LabelerServer {
private connections = new Map<string, Set<WebSocket>>();

/** The signing key used for the labeler. */
#signingKey: Uint8Array;
readonly #signingKey: Uint8Array;

/**
* Promise that resolves when database initialization is complete. This should
* be awaited before any database operations. It is automatically awaited
* before starting the server.
*/
readonly #dbInitLock?: Promise<void>;

/**
* Promise that resolves when database initialization is complete.
* This should be awaited before any database operations.
* @private
*/
private readonly dbInitLock?: Promise<void>;
constructor(context: LabelerContext) {
this.app = context.app;
this.did = context.did;
this.auth = context.auth;
this.#signingKey = context.signingKey;

this.db = createClient(context.databaseConfig);
this.#dbInitLock = this.initializeDatabase();

// Register the routes:
this.app.get("/xrpc/com.atproto.label.queryLabels", this.queryLabelsHandler);
this.app.post("/xrpc/tools.ozone.moderation.emitEvent", this.emitEventHandler);
this.app.get(
"/xrpc/com.atproto.label.subscribeLabels",
{ websocket: true },
this.subscribeLabelsHandler,
);
this.app.get("/xrpc/_health", this.healthHandler);
this.app.get("/xrpc/*", this.unknownMethodHandler);
this.app.setErrorHandler(this.errorHandler);
}

/**
* Create a labeler server.
* @param options Configuration options.
*/
constructor(options: LabelerOptions) {
this.did = options.did as Did;
this.auth = options.auth ?? ((did) => did === this.did);

if (!isDid(this.did)) {
static async create(options: LabelerOptions): Promise<LabelerServer> {
if (!isDid(options.did)) {
throw new Error(INVALID_DID_ERROR);
}

let signingKey;
try {
if (options.signingKey.startsWith("did:key:")) throw 0;
this.#signingKey = parsePrivateKey(options.signingKey);
if (this.#signingKey.byteLength !== 32) throw 0;
signingKey = parsePrivateKey(options.signingKey);
if (signingKey.byteLength !== 32) throw 0;
} catch {
throw new Error(INVALID_SIGNING_KEY_ERROR);
}

let databaseConfig: Config = { url: "file:" + (options.dbPath ?? "labels.db") };

if (options.dbUrl) {
if (!options.dbToken) {
throw new Error(
"The `dbToken` option is required when using a remote database URL.",
);
}
this.db = createClient({ url: options.dbUrl, authToken: options.dbToken });
} else {
this.db = createClient({ url: "file:" + (options.dbPath ?? "labels.db") });
databaseConfig = { url: options.dbUrl, authToken: options.dbToken };
}

this.dbInitLock = this.initializeDatabase();
const app = fastify();
await app.register(fastifyWebsocket);

this.app = fastify();
void this.app.register(fastifyWebsocket).then(() => {
this.app.get("/xrpc/com.atproto.label.queryLabels", this.queryLabelsHandler);
this.app.post("/xrpc/tools.ozone.moderation.emitEvent", this.emitEventHandler);
this.app.get(
"/xrpc/com.atproto.label.subscribeLabels",
{ websocket: true },
this.subscribeLabelsHandler,
);
this.app.get("/xrpc/_health", this.healthHandler);
this.app.get("/xrpc/*", this.unknownMethodHandler);
this.app.setErrorHandler(this.errorHandler);
return new LabelerServer({
databaseConfig,
app,
signingKey,
did: options.did,
auth: typeof options.auth === "function" ? options.auth : (did) => did === options.did,
});
}

Expand Down Expand Up @@ -176,43 +203,30 @@ export class LabelerServer {
/**
* Start the server.
* @param port The port to listen on.
* @param callback A callback to run when the server is started.
* @returns address of the server
*/
start(port: number, callback: (error: Error | null, address: string) => void): void;
async start(port: number): Promise<string>;
/**
* Start the server.
* @param options Options for the server.
* @param callback A callback to run when the server is started.
* @returns address of the server
*/
start(
options: FastifyListenOptions,
callback: (error: Error | null, address: string) => void,
): void;
start(
portOrOptions: number | FastifyListenOptions,
callback: (error: Error | null, address: string) => void = () => {},
) {
if (typeof portOrOptions === "number") {
this.app.listen({ port: portOrOptions }, callback);
} else {
this.app.listen(portOrOptions, callback);
}
}
async start(options: FastifyListenOptions): Promise<string>;
async start(portOrOptions: number | FastifyListenOptions) {
// ensure the database is setup prior to listening:
await this.#dbInitLock;

/**
* Stop the server.
* @param callback A callback to run when the server is stopped.
*/
close(callback: () => void = () => {}) {
this.app.close(callback);
const options = typeof portOrOptions === "number" ? { port: portOrOptions } : portOrOptions;

return await this.app.listen(options);
}

/**
* Alias for {@link LabelerServer#close}.
* @param callback A callback to run when the server is stopped.
* Stop the server & closes the database:
*/
stop(callback: () => void = () => {}) {
this.close(callback);
async stop(): Promise<void> {
await this.app.close();
this.db.close();

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean that you couldn't emit labels after the server has stopped, but I think that's fine?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason this method also doesn't show up in the docs as a promise, I've no idea why.

}

/**
Expand All @@ -221,8 +235,6 @@ export class LabelerServer {
* @returns The inserted label.
*/
private async saveLabel(label: UnsignedLabel): Promise<SavedLabel> {
await this.dbInitLock;

const signed = toSignedLabel(label, this.#signingKey);
const { src, uri, cid, val, neg, cts, exp, sig } = signed;

Expand All @@ -249,6 +261,9 @@ export class LabelerServer {
* @returns The created label.
*/
async createLabel(label: CreateLabelData): Promise<SavedLabel> {
// Ensure the database is setup prior to creating any labels:
await this.#dbInitLock;

return await this.saveLabel(
excludeNullish({
...label,
Expand All @@ -268,7 +283,8 @@ export class LabelerServer {
subject: LabelSubject,
labels: { create?: Array<string>; negate?: Array<string> },
): Promise<Array<SavedLabel>> {
await this.dbInitLock;
// Ensure the database is setup prior to creating any labels:
await this.#dbInitLock;

const { create, negate } = labels;

Expand Down Expand Up @@ -336,9 +352,10 @@ export class LabelerServer {
/**
* Handler for [com.atproto.label.queryLabels](https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/label/queryLabels.json).
*/
queryLabelsHandler: QueryHandler<ComAtprotoLabelQueryLabels.$params> = async (req, res) => {
await this.dbInitLock;

private queryLabelsHandler: QueryHandler<ComAtprotoLabelQueryLabels.$params> = async (
req,
res,
) => {
let uriPatterns: Array<string>;
if (!req.query.uriPatterns) {
uriPatterns = [];
Expand Down Expand Up @@ -443,9 +460,7 @@ export class LabelerServer {
/**
* Handler for [com.atproto.label.subscribeLabels](https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/label/subscribeLabels.json).
*/
subscribeLabelsHandler: SubscriptionHandler<{ cursor?: string }> = async (ws, req) => {
await this.dbInitLock;

private subscribeLabelsHandler: SubscriptionHandler<{ cursor?: string }> = async (ws, req) => {
const cursor = parseInt(req.query.cursor ?? "NaN", 10);

if (!Number.isNaN(cursor)) {
Expand Down Expand Up @@ -511,7 +526,7 @@ export class LabelerServer {
/**
* Handler for [tools.ozone.moderation.emitEvent](https://github.com/bluesky-social/atproto/blob/main/lexicons/tools/ozone/moderation/emitEvent.json).
*/
emitEventHandler: ProcedureHandler<ToolsOzoneModerationEmitEvent.$output> = async (
private emitEventHandler: ProcedureHandler<ToolsOzoneModerationEmitEvent.$output> = async (
req,
res,
) => {
Expand Down Expand Up @@ -594,7 +609,7 @@ export class LabelerServer {
/**
* Handler for the health check endpoint.
*/
healthHandler: QueryHandler = async (_req, res) => {
private healthHandler: QueryHandler = async (_req, res) => {
const VERSION = "0.2.0";
try {
await this.db.execute({ sql: "SELECT 1", args: [] });
Expand All @@ -607,13 +622,13 @@ export class LabelerServer {
/**
* Catch-all handler for unknown XRPC methods.
*/
unknownMethodHandler: QueryHandler = async (_req, res) =>
private unknownMethodHandler: QueryHandler = async (_req, res) =>
res.status(501).send({ error: "MethodNotImplemented", message: "Method Not Implemented" });

/**
* Default error handler.
*/
errorHandler: typeof this.app.errorHandler = async (err, _req, res) => {
private errorHandler: typeof this.app.errorHandler = async (err, _req, res) => {
if (err instanceof XRPCError) {
return res.status(err.status).send({ error: err.error, message: err.description });
} else {
Expand Down
3 changes: 0 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ export { formatLabel, labelIsSigned, signLabel } from "./util/labels.js";
export type {
CreateLabelData,
FormattedLabel,
ProcedureHandler,
QueryHandler,
SavedLabel,
SignedLabel,
SubscriptionHandler,
UnsignedLabel,
} from "./util/types.js";