diff --git a/packages/postgres/src/pgpool.spec.ts b/packages/postgres/src/pgpool.spec.ts new file mode 100644 index 000000000..aa8fa3335 --- /dev/null +++ b/packages/postgres/src/pgpool.spec.ts @@ -0,0 +1,69 @@ +import { suite, test } from "@webda/test"; +import * as assert from "node:assert"; +import { __resetPools, acquirePool, poolKey, releasePool } from "./pgpool.js"; + +const cfg = { + host: "localhost", + user: "webda.io", + database: "webda.io", + password: "webda.io", + statement_timeout: 60000, + max: 1 +}; + +@suite +class PgPoolTest { + async afterEach() { + await __resetPools(); + } + + @test + async sameConfigReturnsSamePool() { + const a = acquirePool(cfg); + const b = acquirePool(cfg); + assert.strictEqual(a, b, "same config must return the same pool instance"); + await releasePool(cfg); + await releasePool(cfg); + } + + @test + async keyIsOrderInsensitive() { + const k1 = poolKey({ host: "h", user: "u", database: "d" }); + const k2 = poolKey({ database: "d", host: "h", user: "u" }); + assert.strictEqual(k1, k2, "key order should not affect the registry hash"); + } + + @test + async differentConfigReturnsDifferentPool() { + const a = acquirePool(cfg); + const b = acquirePool({ ...cfg, database: "other" }); + assert.notStrictEqual(a, b, "different configs must produce distinct pools"); + await releasePool(cfg); + await releasePool({ ...cfg, database: "other" }); + } + + @test + async refcountKeepsPoolAliveUntilLastRelease() { + const a = acquirePool(cfg); + acquirePool(cfg); // refcount = 2 + + await releasePool(cfg); // refcount = 1, pool still alive + // Re-acquiring after partial release returns the same pool. + const c = acquirePool(cfg); + assert.strictEqual(a, c, "pool should still be alive while refcount > 0"); + + await releasePool(cfg); // refcount = 1 + await releasePool(cfg); // refcount = 0, pool ends + + // After full release, a fresh acquisition produces a new pool. + const d = acquirePool(cfg); + assert.notStrictEqual(a, d, "after final release a new pool must be created"); + await releasePool(cfg); + } + + @test + async releasingUnknownConfigIsNoop() { + // Should not throw — `stop()` paths call this unconditionally. + await releasePool({ host: "never-acquired", database: "x" }); + } +} diff --git a/packages/postgres/src/pgpool.ts b/packages/postgres/src/pgpool.ts new file mode 100644 index 000000000..9d3de9991 --- /dev/null +++ b/packages/postgres/src/pgpool.ts @@ -0,0 +1,114 @@ +import pg, { PoolConfig } from "pg"; + +/** + * Stable JSON serializer that sorts object keys so two configs with the + * same content but different key order hash to the same registry key. + * + * Functions and classes (e.g. `pg.Pool` custom types) are dropped — pools + * configured with non-serializable hooks must be created directly. + * + * @param value - any JSON-serializable value + * @returns a stable string representation + */ +function stableStringify(value: any): string { + if (value === null || typeof value !== "object") { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return "[" + value.map(stableStringify).join(",") + "]"; + } + const keys = Object.keys(value).sort(); + return ( + "{" + + keys + .map(k => { + const v = (value as any)[k]; + if (typeof v === "function") return undefined; + return JSON.stringify(k) + ":" + stableStringify(v); + }) + .filter(s => s !== undefined) + .join(",") + + "}" + ); +} + +interface PoolEntry { + pool: pg.Pool; + refCount: number; +} + +const registry = new Map(); + +/** + * Build the registry key for a config. Exposed for tests. + * + * @param config - the pg pool config (may be undefined — pg falls back to PG* env) + * @returns the registry key + */ +export function poolKey(config: PoolConfig | undefined): string { + return stableStringify(config ?? {}); +} + +/** + * Acquire a shared `pg.Pool` for the given config. Pools are cached by + * stable config-hash; callers that pass the same config (regardless of + * key order) get the same pool back. Each `acquirePool` increments a + * refcount; the pool is `end()`ed when the last consumer calls + * {@link releasePool}. + * + * Use this for stateless workloads (queries, queue receive, pub/sub + * publish). LISTEN must use a dedicated `pg.Client` because the + * subscription is bound to a single connection — pooled connections + * rotate and would silently drop the subscription. + * + * @param config - the pg pool config + * @returns the shared pool + */ +export function acquirePool(config: PoolConfig | undefined): pg.Pool { + const key = poolKey(config); + const entry = registry.get(key); + if (entry) { + entry.refCount++; + return entry.pool; + } + const pool = new pg.Pool(config); + registry.set(key, { pool, refCount: 1 }); + return pool; +} + +/** + * Release a previously-acquired pool. When the refcount reaches zero, + * the pool is `end()`ed and removed from the registry. Releasing a + * config that was never acquired (or already drained) is a no-op so + * `stop()` paths can call this unconditionally. + * + * @param config - the same config that was passed to {@link acquirePool} + * @returns a promise that resolves once the pool has been ended (or immediately if still in use / unknown) + */ +export async function releasePool(config: PoolConfig | undefined): Promise { + const key = poolKey(config); + const entry = registry.get(key); + if (!entry) return; + entry.refCount--; + if (entry.refCount > 0) return; + registry.delete(key); + await entry.pool.end().catch(() => { + /* already ended */ + }); +} + +/** + * Test-only: drain and forget every shared pool. Used between specs to + * keep the registry from leaking handles across files. + */ +export async function __resetPools(): Promise { + const entries = [...registry.values()]; + registry.clear(); + await Promise.all( + entries.map(e => + e.pool.end().catch(() => { + /* already ended */ + }) + ) + ); +} diff --git a/packages/postgres/src/postgrespubsub.ts b/packages/postgres/src/postgrespubsub.ts index c3a483360..a24a47334 100644 --- a/packages/postgres/src/postgrespubsub.ts +++ b/packages/postgres/src/postgrespubsub.ts @@ -1,7 +1,8 @@ import { PubSubService, ServiceParameters } from "@webda/core"; import { CancelablePromise, JSONUtils } from "@webda/utils"; import { useLog } from "@webda/workout"; -import pg, { ClientConfig } from "pg"; +import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; /** * NOTIFY's payload limit is 8000 bytes (the underlying NAMEDATALEN / @@ -17,6 +18,11 @@ export class PostgresPubSubParameters extends ServiceParameters { /** * Channel name passed to LISTEN / NOTIFY. Must be a valid Postgres * identifier (lowercased, no quoting). Defaults to the service name. + * @pattern /^[a-z_][a-z0-9_]*$/ + * @example "myapp_events" + * @example "user_notifications" + * @example "cache_invalidation" + * @example "orders" */ channel?: string; /** @@ -68,9 +74,18 @@ export default class PostgresPubSubService< K extends PostgresPubSubParameters = PostgresPubSubParameters > extends PubSubService { /** - * Long-lived listener client. One per service instance. + * Long-lived listener client. One per service instance because LISTEN is + * scoped to the connection that issued it — a pool would silently rotate + * and drop the subscription. */ protected client?: pg.Client; + /** + * Shared pool used for the publish side (`pg_notify`). Acquired from + * the per-config pool registry so multiple services that target the + * same database share connections instead of each spinning up their + * own pool. Subscribe-only services skip this and never publish. + */ + protected publishPool?: pg.Pool; /** * Local callback registrations. Notifications dispatch to all of them. */ @@ -180,7 +195,14 @@ export default class PostgresPubSubService< ); } this.metrics?.messages_sent?.inc(); - await this.client.query("SELECT pg_notify($1, $2)", [this.channel(), payload]); + // Lazily acquire a shared publish pool: subscribe-only services never + // call sendMessage and so never pay the connection cost. Acquired + // pools are reference-counted in the registry, so multiple postgres + // services pointing at the same database share connections. + if (!this.publishPool) { + this.publishPool = acquirePool(this.parameters.postgresqlServer as PoolConfig); + } + await this.publishPool.query("SELECT pg_notify($1, $2)", [this.channel(), payload]); } /** @@ -232,6 +254,10 @@ export default class PostgresPubSubService< /* already ended */ }); } + if (this.publishPool) { + this.publishPool = undefined; + await releasePool(this.parameters.postgresqlServer as PoolConfig); + } await super.stop(); } } diff --git a/packages/postgres/src/postgresqueue.ts b/packages/postgres/src/postgresqueue.ts index 5aaa56ad7..bf00a22d9 100644 --- a/packages/postgres/src/postgresqueue.ts +++ b/packages/postgres/src/postgresqueue.ts @@ -2,6 +2,7 @@ import { MessageReceipt, Queue, QueueParameters } from "@webda/core"; import { JSONUtils } from "@webda/utils"; import { useLog } from "@webda/workout"; import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; /** * Configuration for {@link PostgresQueueService}. @@ -11,6 +12,11 @@ export class PostgresQueueParameters extends QueueParameters { * Table name backing the queue. Auto-created on init if missing. * * @default "webda_queue" + * @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/ + * @example "myapp_queue" + * @example "user_notifications" + * @example "cache_invalidation" + * @example "orders" */ table?: string; /** @@ -88,6 +94,11 @@ export default class PostgresQueueService< * locks rotate across connections and benefit from concurrency. */ protected client?: pg.Client | pg.Pool; + /** + * True when {@link client} is a shared pool acquired from the registry. + * Drives the matching {@link releasePool} call in {@link stop}. + */ + protected ownsPool = false; /** * Resolved table identifier. Validated at init to keep the table name @@ -107,10 +118,11 @@ export default class PostgresQueueService< if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this.table)) { throw new Error(`Invalid table name "${this.table}" — must match /^[a-zA-Z_][a-zA-Z0-9_]*$/`); } - this.client = this.parameters.usePool - ? new pg.Pool(this.parameters.postgresqlServer as PoolConfig) - : new pg.Client(this.parameters.postgresqlServer as ClientConfig); - if (this.client instanceof pg.Client) { + if (this.parameters.usePool) { + this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig); + this.ownsPool = true; + } else { + this.client = new pg.Client(this.parameters.postgresqlServer as ClientConfig); await this.client.connect(); } if (this.parameters.autoCreateTable) { @@ -241,13 +253,13 @@ export default class PostgresQueueService< async stop(): Promise { if (this.client) { const client = this.client; + const owned = this.ownsPool; this.client = undefined; - if (client instanceof pg.Pool) { - await client.end().catch(() => { - /* already ended */ - }); + this.ownsPool = false; + if (owned) { + await releasePool(this.parameters.postgresqlServer as PoolConfig); } else { - await client.end().catch(() => { + await (client as pg.Client).end().catch(() => { /* already ended */ }); } diff --git a/packages/postgres/src/postgresstore.ts b/packages/postgres/src/postgresstore.ts index 34a835463..3859a3a5e 100644 --- a/packages/postgres/src/postgresstore.ts +++ b/packages/postgres/src/postgresstore.ts @@ -2,6 +2,7 @@ import { InstanceCache, useApplication, useCore, useModel, useModelMetadata } fr import type { ModelClass, Repository } from "@webda/core"; import { JSONSchema7 } from "json-schema"; import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; import { PostgresRepository, SQLStore, SQLStoreParameters } from "./sqlstore.js"; import { useLog } from "@webda/workout"; @@ -35,6 +36,11 @@ export class PostgresParameters extends SQLStoreParameters { autoCreateTable?: boolean; /** * View name prefix + * @default "" + * @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/ + * @example "app_" + * @example "v_" + * @example "view_" */ viewPrefix?: string; /** @@ -85,13 +91,19 @@ export class PostgresParameters extends SQLStoreParameters { */ export class PostgresStore extends SQLStore { client: pg.Client | pg.Pool; + /** + * True when {@link client} is a shared pool acquired from the registry. + * Tracked so {@link stop} only releases pools it took a refcount on. + */ + protected ownsPool = false; /** * @override */ async init(): Promise { if (this.parameters.usePool) { - this.client = new pg.Pool(this.parameters.postgresqlServer); + this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig); + this.ownsPool = true; } else { this.client = new pg.Client(this.parameters.postgresqlServer); await this.client.connect(); @@ -101,6 +113,22 @@ export class PostgresStore ex return this; } + /** + * @override + */ + async stop(): Promise { + if (this.ownsPool) { + this.ownsPool = false; + await releasePool(this.parameters.postgresqlServer as PoolConfig); + } else if (this.client && "end" in this.client) { + await (this.client as pg.Client).end().catch(() => { + /* already ended */ + }); + } + this.client = undefined as any; + await super.stop(); + } + /** * Resolve the table name for a given model class. * diff --git a/packages/postgres/webda.module.json b/packages/postgres/webda.module.json index a6ceade4b..6aebc185b 100644 --- a/packages/postgres/webda.module.json +++ b/packages/postgres/webda.module.json @@ -621,6 +621,13 @@ "properties": { "channel": { "description": "Channel name passed to LISTEN / NOTIFY. Must be a valid Postgres\nidentifier (lowercased, no quoting). Defaults to the service name.", + "example": [ + "myapp_events", + "user_notifications", + "cache_invalidation", + "orders" + ], + "pattern": "/^[a-z_][a-z0-9_]*$/", "type": "string" }, "postgresqlServer": { @@ -1425,6 +1432,13 @@ "table": { "default": "webda_queue", "description": "Table name backing the queue. Auto-created on init if missing.", + "example": [ + "myapp_queue", + "user_notifications", + "cache_invalidation", + "orders" + ], + "pattern": "/^[a-zA-Z_][a-zA-Z0-9_]*$/", "type": "string" }, "type": { @@ -2217,12 +2231,6 @@ } }, "properties": { - "_modelExplicit": { - "default": false, - "description": "True when `model` was explicitly provided in the raw configuration.\nStores that use the default model (RegistryEntry) without explicit\nconfiguration will not claim any model hierarchy.", - "internal": true, - "type": "boolean" - }, "additionalModels": { "default": [], "description": "Additional models\n\nAllow this store to manage other models", @@ -2300,7 +2308,14 @@ "type": "boolean" }, "viewPrefix": { + "default": "", "description": "View name prefix", + "example": [ + "app_", + "v_", + "view_" + ], + "pattern": "/^[a-zA-Z_][a-zA-Z0-9_]*$/", "type": "string" }, "views": { @@ -2375,5 +2390,5 @@ } }, "behaviors": {}, - "sourceDigest": "29d5516c000ff464267b110fae968aa0" + "sourceDigest": "b978479cb9042a8fefb9146a86c95666" } \ No newline at end of file