Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions packages/postgres/src/pgpool.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { suite, test } from "@webda/test";
import * as assert from "node:assert";
import { __resetPools, acquirePool, poolKey, releasePool } from "./pgpool.js";

const cfg = {
host: "localhost",
user: "webda.io",
database: "webda.io",
password: "webda.io",
statement_timeout: 60000,
max: 1
};

@suite
class PgPoolTest {
async afterEach() {
await __resetPools();
}

@test
async sameConfigReturnsSamePool() {
const a = acquirePool(cfg);
const b = acquirePool(cfg);
assert.strictEqual(a, b, "same config must return the same pool instance");
await releasePool(cfg);
await releasePool(cfg);
}

@test
async keyIsOrderInsensitive() {
const k1 = poolKey({ host: "h", user: "u", database: "d" });
const k2 = poolKey({ database: "d", host: "h", user: "u" });
assert.strictEqual(k1, k2, "key order should not affect the registry hash");
}

@test
async differentConfigReturnsDifferentPool() {
const a = acquirePool(cfg);
const b = acquirePool({ ...cfg, database: "other" });
assert.notStrictEqual(a, b, "different configs must produce distinct pools");
await releasePool(cfg);
await releasePool({ ...cfg, database: "other" });
}

@test
async refcountKeepsPoolAliveUntilLastRelease() {
const a = acquirePool(cfg);
acquirePool(cfg); // refcount = 2

await releasePool(cfg); // refcount = 1, pool still alive
// Re-acquiring after partial release returns the same pool.
const c = acquirePool(cfg);
assert.strictEqual(a, c, "pool should still be alive while refcount > 0");

await releasePool(cfg); // refcount = 1
await releasePool(cfg); // refcount = 0, pool ends

// After full release, a fresh acquisition produces a new pool.
const d = acquirePool(cfg);
assert.notStrictEqual(a, d, "after final release a new pool must be created");
await releasePool(cfg);
}

@test
async releasingUnknownConfigIsNoop() {
// Should not throw — `stop()` paths call this unconditionally.
await releasePool({ host: "never-acquired", database: "x" });
}
}
114 changes: 114 additions & 0 deletions packages/postgres/src/pgpool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import pg, { PoolConfig } from "pg";

/**
* Stable JSON serializer that sorts object keys so two configs with the
* same content but different key order hash to the same registry key.
*
* Functions and classes (e.g. `pg.Pool` custom types) are dropped — pools
* configured with non-serializable hooks must be created directly.
*
* @param value - any JSON-serializable value
* @returns a stable string representation
*/
function stableStringify(value: any): string {
if (value === null || typeof value !== "object") {
return JSON.stringify(value);
}
if (Array.isArray(value)) {
return "[" + value.map(stableStringify).join(",") + "]";
}
const keys = Object.keys(value).sort();
return (
"{" +
keys
.map(k => {
const v = (value as any)[k];
if (typeof v === "function") return undefined;
return JSON.stringify(k) + ":" + stableStringify(v);
})
.filter(s => s !== undefined)
.join(",") +
"}"
);
}

interface PoolEntry {
pool: pg.Pool;
refCount: number;
}

const registry = new Map<string, PoolEntry>();

/**
* Build the registry key for a config. Exposed for tests.
*
* @param config - the pg pool config (may be undefined — pg falls back to PG* env)
* @returns the registry key
*/
export function poolKey(config: PoolConfig | undefined): string {
return stableStringify(config ?? {});
}

/**
* Acquire a shared `pg.Pool` for the given config. Pools are cached by
* stable config-hash; callers that pass the same config (regardless of
* key order) get the same pool back. Each `acquirePool` increments a
* refcount; the pool is `end()`ed when the last consumer calls
* {@link releasePool}.
*
* Use this for stateless workloads (queries, queue receive, pub/sub
* publish). LISTEN must use a dedicated `pg.Client` because the
* subscription is bound to a single connection — pooled connections
* rotate and would silently drop the subscription.
*
* @param config - the pg pool config
* @returns the shared pool
*/
export function acquirePool(config: PoolConfig | undefined): pg.Pool {
const key = poolKey(config);
const entry = registry.get(key);
if (entry) {
entry.refCount++;
return entry.pool;
}
const pool = new pg.Pool(config);
registry.set(key, { pool, refCount: 1 });
return pool;
}

