Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions migrations/1790000000000_contract-events-type-created-at-index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* @type {import('node-pg-migrate').ColumnDefinitions | undefined}
*/
export const shorthands = undefined;

// Issue #2: CREATE INDEX CONCURRENTLY cannot run inside a transaction, and
// node-pg-migrate wraps each migration in a transaction by default. Disabling
// the transaction wrapper lets the CONCURRENTLY clause survive while still
// going through the migrate:up tooling.
export const disableTransaction = true;

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
// The orphaned src/db/migrations SQL targeted `loan_events`, but as of
// 1788000000019_unified-contract-events the loan_events relation is a
// backward-compatibility view over contract_events. CREATE INDEX
// CONCURRENTLY cannot index a view, so we target the underlying table
// directly — the index name follows the contract_events_* convention
// the rest of that migration established.
pgm.sql(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_contract_events_type_created_at
ON contract_events (event_type, created_at);`,
);
};

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const down = (pgm) => {
pgm.sql(
"DROP INDEX CONCURRENTLY IF EXISTS idx_contract_events_type_created_at;",
);
};

This file was deleted.

131 changes: 98 additions & 33 deletions src/middleware/idempotency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,29 @@ const IDEMPOTENCY_TTL = 24 * 60 * 60; // 24 hours in seconds

interface CachedResponse {
status: number;
body: any;
body: unknown;
inProgress?: boolean;
}

/**
* Middleware to handle Idempotency-Key headers.
* If the key is present and a cached response exists, it returns the cached response.
* Otherwise, it intercepts the response, captures it, and stores it in Redis.
*
* Reserves the key with an atomic SET NX *before* the downstream handler
* runs, so two concurrent requests sharing the same key can't both miss the
* cache check and submit the same money-moving transaction twice. The race
* window in the original implementation — which only wrote the cache on the
* `res.on("finish")` callback — is closed by writing an `inProgress`
* placeholder up front and overwriting it with the real response when the
* handler completes.
*
* Behaviour:
* * No `Idempotency-Key` header → forward.
* * Key reserved successfully → handler runs, the response body is cached
* on finish with `X-Idempotency-Cache: STORED`.
* * Key already cached as a finished response → return the cached body
* with `X-Idempotency-Cache: HIT`.
* * Key already reserved but the first request is still in flight → 409
* with `X-Idempotency-Cache: IN_PROGRESS`.
*/
export const idempotencyMiddleware = async (
req: Request,
Expand All @@ -25,38 +41,84 @@ export const idempotencyMiddleware = async (
return next();
}

const cacheKey = `idemp:${key}`;

try {
const cacheKey = `idemp:${key}`;
const cached = await cacheService.get<CachedResponse>(cacheKey);
// ── Race-free key reservation ──────────────────────────────────────────
// Try to claim the key first. If we lose the race, fall back to reading
// whatever the first request stored (which is guaranteed to exist because
// the winner has written at least the in-progress placeholder).
const reserved = await cacheService.setNotExists(
cacheKey,
{ status: 0, body: null, inProgress: true } satisfies CachedResponse,
IDEMPOTENCY_TTL,
);

if (!reserved) {
const cached = await cacheService.get<CachedResponse>(cacheKey);

// The setNX call failed but the read came back empty — almost certainly
// a transient cache miss between the failed reserve and the read.
// Treat it as "in progress" so the client retries rather than letting
// the handler run uncoordinated.
if (!cached) {
logger.warn(
`Idempotency reservation lost the race but no cached value found; treating as in-progress`,
{ key, url: req.originalUrl, method: req.method },
);
res
.status(409)
.set("X-Idempotency-Cache", "IN_PROGRESS")
.json({
error: "request_in_progress",
message:
"Another request with this Idempotency-Key is still being processed.",
});
return;
}

if (cached.inProgress) {
logger.info(`Idempotency in-progress for key: ${key}`, {
url: req.originalUrl,
method: req.method,
});
res
.status(409)
.set("X-Idempotency-Cache", "IN_PROGRESS")
.json({
error: "request_in_progress",
message:
"Another request with this Idempotency-Key is still being processed.",
});
return;
}

if (cached) {
logger.info(`Idempotency hit for key: ${key}`, {
url: req.originalUrl,
method: req.method,
});

res
.status(cached.status)
.set("X-Idempotency-Cache", "HIT")
.json(cached.body);
return;
}

// Capture the original methods to intercept the response body
const originalJson = res.json;
const originalSend = res.send;
// ── Intercept response body to overwrite the placeholder on finish ────
const originalJson = res.json.bind(res);
const originalSend = res.send.bind(res);

let responseBody: any;
let responseBody: unknown;
let bodyCaptured = false;

// Override res.json
res.json = function (body: any) {
res.json = function (body: unknown) {
responseBody = body;
return originalJson.call(this, body);
bodyCaptured = true;
return originalJson(body);
};

// Override res.send (as res.json eventually calls res.send)
res.send = function (body: any) {
if (!responseBody) {
res.send = function (body: unknown) {
if (!bodyCaptured) {
if (typeof body === "string") {
try {
responseBody = JSON.parse(body);
Expand All @@ -66,27 +128,30 @@ export const idempotencyMiddleware = async (
} else {
responseBody = body;
}
bodyCaptured = true;
}
return originalSend.call(this, body);
return originalSend(body);
};

// Store the response in cache once the request is finished
res.on("finish", async () => {
// Only cache 2xx and 4xx status codes.
// 5xx errors should usually be retried without returning a cached failure.
if (res.statusCode >= 200 && res.statusCode < 500 && responseBody) {
try {
await cacheService.set(
cacheKey,
{
status: res.statusCode,
body: responseBody,
},
IDEMPOTENCY_TTL,
);
} catch (error) {
logger.error(`Error caching idempotency key ${key}`, { error });
try {
// Only cache 2xx and 4xx so retries on 5xx are not poisoned with a
// stale failure. On 5xx, drop the in-progress placeholder so future
// requests with the same key can try again.
if (res.statusCode >= 500) {
await cacheService.delete(cacheKey);
return;
}
await cacheService.set(
cacheKey,
{
status: res.statusCode,
body: responseBody ?? null,
} satisfies CachedResponse,
IDEMPOTENCY_TTL,
);
} catch (error) {
logger.error(`Error caching idempotency key ${key}`, { error });
}
});

Expand Down
Loading