From 6db3a7148f7615eb09c5d5f478974e5668363d72 Mon Sep 17 00:00:00 2001 From: TYDev01 Date: Mon, 1 Jun 2026 14:29:49 +0100 Subject: [PATCH 1/3] feat(orders): add amendment history visibility (#512) --- ...260601_0005_add_order_amendment_columns.py | 38 +++ backend/app/models/order.py | 4 +- backend/app/routes/orders.py | 62 +++- backend/app/schemas/order.py | 28 +- backend/app/ws/events.py | 1 + backend/tests/test_orders.py | 102 ++++++- frontend/src/app/(orders)/orders/page.tsx | 277 +++++++++++++----- frontend/src/hooks/useOrderBook.ts | 65 +++- frontend/src/lib/api/orders.ts | 10 + frontend/src/lib/api/schemas.ts | 14 + frontend/src/types/api.ts | 22 ++ frontend/src/types/index.ts | 15 + 12 files changed, 553 insertions(+), 85 deletions(-) create mode 100644 backend/alembic/versions/20260601_0005_add_order_amendment_columns.py diff --git a/backend/alembic/versions/20260601_0005_add_order_amendment_columns.py b/backend/alembic/versions/20260601_0005_add_order_amendment_columns.py new file mode 100644 index 00000000..de646329 --- /dev/null +++ b/backend/alembic/versions/20260601_0005_add_order_amendment_columns.py @@ -0,0 +1,38 @@ +"""Add amendment_count and amendment_log to swap_orders.""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision: str = "20260601_0005" +down_revision: Union[str, None] = "20260530_0004" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "swap_orders", + sa.Column( + "amendment_count", + sa.Integer(), + nullable=False, + server_default="0", + ), + ) + op.add_column( + "swap_orders", + sa.Column( + "amendment_log", + postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + server_default="[]", + ), + ) + + +def downgrade() -> None: + op.drop_column("swap_orders", "amendment_log") + op.drop_column("swap_orders", "amendment_count") diff --git a/backend/app/models/order.py b/backend/app/models/order.py index 8631ad4b..766af46e 100644 --- a/backend/app/models/order.py +++ b/backend/app/models/order.py @@ -1,6 +1,6 @@ import uuid from sqlalchemy import Column, String, BigInteger, Integer, ForeignKey -from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.dialects.postgresql import UUID, JSONB from .base import Base, TimestampMixin @@ -27,3 +27,5 @@ class SwapOrder(Base, TimestampMixin): expiry = Column(BigInteger, nullable=False, index=True) status = Column(String, nullable=False, default="open", index=True) counterparty = Column(String, nullable=True) + amendment_count = Column(Integer, nullable=False, default=0) + amendment_log = Column(JSONB, nullable=False, default=list) diff --git a/backend/app/routes/orders.py b/backend/app/routes/orders.py index a38858b7..d3c502d2 100644 --- a/backend/app/routes/orders.py +++ b/backend/app/routes/orders.py @@ -1,5 +1,6 @@ -"""Order book endpoints: create, list, match, cancel (#26, #59).""" +"""Order book endpoints: create, list, match, cancel, amend (#26, #59, #512).""" +from datetime import datetime, timezone from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import select @@ -8,7 +9,7 @@ from app.config.database import get_db from app.config.redis import get_redis, CacheService from app.models.order import SwapOrder -from app.schemas.order import OrderCreate, OrderResponse, OrderMatch +from app.schemas.order import OrderAmend, OrderCreate, OrderResponse, OrderMatch from app.middleware.auth import require_api_key from app.services.order_matching import OrderMatchingService from app.ws.events import emit_order_event, EventType @@ -148,6 +149,63 @@ async def match_order( return response +@router.patch("/{order_id}/amend", response_model=OrderResponse) +async def amend_order( + order_id: str, + data: OrderAmend, + db: AsyncSession = Depends(get_db), + _=Depends(require_api_key), +): + result = await db.execute(select(SwapOrder).where(SwapOrder.id == order_id)) + order = result.scalar_one_or_none() + if not order: + raise HTTPException(status_code=404, detail="Order not found") + if order.status != "open": + raise HTTPException(status_code=400, detail="Only open orders can be amended") + + changes: dict = {} + if data.from_amount is not None and data.from_amount != order.from_amount: + changes["from_amount"] = {"before": int(order.from_amount), "after": data.from_amount} + order.from_amount = data.from_amount + if data.to_amount is not None and data.to_amount != order.to_amount: + changes["to_amount"] = {"before": int(order.to_amount), "after": data.to_amount} + order.to_amount = data.to_amount + if data.min_fill_amount is not None and data.min_fill_amount != order.min_fill_amount: + changes["min_fill_amount"] = { + "before": int(order.min_fill_amount) if order.min_fill_amount is not None else None, + "after": data.min_fill_amount, + } + order.min_fill_amount = data.min_fill_amount + if data.expiry is not None and data.expiry != order.expiry: + changes["expiry"] = {"before": int(order.expiry), "after": data.expiry} + order.expiry = data.expiry + + if not changes: + raise HTTPException(status_code=400, detail="No fields changed") + + entry = { + "sequence": int(order.amendment_count or 0) + 1, + "amended_at": datetime.now(timezone.utc).isoformat(), + "changes": changes, + } + if data.note: + entry["note"] = data.note + + order.amendment_count = int(order.amendment_count or 0) + 1 + order.amendment_log = list(order.amendment_log or []) + [entry] + + await db.commit() + await db.refresh(order) + + redis = get_redis() + cache = CacheService(redis) + await cache.invalidate_pattern("orders:*") + + response = OrderResponse.model_validate(order) + await emit_order_event(redis, EventType.ORDER_UPDATED, response.model_dump()) + return response + + @router.post("/{order_id}/cancel", response_model=OrderResponse) async def cancel_order( order_id: str, db: AsyncSession = Depends(get_db), _=Depends(require_api_key) diff --git a/backend/app/schemas/order.py b/backend/app/schemas/order.py index e725e52e..0005d62f 100644 --- a/backend/app/schemas/order.py +++ b/backend/app/schemas/order.py @@ -1,5 +1,5 @@ from pydantic import BaseModel, Field, field_validator, model_validator -from typing import Optional +from typing import Any, Optional from datetime import datetime from app.utils.address_validation import ( @@ -40,6 +40,30 @@ def validate_creator_address(self): return self +class OrderAmend(BaseModel): + from_amount: Optional[int] = Field(default=None, gt=0) + to_amount: Optional[int] = Field(default=None, gt=0) + min_fill_amount: Optional[int] = Field(default=None, gt=0) + expiry: Optional[int] = Field(default=None, gt=0) + note: Optional[str] = None + + @model_validator(mode="after") + def at_least_one_field(self): + if all( + v is None + for v in (self.from_amount, self.to_amount, self.min_fill_amount, self.expiry) + ): + raise ValueError("At least one amendable field must be provided") + return self + + +class OrderAmendmentEntry(BaseModel): + sequence: int + amended_at: str + changes: dict[str, Any] + note: Optional[str] = None + + class OrderMatch(BaseModel): counterparty: str fill_amount: Optional[int] = None @@ -69,6 +93,8 @@ class OrderResponse(BaseModel): status: str counterparty: Optional[str] = None created_at: Optional[datetime] = None + amendment_count: int = 0 + amendment_log: list[dict[str, Any]] = [] class Config: from_attributes = True diff --git a/backend/app/ws/events.py b/backend/app/ws/events.py index 178d7042..7c105c8a 100644 --- a/backend/app/ws/events.py +++ b/backend/app/ws/events.py @@ -46,6 +46,7 @@ class EventType(str, Enum): ORDER_MATCHED = "order.matched" ORDER_CANCELLED = "order.cancelled" ORDER_FILLED = "order.filled" + ORDER_UPDATED = "order.updated" def _build_event(event_type: EventType, channel: str, data: Any) -> str: diff --git a/backend/tests/test_orders.py b/backend/tests/test_orders.py index 751b6dc5..779d9e34 100644 --- a/backend/tests/test_orders.py +++ b/backend/tests/test_orders.py @@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, MagicMock from app.models.order import SwapOrder -from app.schemas.order import OrderResponse +from app.schemas.order import OrderAmend, OrderResponse from app.services.order_matching import OrderMatchingService @@ -28,6 +28,8 @@ def make_order(**overrides): "status": "open", "counterparty": None, "created_at": now, + "amendment_count": 0, + "amendment_log": [], } values.update(overrides) order = SwapOrder() @@ -54,6 +56,104 @@ def test_order_response_from_dict(self): resp = OrderResponse(**data) assert resp.id == "order-001" assert resp.status == "open" + assert resp.amendment_count == 0 + assert resp.amendment_log == [] + + def test_order_response_includes_amendment_fields(self): + log_entry = { + "sequence": 1, + "amended_at": "2026-06-01T10:00:00+00:00", + "changes": {"from_amount": {"before": 50, "after": 100}}, + } + data = { + "id": "order-002", + "creator": "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + "from_chain": "stellar", + "to_chain": "ethereum", + "from_asset": "XLM", + "to_asset": "USDC", + "from_amount": 100, + "to_amount": 200, + "filled_amount": 0, + "expiry": 9999999999, + "status": "open", + "amendment_count": 1, + "amendment_log": [log_entry], + } + resp = OrderResponse(**data) + assert resp.amendment_count == 1 + assert len(resp.amendment_log) == 1 + assert resp.amendment_log[0]["sequence"] == 1 + + +class TestOrderAmendSchema: + def test_valid_amend_single_field(self): + data = OrderAmend(from_amount=500) + assert data.from_amount == 500 + assert data.to_amount is None + + def test_valid_amend_multiple_fields(self): + data = OrderAmend(from_amount=300, to_amount=600, note="repriced") + assert data.from_amount == 300 + assert data.to_amount == 600 + assert data.note == "repriced" + + def test_rejects_zero_fields_changed(self): + with pytest.raises(ValueError, match="At least one amendable field must be provided"): + OrderAmend() + + def test_rejects_zero_amounts(self): + with pytest.raises(ValueError): + OrderAmend(from_amount=0) + + def test_expiry_amend(self): + data = OrderAmend(expiry=9999999999) + assert data.expiry == 9999999999 + + +class TestAmendOrderLogic: + def test_amendment_increments_count(self): + order = make_order() + assert order.amendment_count == 0 + + order.from_amount = 150 + order.amendment_count = order.amendment_count + 1 + entry = { + "sequence": 1, + "amended_at": datetime.now(timezone.utc).isoformat(), + "changes": {"from_amount": {"before": 100, "after": 150}}, + } + order.amendment_log = list(order.amendment_log) + [entry] + + assert order.amendment_count == 1 + assert len(order.amendment_log) == 1 + assert order.amendment_log[0]["sequence"] == 1 + + def test_multiple_amendments_append_in_order(self): + order = make_order() + now = datetime.now(timezone.utc) + + for i in range(1, 4): + order.amendment_count = i + entry = { + "sequence": i, + "amended_at": now.isoformat(), + "changes": {"from_amount": {"before": i * 10, "after": (i + 1) * 10}}, + } + order.amendment_log = list(order.amendment_log) + [entry] + + assert order.amendment_count == 3 + assert [e["sequence"] for e in order.amendment_log] == [1, 2, 3] + + def test_no_changes_does_not_amend(self): + order = make_order(from_amount=100) + amend = OrderAmend(from_amount=100, to_amount=200) + changes = {} + if amend.from_amount is not None and amend.from_amount != order.from_amount: + changes["from_amount"] = {"before": order.from_amount, "after": amend.from_amount} + if amend.to_amount is not None and amend.to_amount != order.to_amount: + changes["to_amount"] = {"before": order.to_amount, "after": amend.to_amount} + assert changes == {} class TestOrderMatchingService: diff --git a/frontend/src/app/(orders)/orders/page.tsx b/frontend/src/app/(orders)/orders/page.tsx index b7567232..68c49ab1 100644 --- a/frontend/src/app/(orders)/orders/page.tsx +++ b/frontend/src/app/(orders)/orders/page.tsx @@ -5,8 +5,11 @@ import Link from "next/link"; import { usePathname, useRouter, useSearchParams } from "next/navigation"; import { BookmarkPlus, + ChevronDown, + ChevronUp, Clock3, Filter, + History, LayoutGrid, Rows3, RefreshCw, @@ -20,7 +23,7 @@ import { import { Button, Card, EmptyState, Input, Spinner, ToastContainer } from "@/components/ui"; import { WalletConnect } from "@/components/swap/WalletConnect"; import { DEMO_ORDER_OWNER, useMockOrders, useOrderBookStore } from "@/hooks/useOrderBook"; -import { Order, OrderStatus } from "@/types"; +import { Order, OrderAmendmentEntry, OrderStatus } from "@/types"; import { cn } from "@/lib/utils"; import { shortenHash } from "@/lib/format"; import { AdvancedFilterDrawer } from "@/components/filters/AdvancedFilterDrawer"; @@ -99,6 +102,7 @@ export default function OrdersPage() { const [pendingCancelId, setPendingCancelId] = useState(null); const [orderToCancel, setOrderToCancel] = useState(null); const [toasts, setToasts] = useState([]); + const [expandedAmendmentId, setExpandedAmendmentId] = useState(null); useEffect(() => { seedMockOrders(ownerAddress); @@ -459,89 +463,134 @@ export default function OrdersPage() { } /> ) : ( - visibleOrders.map((order) => ( - -
-
-
- {order.pair} - { + const isAmendmentExpanded = expandedAmendmentId === order.id; + const hasAmendments = (order.amendmentCount ?? 0) > 0; + return ( + +
+
+
+ {order.pair} + + {order.derivedStatus} + + {hasAmendments && ( + + + {order.amendmentCount} {order.amendmentCount === 1 ? "amendment" : "amendments"} + )} - > - {order.derivedStatus} - - {pendingCancelId === order.id && ( -
- - Cancelling -
- )} -
-
-

Maker: {shortAddress(order.maker)}

-

- Chain Route: {order.chainIn} to {order.chainOut} -

-

- Size: {order.amount} {order.tokenIn} -

-

- Total: {order.total} {order.tokenOut} -

-

- Expires:{" "} - {order.expiresAt ? new Date(order.expiresAt).toLocaleString() : "Not set"} -

-

Created: {new Date(order.timestamp).toLocaleString()}

+ {pendingCancelId === order.id && ( +
+ + Cancelling +
+ )} +
+
+

Maker: {shortAddress(order.maker)}

+

+ Chain Route: {order.chainIn} to {order.chainOut} +

+

+ Size: {order.amount} {order.tokenIn} +

+

+ Total: {order.total} {order.tokenOut} +

+

+ Expires:{" "} + {order.expiresAt ? new Date(order.expiresAt).toLocaleString() : "Not set"} +

+

Created: {new Date(order.timestamp).toLocaleString()}

+
-
-
-
-

- Order Summary -

-

- Type: {order.orderType ?? "limit"} -

-

- Partial fills:{" "} - - {order.allowPartialFills ? "Enabled" : "Disabled"} - -

-
+
+
+

+ Order Summary +

+

+ Type: {order.orderType ?? "limit"} +

+

+ Partial fills:{" "} + + {order.allowPartialFills ? "Enabled" : "Disabled"} + +

+ {hasAmendments && ( +

+ Last amended:{" "} + + {new Date( + order.amendmentLog![order.amendmentLog!.length - 1].amended_at + ).toLocaleString()} + +

+ )} +
+ + {hasAmendments && ( + + )} - + +
-
- - )) + + {isAmendmentExpanded && order.amendmentLog && order.amendmentLog.length > 0 && ( + + )} + + ); + }) )}
@@ -716,6 +765,76 @@ export default function OrdersPage() { ); } +const FIELD_LABELS: Record = { + from_amount: "Send amount", + to_amount: "Receive amount", + min_fill_amount: "Min fill", + expiry: "Expiry", +}; + +function AmendmentHistory({ entries }: { entries: OrderAmendmentEntry[] }) { + const sorted = [...entries].sort((a, b) => b.sequence - a.sequence); + return ( +
+

+ + Amendment History +

+
    + {sorted.map((entry, index) => ( +
  1. +
    + + {entry.sequence} + + {index < sorted.length - 1 && ( +
    + )} +
    +
    +
    + + {new Date(entry.amended_at).toLocaleString()} + + {index === 0 && ( + + Latest + + )} +
    +
      + {Object.entries(entry.changes).map(([field, change]) => ( +
    • + + {FIELD_LABELS[field] ?? field} + + {": "} + + {change.before ?? "—"} + {" "} + →{" "} + {change.after} +
    • + ))} +
    + {entry.note && ( +

    “{entry.note}”

    + )} +
    +
  2. + ))} +
+
+ ); +} + function StatCard({ label, value }: { label: string; value: string }) { return (
diff --git a/frontend/src/hooks/useOrderBook.ts b/frontend/src/hooks/useOrderBook.ts index d81317ae..884d4054 100644 --- a/frontend/src/hooks/useOrderBook.ts +++ b/frontend/src/hooks/useOrderBook.ts @@ -1,6 +1,13 @@ import { create } from "zustand"; import { persist } from "zustand/middleware"; -import { AdvancedOrderType, Order, OrderBookStore, OrderSide, OrderStatus } from "@/types"; +import { + AdvancedOrderType, + Order, + OrderAmendmentEntry, + OrderBookStore, + OrderSide, + OrderStatus, +} from "@/types"; import { useCallback } from "react"; export const DEMO_ORDER_OWNER = "cb-local-trader"; @@ -62,6 +69,16 @@ export const useMockOrders = () => { orderType: AdvancedOrderType.LIMIT, allowPartialFills: true, amendmentCount: 1, + amendmentLog: [ + { + sequence: 1, + amended_at: new Date(Date.now() - 1000 * 60 * 20).toISOString(), + changes: { + from_amount: { before: 8000, after: 10000 }, + }, + note: "Increased order size", + }, + ] as OrderAmendmentEntry[], minFillAmount: "2,500", takerFeeEstimate: "~0.0004 ETH", }, @@ -103,6 +120,24 @@ export const useMockOrders = () => { orderType: AdvancedOrderType.TWAP, allowPartialFills: true, amendmentCount: 2, + amendmentLog: [ + { + sequence: 1, + amended_at: new Date(Date.now() - 1000 * 60 * 25).toISOString(), + changes: { + to_amount: { before: 400000, after: 450000 }, + }, + }, + { + sequence: 2, + amended_at: new Date(Date.now() - 1000 * 60 * 10).toISOString(), + changes: { + min_fill_amount: { before: null, after: 1000 }, + expiry: { before: 1800000, after: 2400000 }, + }, + note: "Set min fill and extended expiry", + }, + ] as OrderAmendmentEntry[], minFillAmount: "0.01", takerFeeEstimate: "~35 XLM", }, @@ -144,6 +179,16 @@ export const useMockOrders = () => { orderType: AdvancedOrderType.LIMIT, allowPartialFills: true, amendmentCount: 1, + amendmentLog: [ + { + sequence: 1, + amended_at: new Date(Date.now() - 1000 * 60 * 7).toISOString(), + changes: { + to_amount: { before: 170000, after: 184000 }, + }, + note: "Adjusted rate", + }, + ] as OrderAmendmentEntry[], minFillAmount: "1,000", makerFeeEstimate: "~3 XLM", }, @@ -165,6 +210,24 @@ export const useMockOrders = () => { orderType: AdvancedOrderType.STOP_LOSS, allowPartialFills: false, amendmentCount: 3, + amendmentLog: [ + { + sequence: 1, + amended_at: new Date(Date.now() - 1000 * 60 * 80).toISOString(), + changes: { from_amount: { before: 1000, after: 2000 } }, + }, + { + sequence: 2, + amended_at: new Date(Date.now() - 1000 * 60 * 60).toISOString(), + changes: { to_amount: { before: 400000, after: 468000 } }, + note: "Updated target price", + }, + { + sequence: 3, + amended_at: new Date(Date.now() - 1000 * 60 * 30).toISOString(), + changes: { expiry: { before: 3600, after: 5400 } }, + }, + ] as OrderAmendmentEntry[], makerFeeEstimate: "~0.00003 BTC", }, ]; diff --git a/frontend/src/lib/api/orders.ts b/frontend/src/lib/api/orders.ts index d2134142..6bd241d5 100644 --- a/frontend/src/lib/api/orders.ts +++ b/frontend/src/lib/api/orders.ts @@ -1,6 +1,7 @@ import { createApiClient, getUserApiHeaders } from "@/lib/api/client"; import { ApiOrderRecordSchema, ApiOrderListSchema } from "@/lib/api/schemas"; import type { + AmendOrderPayload, ApiOrderRecord, CreateOrderPayload, ListOrdersParams, @@ -41,3 +42,12 @@ export function cancelOrder(orderId: string) { ApiOrderRecordSchema ); } + +export function amendOrder(orderId: string, payload: AmendOrderPayload) { + return ordersClient.patch( + `/${orderId}/amend`, + payload, + undefined, + ApiOrderRecordSchema + ); +} diff --git a/frontend/src/lib/api/schemas.ts b/frontend/src/lib/api/schemas.ts index 33c4cb9b..e3489701 100644 --- a/frontend/src/lib/api/schemas.ts +++ b/frontend/src/lib/api/schemas.ts @@ -6,6 +6,18 @@ import { z } from "zod"; // ── Order Schemas ────────────────────────────────────────────────────────────── +export const ApiOrderAmendmentChangeSchema = z.object({ + before: z.number().nullable(), + after: z.number(), +}); + +export const ApiOrderAmendmentEntrySchema = z.object({ + sequence: z.number(), + amended_at: z.string(), + changes: z.record(z.string(), ApiOrderAmendmentChangeSchema), + note: z.string().optional(), +}); + export const ApiOrderRecordSchema = z.object({ id: z.string(), onchain_id: z.number().nullable(), @@ -22,6 +34,8 @@ export const ApiOrderRecordSchema = z.object({ status: z.string(), counterparty: z.string().nullable(), created_at: z.string().nullable(), + amendment_count: z.number().default(0), + amendment_log: z.array(ApiOrderAmendmentEntrySchema).default([]), }); export const ApiOrderListSchema = z.array(ApiOrderRecordSchema); diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index 2ad6b5c7..02d39b79 100644 --- a/frontend/src/types/api.ts +++ b/frontend/src/types/api.ts @@ -5,6 +5,18 @@ export interface ApiErrorShape { details?: unknown; } +export interface ApiOrderAmendmentChange { + before: number | null; + after: number; +} + +export interface ApiOrderAmendmentEntry { + sequence: number; + amended_at: string; + changes: Record; + note?: string; +} + export interface ApiOrderRecord { id: string; onchain_id: number | null; @@ -21,6 +33,16 @@ export interface ApiOrderRecord { status: string; counterparty: string | null; created_at: string | null; + amendment_count: number; + amendment_log: ApiOrderAmendmentEntry[]; +} + +export interface AmendOrderPayload { + from_amount?: number; + to_amount?: number; + min_fill_amount?: number; + expiry?: number; + note?: string; } export interface CreateOrderPayload { diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index b064968d..5c719042 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -102,6 +102,18 @@ export enum OrderStatus { EXPIRED = "expired", } +export interface OrderAmendmentChange { + before: number | null; + after: number; +} + +export interface OrderAmendmentEntry { + sequence: number; + amended_at: string; + changes: Record; + note?: string; +} + export interface Order { id: string; maker: string; @@ -121,6 +133,7 @@ export interface Order { expiresAt?: string; allowPartialFills?: boolean; amendmentCount?: number; + amendmentLog?: OrderAmendmentEntry[]; minFillAmount?: string; makerFeeEstimate?: string; takerFeeEstimate?: string; @@ -283,9 +296,11 @@ export interface ProtocolStats { } export type { + AmendOrderPayload, ApiErrorShape, ApiHTLCBaseRecord, ApiHTLCRecord, + ApiOrderAmendmentEntry, ApiOrderRecord, ApiSwapRecord, ClaimHTLCPayload, From 922230574c978b91803c624a872288a8a266f2cd Mon Sep 17 00:00:00 2001 From: TYDev01 Date: Mon, 1 Jun 2026 14:47:52 +0100 Subject: [PATCH 2/3] feat(orders): add trigger_price and valid_from execution conditions (#511) --- ...0601_0006_add_advanced_order_conditions.py | 31 ++ backend/app/models/order.py | 4 +- backend/app/routes/orders.py | 19 ++ backend/app/schemas/order.py | 22 +- backend/app/services/order_matching.py | 23 +- backend/tests/test_orders.py | 303 +++++++++++++++++- 6 files changed, 397 insertions(+), 5 deletions(-) create mode 100644 backend/alembic/versions/20260601_0006_add_advanced_order_conditions.py diff --git a/backend/alembic/versions/20260601_0006_add_advanced_order_conditions.py b/backend/alembic/versions/20260601_0006_add_advanced_order_conditions.py new file mode 100644 index 00000000..1c8dadca --- /dev/null +++ b/backend/alembic/versions/20260601_0006_add_advanced_order_conditions.py @@ -0,0 +1,31 @@ +"""Add trigger_price and valid_from to swap_orders for advanced order conditions.""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "20260601_0006" +down_revision: Union[str, None] = "20260601_0005" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "swap_orders", + sa.Column("trigger_price", sa.Numeric(precision=36, scale=18), nullable=True), + ) + op.add_column( + "swap_orders", + sa.Column("valid_from", sa.BigInteger(), nullable=True), + ) + op.create_index( + op.f("ix_swap_orders_valid_from"), "swap_orders", ["valid_from"], unique=False + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_swap_orders_valid_from"), table_name="swap_orders") + op.drop_column("swap_orders", "valid_from") + op.drop_column("swap_orders", "trigger_price") diff --git a/backend/app/models/order.py b/backend/app/models/order.py index 766af46e..3e76331d 100644 --- a/backend/app/models/order.py +++ b/backend/app/models/order.py @@ -1,5 +1,5 @@ import uuid -from sqlalchemy import Column, String, BigInteger, Integer, ForeignKey +from sqlalchemy import Column, String, BigInteger, Integer, Numeric, ForeignKey from sqlalchemy.dialects.postgresql import UUID, JSONB from .base import Base, TimestampMixin @@ -29,3 +29,5 @@ class SwapOrder(Base, TimestampMixin): counterparty = Column(String, nullable=True) amendment_count = Column(Integer, nullable=False, default=0) amendment_log = Column(JSONB, nullable=False, default=list) + trigger_price = Column(Numeric(36, 18), nullable=True) + valid_from = Column(BigInteger, nullable=True, index=True) diff --git a/backend/app/routes/orders.py b/backend/app/routes/orders.py index d3c502d2..c31da3c2 100644 --- a/backend/app/routes/orders.py +++ b/backend/app/routes/orders.py @@ -47,6 +47,8 @@ async def create_order( to_amount=data.to_amount, min_fill_amount=data.min_fill_amount, expiry=data.expiry, + trigger_price=data.trigger_price, + valid_from=data.valid_from, status="open", ) db.add(order) @@ -179,6 +181,23 @@ async def amend_order( if data.expiry is not None and data.expiry != order.expiry: changes["expiry"] = {"before": int(order.expiry), "after": data.expiry} order.expiry = data.expiry + if data.trigger_price is not None and data.trigger_price != order.trigger_price: + changes["trigger_price"] = { + "before": float(order.trigger_price) if order.trigger_price is not None else None, + "after": float(data.trigger_price), + } + order.trigger_price = data.trigger_price + if data.valid_from is not None and data.valid_from != order.valid_from: + new_expiry = data.expiry if data.expiry is not None else int(order.expiry) + if data.valid_from >= new_expiry: + raise HTTPException( + status_code=400, detail="valid_from must be earlier than expiry" + ) + changes["valid_from"] = { + "before": int(order.valid_from) if order.valid_from is not None else None, + "after": data.valid_from, + } + order.valid_from = data.valid_from if not changes: raise HTTPException(status_code=400, detail="No fields changed") diff --git a/backend/app/schemas/order.py b/backend/app/schemas/order.py index 0005d62f..891bdda2 100644 --- a/backend/app/schemas/order.py +++ b/backend/app/schemas/order.py @@ -1,3 +1,4 @@ +from decimal import Decimal from pydantic import BaseModel, Field, field_validator, model_validator from typing import Any, Optional from datetime import datetime @@ -19,6 +20,8 @@ class OrderCreate(BaseModel): to_amount: int = Field(gt=0) min_fill_amount: Optional[int] = None expiry: int = Field(gt=0) + trigger_price: Optional[Decimal] = Field(default=None, gt=0) + valid_from: Optional[int] = Field(default=None, gt=0) @field_validator("from_chain", "to_chain") @classmethod @@ -39,19 +42,34 @@ def validate_creator_address(self): ) return self + @model_validator(mode="after") + def validate_time_window(self): + if self.valid_from is not None and self.valid_from >= self.expiry: + raise ValueError("valid_from must be earlier than expiry") + return self + class OrderAmend(BaseModel): from_amount: Optional[int] = Field(default=None, gt=0) to_amount: Optional[int] = Field(default=None, gt=0) min_fill_amount: Optional[int] = Field(default=None, gt=0) expiry: Optional[int] = Field(default=None, gt=0) + trigger_price: Optional[Decimal] = Field(default=None, gt=0) + valid_from: Optional[int] = Field(default=None, gt=0) note: Optional[str] = None @model_validator(mode="after") def at_least_one_field(self): if all( v is None - for v in (self.from_amount, self.to_amount, self.min_fill_amount, self.expiry) + for v in ( + self.from_amount, + self.to_amount, + self.min_fill_amount, + self.expiry, + self.trigger_price, + self.valid_from, + ) ): raise ValueError("At least one amendable field must be provided") return self @@ -95,6 +113,8 @@ class OrderResponse(BaseModel): created_at: Optional[datetime] = None amendment_count: int = 0 amendment_log: list[dict[str, Any]] = [] + trigger_price: Optional[Decimal] = None + valid_from: Optional[int] = None class Config: from_attributes = True diff --git a/backend/app/services/order_matching.py b/backend/app/services/order_matching.py index 8dc52fc9..a31b4349 100644 --- a/backend/app/services/order_matching.py +++ b/backend/app/services/order_matching.py @@ -20,6 +20,18 @@ def _is_expired(order: SwapOrder) -> bool: return int(order.expiry) <= int(datetime.now(timezone.utc).timestamp()) +def _is_not_yet_active(order: SwapOrder) -> bool: + if order.valid_from is None: + return False + return int(datetime.now(timezone.utc).timestamp()) < int(order.valid_from) + + +def _meets_trigger_price(order: SwapOrder, execution_price: Fraction) -> bool: + if order.trigger_price is None: + return True + return execution_price >= Fraction(order.trigger_price) + + def _counterparty_label(existing: str | None, new_value: str) -> str: if not existing or existing == new_value: return new_value @@ -72,7 +84,7 @@ class OrderMatchingService: """Price-time-priority matcher for reciprocal open orders.""" async def match_order(self, db: AsyncSession, order: SwapOrder) -> MatchingSummary: - if order.status not in {"open", "matched"} or _is_expired(order): + if order.status not in {"open", "matched"} or _is_expired(order) or _is_not_yet_active(order): return MatchingSummary( total_matches=0, filled_amount=int(order.filled_amount or 0), @@ -111,7 +123,7 @@ def candidate_price_key(candidate: SwapOrder) -> Fraction: ): if _remaining(order) <= 0: break - if _remaining(candidate) <= 0 or _is_expired(candidate): + if _remaining(candidate) <= 0 or _is_expired(candidate) or _is_not_yet_active(candidate): continue if not self._is_price_compatible(order, candidate): continue @@ -133,6 +145,13 @@ def candidate_price_key(candidate: SwapOrder) -> Fraction: ): continue + taker_execution_price = Fraction(counterparty_fill, max_fill) + if not _meets_trigger_price(order, taker_execution_price): + continue + maker_execution_price = Fraction(max_fill, counterparty_fill) + if not _meets_trigger_price(candidate, maker_execution_price): + continue + order.filled_amount = int(order.filled_amount or 0) + max_fill candidate.filled_amount = ( int(candidate.filled_amount or 0) + counterparty_fill diff --git a/backend/tests/test_orders.py b/backend/tests/test_orders.py index 779d9e34..23daa157 100644 --- a/backend/tests/test_orders.py +++ b/backend/tests/test_orders.py @@ -1,13 +1,14 @@ """Tests for order schemas and matching.""" from datetime import datetime, timedelta, timezone +from decimal import Decimal from uuid import uuid4 import pytest from unittest.mock import AsyncMock, MagicMock from app.models.order import SwapOrder -from app.schemas.order import OrderAmend, OrderResponse +from app.schemas.order import OrderAmend, OrderCreate, OrderResponse from app.services.order_matching import OrderMatchingService @@ -30,6 +31,8 @@ def make_order(**overrides): "created_at": now, "amendment_count": 0, "amendment_log": [], + "trigger_price": None, + "valid_from": None, } values.update(overrides) order = SwapOrder() @@ -218,3 +221,301 @@ async def test_skips_incompatible_price(self): assert summary.total_matches == 0 assert taker.status == "open" + + +class TestAdvancedOrderConditions: + """Tests for trigger_price and valid_from execution conditions.""" + + # --- Schema validation --- + + def test_order_create_accepts_trigger_price_and_valid_from(self): + now = int(datetime.now(timezone.utc).timestamp()) + data = OrderCreate( + creator="GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + from_chain="stellar", + to_chain="ethereum", + from_asset="XLM", + to_asset="USDC", + from_amount=100, + to_amount=200, + expiry=now + 3600, + trigger_price=Decimal("1.5"), + valid_from=now + 60, + ) + assert data.trigger_price == Decimal("1.5") + assert data.valid_from == now + 60 + + def test_order_create_rejects_valid_from_after_expiry(self): + now = int(datetime.now(timezone.utc).timestamp()) + with pytest.raises(ValueError, match="valid_from must be earlier than expiry"): + OrderCreate( + creator="GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + from_chain="stellar", + to_chain="ethereum", + from_asset="XLM", + to_asset="USDC", + from_amount=100, + to_amount=200, + expiry=now + 3600, + valid_from=now + 7200, + ) + + def test_order_create_rejects_valid_from_equal_to_expiry(self): + now = int(datetime.now(timezone.utc).timestamp()) + expiry = now + 3600 + with pytest.raises(ValueError, match="valid_from must be earlier than expiry"): + OrderCreate( + creator="GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + from_chain="stellar", + to_chain="ethereum", + from_asset="XLM", + to_asset="USDC", + from_amount=100, + to_amount=200, + expiry=expiry, + valid_from=expiry, + ) + + def test_order_create_rejects_zero_trigger_price(self): + now = int(datetime.now(timezone.utc).timestamp()) + with pytest.raises(ValueError): + OrderCreate( + creator="GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + from_chain="stellar", + to_chain="ethereum", + from_asset="XLM", + to_asset="USDC", + from_amount=100, + to_amount=200, + expiry=now + 3600, + trigger_price=Decimal("0"), + ) + + def test_order_amend_accepts_trigger_price_and_valid_from(self): + now = int(datetime.now(timezone.utc).timestamp()) + data = OrderAmend(trigger_price=Decimal("2.5"), valid_from=now + 120) + assert data.trigger_price == Decimal("2.5") + assert data.valid_from == now + 120 + + def test_order_amend_rejects_all_none_including_new_fields(self): + with pytest.raises(ValueError, match="At least one amendable field must be provided"): + OrderAmend() + + def test_order_response_exposes_new_fields(self): + resp = OrderResponse( + id="order-adv-001", + creator="GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + from_chain="stellar", + to_chain="ethereum", + from_asset="XLM", + to_asset="USDC", + from_amount=100, + to_amount=200, + filled_amount=0, + expiry=9999999999, + status="open", + trigger_price=Decimal("1.8"), + valid_from=1700000000, + ) + assert resp.trigger_price == Decimal("1.8") + assert resp.valid_from == 1700000000 + + # --- Matching: valid_from window --- + + @pytest.mark.anyio + async def test_taker_not_yet_active_skips_matching(self): + """Taker with future valid_from should not be matched.""" + service = OrderMatchingService() + future_ts = int((datetime.now(timezone.utc) + timedelta(hours=1)).timestamp()) + taker = make_order(from_amount=100, to_amount=200, valid_from=future_ts) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 0 + assert taker.status == "open" + + @pytest.mark.anyio + async def test_candidate_not_yet_active_is_skipped(self): + """Candidate with future valid_from should be skipped.""" + service = OrderMatchingService() + future_ts = int((datetime.now(timezone.utc) + timedelta(hours=1)).timestamp()) + taker = make_order(from_amount=100, to_amount=200) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + valid_from=future_ts, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 0 + assert taker.status == "open" + + @pytest.mark.anyio + async def test_order_within_valid_window_matches(self): + """Orders with valid_from in the past should match normally.""" + service = OrderMatchingService() + past_ts = int((datetime.now(timezone.utc) - timedelta(minutes=5)).timestamp()) + taker = make_order(from_amount=100, to_amount=200, valid_from=past_ts) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + valid_from=past_ts, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 1 + assert taker.status == "filled" + + # --- Matching: trigger_price --- + + @pytest.mark.anyio + async def test_taker_trigger_price_met_allows_match(self): + """Match proceeds when execution price meets taker's trigger_price.""" + service = OrderMatchingService() + # taker sends 100 XLM, wants 200 USDC => execution price 2.0 USDC/XLM + # trigger_price = 2.0 — exactly met + taker = make_order(from_amount=100, to_amount=200, trigger_price=Decimal("2.0")) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 1 + assert taker.status == "filled" + + @pytest.mark.anyio + async def test_taker_trigger_price_not_met_skips_match(self): + """Match is skipped when execution price is below taker's trigger_price.""" + service = OrderMatchingService() + # taker sends 100 XLM, wants 200 USDC => execution price 2.0 USDC/XLM + # trigger_price = 2.5 — not met + taker = make_order(from_amount=100, to_amount=200, trigger_price=Decimal("2.5")) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 0 + assert taker.status == "open" + + @pytest.mark.anyio + async def test_candidate_trigger_price_not_met_skips_match(self): + """Match is skipped when execution price is below the candidate's trigger_price.""" + service = OrderMatchingService() + # candidate sends 200 USDC, wants 100 XLM => maker exec price = 100/200 = 0.5 XLM/USDC + # candidate's trigger_price = 1.0 — not met + taker = make_order(from_amount=100, to_amount=200) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=200, + to_amount=100, + trigger_price=Decimal("1.0"), + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 0 + assert taker.status == "open" + + # --- Partial-fill preservation --- + + @pytest.mark.anyio + async def test_min_fill_amount_preserved_alongside_trigger_price(self): + """min_fill_amount enforcement still applies when trigger_price is set.""" + service = OrderMatchingService() + # taker wants 200 USDC for 100 XLM with trigger_price=2.0, min_fill=80 + # candidate only fills 50 XLM, counterparty_fill = 100 USDC < min_fill 80 of taker + taker = make_order( + from_amount=100, + to_amount=200, + min_fill_amount=80, + trigger_price=Decimal("2.0"), + ) + candidate = make_order( + creator="0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae", + from_chain="ethereum", + to_chain="stellar", + from_asset="USDC", + to_asset="XLM", + from_amount=100, + to_amount=50, + ) + + result = MagicMock() + result.scalars.return_value.all.return_value = [candidate] + db = AsyncMock() + db.execute = AsyncMock(return_value=result) + + summary = await service.match_order(db, taker) + + assert summary.total_matches == 0 + assert taker.status == "open" From 8bf43f95321f70117497c95ac603f084ea0c19e9 Mon Sep 17 00:00:00 2001 From: TYDev01 Date: Mon, 1 Jun 2026 15:51:33 +0100 Subject: [PATCH 3/3] feat(liquidity): add pool health and reserve drift monitoring (#507) --- backend/alembic/env.py | 1 + .../20260601_0007_add_pool_health_tables.py | 82 ++++++ backend/app/models/__init__.py | 1 + backend/app/models/pool.py | 46 +++ backend/app/observability/pool_metrics.py | 51 ++++ backend/app/routes/__init__.py | 2 + backend/app/routes/liquidity.py | 114 ++++++++ backend/app/schemas/pool.py | 48 ++++ backend/app/services/pool_health.py | 199 +++++++++++++ backend/tests/test_pool_health.py | 262 ++++++++++++++++++ 10 files changed, 806 insertions(+) create mode 100644 backend/alembic/versions/20260601_0007_add_pool_health_tables.py create mode 100644 backend/app/models/pool.py create mode 100644 backend/app/observability/pool_metrics.py create mode 100644 backend/app/routes/liquidity.py create mode 100644 backend/app/schemas/pool.py create mode 100644 backend/app/services/pool_health.py create mode 100644 backend/tests/test_pool_health.py diff --git a/backend/alembic/env.py b/backend/alembic/env.py index e82a86be..475f40d5 100644 --- a/backend/alembic/env.py +++ b/backend/alembic/env.py @@ -8,6 +8,7 @@ # Import model modules so metadata is fully populated. from app.models import api_key, dispute, htlc, order, swap, transaction, user # noqa: F401 +from app.models import pool as pool_models # noqa: F401 config = context.config diff --git a/backend/alembic/versions/20260601_0007_add_pool_health_tables.py b/backend/alembic/versions/20260601_0007_add_pool_health_tables.py new file mode 100644 index 00000000..bc1b9d8c --- /dev/null +++ b/backend/alembic/versions/20260601_0007_add_pool_health_tables.py @@ -0,0 +1,82 @@ +"""Add liquidity_pool_snapshots and liquidity_pool_health tables for reserve drift monitoring (#507).""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision: str = "20260601_0007" +down_revision: Union[str, None] = "20260601_0006" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "liquidity_pool_snapshots", + sa.Column( + "id", + postgresql.UUID(as_uuid=True), + primary_key=True, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column("pool_id", sa.BigInteger(), nullable=False), + sa.Column("asset_a", sa.String(), nullable=False), + sa.Column("asset_b", sa.String(), nullable=False), + sa.Column("reserve_a", sa.BigInteger(), nullable=False), + sa.Column("reserve_b", sa.BigInteger(), nullable=False), + sa.Column("ratio", sa.Float(), nullable=True), + sa.Column( + "captured_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + onupdate=sa.text("now()"), + ), + ) + op.create_index("ix_liquidity_pool_snapshots_pool_id", "liquidity_pool_snapshots", ["pool_id"]) + op.create_index( + "ix_liquidity_pool_snapshots_captured_at", "liquidity_pool_snapshots", ["captured_at"] + ) + op.create_index( + "ix_pool_snapshots_pool_captured", + "liquidity_pool_snapshots", + ["pool_id", "captured_at"], + ) + + op.create_table( + "liquidity_pool_health", + sa.Column("pool_id", sa.BigInteger(), primary_key=True), + sa.Column("asset_a", sa.String(), nullable=False), + sa.Column("asset_b", sa.String(), nullable=False), + sa.Column("reserve_a", sa.BigInteger(), nullable=False), + sa.Column("reserve_b", sa.BigInteger(), nullable=False), + sa.Column("ratio", sa.Float(), nullable=True), + sa.Column("ratio_drift_pct", sa.Float(), nullable=True), + sa.Column("status", sa.String(), nullable=False, server_default="healthy"), + sa.Column( + "last_checked_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + ) + + +def downgrade() -> None: + op.drop_table("liquidity_pool_health") + op.drop_index("ix_pool_snapshots_pool_captured", table_name="liquidity_pool_snapshots") + op.drop_index("ix_liquidity_pool_snapshots_captured_at", table_name="liquidity_pool_snapshots") + op.drop_index("ix_liquidity_pool_snapshots_pool_id", table_name="liquidity_pool_snapshots") + op.drop_table("liquidity_pool_snapshots") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 94d12095..7e85147e 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -8,3 +8,4 @@ from .user import User from .asset import Asset from .protocol import GovernanceProposal, ReferralCampaign, ReferralReward +from .pool import LiquidityPoolSnapshot, LiquidityPoolHealth diff --git a/backend/app/models/pool.py b/backend/app/models/pool.py new file mode 100644 index 00000000..abbd69e6 --- /dev/null +++ b/backend/app/models/pool.py @@ -0,0 +1,46 @@ +import uuid +from sqlalchemy import Column, String, BigInteger, Float, DateTime, func, Index +from sqlalchemy.dialects.postgresql import UUID +from .base import Base, TimestampMixin + + +class LiquidityPoolSnapshot(Base, TimestampMixin): + """Point-in-time record of a pool's reserves, used to track drift over time.""" + + __tablename__ = "liquidity_pool_snapshots" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + pool_id = Column(BigInteger, nullable=False, index=True) + asset_a = Column(String, nullable=False) + asset_b = Column(String, nullable=False) + reserve_a = Column(BigInteger, nullable=False) + reserve_b = Column(BigInteger, nullable=False) + # ratio = reserve_a / reserve_b; stored as float for fast comparisons + ratio = Column(Float, nullable=True) + captured_at = Column( + DateTime(timezone=True), server_default=func.now(), nullable=False, index=True + ) + + __table_args__ = ( + Index("ix_pool_snapshots_pool_captured", "pool_id", "captured_at"), + ) + + +class LiquidityPoolHealth(Base): + """Latest computed health record for each tracked pool.""" + + __tablename__ = "liquidity_pool_health" + + pool_id = Column(BigInteger, primary_key=True) + asset_a = Column(String, nullable=False) + asset_b = Column(String, nullable=False) + reserve_a = Column(BigInteger, nullable=False) + reserve_b = Column(BigInteger, nullable=False) + ratio = Column(Float, nullable=True) + # Percentage change in ratio since the oldest snapshot in the monitoring window + ratio_drift_pct = Column(Float, nullable=True) + # healthy | warning | critical + status = Column(String, nullable=False, default="healthy") + last_checked_at = Column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False + ) diff --git a/backend/app/observability/pool_metrics.py b/backend/app/observability/pool_metrics.py new file mode 100644 index 00000000..d8d58999 --- /dev/null +++ b/backend/app/observability/pool_metrics.py @@ -0,0 +1,51 @@ +"""Prometheus metrics for liquidity pool health monitoring (#507).""" + +from __future__ import annotations + +from typing import Optional + +from prometheus_client import Gauge + +POOL_RESERVE_A = Gauge( + "chainbridge_pool_reserve_a", + "Current reserve_a balance of a liquidity pool", + ["pool_id", "asset_a", "asset_b"], +) + +POOL_RESERVE_B = Gauge( + "chainbridge_pool_reserve_b", + "Current reserve_b balance of a liquidity pool", + ["pool_id", "asset_a", "asset_b"], +) + +POOL_RATIO = Gauge( + "chainbridge_pool_ratio", + "Current reserve_a / reserve_b ratio of a liquidity pool", + ["pool_id", "asset_a", "asset_b"], +) + +# 0 = healthy, 1 = warning, 2 = critical +POOL_HEALTH_STATUS = Gauge( + "chainbridge_pool_health_status", + "Health status of a liquidity pool (0=healthy, 1=warning, 2=critical)", + ["pool_id", "asset_a", "asset_b"], +) + +_STATUS_CODES = {"healthy": 0, "warning": 1, "critical": 2} + + +def update_pool_metrics( + pool_id: int, + asset_a: str, + asset_b: str, + reserve_a: int, + reserve_b: int, + ratio: Optional[float], + status: str, +) -> None: + labels = {"pool_id": str(pool_id), "asset_a": asset_a, "asset_b": asset_b} + POOL_RESERVE_A.labels(**labels).set(reserve_a) + POOL_RESERVE_B.labels(**labels).set(reserve_b) + if ratio is not None: + POOL_RATIO.labels(**labels).set(ratio) + POOL_HEALTH_STATUS.labels(**labels).set(_STATUS_CODES.get(status, 2)) diff --git a/backend/app/routes/__init__.py b/backend/app/routes/__init__.py index 8b844b7b..caa853ba 100644 --- a/backend/app/routes/__init__.py +++ b/backend/app/routes/__init__.py @@ -12,6 +12,7 @@ from .assets import router as assets_router from .chains import router as chains_router from .protocol import router as protocol_router +from .liquidity import router as liquidity_router api_router = APIRouter(prefix="/api/v1") api_router.include_router(htlc_router, prefix="/htlcs", tags=["HTLCs"]) @@ -27,3 +28,4 @@ api_router.include_router(assets_router, prefix="/assets", tags=["Assets"]) api_router.include_router(chains_router, prefix="/chains", tags=["Chains"]) api_router.include_router(protocol_router, prefix="/protocol", tags=["Protocol"]) +api_router.include_router(liquidity_router, prefix="/liquidity", tags=["Liquidity"]) diff --git a/backend/app/routes/liquidity.py b/backend/app/routes/liquidity.py new file mode 100644 index 00000000..a2d4f60e --- /dev/null +++ b/backend/app/routes/liquidity.py @@ -0,0 +1,114 @@ +"""Liquidity pool health and reserve drift monitoring endpoints (#507).""" + +import logging +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config.database import get_db +from app.middleware.auth import require_api_key +from app.schemas.pool import ( + PoolHealthResponse, + PoolHealthSummary, + PoolSnapshotCreate, + PoolSnapshotResponse, +) +from app.services import pool_health as svc + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post( + "/pools/{pool_id}/snapshots", + response_model=PoolSnapshotResponse, + status_code=201, + summary="Record a pool reserve snapshot", +) +async def create_snapshot( + pool_id: int, + body: PoolSnapshotCreate, + db: AsyncSession = Depends(get_db), + _=Depends(require_api_key), +): + """Ingest current reserve figures for a pool and refresh its health record.""" + if body.pool_id != pool_id: + raise HTTPException( + status_code=422, + detail="pool_id in path and body must match", + ) + snapshot = await svc.record_snapshot( + db=db, + pool_id=pool_id, + asset_a=body.asset_a, + asset_b=body.asset_b, + reserve_a=body.reserve_a, + reserve_b=body.reserve_b, + ) + return PoolSnapshotResponse( + id=str(snapshot.id), + pool_id=snapshot.pool_id, + asset_a=snapshot.asset_a, + asset_b=snapshot.asset_b, + reserve_a=snapshot.reserve_a, + reserve_b=snapshot.reserve_b, + ratio=snapshot.ratio, + captured_at=snapshot.captured_at, + ) + + +@router.get( + "/pools/{pool_id}/snapshots", + response_model=list[PoolSnapshotResponse], + summary="List recent reserve snapshots for a pool", +) +async def list_snapshots( + pool_id: int, + limit: Annotated[int, Query(ge=1, le=200)] = 50, + db: AsyncSession = Depends(get_db), +): + rows = await svc.get_pool_snapshots(db=db, pool_id=pool_id, limit=limit) + return [ + PoolSnapshotResponse( + id=str(s.id), + pool_id=s.pool_id, + asset_a=s.asset_a, + asset_b=s.asset_b, + reserve_a=s.reserve_a, + reserve_b=s.reserve_b, + ratio=s.ratio, + captured_at=s.captured_at, + ) + for s in rows + ] + + +@router.get( + "/pools/{pool_id}/health", + response_model=PoolHealthResponse, + summary="Get health status for a single pool", +) +async def get_pool_health(pool_id: int, db: AsyncSession = Depends(get_db)): + health = await svc.get_pool_health(db=db, pool_id=pool_id) + if health is None: + raise HTTPException(status_code=404, detail="No health data for this pool") + return PoolHealthResponse.model_validate(health) + + +@router.get( + "/pools/health", + response_model=PoolHealthSummary, + summary="List health status for all monitored pools", +) +async def list_pool_health(db: AsyncSession = Depends(get_db)): + pools = await svc.list_pool_health(db=db) + validated = [PoolHealthResponse.model_validate(p) for p in pools] + return PoolHealthSummary( + total_pools=len(validated), + healthy=sum(1 for p in validated if p.status == "healthy"), + warning=sum(1 for p in validated if p.status == "warning"), + critical=sum(1 for p in validated if p.status == "critical"), + pools=validated, + ) diff --git a/backend/app/schemas/pool.py b/backend/app/schemas/pool.py new file mode 100644 index 00000000..67abfab3 --- /dev/null +++ b/backend/app/schemas/pool.py @@ -0,0 +1,48 @@ +from pydantic import BaseModel, Field, model_validator +from typing import Optional +from datetime import datetime + + +class PoolSnapshotCreate(BaseModel): + pool_id: int = Field(..., ge=0) + asset_a: str + asset_b: str + reserve_a: int = Field(..., ge=0) + reserve_b: int = Field(..., ge=0) + + +class PoolSnapshotResponse(BaseModel): + id: str + pool_id: int + asset_a: str + asset_b: str + reserve_a: int + reserve_b: int + ratio: Optional[float] + captured_at: datetime + + class Config: + from_attributes = True + + +class PoolHealthResponse(BaseModel): + pool_id: int + asset_a: str + asset_b: str + reserve_a: int + reserve_b: int + ratio: Optional[float] + ratio_drift_pct: Optional[float] + status: str + last_checked_at: datetime + + class Config: + from_attributes = True + + +class PoolHealthSummary(BaseModel): + total_pools: int + healthy: int + warning: int + critical: int + pools: list[PoolHealthResponse] diff --git a/backend/app/services/pool_health.py b/backend/app/services/pool_health.py new file mode 100644 index 00000000..b69c7cfb --- /dev/null +++ b/backend/app/services/pool_health.py @@ -0,0 +1,199 @@ +"""Pool health and reserve drift monitoring service (#507).""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from sqlalchemy import select, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.pool import LiquidityPoolHealth, LiquidityPoolSnapshot +from app.observability.pool_metrics import update_pool_metrics + +logger = logging.getLogger(__name__) + +# Monitoring window: compare current ratio against the snapshot taken this far back. +DRIFT_WINDOW_HOURS = 24 + +# Health thresholds — all values are percentages or absolute ratios. +DRIFT_WARN_PCT = 10.0 # ratio changed >10% in the window → warning +DRIFT_CRITICAL_PCT = 30.0 # ratio changed >30% → critical + +# A ratio outside [RATIO_LOW, RATIO_HIGH] means one reserve is very thin. +RATIO_WARN_LOW = 0.25 +RATIO_WARN_HIGH = 4.0 +RATIO_CRITICAL_LOW = 0.1 +RATIO_CRITICAL_HIGH = 10.0 + +# Keep at most this many snapshots per pool to bound table growth. +MAX_SNAPSHOTS_PER_POOL = 500 + + +def _safe_ratio(reserve_a: int, reserve_b: int) -> Optional[float]: + if reserve_b == 0: + return None + return reserve_a / reserve_b + + +def _classify( + ratio: Optional[float], + drift_pct: Optional[float], +) -> str: + if ratio is None: + return "critical" + + if ratio < RATIO_CRITICAL_LOW or ratio > RATIO_CRITICAL_HIGH: + return "critical" + if drift_pct is not None and abs(drift_pct) >= DRIFT_CRITICAL_PCT: + return "critical" + + if ratio < RATIO_WARN_LOW or ratio > RATIO_WARN_HIGH: + return "warning" + if drift_pct is not None and abs(drift_pct) >= DRIFT_WARN_PCT: + return "warning" + + return "healthy" + + +async def record_snapshot( + db: AsyncSession, + pool_id: int, + asset_a: str, + asset_b: str, + reserve_a: int, + reserve_b: int, +) -> LiquidityPoolSnapshot: + """Persist a new reserve snapshot and refresh the pool's health record.""" + ratio = _safe_ratio(reserve_a, reserve_b) + + snapshot = LiquidityPoolSnapshot( + pool_id=pool_id, + asset_a=asset_a, + asset_b=asset_b, + reserve_a=reserve_a, + reserve_b=reserve_b, + ratio=ratio, + ) + db.add(snapshot) + + await _prune_old_snapshots(db, pool_id) + health = await _refresh_health(db, pool_id, asset_a, asset_b, reserve_a, reserve_b, ratio) + + await db.commit() + await db.refresh(snapshot) + + update_pool_metrics( + pool_id=pool_id, + asset_a=asset_a, + asset_b=asset_b, + reserve_a=reserve_a, + reserve_b=reserve_b, + ratio=ratio, + status=health.status, + ) + + return snapshot + + +async def _prune_old_snapshots(db: AsyncSession, pool_id: int) -> None: + """Delete the oldest snapshots when the per-pool cap is exceeded.""" + count_result = await db.execute( + select(LiquidityPoolSnapshot).where(LiquidityPoolSnapshot.pool_id == pool_id) + ) + rows = count_result.scalars().all() + if len(rows) < MAX_SNAPSHOTS_PER_POOL: + return + + # Sort ascending by capture time and drop the excess oldest rows. + oldest = sorted(rows, key=lambda s: s.captured_at)[: len(rows) - MAX_SNAPSHOTS_PER_POOL + 1] + for row in oldest: + await db.delete(row) + + +async def _refresh_health( + db: AsyncSession, + pool_id: int, + asset_a: str, + asset_b: str, + reserve_a: int, + reserve_b: int, + ratio: Optional[float], +) -> LiquidityPoolHealth: + """Compute drift and upsert the LiquidityPoolHealth record for pool_id.""" + window_start = datetime.now(timezone.utc) - timedelta(hours=DRIFT_WINDOW_HOURS) + + oldest_result = await db.execute( + select(LiquidityPoolSnapshot) + .where( + LiquidityPoolSnapshot.pool_id == pool_id, + LiquidityPoolSnapshot.captured_at >= window_start, + ) + .order_by(LiquidityPoolSnapshot.captured_at.asc()) + .limit(1) + ) + baseline = oldest_result.scalar_one_or_none() + + drift_pct: Optional[float] = None + if baseline is not None and baseline.ratio is not None and ratio is not None: + if baseline.ratio != 0: + drift_pct = ((ratio - baseline.ratio) / baseline.ratio) * 100.0 + + status = _classify(ratio, drift_pct) + + result = await db.execute( + select(LiquidityPoolHealth).where(LiquidityPoolHealth.pool_id == pool_id) + ) + health = result.scalar_one_or_none() + + if health is None: + health = LiquidityPoolHealth( + pool_id=pool_id, + asset_a=asset_a, + asset_b=asset_b, + reserve_a=reserve_a, + reserve_b=reserve_b, + ratio=ratio, + ratio_drift_pct=drift_pct, + status=status, + ) + db.add(health) + else: + health.asset_a = asset_a + health.asset_b = asset_b + health.reserve_a = reserve_a + health.reserve_b = reserve_b + health.ratio = ratio + health.ratio_drift_pct = drift_pct + health.status = status + + return health + + +async def get_pool_health(db: AsyncSession, pool_id: int) -> Optional[LiquidityPoolHealth]: + result = await db.execute( + select(LiquidityPoolHealth).where(LiquidityPoolHealth.pool_id == pool_id) + ) + return result.scalar_one_or_none() + + +async def list_pool_health(db: AsyncSession) -> list[LiquidityPoolHealth]: + result = await db.execute( + select(LiquidityPoolHealth).order_by(LiquidityPoolHealth.pool_id) + ) + return list(result.scalars().all()) + + +async def get_pool_snapshots( + db: AsyncSession, + pool_id: int, + limit: int = 50, +) -> list[LiquidityPoolSnapshot]: + result = await db.execute( + select(LiquidityPoolSnapshot) + .where(LiquidityPoolSnapshot.pool_id == pool_id) + .order_by(LiquidityPoolSnapshot.captured_at.desc()) + .limit(limit) + ) + return list(result.scalars().all()) diff --git a/backend/tests/test_pool_health.py b/backend/tests/test_pool_health.py new file mode 100644 index 00000000..f928dbe0 --- /dev/null +++ b/backend/tests/test_pool_health.py @@ -0,0 +1,262 @@ +"""Tests for pool health and reserve drift monitoring (#507).""" + +from __future__ import annotations + +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +from app.services.pool_health import ( + DRIFT_CRITICAL_PCT, + DRIFT_WARN_PCT, + RATIO_CRITICAL_HIGH, + RATIO_CRITICAL_LOW, + RATIO_WARN_HIGH, + RATIO_WARN_LOW, + _classify, + _safe_ratio, + record_snapshot, + get_pool_health, + list_pool_health, + get_pool_snapshots, +) + + +# --------------------------------------------------------------------------- +# Pure unit tests — no DB required +# --------------------------------------------------------------------------- + + +class TestSafeRatio: + def test_normal(self): + assert _safe_ratio(200, 100) == pytest.approx(2.0) + + def test_zero_denominator(self): + assert _safe_ratio(100, 0) is None + + def test_equal_reserves(self): + assert _safe_ratio(500, 500) == pytest.approx(1.0) + + def test_zero_numerator(self): + assert _safe_ratio(0, 100) == pytest.approx(0.0) + + +class TestClassify: + def test_healthy(self): + assert _classify(1.0, 0.0) == "healthy" + + def test_healthy_no_drift(self): + assert _classify(1.5, 5.0) == "healthy" + + def test_warning_drift(self): + assert _classify(1.0, DRIFT_WARN_PCT) == "warning" + + def test_warning_negative_drift(self): + assert _classify(1.0, -DRIFT_WARN_PCT) == "warning" + + def test_critical_drift(self): + assert _classify(1.0, DRIFT_CRITICAL_PCT) == "critical" + + def test_warning_low_ratio(self): + assert _classify(RATIO_WARN_LOW - 0.01, 0.0) == "warning" + + def test_warning_high_ratio(self): + assert _classify(RATIO_WARN_HIGH + 0.1, 0.0) == "warning" + + def test_critical_low_ratio(self): + assert _classify(RATIO_CRITICAL_LOW - 0.01, 0.0) == "critical" + + def test_critical_high_ratio(self): + assert _classify(RATIO_CRITICAL_HIGH + 0.1, 0.0) == "critical" + + def test_none_ratio_is_critical(self): + assert _classify(None, None) == "critical" + + def test_ratio_takes_priority_over_drift(self): + # Even mild drift doesn't override a critical ratio. + assert _classify(RATIO_CRITICAL_LOW - 0.01, 5.0) == "critical" + + +# --------------------------------------------------------------------------- +# Service tests — mock the DB session +# --------------------------------------------------------------------------- + + +def _make_snapshot(pool_id=1, reserve_a=1000, reserve_b=1000, ratio=1.0): + snap = MagicMock() + snap.id = "test-uuid" + snap.pool_id = pool_id + snap.asset_a = "XLM" + snap.asset_b = "USDC" + snap.reserve_a = reserve_a + snap.reserve_b = reserve_b + snap.ratio = ratio + snap.captured_at = datetime.now(timezone.utc) + return snap + + +def _make_health(pool_id=1, status="healthy", ratio=1.0, drift=0.0): + h = MagicMock() + h.pool_id = pool_id + h.asset_a = "XLM" + h.asset_b = "USDC" + h.reserve_a = 1000 + h.reserve_b = 1000 + h.ratio = ratio + h.ratio_drift_pct = drift + h.status = status + h.last_checked_at = datetime.now(timezone.utc) + return h + + +class TestGetPoolHealth: + @pytest.mark.anyio + async def test_returns_none_when_no_record(self): + db = AsyncMock() + result_mock = MagicMock() + result_mock.scalar_one_or_none.return_value = None + db.execute = AsyncMock(return_value=result_mock) + + result = await get_pool_health(db, pool_id=99) + assert result is None + + @pytest.mark.anyio + async def test_returns_health_record(self): + db = AsyncMock() + health = _make_health(pool_id=1, status="healthy") + result_mock = MagicMock() + result_mock.scalar_one_or_none.return_value = health + db.execute = AsyncMock(return_value=result_mock) + + result = await get_pool_health(db, pool_id=1) + assert result is health + assert result.status == "healthy" + + +class TestListPoolHealth: + @pytest.mark.anyio + async def test_returns_empty_list(self): + db = AsyncMock() + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = [] + db.execute = AsyncMock(return_value=result_mock) + + result = await list_pool_health(db) + assert result == [] + + @pytest.mark.anyio + async def test_returns_all_pools(self): + db = AsyncMock() + pools = [_make_health(pool_id=i) for i in range(3)] + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = pools + db.execute = AsyncMock(return_value=result_mock) + + result = await list_pool_health(db) + assert len(result) == 3 + + +class TestGetPoolSnapshots: + @pytest.mark.anyio + async def test_returns_snapshots(self): + db = AsyncMock() + snaps = [_make_snapshot(pool_id=1) for _ in range(5)] + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = snaps + db.execute = AsyncMock(return_value=result_mock) + + result = await get_pool_snapshots(db, pool_id=1, limit=10) + assert len(result) == 5 + + @pytest.mark.anyio + async def test_empty_snapshots(self): + db = AsyncMock() + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = [] + db.execute = AsyncMock(return_value=result_mock) + + result = await get_pool_snapshots(db, pool_id=42, limit=50) + assert result == [] + + +# --------------------------------------------------------------------------- +# API endpoint tests +# --------------------------------------------------------------------------- + + +class TestLiquidityRoutes: + @pytest.mark.anyio + async def test_get_pool_health_not_found(self): + from app.routes.liquidity import get_pool_health as route_get + from fastapi import HTTPException + + db = AsyncMock() + result_mock = MagicMock() + result_mock.scalar_one_or_none.return_value = None + db.execute = AsyncMock(return_value=result_mock) + + with pytest.raises(HTTPException) as exc_info: + await route_get(pool_id=999, db=db) + assert exc_info.value.status_code == 404 + + @pytest.mark.anyio + async def test_list_pool_health_empty(self): + from app.routes.liquidity import list_pool_health as route_list + + db = AsyncMock() + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = [] + db.execute = AsyncMock(return_value=result_mock) + + summary = await route_list(db=db) + assert summary.total_pools == 0 + assert summary.healthy == 0 + assert summary.warning == 0 + assert summary.critical == 0 + + @pytest.mark.anyio + async def test_list_pool_health_counts(self): + from app.routes.liquidity import list_pool_health as route_list + from app.schemas.pool import PoolHealthResponse + + db = AsyncMock() + now = datetime.now(timezone.utc) + + def _h(pool_id, status): + h = MagicMock() + h.pool_id = pool_id + h.asset_a = "XLM" + h.asset_b = "USDC" + h.reserve_a = 1000 + h.reserve_b = 1000 + h.ratio = 1.0 + h.ratio_drift_pct = 0.0 + h.status = status + h.last_checked_at = now + return h + + pools = [_h(1, "healthy"), _h(2, "warning"), _h(3, "critical")] + result_mock = MagicMock() + result_mock.scalars.return_value.all.return_value = pools + db.execute = AsyncMock(return_value=result_mock) + + summary = await route_list(db=db) + assert summary.total_pools == 3 + assert summary.healthy == 1 + assert summary.warning == 1 + assert summary.critical == 1 + + @pytest.mark.anyio + async def test_create_snapshot_path_body_mismatch(self): + from app.routes.liquidity import create_snapshot + from app.schemas.pool import PoolSnapshotCreate + from fastapi import HTTPException + + db = AsyncMock() + body = PoolSnapshotCreate( + pool_id=2, asset_a="XLM", asset_b="USDC", reserve_a=1000, reserve_b=1000 + ) + + with pytest.raises(HTTPException) as exc_info: + await create_snapshot(pool_id=1, body=body, db=db) + assert exc_info.value.status_code == 422