/**
* Release a previously-acquired pool. When the refcount reaches zero,
* the pool is `end()`ed and removed from the registry. Releasing a
* config that was never acquired (or already drained) is a no-op so
* `stop()` paths can call this unconditionally.
*
* @param config - the same config that was passed to {@link acquirePool}
* @returns a promise that resolves once the pool has been ended (or immediately if still in use / unknown)
*/
export async function releasePool(config: PoolConfig | undefined): Promise<void> {
const key = poolKey(config);
const entry = registry.get(key);
if (!entry) return;
entry.refCount--;
if (entry.refCount > 0) return;
registry.delete(key);
await entry.pool.end().catch(() => {
/* already ended */
});
}

/**
* Test-only: drain and forget every shared pool. Used between specs to
* keep the registry from leaking handles across files.
*/
export async function __resetPools(): Promise<void> {
const entries = [...registry.values()];
registry.clear();
await Promise.all(
entries.map(e =>
e.pool.end().catch(() => {
/* already ended */
})
)
);
}
32 changes: 29 additions & 3 deletions packages/postgres/src/postgrespubsub.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { PubSubService, ServiceParameters } from "@webda/core";
import { CancelablePromise, JSONUtils } from "@webda/utils";
import { useLog } from "@webda/workout";
import pg, { ClientConfig } from "pg";
import pg, { ClientConfig, PoolConfig } from "pg";
import { acquirePool, releasePool } from "./pgpool.js";

/**
* NOTIFY's payload limit is 8000 bytes (the underlying NAMEDATALEN /
Expand All @@ -17,6 +18,11 @@ export class PostgresPubSubParameters extends ServiceParameters {
/**
* Channel name passed to LISTEN / NOTIFY. Must be a valid Postgres
* identifier (lowercased, no quoting). Defaults to the service name.
* @pattern /^[a-z_][a-z0-9_]*$/
* @example "myapp_events"
* @example "user_notifications"
* @example "cache_invalidation"
* @example "orders"
*/
channel?: string;
/**
Expand Down Expand Up @@ -68,9 +74,18 @@ export default class PostgresPubSubService<
K extends PostgresPubSubParameters = PostgresPubSubParameters
> extends PubSubService<T, K> {
/**
* Long-lived listener client. One per service instance.
* Long-lived listener client. One per service instance because LISTEN is
* scoped to the connection that issued it — a pool would silently rotate
* and drop the subscription.
*/
protected client?: pg.Client;
/**
* Shared pool used for the publish side (`pg_notify`). Acquired from
* the per-config pool registry so multiple services that target the
* same database share connections instead of each spinning up their
* own pool. Subscribe-only services skip this and never publish.
*/
protected publishPool?: pg.Pool;
/**
* Local callback registrations. Notifications dispatch to all of them.
*/
Expand Down Expand Up @@ -180,7 +195,14 @@ export default class PostgresPubSubService<
);
}
this.metrics?.messages_sent?.inc();
await this.client.query("SELECT pg_notify($1, $2)", [this.channel(), payload]);
// Lazily acquire a shared publish pool: subscribe-only services never
// call sendMessage and so never pay the connection cost. Acquired
// pools are reference-counted in the registry, so multiple postgres
// services pointing at the same database share connections.
if (!this.publishPool) {
this.publishPool = acquirePool(this.parameters.postgresqlServer as PoolConfig);
}
await this.publishPool.query("SELECT pg_notify($1, $2)", [this.channel(), payload]);
}

/**
Expand Down Expand Up @@ -232,6 +254,10 @@ export default class PostgresPubSubService<
/* already ended */
});
}
if (this.publishPool) {
this.publishPool = undefined;
await releasePool(this.parameters.postgresqlServer as PoolConfig);
}
await super.stop();
}
}
Expand Down
30 changes: 21 additions & 9 deletions packages/postgres/src/postgresqueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { MessageReceipt, Queue, QueueParameters } from "@webda/core";
import { JSONUtils } from "@webda/utils";
import { useLog } from "@webda/workout";
import pg, { ClientConfig, PoolConfig } from "pg";
import { acquirePool, releasePool } from "./pgpool.js";

