From 52f812049efbb66fa9455ce0bc03d63ead07627f Mon Sep 17 00:00:00 2001 From: Remi Cattiau Date: Sat, 9 May 2026 05:45:56 -0700 Subject: [PATCH] feat(postgres): share pg.Pool across services with same config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a refcounted pool registry keyed by stable JSON hash of the pg config so multiple postgres services targeting the same database share connections instead of each spinning up their own pool. PostgresStore and PostgresQueueService acquire the shared pool when `usePool: true`; PostgresPubSubService keeps a dedicated pg.Client for LISTEN (subscription is bound to a single connection — a pool would silently rotate and drop it) but its publish path lazily acquires the shared pool, so subscribe-only services pay zero pool cost. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/postgres/src/pgpool.spec.ts | 69 ++++++++++++++ packages/postgres/src/pgpool.ts | 114 ++++++++++++++++++++++++ packages/postgres/src/postgrespubsub.ts | 32 ++++++- packages/postgres/src/postgresqueue.ts | 30 +++++-- packages/postgres/src/postgresstore.ts | 30 ++++++- packages/postgres/webda.module.json | 29 ++++-- 6 files changed, 284 insertions(+), 20 deletions(-) create mode 100644 packages/postgres/src/pgpool.spec.ts create mode 100644 packages/postgres/src/pgpool.ts diff --git a/packages/postgres/src/pgpool.spec.ts b/packages/postgres/src/pgpool.spec.ts new file mode 100644 index 000000000..aa8fa3335 --- /dev/null +++ b/packages/postgres/src/pgpool.spec.ts @@ -0,0 +1,69 @@ +import { suite, test } from "@webda/test"; +import * as assert from "node:assert"; +import { __resetPools, acquirePool, poolKey, releasePool } from "./pgpool.js"; + +const cfg = { + host: "localhost", + user: "webda.io", + database: "webda.io", + password: "webda.io", + statement_timeout: 60000, + max: 1 +}; + +@suite +class PgPoolTest { + async afterEach() { + await __resetPools(); + } + + @test + async sameConfigReturnsSamePool() { + const a = acquirePool(cfg); + const b = acquirePool(cfg); + assert.strictEqual(a, b, "same config must return the same pool instance"); + await releasePool(cfg); + await releasePool(cfg); + } + + @test + async keyIsOrderInsensitive() { + const k1 = poolKey({ host: "h", user: "u", database: "d" }); + const k2 = poolKey({ database: "d", host: "h", user: "u" }); + assert.strictEqual(k1, k2, "key order should not affect the registry hash"); + } + + @test + async differentConfigReturnsDifferentPool() { + const a = acquirePool(cfg); + const b = acquirePool({ ...cfg, database: "other" }); + assert.notStrictEqual(a, b, "different configs must produce distinct pools"); + await releasePool(cfg); + await releasePool({ ...cfg, database: "other" }); + } + + @test + async refcountKeepsPoolAliveUntilLastRelease() { + const a = acquirePool(cfg); + acquirePool(cfg); // refcount = 2 + + await releasePool(cfg); // refcount = 1, pool still alive + // Re-acquiring after partial release returns the same pool. + const c = acquirePool(cfg); + assert.strictEqual(a, c, "pool should still be alive while refcount > 0"); + + await releasePool(cfg); // refcount = 1 + await releasePool(cfg); // refcount = 0, pool ends + + // After full release, a fresh acquisition produces a new pool. + const d = acquirePool(cfg); + assert.notStrictEqual(a, d, "after final release a new pool must be created"); + await releasePool(cfg); + } + + @test + async releasingUnknownConfigIsNoop() { + // Should not throw — `stop()` paths call this unconditionally. + await releasePool({ host: "never-acquired", database: "x" }); + } +} diff --git a/packages/postgres/src/pgpool.ts b/packages/postgres/src/pgpool.ts new file mode 100644 index 000000000..9d3de9991 --- /dev/null +++ b/packages/postgres/src/pgpool.ts @@ -0,0 +1,114 @@ +import pg, { PoolConfig } from "pg"; + +/** + * Stable JSON serializer that sorts object keys so two configs with the + * same content but different key order hash to the same registry key. + * + * Functions and classes (e.g. `pg.Pool` custom types) are dropped — pools + * configured with non-serializable hooks must be created directly. + * + * @param value - any JSON-serializable value + * @returns a stable string representation + */ +function stableStringify(value: any): string { + if (value === null || typeof value !== "object") { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return "[" + value.map(stableStringify).join(",") + "]"; + } + const keys = Object.keys(value).sort(); + return ( + "{" + + keys + .map(k => { + const v = (value as any)[k]; + if (typeof v === "function") return undefined; + return JSON.stringify(k) + ":" + stableStringify(v); + }) + .filter(s => s !== undefined) + .join(",") + + "}" + ); +} + +interface PoolEntry { + pool: pg.Pool; + refCount: number; +} + +const registry = new Map(); + +/** + * Build the registry key for a config. Exposed for tests. + * + * @param config - the pg pool config (may be undefined — pg falls back to PG* env) + * @returns the registry key + */ +export function poolKey(config: PoolConfig | undefined): string { + return stableStringify(config ?? {}); +} + +/** + * Acquire a shared `pg.Pool` for the given config. Pools are cached by + * stable config-hash; callers that pass the same config (regardless of + * key order) get the same pool back. Each `acquirePool` increments a + * refcount; the pool is `end()`ed when the last consumer calls + * {@link releasePool}. + * + * Use this for stateless workloads (queries, queue receive, pub/sub + * publish). LISTEN must use a dedicated `pg.Client` because the + * subscription is bound to a single connection — pooled connections + * rotate and would silently drop the subscription. + * + * @param config - the pg pool config + * @returns the shared pool + */ +export function acquirePool(config: PoolConfig | undefined): pg.Pool { + const key = poolKey(config); + const entry = registry.get(key); + if (entry) { + entry.refCount++; + return entry.pool; + } + const pool = new pg.Pool(config); + registry.set(key, { pool, refCount: 1 }); + return pool; +} + +/** + * Release a previously-acquired pool. When the refcount reaches zero, + * the pool is `end()`ed and removed from the registry. Releasing a + * config that was never acquired (or already drained) is a no-op so + * `stop()` paths can call this unconditionally. + * + * @param config - the same config that was passed to {@link acquirePool} + * @returns a promise that resolves once the pool has been ended (or immediately if still in use / unknown) + */ +export async function releasePool(config: PoolConfig | undefined): Promise { + const key = poolKey(config); + const entry = registry.get(key); + if (!entry) return; + entry.refCount--; + if (entry.refCount > 0) return; + registry.delete(key); + await entry.pool.end().catch(() => { + /* already ended */ + }); +} + +/** + * Test-only: drain and forget every shared pool. Used between specs to + * keep the registry from leaking handles across files. + */ +export async function __resetPools(): Promise { + const entries = [...registry.values()]; + registry.clear(); + await Promise.all( + entries.map(e => + e.pool.end().catch(() => { + /* already ended */ + }) + ) + ); +} diff --git a/packages/postgres/src/postgrespubsub.ts b/packages/postgres/src/postgrespubsub.ts index c3a483360..a24a47334 100644 --- a/packages/postgres/src/postgrespubsub.ts +++ b/packages/postgres/src/postgrespubsub.ts @@ -1,7 +1,8 @@ import { PubSubService, ServiceParameters } from "@webda/core"; import { CancelablePromise, JSONUtils } from "@webda/utils"; import { useLog } from "@webda/workout"; -import pg, { ClientConfig } from "pg"; +import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; /** * NOTIFY's payload limit is 8000 bytes (the underlying NAMEDATALEN / @@ -17,6 +18,11 @@ export class PostgresPubSubParameters extends ServiceParameters { /** * Channel name passed to LISTEN / NOTIFY. Must be a valid Postgres * identifier (lowercased, no quoting). Defaults to the service name. + * @pattern /^[a-z_][a-z0-9_]*$/ + * @example "myapp_events" + * @example "user_notifications" + * @example "cache_invalidation" + * @example "orders" */ channel?: string; /** @@ -68,9 +74,18 @@ export default class PostgresPubSubService< K extends PostgresPubSubParameters = PostgresPubSubParameters > extends PubSubService { /** - * Long-lived listener client. One per service instance. + * Long-lived listener client. One per service instance because LISTEN is + * scoped to the connection that issued it — a pool would silently rotate + * and drop the subscription. */ protected client?: pg.Client; + /** + * Shared pool used for the publish side (`pg_notify`). Acquired from + * the per-config pool registry so multiple services that target the + * same database share connections instead of each spinning up their + * own pool. Subscribe-only services skip this and never publish. + */ + protected publishPool?: pg.Pool; /** * Local callback registrations. Notifications dispatch to all of them. */ @@ -180,7 +195,14 @@ export default class PostgresPubSubService< ); } this.metrics?.messages_sent?.inc(); - await this.client.query("SELECT pg_notify($1, $2)", [this.channel(), payload]); + // Lazily acquire a shared publish pool: subscribe-only services never + // call sendMessage and so never pay the connection cost. Acquired + // pools are reference-counted in the registry, so multiple postgres + // services pointing at the same database share connections. + if (!this.publishPool) { + this.publishPool = acquirePool(this.parameters.postgresqlServer as PoolConfig); + } + await this.publishPool.query("SELECT pg_notify($1, $2)", [this.channel(), payload]); } /** @@ -232,6 +254,10 @@ export default class PostgresPubSubService< /* already ended */ }); } + if (this.publishPool) { + this.publishPool = undefined; + await releasePool(this.parameters.postgresqlServer as PoolConfig); + } await super.stop(); } } diff --git a/packages/postgres/src/postgresqueue.ts b/packages/postgres/src/postgresqueue.ts index 5aaa56ad7..bf00a22d9 100644 --- a/packages/postgres/src/postgresqueue.ts +++ b/packages/postgres/src/postgresqueue.ts @@ -2,6 +2,7 @@ import { MessageReceipt, Queue, QueueParameters } from "@webda/core"; import { JSONUtils } from "@webda/utils"; import { useLog } from "@webda/workout"; import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; /** * Configuration for {@link PostgresQueueService}. @@ -11,6 +12,11 @@ export class PostgresQueueParameters extends QueueParameters { * Table name backing the queue. Auto-created on init if missing. * * @default "webda_queue" + * @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/ + * @example "myapp_queue" + * @example "user_notifications" + * @example "cache_invalidation" + * @example "orders" */ table?: string; /** @@ -88,6 +94,11 @@ export default class PostgresQueueService< * locks rotate across connections and benefit from concurrency. */ protected client?: pg.Client | pg.Pool; + /** + * True when {@link client} is a shared pool acquired from the registry. + * Drives the matching {@link releasePool} call in {@link stop}. + */ + protected ownsPool = false; /** * Resolved table identifier. Validated at init to keep the table name @@ -107,10 +118,11 @@ export default class PostgresQueueService< if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this.table)) { throw new Error(`Invalid table name "${this.table}" — must match /^[a-zA-Z_][a-zA-Z0-9_]*$/`); } - this.client = this.parameters.usePool - ? new pg.Pool(this.parameters.postgresqlServer as PoolConfig) - : new pg.Client(this.parameters.postgresqlServer as ClientConfig); - if (this.client instanceof pg.Client) { + if (this.parameters.usePool) { + this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig); + this.ownsPool = true; + } else { + this.client = new pg.Client(this.parameters.postgresqlServer as ClientConfig); await this.client.connect(); } if (this.parameters.autoCreateTable) { @@ -241,13 +253,13 @@ export default class PostgresQueueService< async stop(): Promise { if (this.client) { const client = this.client; + const owned = this.ownsPool; this.client = undefined; - if (client instanceof pg.Pool) { - await client.end().catch(() => { - /* already ended */ - }); + this.ownsPool = false; + if (owned) { + await releasePool(this.parameters.postgresqlServer as PoolConfig); } else { - await client.end().catch(() => { + await (client as pg.Client).end().catch(() => { /* already ended */ }); } diff --git a/packages/postgres/src/postgresstore.ts b/packages/postgres/src/postgresstore.ts index 34a835463..3859a3a5e 100644 --- a/packages/postgres/src/postgresstore.ts +++ b/packages/postgres/src/postgresstore.ts @@ -2,6 +2,7 @@ import { InstanceCache, useApplication, useCore, useModel, useModelMetadata } fr import type { ModelClass, Repository } from "@webda/core"; import { JSONSchema7 } from "json-schema"; import pg, { ClientConfig, PoolConfig } from "pg"; +import { acquirePool, releasePool } from "./pgpool.js"; import { PostgresRepository, SQLStore, SQLStoreParameters } from "./sqlstore.js"; import { useLog } from "@webda/workout"; @@ -35,6 +36,11 @@ export class PostgresParameters extends SQLStoreParameters { autoCreateTable?: boolean; /** * View name prefix + * @default "" + * @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/ + * @example "app_" + * @example "v_" + * @example "view_" */ viewPrefix?: string; /** @@ -85,13 +91,19 @@ export class PostgresParameters extends SQLStoreParameters { */ export class PostgresStore extends SQLStore { client: pg.Client | pg.Pool; + /** + * True when {@link client} is a shared pool acquired from the registry. + * Tracked so {@link stop} only releases pools it took a refcount on. + */ + protected ownsPool = false; /** * @override */ async init(): Promise { if (this.parameters.usePool) { - this.client = new pg.Pool(this.parameters.postgresqlServer); + this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig); + this.ownsPool = true; } else { this.client = new pg.Client(this.parameters.postgresqlServer); await this.client.connect(); @@ -101,6 +113,22 @@ export class PostgresStore ex return this; } + /** + * @override + */ + async stop(): Promise { + if (this.ownsPool) { + this.ownsPool = false; + await releasePool(this.parameters.postgresqlServer as PoolConfig); + } else if (this.client && "end" in this.client) { + await (this.client as pg.Client).end().catch(() => { + /* already ended */ + }); + } + this.client = undefined as any; + await super.stop(); + } + /** * Resolve the table name for a given model class. * diff --git a/packages/postgres/webda.module.json b/packages/postgres/webda.module.json index a6ceade4b..6aebc185b 100644 --- a/packages/postgres/webda.module.json +++ b/packages/postgres/webda.module.json @@ -621,6 +621,13 @@ "properties": { "channel": { "description": "Channel name passed to LISTEN / NOTIFY. Must be a valid Postgres\nidentifier (lowercased, no quoting). Defaults to the service name.", + "example": [ + "myapp_events", + "user_notifications", + "cache_invalidation", + "orders" + ], + "pattern": "/^[a-z_][a-z0-9_]*$/", "type": "string" }, "postgresqlServer": { @@ -1425,6 +1432,13 @@ "table": { "default": "webda_queue", "description": "Table name backing the queue. Auto-created on init if missing.", + "example": [ + "myapp_queue", + "user_notifications", + "cache_invalidation", + "orders" + ], + "pattern": "/^[a-zA-Z_][a-zA-Z0-9_]*$/", "type": "string" }, "type": { @@ -2217,12 +2231,6 @@ } }, "properties": { - "_modelExplicit": { - "default": false, - "description": "True when `model` was explicitly provided in the raw configuration.\nStores that use the default model (RegistryEntry) without explicit\nconfiguration will not claim any model hierarchy.", - "internal": true, - "type": "boolean" - }, "additionalModels": { "default": [], "description": "Additional models\n\nAllow this store to manage other models", @@ -2300,7 +2308,14 @@ "type": "boolean" }, "viewPrefix": { + "default": "", "description": "View name prefix", + "example": [ + "app_", + "v_", + "view_" + ], + "pattern": "/^[a-zA-Z_][a-zA-Z0-9_]*$/", "type": "string" }, "views": { @@ -2375,5 +2390,5 @@ } }, "behaviors": {}, - "sourceDigest": "29d5516c000ff464267b110fae968aa0" + "sourceDigest": "b978479cb9042a8fefb9146a86c95666" } \ No newline at end of file