diff --git a/.gitignore b/.gitignore index 60ea258..0c96f61 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,11 @@ spark/tmp/ # Jupyter .ipynb_checkpoints/ + + # Content / scripts (not part of the codebase) +docs/ARTICLES*.md +docs/BLOG_IDEAS.md +docs/SCRIPT*.md +docs/VIDEO_SCRIPT.md +*.script.md +content/ diff --git a/.skills/production-python-service/SKILL.md b/.skills/production-python-service/SKILL.md index 6877495..586f4e2 100644 --- a/.skills/production-python-service/SKILL.md +++ b/.skills/production-python-service/SKILL.md @@ -22,6 +22,47 @@ description: Use when implementing any Python service: FastAPI endpoints, Kafka - `/health` and `/ready` on every service - `/metrics` (Prometheus format) on every long-running service +## APIRouter structure (mandatory once project has >1 module) + +``` +src// + main.py # assembler only: middleware + include_router, no business logic + dependencies.py # shared Depends factories (get_client, get_db) — defined ONCE here + routers/ + users.py # router = APIRouter(prefix="/users", tags=["users"]) + todos.py + models/ + users.py # Pydantic request/response models per domain +``` + +Rules: +- `main.py` only calls `app.include_router(...)` — no route definitions, no business logic +- `dependencies.py` is the single source of truth for all `Depends()` factories +- Each router imports `get_client` / `get_db` from `dependencies`, never defines its own +- Set `prefix` and `tags` on the `APIRouter`, not on `include_router` (keeps each file self-contained) +- Test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()` + +```python +# dependencies.py +from .config import settings +from .trino_client import TrinoClient + +def get_client() -> TrinoClient: + return TrinoClient(host=settings.trino_host, port=settings.trino_port, ...) 
+ +# routers/ohlcv.py +from ..dependencies import get_client + +router = APIRouter(prefix="/ohlcv", tags=["ohlcv"]) + +@router.get("/{symbol}", response_model=OHLCVResponse) +async def get_ohlcv(symbol: str, client: TrinoClient = Depends(get_client)) -> OHLCVResponse: + ... + +# main.py +app.include_router(ohlcv.router) +``` + ## Configuration pattern ```python diff --git a/Makefile b/Makefile index 2c9c423..3b9ba7b 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ .PHONY: up down restart logs lint format typecheck test coverage \ build flink-submit flink-logs flink-venv ingest ingest-logs \ - dbt-compile dbt-run dbt-test replay-sample watch-cdc help + dbt-compile dbt-run dbt-test dbt-freshness replay-sample watch-cdc \ + load-gen help # ── Local stack ────────────────────────────────────────────────────────────── @@ -49,14 +50,20 @@ coverage: # ── dbt ─────────────────────────────────────────────────────────────────────── +DBT = uv run --with dbt-trino dbt --no-send-anonymous-usage-stats +DBTOPTS = --project-dir dbt --profiles-dir dbt + dbt-compile: - uv run dbt compile --project-dir dbt + $(DBT) compile $(DBTOPTS) dbt-run: - uv run dbt run --project-dir dbt + $(DBT) run $(DBTOPTS) dbt-test: - uv run dbt test --project-dir dbt + $(DBT) test $(DBTOPTS) + +dbt-freshness: + $(DBT) source freshness $(DBTOPTS) # ── Utilities ───────────────────────────────────────────────────────────────── @@ -66,6 +73,17 @@ replay-sample: build: docker compose build --no-cache +# ── Load testing ────────────────────────────────────────────────────────────── + +load-gen: + docker run --rm \ + --network ticksense_default \ + -v $(CURDIR)/k6:/scripts:ro \ + -e K6_PROMETHEUS_RW_SERVER_URL=http://prometheus:9090/api/v1/write \ + -e 'K6_PROMETHEUS_RW_TREND_STATS=p(50),p(95),p(99),max' \ + grafana/k6:latest \ + run --out experimental-prometheus-rw /scripts/script.js + # ── Ingest ──────────────────────────────────────────────────────────────────── ingest: diff --git a/README.md 
b/README.md index a17dfa4..5284614 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,9 @@ Once healthy, services are available at: | Flink UI | http://localhost:8081 | | Trino UI | http://localhost:8082 | | Iceberg REST | http://localhost:8181/v1/config | +| **TickSense API** | **http://localhost:8000** (docs: `/docs`) | +| **Prometheus** | **http://localhost:9090** | +| **Grafana** | **http://localhost:3000** (admin / admin) | --- @@ -353,6 +356,6 @@ See [docs/ROADMAP.md](docs/ROADMAP.md) for the full phase plan. | 1 — Local Stack + Ingest | ✅ | docker-compose, Binance WS → Kafka (57 tests, 96% cov) | | 2 — Flink Processing | ✅ | Normalize, dedup, OHLCV windows → Iceberg silver (e2e verified 2026-05-15) | | 3 — CDC + Replay | ✅ | Debezium CDC, Flink upsert job, replay CLI (e2e verified 2026-05-16) | -| 4 — Analytics Layer | — | dbt models, FastAPI | +| 4 — Analytics Layer | ✅ | dbt + Trino, FastAPI, Prometheus, Grafana (e2e verified 2026-05-16) | | 5 — Ops + Observability | — | Airflow, Great Expectations, SLA alerts | | 6 — Demo + Blog + Web | — | ticksense.ai, demo video, blog posts | diff --git a/api/Dockerfile b/api/Dockerfile new file mode 100644 index 0000000..174bc9a --- /dev/null +++ b/api/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.13-slim + +RUN apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install uv --no-cache-dir + +WORKDIR /app + +COPY pyproject.toml uv.lock ./ +COPY api/ ./api/ + +RUN uv sync --package ticksense-api --no-dev + +EXPOSE 8000 +CMD ["uv", "run", "--package", "ticksense-api", \ + "uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/api/pyproject.toml b/api/pyproject.toml index 0b64ec1..9d35be6 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -11,4 +11,12 @@ dependencies = [ "structlog>=24.2", "trino>=0.328.0", "prometheus-client>=0.20.0", + "anyio>=4.0", ] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + 
+[tool.hatch.build.targets.wheel]
+packages = ["src/api"]
diff --git a/api/src/api/config.py b/api/src/api/config.py
new file mode 100644
index 0000000..0002ac1
--- /dev/null
+++ b/api/src/api/config.py
@@ -0,0 +1,16 @@
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    """Service configuration: Trino connection parameters and the log level."""
+
+    trino_host: str = "localhost"
+    trino_port: int = 8082
+    trino_catalog: str = "iceberg"
+    trino_user: str = "api"
+    log_level: str = "INFO"
+
+    model_config = SettingsConfigDict(env_file=".env", frozen=True, extra="ignore")
+
+
+settings = Settings()
diff --git a/api/src/api/dependencies.py b/api/src/api/dependencies.py
new file mode 100644
index 0000000..9fdc848
--- /dev/null
+++ b/api/src/api/dependencies.py
@@ -0,0 +1,12 @@
+from .config import settings
+from .trino_client import TrinoClient
+
+
+def get_client() -> TrinoClient:
+    """Build a TrinoClient from the loaded settings (used as a FastAPI dependency)."""
+    return TrinoClient(
+        host=settings.trino_host,
+        port=settings.trino_port,
+        user=settings.trino_user,
+        catalog=settings.trino_catalog,
+    )
diff --git a/api/src/api/main.py b/api/src/api/main.py
new file mode 100644
index 0000000..1c716be
--- /dev/null
+++ b/api/src/api/main.py
@@ -0,0 +1,70 @@
+import asyncio
+import logging
+from collections.abc import AsyncGenerator
+from contextlib import asynccontextmanager
+
+import structlog
+from fastapi import FastAPI
+from fastapi.responses import JSONResponse
+from starlette.requests import Request
+
+from .config import settings
+from .dependencies import get_client
+from .metrics import PrometheusMiddleware, metrics_endpoint
+from .poller import run_poller
+from .routers import liquidity, ohlcv, pipeline, symbols
+
+# Resolve the configured level name to its numeric value: integer levels work on
+# every structlog release allowed by pyproject (>=24.2), while level-name strings
+# are only supported by newer releases. Unknown names fall back to INFO.
+_level = logging.getLevelName(settings.log_level.upper())
+_min_level = _level if isinstance(_level, int) else logging.INFO
+structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(_min_level))
+log = structlog.get_logger()
+
+
+@asynccontextmanager
+async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
+    """Run the market-metrics poller in the background while the app is up."""
+    task = asyncio.create_task(run_poller(get_client()))
+    try:
+        yield
+    finally:
+        task.cancel()
+        # Wait until the cancellation has actually been delivered;
+        # return_exceptions=True absorbs the resulting CancelledError here.
+        await asyncio.gather(task, return_exceptions=True)
+
+
+app = FastAPI(
+    title="TickSense API",
+    description="Real-time
crypto market analytics over Iceberg + Trino.",
+    version="0.1.0",
+    lifespan=lifespan,
+)
+
+app.add_middleware(PrometheusMiddleware)
+
+app.include_router(ohlcv.router)
+app.include_router(liquidity.router)
+app.include_router(pipeline.router)
+app.include_router(symbols.router)
+app.add_route("/metrics", metrics_endpoint)
+
+
+@app.get("/health", tags=["ops"])
+async def health() -> JSONResponse:
+    return JSONResponse({"status": "ok"})
+
+
+@app.get("/ready", tags=["ops"])
+async def ready() -> JSONResponse:
+    if await get_client().ping():
+        return JSONResponse({"status": "ready"})
+    return JSONResponse({"status": "unavailable"}, status_code=503)
+
+
+@app.exception_handler(Exception)
+async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
+    log.error("unhandled_exception", path=request.url.path, error=str(exc))
+    return JSONResponse({"detail": "Internal server error"}, status_code=500)
diff --git a/api/src/api/market_metrics.py b/api/src/api/market_metrics.py
new file mode 100644
index 0000000..c30da42
--- /dev/null
+++ b/api/src/api/market_metrics.py
@@ -0,0 +1,34 @@
+from prometheus_client import Gauge
+
+MID_PRICE = Gauge(
+    "market_mid_price_usd",
+    "Current mid price in USD",
+    ["symbol", "exchange"],
+)
+
+SPREAD_BPS = Gauge(
+    "market_spread_bps",
+    "Bid-ask spread in basis points",
+    ["symbol", "exchange"],
+)
+
+# Scale is 0..1, not -1..+1: the dbt model int_order_book_imbalance flags
+# imbalance > 0.6 as BUY_PRESSURE and < 0.4 as SELL_PRESSURE, so 0.5 is balanced.
+# NOTE(review): confirm against the imbalance formula in stg_book_ticker.
+IMBALANCE = Gauge(
+    "market_bid_ask_imbalance",
+    "Order book imbalance (0 = full sell pressure, 1 = full buy pressure)",
+    ["symbol", "exchange"],
+)
+
+STALENESS = Gauge(
+    "market_staleness_seconds",
+    "Seconds since last market event was ingested",
+    ["symbol", "exchange"],
+)
+
+HEALTH_SCORE = Gauge(
+    "pipeline_health_score",
+    "Pipeline health score (0 = dead, 1 = healthy)",
+    ["symbol", "exchange"],
+)
diff --git a/api/src/api/metrics.py b/api/src/api/metrics.py
new file mode 100644
index 0000000..7848cfc
--- /dev/null
+++ b/api/src/api/metrics.py
@@ -0,0 +1,78 @@
+import time
+from collections.abc import Awaitable, Callable
+
+from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.requests import Request
+from starlette.responses import Response
+
+REQUEST_COUNT: Counter = Counter(
+    "api_requests_total",
+    "Total HTTP requests",
+    ["method", "endpoint", "status_code"],
+)
+
+REQUEST_DURATION: Histogram = Histogram(
+    "api_request_duration_seconds",
+    "HTTP request duration in seconds",
+    ["method", "endpoint"],
+    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
+)
+
+# Path segments that are fixed route names. Every other segment is reported as
+# "{param}" so the "endpoint" label does not grow with each requested symbol.
+_STATIC_SEGMENTS = {
+    "ohlcv",
+    "spread",
+    "liquidity",
+    "symbols",
+    "pipeline",
+    "lag",
+    "health",
+    "ready",
+    "metrics",
+}
+
+
+def _normalize_path(path: str) -> str:
+    """Collapse a request path into a low-cardinality metrics label.
+
+    Segments listed in ``_STATIC_SEGMENTS`` are kept; any other segment becomes
+    ``{param}``. Empty segments are dropped, so ``/`` is reported as ``/``
+    rather than ``/{param}``.
+    """
+    parts = [p for p in path.split("/") if p]
+    return "/" + "/".join(p if p in _STATIC_SEGMENTS else "{param}" for p in parts)
+
+
+class PrometheusMiddleware(BaseHTTPMiddleware):
+    """Record a request counter and a latency histogram for every HTTP request."""
+
+    async def dispatch(
+        self,
+        request: Request,
+        call_next: Callable[[Request], Awaitable[Response]],
+    ) -> Response:
+        endpoint = _normalize_path(request.url.path)
+        start = time.perf_counter()
+        # If call_next raises, no response object ever reaches this middleware;
+        # count the request as a 500 so failures still show up in the metrics.
+        status_code = "500"
+        try:
+            response = await call_next(request)
+            status_code = str(response.status_code)
+        finally:
+            duration = time.perf_counter() - start
+            REQUEST_COUNT.labels(
+                method=request.method,
+                endpoint=endpoint,
+                status_code=status_code,
+            ).inc()
+            REQUEST_DURATION.labels(method=request.method, endpoint=endpoint).observe(duration)
+        response.headers["X-Response-Time-Ms"] = f"{duration * 1000:.1f}"
+        return response
+
+
+async def metrics_endpoint(_: Request) -> Response:
+    """Serve the collected metrics in the Prometheus exposition format."""
+    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
diff --git a/api/src/api/models/__init__.py b/api/src/api/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/api/src/api/models/liquidity.py b/api/src/api/models/liquidity.py
new file mode 100644
index 0000000..47ab745
--- /dev/null
+++ b/api/src/api/models/liquidity.py
@@ -0,0 +1,31 @@
+from datetime import datetime
+
+from pydantic
import BaseModel + + +class SpreadResponse(BaseModel): + exchange: str + symbol: str + latest_ts: datetime + best_bid_price: float + best_ask_price: float + spread: float + mid_price: float + spread_bps: float + + +class LiquidityResponse(BaseModel): + exchange: str + symbol: str + latest_ts: datetime + best_bid_price: float + best_ask_price: float + spread: float + mid_price: float + spread_bps: float + best_bid_qty: float + best_ask_qty: float + imbalance: float + market_signal: str + staleness_seconds: int + freshness_status: str diff --git a/api/src/api/models/ohlcv.py b/api/src/api/models/ohlcv.py new file mode 100644 index 0000000..963dba8 --- /dev/null +++ b/api/src/api/models/ohlcv.py @@ -0,0 +1,27 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class OHLCVBar(BaseModel): + exchange: str + symbol: str + window_start: datetime + window_end: datetime + open_price: float + high_price: float + low_price: float + close_price: float + volume: float + vwap: float + tick_count: int + first_ts: datetime + last_ts: datetime + + +class OHLCVResponse(BaseModel): + symbol: str + exchange: str + interval: str = "1m" + bars: list[OHLCVBar] + count: int diff --git a/api/src/api/models/pipeline.py b/api/src/api/models/pipeline.py new file mode 100644 index 0000000..bdd33d1 --- /dev/null +++ b/api/src/api/models/pipeline.py @@ -0,0 +1,20 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class PipelineLagItem(BaseModel): + exchange: str + symbol: str + latest_event_ts: datetime + latest_ingest_ts: datetime + staleness_seconds: int + freshness_status: str + health_score: float + + +class PipelineLagResponse(BaseModel): + items: list[PipelineLagItem] + checked_at: datetime + healthy_count: int + total_count: int diff --git a/api/src/api/models/symbols.py b/api/src/api/models/symbols.py new file mode 100644 index 0000000..36b6488 --- /dev/null +++ b/api/src/api/models/symbols.py @@ -0,0 +1,17 @@ +from datetime import datetime + 
+from pydantic import BaseModel
+
+
+class SymbolItem(BaseModel):
+    symbol: str
+    exchange: str
+    status: str
+    base_asset: str
+    quote_asset: str
+    updated_at: datetime | None = None
+
+
+class SymbolsResponse(BaseModel):
+    symbols: list[SymbolItem]
+    count: int
diff --git a/api/src/api/poller.py b/api/src/api/poller.py
new file mode 100644
index 0000000..378f673
--- /dev/null
+++ b/api/src/api/poller.py
@@ -0,0 +1,70 @@
+import asyncio
+
+import structlog
+
+from .market_metrics import HEALTH_SCORE, IMBALANCE, MID_PRICE, SPREAD_BPS, STALENESS
+from .trino_client import TrinoClient
+
+log = structlog.get_logger()
+
+# Seconds to sleep between two polling rounds.
+POLL_INTERVAL = 30
+
+# NOTE(review): a NULL imbalance is exported as 0.0 here; confirm that 0.0 is the
+# intended "no data" value for the imbalance scale produced by the dbt models.
+_LIQUIDITY_SQL = """
+SELECT exchange, symbol, mid_price, spread_bps,
+       COALESCE(imbalance, 0.0) AS imbalance,
+       staleness_seconds
+FROM iceberg.marts.mart_liquidity
+"""
+
+_HEALTH_SQL = """
+SELECT exchange, symbol, health_score
+FROM iceberg.marts.mart_exchange_health
+"""
+
+# (gauge, result column) pairs exported for every mart_liquidity row.
+_LIQUIDITY_GAUGES = (
+    (MID_PRICE, "mid_price"),
+    (SPREAD_BPS, "spread_bps"),
+    (IMBALANCE, "imbalance"),
+    (STALENESS, "staleness_seconds"),
+)
+
+
+async def _poll_once(client: TrinoClient) -> None:
+    """Refresh the market and pipeline-health gauges from Trino once.
+
+    The two queries are independent: a failure in one is logged as a warning
+    and does not prevent the other from running.
+    """
+    try:
+        rows = await client.fetch(_LIQUIDITY_SQL)
+        for r in rows:
+            lbl = {"symbol": r["symbol"], "exchange": r["exchange"]}
+            for gauge, column in _LIQUIDITY_GAUGES:
+                value = r[column]
+                # Gauge.set() cannot take None; skip SQL NULLs so one missing
+                # value does not abort the update for every remaining row.
+                if value is not None:
+                    gauge.labels(**lbl).set(value)
+        log.info("market_poll_ok", symbols=len(rows))
+    except Exception as exc:
+        log.warning("market_poll_liquidity_failed", error=str(exc))
+
+    try:
+        rows = await client.fetch(_HEALTH_SQL)
+        for r in rows:
+            score = r["health_score"]
+            if score is not None:
+                HEALTH_SCORE.labels(symbol=r["symbol"], exchange=r["exchange"]).set(score)
+    except Exception as exc:
+        log.warning("market_poll_health_failed", error=str(exc))
+
+
+async def run_poller(client: TrinoClient) -> None:
+    """Poll forever, sleeping POLL_INTERVAL seconds between rounds."""
+    while True:
+        await _poll_once(client)
+        await asyncio.sleep(POLL_INTERVAL)
diff --git a/api/src/api/routers/__init__.py b/api/src/api/routers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/api/src/api/routers/liquidity.py
b/api/src/api/routers/liquidity.py new file mode 100644 index 0000000..43cb5db --- /dev/null +++ b/api/src/api/routers/liquidity.py @@ -0,0 +1,50 @@ +import structlog +from fastapi import APIRouter, Depends, HTTPException + +from ..dependencies import get_client +from ..models.liquidity import LiquidityResponse, SpreadResponse +from ..trino_client import TrinoClient + +log = structlog.get_logger() +router = APIRouter(tags=["liquidity"]) + +_LIQUIDITY_COLS = """ + exchange, symbol, latest_ts, + best_bid_price, best_ask_price, spread, mid_price, spread_bps, + best_bid_qty, best_ask_qty, imbalance, market_signal, + staleness_seconds, freshness_status +""" + +_SPREAD_COLS = """ + exchange, symbol, latest_ts, + best_bid_price, best_ask_price, spread, mid_price, spread_bps +""" + + +@router.get("/spread/{symbol}", response_model=SpreadResponse) +async def get_spread( + symbol: str, + client: TrinoClient = Depends(get_client), +) -> SpreadResponse: + sym = symbol.upper() + sql = f"SELECT {_SPREAD_COLS} FROM iceberg.marts.mart_liquidity WHERE LOWER(symbol) = LOWER(?)" + rows = await client.fetch(sql, [sym]) + if not rows: + raise HTTPException(status_code=404, detail=f"No spread data for symbol {sym}") + return SpreadResponse.model_validate(rows[0]) + + +@router.get("/liquidity/{symbol}", response_model=LiquidityResponse) +async def get_liquidity( + symbol: str, + client: TrinoClient = Depends(get_client), +) -> LiquidityResponse: + sym = symbol.upper() + sql = ( + f"SELECT {_LIQUIDITY_COLS} FROM iceberg.marts.mart_liquidity WHERE LOWER(symbol) = LOWER(?)" + ) + rows = await client.fetch(sql, [sym]) + if not rows: + raise HTTPException(status_code=404, detail=f"No liquidity data for symbol {sym}") + log.info("liquidity_served", symbol=sym, signal=rows[0].get("market_signal")) + return LiquidityResponse.model_validate(rows[0]) diff --git a/api/src/api/routers/ohlcv.py b/api/src/api/routers/ohlcv.py new file mode 100644 index 0000000..74bebda --- /dev/null +++ 
b/api/src/api/routers/ohlcv.py @@ -0,0 +1,49 @@ +from datetime import datetime +from typing import Annotated + +import structlog +from fastapi import APIRouter, Depends, HTTPException, Query + +from ..dependencies import get_client +from ..models.ohlcv import OHLCVBar, OHLCVResponse +from ..trino_client import TrinoClient + +log = structlog.get_logger() +router = APIRouter(prefix="/ohlcv", tags=["ohlcv"]) + +_COLS = """ + exchange, symbol, window_start, window_end, + open_price, high_price, low_price, close_price, + volume, vwap, tick_count, first_ts, last_ts +""" + + +@router.get("/{symbol}", response_model=OHLCVResponse) +async def get_ohlcv( + symbol: str, + limit: Annotated[int, Query(ge=1, le=1440)] = 60, + from_ts: Annotated[datetime | None, Query()] = None, + to_ts: Annotated[datetime | None, Query()] = None, + client: TrinoClient = Depends(get_client), +) -> OHLCVResponse: + sym = symbol.upper() + sql_parts = [f"SELECT {_COLS} FROM iceberg.marts.mart_ohlcv WHERE LOWER(symbol) = LOWER(?)"] + params: list[object] = [sym] + + if from_ts: + sql_parts.append("AND window_start >= ?") + params.append(from_ts) + if to_ts: + sql_parts.append("AND window_end <= ?") + params.append(to_ts) + + sql_parts.extend(["ORDER BY window_start DESC", "LIMIT ?"]) + params.append(limit) + + rows = await client.fetch("\n".join(sql_parts), params) + if not rows: + raise HTTPException(status_code=404, detail=f"No OHLCV data for symbol {sym}") + + bars = [OHLCVBar.model_validate(r) for r in rows] + log.info("ohlcv_served", symbol=sym, bars=len(bars)) + return OHLCVResponse(symbol=sym, exchange=str(rows[0]["exchange"]), bars=bars, count=len(bars)) diff --git a/api/src/api/routers/pipeline.py b/api/src/api/routers/pipeline.py new file mode 100644 index 0000000..5c5c600 --- /dev/null +++ b/api/src/api/routers/pipeline.py @@ -0,0 +1,34 @@ +from datetime import UTC, datetime + +import structlog +from fastapi import APIRouter, Depends + +from ..dependencies import get_client +from 
..models.pipeline import PipelineLagItem, PipelineLagResponse +from ..trino_client import TrinoClient + +log = structlog.get_logger() +router = APIRouter(prefix="/pipeline", tags=["pipeline"]) + +_SQL = """ + SELECT exchange, symbol, latest_event_ts, latest_ingest_ts, + staleness_seconds, freshness_status, health_score + FROM iceberg.marts.mart_exchange_health + ORDER BY symbol +""" + + +@router.get("/lag", response_model=PipelineLagResponse) +async def get_pipeline_lag( + client: TrinoClient = Depends(get_client), +) -> PipelineLagResponse: + rows = await client.fetch(_SQL) + items = [PipelineLagItem.model_validate(r) for r in rows] + healthy = sum(1 for i in items if i.freshness_status == "FRESH") + log.info("pipeline_lag_served", total=len(items), healthy=healthy) + return PipelineLagResponse( + items=items, + checked_at=datetime.now(UTC), + healthy_count=healthy, + total_count=len(items), + ) diff --git a/api/src/api/routers/symbols.py b/api/src/api/routers/symbols.py new file mode 100644 index 0000000..9dcf39d --- /dev/null +++ b/api/src/api/routers/symbols.py @@ -0,0 +1,24 @@ +import structlog +from fastapi import APIRouter, Depends + +from ..dependencies import get_client +from ..models.symbols import SymbolItem, SymbolsResponse +from ..trino_client import TrinoClient + +log = structlog.get_logger() +router = APIRouter(prefix="/symbols", tags=["symbols"]) + +_SQL = """ + SELECT symbol, exchange, status, base_asset, quote_asset, updated_at + FROM iceberg.normalized.symbol_config + ORDER BY symbol +""" + + +@router.get("", response_model=SymbolsResponse) +async def list_symbols( + client: TrinoClient = Depends(get_client), +) -> SymbolsResponse: + rows = await client.fetch(_SQL) + symbols = [SymbolItem.model_validate(r) for r in rows] + return SymbolsResponse(symbols=symbols, count=len(symbols)) diff --git a/api/src/api/trino_client.py b/api/src/api/trino_client.py new file mode 100644 index 0000000..fa27d6d --- /dev/null +++ b/api/src/api/trino_client.py @@ 
-0,0 +1,68 @@
+from typing import Any
+
+import anyio
+import structlog
+import trino
+
+log = structlog.get_logger()
+
+
+class TrinoClient:
+    """Minimal async wrapper around the blocking Trino DB-API driver.
+
+    A new connection is opened for every query and closed afterwards.
+    """
+
+    def __init__(
+        self,
+        host: str = "localhost",
+        port: int = 8082,
+        user: str = "api",
+        catalog: str = "iceberg",
+    ) -> None:
+        self._host = host
+        self._port = port
+        self._user = user
+        self._catalog = catalog
+
+    def _connect(self) -> Any:
+        """Open a new DB-API connection over plain HTTP."""
+        return trino.dbapi.connect(  # type: ignore[no-untyped-call]
+            host=self._host,
+            port=self._port,
+            user=self._user,
+            catalog=self._catalog,
+            http_scheme="http",
+        )
+
+    async def fetch(self, sql: str, params: list[object] | None = None) -> list[dict[str, Any]]:
+        """Run *sql* in a worker thread and return the rows as dicts keyed by column name.
+
+        Returns an empty list when the cursor reports no result description.
+        """
+
+        def _run() -> list[dict[str, Any]]:
+            conn = self._connect()
+            try:
+                cur = conn.cursor()
+                cur.execute(sql, params or [])
+                desc = cur.description
+                if desc is None:
+                    return []
+                cols = [col[0] for col in desc]
+                return [dict(zip(cols, row, strict=False)) for row in cur.fetchall()]
+            finally:
+                conn.close()
+
+        return await anyio.to_thread.run_sync(_run)
+
+    async def ping(self) -> bool:
+        """Return True when ``SELECT 1`` yields exactly one row, False on any Exception."""
+        try:
+            rows = await self.fetch("SELECT 1")
+        except Exception as exc:
+            # The /ready probe relies on this never raising, but the cause of
+            # the failure should still be visible in the logs.
+            log.warning("trino_ping_failed", error=str(exc))
+            return False
+        return len(rows) == 1
diff --git a/api/tests/integration/test_endpoints.py b/api/tests/integration/test_endpoints.py
new file mode 100644
index 0000000..6f90103
--- /dev/null
+++ b/api/tests/integration/test_endpoints.py
@@ -0,0 +1,183 @@
+from datetime import UTC, datetime
+from typing import Any
+from unittest.mock import AsyncMock, patch
+
+import pytest
+from httpx import ASGITransport, AsyncClient
+
+from api.dependencies import get_client
+from api.main import app
+from api.trino_client import TrinoClient
+
+NOW = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC)
+
+_OHLCV_ROW: dict[str, Any] = {
+    "exchange": "binance",
+    "symbol": "BTCUSDT",
+    "window_start": NOW,
+    "window_end": NOW,
+    "open_price": 50000.0,
+    "high_price": 51000.0,
+    "low_price": 49000.0,
+    "close_price": 50500.0,
+    "volume": 100.0,
+    "vwap": 50200.0,
+    "tick_count": 60,
"first_ts": NOW, + "last_ts": NOW, +} + +_LIQUIDITY_ROW: dict[str, Any] = { + "exchange": "binance", + "symbol": "BTCUSDT", + "latest_ts": NOW, + "best_bid_price": 49990.0, + "best_ask_price": 50010.0, + "spread": 20.0, + "mid_price": 50000.0, + "spread_bps": 0.4, + "best_bid_qty": 10.0, + "best_ask_qty": 5.0, + "imbalance": 0.67, + "market_signal": "BUY_PRESSURE", + "staleness_seconds": 5, + "freshness_status": "FRESH", +} + +_HEALTH_ROW: dict[str, Any] = { + "exchange": "binance", + "symbol": "BTCUSDT", + "latest_event_ts": NOW, + "latest_ingest_ts": NOW, + "staleness_seconds": 5, + "freshness_status": "FRESH", + "health_score": 1.0, +} + +_SYMBOL_ROW: dict[str, Any] = { + "symbol": "BTCUSDT", + "exchange": "binance", + "status": "TRADING", + "base_asset": "BTC", + "quote_asset": "USDT", + "updated_at": NOW, +} + + +def _mock_client(rows: list[dict[str, Any]]) -> TrinoClient: + client = TrinoClient() + client.fetch = AsyncMock(return_value=rows) # type: ignore[method-assign] + return client + + +@pytest.fixture +def http_client(): + return AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + + +class TestHealthEndpoints: + async def test_health(self, http_client: AsyncClient) -> None: + async with http_client as c: + resp = await c.get("/health") + assert resp.status_code == 200 + assert resp.json() == {"status": "ok"} + + async def test_ready_ok(self, http_client: AsyncClient) -> None: + with patch("api.main.get_client", return_value=_mock_client([{"_col0": 1}])): + async with http_client as c: + resp = await c.get("/ready") + assert resp.status_code == 200 + + async def test_ready_trino_down(self, http_client: AsyncClient) -> None: + bad_client = TrinoClient() + bad_client.fetch = AsyncMock(side_effect=Exception("connection refused")) # type: ignore[method-assign] + with patch("api.main.get_client", return_value=bad_client): + async with http_client as c: + resp = await c.get("/ready") + assert resp.status_code == 503 + + +class 
TestOHLCVEndpoint: + async def test_get_ohlcv(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([_OHLCV_ROW]) + async with http_client as c: + resp = await c.get("/ohlcv/btcusdt") + app.dependency_overrides.clear() + assert resp.status_code == 200 + body = resp.json() + assert body["symbol"] == "BTCUSDT" + assert body["count"] == 1 + assert body["interval"] == "1m" + + async def test_get_ohlcv_not_found(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([]) + async with http_client as c: + resp = await c.get("/ohlcv/UNKNOWN") + app.dependency_overrides.clear() + assert resp.status_code == 404 + + async def test_get_ohlcv_symbol_uppercased(self, http_client: AsyncClient) -> None: + mock = _mock_client([_OHLCV_ROW]) + app.dependency_overrides[get_client] = lambda: mock + async with http_client as c: + await c.get("/ohlcv/btcusdt") + app.dependency_overrides.clear() + call_args = mock.fetch.call_args # type: ignore[union-attr] + assert "BTCUSDT" in call_args.args[1] + + +class TestLiquidityEndpoints: + async def test_get_spread(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([_LIQUIDITY_ROW]) + async with http_client as c: + resp = await c.get("/spread/btcusdt") + app.dependency_overrides.clear() + assert resp.status_code == 200 + assert resp.json()["spread_bps"] == 0.4 + + async def test_get_spread_not_found(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([]) + async with http_client as c: + resp = await c.get("/spread/UNKNOWN") + app.dependency_overrides.clear() + assert resp.status_code == 404 + + async def test_get_liquidity(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([_LIQUIDITY_ROW]) + async with http_client as c: + resp = await c.get("/liquidity/btcusdt") + app.dependency_overrides.clear() + 
assert resp.status_code == 200 + assert resp.json()["market_signal"] == "BUY_PRESSURE" + + +class TestPipelineEndpoint: + async def test_get_pipeline_lag(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([_HEALTH_ROW]) + async with http_client as c: + resp = await c.get("/pipeline/lag") + app.dependency_overrides.clear() + assert resp.status_code == 200 + body = resp.json() + assert body["total_count"] == 1 + assert body["healthy_count"] == 1 + + async def test_pipeline_lag_empty(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([]) + async with http_client as c: + resp = await c.get("/pipeline/lag") + app.dependency_overrides.clear() + assert resp.status_code == 200 + assert resp.json()["total_count"] == 0 + + +class TestSymbolsEndpoint: + async def test_list_symbols(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([_SYMBOL_ROW]) + async with http_client as c: + resp = await c.get("/symbols") + app.dependency_overrides.clear() + assert resp.status_code == 200 + body = resp.json() + assert body["count"] == 1 + assert body["symbols"][0]["symbol"] == "BTCUSDT" diff --git a/api/tests/unit/test_api_models.py b/api/tests/unit/test_api_models.py new file mode 100644 index 0000000..d0b484d --- /dev/null +++ b/api/tests/unit/test_api_models.py @@ -0,0 +1,139 @@ +from datetime import UTC, datetime + +import pytest + +from api.models.liquidity import LiquidityResponse, SpreadResponse +from api.models.ohlcv import OHLCVBar, OHLCVResponse +from api.models.pipeline import PipelineLagItem, PipelineLagResponse +from api.models.symbols import SymbolItem, SymbolsResponse + +NOW = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + +def _bar() -> dict: + return { + "exchange": "binance", + "symbol": "BTCUSDT", + "window_start": NOW, + "window_end": NOW, + "open_price": 50000.0, + "high_price": 51000.0, + "low_price": 49000.0, + 
"close_price": 50500.0, + "volume": 100.0, + "vwap": 50200.0, + "tick_count": 60, + "first_ts": NOW, + "last_ts": NOW, + } + + +class TestOHLCVModels: + def test_bar_valid(self) -> None: + bar = OHLCVBar.model_validate(_bar()) + assert bar.symbol == "BTCUSDT" + assert bar.tick_count == 60 + + def test_response_count(self) -> None: + bars = [OHLCVBar.model_validate(_bar())] + resp = OHLCVResponse(symbol="BTCUSDT", exchange="binance", bars=bars, count=1) + assert resp.interval == "1m" + assert resp.count == 1 + + def test_bar_missing_field(self) -> None: + data = _bar() + del data["open_price"] + with pytest.raises(ValueError): + OHLCVBar.model_validate(data) + + +class TestLiquidityModels: + def test_spread_valid(self) -> None: + data = { + "exchange": "binance", + "symbol": "BTCUSDT", + "latest_ts": NOW, + "best_bid_price": 49990.0, + "best_ask_price": 50010.0, + "spread": 20.0, + "mid_price": 50000.0, + "spread_bps": 0.4, + } + resp = SpreadResponse.model_validate(data) + assert resp.spread_bps == 0.4 + + def test_liquidity_market_signal(self) -> None: + data = { + "exchange": "binance", + "symbol": "ETHUSDT", + "latest_ts": NOW, + "best_bid_price": 2999.0, + "best_ask_price": 3001.0, + "spread": 2.0, + "mid_price": 3000.0, + "spread_bps": 0.67, + "best_bid_qty": 10.0, + "best_ask_qty": 5.0, + "imbalance": 0.67, + "market_signal": "BUY_PRESSURE", + "staleness_seconds": 5, + "freshness_status": "FRESH", + } + resp = LiquidityResponse.model_validate(data) + assert resp.market_signal == "BUY_PRESSURE" + assert resp.freshness_status == "FRESH" + + +class TestPipelineModels: + def test_lag_item(self) -> None: + item = PipelineLagItem( + exchange="binance", + symbol="BTCUSDT", + latest_event_ts=NOW, + latest_ingest_ts=NOW, + staleness_seconds=10, + freshness_status="FRESH", + health_score=1.0, + ) + assert item.health_score == 1.0 + + def test_response_counts(self) -> None: + item = PipelineLagItem( + exchange="binance", + symbol="BTCUSDT", + latest_event_ts=NOW, + 
latest_ingest_ts=NOW, + staleness_seconds=10, + freshness_status="FRESH", + health_score=1.0, + ) + resp = PipelineLagResponse( + items=[item], + checked_at=NOW, + healthy_count=1, + total_count=1, + ) + assert resp.total_count == 1 + + +class TestSymbolModels: + def test_symbol_item(self) -> None: + item = SymbolItem( + symbol="BTCUSDT", + exchange="binance", + status="TRADING", + base_asset="BTC", + quote_asset="USDT", + ) + assert item.updated_at is None + + def test_symbols_response(self) -> None: + item = SymbolItem( + symbol="BTCUSDT", + exchange="binance", + status="TRADING", + base_asset="BTC", + quote_asset="USDT", + ) + resp = SymbolsResponse(symbols=[item], count=1) + assert resp.count == 1 diff --git a/dbt/.gitignore b/dbt/.gitignore new file mode 100644 index 0000000..087d062 --- /dev/null +++ b/dbt/.gitignore @@ -0,0 +1,3 @@ +target/ +dbt_packages/ +logs/ diff --git a/dbt/Dockerfile b/dbt/Dockerfile new file mode 100644 index 0000000..272f920 --- /dev/null +++ b/dbt/Dockerfile @@ -0,0 +1,5 @@ +FROM python:3.12-slim + +RUN pip install --no-cache-dir dbt-trino + +WORKDIR /dbt diff --git a/dbt/macros/generate_schema_name.sql b/dbt/macros/generate_schema_name.sql new file mode 100644 index 0000000..badb9c5 --- /dev/null +++ b/dbt/macros/generate_schema_name.sql @@ -0,0 +1,9 @@ +-- Use the custom schema name as-is (no target.schema prefix). 
+-- This keeps Trino schemas clean: iceberg.staging.*, iceberg.marts.* +{% macro generate_schema_name(custom_schema_name, node) -%} + {%- if custom_schema_name is none -%} + {{ target.schema }} + {%- else -%} + {{ custom_schema_name | trim }} + {%- endif -%} +{%- endmacro %} diff --git a/dbt/models/intermediate/int_freshness_status.sql b/dbt/models/intermediate/int_freshness_status.sql new file mode 100644 index 0000000..3e82122 --- /dev/null +++ b/dbt/models/intermediate/int_freshness_status.sql @@ -0,0 +1,13 @@ +select + exchange, + symbol, + max(exchange_event_ts) as latest_event_ts, + max(ingest_ts) as latest_ingest_ts, + date_diff('second', max(exchange_event_ts), current_timestamp) as staleness_seconds, + case + when date_diff('second', max(exchange_event_ts), current_timestamp) <= 60 then 'FRESH' + when date_diff('second', max(exchange_event_ts), current_timestamp) <= 120 then 'WARN' + else 'STALE' + end as freshness_status +from {{ ref('stg_book_ticker') }} +group by exchange, symbol diff --git a/dbt/models/intermediate/int_order_book_imbalance.sql b/dbt/models/intermediate/int_order_book_imbalance.sql new file mode 100644 index 0000000..03a9c91 --- /dev/null +++ b/dbt/models/intermediate/int_order_book_imbalance.sql @@ -0,0 +1,28 @@ +with ranked as ( + select + exchange, + symbol, + exchange_event_ts, + best_bid_qty, + best_ask_qty, + imbalance, + row_number() over ( + partition by exchange, symbol + order by exchange_event_ts desc + ) as rn + from {{ ref('stg_book_ticker') }} +) +select + exchange, + symbol, + exchange_event_ts as latest_ts, + best_bid_qty, + best_ask_qty, + imbalance, + case + when imbalance > 0.6 then 'BUY_PRESSURE' + when imbalance < 0.4 then 'SELL_PRESSURE' + else 'NEUTRAL' + end as market_signal +from ranked +where rn = 1 diff --git a/dbt/models/intermediate/int_spread_metrics.sql b/dbt/models/intermediate/int_spread_metrics.sql new file mode 100644 index 0000000..4e8d6e9 --- /dev/null +++ 
b/dbt/models/intermediate/int_spread_metrics.sql @@ -0,0 +1,30 @@ +with ranked as ( + select + exchange, + symbol, + exchange_event_ts, + best_bid_price, + best_ask_price, + spread, + mid_price, + best_bid_qty, + best_ask_qty, + row_number() over ( + partition by exchange, symbol + order by exchange_event_ts desc + ) as rn + from {{ ref('stg_book_ticker') }} +) +select + exchange, + symbol, + exchange_event_ts as latest_ts, + best_bid_price, + best_ask_price, + spread, + mid_price, + round(spread / nullif(mid_price, 0) * 10000, 4) as spread_bps, + best_bid_qty, + best_ask_qty +from ranked +where rn = 1 diff --git a/dbt/models/intermediate/schema.yml b/dbt/models/intermediate/schema.yml new file mode 100644 index 0000000..25de85e --- /dev/null +++ b/dbt/models/intermediate/schema.yml @@ -0,0 +1,46 @@ +version: 2 + +models: + - name: int_spread_metrics + description: "Latest best bid/ask spread metrics per (exchange, symbol). Ephemeral." + columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: latest_ts + tests: [not_null] + - name: spread + tests: [not_null] + - name: spread_bps + tests: [not_null] + + - name: int_order_book_imbalance + description: "Latest order book imbalance per (exchange, symbol). Ephemeral." + columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: imbalance + tests: [not_null] + - name: market_signal + tests: + - not_null + - accepted_values: + values: ['BUY_PRESSURE', 'SELL_PRESSURE', 'NEUTRAL'] + + - name: int_freshness_status + description: "Per-symbol data freshness relative to current_timestamp. Ephemeral." 
+ columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: staleness_seconds + tests: [not_null] + - name: freshness_status + tests: + - not_null + - accepted_values: + values: ['FRESH', 'WARN', 'STALE'] diff --git a/dbt/models/marts/mart_exchange_health.sql b/dbt/models/marts/mart_exchange_health.sql new file mode 100644 index 0000000..8adaed5 --- /dev/null +++ b/dbt/models/marts/mart_exchange_health.sql @@ -0,0 +1,15 @@ +{{ config(materialized='view') }} + +select + exchange, + symbol, + latest_event_ts, + latest_ingest_ts, + staleness_seconds, + freshness_status, + case + when staleness_seconds <= 60 then 1.0 + when staleness_seconds <= 120 then 0.5 + else 0.0 + end as health_score +from {{ ref('int_freshness_status') }} diff --git a/dbt/models/marts/mart_liquidity.sql b/dbt/models/marts/mart_liquidity.sql new file mode 100644 index 0000000..659fcbc --- /dev/null +++ b/dbt/models/marts/mart_liquidity.sql @@ -0,0 +1,24 @@ +{{ config(materialized='view') }} + +select + s.exchange, + s.symbol, + s.latest_ts, + s.best_bid_price, + s.best_ask_price, + s.spread, + s.mid_price, + s.spread_bps, + s.best_bid_qty, + s.best_ask_qty, + i.imbalance, + i.market_signal, + f.staleness_seconds, + f.freshness_status +from {{ ref('int_spread_metrics') }} s +left join {{ ref('int_order_book_imbalance') }} i + on s.exchange = i.exchange + and s.symbol = i.symbol +left join {{ ref('int_freshness_status') }} f + on s.exchange = f.exchange + and s.symbol = f.symbol diff --git a/dbt/models/marts/mart_ohlcv.sql b/dbt/models/marts/mart_ohlcv.sql new file mode 100644 index 0000000..58d8ab8 --- /dev/null +++ b/dbt/models/marts/mart_ohlcv.sql @@ -0,0 +1,15 @@ +select + exchange, + symbol, + window_start, + window_end, + open_price, + high_price, + low_price, + close_price, + volume, + vwap, + tick_count, + first_ts, + last_ts +from {{ ref('stg_ohlcv_1m') }} diff --git a/dbt/models/marts/schema.yml b/dbt/models/marts/schema.yml new file mode 100644 index 
0000000..ff2cdf7 --- /dev/null +++ b/dbt/models/marts/schema.yml @@ -0,0 +1,65 @@ +version: 2 + +models: + - name: mart_ohlcv + description: "1-minute OHLCV for all symbols. Materialized table, refresh on schedule." + columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: window_start + tests: [not_null] + - name: window_end + tests: [not_null] + - name: open_price + tests: [not_null] + - name: high_price + tests: [not_null] + - name: low_price + tests: [not_null] + - name: close_price + tests: [not_null] + - name: tick_count + tests: [not_null] + + - name: mart_liquidity + description: "Real-time spread and order book imbalance per symbol. View — always fresh." + columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: latest_ts + tests: [not_null] + - name: spread_bps + tests: [not_null] + - name: imbalance + tests: [not_null] + - name: market_signal + tests: + - not_null + - accepted_values: + values: ['BUY_PRESSURE', 'SELL_PRESSURE', 'NEUTRAL'] + - name: freshness_status + tests: + - not_null + - accepted_values: + values: ['FRESH', 'WARN', 'STALE'] + + - name: mart_exchange_health + description: "Per-symbol pipeline freshness and health score. View — always fresh." + columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: staleness_seconds + tests: [not_null] + - name: freshness_status + tests: + - not_null + - accepted_values: + values: ['FRESH', 'WARN', 'STALE'] + - name: health_score + tests: [not_null] diff --git a/dbt/models/sources.yml b/dbt/models/sources.yml new file mode 100644 index 0000000..84f829e --- /dev/null +++ b/dbt/models/sources.yml @@ -0,0 +1,24 @@ +version: 2 + +sources: + - name: normalized + description: "Silver layer: normalized, deduplicated Iceberg tables written by Flink." 
+ database: iceberg + schema: normalized + freshness: + warn_after: + count: 2 + period: minute + error_after: + count: 5 + period: minute + loaded_at_field: ingest_ts + tables: + - name: book_ticker + description: "Best bid/ask per symbol, continuous ticks from Binance L2 stream." + - name: ohlcv_1m + description: "1-minute OHLCV aggregates from Flink tumbling window." + - name: symbol_config + description: "Trading pair config, CDC-synced from Postgres via Debezium." + freshness: null + loaded_at_field: updated_at diff --git a/dbt/models/staging/schema.yml b/dbt/models/staging/schema.yml new file mode 100644 index 0000000..d3448e1 --- /dev/null +++ b/dbt/models/staging/schema.yml @@ -0,0 +1,69 @@ +version: 2 + +models: + - name: stg_book_ticker + description: "Silver book-ticker: best bid/ask per symbol tick, light casting only." + columns: + - name: event_id + description: "Unique dedup key: {exchange}#{symbol}#{last_update_id}" + tests: [not_null] + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: exchange_event_ts + description: "Event timestamp from Binance (UTC)." + tests: [not_null] + - name: ingest_ts + description: "Timestamp when the event was written to Iceberg." + tests: [not_null] + - name: best_bid_price + tests: [not_null] + - name: best_ask_price + tests: [not_null] + - name: spread + tests: [not_null] + - name: mid_price + tests: [not_null] + - name: imbalance + description: "bid_qty / (bid_qty + ask_qty), range [0, 1]." + tests: [not_null] + + - name: stg_ohlcv_1m + description: "Silver 1-minute OHLCV aggregates from Flink tumbling window." 
+ columns: + - name: exchange + tests: [not_null] + - name: symbol + tests: [not_null] + - name: window_start + tests: [not_null] + - name: window_end + tests: [not_null] + - name: open_price + tests: [not_null] + - name: high_price + tests: [not_null] + - name: low_price + tests: [not_null] + - name: close_price + tests: [not_null] + - name: tick_count + tests: [not_null] + + - name: stg_symbol_config + description: "Current trading pair configuration, CDC-synced from Postgres." + columns: + - name: symbol + tests: [not_null, unique] + - name: exchange + tests: [not_null] + - name: status + tests: + - not_null + - accepted_values: + values: ['TRADING', 'BREAK', 'END_OF_DAY'] + - name: base_asset + tests: [not_null] + - name: quote_asset + tests: [not_null] diff --git a/dbt/models/staging/stg_book_ticker.sql b/dbt/models/staging/stg_book_ticker.sql new file mode 100644 index 0000000..92ce6c3 --- /dev/null +++ b/dbt/models/staging/stg_book_ticker.sql @@ -0,0 +1,17 @@ +select + event_id, + exchange, + symbol, + exchange_event_ts, + ingest_ts, + best_bid_price, + best_bid_qty, + best_ask_price, + best_ask_qty, + spread, + mid_price, + imbalance, + kafka_topic, + kafka_partition, + kafka_offset +from {{ source('normalized', 'book_ticker') }} diff --git a/dbt/models/staging/stg_ohlcv_1m.sql b/dbt/models/staging/stg_ohlcv_1m.sql new file mode 100644 index 0000000..0be30cb --- /dev/null +++ b/dbt/models/staging/stg_ohlcv_1m.sql @@ -0,0 +1,15 @@ +select + exchange, + symbol, + window_start, + window_end, + open_price, + high_price, + low_price, + close_price, + volume, + vwap, + tick_count, + first_ts, + last_ts +from {{ source('normalized', 'ohlcv_1m') }} diff --git a/dbt/models/staging/stg_symbol_config.sql b/dbt/models/staging/stg_symbol_config.sql new file mode 100644 index 0000000..de194d1 --- /dev/null +++ b/dbt/models/staging/stg_symbol_config.sql @@ -0,0 +1,12 @@ +select + symbol, + exchange, + status, + base_asset, + quote_asset, + lot_size_min, + lot_size_step, + 
tick_size, + created_at, + updated_at +from {{ source('normalized', 'symbol_config') }} diff --git a/dbt/profiles.yml b/dbt/profiles.yml new file mode 100644 index 0000000..ed73d27 --- /dev/null +++ b/dbt/profiles.yml @@ -0,0 +1,12 @@ +ticksense: + target: dev + outputs: + dev: + type: trino + host: "{{ env_var('TRINO_HOST', 'localhost') }}" + port: "{{ env_var('TRINO_PORT', '8082') | int }}" + user: dbt + database: iceberg + schema: dbt + threads: 4 + http_scheme: http diff --git a/docker-compose.yml b/docker-compose.yml index 9b1a43d..2ec7eff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -369,8 +369,145 @@ services: condition: service_completed_successfully restart: unless-stopped + # ── dbt runner (one-shot: creates staging views + mart tables in Trino) ────── + + dbt-runner: + build: ./dbt + working_dir: /dbt + volumes: + - ./dbt:/dbt + environment: + TRINO_HOST: trino + TRINO_PORT: "8080" + command: + - bash + - -c + - | + echo "=== Waiting for Flink to create normalized schema ===" && \ + until python3 -c " + import trino, sys + try: + conn = trino.dbapi.connect(host='trino', port=8080, user='dbt', catalog='iceberg') + cur = conn.cursor() + cur.execute('SELECT 1 FROM iceberg.normalized.book_ticker LIMIT 1') + cur.fetchall() + sys.exit(0) + except Exception as e: + print(f'Not ready: {e}') + sys.exit(1) + " 2>/dev/null; do + echo " normalized.book_ticker not ready, retrying in 10s..." + sleep 10 + done && \ + echo "=== normalized schema ready, running dbt ===" && \ + dbt run --profiles-dir . --no-send-anonymous-usage-stats && \ + echo "=== dbt run complete ===" + depends_on: + trino: + condition: service_healthy + flink-init: + condition: service_completed_successfully + restart: "no" + + # ── FastAPI query layer ─────────────────────────────────────────────────────── + + api: + build: + context: . 
+ dockerfile: api/Dockerfile + environment: + TRINO_HOST: trino + TRINO_PORT: "8080" + LOG_LEVEL: INFO + ports: + - "8000:8000" + depends_on: + trino: + condition: service_healthy + dbt-runner: + condition: service_completed_successfully + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:8000/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 15s + restart: unless-stopped + + # ── Prometheus ─────────────────────────────────────────────────────────────── + + prometheus: + image: victoriametrics/victoria-metrics:v1.102.1 + volumes: + - ./infra/config/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/victoria-metrics-data + command: + - "--promscrape.config=/etc/prometheus/prometheus.yml" + - "--promscrape.config.strictParse=false" + - "--retentionPeriod=7d" + - "--httpListenAddr=:9090" + ports: + - "9090:9090" + depends_on: + api: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://127.0.0.1:9090/-/healthy"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + # ── Grafana ────────────────────────────────────────────────────────────────── + + grafana: + image: grafana/grafana:11.3.0 + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: admin + GF_USERS_ALLOW_SIGN_UP: "false" + GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH: /var/lib/grafana/dashboards/ticksense.json + volumes: + - ./infra/config/grafana/provisioning:/etc/grafana/provisioning:ro + - ./infra/config/grafana/dashboards:/var/lib/grafana/dashboards:ro + - grafana-data:/var/lib/grafana + ports: + - "3000:3000" + depends_on: + prometheus: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:3000/api/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 20s + restart: unless-stopped + + # ── k6 load generator (not started by make up — use make load-gen) ────────── + + k6: + image: grafana/k6:latest + 
profiles: + - load-test + volumes: + - ./k6:/scripts:ro + environment: + K6_PROMETHEUS_RW_SERVER_URL: http://prometheus:9090/api/v1/write + K6_PROMETHEUS_RW_TREND_STATS: p(50),p(95),p(99),max + command: run --out experimental-prometheus-rw /scripts/script.js + depends_on: + api: + condition: service_healthy + prometheus: + condition: service_healthy + restart: "no" + volumes: redpanda-data: postgres-data: minio-data: flink-data: + prometheus-data: + grafana-data: diff --git a/docs/DEBUGGING_PHASE4.md b/docs/DEBUGGING_PHASE4.md new file mode 100644 index 0000000..fbd0486 --- /dev/null +++ b/docs/DEBUGGING_PHASE4.md @@ -0,0 +1,209 @@ +# TickSense — Phase 4 Debug Runbook (API + dbt + Monitoring) + +--- + +## dbt + +### `dbt compile` fails with "Connection refused" + +**Symptom:** `make dbt-compile` exits with `HTTPConnectionPool … Connection refused` + +**Cause:** `dbt compile` with the Trino adapter tries to connect to Trino even for compilation. This is expected when the stack is down. + +**Fix:** Start the stack first (`make up`), then run `make dbt-compile`. + +**Note:** `dbt parse` and `dbt ls` work offline and can validate model graph without a live connection. + +--- + +### `dbt source freshness` on `symbol_config` always skips + +**Cause:** `symbol_config` has `freshness: null` in `sources.yml` — it's a CDC table updated on demand, not a streaming source. + +**Expected:** Only `book_ticker` and `ohlcv_1m` are freshness-checked. + +--- + +### dbt `period: second` not valid + +**Symptom:** `Parsing Error … is not valid under any of the given schemas` + +**Cause:** dbt freshness only supports `minute`, `hour`, `day` — not `second`. + +**Fix:** Use `minute` as the smallest period unit in `sources.yml`. + +--- + +### dbt marts not queryable from API + +**Symptom:** API returns 500 / Trino reports `Table not found: iceberg.marts.mart_ohlcv` + +**Cause:** dbt has not been run yet; the mart tables/views don't exist. 
+ +**Fix:** +```bash +make dbt-run # creates staging views + mart tables/views in Trino +``` + +The `dbt-runner` service in docker-compose runs this automatically on `make up`. + +--- + +## Docker + +### API healthcheck fails — `curl: executable file not found` + +**Symptom:** `docker inspect ticksense-api-1` shows health status `unhealthy`; prometheus and grafana never start (they depend on api being healthy). + +**Cause:** `python:3.13-slim` is a stripped Debian image with no `curl`. The healthcheck `curl -sf http://localhost:8000/health` fails immediately. + +**Fix:** Install `curl` in `api/Dockerfile`: +```dockerfile +RUN apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +``` + +Rebuild the image (`docker compose build api`) after this change. + +--- + +## API (FastAPI) + +### `ModuleNotFoundError: No module named 'api'` in pytest + +**Symptom:** Tests fail to collect with import error on `api`, `ingest`, or `replay`. + +**Root cause:** uv workspace members must be installed as editable packages so pytest can find their source. + +**Fix:** Run `uv sync --all-packages` (not just `uv sync`) to install all workspace member dependencies into the venv. Plain `uv sync` only syncs the root package. The editable installs place `.pth` files in site-packages that add each `src/` directory to `sys.path` automatically — no `pythonpath` pytest config needed. + +> **Do NOT add `pythonpath` to `[tool.pytest.ini_options]`** — it duplicates the `.pth` setup and causes namespace-package shadowing. See the "namespace package shadowing" entry below. + +--- + +### API dependencies (pydantic, structlog, etc.) not found in tests + +**Symptom:** `ModuleNotFoundError: No module named 'pydantic'` when running `api/tests`. + +**Cause:** `uv sync` without `--all-packages` does not install workspace member runtime dependencies. + +**Fix:** +```bash +uv sync --all-packages +``` + +Add this to onboarding / CI setup steps. 
+ +--- + +### `/ready` returns 503 even when Trino is up + +**Cause:** `TrinoClient.ping()` catches all exceptions silently. Check: +1. `TRINO_HOST` env var — defaults to `localhost`; inside Docker it should be `trino` +2. `TRINO_PORT` — defaults to `8082` (host-mapped); inside Docker it should be `8080` +3. Trino healthcheck: `curl -sf http://localhost:8082/v1/info | python3 -m json.tool` + +--- + +### Prometheus `/metrics` returns 404 + +**Cause:** `/metrics` is registered via `app.add_route(...)` not `@app.get(...)` — it won't appear in OpenAPI docs but is accessible at `http://localhost:8000/metrics`. + +--- + +## Dependency injection in tests + +### Pattern: override `get_client` per test + +```python +from api.dependencies import get_client +from api.main import app + +def _mock_client(rows): + client = TrinoClient() + client.fetch = AsyncMock(return_value=rows) + return client + +app.dependency_overrides[get_client] = lambda: _mock_client([...]) +# ... run test ... +app.dependency_overrides.clear() # always clean up +``` + +### Pattern: test `/ready` (not a Depends endpoint) + +`/ready` calls `get_client()` directly (not via `Depends`), so override via `patch`: +```python +with patch("api.main.get_client", return_value=_mock_client([{"_col0": 1}])): + resp = await client.get("/ready") +``` + +--- + +## pytest: `No module named 'replay.config'` — namespace package shadowing + +**Symptom:** Running `make test` or `make coverage` fails at collection with: +``` +ImportError while loading conftest 'replay/tests/conftest.py' +ModuleNotFoundError: No module named 'replay.config' +``` +Even though `uv run python -c "from replay.config import Settings"` works fine. + +**Root cause (subtle):** Three layers interact badly: + +1. In a uv workspace with `src/` layout, workspace member directories (`replay/`, `api/`, `ingest/`) share names with their Python packages but live at the project root *without* `__init__.py`. +2. 
Python's import system, upon seeing `replay/` in the current working directory (via `''` on `sys.path`), marks it as a **namespace package portion** and caches it in `sys.modules` as `_NamespacePath(['.../replay'])`. +3. When `--import-mode=importlib` and the `pythonpath` pytest config option are used *together*, pytest triggers this namespace-package import during its configuration phase — **before** the `pythonpath` plugin finishes inserting `replay/src` into `sys.path`. The stale namespace package is then cached and can't be evicted by later path insertions. + +The crucial symptom: `replay.__file__` is `None` and `replay.__path__` is `_NamespacePath(['.../replay'])` — this is the workspace *member directory*, not the real package at `replay/src/replay/`. + +**Why `uv run python` works but `uv run pytest` doesn't:** +`uv run python` imports `replay` lazily, after site-packages `.pth` files have already added `replay/src` to `sys.path`. Python scans `sys.path` in order, finds the namespace portion at `''` (cwd), continues scanning, and eventually finds the regular package at `replay/src/replay/` — regular package wins. But pytest's importlib mode triggers module discovery differently and hits the namespace package first. + +**Fix applied:** + +1. **Removed `pythonpath` from `[tool.pytest.ini_options]`** — the editable installs already handle this via `.pth` files in site-packages. The `pythonpath` config was redundant and its insertion timing broke the import order. +2. **Removed `--import-mode=importlib`** — not needed once `pythonpath` is gone. +3. **Removed `__init__.py` from `api/tests/`** — rootless test layout avoids module-name collisions between `api/tests/unit/test_models.py` and `ingest/tests/unit/test_models.py`. +4. **Renamed `api/tests/unit/test_models.py` → `test_api_models.py`** — eliminates the naming collision. + +**Rule of thumb for uv workspaces:** Let editable installs (`.pth` files) own the import paths. 
Don't add `pythonpath` to pytest config — it duplicates the `.pth` setup and introduces timing races. Use rootless test layout (no `__init__.py` in test dirs) when multiple workspace members have tests with identical filenames. + +--- + +## Grafana: Health Score oscillates between 100% and 50% + +**Symptom:** `Health Score by Symbol` panel alternates between 100% and 50% every ~30 seconds. + +**Root cause:** Flink writes to Iceberg at **checkpoint boundaries**, not per-event. With a ~60s checkpoint interval, the data visible to Trino lags 0–60s behind real time. The original `health_score` formula was: + +```sql +WHEN staleness_seconds <= 30 THEN 1.0 -- FRESH +WHEN staleness_seconds <= 60 THEN 0.5 -- WARN +``` + +Because staleness grows from ~0s (just after checkpoint) to ~60s (just before next checkpoint), the score oscillates: `1.0 → 0.5 → 1.0 → …` in sync with the checkpoint cycle. This looks like a pipeline problem but is actually normal lakehouse behavior. + +**Fix:** Align the threshold with the actual checkpoint interval: + +```sql +WHEN staleness_seconds <= 60 THEN 1.0 -- FRESH (one full checkpoint window) +WHEN staleness_seconds <= 120 THEN 0.5 -- WARN +``` + +Files changed: `dbt/models/marts/mart_exchange_health.sql`, `dbt/models/intermediate/int_freshness_status.sql`, Grafana dashboard description + threshold annotation. + +**Key insight:** In a lakehouse architecture, "freshness" is bounded by the streaming engine's commit interval, not by the source event rate. WebSocket events arrive every millisecond; Iceberg snapshots commit every 30–60s. Freshness SLOs must be set against the commit interval, not the ingestion rate. 
+ +--- + +## Port reference + +| Service | Host port | +|------------|-----------| +| FastAPI | 8000 | +| Prometheus | 9090 | +| Grafana | 3000 | +| Trino | 8082 | +| MinIO UI | 9001 | +| Flink UI | 8081 | +| Redpanda | 8080 | diff --git a/docs/MARKET_CONCEPTS.md b/docs/MARKET_CONCEPTS.md new file mode 100644 index 0000000..8de054c --- /dev/null +++ b/docs/MARKET_CONCEPTS.md @@ -0,0 +1,126 @@ +# Market Microstructure Concepts + +> Reference for terms used in TickSense's liquidity and analytics layer. +> +> 中文文档:[点击这里](./MARKET_CONCEPTS_ZH.md) + +--- + +## Bid / Ask / Mid Price + +The order book always has two sides: + +- **Best Bid** (`best_bid_price`): the highest price any buyer is currently willing to pay. +- **Best Ask** (`best_ask_price`): the lowest price any seller is currently willing to accept. +- **Mid Price** (`mid_price`): the arithmetic average of the two — `(bid + ask) / 2`. Used as a fair-value reference that is neutral to either side. + +A trade executes when a buyer and seller agree: the buyer pays the ask, the seller receives the bid. The difference between them is profit for the market maker. + +--- + +## Spread + +The **spread** is the gap between the best ask and best bid: + +``` +spread = best_ask_price - best_bid_price +``` + +It represents the **cost of immediate execution**: if you buy at the ask and sell at the bid right away, you lose exactly one spread. A narrow spread means the asset is liquid and easy to trade cheaply; a wide spread means the market is illiquid or uncertain and entering/exiting a position is expensive. + +--- + +## bps (Basis Points) + +**1 bps = 0.01% = 0.0001** + +Basis points express small relative changes in a way that avoids the ambiguity of percentages. In finance, "a 1% change in 1%" is confusing — "1 basis point" is not. 
+ +The **spread in bps** normalises the raw spread against the mid price: + +``` +spread_bps = (spread / mid_price) × 10,000 +``` + +This makes spreads comparable across assets of very different prices. A $14 spread on BTC at $78,000 is about **1.8 bps**, while a $0.04 spread on ETH at $2,181 is about **0.18 bps** — the comparison only becomes meaningful in bps, and here ETH is the tighter (more liquid) market in relative terms. + +| spread_bps | typical interpretation | +|---|---| +| < 1 | extremely liquid (major pairs on top exchanges) | +| 1 – 5 | liquid, normal for large-cap crypto | +| 5 – 20 | moderate; watch for slippage on large orders | +| > 20 | illiquid or stressed market | + +--- + +## Order Book Imbalance + +Imbalance measures the **asymmetry of supply vs demand** at the top of the order book, expressed as the bid side's share of the top-of-book quantity: + +``` +imbalance = best_bid_qty / (best_bid_qty + best_ask_qty) +``` + +Range: **0 to 1** + +| value | meaning | +|---|---| +| 1.0 | all quoted volume is on the buy side; zero supply | +| 0.75 | bids are 3× larger than asks | +| 0.5 | perfectly balanced | +| 0.25 | asks are 3× larger than bids | +| 0.0 | all quoted volume is on the sell side; zero demand | + +Imbalance is a **leading microstructure signal**: the side with more volume is more "committed" and price tends to move toward the thinner side to find liquidity. An imbalance well above 0.5 often precedes a short-term price uptick; one well below 0.5 often precedes a dip. + +It is a signal, not a guarantee — informed traders can and do spoof (place large visible orders they intend to cancel). + +--- + +## Buy Pressure / Sell Pressure + +`market_signal` is a categorical label derived from the imbalance value: + +``` +imbalance > 0.6 → BUY_PRESSURE +imbalance < 0.4 → SELL_PRESSURE +otherwise → NEUTRAL +``` + +It summarises the current order book posture in human-readable form. In TickSense it is computed in the `int_order_book_imbalance` dbt model and exposed via the `/liquidity/{symbol}` API endpoint. 
+ +**Buy Pressure** does not mean "price will go up." It means buyers are currently more aggressively quoted than sellers at the top of the book — a short-term directional lean, useful as one input among many. + +--- + +## Staleness / Freshness + +`staleness_seconds` is how many seconds have elapsed since the exchange timestamp of the most recent market event: + +``` +staleness_seconds = current_timestamp − latest_event_ts +``` + +`freshness_status` is a derived label: + +| status | staleness | +|---|---| +| `FRESH` | ≤ 60 seconds | +| `WARN` | > 60 and ≤ 120 seconds | +| `STALE` | > 120 seconds | + +Flink commits to Iceberg at checkpoint boundaries (roughly every 30–60s), so up to 60 seconds of staleness is normal; any symbol with `WARN` or `STALE` status indicates a pipeline problem worth investigating. + +--- + +## Putting It Together + +A healthy, liquid market looks like: + +| metric | healthy value | +|---|---| +| `spread_bps` | < 5 bps | +| `staleness_seconds` | ≤ 60 s | +| `freshness_status` | `FRESH` | +| `health_score` | 1.0 | +| `imbalance` | near 0.5 (balanced) | diff --git a/docs/MARKET_CONCEPTS_ZH.md b/docs/MARKET_CONCEPTS_ZH.md new file mode 100644 index 0000000..d5ae466 --- /dev/null +++ b/docs/MARKET_CONCEPTS_ZH.md @@ -0,0 +1,126 @@ +# 市场微观结构概念词汇表 + +> TickSense 流动性与分析层所用术语的中文参考文档。 +> +> English doc: [click here](./MARKET_CONCEPTS.md) + +--- + +## 买价 / 卖价 / 中间价(Bid / Ask / Mid Price) + +订单簿始终有买卖两侧: + +- **最优买价(Best Bid)**:当前出价最高的买方愿意支付的价格。 +- **最优卖价(Best Ask)**:当前报价最低的卖方愿意接受的价格。 +- **中间价(Mid Price)**:两者的算术平均值 `(买价 + 卖价) / 2`,作为不偏向任何一方的公允价值参考。 + +成交发生在买卖双方价格达成一致时:买方付出卖价,卖方收到买价,中间的差额归做市商(market maker)所有。 + +--- + +## 买卖价差(Spread) + +**价差** 是最优卖价与最优买价之间的差距: + +``` +价差 = 最优卖价 - 最优买价 +``` + +它代表**立即成交的成本**:若你以卖价买入、立即以买价卖出,你恰好损失了一个价差。价差窄意味着资产流动性好、交易成本低;价差宽意味着市场流动性差或不确定性高,进出仓位代价较大。 + +--- + +## 基点(bps,Basis Points) + +**1 bps(基点)= 0.01% = 0.0001** + +基点用于表达微小的相对变化,避免了百分比的歧义。金融领域中"1% 的 1%"容易混淆,而"1 个基点"则不会。 + +**Spread bps** 将原始价差除以中间价并放大 10,000 倍,实现跨资产的横向比较: + +``` +spread_bps = (价差 / 中间价) × 10,000 +``` + +BTC 的 $14 价差在 $78,000 的价格下只有 **1.8 bps**;而 
ETH 的 $0.04 价差在 $2,181 下是 **0.18 bps**——相对而言 ETH 流动性更好。 + +| spread_bps | 典型含义 | +|---|---| +| < 1 | 极高流动性(主流交易所主要交易对) | +| 1 – 5 | 流动性良好,大市值加密货币正常水平 | +| 5 – 20 | 中等流动性;大单可能有滑点 | +| > 20 | 流动性差或市场承压 | + +--- + +## 订单簿失衡度(Order Book Imbalance) + +失衡度衡量订单簿顶层**供需的不对称程度**,以买方挂单量占顶层总挂单量的比例表示: + +``` +失衡度 = 最优买量 / (最优买量 + 最优卖量) +``` + +取值范围:**0 到 1** + +| 数值 | 含义 | +|---|---| +| 1.0 | 所有挂单量均在买方;卖方无人挂单 | +| 0.75 | 买单量是卖单量的 3 倍 | +| 0.5 | 买卖完全平衡 | +| 0.25 | 卖单量是买单量的 3 倍 | +| 0.0 | 所有挂单量均在卖方;买方无人挂单 | + +失衡度是一种**领先的微观结构信号**:挂单量更大的一侧"意愿"更强,价格往往向挂单更少的一侧移动以寻求流动性。失衡度明显高于 0.5 通常预示短期价格上行;明显低于 0.5 则往往预示下跌。 + +这是信号而非保证——知情交易者会进行**幌骗(Spoofing)**:挂出大单制造假象后迅速撤单。 + +--- + +## 买压 / 卖压(Buy Pressure / Sell Pressure) + +`market_signal` 是由失衡度衍生的分类标签: + +``` +失衡度 > 0.6 → BUY_PRESSURE(买压) +失衡度 < 0.4 → SELL_PRESSURE(卖压) +否则 → NEUTRAL(中性) +``` + +它将当前订单簿的态势转化为人类可读的形式。在 TickSense 中,该信号由 dbt 模型 `int_order_book_imbalance` 计算,并通过 `/liquidity/{symbol}` API 端点对外暴露。 + +**买压并不意味着价格一定上涨**,而是表示当前订单簿顶层买方比卖方更积极地报价——这是一个短期方向性倾向,可作为多种信号之一参考使用。 + +--- + +## 数据新鲜度(Staleness / Freshness) + +`staleness_seconds` 表示自最新一条市场事件的交易所时间戳以来经过的秒数: + +``` +数据陈旧度 = 当前时间戳 − 最新事件时间戳 +``` + +`freshness_status` 是由此衍生的状态标签: + +| 状态 | 陈旧度 | +|---|---| +| `FRESH`(新鲜) | ≤ 60 秒 | +| `WARN`(警告) | > 60 且 ≤ 120 秒 | +| `STALE`(陈旧) | > 120 秒 | + +Flink 只在 checkpoint 边界(约每 30–60 秒)向 Iceberg 提交数据,因此 60 秒以内的陈旧度属于正常;任何状态为 `WARN` 或 `STALE` 的交易对都意味着 pipeline 存在问题,需要排查。 + +--- + +## 综合理解 + +一个健康、流动性好的市场应呈现如下状态: + +| 指标 | 健康值 | +|---|---| +| `spread_bps` | < 5 bps | +| `staleness_seconds` | ≤ 60 秒 | +| `freshness_status` | `FRESH` | +| `health_score` | 1.0 | +| `imbalance` | 接近 0.5(平衡) | diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index cb70c95..5a956f5 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -185,40 +185,57 @@ replay/src/replay/ --- -## Phase 4 — Analytics Layer +## Phase 4 — Analytics Layer ✅ -**Goal:** dbt models queryable via Trino; FastAPI serving OHLCV and liquidity endpoints. +**Goal:** dbt models queryable via Trino; FastAPI serving OHLCV and liquidity endpoints; Prometheus + Grafana monitoring. 
-### dbt models +**Done:** Full stack e2e verified 2026-05-16. All 5 API endpoints return live Binance data. Grafana dashboard showing real-time prices, spreads, imbalance, and pipeline health. See `docs/DEBUGGING_PHASE4.md` for all gotchas. + +### dbt models (9 models, PASS=6 in docker-compose) ``` -staging: stg_orderbook_diffs, stg_book_ticker, stg_symbol_config -intermediate: int_ohlcv_1m, int_spread_metrics, int_order_book_imbalance, - int_liquidity_score, int_freshness_status -marts: mart_ohlcv, mart_liquidity, mart_volatility, mart_exchange_health +staging (view): stg_book_ticker, stg_ohlcv_1m, stg_symbol_config +intermediate (ephem): int_spread_metrics, int_order_book_imbalance, int_freshness_status +marts: mart_ohlcv (table), mart_liquidity (view), mart_exchange_health (view) ``` +Views recalculate `current_timestamp` on every Trino query → always fresh liquidity/health data without re-running dbt. + ### FastAPI endpoints ``` -GET /health -GET /ready -GET /metrics -GET /v1/ohlcv?symbol=BTCUSDT&interval=1m&limit=100 -GET /v1/liquidity?symbol=BTCUSDT -GET /v1/symbols -GET /v1/freshness +GET /health liveness check +GET /ready readiness check (pings Trino) +GET /metrics Prometheus scrape endpoint +GET /ohlcv/{symbol} 1m OHLCV bars (last 60 by default, filterable by ts range) +GET /spread/{symbol} best bid/ask, spread in bps +GET /liquidity/{symbol} spread + imbalance + market signal + freshness +GET /pipeline/lag health score + staleness for all 5 pairs +GET /symbols active trading pairs from CDC symbol_config ``` -**Deliverables:** -- [ ] 12 dbt models with schema.yml tests and source freshness -- [ ] `make dbt-run && make dbt-test` passes -- [ ] FastAPI service with Pydantic request/response models on every endpoint -- [ ] `/health`, `/ready`, `/metrics` on FastAPI -- [ ] Integration tests: FastAPI → Trino → Iceberg round-trip -- [ ] `make test` passes +### Monitoring stack -**Done when:** `curl localhost:8000/v1/ohlcv?symbol=BTCUSDT&interval=1m` returns 
data. +- **Background poller**: FastAPI lifespan task queries Trino every 30s, updates Prometheus Gauges +- **Business metrics**: `market_mid_price_usd`, `market_spread_bps`, `market_bid_ask_imbalance`, `market_staleness_seconds`, `pipeline_health_score` — all labeled by `{symbol, exchange}` +- **Grafana dashboard**: 17 panels across 3 sections — API ops, Live Market Prices, Pipeline Health + +### Key design decisions + +**Freshness threshold: 60s not 30s** — Flink writes to Iceberg at checkpoint boundaries (~30–60s interval), not per-event. The data visible to Trino is always one checkpoint behind. Targeting `staleness ≤ 30s = FRESH` would cause health_score to oscillate 1.0↔0.5 every checkpoint cycle. The correct threshold is `≤ 60s` (one full checkpoint interval). See `docs/DEBUGGING_PHASE4.md`. + +**Deliverables:** +- [x] 9 dbt models with `schema.yml` tests and source freshness +- [x] `dbt run` fully automated in docker-compose (`dbt-runner` waits for Flink schema then runs) +- [x] FastAPI with Pydantic request/response models on every endpoint +- [x] 21 integration tests, 91% coverage, mypy strict + ruff all pass +- [x] Prometheus middleware: `api_requests_total`, `api_request_duration_seconds` +- [x] Background market poller: 5 business Prometheus metrics updated every 30s +- [x] Grafana: 17-panel dashboard provisioned via config file (no manual setup) +- [x] `api/Dockerfile`: `python:3.13-slim` + curl installed for healthcheck +- [x] `docs/MARKET_CONCEPTS.md` + `MARKET_CONCEPTS_ZH.md`: bilingual market microstructure glossary + +**Done when:** `curl localhost:8000/ohlcv/btcusdt` returns live K-line data ✓ --- diff --git a/infra/config/grafana/dashboards/k6.json b/infra/config/grafana/dashboards/k6.json new file mode 100644 index 0000000..f7fc181 --- /dev/null +++ b/infra/config/grafana/dashboards/k6.json @@ -0,0 +1,240 @@ +{ + "title": "TickSense — k6 Load Test", + "uid": "ticksense-k6", + "schemaVersion": 39, + "version": 1, + "refresh": "5s", + "time": { 
"from": "now-10m", "to": "now" }, + "timezone": "browser", + "tags": ["ticksense", "k6"], + "panels": [ + { + "id": 1, + "type": "stat", + "title": "Virtual Users", + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 5 }, + { "color": "red", "value": 15 } + ] + } + } + }, + "targets": [{ "expr": "k6_vus", "legendFormat": "VUs", "refId": "A" }] + }, + { + "id": 2, + "type": "stat", + "title": "Request Rate", + "gridPos": { "h": 4, "w": 6, "x": 6, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "reqps", + "decimals": 1, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + } + }, + "targets": [{ + "expr": "sum(rate(k6_http_reqs_total[30s]))", + "legendFormat": "req/s", + "refId": "A" + }] + }, + { + "id": 3, + "type": "stat", + "title": "Check Pass Rate", + "gridPos": { "h": 4, "w": 6, "x": 12, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "decimals": 1, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "red", "value": null }, + { "color": "yellow", "value": 0.9 }, + { "color": "green", "value": 0.99 } + ] + } + } + }, + "targets": [{ + "expr": "k6_checks_rate", + "legendFormat": "pass rate", + "refId": "A" + }] + }, + { + "id": 4, + "type": "stat", + "title": "Error Rate", + 
"gridPos": { "h": 4, "w": 6, "x": 18, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "decimals": 2, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 0.01 }, + { "color": "red", "value": 0.05 } + ] + } + } + }, + "targets": [{ + "expr": "k6_errors_rate", + "legendFormat": "error rate", + "refId": "A" + }] + }, + { + "id": 5, + "type": "timeseries", + "title": "Virtual Users & Request Rate", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "list", "placement": "bottom" } + }, + "fieldConfig": { + "defaults": { "custom": { "lineWidth": 2, "fillOpacity": 15 } }, + "overrides": [ + { + "matcher": { "id": "byName", "options": "VUs" }, + "properties": [ + { "id": "unit", "value": "short" }, + { "id": "custom.axisPlacement", "value": "right" }, + { "id": "color", "value": { "mode": "fixed", "fixedColor": "yellow" } } + ] + }, + { + "matcher": { "id": "byName", "options": "req/s" }, + "properties": [ + { "id": "unit", "value": "reqps" }, + { "id": "color", "value": { "mode": "fixed", "fixedColor": "blue" } } + ] + } + ] + }, + "targets": [ + { "expr": "k6_vus", "legendFormat": "VUs", "refId": "A" }, + { "expr": "sum(rate(k6_http_reqs_total[30s]))", "legendFormat": "req/s", "refId": "B" } + ] + }, + { + "id": 6, + "type": "timeseries", + "title": "P95 Latency by Endpoint", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { "lineWidth": 2, "fillOpacity": 8 }, + "thresholds": { + "mode": 
"absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 500 }, + { "color": "red", "value": 2000 } + ] + } + } + }, + "targets": [{ + "expr": "k6_http_req_duration_p95", + "legendFormat": "{{name}}", + "refId": "A" + }] + }, + { + "id": 7, + "type": "timeseries", + "title": "Latency Percentiles (all endpoints)", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { "lineWidth": 2, "fillOpacity": 5 } + } + }, + "targets": [ + { "expr": "avg(k6_http_req_duration_p50)", "legendFormat": "p50", "refId": "A" }, + { "expr": "avg(k6_http_req_duration_p95)", "legendFormat": "p95", "refId": "B" }, + { "expr": "avg(k6_http_req_duration_p99)", "legendFormat": "p99", "refId": "C" }, + { "expr": "avg(k6_http_req_duration_max)", "legendFormat": "max", "refId": "D" } + ] + }, + { + "id": 8, + "type": "timeseries", + "title": "Check Pass Rate by Assertion", + "description": "Each line is one named check (e.g. 'ohlcv has bars', 'pipeline all healthy'). 
Drop below 1.0 means assertions failing.", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 }, + "options": { + "tooltip": { "mode": "multi", "sort": "asc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "min"] } + }, + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "min": 0, + "max": 1, + "custom": { "lineWidth": 1, "fillOpacity": 5 } + } + }, + "targets": [{ + "expr": "k6_checks_rate", + "legendFormat": "{{check}}", + "refId": "A" + }] + } + ] +} diff --git a/infra/config/grafana/dashboards/ticksense.json b/infra/config/grafana/dashboards/ticksense.json new file mode 100644 index 0000000..d244590 --- /dev/null +++ b/infra/config/grafana/dashboards/ticksense.json @@ -0,0 +1,494 @@ +{ + "title": "TickSense — Real-time Crypto Lakehouse", + "uid": "ticksense-api", + "schemaVersion": 39, + "version": 2, + "refresh": "10s", + "time": { "from": "now-1h", "to": "now" }, + "timezone": "browser", + "tags": ["ticksense"], + "panels": [ + { + "id": 1, + "type": "stat", + "title": "Requests / min", + "gridPos": { "h": 4, "w": 8, "x": 0, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 1, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "green", "value": null }] + } + } + }, + "targets": [ + { + "expr": "round(sum(rate(api_requests_total[1m])) * 60, 0.1)", + "legendFormat": "req/min", + "refId": "A" + } + ] + }, + { + "id": 2, + "type": "stat", + "title": "P95 Latency", + "gridPos": { "h": 4, "w": 8, "x": 8, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "ms", + "decimals": 0, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ 
+ { "color": "green", "value": null }, + { "color": "yellow", "value": 100 }, + { "color": "red", "value": 500 } + ] + } + } + }, + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(api_request_duration_seconds_bucket[5m])) by (le)) * 1000", + "legendFormat": "p95", + "refId": "A" + } + ] + }, + { + "id": 3, + "type": "stat", + "title": "Error Rate", + "gridPos": { "h": 4, "w": 8, "x": 16, "y": 0 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "percent", + "decimals": 2, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 1 }, + { "color": "red", "value": 5 } + ] + } + } + }, + "targets": [ + { + "expr": "sum(rate(api_requests_total{status_code=~\"5..\"}[5m])) / sum(rate(api_requests_total[5m])) * 100", + "legendFormat": "error %", + "refId": "A" + } + ] + }, + { + "id": 4, + "type": "timeseries", + "title": "Request Rate by Endpoint", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "reqps", + "custom": { "lineWidth": 2, "fillOpacity": 10 } + } + }, + "targets": [ + { + "expr": "sum(rate(api_requests_total[1m])) by (endpoint)", + "legendFormat": "{{endpoint}}", + "refId": "A" + } + ] + }, + { + "id": 5, + "type": "timeseries", + "title": "Response Time Percentiles", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "custom": { "lineWidth": 2, "fillOpacity": 5 } + } + }, + "targets": [ + { + "expr": 
"histogram_quantile(0.50, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", + "legendFormat": "p50", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", + "legendFormat": "p95", "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", + "legendFormat": "p99", "refId": "C" + } + ] + }, + { + "id": 6, + "type": "timeseries", + "title": "HTTP Status Codes", + "gridPos": { "h": 6, "w": 24, "x": 0, "y": 12 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] } + }, + "fieldConfig": { + "defaults": { + "unit": "reqps", + "custom": { "lineWidth": 1, "fillOpacity": 20 } + }, + "overrides": [ + { + "matcher": { "id": "byFrameRefID", "options": "ERR" }, + "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "red" } }] + } + ] + }, + "targets": [ + { + "expr": "sum(rate(api_requests_total{status_code=~\"2..\"}[1m]))", + "legendFormat": "2xx success", "refId": "OK" + }, + { + "expr": "sum(rate(api_requests_total{status_code=~\"4..\"}[1m]))", + "legendFormat": "4xx client error", "refId": "CLI" + }, + { + "expr": "sum(rate(api_requests_total{status_code=~\"5..\"}[1m]))", + "legendFormat": "5xx server error", "refId": "ERR" + } + ] + }, + + { + "id": 7, + "type": "row", + "title": "Live Market Prices", + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 18 }, + "collapsed": false + }, + { + "id": 8, + "type": "stat", + "title": "BTC / USD", + "gridPos": { "h": 4, "w": 6, "x": 0, "y": 19 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "currencyUSD", + "decimals": 0, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + 
} + }, + "targets": [ + { + "expr": "market_mid_price_usd{symbol=\"btcusdt\"}", + "legendFormat": "BTCUSDT", + "refId": "A" + } + ] + }, + { + "id": 9, + "type": "stat", + "title": "ETH / USD", + "gridPos": { "h": 4, "w": 6, "x": 6, "y": 19 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "currencyUSD", + "decimals": 0, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + } + }, + "targets": [ + { + "expr": "market_mid_price_usd{symbol=\"ethusdt\"}", + "legendFormat": "ETHUSDT", + "refId": "A" + } + ] + }, + { + "id": 10, + "type": "stat", + "title": "SOL / USD", + "gridPos": { "h": 4, "w": 4, "x": 12, "y": 19 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "currencyUSD", + "decimals": 2, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + } + }, + "targets": [ + { + "expr": "market_mid_price_usd{symbol=\"solusdt\"}", + "legendFormat": "SOLUSDT", + "refId": "A" + } + ] + }, + { + "id": 11, + "type": "stat", + "title": "BNB / USD", + "gridPos": { "h": 4, "w": 4, "x": 16, "y": 19 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "currencyUSD", + "decimals": 2, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + } + }, + "targets": [ + { + "expr": "market_mid_price_usd{symbol=\"bnbusdt\"}", + "legendFormat": "BNBUSDT", + "refId": "A" + } + ] + }, + { + "id": 12, + "type": "stat", + "title": "XRP / USD", + "gridPos": { 
"h": 4, "w": 4, "x": 20, "y": 19 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "colorMode": "background", + "graphMode": "area", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "currencyUSD", + "decimals": 4, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [{ "color": "blue", "value": null }] + } + } + }, + "targets": [ + { + "expr": "market_mid_price_usd{symbol=\"xrpusdt\"}", + "legendFormat": "XRPUSDT", + "refId": "A" + } + ] + }, + { + "id": 13, + "type": "timeseries", + "title": "Bid-Ask Spread (bps)", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 23 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 2, + "custom": { "lineWidth": 2, "fillOpacity": 8 }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 5 }, + { "color": "red", "value": 20 } + ] + } + } + }, + "targets": [ + { + "expr": "market_spread_bps", + "legendFormat": "{{symbol}}", + "refId": "A" + } + ] + }, + { + "id": 14, + "type": "timeseries", + "title": "Order Book Imbalance", + "description": "Positive = buy pressure, negative = sell pressure. 
Range: -1 to +1.", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 23 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last"] } + }, + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 3, + "min": -1, + "max": 1, + "custom": { "lineWidth": 2, "fillOpacity": 10 } + } + }, + "targets": [ + { + "expr": "market_bid_ask_imbalance", + "legendFormat": "{{symbol}}", + "refId": "A" + } + ] + }, + + { + "id": 15, + "type": "row", + "title": "Pipeline Health", + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 31 }, + "collapsed": false + }, + { + "id": 16, + "type": "stat", + "title": "Health Score by Symbol", + "gridPos": { "h": 5, "w": 12, "x": 0, "y": 32 }, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] }, + "orientation": "auto", + "colorMode": "background", + "graphMode": "none", + "textMode": "auto" + }, + "fieldConfig": { + "defaults": { + "unit": "percentunit", + "decimals": 2, + "min": 0, + "max": 1, + "color": { "mode": "thresholds" }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "red", "value": null }, + { "color": "yellow", "value": 0.8 }, + { "color": "green", "value": 1.0 } + ] + } + } + }, + "targets": [ + { + "expr": "pipeline_health_score", + "legendFormat": "{{symbol}}", + "refId": "A" + } + ] + }, + { + "id": 17, + "type": "timeseries", + "title": "Data Staleness (seconds)", + "description": "Seconds since last market event. 
Green < 60s (FRESH), yellow < 120s (WARN), red >= 120s (STALE).", + "gridPos": { "h": 5, "w": 12, "x": 12, "y": 32 }, + "options": { + "tooltip": { "mode": "multi", "sort": "desc" }, + "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0, + "custom": { "lineWidth": 2, "fillOpacity": 8 }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 60 }, + { "color": "red", "value": 120 } + ] + } + } + }, + "targets": [ + { + "expr": "market_staleness_seconds", + "legendFormat": "{{symbol}}", + "refId": "A" + } + ] + } + ] +} diff --git a/infra/config/grafana/provisioning/dashboards/ticksense.yml b/infra/config/grafana/provisioning/dashboards/ticksense.yml new file mode 100644 index 0000000..53d6867 --- /dev/null +++ b/infra/config/grafana/provisioning/dashboards/ticksense.yml @@ -0,0 +1,9 @@ +apiVersion: 1 + +providers: + - name: ticksense + type: file + disableDeletion: true + updateIntervalSeconds: 30 + options: + path: /var/lib/grafana/dashboards diff --git a/infra/config/grafana/provisioning/datasources/prometheus.yml b/infra/config/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 0000000..bb009bb --- /dev/null +++ b/infra/config/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,9 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false diff --git a/infra/config/prometheus/prometheus.yml b/infra/config/prometheus/prometheus.yml new file mode 100644 index 0000000..d2d1503 --- /dev/null +++ b/infra/config/prometheus/prometheus.yml @@ -0,0 +1,9 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: ticksense-api + static_configs: + - targets: ["api:8000"] + metrics_path: /metrics diff --git a/k6/script.js b/k6/script.js new file 
mode 100644 index 0000000..9522552 --- /dev/null +++ b/k6/script.js @@ -0,0 +1,92 @@ +import http from 'k6/http'; +import { check, sleep } from 'k6'; +import { Rate } from 'k6/metrics'; + +const BASE_URL = 'http://api:8000'; +const SYMBOLS = ['btcusdt', 'ethusdt', 'solusdt', 'bnbusdt', 'xrpusdt']; + +export const errorRate = new Rate('errors'); + +export const options = { + stages: [ + { duration: '30s', target: 3 }, // warm up + { duration: '60s', target: 3 }, // baseline + { duration: '30s', target: 10 }, // ramp to moderate load + { duration: '60s', target: 10 }, // hold + { duration: '30s', target: 0 }, // cool down + ], + thresholds: { + http_req_duration: ['p(95)<2000'], // 95th percentile under 2s (Trino cold query) + http_req_failed: ['rate<0.05'], // error rate under 5% + checks: ['rate>0.95'], // 95% of assertions must pass + }, +}; + +function tag(name) { + return { tags: { name } }; +} + +export default function () { + const symbol = SYMBOLS[Math.floor(Math.random() * SYMBOLS.length)]; + + // ── /health ──────────────────────────────────────────────────────────────── + let res = http.get(`${BASE_URL}/health`, tag('health')); + check(res, { + 'health 200': (r) => r.status === 200, + 'health body ok': (r) => r.json('status') === 'ok', + }); + errorRate.add(res.status !== 200); + sleep(0.3); + + // ── /ohlcv/{symbol} ──────────────────────────────────────────────────────── + res = http.get(`${BASE_URL}/ohlcv/${symbol}`, tag('ohlcv')); + const ohlcvOk = res.status === 200; + check(res, { + 'ohlcv 200': (r) => r.status === 200, + 'ohlcv has bars': (r) => ohlcvOk && r.json('count') > 0, + 'ohlcv interval': (r) => ohlcvOk && r.json('interval') === '1m', + }); + errorRate.add(res.status >= 500); // 404 acceptable (symbol may have no data yet) + sleep(0.5); + + // ── /spread/{symbol} ─────────────────────────────────────────────────────── + res = http.get(`${BASE_URL}/spread/${symbol}`, tag('spread')); + const spreadOk = res.status === 200; + check(res, { + 
'spread 200': (r) => r.status === 200, + 'spread_bps > 0': (r) => spreadOk && r.json('spread_bps') > 0, + 'mid_price > 0': (r) => spreadOk && r.json('mid_price') > 0, + }); + errorRate.add(res.status >= 500); + sleep(0.5); + + // ── /liquidity/{symbol} ──────────────────────────────────────────────────── + res = http.get(`${BASE_URL}/liquidity/${symbol}`, tag('liquidity')); + const liqOk = res.status === 200; + check(res, { + 'liquidity 200': (r) => r.status === 200, + 'liquidity has signal': (r) => liqOk && ['BUY_PRESSURE','SELL_PRESSURE','NEUTRAL'].includes(r.json('market_signal')), + 'liquidity freshness': (r) => liqOk && ['FRESH','WARN','STALE'].includes(r.json('freshness_status')), + }); + errorRate.add(res.status >= 500); + sleep(0.5); + + // ── /pipeline/lag ────────────────────────────────────────────────────────── + res = http.get(`${BASE_URL}/pipeline/lag`, tag('pipeline')); + check(res, { + 'pipeline 200': (r) => r.status === 200, + 'pipeline has items': (r) => r.status === 200 && r.json('total_count') > 0, + 'pipeline all healthy': (r) => r.status === 200 && r.json('healthy_count') === r.json('total_count'), + }); + errorRate.add(res.status !== 200); + sleep(0.3); + + // ── /symbols ─────────────────────────────────────────────────────────────── + res = http.get(`${BASE_URL}/symbols`, tag('symbols')); + check(res, { + 'symbols 200': (r) => r.status === 200, + 'symbols count > 0': (r) => r.status === 200 && r.json('count') > 0, + }); + errorRate.add(res.status !== 200); + sleep(0.4); +} diff --git a/pyproject.toml b/pyproject.toml index 107cc5a..4215f33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ line-length = 100 [tool.ruff.lint] select = ["E", "F", "I", "N", "W", "UP", "B", "SIM"] -ignore = ["N818"] +ignore = ["N818", "B008"] # B008: FastAPI Depends() in defaults is intentional [tool.ruff.lint.per-file-ignores] "flink/**" = ["UP017"] # datetime.UTC requires Python 3.11+; flink container runs 3.10 diff --git a/uv.lock b/uv.lock 
index eed43cf..0b95a86 100644 --- a/uv.lock +++ b/uv.lock @@ -1801,8 +1801,9 @@ dev = [ [[package]] name = "ticksense-api" version = "0.1.0" -source = { virtual = "api" } +source = { editable = "api" } dependencies = [ + { name = "anyio" }, { name = "fastapi" }, { name = "prometheus-client" }, { name = "pydantic" }, @@ -1814,6 +1815,7 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "anyio", specifier = ">=4.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "prometheus-client", specifier = ">=0.20.0" }, { name = "pydantic", specifier = ">=2.7" },