From 36270dcbd527128df8a6a1f640caffd7a9c0bb6f Mon Sep 17 00:00:00 2001 From: Akpolo Ogagaoghene Prince Date: Fri, 19 Jun 2026 20:14:16 +0100 Subject: [PATCH] feat: add price alert webhooks for creator key thresholds Register one-shot price alerts that fire a callback when a creator key price crosses a target threshold during indexer trade processing. - POST /alerts registers an alert ({ creator_id, wallet_address, target_price, direction, callback_url }) and returns a unique alert ID - DELETE /alerts/:id cancels a pending alert before it fires - Alerts are evaluated on the indexer trade-event path: an `above` alert fires when the new price rises to/past the target, a `below` alert when it drops to/past the target; opposite movement does not fire - Successful delivery deletes the alert (one-shot); failed delivery is retried with exponential backoff up to WEBHOOK_RETRY_MAX_ATTEMPTS, then the alert is marked FAILED - Adds an Alert Prisma model + migration; reuses the trade-webhook delivery/retry conventions and is fanned out alongside webhook dispatch via a single processTradeEvent entry point Closes #423 --- prisma/schema/alert.prisma | 30 ++ .../migration.sql | 29 ++ src/modules/alerts/alert.controllers.ts | 65 ++++ src/modules/alerts/alert.integration.test.ts | 244 +++++++++++++++ src/modules/alerts/alert.router.ts | 9 + src/modules/alerts/alert.schemas.ts | 20 ++ src/modules/alerts/alert.service.test.ts | 278 ++++++++++++++++++ src/modules/alerts/alert.service.ts | 229 +++++++++++++++ src/modules/alerts/alert.types.ts | 42 +++ src/modules/alerts/index.ts | 3 + src/modules/index.ts | 2 + src/utils/trade-event.utils.ts | 41 +++ 12 files changed, 992 insertions(+) create mode 100644 prisma/schema/alert.prisma create mode 100644 prisma/schema/migrations/20260619000000_add_price_alerts/migration.sql create mode 100644 src/modules/alerts/alert.controllers.ts create mode 100644 src/modules/alerts/alert.integration.test.ts create mode 100644 src/modules/alerts/alert.router.ts create mode 100644 src/modules/alerts/alert.schemas.ts create mode 100644 src/modules/alerts/alert.service.test.ts create mode 100644 src/modules/alerts/alert.service.ts create mode 100644 src/modules/alerts/alert.types.ts create mode 100644 src/modules/alerts/index.ts create mode 100644 src/utils/trade-event.utils.ts diff --git a/prisma/schema/alert.prisma b/prisma/schema/alert.prisma new file mode 100644 index 0000000..81befcd --- /dev/null +++ b/prisma/schema/alert.prisma @@ -0,0 +1,30 @@ +// prisma/schema/alert.prisma + +enum AlertDirection { + ABOVE + BELOW +} + +enum AlertStatus { + PENDING + TRIGGERED + FAILED +} + +model Alert { + id String @id @default(cuid()) + creatorId String + walletAddress String + targetPrice Decimal @db.Decimal(38, 18) + direction AlertDirection + callbackUrl String + status AlertStatus @default(PENDING) + retryCount Int @default(0) + lastError String? + triggeredAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([creatorId]) + @@index([status]) +} diff --git a/prisma/schema/migrations/20260619000000_add_price_alerts/migration.sql b/prisma/schema/migrations/20260619000000_add_price_alerts/migration.sql new file mode 100644 index 0000000..8f3943a --- /dev/null +++ b/prisma/schema/migrations/20260619000000_add_price_alerts/migration.sql @@ -0,0 +1,29 @@ +-- CreateEnum +CREATE TYPE "AlertDirection" AS ENUM ('ABOVE', 'BELOW'); + +-- CreateEnum +CREATE TYPE "AlertStatus" AS ENUM ('PENDING', 'TRIGGERED', 'FAILED'); + +-- CreateTable +CREATE TABLE "Alert" ( + "id" TEXT NOT NULL, + "creatorId" TEXT NOT NULL, + "walletAddress" TEXT NOT NULL, + "targetPrice" DECIMAL(38,18) NOT NULL, + "direction" "AlertDirection" NOT NULL, + "callbackUrl" TEXT NOT NULL, + "status" "AlertStatus" NOT NULL DEFAULT 'PENDING', + "retryCount" INTEGER NOT NULL DEFAULT 0, + "lastError" TEXT, + "triggeredAt" TIMESTAMP(3), + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Alert_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Alert_creatorId_idx" ON "Alert"("creatorId"); + +-- CreateIndex +CREATE INDEX "Alert_status_idx" ON "Alert"("status"); diff --git a/src/modules/alerts/alert.controllers.ts b/src/modules/alerts/alert.controllers.ts new file mode 100644 index 0000000..9b42102 --- /dev/null +++ b/src/modules/alerts/alert.controllers.ts @@ -0,0 +1,65 @@ +import type { Request, Response } from 'express'; +import { + sendSuccess, + sendError, + sendValidationError, + sendNotFound, +} from '../../utils/api-response.utils'; +import { ErrorCode } from '../../constants/error.constants'; +import { CreateAlertSchema } from './alert.schemas'; +import * as alertService from './alert.service'; + +export async function registerAlertHandler( + req: Request, + res: Response +): Promise { + const parseResult = CreateAlertSchema.safeParse(req.body); + if (!parseResult.success) { + sendValidationError( + res, + 'Invalid alert registration data', + parseResult.error.issues.map((issue) => ({ + field: issue.path.join('.'), + message: issue.message, + })) + ); + return; + } + + try { + const result = await alertService.createAlert({ + creatorId: parseResult.data.creator_id, + walletAddress: parseResult.data.wallet_address, + targetPrice: parseResult.data.target_price, + direction: parseResult.data.direction, + callbackUrl: parseResult.data.callback_url, + }); + sendSuccess(res, result, 201, 'Alert registered successfully'); + } catch { + sendError(res, 500, ErrorCode.INTERNAL_ERROR, 'Failed to register alert'); + } +} + +export async function deleteAlertHandler( + req: Request, + res: Response +): Promise { + const rawAlertId = req.params.id; + const alertId = Array.isArray(rawAlertId) ? rawAlertId[0] : rawAlertId; + + if (!alertId) { + sendError(res, 400, ErrorCode.BAD_REQUEST, 'Missing alert ID in path'); + return; + } + + try { + const result = await alertService.deleteAlert(alertId); + if (!result) { + sendNotFound(res, 'Alert'); + return; + } + sendSuccess(res, result, 200, 'Alert cancelled successfully'); + } catch { + sendError(res, 500, ErrorCode.INTERNAL_ERROR, 'Failed to cancel alert'); + } +} diff --git a/src/modules/alerts/alert.integration.test.ts b/src/modules/alerts/alert.integration.test.ts new file mode 100644 index 0000000..f9022d1 --- /dev/null +++ b/src/modules/alerts/alert.integration.test.ts @@ -0,0 +1,244 @@ +import supertest from 'supertest'; +import { Prisma } from '@prisma/client'; +import { Keypair } from '@stellar/stellar-base'; + +// Mock Prisma so the integration test exercises the full HTTP + dispatch path +// without requiring a live database, matching the suite's mocking conventions. +jest.mock('../../utils/prisma.utils', () => ({ + prisma: { + alert: { + create: jest.fn(), + findFirst: jest.fn(), + findMany: jest.fn(), + delete: jest.fn(), + update: jest.fn(), + }, + }, +})); + +jest.mock('../../utils/logger.utils', () => ({ + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, +})); + +import app from '../../app'; +import { prisma } from '../../utils/prisma.utils'; +import { evaluateTradeForAlerts } from './alert.service'; +import { envConfig } from '../../config'; + +const mockPrisma = prisma as unknown as { + alert: { + create: jest.Mock; + findFirst: jest.Mock; + findMany: jest.Mock; + delete: jest.Mock; + update: jest.Mock; + }; +}; + +const walletAddress = Keypair.random().publicKey(); + +function decimal(v: string): Prisma.Decimal { + return new Prisma.Decimal(v); +} + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('POST /api/v1/alerts', () => { + it('registers an alert and returns a unique alert ID', async () => { + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-generated-id', + creatorId: 'creator-1', + walletAddress, + targetPrice: decimal('15'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + createdAt: new Date(), + }); + + const res = await supertest(app) + .post('/api/v1/alerts') + .send({ + creator_id: 'creator-1', + wallet_address: walletAddress, + target_price: '15', + direction: 'above', + callback_url: 'https://example.com/hook', + }); + + expect(res.status).toBe(201); + expect(res.body.success).toBe(true); + expect(res.body.data.id).toBe('alert-generated-id'); + expect(res.body.data.direction).toBe('above'); + }); + + it('returns 400 on invalid body (bad direction / url / price)', async () => { + const res = await supertest(app) + .post('/api/v1/alerts') + .send({ + creator_id: 'creator-1', + wallet_address: walletAddress, + target_price: '-5', + direction: 'sideways', + callback_url: 'not-a-url', + }); + + expect(res.status).toBe(400); + expect(mockPrisma.alert.create).not.toHaveBeenCalled(); + }); +}); + +describe('DELETE /api/v1/alerts/:id', () => { + it('cancels a pending alert before it fires', async () => { + mockPrisma.alert.findFirst.mockResolvedValue({ id: 'alert-1', status: 'PENDING' }); + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-1' }); + + const res = await supertest(app).delete('/api/v1/alerts/alert-1'); + + expect(res.status).toBe(200); + expect(res.body.success).toBe(true); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-1' } }); + }); + + it('returns 404 for a non-existent alert', async () => { + mockPrisma.alert.findFirst.mockResolvedValue(null); + + const res = await supertest(app).delete('/api/v1/alerts/missing-id'); + + expect(res.status).toBe(404); + }); +}); + +describe('alert trigger evaluation', () => { + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('fires on an above trigger and deletes the alert (one-shot)', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-above', + creatorId: 'creator-1', + walletAddress, + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-above' }); + const mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200, statusText: 'OK' }); + (global.fetch as jest.Mock) = mockFetch; + + await evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '11', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-above' } }); + }); + + it('fires on a below trigger and deletes the alert (one-shot)', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-below', + creatorId: 'creator-1', + walletAddress, + targetPrice: decimal('10'), + direction: 'BELOW', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-below' }); + const mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200, statusText: 'OK' }); + (global.fetch as jest.Mock) = mockFetch; + + await evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '9', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-below' } }); + }); + + it('does not fire when price moves in the opposite direction', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-above', + creatorId: 'creator-1', + walletAddress, + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + const mockFetch = jest.fn(); + (global.fetch as jest.Mock) = mockFetch; + + await evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '5', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).not.toHaveBeenCalled(); + expect(mockPrisma.alert.delete).not.toHaveBeenCalled(); + }); + + describe('failed delivery retry', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('retries failed delivery up to 3 times then marks the alert failed', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-fail', + creatorId: 'creator-1', + walletAddress, + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://nonexistent.example.com/fail', + status: 'PENDING', + }, + ]); + mockPrisma.alert.update.mockResolvedValue({}); + const mockFetch = jest.fn().mockRejectedValue(new Error('Network error')); + (global.fetch as jest.Mock) = mockFetch; + + const promise = evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '20', + timestamp: new Date().toISOString(), + }); + + for (let i = 0; i < envConfig.WEBHOOK_RETRY_MAX_ATTEMPTS; i++) { + await jest.advanceTimersByTimeAsync( + Math.pow(2, i) * envConfig.WEBHOOK_RETRY_BASE_DELAY_MS + ); + } + + await promise; + + expect(mockFetch).toHaveBeenCalledTimes(envConfig.WEBHOOK_RETRY_MAX_ATTEMPTS); + expect(mockPrisma.alert.delete).not.toHaveBeenCalled(); + expect(mockPrisma.alert.update).toHaveBeenLastCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ status: 'FAILED' }), + }) + ); + }); + }); +}); diff --git a/src/modules/alerts/alert.router.ts b/src/modules/alerts/alert.router.ts new file mode 100644 index 0000000..5b3ac56 --- /dev/null +++ b/src/modules/alerts/alert.router.ts @@ -0,0 +1,9 @@ +import { Router } from 'express'; +import { registerAlertHandler, deleteAlertHandler } from './alert.controllers'; + +const router = Router(); + +router.post('/', registerAlertHandler); +router.delete('/:id', deleteAlertHandler); + +export default router; diff --git a/src/modules/alerts/alert.schemas.ts b/src/modules/alerts/alert.schemas.ts new file mode 100644 index 0000000..89e6443 --- /dev/null +++ b/src/modules/alerts/alert.schemas.ts @@ -0,0 +1,20 @@ +import { z } from 'zod'; +import { StellarAddressSchema } from '../wallet/wallet.schemas'; + +export const AlertDirectionEnum = z.enum(['above', 'below']); + +export const CreateAlertSchema = z.object({ + creator_id: z.string().min(1, 'creator_id is required'), + wallet_address: StellarAddressSchema, + target_price: z + .union([z.string(), z.number()]) + .transform((v) => String(v)) + .refine((v) => { + const n = Number(v); + return Number.isFinite(n) && n > 0; + }, 'target_price must be a positive number'), + direction: AlertDirectionEnum, + callback_url: z.string().url('callback_url must be a valid URL'), +}); + +export type CreateAlertType = z.infer; diff --git a/src/modules/alerts/alert.service.test.ts b/src/modules/alerts/alert.service.test.ts new file mode 100644 index 0000000..0aa58e2 --- /dev/null +++ b/src/modules/alerts/alert.service.test.ts @@ -0,0 +1,278 @@ +import { Prisma } from '@prisma/client'; +import { prisma } from '../../utils/prisma.utils'; +import { envConfig } from '../../config'; +import * as alertService from './alert.service'; + +jest.mock('../../utils/prisma.utils', () => ({ + prisma: { + alert: { + create: jest.fn(), + findFirst: jest.fn(), + findMany: jest.fn(), + delete: jest.fn(), + update: jest.fn(), + }, + }, +})); + +jest.mock('../../utils/logger.utils', () => ({ + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, +})); + +const mockPrisma = prisma as unknown as { + alert: { + create: jest.Mock; + findFirst: jest.Mock; + findMany: jest.Mock; + delete: jest.Mock; + update: jest.Mock; + }; +}; + +function decimal(v: string): Prisma.Decimal { + return new Prisma.Decimal(v); +} + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('createAlert', () => { + it('creates an alert and returns a unique ID with normalized fields', async () => { + mockPrisma.alert.create.mockResolvedValue({ + id: 'alert-1', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10.5'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + createdAt: new Date(), + }); + + const result = await alertService.createAlert({ + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: '10.5', + direction: 'above', + callbackUrl: 'https://example.com/hook', + }); + + expect(result.id).toBe('alert-1'); + expect(result.direction).toBe('above'); + expect(result.status).toBe('pending'); + expect(result.targetPrice).toBe('10.5'); + expect(mockPrisma.alert.create).toHaveBeenCalledTimes(1); + }); +}); + +describe('deleteAlert', () => { + it('cancels a pending alert', async () => { + mockPrisma.alert.findFirst.mockResolvedValue({ id: 'alert-1', status: 'PENDING' }); + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-1' }); + + const result = await alertService.deleteAlert('alert-1'); + + expect(result).toEqual({ id: 'alert-1' }); + expect(mockPrisma.alert.findFirst).toHaveBeenCalledWith({ + where: { id: 'alert-1', status: 'PENDING' }, + }); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-1' } }); + }); + + it('returns null for a non-existent or already-fired alert', async () => { + mockPrisma.alert.findFirst.mockResolvedValue(null); + + const result = await alertService.deleteAlert('alert-1'); + + expect(result).toBeNull(); + expect(mockPrisma.alert.delete).not.toHaveBeenCalled(); + }); +}); + +describe('evaluateTradeForAlerts — threshold crossing', () => { + beforeEach(() => { + global.fetch = jest.fn(); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('fires an ABOVE alert when price rises past the target', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-above', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + const mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200, statusText: 'OK' }); + (global.fetch as jest.Mock) = mockFetch; + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-above' }); + + await alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '12', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const body = JSON.parse((mockFetch.mock.calls[0][1] as RequestInit).body as string); + expect(body).toMatchObject({ + creator_id: 'creator-1', + triggered_price: '12', + target_price: '10', + direction: 'above', + }); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-above' } }); + }); + + it('fires a BELOW alert when price drops past the target', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-below', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10'), + direction: 'BELOW', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + const mockFetch = jest.fn().mockResolvedValue({ ok: true, status: 200, statusText: 'OK' }); + (global.fetch as jest.Mock) = mockFetch; + mockPrisma.alert.delete.mockResolvedValue({ id: 'alert-below' }); + + await alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '8', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const body = JSON.parse((mockFetch.mock.calls[0][1] as RequestInit).body as string); + expect(body.direction).toBe('below'); + expect(mockPrisma.alert.delete).toHaveBeenCalledWith({ where: { id: 'alert-below' } }); + }); + + it('does NOT fire an ABOVE alert when price moves the opposite direction', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-above', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + const mockFetch = jest.fn(); + (global.fetch as jest.Mock) = mockFetch; + + await alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '8', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).not.toHaveBeenCalled(); + expect(mockPrisma.alert.delete).not.toHaveBeenCalled(); + }); + + it('does NOT fire a BELOW alert when price moves the opposite direction', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-below', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10'), + direction: 'BELOW', + callbackUrl: 'https://example.com/hook', + status: 'PENDING', + }, + ]); + const mockFetch = jest.fn(); + (global.fetch as jest.Mock) = mockFetch; + + await alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '12', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).not.toHaveBeenCalled(); + }); + + it('does nothing when no pending alerts exist for the creator', async () => { + mockPrisma.alert.findMany.mockResolvedValue([]); + const mockFetch = jest.fn(); + (global.fetch as jest.Mock) = mockFetch; + + await alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '99', + timestamp: new Date().toISOString(), + }); + + expect(mockFetch).not.toHaveBeenCalled(); + }); +}); + +describe('evaluateTradeForAlerts — delivery retry', () => { + beforeEach(() => { + jest.useFakeTimers(); + global.fetch = jest.fn(); + }); + + afterEach(() => { + jest.useRealTimers(); + jest.restoreAllMocks(); + }); + + it('retries delivery up to max attempts then marks the alert FAILED', async () => { + mockPrisma.alert.findMany.mockResolvedValue([ + { + id: 'alert-fail', + creatorId: 'creator-1', + walletAddress: 'GABC', + targetPrice: decimal('10'), + direction: 'ABOVE', + callbackUrl: 'https://nonexistent.example.com/fail', + status: 'PENDING', + }, + ]); + mockPrisma.alert.update.mockResolvedValue({}); + + const mockFetch = jest.fn().mockRejectedValue(new Error('Network error')); + (global.fetch as jest.Mock) = mockFetch; + + const promise = alertService.evaluateTradeForAlerts({ + creatorId: 'creator-1', + price: '12', + timestamp: new Date().toISOString(), + }); + + for (let i = 0; i < envConfig.WEBHOOK_RETRY_MAX_ATTEMPTS; i++) { + await jest.advanceTimersByTimeAsync( + Math.pow(2, i) * envConfig.WEBHOOK_RETRY_BASE_DELAY_MS + ); + } + + await promise; + + expect(mockFetch).toHaveBeenCalledTimes(envConfig.WEBHOOK_RETRY_MAX_ATTEMPTS); + expect(mockPrisma.alert.delete).not.toHaveBeenCalled(); + expect(mockPrisma.alert.update).toHaveBeenLastCalledWith( + expect.objectContaining({ + where: { id: 'alert-fail' }, + data: expect.objectContaining({ status: 'FAILED' }), + }) + ); + }); +}); diff --git a/src/modules/alerts/alert.service.ts b/src/modules/alerts/alert.service.ts new file mode 100644 index 0000000..d5be9cf --- /dev/null +++ b/src/modules/alerts/alert.service.ts @@ -0,0 +1,229 @@ +import { Prisma } from '@prisma/client'; +import { prisma } from '../../utils/prisma.utils'; +import { logger } from '../../utils/logger.utils'; +import { envConfig } from '../../config'; +import type { + AlertDirectionName, + AlertResponse, + AlertTradeEvent, + AlertTriggerPayload, + CreateAlertInput, +} from './alert.types'; + +type DbDirection = 'ABOVE' | 'BELOW'; +type DbStatus = 'PENDING' | 'TRIGGERED' | 'FAILED'; + +function toDbDirection(direction: AlertDirectionName): DbDirection { + return direction === 'above' ? 'ABOVE' : 'BELOW'; +} + +function fromDbDirection(direction: DbDirection): AlertDirectionName { + return direction === 'ABOVE' ? 'above' : 'below'; +} + +function fromDbStatus(status: DbStatus): AlertResponse['status'] { + switch (status) { + case 'TRIGGERED': + return 'triggered'; + case 'FAILED': + return 'failed'; + default: + return 'pending'; + } +} + +interface AlertRecord { + id: string; + creatorId: string; + walletAddress: string; + targetPrice: Prisma.Decimal; + direction: DbDirection; + callbackUrl: string; + status: DbStatus; + createdAt: Date; +} + +function toAlertResponse(alert: AlertRecord): AlertResponse { + return { + id: alert.id, + creatorId: alert.creatorId, + walletAddress: alert.walletAddress, + targetPrice: alert.targetPrice.toString(), + direction: fromDbDirection(alert.direction), + callbackUrl: alert.callbackUrl, + status: fromDbStatus(alert.status), + createdAt: alert.createdAt, + }; +} + +/** + * Registers a one-shot price alert and returns its unique ID. + */ +export async function createAlert( + input: CreateAlertInput +): Promise { + const alert = await prisma.alert.create({ + data: { + creatorId: input.creatorId, + walletAddress: input.walletAddress, + targetPrice: new Prisma.Decimal(input.targetPrice), + direction: toDbDirection(input.direction), + callbackUrl: input.callbackUrl, + }, + }); + + return toAlertResponse(alert as AlertRecord); +} + +/** + * Cancels a pending alert. Only alerts that have not already fired (PENDING) + * can be cancelled. Returns the deleted alert ID, or null when not found. + */ +export async function deleteAlert(alertId: string): Promise<{ id: string } | null> { + const alert = await prisma.alert.findFirst({ + where: { id: alertId, status: 'PENDING' }, + }); + + if (!alert) { + return null; + } + + await prisma.alert.delete({ where: { id: alertId } }); + return { id: alertId }; +} + +/** + * Returns true when the new price has crossed the registered threshold in the + * direction the alert was registered for. + * + * - `above`: fires when the new price is at or above the target. + * - `below`: fires when the new price is at or below the target. + */ +function crossesThreshold( + direction: DbDirection, + newPrice: number, + targetPrice: number +): boolean { + if (direction === 'ABOVE') { + return newPrice >= targetPrice; + } + return newPrice <= targetPrice; +} + +/** + * Evaluates all pending alerts for the creator against the new trade price and + * fires any whose threshold was crossed. + * + * Wire this into the indexer trade-event processing path alongside webhook + * dispatch — it processes the same trade event. + */ +export async function evaluateTradeForAlerts( + tradeEvent: AlertTradeEvent +): Promise { + const newPrice = Number(tradeEvent.price); + if (!Number.isFinite(newPrice)) { + logger.warn( + { creatorId: tradeEvent.creatorId, price: tradeEvent.price }, + 'Skipping alert evaluation: trade price is not a finite number' + ); + return; + } + + const alerts = await prisma.alert.findMany({ + where: { + creatorId: tradeEvent.creatorId, + status: 'PENDING', + }, + }); + + if (alerts.length === 0) return; + + for (const alert of alerts) { + const targetPrice = Number((alert.targetPrice as Prisma.Decimal).toString()); + if (!crossesThreshold(alert.direction as DbDirection, newPrice, targetPrice)) { + continue; + } + + const payload: AlertTriggerPayload = { + creator_id: alert.creatorId, + triggered_price: tradeEvent.price, + target_price: (alert.targetPrice as Prisma.Decimal).toString(), + direction: fromDbDirection(alert.direction as DbDirection), + timestamp: tradeEvent.timestamp, + }; + + await deliverAlert(alert.id, alert.callbackUrl, payload).catch((err) => { + logger.error( + { alertId: alert.id, error: err instanceof Error ? err.message : String(err) }, + 'Alert delivery failed unexpectedly' + ); + }); + } +} + +/** + * Delivers a triggered alert to its callback URL via HTTP POST. + * + * On success the alert is deleted (one-shot). On failure the delivery is retried + * up to `WEBHOOK_RETRY_MAX_ATTEMPTS` times with exponential backoff (with + * jitter); after the final failure the alert is marked FAILED. + */ +async function deliverAlert( + alertId: string, + callbackUrl: string, + payload: AlertTriggerPayload, + attempt = 1 +): Promise { + const maxAttempts = envConfig.WEBHOOK_RETRY_MAX_ATTEMPTS; + const baseDelayMs = envConfig.WEBHOOK_RETRY_BASE_DELAY_MS; + + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 5000); + + const response = await fetch(callbackUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + signal: controller.signal, + }); + + clearTimeout(timeout); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + // One-shot: a successfully delivered alert is removed. + await prisma.alert.delete({ where: { id: alertId } }); + logger.info({ alertId, attempt }, 'Alert delivered and deleted'); + return; + } catch (error) { + const errMsg = error instanceof Error ? error.message : 'Unknown error'; + + await prisma.alert.update({ + where: { id: alertId }, + data: { retryCount: attempt, lastError: errMsg }, + }); + + if (attempt < maxAttempts) { + const delay = Math.pow(2, attempt - 1) * baseDelayMs; + logger.warn( + { alertId, attempt, maxAttempts, nextRetryMs: delay, error: errMsg }, + 'Alert delivery failed, retrying' + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + return deliverAlert(alertId, callbackUrl, payload, attempt + 1); + } + + await prisma.alert.update({ + where: { id: alertId }, + data: { status: 'FAILED', retryCount: attempt, triggeredAt: new Date() }, + }); + + logger.error( + { alertId, attempt, maxAttempts, error: errMsg }, + 'Alert delivery exhausted all retries, marked failed' + ); + } +} diff --git a/src/modules/alerts/alert.types.ts b/src/modules/alerts/alert.types.ts new file mode 100644 index 0000000..ba5a7c7 --- /dev/null +++ b/src/modules/alerts/alert.types.ts @@ -0,0 +1,42 @@ +export type AlertDirectionName = 'above' | 'below'; + +export interface CreateAlertInput { + creatorId: string; + walletAddress: string; + targetPrice: string; + direction: AlertDirectionName; + callbackUrl: string; +} + +export interface AlertResponse { + id: string; + creatorId: string; + walletAddress: string; + targetPrice: string; + direction: AlertDirectionName; + callbackUrl: string; + status: 'pending' | 'triggered' | 'failed'; + createdAt: Date; +} + +export interface AlertTriggerPayload { + creator_id: string; + triggered_price: string; + target_price: string; + direction: AlertDirectionName; + timestamp: string; +} + +/** + * Minimal trade-event shape required to evaluate price-alert thresholds. + * + * Mirrors the relevant fields of the trade-webhook `TradeEvent` so alert + * evaluation can be wired into the same indexer trade-event processing path. + */ +export interface AlertTradeEvent { + creatorId: string; + /** The new key price after the trade, as a decimal string. */ + price: string; + /** ISO-8601 timestamp of the trade. */ + timestamp: string; +} diff --git a/src/modules/alerts/index.ts b/src/modules/alerts/index.ts new file mode 100644 index 0000000..ed54cfa --- /dev/null +++ b/src/modules/alerts/index.ts @@ -0,0 +1,3 @@ +export { default as alertRouter } from './alert.router'; +export * from './alert.types'; +export { evaluateTradeForAlerts } from './alert.service'; diff --git a/src/modules/index.ts b/src/modules/index.ts index dd4cec5..c66711d 100644 --- a/src/modules/index.ts +++ b/src/modules/index.ts @@ -10,6 +10,7 @@ import adminRouter from './admin/admin.routes'; import activityRouter from './activity/activity.routes'; import ownershipRouter from './ownership/ownership.routes'; import webhookRouter from './webhooks/webhook.router'; +import alertRouter from './alerts/alert.router'; import { BASE as CREATORS_BASE } from '../constants/creator.constants'; const router = Router(); @@ -24,6 +25,7 @@ router.use('/ledger', ledgerRouter); router.use('/admin', adminRouter); router.use('/activity', activityRouter); router.use('/ownership', ownershipRouter); +router.use('/alerts', alertRouter); router.use(CREATORS_BASE, webhookRouter); export default router; diff --git a/src/utils/trade-event.utils.ts b/src/utils/trade-event.utils.ts new file mode 100644 index 0000000..48d803d --- /dev/null +++ b/src/utils/trade-event.utils.ts @@ -0,0 +1,41 @@ +import { logger } from './logger.utils'; +import { dispatchWebhookEvent } from '../modules/webhooks'; +import type { TradeEvent } from '../modules/webhooks'; +import { evaluateTradeForAlerts } from '../modules/alerts'; + +/** + * Single entry point for processing a creator-key trade event off the indexer. + * + * Fans out the trade event to every interested consumer: + * - trade webhooks registered by the creator (buy/sell notifications) + * - price-threshold alerts registered by fans (one-shot price-cross alerts) + * + * Each consumer is isolated: a failure in one does not prevent the other from + * running, so a webhook delivery problem never suppresses a price alert (and + * vice-versa). + */ +export async function processTradeEvent(tradeEvent: TradeEvent): Promise { + const results = await Promise.allSettled([ + dispatchWebhookEvent(tradeEvent), + evaluateTradeForAlerts({ + creatorId: tradeEvent.creatorId, + price: tradeEvent.price, + timestamp: tradeEvent.timestamp, + }), + ]); + + for (const result of results) { + if (result.status === 'rejected') { + logger.error( + { + creatorId: tradeEvent.creatorId, + error: + result.reason instanceof Error + ? result.reason.message + : String(result.reason), + }, + 'Trade-event consumer failed' + ); + } + } +}