diff --git a/.skills/production-python-service/SKILL.md b/.skills/production-python-service/SKILL.md index 586f4e2..166e73a 100644 --- a/.skills/production-python-service/SKILL.md +++ b/.skills/production-python-service/SKILL.md @@ -28,6 +28,8 @@ description: Use when implementing any Python service: FastAPI endpoints, Kafka src// main.py # assembler only: middleware + include_router, no business logic dependencies.py # shared Depends factories (get_client, get_db) — defined ONCE here + read_model.py # ReadModel dataclass + singleton (see OLAP section below) + poller.py # background task: single writer of ReadModel routers/ users.py # router = APIRouter(prefix="/users", tags=["users"]) todos.py @@ -38,19 +40,23 @@ src// Rules: - `main.py` only calls `app.include_router(...)` — no route definitions, no business logic - `dependencies.py` is the single source of truth for all `Depends()` factories +- Client factories use `@lru_cache(maxsize=1)` so the same instance (and its connection pool) is reused across requests - Each router imports `get_client` / `get_db` from `dependencies`, never defines its own - Set `prefix` and `tags` on the `APIRouter`, not on `include_router` (keeps each file self-contained) -- Test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()` +- For hot-path endpoints backed by ReadModel: test by populating the model; do NOT use `app.dependency_overrides` +- For cold-path (Trino) endpoints: test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()` ```python # dependencies.py +from functools import lru_cache from .config import settings from .trino_client import TrinoClient +@lru_cache(maxsize=1) def get_client() -> TrinoClient: return TrinoClient(host=settings.trino_host, port=settings.trino_port, ...) 
-# routers/ohlcv.py +# routers/ohlcv.py — cold path still uses Depends; hot path reads from ReadModel from ..dependencies import get_client router = APIRouter(prefix="/ohlcv", tags=["ohlcv"]) @@ -63,6 +69,165 @@ async def get_ohlcv(symbol: str, client: TrinoClient = Depends(get_client)) -> O app.include_router(ohlcv.router) ``` +--- + +## OLAP-backed API — ReadModel pattern + +When the query layer sits on an OLAP engine (Trino, BigQuery, Spark SQL), every request +pays a minimum 50-100 ms query-planning floor regardless of query complexity. +The fix is a **poller-as-read-model**: one background task owns all writes to an in-memory +dict; request handlers only read from it. + +### Decision tree for new endpoints + +| Data type | Pattern | Latency | +|---|---|---| +| Current-state snapshot ("what is X right now?") | ReadModel — pre-loaded by poller | < 1 ms | +| Historical / arbitrary-parameter query | Trino cold path + `TTLCache` | 50–200 ms | +| Derivable from existing model data | Compute in router, no DB call | < 1 ms | + +### File: `read_model.py` + +```python +from dataclasses import dataclass, field + +@dataclass +class ReadModel: + # Always replace the whole dict (atomic under CPython GIL). + # Never mutate an existing dict in place. + spread: dict[str, SpreadResponse] = field(default_factory=dict) + ohlcv: dict[str, OHLCVResponse] = field(default_factory=dict) + pipeline: PipelineLagResponse | None = None + ready: bool = False # False until first poll cycle completes → endpoints return 503 + +_model = ReadModel() + +def get_read_model() -> ReadModel: + return _model +``` + +### File: `poller.py` — single writer + +```python +async def _poll_something(client: TrinoClient) -> None: + try: + rows = await client.fetch(_SQL) + # Build the new dict first, then swap atomically. 
+ _model.something = {str(r["key"]): Model.model_validate(r) for r in rows} + log.info("poll_ok", count=len(rows)) + except Exception as exc: + log.warning("poll_failed", error=str(exc)) # stale value stays; no crash + +async def run_poller(client: TrinoClient) -> None: + # Warm up before serving requests. + await asyncio.gather(_poll_something(client), ..., return_exceptions=True) + _model.ready = True + + last_slow = time.monotonic() + while True: + await asyncio.sleep(30) + t = time.monotonic() + tasks: list[Coroutine[Any, Any, None]] = [_poll_something(client)] + if t - last_slow >= 300: # slow data refreshed less often + tasks.append(_poll_slow(client)) + last_slow = t + await asyncio.gather(*tasks, return_exceptions=True) +``` + +### Hot-path router (reads model, zero Trino) + +```python +@router.get("/spread/{symbol}", response_model=SpreadResponse) +async def get_spread(symbol: str) -> SpreadResponse: + model = get_read_model() + if not model.ready: + raise HTTPException(status_code=503, detail="Service warming up") + result = model.spread.get(symbol.upper()) + if result is None: + raise HTTPException(status_code=404, detail=f"No data for {symbol}") + return result +``` + +### Cold-path router (Trino + TTLCache for repeated queries) + +```python +from cachetools import TTLCache +from threading import Lock + +_cache: TTLCache[tuple, HistoryResponse] = TTLCache(maxsize=500, ttl=60) +_lock = Lock() + +@router.get("/history/{symbol}", response_model=HistoryResponse) +async def get_history( + symbol: str, from_ts: datetime, to_ts: datetime, + client: TrinoClient = Depends(get_client), +) -> HistoryResponse: + key = (symbol.upper(), from_ts, to_ts) + with _lock: + hit: HistoryResponse | None = _cache.get(key) + if hit is not None: + return hit + rows = await client.fetch(_SQL, [symbol.upper(), from_ts, to_ts]) + if not rows: + raise HTTPException(404) + result = HistoryResponse(...)
+ with _lock: + _cache[key] = result + return result +``` + +### Test pattern + +```python +@pytest.fixture(autouse=True) +def setup_read_model(): + model = get_read_model() + model.ready = True + model.spread = {"BTCUSDT": SpreadResponse(...)} # swap the dict, never mutate in place + yield + model.ready = False + model.spread = {} + _cache.clear() # clear the TTLCache used by the cold-path router above + +# Hot-path test: populate model, no Trino mock needed. +async def test_get_spread(http_client): + async with http_client as c: + resp = await c.get("/spread/btcusdt") + assert resp.status_code == 200 + +# Cold-path test: override the client, use params= for datetime URL encoding. +async def test_get_history(http_client): + app.dependency_overrides[get_client] = lambda: _mock_client([_ROW]) + async with http_client as c: + resp = await c.get("/history/btcusdt", params={"from_ts": NOW.isoformat(), "to_ts": NOW.isoformat()}) + app.dependency_overrides.clear() + assert resp.status_code == 200 +``` + +### Thread-safety rules + +| Object | Safe? | Rule | +|---|---|---| +| `ReadModel` dict swap (`_model.x = new_dict`) | ✅ atomic under GIL | Never mutate in place | +| `ReadModel` field read (`.get(sym)`) | ✅ atomic under GIL | No lock needed | +| `cachetools.TTLCache` | ❌ not thread-safe | Always wrap with `threading.Lock` | +| `requests.Session` (Trino HTTP pool) | ✅ thread-safe | Share one instance via `@lru_cache` | + +### Extending the model — checklist + +Adding a new endpoint that needs fresh data: + +1. Add a field to `ReadModel` in `read_model.py` +2. Write `_poll_<field>(client)` in `poller.py` +3. Register it in the initial `asyncio.gather` and the interval loop +4. Router reads `get_read_model().<field>.get(sym)` — no `Depends(get_client)` needed +5. Test: populate the field in `setup_read_model` fixture; no Trino mock required + +Adding a new endpoint derivable from existing model data (no new poll needed): + +- Just write the router. Read from existing model fields. Compute in Python.
+- Example: `GET /top-movers` computes price change from `model.ohlcv[sym].bars`. + ## Configuration pattern ```python @@ -92,3 +257,8 @@ log.info("event_produced", topic=topic, key=key, offset=offset) - `os.environ["KEY"]` instead of pydantic-settings - Catching bare `Exception` without re-raising or logging with context - Mutable default arguments or module-level mutable state +- Querying an OLAP engine on every request — always check the decision tree first +- Creating a new DB connection inside the request handler — use `@lru_cache` on the factory and share `requests.Session` across queries +- Mutating a `ReadModel` dict in place — always build a new dict and swap the reference atomically +- Using `TTLCache` without a `threading.Lock` — it is not thread-safe under concurrent async workers +- `datetime.isoformat()` in a URL query string without URL-encoding — the `+` in `+00:00` is parsed as a space; use `params={"key": value}` in httpx/requests so encoding is handled automatically diff --git a/Makefile b/Makefile index 3b9ba7b..be52827 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ .PHONY: up down restart logs lint format typecheck test coverage \ build flink-submit flink-logs flink-venv ingest ingest-logs \ dbt-compile dbt-run dbt-test dbt-freshness replay-sample watch-cdc \ - load-gen help + load-gen load-test-full help # ── Local stack ────────────────────────────────────────────────────────────── @@ -75,6 +75,8 @@ build: # ── Load testing ────────────────────────────────────────────────────────────── +# make load-gen: runs k6 only — no prerequisite checks. +# Use `make load-test-full` for a fully-orchestrated test with all checks. load-gen: docker run --rm \ --network ticksense_default \ @@ -84,6 +86,129 @@ load-gen: grafana/k6:latest \ run --out experimental-prometheus-rw /scripts/script.js +# make load-test-full: end-to-end load test with full prerequisite verification. 
+# +# WHY this exists — `make load-gen` on its own silently produces 100% 404s +# unless the full data pipeline is ready beforehand. The prerequisites are: +# +# 1. Flink jobs RUNNING — Flink writes to normalized.* and raw.*. +# flink-init only *submits* the jobs; they need a few seconds to reach +# RUNNING state before any messages are processed. +# +# 2. ingest producing to Kafka — normalized.book_ticker stays empty until +# the WebSocket ingest is active. When ingest runs outside Docker +# (apache-flink cannot be installed in the same uv workspace as the host +# Python 3.13 environment), this target auto-starts it in the background +# if neither the Docker `ingest` service nor a host process is already +# running, logging to /tmp/ticksense-ingest.log. +# +# 3. Flink first checkpoint complete (~60 s) — Iceberg files are written +# on checkpoint, not on every message. Nothing appears in normalized.* +# until the first checkpoint commits. +# +# 4. dbt run AFTER Flink has data — the Docker dbt-runner executes once at +# stack startup, before Flink has written anything. mart_liquidity and +# mart_ohlcv are empty until dbt is re-run against live Flink output. +# +# 5. API poller warm — the API background poller must complete at least one +# full cycle after dbt runs before the ReadModel has data. The poller +# starts immediately on API startup and refreshes every 30 s (OHLCV every +# 60 s); after `dbt run` populates the marts the next poll cycle picks it up.
+# +load-test-full: + @echo "" + @echo "╔═══════════════════════════════════════════╗" + @echo "║ TickSense — Full Load Test Pipeline ║" + @echo "╚═══════════════════════════════════════════╝" + @echo "" + @echo "── Step 1/6 Flink jobs RUNNING ────────────" + @echo " (normalize + ohlcv_1m + CDC must all be in RUNNING state)" + @_flink_running() { \ + curl -sf http://localhost:8081/jobs/overview 2>/dev/null \ + | python3 -c 'import sys,json; d=json.load(sys.stdin); print(sum(1 for j in d["jobs"] if j["state"]=="RUNNING"))' \ + 2>/dev/null || echo 0; \ + }; \ + _ts=$$(date +%s); \ + until [ "$$(_flink_running)" -ge 2 ]; do \ + if [ $$(( $$(date +%s) - $$_ts )) -ge 180 ]; then \ + echo " ERROR: Flink jobs did not reach RUNNING within 3 min"; \ + echo " Debug: docker compose logs flink-jobmanager flink-init"; \ + exit 1; \ + fi; \ + echo " waiting for Flink jobs RUNNING (currently $$(_flink_running))..."; \ + sleep 5; \ + done + @echo " ✓ Flink jobs RUNNING" + @echo "" + @echo "── Step 2/6 Verify ingest ──────────────────" + @echo " (ingest runs as a Docker service; a local process is also supported)" + @if docker compose ps ingest 2>/dev/null | grep -q "Up"; then \ + echo " Docker ingest service is running — OK"; \ + elif pgrep -f "ingest.main" > /dev/null 2>&1; then \ + echo " host ingest already running (PID $$(pgrep -f 'ingest.main'))"; \ + else \ + echo " Starting ingest in background → logs at /tmp/ticksense-ingest.log"; \ + uv run python -m ingest.main >> /tmp/ticksense-ingest.log 2>&1 & \ + echo " ingest started (PID $$!)"; \ + fi + @echo "" + @echo "── Step 3/6 Wait for normalized data ──────" + @echo " (book_ticker: Flink first checkpoint ~60 s)" + @echo " (ohlcv_1m: 1-minute window must close + checkpoint ~120 s)" + @_ts=$$(date +%s); \ + until docker compose exec -T trino trino \ + --execute "SELECT count(*) FROM iceberg.normalized.book_ticker" \ + 2>/dev/null | grep -qE "[1-9]"; do \ + if [ $$(( $$(date +%s) - $$_ts )) -ge 180 ]; then \ + echo " ERROR: 
normalized.book_ticker still empty after 3 min — is ingest running?"; \ + exit 1; \ + fi; \ + echo " normalized.book_ticker still empty — retry in 15 s..."; \ + sleep 15; \ + done + @echo " ✓ normalized.book_ticker has rows" + @_ts=$$(date +%s); \ + until docker compose exec -T trino trino \ + --execute "SELECT count(*) FROM iceberg.normalized.ohlcv_1m WHERE window_start >= NOW() - INTERVAL '2' HOUR" \ + 2>/dev/null | grep -qE "[1-9]"; do \ + if [ $$(( $$(date +%s) - $$_ts )) -ge 300 ]; then \ + echo " ERROR: no recent OHLCV bars after 5 min — check: docker compose logs flink-jobmanager"; \ + exit 1; \ + fi; \ + echo " normalized.ohlcv_1m has no recent data (waiting for 1-min window to close) — retry in 15 s..."; \ + sleep 15; \ + done + @echo " ✓ normalized.ohlcv_1m has recent rows" + @echo "" + @echo "── Step 4/6 Run dbt ───────────────────────" + @echo " (WHY: the Docker dbt-runner ran once at startup before Flink had data;" + @echo " mart_liquidity / mart_ohlcv are empty until we re-run dbt now)" + $(DBT) run $(DBTOPTS) + @echo " ✓ dbt run complete" + @echo "" + @echo "── Step 5/6 Verify marts + API ohlcv ready ─" + @docker compose exec -T trino trino \ + --execute "SELECT count(*) FROM iceberg.marts.mart_liquidity" \ + 2>/dev/null | grep -qE "[1-9]" || \ + (echo " ERROR: mart_liquidity still empty after dbt run — check dbt logs" && exit 1) + @echo " ✓ mart_liquidity has rows" + @docker compose exec -T trino trino \ + --execute "SELECT count(*) FROM iceberg.marts.mart_ohlcv" \ + 2>/dev/null | grep -qE "[1-9]" || \ + (echo " ERROR: mart_ohlcv still empty after dbt run — check dbt logs" && exit 1) + @echo " ✓ mart_ohlcv has rows" + @echo " Waiting for /ready (ReadModel ohlcv populated, poller refreshes every 60 s)..." 
+ @until curl -sf http://localhost:8000/ready 2>/dev/null; do \ + echo " API not ready — retry in 10 s..."; \ + sleep 10; \ + done + @echo " ✓ API ready (ReadModel populated)" + @echo "" + @echo "── Step 6/6 k6 load test ──────────────────" + @echo " (k6 connects to api:8000 on ticksense_default Docker network)" + @echo " (ingest keeps running after test — kill with: pkill -f ingest.main)" + @echo "" + $(MAKE) load-gen + # ── Ingest ──────────────────────────────────────────────────────────────────── ingest: diff --git a/README.md b/README.md index 5284614..4a527fc 100644 --- a/README.md +++ b/README.md @@ -310,7 +310,12 @@ ingest/ Binance WebSocket ingestion → Kafka client.py Async WS client, reconnect, backoff main.py Entry point: asyncio.gather over symbols -api/ FastAPI query layer over Trino +api/ FastAPI query layer over Trino + Iceberg + src/api/ + read_model.py in-process ReadModel (hot-path data, no Trino on reads) + poller.py background task: refreshes ReadModel every 30–60 s + routers/ hot-path endpoints read model; cold-path (history) hits Trino + models/ Pydantic response models per domain replay/ Kafka offset replay producer flink/ PyFlink streaming jobs (Phase 2) airflow/ Orchestration DAGs (Phase 5) @@ -357,5 +362,5 @@ See [docs/ROADMAP.md](docs/ROADMAP.md) for the full phase plan. 
| 2 — Flink Processing | ✅ | Normalize, dedup, OHLCV windows → Iceberg silver (e2e verified 2026-05-15) | | 3 — CDC + Replay | ✅ | Debezium CDC, Flink upsert job, replay CLI (e2e verified 2026-05-16) | | 4 — Analytics Layer | ✅ | dbt + Trino, FastAPI, Prometheus, Grafana (e2e verified 2026-05-16) | -| 5 — Ops + Observability | — | Airflow, Great Expectations, SLA alerts | +| 5 — Ops + Observability | ✅ | Airflow, Great Expectations, SLA alerts (load test: p(95)=10ms, 0% errors — 2026-05-18; Airflow/Spark pending E2E) | | 6 — Demo + Blog + Web | — | ticksense.ai, demo video, blog posts | diff --git a/api/pyproject.toml b/api/pyproject.toml index 9d35be6..487d116 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "trino>=0.328.0", "prometheus-client>=0.20.0", "anyio>=4.0", + "cachetools>=5.3", ] [build-system] diff --git a/api/src/api/dependencies.py b/api/src/api/dependencies.py index 9fdc848..bdeeb04 100644 --- a/api/src/api/dependencies.py +++ b/api/src/api/dependencies.py @@ -1,7 +1,10 @@ +from functools import lru_cache + from .config import settings from .trino_client import TrinoClient +@lru_cache(maxsize=1) def get_client() -> TrinoClient: return TrinoClient( host=settings.trino_host, diff --git a/api/src/api/main.py b/api/src/api/main.py index 1c716be..c678760 100644 --- a/api/src/api/main.py +++ b/api/src/api/main.py @@ -49,9 +49,14 @@ async def health() -> JSONResponse: @app.get("/ready", tags=["ops"]) async def ready() -> JSONResponse: - if await get_client().ping(): - return JSONResponse({"status": "ready"}) - return JSONResponse({"status": "unavailable"}, status_code=503) + from .read_model import get_read_model + + if not await get_client().ping(): + return JSONResponse({"status": "unavailable"}, status_code=503) + model = get_read_model() + if not model.ready or not model.ohlcv: + return JSONResponse({"status": "warming_up"}, status_code=503) + return JSONResponse({"status": "ready"}) 
@app.exception_handler(Exception) diff --git a/api/src/api/poller.py b/api/src/api/poller.py index 378f673..9750c79 100644 --- a/api/src/api/poller.py +++ b/api/src/api/poller.py @@ -1,49 +1,175 @@ +"""Background poller — the single writer of the in-process ReadModel. + +Refresh cadence +--------------- +Liquidity + spread 30 s (order book data, changes every tick) +Pipeline health 30 s (staleness / freshness metrics) +OHLCV last-60 60 s (aligned with Flink's 1-minute bar interval) +Symbol config 5 min (CDC-driven, changes very rarely) + +On startup all four are refreshed concurrently so the model is ready before +the first request arrives. After that, only the due refreshes run each tick. + +Prometheus gauges are updated alongside the read model so Grafana still works. +""" + +from __future__ import annotations + import asyncio +import time +from collections.abc import Coroutine +from datetime import UTC, datetime +from typing import Any import structlog from .market_metrics import HEALTH_SCORE, IMBALANCE, MID_PRICE, SPREAD_BPS, STALENESS +from .models.liquidity import LiquidityResponse, SpreadResponse +from .models.ohlcv import OHLCVBar, OHLCVResponse +from .models.pipeline import PipelineLagItem, PipelineLagResponse +from .models.symbols import SymbolItem, SymbolsResponse +from .read_model import _model from .trino_client import TrinoClient log = structlog.get_logger() -POLL_INTERVAL = 30 +_LIQUIDITY_INTERVAL = 30 +_OHLCV_INTERVAL = 60 +_SYMBOLS_INTERVAL = 300 _LIQUIDITY_SQL = """ -SELECT exchange, symbol, mid_price, spread_bps, - COALESCE(imbalance, 0.0) AS imbalance, - staleness_seconds -FROM iceberg.marts.mart_liquidity + SELECT exchange, symbol, latest_ts, + best_bid_price, best_ask_price, spread, mid_price, spread_bps, + best_bid_qty, best_ask_qty, COALESCE(imbalance, 0.0) AS imbalance, + market_signal, staleness_seconds, freshness_status + FROM iceberg.marts.mart_liquidity """ -_HEALTH_SQL = """ -SELECT exchange, symbol, health_score -FROM 
iceberg.marts.mart_exchange_health +# One query for all symbols: window function picks the 60 most-recent bars per +# symbol, bounded to the last 2 hours so the table scan stays cheap. +_OHLCV_SQL = """ + SELECT exchange, symbol, window_start, window_end, + open_price, high_price, low_price, close_price, + volume, vwap, tick_count, first_ts, last_ts + FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY window_start DESC) AS rn + FROM iceberg.marts.mart_ohlcv + WHERE window_start >= NOW() - INTERVAL '2' HOUR + ) t + WHERE t.rn <= 60 + ORDER BY symbol, window_start DESC """ +_PIPELINE_SQL = """ + SELECT exchange, symbol, latest_event_ts, latest_ingest_ts, + staleness_seconds, freshness_status, health_score + FROM iceberg.marts.mart_exchange_health + ORDER BY symbol +""" + +_SYMBOLS_SQL = """ + SELECT symbol, exchange, status, base_asset, quote_asset, updated_at + FROM iceberg.normalized.symbol_config + ORDER BY symbol +""" -async def _poll_once(client: TrinoClient) -> None: + +async def _poll_liquidity(client: TrinoClient) -> None: try: rows = await client.fetch(_LIQUIDITY_SQL) + new_spread: dict[str, SpreadResponse] = {} + new_liquidity: dict[str, LiquidityResponse] = {} for r in rows: - lbl = {"symbol": r["symbol"], "exchange": r["exchange"]} + sym = str(r["symbol"]).upper() + lbl = {"symbol": str(r["symbol"]).lower(), "exchange": str(r["exchange"])} + new_spread[sym] = SpreadResponse.model_validate(r) + new_liquidity[sym] = LiquidityResponse.model_validate(r) MID_PRICE.labels(**lbl).set(r["mid_price"]) SPREAD_BPS.labels(**lbl).set(r["spread_bps"]) IMBALANCE.labels(**lbl).set(r["imbalance"]) STALENESS.labels(**lbl).set(r["staleness_seconds"]) - log.info("market_poll_ok", symbols=len(rows)) + # Atomic swap: request handlers either see the old dict or the new one. 
+ _model.spread = new_spread + _model.liquidity = new_liquidity + log.info("poll_liquidity_ok", symbols=len(rows)) except Exception as exc: - log.warning("market_poll_liquidity_failed", error=str(exc)) + log.warning("poll_liquidity_failed", error=str(exc)) + +async def _poll_ohlcv(client: TrinoClient) -> None: try: - rows = await client.fetch(_HEALTH_SQL) + rows = await client.fetch(_OHLCV_SQL) + by_symbol: dict[str, list[OHLCVBar]] = {} + exchange_by_symbol: dict[str, str] = {} for r in rows: - HEALTH_SCORE.labels(symbol=r["symbol"], exchange=r["exchange"]).set(r["health_score"]) + sym = str(r["symbol"]).upper() + by_symbol.setdefault(sym, []).append(OHLCVBar.model_validate(r)) + exchange_by_symbol[sym] = str(r["exchange"]) + _model.ohlcv = { + sym: OHLCVResponse( + symbol=sym, + exchange=exchange_by_symbol[sym], + bars=bars, + count=len(bars), + ) + for sym, bars in by_symbol.items() + } + log.info("poll_ohlcv_ok", symbols=len(by_symbol)) except Exception as exc: - log.warning("market_poll_health_failed", error=str(exc)) + log.warning("poll_ohlcv_failed", error=str(exc)) + + +async def _poll_pipeline(client: TrinoClient) -> None: + try: + rows = await client.fetch(_PIPELINE_SQL) + items = [PipelineLagItem.model_validate(r) for r in rows] + for r in rows: + HEALTH_SCORE.labels(symbol=str(r["symbol"]), exchange=str(r["exchange"])).set( + r["health_score"] + ) + healthy = sum(1 for i in items if i.freshness_status == "FRESH") + _model.pipeline = PipelineLagResponse( + items=items, + checked_at=datetime.now(UTC), + healthy_count=healthy, + total_count=len(items), + ) + log.info("poll_pipeline_ok", total=len(items), healthy=healthy) + except Exception as exc: + log.warning("poll_pipeline_failed", error=str(exc)) + + +async def _poll_symbols(client: TrinoClient) -> None: + try: + rows = await client.fetch(_SYMBOLS_SQL) + symbols = [SymbolItem.model_validate(r) for r in rows] + _model.symbols = SymbolsResponse(symbols=symbols, count=len(symbols)) + 
log.info("poll_symbols_ok", count=len(symbols)) + except Exception as exc: + log.warning("poll_symbols_failed", error=str(exc)) async def run_poller(client: TrinoClient) -> None: + # Initial full refresh so the model is warm before the first request. + await asyncio.gather( + _poll_liquidity(client), + _poll_pipeline(client), + _poll_symbols(client), + _poll_ohlcv(client), + return_exceptions=True, + ) + _model.ready = True + + last_ohlcv = last_symbols = time.monotonic() while True: - await _poll_once(client) - await asyncio.sleep(POLL_INTERVAL) + await asyncio.sleep(30) + t = time.monotonic() + tasks: list[Coroutine[Any, Any, None]] = [_poll_liquidity(client), _poll_pipeline(client)] + if t - last_ohlcv >= _OHLCV_INTERVAL: + tasks.append(_poll_ohlcv(client)) + last_ohlcv = t + if t - last_symbols >= _SYMBOLS_INTERVAL: + tasks.append(_poll_symbols(client)) + last_symbols = t + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/api/src/api/read_model.py b/api/src/api/read_model.py new file mode 100644 index 0000000..c7ab152 --- /dev/null +++ b/api/src/api/read_model.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from .models.liquidity import LiquidityResponse, SpreadResponse +from .models.ohlcv import OHLCVResponse +from .models.pipeline import PipelineLagResponse +from .models.symbols import SymbolsResponse + + +@dataclass +class ReadModel: + """Process-local read model populated by the background poller. + + All fields are replaced atomically (full dict swap or object assignment). + CPython's GIL guarantees that a reference swap is atomic, so no lock is + needed for the reads that happen in request handlers. + + ready is False until the first full poll cycle completes. Endpoints + return 503 until ready is True. 
+ """ + + spread: dict[str, SpreadResponse] = field(default_factory=dict) + liquidity: dict[str, LiquidityResponse] = field(default_factory=dict) + # last 60 bars per symbol, newest-first (mirrors the original Trino ORDER BY DESC) + ohlcv: dict[str, OHLCVResponse] = field(default_factory=dict) + pipeline: PipelineLagResponse | None = None + symbols: SymbolsResponse | None = None + ready: bool = False + + +_model = ReadModel() + + +def get_read_model() -> ReadModel: + return _model diff --git a/api/src/api/routers/liquidity.py b/api/src/api/routers/liquidity.py index 43cb5db..67bf620 100644 --- a/api/src/api/routers/liquidity.py +++ b/api/src/api/routers/liquidity.py @@ -1,50 +1,34 @@ import structlog -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, HTTPException -from ..dependencies import get_client from ..models.liquidity import LiquidityResponse, SpreadResponse -from ..trino_client import TrinoClient +from ..read_model import get_read_model log = structlog.get_logger() router = APIRouter(tags=["liquidity"]) -_LIQUIDITY_COLS = """ - exchange, symbol, latest_ts, - best_bid_price, best_ask_price, spread, mid_price, spread_bps, - best_bid_qty, best_ask_qty, imbalance, market_signal, - staleness_seconds, freshness_status -""" -_SPREAD_COLS = """ - exchange, symbol, latest_ts, - best_bid_price, best_ask_price, spread, mid_price, spread_bps -""" +def _check_ready() -> None: + if not get_read_model().ready: + raise HTTPException(status_code=503, detail="Service warming up") @router.get("/spread/{symbol}", response_model=SpreadResponse) -async def get_spread( - symbol: str, - client: TrinoClient = Depends(get_client), -) -> SpreadResponse: +async def get_spread(symbol: str) -> SpreadResponse: + _check_ready() sym = symbol.upper() - sql = f"SELECT {_SPREAD_COLS} FROM iceberg.marts.mart_liquidity WHERE LOWER(symbol) = LOWER(?)" - rows = await client.fetch(sql, [sym]) - if not rows: + result = get_read_model().spread.get(sym) + if 
result is None: raise HTTPException(status_code=404, detail=f"No spread data for symbol {sym}") - return SpreadResponse.model_validate(rows[0]) + return result @router.get("/liquidity/{symbol}", response_model=LiquidityResponse) -async def get_liquidity( - symbol: str, - client: TrinoClient = Depends(get_client), -) -> LiquidityResponse: +async def get_liquidity(symbol: str) -> LiquidityResponse: + _check_ready() sym = symbol.upper() - sql = ( - f"SELECT {_LIQUIDITY_COLS} FROM iceberg.marts.mart_liquidity WHERE LOWER(symbol) = LOWER(?)" - ) - rows = await client.fetch(sql, [sym]) - if not rows: + result = get_read_model().liquidity.get(sym) + if result is None: raise HTTPException(status_code=404, detail=f"No liquidity data for symbol {sym}") - log.info("liquidity_served", symbol=sym, signal=rows[0].get("market_signal")) - return LiquidityResponse.model_validate(rows[0]) + log.info("liquidity_served", symbol=sym, signal=result.market_signal) + return result diff --git a/api/src/api/routers/ohlcv.py b/api/src/api/routers/ohlcv.py index 74bebda..f7b8ef0 100644 --- a/api/src/api/routers/ohlcv.py +++ b/api/src/api/routers/ohlcv.py @@ -1,16 +1,29 @@ from datetime import datetime +from threading import Lock from typing import Annotated import structlog +from cachetools import TTLCache from fastapi import APIRouter, Depends, HTTPException, Query from ..dependencies import get_client from ..models.ohlcv import OHLCVBar, OHLCVResponse +from ..read_model import get_read_model from ..trino_client import TrinoClient log = structlog.get_logger() router = APIRouter(prefix="/ohlcv", tags=["ohlcv"]) +# The poller pre-loads this many bars per symbol (newest-first). +# Requests within this limit are served from the in-memory model. +_PRELOADED_BARS = 60 + +# Cold-path cache: historical range queries are rare but may be repeated +# (e.g., a dashboard refreshing the same date range every minute). +# TTL=60s aligns with the bar interval; maxsize covers 500 distinct queries. 
+_cold_cache: TTLCache[tuple, OHLCVResponse] = TTLCache(maxsize=500, ttl=60) +_cold_lock = Lock() + _COLS = """ exchange, symbol, window_start, window_end, open_price, high_price, low_price, close_price, @@ -27,23 +40,51 @@ async def get_ohlcv( client: TrinoClient = Depends(get_client), ) -> OHLCVResponse: sym = symbol.upper() + model = get_read_model() + + # ── Hot path ─────────────────────────────────────────────────────────────── + # No time-range filter and limit fits what the poller pre-loaded: + # serve from the in-memory model — sub-millisecond, no Trino call. + if from_ts is None and to_ts is None and limit <= _PRELOADED_BARS: + if not model.ready: + raise HTTPException(status_code=503, detail="Service warming up") + cached = model.ohlcv.get(sym) + if cached is None: + raise HTTPException(status_code=404, detail=f"No OHLCV data for symbol {sym}") + bars = cached.bars[:limit] + log.debug("ohlcv_model_hit", symbol=sym, limit=limit) + return OHLCVResponse(symbol=sym, exchange=cached.exchange, bars=bars, count=len(bars)) + + # ── Cold path ────────────────────────────────────────────────────────────── + # Historical range query (from_ts / to_ts set) or limit > 60: hit Trino. + # These are analytical / backfill requests; higher latency is acceptable, + # but repeated identical queries are cached for 60 s. 
+ cache_key = (sym, limit, from_ts, to_ts) + with _cold_lock: + cold_cached: OHLCVResponse | None = _cold_cache.get(cache_key) + if cold_cached is not None: + log.debug("ohlcv_cold_cache_hit", symbol=sym) + return cold_cached + sql_parts = [f"SELECT {_COLS} FROM iceberg.marts.mart_ohlcv WHERE LOWER(symbol) = LOWER(?)"] params: list[object] = [sym] - if from_ts: sql_parts.append("AND window_start >= ?") params.append(from_ts) if to_ts: sql_parts.append("AND window_end <= ?") params.append(to_ts) - sql_parts.extend(["ORDER BY window_start DESC", "LIMIT ?"]) params.append(limit) rows = await client.fetch("\n".join(sql_parts), params) if not rows: raise HTTPException(status_code=404, detail=f"No OHLCV data for symbol {sym}") - - bars = [OHLCVBar.model_validate(r) for r in rows] - log.info("ohlcv_served", symbol=sym, bars=len(bars)) - return OHLCVResponse(symbol=sym, exchange=str(rows[0]["exchange"]), bars=bars, count=len(bars)) + bars_list = [OHLCVBar.model_validate(r) for r in rows] + log.info("ohlcv_trino_served", symbol=sym, bars=len(bars_list)) + result = OHLCVResponse( + symbol=sym, exchange=str(rows[0]["exchange"]), bars=bars_list, count=len(bars_list) + ) + with _cold_lock: + _cold_cache[cache_key] = result + return result diff --git a/api/src/api/routers/pipeline.py b/api/src/api/routers/pipeline.py index 5c5c600..aafcefd 100644 --- a/api/src/api/routers/pipeline.py +++ b/api/src/api/routers/pipeline.py @@ -1,34 +1,19 @@ -from datetime import UTC, datetime - import structlog -from fastapi import APIRouter, Depends +from fastapi import APIRouter, HTTPException -from ..dependencies import get_client -from ..models.pipeline import PipelineLagItem, PipelineLagResponse -from ..trino_client import TrinoClient +from ..models.pipeline import PipelineLagResponse +from ..read_model import get_read_model log = structlog.get_logger() router = APIRouter(prefix="/pipeline", tags=["pipeline"]) -_SQL = """ - SELECT exchange, symbol, latest_event_ts, latest_ingest_ts, - 
staleness_seconds, freshness_status, health_score - FROM iceberg.marts.mart_exchange_health - ORDER BY symbol -""" - @router.get("/lag", response_model=PipelineLagResponse) -async def get_pipeline_lag( - client: TrinoClient = Depends(get_client), -) -> PipelineLagResponse: - rows = await client.fetch(_SQL) - items = [PipelineLagItem.model_validate(r) for r in rows] - healthy = sum(1 for i in items if i.freshness_status == "FRESH") - log.info("pipeline_lag_served", total=len(items), healthy=healthy) - return PipelineLagResponse( - items=items, - checked_at=datetime.now(UTC), - healthy_count=healthy, - total_count=len(items), - ) +async def get_pipeline_lag() -> PipelineLagResponse: + model = get_read_model() + if not model.ready: + raise HTTPException(status_code=503, detail="Service warming up") + if model.pipeline is None: + raise HTTPException(status_code=503, detail="Service warming up") + log.info("pipeline_lag_served", total=model.pipeline.total_count) + return model.pipeline diff --git a/api/src/api/routers/symbols.py b/api/src/api/routers/symbols.py index 9dcf39d..db8c974 100644 --- a/api/src/api/routers/symbols.py +++ b/api/src/api/routers/symbols.py @@ -1,24 +1,14 @@ -import structlog -from fastapi import APIRouter, Depends +from fastapi import APIRouter, HTTPException -from ..dependencies import get_client -from ..models.symbols import SymbolItem, SymbolsResponse -from ..trino_client import TrinoClient +from ..models.symbols import SymbolsResponse +from ..read_model import get_read_model -log = structlog.get_logger() router = APIRouter(prefix="/symbols", tags=["symbols"]) -_SQL = """ - SELECT symbol, exchange, status, base_asset, quote_asset, updated_at - FROM iceberg.normalized.symbol_config - ORDER BY symbol -""" - @router.get("", response_model=SymbolsResponse) -async def list_symbols( - client: TrinoClient = Depends(get_client), -) -> SymbolsResponse: - rows = await client.fetch(_SQL) - symbols = [SymbolItem.model_validate(r) for r in rows] - return 
SymbolsResponse(symbols=symbols, count=len(symbols)) +async def list_symbols() -> SymbolsResponse: + model = get_read_model() + if not model.ready or model.symbols is None: + raise HTTPException(status_code=503, detail="Service warming up") + return model.symbols diff --git a/api/src/api/trino_client.py b/api/src/api/trino_client.py index fa27d6d..0d80728 100644 --- a/api/src/api/trino_client.py +++ b/api/src/api/trino_client.py @@ -1,6 +1,7 @@ from typing import Any import anyio +import requests import structlog import trino @@ -19,6 +20,9 @@ def __init__( self._port = port self._user = user self._catalog = catalog + # Shared requests.Session so urllib3's HTTPConnectionPool persists across + # queries — avoids a new TCP handshake on every fetch() call. + self._http_session = requests.Session() def _connect(self) -> Any: return trino.dbapi.connect( # type: ignore[no-untyped-call] @@ -27,6 +31,7 @@ def _connect(self) -> Any: user=self._user, catalog=self._catalog, http_scheme="http", + http_session=self._http_session, ) async def fetch(self, sql: str, params: list[object] | None = None) -> list[dict[str, Any]]: diff --git a/api/tests/integration/test_endpoints.py b/api/tests/integration/test_endpoints.py index 6f90103..bb8f6eb 100644 --- a/api/tests/integration/test_endpoints.py +++ b/api/tests/integration/test_endpoints.py @@ -7,6 +7,12 @@ from api.dependencies import get_client from api.main import app +from api.models.liquidity import LiquidityResponse, SpreadResponse +from api.models.ohlcv import OHLCVBar, OHLCVResponse +from api.models.pipeline import PipelineLagItem, PipelineLagResponse +from api.models.symbols import SymbolItem, SymbolsResponse +from api.read_model import get_read_model +from api.routers.ohlcv import _cold_cache from api.trino_client import TrinoClient NOW = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) @@ -70,6 +76,45 @@ def _mock_client(rows: list[dict[str, Any]]) -> TrinoClient: return client +@pytest.fixture(autouse=True) +def 
setup_read_model(): + """Pre-populate the read model before each test and reset it after. + + This replaces the old pattern of dependency-overriding get_client() for + every endpoint test. Endpoints that still use Trino (OHLCV cold path) + continue to use app.dependency_overrides within their own tests. + """ + model = get_read_model() + model.ready = True + model.spread["BTCUSDT"] = SpreadResponse.model_validate(_LIQUIDITY_ROW) + model.liquidity["BTCUSDT"] = LiquidityResponse.model_validate(_LIQUIDITY_ROW) + model.ohlcv["BTCUSDT"] = OHLCVResponse( + symbol="BTCUSDT", + exchange="binance", + bars=[OHLCVBar.model_validate(_OHLCV_ROW)], + count=1, + ) + model.pipeline = PipelineLagResponse( + items=[PipelineLagItem.model_validate(_HEALTH_ROW)], + checked_at=NOW, + healthy_count=1, + total_count=1, + ) + model.symbols = SymbolsResponse( + symbols=[SymbolItem.model_validate(_SYMBOL_ROW)], + count=1, + ) + _cold_cache.clear() + yield + model.ready = False + model.spread.clear() + model.liquidity.clear() + model.ohlcv.clear() + model.pipeline = None + model.symbols = None + _cold_cache.clear() + + @pytest.fixture def http_client(): return AsyncClient(transport=ASGITransport(app=app), base_url="http://test") @@ -99,10 +144,8 @@ async def test_ready_trino_down(self, http_client: AsyncClient) -> None: class TestOHLCVEndpoint: async def test_get_ohlcv(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([_OHLCV_ROW]) async with http_client as c: resp = await c.get("/ohlcv/btcusdt") - app.dependency_overrides.clear() assert resp.status_code == 200 body = resp.json() assert body["symbol"] == "BTCUSDT" @@ -110,74 +153,113 @@ async def test_get_ohlcv(self, http_client: AsyncClient) -> None: assert body["interval"] == "1m" async def test_get_ohlcv_not_found(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([]) async with http_client as c: resp = await c.get("/ohlcv/UNKNOWN") - 
app.dependency_overrides.clear() assert resp.status_code == 404 async def test_get_ohlcv_symbol_uppercased(self, http_client: AsyncClient) -> None: - mock = _mock_client([_OHLCV_ROW]) - app.dependency_overrides[get_client] = lambda: mock + # lowercase request → model is keyed uppercase → response symbol is uppercase + async with http_client as c: + resp = await c.get("/ohlcv/btcusdt") + assert resp.status_code == 200 + assert resp.json()["symbol"] == "BTCUSDT" + + async def test_get_ohlcv_limit_slice(self, http_client: AsyncClient) -> None: + # limit < preloaded bars should slice correctly + model = get_read_model() + bar = OHLCVBar.model_validate(_OHLCV_ROW) + model.ohlcv["BTCUSDT"] = OHLCVResponse( + symbol="BTCUSDT", exchange="binance", bars=[bar, bar, bar], count=3 + ) async with http_client as c: - await c.get("/ohlcv/btcusdt") + resp = await c.get("/ohlcv/btcusdt?limit=2") + assert resp.status_code == 200 + assert resp.json()["count"] == 2 + + async def test_get_ohlcv_not_ready(self, http_client: AsyncClient) -> None: + get_read_model().ready = False + async with http_client as c: + resp = await c.get("/ohlcv/btcusdt") + assert resp.status_code == 503 + + async def test_get_ohlcv_historical_range(self, http_client: AsyncClient) -> None: + # from_ts triggers the cold (Trino) path; use params= so httpx encodes + correctly + app.dependency_overrides[get_client] = lambda: _mock_client([_OHLCV_ROW]) + async with http_client as c: + resp = await c.get("/ohlcv/btcusdt", params={"from_ts": NOW.isoformat()}) app.dependency_overrides.clear() - call_args = mock.fetch.call_args # type: ignore[union-attr] - assert "BTCUSDT" in call_args.args[1] + assert resp.status_code == 200 + assert resp.json()["count"] == 1 + + async def test_get_ohlcv_historical_not_found(self, http_client: AsyncClient) -> None: + app.dependency_overrides[get_client] = lambda: _mock_client([]) + async with http_client as c: + resp = await c.get("/ohlcv/UNKNOWN", params={"from_ts": NOW.isoformat()}) + 
app.dependency_overrides.clear() + assert resp.status_code == 404 class TestLiquidityEndpoints: async def test_get_spread(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([_LIQUIDITY_ROW]) async with http_client as c: resp = await c.get("/spread/btcusdt") - app.dependency_overrides.clear() assert resp.status_code == 200 assert resp.json()["spread_bps"] == 0.4 async def test_get_spread_not_found(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([]) async with http_client as c: resp = await c.get("/spread/UNKNOWN") - app.dependency_overrides.clear() assert resp.status_code == 404 + async def test_get_spread_not_ready(self, http_client: AsyncClient) -> None: + get_read_model().ready = False + async with http_client as c: + resp = await c.get("/spread/btcusdt") + assert resp.status_code == 503 + async def test_get_liquidity(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([_LIQUIDITY_ROW]) async with http_client as c: resp = await c.get("/liquidity/btcusdt") - app.dependency_overrides.clear() assert resp.status_code == 200 assert resp.json()["market_signal"] == "BUY_PRESSURE" class TestPipelineEndpoint: async def test_get_pipeline_lag(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([_HEALTH_ROW]) async with http_client as c: resp = await c.get("/pipeline/lag") - app.dependency_overrides.clear() assert resp.status_code == 200 body = resp.json() assert body["total_count"] == 1 assert body["healthy_count"] == 1 async def test_pipeline_lag_empty(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([]) + get_read_model().pipeline = PipelineLagResponse( + items=[], checked_at=NOW, healthy_count=0, total_count=0 + ) async with http_client as c: resp = await c.get("/pipeline/lag") - app.dependency_overrides.clear() 
assert resp.status_code == 200 assert resp.json()["total_count"] == 0 + async def test_pipeline_not_ready(self, http_client: AsyncClient) -> None: + get_read_model().ready = False + async with http_client as c: + resp = await c.get("/pipeline/lag") + assert resp.status_code == 503 + class TestSymbolsEndpoint: async def test_list_symbols(self, http_client: AsyncClient) -> None: - app.dependency_overrides[get_client] = lambda: _mock_client([_SYMBOL_ROW]) async with http_client as c: resp = await c.get("/symbols") - app.dependency_overrides.clear() assert resp.status_code == 200 body = resp.json() assert body["count"] == 1 assert body["symbols"][0]["symbol"] == "BTCUSDT" + + async def test_symbols_not_ready(self, http_client: AsyncClient) -> None: + get_read_model().ready = False + async with http_client as c: + resp = await c.get("/symbols") + assert resp.status_code == 503 diff --git a/docker-compose.yml b/docker-compose.yml index 2ec7eff..e9abd9a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -383,7 +383,7 @@ services: - bash - -c - | - echo "=== Waiting for Flink to create normalized schema ===" && \ + echo "=== Waiting for normalized.book_ticker ===" && \ until python3 -c " import trino, sys try: @@ -399,7 +399,23 @@ services: echo " normalized.book_ticker not ready, retrying in 10s..." sleep 10 done && \ - echo "=== normalized schema ready, running dbt ===" && \ + echo "=== Waiting for first OHLCV bar (1-min window + checkpoint ~120 s) ===" && \ + until python3 -c " + import trino, sys + try: + conn = trino.dbapi.connect(host='trino', port=8080, user='dbt', catalog='iceberg') + cur = conn.cursor() + cur.execute('SELECT 1 FROM iceberg.normalized.ohlcv_1m LIMIT 1') + rows = cur.fetchall() + sys.exit(0 if rows else 1) + except Exception as e: + print(f'Not ready: {e}') + sys.exit(1) + " 2>/dev/null; do + echo " normalized.ohlcv_1m not ready, retrying in 10s..." 
+ sleep 10 + done && \ + echo "=== normalized data ready, running dbt ===" && \ dbt run --profiles-dir . --no-send-anonymous-usage-stats && \ echo "=== dbt run complete ===" depends_on: diff --git a/docs/DEBUGGING_PHASE5.md b/docs/DEBUGGING_PHASE5.md index 667970f..a73332a 100644 --- a/docs/DEBUGGING_PHASE5.md +++ b/docs/DEBUGGING_PHASE5.md @@ -225,3 +225,300 @@ Phase 5 originally used a bare `lib/` directory in both `flink/jobs/` and `spark | Flink UI | 8081 | | Redpanda Console | 8080 | | Debezium | 8083 | + +--- + +## Load Test — OHLCV 404 on Fresh Start — dbt Timing + Poller Race (2026-05-18) + +### Symptom + +After `docker system prune -f` + `make up` + `make load-test-full`, k6 shows: +- `ohlcv 200` → 0% for the first 60–90 s of the test, then 100% +- k6 final report: all checks pass (100%) — but only because warm-up 404s happened before the threshold window closes + +Grafana "Success Rate by Endpoint": `/ohlcv/{param}` appears with 100% success only from the moment k6 starts, not from `make up`. + +### Root cause 1 — `dbt-runner` creates empty `mart_ohlcv` + +`dbt-runner` in docker-compose waited for `normalized.book_ticker` to have rows, then immediately ran `dbt`. At that moment, `normalized.ohlcv_1m` was still empty — Flink needs ~120 s for the first 1-minute tumbling window to close and checkpoint. Result: `mart_ohlcv` created with 0 rows. The dbt log line that reveals this: + +``` +4 of 6 OK created sql table model marts.mart_ohlcv ... [CREATE TABLE (0 rows)] +``` + +**Fix applied:** Updated `dbt-runner` command in `docker-compose.yml` to wait for BOTH `normalized.book_ticker` AND `normalized.ohlcv_1m` before running dbt. `make up` now takes ~2 min longer, but `mart_ohlcv` is always correctly populated on first start. 
+ +### Root cause 2 — `make load-test-full` had no timeout guards + +All three wait loops were infinite: +- Step 1 (Flink RUNNING): could spin forever if `flink-init` failed silently +- Step 3 (book_ticker): could spin if ingest was broken +- Step 3 (ohlcv_1m): could spin if the Flink OHLCV job crashed after submission + +**Fix applied:** Added `date +%s` based timeouts to all three loops: +- Flink RUNNING: 3-minute timeout (job submit + state transition) +- book_ticker: 3-minute timeout (first checkpoint) +- ohlcv_1m: 5-minute timeout (1-min window must close first) + +On timeout each loop prints a specific diagnostic command and exits 1. + +### Root cause 3 — Step 5 only verified `mart_liquidity`, not `mart_ohlcv` + +The original Step 5 checked `mart_liquidity` row count as a proxy for "dbt ran successfully". But dbt can successfully create an empty `mart_ohlcv` (0 rows) while `mart_liquidity` has rows — they source from different upstream tables. + +**Fix applied:** Added explicit `mart_ohlcv` row count check to Step 5. + +### Root cause 4 — k6 started before API poller refreshed from newly-populated mart + +Sequence: +1. Step 4 runs dbt → `mart_ohlcv` now has rows +2. Step 5 (old): immediately ran k6 +3. API poller refreshes OHLCV every 60 s — the ReadModel still had the old empty dict +4. First 0–60 s of k6: all OHLCV requests return 404 + +**Fix applied:** Added readiness poll at the end of Step 5 using the `/ready` endpoint. k6 starts only after `/ready` returns 200. + +### Root cause 5 — Step 5 polling used the real OHLCV endpoint + +The polling loop (`curl /ohlcv/btcusdt`) generated 404 traffic that Prometheus recorded against the `/ohlcv/{param}` endpoint. This: +- Made the "Success Rate by Endpoint" panel show the endpoint appearing late (only when 2xx started) +- Added noise to the "HTTP Status Codes" panel + +**Fix applied:** Extended `/ready` to also check `model.ready and model.ohlcv` (not just Trino ping). 
Step 5 now polls `/ready` instead of the real endpoint. The `/ready` endpoint has three meaningful states: + +| Response | Meaning | +|---|---| +| 200 `ready` | Trino reachable + ReadModel populated | +| 503 `warming_up` | Trino reachable but ReadModel ohlcv dict is empty | +| 503 `unavailable` | Trino not reachable | + +### `docker system prune -f` does NOT remove named volumes + +**Common misconception:** "I ran `docker system prune -f` to clean up data" — this removes stopped containers, unused networks, dangling images, and build cache. Named volumes (`redpanda-data`, `minio-data`, etc.) are NOT removed unless you also pass `--volumes`. + +**Correct cleanup command:** +```bash +make down # = docker compose down -v — removes containers AND named volumes +make up # fresh start, all volumes recreated +``` + +If you run `docker system prune -f` without `make down` first, old Iceberg data, Kafka offsets, and Flink checkpoints all survive in named volumes. This can cause confusing state where old data from a previous session passes "does the table have rows?" checks without being recent. + +--- + +## Grafana — "Live Market Prices" Shows "No Data" (2026-05-18) + +### Symptom + +The `BTC / USD`, `ETH / USD`, `SOL / USD`, `BNB / USD`, `XRP / USD` stat panels all show "No data". Other panels (Bid-Ask Spread, Order Book Imbalance, Health Score) work correctly. + +### Root cause — Prometheus label case mismatch + +Grafana queries: `market_mid_price_usd{symbol="btcusdt"}` (lowercase) + +But `_poll_liquidity` in `poller.py` built Prometheus labels from: +```python +sym = str(r["symbol"]).upper() # "BTCUSDT" +lbl = {"symbol": sym, ...} # label is uppercase +MID_PRICE.labels(**lbl).set(r["mid_price"]) +``` + +Prometheus stored `market_mid_price_usd{symbol="BTCUSDT"}`. The query `{symbol="btcusdt"}` finds nothing → "No data". 
+ +**Why Bid-Ask Spread still worked:** Its Grafana query is `market_spread_bps` with no `{symbol=...}` filter — all series (regardless of case) are returned. + +**Fix applied:** Split the label construction from the ReadModel key: +```python +sym = str(r["symbol"]).upper() # ReadModel dict key (uppercase) +lbl = {"symbol": str(r["symbol"]).lower(), ...} # Prometheus labels (lowercase) +``` + +**Rule:** ReadModel lookups use uppercase (routers call `.upper()` on the URL path param). Prometheus labels use lowercase (matches Grafana query conventions and the mart's storage format). These are two different concerns — don't conflate them via a shared `lbl` dict. + +--- + +## Which Component Fails Most Often? + +Short answer: **Flink**, followed by the startup sequencing glue. + +### Flink (highest failure rate) + +Flink is the most operationally fragile component for several reasons: + +1. **Job submission failures** — `flink-init` is a one-shot container. If any Flink job fails to submit (ImportError, Java class not found, SQL parse error, classpath issue), the entire downstream cascade fails: `flink-init → dbt-runner → api → prometheus → grafana` all stay in "Created" state. The failure mode is silent from the outside — you see `docker ps -a` showing downstream services as "Created" (not "Exited"), which is confusing until you know the cascade rule. **Diagnosis:** Always check `docker logs ticksense-flink-init-1` first. + +2. **Python/Java binding gaps** — PyFlink 1.18 does not expose many Java-side APIs (`KafkaRecordDeserializationSchema`, `set_deserializer()`). The `try/except ImportError: pass` pattern that PyFlink uses internally can silently swallow import failures, leaving classes undefined. No error at import time — error only when the class is used. + +3. **Image rebuild trap** — Flink serializes job functions via `cloudpickle` at submission time, capturing the Python environment from the jobmanager. 
If jobmanager and taskmanager have different versions of any dependency, the deserialized function fails on the taskmanager. Always rebuild all three images together: `docker compose build flink-jobmanager flink-taskmanager flink-init`. + +4. **Checkpoint/S3 state** — After a restart, Flink tries to resume from the latest checkpoint in `s3://ticksense/flink-checkpoints`. If the checkpoint references Kafka offsets that no longer exist (topic was re-created with different offsets), Flink fails to restart the job. Fix: delete the checkpoint dir in MinIO and restart fresh. + +5. **Memory pressure** — The default `taskmanager.numberOfTaskSlots: 4` with 3 running jobs can exhaust the 512M JVM heap on resource-constrained machines. Symptom: `OutOfMemoryError` in taskmanager logs, jobs restart repeatedly. + +### Startup sequencing (second most fragile) + +The `dbt-runner → api → prometheus → grafana` chain means a dbt failure (even a partial one like empty marts) propagates to the API serving stale/empty data. This is not a crash — it's a silent data quality failure that only surfaces when you query the API or run a load test. + +**The key insight:** `depends_on: service_completed_successfully` only checks that a service exited with code 0. It says nothing about whether the data that service was supposed to produce is actually there or is fresh. Build readiness checks into your operational tooling (like `make load-test-full`), not just into docker-compose dependency conditions. + +### Trino (cold-start flakiness) + +Trino takes 30–40 s to fully initialize. The healthcheck (`/v1/info` must show `"starting": false`) catches most issues, but early query failures can still occur if Trino is warming up its query planner. Symptom: first few API requests get `503 unavailable` from `/ready`. Fix: add a retry loop in your client or wait 10 s after `make up` before querying. 
+ +**If Trino fails:** +```bash +docker compose logs trino | grep -i error +docker compose restart trino +# wait 40 s for healthcheck to pass +curl http://localhost:8082/v1/info +``` + +### Redpanda (most stable) + +Redpanda is by far the most stable component. In all testing sessions it has never failed to start or become unavailable once running. The only operational issue is topic creation on restart — `redpanda-init` uses `|| true` so it silently skips already-existing topics, which is correct behavior. + +**If Redpanda fails:** +```bash +docker compose logs redpanda | tail -30 +# Check disk space — Redpanda is sensitive to disk pressure +docker compose restart redpanda +# Let healthcheck pass (rpk cluster health), then restart redpanda-init to recreate topics if needed +``` + +### MinIO/Iceberg (stable, but stateful) + +MinIO is stable but stateful. If MinIO data is corrupted (e.g., partial writes during a crash), Iceberg metadata can become inconsistent. The symptom is Trino returning errors on specific tables. Fix: `make down && make up` (fresh volumes). Never delete individual files from MinIO manually — always operate through Iceberg procedures or `make down`. + +--- + +## Load Test — k6 OHLCV 100% 404 + High Latency Diagnosis (2026-05-17) + +### Symptom + +k6 load test (10 VUs, 3.5 min ramp-up profile) shows: +- `ohlcv 200`: 0% — all 409 requests return 404 +- `http_req_failed`: 16.66% (= 409 failed / 2454 total — all failures are from ohlcv) +- `errors`: 0.00% — no 5xx errors +- p(95) latency: 149ms at only 10 VUs + +### Root cause 1 — OHLCV 404 (data pipeline gap) + +`iceberg.marts.mart_ohlcv` is empty. The API correctly returns 404 when no rows exist. + +The chain is: `Flink ohlcv_1m.py → iceberg_cat.normalized.ohlcv_1m → dbt stg_ohlcv_1m → dbt mart_ohlcv → API`. + +If either step is missing, the mart is empty: +1. The Flink OHLCV job (`flink/jobs/ohlcv_1m.py`) was not running during the test +2. 
`dbt run` was not executed after Flink started writing to `normalized.ohlcv_1m` + +The distinguishing signal is `errors: 0%` — if the table didn't exist at all, Trino would throw an exception → the API would return 500 → errors would be non-zero. 0% errors + 16.66% http_req_failed = 404 = table exists, data absent. + +**Fix:** Ensure the Flink OHLCV job is running before load testing, and that `dbt run` has been executed. In docker-compose, `dbt-runner` runs automatically after Flink initialises the schema. + +### Root cause 2 — Latency (no caching + new connection per request) + +Three stacked problems: +1. `TrinoClient._connect()` opens a new TCP connection to Trino on **every request** (connection setup ~20-50ms overhead even before the query executes) +2. Trino has a ~50-100ms minimum query latency floor per query due to distributed query planning overhead — even `WHERE symbol = X LIMIT 60` pays this cost +3. Zero response caching: OHLCV bars change at most every 60s (one checkpoint), liquidity every ~5s, but every API request hits Trino cold + +At 10 VUs: p(95)=149ms. At 100+ VUs: Trino connection storm pushes latency past 500ms. + +**Fix applied (2026-05-17):** +- `api/src/api/dependencies.py`: `get_client()` decorated with `@lru_cache(maxsize=1)` → singleton TrinoClient +- `api/src/api/routers/ohlcv.py`: `TTLCache(maxsize=500, ttl=60)` keyed by `(symbol, limit, from_ts, to_ts)` — 60s TTL aligns with the 1-minute bar interval; cached hits are microsecond-fast +- `api/src/api/routers/liquidity.py`: `TTLCache(maxsize=200, ttl=5)` for spread and liquidity — 5s TTL matches order book update cadence +- `api/tests/integration/test_endpoints.py`: `autouse` fixture clears module-level caches before each test to prevent cross-test cache hits poisoning `mock.fetch.call_args` +- `api/pyproject.toml`: added `cachetools>=5.3` dependency + +**Thread safety note:** `cachetools.TTLCache` is not thread-safe. All cache reads/writes are protected with a `threading.Lock`. 
This matters because `anyio.to_thread.run_sync` runs Trino calls in a thread pool — without the lock, concurrent requests can corrupt the cache. + +### Poller-as-read-model (2026-05-17) + +The next architectural step was removing Trino entirely from the hot request path. + +**Before:** Every API request → TTL cache check → Trino query (on cache miss) +**After:** Every API request → in-process `ReadModel` dict lookup (<1 µs). Trino is only called by the background poller on its refresh schedule. + +**`api/src/api/read_model.py`** — New module. `ReadModel` dataclass holds: +- `spread: dict[str, SpreadResponse]` and `liquidity: dict[str, LiquidityResponse]` — keyed by uppercase symbol +- `ohlcv: dict[str, OHLCVResponse]` — last 60 bars per symbol, newest-first +- `pipeline: PipelineLagResponse | None` +- `symbols: SymbolsResponse | None` +- `ready: bool` — False until first full poll cycle completes + +**`poller.py`** — Full rewrite. Replaces the old single-query poller with four separate refresh functions, each called on its own cadence: +- `_poll_liquidity` / `_poll_pipeline`: every 30 s, also update Prometheus gauges +- `_poll_ohlcv`: every 60 s, single window-function query for ALL symbols +- `_poll_symbols`: every 300 s (CDC data rarely changes) + +OHLCV SQL uses `ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY window_start DESC)` to get the 60 most-recent bars per symbol in one round trip, bounded to `WHERE window_start >= NOW() - INTERVAL '2' HOUR` to keep the scan cheap. + +**Endpoint routing:** +- Spread / liquidity / pipeline / symbols: read from `ReadModel`, no Trino at all +- OHLCV default (`limit ≤ 60`, no time filter): read from `ReadModel`, sub-millisecond +- OHLCV historical (`from_ts` / `to_ts` set, or `limit > 60`): cold path to Trino, with `TTLCache(60 s)` so repeated analytical queries don't pound Trino + +**Cold-start (before first poll completes):** endpoints return `503 Service Warming Up`. 
The initial poll runs immediately in `run_poller()` via `asyncio.gather(return_exceptions=True)` — in practice the warm-up window is < 1 s on a healthy stack. + +**Thread-safety:** `ReadModel` fields are updated by swapping entire dicts (`_model.spread = new_dict`). Under CPython's GIL, dict reference assignment is atomic — request handlers either see the old dict or the new one, never a partial update. The `TTLCache` on the cold path still uses a `threading.Lock` as before. + +--- + +## Load Test — Symbol Case Bug: spread/liquidity 100% 404 Despite Mart Having Data (2026-05-18) + +### Symptom + +After the ReadModel architecture was deployed (p(95)=10 ms confirmed), a second load test showed: +- `spread 200` ↳ 0% (all 404) +- `liquidity 200` ↳ 0% (all 404) +- `ohlcv 200` ↳ 0% (all 404) + +Yet `mart_liquidity` had 5 rows and `poll_liquidity_ok symbols=5` appeared in the API logs. + +### Root cause 1 — Symbol case mismatch (spread + liquidity) + +`mart_liquidity` stores symbols in **lowercase** (`btcusdt`, `ethusdt`, …). The poller built the ReadModel dict with lowercase keys: + +```python +sym = str(r["symbol"]) # "btcusdt" +new_spread[sym] = ... # keyed "btcusdt" +``` + +But the router looked up with `.upper()`: + +```python +sym = symbol.upper() # "BTCUSDT" +model.spread.get(sym) # None → 404 +``` + +The endpoint tests (`api/tests/integration/test_endpoints.py`) passed because the test fixture directly populated the model with `"BTCUSDT"` (uppercase), matching what the router expected — the tests never exercised the poller's key construction. The mismatch only appeared against the live stack, where the mart uses lowercase. + +**Fix:** `sym = str(r["symbol"]).upper()` in both `_poll_liquidity` and `_poll_ohlcv` in `poller.py`. + +**Rule:** Always normalize symbol keys to uppercase at the point where the ReadModel dict is built, not at the lookup site. That way, routers can use `.upper()` without coupling to the mart's storage convention. + +### Root cause 2 — mart_ohlcv empty (dbt timing) + +`mart_ohlcv` had 0 rows even though Flink was running. 
The cause: + +``` +make up + → flink-init submits Flink jobs (~30 s) + → dbt-runner runs dbt IMMEDIATELY after flink-init exits + At this point Flink has not completed a single checkpoint. + normalized.ohlcv_1m is empty → mart_ohlcv is empty. + → api starts, poller polls → poll_ohlcv_ok symbols=0 +``` + +Docker Compose `depends_on: service_completed_successfully` only waits for the service to *exit*, not for the system it triggered to *produce data*. The dbt-runner exits before Flink writes its first row. + +**Fix:** After `make up`, wait ~60 s for Flink to checkpoint, then run `make dbt-run` manually. The `make load-test-full` target automates this: + +1. Waits until 2 Flink jobs are `RUNNING` +2. Waits until `normalized.book_ticker` has rows (proxy for first checkpoint) +3. Runs `dbt run` +4. Verifies `mart_liquidity` has rows +5. Runs k6 + +**Rule:** Never assume `dbt-runner` produced fresh marts. It runs exactly once, at startup — the worst possible moment for data availability (stack just started, no data yet). For any load test or manual verification, run `make dbt-run` after the pipeline has been flowing for at least one minute. 
diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index d3738dd..115e568 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -205,7 +205,7 @@ Views recalculate `current_timestamp` on every Trino query → always fresh liqu ``` GET /health liveness check -GET /ready readiness check (pings Trino) +GET /ready readiness: ready (200) / warming_up (503 — ReadModel cold) / unavailable (503 — Trino down) GET /metrics Prometheus scrape endpoint GET /ohlcv/{symbol} 1m OHLCV bars (last 60 by default, filterable by ts range) GET /spread/{symbol} best bid/ask, spread in bps @@ -239,11 +239,19 @@ GET /symbols active trading pairs from CDC symbol_config --- -## Phase 5 — Ops + Observability (partial ✅ — Airflow + Spark pending E2E) +## Phase 5 — Ops + Observability (✅ load test passing — Airflow + Spark pending E2E) **Goal:** Airflow orchestration, Spark backfill/compaction, data quality checks, freshness SLA alerts. -**Status (2026-05-17):** Code complete. Flink kafka-offset fix shipped and verified. Airflow and Spark implemented but not yet E2E tested against the live stack. +**Status (2026-05-18):** Code complete. Flink kafka-offset fix shipped and verified. Airflow and Spark implemented but not yet E2E tested against the live stack. + +Load test fully passing as of 2026-05-18 (branch `fix/ohlcv_endpoints`): k6 10 VUs, 3.5 min — `p(95)=10ms`, `http_req_failed=0.00%`, `checks=100%`. All 16 per-endpoint checks pass. See `DEBUGGING_PHASE5.md` for the full diagnosis (5 root causes fixed). 
+ +**Fixes shipped (2026-05-18):** +- `dbt-runner` now waits for `normalized.ohlcv_1m` rows before running dbt (previously ran before Flink's first 1-min window closed) +- `make load-test-full` hardened: each wait loop has an explicit timeout (3 min / 3 min / 5 min) and exits with a clear error; Step 5 verifies `mart_ohlcv` rows and polls `/ready` before k6 starts +- `/ready` endpoint extended to three meaningful states: `ready` (200), `warming_up` (503 — Trino OK but ReadModel not yet populated), `unavailable` (503 — Trino unreachable) +- Prometheus label case fixed: ReadModel dict keys uppercase (`"BTCUSDT"`), Prometheus labels lowercase (`"btcusdt"`) — Grafana queries with `{symbol="btcusdt"}` now resolve correctly ### Kafka offset fix ✅ diff --git a/flink/jobs/normalize.py b/flink/jobs/normalize.py index 32aae82..f1ace57 100644 --- a/flink/jobs/normalize.py +++ b/flink/jobs/normalize.py @@ -126,6 +126,28 @@ def _to_epoch_ms(ts: Any) -> int: return int(datetime.now(UTC).timestamp() * 1000) +# TODO: this does not pass the lint test. 
Refactor later +# def _is_valid_record(record: Row) -> bool: +# """Return False for messages that can't be keyed (malformed JSON or missing fields).""" +# try: +# data = json.loads(record[PAYLOAD_IDX]) +# if "exchange" not in data or "symbol" not in data: +# log.warning( +# "dropping_malformed_record", +# offset=record[OFFSET_IDX], +# partition=record[PARTITION_IDX] +# ) +# return False +# return True +# except Exception: +# log.warning( +# "dropping_unparseable_record", +# offset=record[OFFSET_IDX], +# partition=record[PARTITION_IDX] +# ) +# return False + + def _kafka_key(record: Row) -> str: """Extract routing key from Kafka record Row(payload, partition, offset).""" data = json.loads(record[PAYLOAD_IDX]) @@ -296,7 +318,14 @@ def main() -> None: KAFKA_RECORD_TYPE, ) + # TODO: enable the filtered pipeline below once `_is_valid_record` is refactored # ── Stateful DataStream processing ──────────────────────────────────────── + # processed_stream = ( + # raw_stream.filter(_is_valid_record).key_by(_kafka_key).process( + # OrderBookProcessor(), output_type=BOOK_TICKER_TYPE + # ) + # ) + processed_stream = raw_stream.key_by(_kafka_key).process( OrderBookProcessor(), output_type=BOOK_TICKER_TYPE ) diff --git a/infra/config/grafana/dashboards/ticksense.json b/infra/config/grafana/dashboards/ticksense.json index d244590..bcdbc5b 100644 --- a/infra/config/grafana/dashboards/ticksense.json +++ b/infra/config/grafana/dashboards/ticksense.json @@ -1,20 +1,34 @@ { - "title": "TickSense — Real-time Crypto Lakehouse", + "title": "TickSense \u2014 Real-time Crypto Lakehouse", "uid": "ticksense-api", "schemaVersion": 39, "version": 2, "refresh": "10s", - "time": { "from": "now-1h", "to": "now" }, + "time": { + "from": "now-1h", + "to": "now" + }, "timezone": "browser", - "tags": ["ticksense"], + "tags": [ + "ticksense" + ], "panels": [ { "id": 1, "type": "stat", "title": "Requests / min", - "gridPos": { "h": 4, "w": 8, "x": 0, "y": 0 }, + "gridPos": { + "h": 4, + "w": 8, + "x": 0, + "y": 0 + }, "options": { -
"reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -23,10 +37,17 @@ "defaults": { "unit": "short", "decimals": 1, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "green", "value": null }] + "steps": [ + { + "color": "green", + "value": null + } + ] } } }, @@ -42,9 +63,18 @@ "id": 2, "type": "stat", "title": "P95 Latency", - "gridPos": { "h": 4, "w": 8, "x": 8, "y": 0 }, + "gridPos": { + "h": 4, + "w": 8, + "x": 8, + "y": 0 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -53,13 +83,24 @@ "defaults": { "unit": "ms", "decimals": 0, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", "steps": [ - { "color": "green", "value": null }, - { "color": "yellow", "value": 100 }, - { "color": "red", "value": 500 } + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 100 + }, + { + "color": "red", + "value": 500 + } ] } } @@ -76,9 +117,18 @@ "id": 3, "type": "stat", "title": "Error Rate", - "gridPos": { "h": 4, "w": 8, "x": 16, "y": 0 }, + "gridPos": { + "h": 4, + "w": 8, + "x": 16, + "y": 0 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -87,13 +137,24 @@ "defaults": { "unit": "percent", "decimals": 2, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", "steps": [ - { "color": "green", "value": null }, - { "color": "yellow", "value": 1 }, - { "color": "red", "value": 5 } + { + "color": "green", + "value": null + }, + { + 
"color": "yellow", + "value": 1 + }, + { + "color": "red", + "value": 5 + } ] } } @@ -110,15 +171,33 @@ "id": 4, "type": "timeseries", "title": "Request Rate by Endpoint", - "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 4 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "mean", + "max" + ] + } }, "fieldConfig": { "defaults": { "unit": "reqps", - "custom": { "lineWidth": 2, "fillOpacity": 10 } + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } } }, "targets": [ @@ -133,29 +212,50 @@ "id": 5, "type": "timeseries", "title": "Response Time Percentiles", - "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 4 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "mean", + "max" + ] + } }, "fieldConfig": { "defaults": { "unit": "s", - "custom": { "lineWidth": 2, "fillOpacity": 5 } + "custom": { + "lineWidth": 2, + "fillOpacity": 5 + } } }, "targets": [ { "expr": "histogram_quantile(0.50, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", - "legendFormat": "p50", "refId": "A" + "legendFormat": "p50", + "refId": "A" }, { "expr": "histogram_quantile(0.95, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", - "legendFormat": "p95", "refId": "B" + "legendFormat": "p95", + "refId": "B" }, { "expr": "histogram_quantile(0.99, sum(rate(api_request_duration_seconds_bucket[5m])) by (le))", - "legendFormat": "p99", "refId": "C" + "legendFormat": "p99", + "refId": "C" } ] }, @@ 
-163,53 +263,97 @@ "id": 6, "type": "timeseries", "title": "HTTP Status Codes", - "gridPos": { "h": 6, "w": 24, "x": 0, "y": 12 }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 12 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "sum" + ] + } }, "fieldConfig": { "defaults": { "unit": "reqps", - "custom": { "lineWidth": 1, "fillOpacity": 20 } + "custom": { + "lineWidth": 1, + "fillOpacity": 20 + } }, "overrides": [ { - "matcher": { "id": "byFrameRefID", "options": "ERR" }, - "properties": [{ "id": "color", "value": { "mode": "fixed", "fixedColor": "red" } }] + "matcher": { + "id": "byFrameRefID", + "options": "ERR" + }, + "properties": [ + { + "id": "color", + "value": { + "mode": "fixed", + "fixedColor": "red" + } + } + ] } ] }, "targets": [ { "expr": "sum(rate(api_requests_total{status_code=~\"2..\"}[1m]))", - "legendFormat": "2xx success", "refId": "OK" + "legendFormat": "2xx success", + "refId": "OK" }, { "expr": "sum(rate(api_requests_total{status_code=~\"4..\"}[1m]))", - "legendFormat": "4xx client error", "refId": "CLI" + "legendFormat": "4xx client error", + "refId": "CLI" }, { "expr": "sum(rate(api_requests_total{status_code=~\"5..\"}[1m]))", - "legendFormat": "5xx server error", "refId": "ERR" + "legendFormat": "5xx server error", + "refId": "ERR" } ] }, - { "id": 7, "type": "row", "title": "Live Market Prices", - "gridPos": { "h": 1, "w": 24, "x": 0, "y": 18 }, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 18 + }, "collapsed": false }, { "id": 8, "type": "stat", "title": "BTC / USD", - "gridPos": { "h": 4, "w": 6, "x": 0, "y": 19 }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 19 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, 
"colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -218,10 +362,17 @@ "defaults": { "unit": "currencyUSD", "decimals": 0, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "blue", "value": null }] + "steps": [ + { + "color": "blue", + "value": null + } + ] } } }, @@ -237,9 +388,18 @@ "id": 9, "type": "stat", "title": "ETH / USD", - "gridPos": { "h": 4, "w": 6, "x": 6, "y": 19 }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 19 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -248,10 +408,17 @@ "defaults": { "unit": "currencyUSD", "decimals": 0, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "blue", "value": null }] + "steps": [ + { + "color": "blue", + "value": null + } + ] } } }, @@ -267,9 +434,18 @@ "id": 10, "type": "stat", "title": "SOL / USD", - "gridPos": { "h": 4, "w": 4, "x": 12, "y": 19 }, + "gridPos": { + "h": 4, + "w": 4, + "x": 12, + "y": 19 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -278,10 +454,17 @@ "defaults": { "unit": "currencyUSD", "decimals": 2, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "blue", "value": null }] + "steps": [ + { + "color": "blue", + "value": null + } + ] } } }, @@ -297,9 +480,18 @@ "id": 11, "type": "stat", "title": "BNB / USD", - "gridPos": { "h": 4, "w": 4, "x": 16, "y": 19 }, + "gridPos": { + "h": 4, + "w": 4, + "x": 16, + "y": 19 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + 
"lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -308,10 +500,17 @@ "defaults": { "unit": "currencyUSD", "decimals": 2, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "blue", "value": null }] + "steps": [ + { + "color": "blue", + "value": null + } + ] } } }, @@ -327,9 +526,18 @@ "id": 12, "type": "stat", "title": "XRP / USD", - "gridPos": { "h": 4, "w": 4, "x": 20, "y": 19 }, + "gridPos": { + "h": 4, + "w": 4, + "x": 20, + "y": 19 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "colorMode": "background", "graphMode": "area", "textMode": "auto" @@ -338,10 +546,17 @@ "defaults": { "unit": "currencyUSD", "decimals": 4, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", - "steps": [{ "color": "blue", "value": null }] + "steps": [ + { + "color": "blue", + "value": null + } + ] } } }, @@ -357,22 +572,49 @@ "id": 13, "type": "timeseries", "title": "Bid-Ask Spread (bps)", - "gridPos": { "h": 8, "w": 12, "x": 0, "y": 23 }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 23 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "last", + "max" + ] + } }, "fieldConfig": { "defaults": { "unit": "short", "decimals": 2, - "custom": { "lineWidth": 2, "fillOpacity": 8 }, + "custom": { + "lineWidth": 2, + "fillOpacity": 8 + }, "thresholds": { "mode": "absolute", "steps": [ - { "color": "green", "value": null }, - { "color": "yellow", "value": 5 }, - { "color": "red", "value": 20 } + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 5 + }, + { + 
"color": "red", + "value": 20 + } ] } } @@ -390,10 +632,24 @@ "type": "timeseries", "title": "Order Book Imbalance", "description": "Positive = buy pressure, negative = sell pressure. Range: -1 to +1.", - "gridPos": { "h": 8, "w": 12, "x": 12, "y": 23 }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 23 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "last" + ] + } }, "fieldConfig": { "defaults": { @@ -401,7 +657,10 @@ "decimals": 3, "min": -1, "max": 1, - "custom": { "lineWidth": 2, "fillOpacity": 10 } + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } } }, "targets": [ @@ -412,21 +671,34 @@ } ] }, - { "id": 15, "type": "row", "title": "Pipeline Health", - "gridPos": { "h": 1, "w": 24, "x": 0, "y": 31 }, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 31 + }, "collapsed": false }, { "id": 16, "type": "stat", "title": "Health Score by Symbol", - "gridPos": { "h": 5, "w": 12, "x": 0, "y": 32 }, + "gridPos": { + "h": 5, + "w": 12, + "x": 0, + "y": 32 + }, "options": { - "reduceOptions": { "calcs": ["lastNotNull"] }, + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, "orientation": "auto", "colorMode": "background", "graphMode": "none", @@ -438,13 +710,24 @@ "decimals": 2, "min": 0, "max": 1, - "color": { "mode": "thresholds" }, + "color": { + "mode": "thresholds" + }, "thresholds": { "mode": "absolute", "steps": [ - { "color": "red", "value": null }, - { "color": "yellow", "value": 0.8 }, - { "color": "green", "value": 1.0 } + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 0.8 + }, + { + "color": "green", + "value": 1.0 + } ] } } @@ -462,22 +745,49 @@ "type": "timeseries", "title": "Data Staleness (seconds)", "description": "Seconds since last market event. 
Green < 60s (FRESH), yellow < 120s (WARN), red >= 120s (STALE).", - "gridPos": { "h": 5, "w": 12, "x": 12, "y": 32 }, + "gridPos": { + "h": 5, + "w": 12, + "x": 12, + "y": 32 + }, "options": { - "tooltip": { "mode": "multi", "sort": "desc" }, - "legend": { "displayMode": "table", "placement": "bottom", "calcs": ["last", "max"] } + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "last", + "max" + ] + } }, "fieldConfig": { "defaults": { "unit": "s", "decimals": 0, - "custom": { "lineWidth": 2, "fillOpacity": 8 }, + "custom": { + "lineWidth": 2, + "fillOpacity": 8 + }, "thresholds": { "mode": "absolute", "steps": [ - { "color": "green", "value": null }, - { "color": "yellow", "value": 60 }, - { "color": "red", "value": 120 } + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 60 + }, + { + "color": "red", + "value": 120 + } ] } } @@ -489,6 +799,65 @@ "refId": "A" } ] + }, + { + "id": 18, + "type": "timeseries", + "title": "Success Rate by Endpoint", + "description": "Percentage of 2xx responses per endpoint. A drop below 100% means the data pipeline (Flink \u2192 dbt \u2192 mart tables) has a gap. 
Endpoints backed by the ReadModel return 404 when the mart is empty \u2014 this is a data quality signal, not a server error.", + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 12 + }, + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + }, + "color": { + "mode": "palette-classic" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 95 + }, + { + "color": "green", + "value": 99 + } + ] + } + } + }, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom", + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "sum(rate(api_requests_total{status_code=~\"2..\"}[2m])) by (endpoint) / sum(rate(api_requests_total[2m])) by (endpoint) * 100", + "legendFormat": "{{endpoint}}", + "refId": "A" + } + ] } ] } diff --git a/uv.lock b/uv.lock index 0b95a86..f17be34 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.15' and sys_platform == 'win32'", @@ -1804,6 +1804,7 @@ version = "0.1.0" source = { editable = "api" } dependencies = [ { name = "anyio" }, + { name = "cachetools" }, { name = "fastapi" }, { name = "prometheus-client" }, { name = "pydantic" }, @@ -1816,6 +1817,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "anyio", specifier = ">=4.0" }, + { name = "cachetools", specifier = ">=5.3" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "prometheus-client", specifier = ">=0.20.0" }, { name = "pydantic", specifier = ">=2.7" },