/**
* Configuration for {@link PostgresQueueService}.
Expand All @@ -11,6 +12,11 @@ export class PostgresQueueParameters extends QueueParameters {
* Table name backing the queue. Auto-created on init if missing.
*
* @default "webda_queue"
* @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/
* @example "myapp_queue"
* @example "user_notifications"
* @example "cache_invalidation"
* @example "orders"
*/
table?: string;
/**
Expand Down Expand Up @@ -88,6 +94,11 @@ export default class PostgresQueueService<
* locks rotate across connections and benefit from concurrency.
*/
protected client?: pg.Client | pg.Pool;
/**
* True when {@link client} is a shared pool acquired from the registry.
* Drives the matching {@link releasePool} call in {@link stop}.
*/
protected ownsPool = false;

/**
* Resolved table identifier. Validated at init to keep the table name
Expand All @@ -107,10 +118,11 @@ export default class PostgresQueueService<
if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(this.table)) {
throw new Error(`Invalid table name "${this.table}" — must match /^[a-zA-Z_][a-zA-Z0-9_]*$/`);
}
this.client = this.parameters.usePool
? new pg.Pool(this.parameters.postgresqlServer as PoolConfig)
: new pg.Client(this.parameters.postgresqlServer as ClientConfig);
if (this.client instanceof pg.Client) {
if (this.parameters.usePool) {
this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig);
this.ownsPool = true;
} else {
this.client = new pg.Client(this.parameters.postgresqlServer as ClientConfig);
await this.client.connect();
}
if (this.parameters.autoCreateTable) {
Expand Down Expand Up @@ -241,13 +253,13 @@ export default class PostgresQueueService<
async stop(): Promise<void> {
if (this.client) {
const client = this.client;
const owned = this.ownsPool;
this.client = undefined;
if (client instanceof pg.Pool) {
await client.end().catch(() => {
/* already ended */
});
this.ownsPool = false;
if (owned) {
await releasePool(this.parameters.postgresqlServer as PoolConfig);
} else {
await client.end().catch(() => {
await (client as pg.Client).end().catch(() => {
/* already ended */
});
}
Expand Down
30 changes: 29 additions & 1 deletion packages/postgres/src/postgresstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { InstanceCache, useApplication, useCore, useModel, useModelMetadata } fr
import type { ModelClass, Repository } from "@webda/core";
import { JSONSchema7 } from "json-schema";
import pg, { ClientConfig, PoolConfig } from "pg";
import { acquirePool, releasePool } from "./pgpool.js";
import { PostgresRepository, SQLStore, SQLStoreParameters } from "./sqlstore.js";
import { useLog } from "@webda/workout";

Expand Down Expand Up @@ -35,6 +36,11 @@ export class PostgresParameters extends SQLStoreParameters {
autoCreateTable?: boolean;
/**
* View name prefix
* @default ""
* @pattern /^[a-zA-Z_][a-zA-Z0-9_]*$/
* @example "app_"
* @example "v_"
* @example "view_"
*/
viewPrefix?: string;
/**
Expand Down Expand Up @@ -85,13 +91,19 @@ export class PostgresParameters extends SQLStoreParameters {
*/
export class PostgresStore<K extends PostgresParameters = PostgresParameters> extends SQLStore<K> {
client: pg.Client | pg.Pool;
/**
* True when {@link client} is a shared pool acquired from the registry.
* Tracked so {@link stop} only releases pools it took a refcount on.
*/
protected ownsPool = false;

/**
* @override
*/
async init(): Promise<this> {
if (this.parameters.usePool) {
this.client = new pg.Pool(this.parameters.postgresqlServer);
this.client = acquirePool(this.parameters.postgresqlServer as PoolConfig);
this.ownsPool = true;
} else {
this.client = new pg.Client(this.parameters.postgresqlServer);
await this.client.connect();
Expand All @@ -101,6 +113,22 @@ export class PostgresStore<K extends PostgresParameters = PostgresParameters> ex
return this;
}

/**
* @override
*/
async stop(): Promise<void> {
if (this.ownsPool) {
this.ownsPool = false;
await releasePool(this.parameters.postgresqlServer as PoolConfig);
} else if (this.client && "end" in this.client) {
await (this.client as pg.Client).end().catch(() => {
/* already ended */
});
}
this.client = undefined as any;
await super.stop();
}

/**
* Resolve the table name for a given model class.
*
Expand Down
Loading
Loading