174 changes: 172 additions & 2 deletions .skills/production-python-service/SKILL.md
@@ -28,6 +28,8 @@ description: Use when implementing any Python service: FastAPI endpoints, Kafka
src/<package>/
  main.py # assembler only: middleware + include_router, no business logic
  dependencies.py # shared Depends factories (get_client, get_db) — defined ONCE here
  read_model.py # ReadModel dataclass + singleton (see OLAP section below)
  poller.py # background task: single writer of ReadModel
  routers/
    users.py # router = APIRouter(prefix="/users", tags=["users"])
    todos.py
@@ -38,19 +40,23 @@ src/<package>/
Rules:
- `main.py` only calls `app.include_router(...)` — no route definitions, no business logic
- `dependencies.py` is the single source of truth for all `Depends()` factories
- Client factories use `@lru_cache(maxsize=1)` so the same instance (and its connection pool) is reused across requests
- Each router imports `get_client` / `get_db` from `dependencies`, never defines its own
- Set `prefix` and `tags` on the `APIRouter`, not on `include_router` (keeps each file self-contained)
- Test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()`
- For hot-path endpoints backed by ReadModel: test by populating the model; do NOT use `app.dependency_overrides`
- For cold-path (Trino) endpoints: test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()`

```python
# dependencies.py
from functools import lru_cache
from .config import settings
from .trino_client import TrinoClient

@lru_cache(maxsize=1)
def get_client() -> TrinoClient:
    return TrinoClient(host=settings.trino_host, port=settings.trino_port, ...)

# routers/ohlcv.py
# routers/ohlcv.py — cold path still uses Depends; hot path reads from ReadModel
from ..dependencies import get_client

router = APIRouter(prefix="/ohlcv", tags=["ohlcv"])
@@ -63,6 +69,165 @@ async def get_ohlcv(symbol: str, client: TrinoClient = Depends(get_client)) -> O
app.include_router(ohlcv.router)
```
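
The effect of `@lru_cache(maxsize=1)` on a zero-argument factory can be checked in isolation. The sketch below is only an illustration: `FakeClient` is a hypothetical stand-in for `TrinoClient`, and the host and port values are made up.

```python
from functools import lru_cache


class FakeClient:
    """Hypothetical stand-in for TrinoClient; not part of the original code."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port


@lru_cache(maxsize=1)
def get_client() -> FakeClient:
    # The body runs on the first call only; later calls return the cached object.
    return FakeClient(host="localhost", port=8080)


first = get_client()
second = get_client()
same_instance = first is second

# cache_clear() forces the next call to build a new instance (useful between tests).
get_client.cache_clear()
fresh_instance = get_client() is not first
```

Because the factory takes no arguments, there is exactly one cache entry, so every request that resolves `Depends(get_client)` would receive the same object and its connection pool.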

---

## OLAP-backed API — ReadModel pattern

When the query layer sits on an OLAP engine (Trino, BigQuery, Spark SQL), every request
pays a query-planning floor of at least 50–100 ms, regardless of query complexity.
The fix is a **poller-as-read-model**: one background task owns all writes to an in-memory
dict; request handlers only read from it.

### Decision tree for new endpoints

| Data type | Pattern | Latency |
|---|---|---|
| Current-state snapshot ("what is X right now?") | ReadModel — pre-loaded by poller | < 1 ms |
| Historical / arbitrary-parameter query | Trino cold path + `TTLCache` | 50–200 ms |
| Derivable from existing model data | Compute in router, no DB call | < 1 ms |

### File: `read_model.py`

```python
from dataclasses import dataclass, field

@dataclass
class ReadModel:
    # Always replace the whole dict (atomic under CPython GIL).
    # Never mutate an existing dict in place.
    spread: dict[str, SpreadResponse] = field(default_factory=dict)
    ohlcv: dict[str, OHLCVResponse] = field(default_factory=dict)
    pipeline: PipelineLagResponse | None = None
    ready: bool = False  # False until first poll cycle completes → endpoints return 503

_model = ReadModel()

def get_read_model() -> ReadModel:
    return _model
```
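
The "replace, never mutate" rule is easiest to see with a reader that already holds a reference to the old dict. This is a minimal sketch under simplifying assumptions: `DemoModel` is a hypothetical reduced version of `ReadModel` that stores plain floats instead of response models.

```python
from dataclasses import dataclass, field


@dataclass
class DemoModel:
    # Hypothetical simplified model: symbol -> spread value.
    spread: dict[str, float] = field(default_factory=dict)


model = DemoModel(spread={"BTCUSDT": 0.5})

# A reader grabs a reference to the current dict before the writer runs.
snapshot = model.spread

# Writer: build a complete replacement first, then rebind the attribute once.
new_spread = {"BTCUSDT": 0.7, "ETHUSDT": 0.2}
model.spread = new_spread

# The earlier reader still sees a consistent old view; new readers see the new dict.
old_view = dict(snapshot)
new_view = dict(model.spread)
```

A reader never observes a half-updated dict, because the only shared write is a single attribute rebinding.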

### File: `poller.py` — single writer

```python
import asyncio
import time
from collections.abc import Coroutine
from typing import Any


async def _poll_something(client: TrinoClient) -> None:
    try:
        rows = await client.fetch(_SQL)
        # Build the new dict first, then swap atomically.
        _model.something = {str(r["key"]): Model.model_validate(r) for r in rows}
        log.info("poll_ok", count=len(rows))
    except Exception as exc:
        log.warning("poll_failed", error=str(exc))  # stale value stays; no crash

async def run_poller(client: TrinoClient) -> None:
    # Warm up before serving requests.
    # The `...` below is a placeholder for the remaining poll coroutines.
    await asyncio.gather(_poll_something(client), ..., return_exceptions=True)
    _model.ready = True

    last_slow = time.monotonic()
    while True:
        await asyncio.sleep(30)
        t = time.monotonic()
        tasks: list[Coroutine[Any, Any, None]] = [_poll_something(client)]
        if t - last_slow >= 300:  # slow data refreshed less often
            tasks.append(_poll_slow(client))
            last_slow = t
        await asyncio.gather(*tasks, return_exceptions=True)
```
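
The poll cycle relies on `asyncio.gather(..., return_exceptions=True)` so that one failing poll cannot cancel the others. The sketch below illustrates that behaviour with standard-library code only; `DemoModel`, `poll_prices`, and `poll_volumes` are hypothetical names, and the failure is simulated.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class DemoModel:
    # Hypothetical two-field model standing in for ReadModel.
    prices: dict[str, float] = field(default_factory=dict)
    volumes: dict[str, float] = field(default_factory=dict)


model = DemoModel(prices={"BTCUSDT": 1.0}, volumes={"BTCUSDT": 10.0})


async def poll_prices() -> None:
    # Successful poll: swap in a freshly built dict.
    model.prices = {"BTCUSDT": 2.0}


async def poll_volumes() -> None:
    # Simulated outage: this poll never reaches its swap.
    raise ConnectionError("simulated backend outage")


async def one_cycle() -> list:
    # Exceptions are returned as results instead of propagating.
    return await asyncio.gather(poll_prices(), poll_volumes(), return_exceptions=True)


results = asyncio.run(one_cycle())
```

After the cycle, `model.prices` holds the new data and `model.volumes` still holds the stale value, which matches the "stale value stays; no crash" behaviour described above.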

### Hot-path router (reads model, zero Trino)

```python
@router.get("/spread/{symbol}", response_model=SpreadResponse)
async def get_spread(symbol: str) -> SpreadResponse:
    model = get_read_model()
    if not model.ready:
        raise HTTPException(status_code=503, detail="Service warming up")
    result = model.spread.get(symbol.upper())
    if result is None:
        raise HTTPException(status_code=404, detail=f"No data for {symbol}")
    return result
```

### Cold-path router (Trino + TTLCache for repeated queries)

```python
from datetime import datetime
from threading import Lock

from cachetools import TTLCache

_cache: TTLCache[tuple, HistoryResponse] = TTLCache(maxsize=500, ttl=60)
_lock = Lock()

@router.get("/history/{symbol}", response_model=HistoryResponse)
async def get_history(
    symbol: str, from_ts: datetime, to_ts: datetime,
    client: TrinoClient = Depends(get_client),
) -> HistoryResponse:
    key = (symbol.upper(), from_ts, to_ts)
    # Hold the lock only around cache access, never across an await.
    with _lock:
        hit: HistoryResponse | None = _cache.get(key)
    if hit is not None:
        return hit
    rows = await client.fetch(_SQL, [symbol, from_ts, to_ts])
    if not rows:
        raise HTTPException(404)
    result = HistoryResponse(...)
    with _lock:
        _cache[key] = result
    return result
```

### Test pattern

```python
@pytest.fixture(autouse=True)
def setup_read_model():
    model = get_read_model()
    model.ready = True
    model.spread["BTCUSDT"] = SpreadResponse(...)
    yield
    model.ready = False
    model.spread.clear()
    _cold_cache.clear()  # clear any TTLCache used by cold-path tests

# Hot-path test: populate model, no Trino mock needed.
async def test_get_spread(http_client):
    async with http_client as c:
        resp = await c.get("/spread/btcusdt")
    assert resp.status_code == 200

# Cold-path test: override the client, use params= for datetime URL encoding.
async def test_get_history(http_client):
    app.dependency_overrides[get_client] = lambda: _mock_client([_ROW])
    async with http_client as c:
        resp = await c.get(
            "/history/btcusdt",
            # get_history requires both from_ts and to_ts.
            params={"from_ts": NOW.isoformat(), "to_ts": NOW.isoformat()},
        )
    app.dependency_overrides.clear()
    assert resp.status_code == 200
```
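
The reason the cold-path test passes datetimes through `params=` can be reproduced with the standard library alone. The sketch below uses `urllib.parse` rather than httpx or requests, and the timestamp is an arbitrary example value.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlencode

# An aware UTC timestamp; isoformat() ends in "+00:00".
ts = datetime(2026, 5, 18, 12, 0, tzinfo=timezone.utc).isoformat()

# Pasting the value into the query string unencoded: "+" is decoded as a space.
naive_query = f"from_ts={ts}"
naive_value = parse_qs(naive_query)["from_ts"][0]

# Letting the library encode the value: "+" becomes "%2B" and survives the round trip.
encoded_query = urlencode({"from_ts": ts})
encoded_value = parse_qs(encoded_query)["from_ts"][0]
```

The unencoded variant reaches the server as `2026-05-18T12:00:00 00:00`, which is not a valid ISO 8601 timestamp, while the encoded variant decodes back to the original string.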

### Thread-safety rules

| Object | Safe? | Rule |
|---|---|---|
| `ReadModel` dict swap (`_model.x = new_dict`) | ✅ atomic under GIL | Never mutate in place |
| `ReadModel` field read (`.get(sym)`) | ✅ atomic under GIL | No lock needed |
| `cachetools.TTLCache` | ❌ not thread-safe | Always wrap with `threading.Lock` |
| `requests.Session` (Trino HTTP pool) | ✅ thread-safe | Share one instance via `@lru_cache` |
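
The "always wrap with `threading.Lock`" rule from the table can be packaged as a small wrapper. This is a sketch under stated assumptions: `LockedCache` is a hypothetical helper, and a plain `dict` stands in for `cachetools.TTLCache` so the example runs with the standard library only.

```python
import threading
from collections.abc import MutableMapping
from typing import Any


class LockedCache:
    """Hypothetical helper: serialises every access to a shared mapping."""

    def __init__(self, store: MutableMapping) -> None:
        self._store = store
        self._lock = threading.Lock()

    def get(self, key: Any) -> Any:
        with self._lock:
            return self._store.get(key)

    def put(self, key: Any, value: Any) -> None:
        with self._lock:
            self._store[key] = value

    def __len__(self) -> int:
        with self._lock:
            return len(self._store)


# Assumption: a plain dict stands in for cachetools.TTLCache here.
cache = LockedCache({})


def worker(worker_id: int) -> None:
    for i in range(100):
        cache.put((worker_id, i), i)


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(cache)
```

Keeping the lock inside the wrapper means callers cannot forget it, at the cost of one extra layer around the cache.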

### Extending the model — checklist

Adding a new endpoint that needs fresh data:

1. Add a field to `ReadModel` in `read_model.py`
2. Write `_poll_<name>(client)` in `poller.py`
3. Register it in the initial `asyncio.gather` and the interval loop
4. Router reads `get_read_model().<field>.get(sym)` — no `Depends(get_client)` needed
5. Test: populate the field in `setup_read_model` fixture; no Trino mock required

Adding a new endpoint derivable from existing model data (no new poll needed):

- Just write the router. Read from existing model fields. Compute in Python.
- Example: `GET /top-movers` computes price change from `model.ohlcv[sym].bars`.
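
A derived endpoint such as `/top-movers` could be computed along the following lines. This is a minimal sketch, not the project's implementation: the `Bar` shape (`open`, `close`), the percentage formula, and the ranking by absolute change are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Bar:
    # Hypothetical bar shape; the real response model may differ.
    open: float
    close: float


def top_movers(ohlcv: dict[str, list[Bar]], limit: int = 3) -> list[tuple[str, float]]:
    """Rank symbols by absolute percentage change from first open to last close."""
    changes: list[tuple[str, float]] = []
    for symbol, bars in ohlcv.items():
        if not bars or bars[0].open == 0:
            continue  # skip symbols with no data or an unusable baseline
        pct = (bars[-1].close - bars[0].open) / bars[0].open * 100
        changes.append((symbol, pct))
    changes.sort(key=lambda item: abs(item[1]), reverse=True)
    return changes[:limit]


sample = {
    "BTCUSDT": [Bar(100.0, 101.0), Bar(101.0, 110.0)],
    "ETHUSDT": [Bar(50.0, 49.0), Bar(49.0, 40.0)],
    "SOLUSDT": [Bar(20.0, 20.0)],
    "EMPTY": [],
}
movers = top_movers(sample, limit=2)
```

The function only reads data that is already in memory, so the router needs no `Depends(get_client)` and no new poll.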

## Configuration pattern

```python
@@ -92,3 +257,8 @@ log.info("event_produced", topic=topic, key=key, offset=offset)
- `os.environ["KEY"]` instead of pydantic-settings
- Catching bare `Exception` without re-raising or logging with context
- Mutable default arguments or module-level mutable state
- Querying an OLAP engine on every request — always check the decision tree first
- Creating a new DB connection inside the request handler — use `@lru_cache` on the factory and share `requests.Session` across queries
- Mutating a `ReadModel` dict in place — always build a new dict and swap the reference atomically
- Using `TTLCache` without a `threading.Lock` — it is not thread-safe, so any access from more than one thread (for example sync handlers running in FastAPI's threadpool) must be serialised
- `datetime.isoformat()` in a URL query string without URL-encoding — the `+` in `+00:00` is parsed as a space; use `params={"key": value}` in httpx/requests so encoding is handled automatically
127 changes: 126 additions & 1 deletion Makefile
@@ -1,7 +1,7 @@
.PHONY: up down restart logs lint format typecheck test coverage \
build flink-submit flink-logs flink-venv ingest ingest-logs \
dbt-compile dbt-run dbt-test dbt-freshness replay-sample watch-cdc \
load-gen help
load-gen load-test-full help

# ── Local stack ──────────────────────────────────────────────────────────────

@@ -75,6 +75,8 @@ build:

# ── Load testing ──────────────────────────────────────────────────────────────

# make load-gen: runs k6 only — no prerequisite checks.
# Use `make load-test-full` for a fully-orchestrated test with all checks.
load-gen:
	docker run --rm \
		--network ticksense_default \
@@ -84,6 +86,129 @@ load-gen:
		grafana/k6:latest \
		run --out experimental-prometheus-rw /scripts/script.js

# make load-test-full: end-to-end load test with full prerequisite verification.
#
# WHY this exists — `make load-gen` alone always silently produces 100% 404s
# unless the full data pipeline is ready beforehand. The prerequisites are:
#
# 1. Flink jobs RUNNING — Flink writes to normalized.* and raw.*.
# flink-init only *submits* the jobs; they need a few seconds to reach
# RUNNING state before any messages are processed.
#
# 2. ingest producing to Kafka — normalized.book_ticker stays empty until
# the WebSocket ingest is active. ingest runs outside Docker because
# apache-flink cannot be installed in the same uv workspace as the host
# Python 3.13 environment. This target auto-starts it in the background
# if it is not already running, logging to /tmp/ticksense-ingest.log.
#
# 3. Flink first checkpoint complete (~60 s) — Iceberg files are written
# on checkpoint, not on every message. Nothing appears in normalized.*
# until the first checkpoint commits.
#
# 4. dbt run AFTER Flink has data — the Docker dbt-runner executes once at
# stack startup, before Flink has written anything. mart_liquidity and
# mart_ohlcv are empty until dbt is re-run against live Flink output.
#
# 5. API poller warm — the API background poller must complete at least one
# full cycle after dbt runs before the ReadModel has data. The poller
# starts immediately on API startup and refreshes every 30 s; after
# `dbt run` populates the marts the next poll cycle picks it up.
#
load-test-full:
	@echo ""
	@echo "╔═══════════════════════════════════════════╗"
	@echo "║ TickSense — Full Load Test Pipeline ║"
	@echo "╚═══════════════════════════════════════════╝"
	@echo ""
	@echo "── Step 1/6 Flink jobs RUNNING ────────────"
	@echo " (normalize + ohlcv_1m + CDC must all be in RUNNING state)"
	@_flink_running() { \
		curl -sf http://localhost:8081/jobs/overview 2>/dev/null \
		| python3 -c 'import sys,json; d=json.load(sys.stdin); print(sum(1 for j in d["jobs"] if j["state"]=="RUNNING"))' \
		2>/dev/null || echo 0; \
	}; \
	_ts=$$(date +%s); \
	until [ "$$(_flink_running)" -ge 2 ]; do \
		if [ $$(( $$(date +%s) - $$_ts )) -ge 180 ]; then \
			echo " ERROR: Flink jobs did not reach RUNNING within 3 min"; \
			echo " Debug: docker compose logs flink-jobmanager flink-init"; \
			exit 1; \
		fi; \
		echo " waiting for Flink jobs RUNNING (currently $$(_flink_running))..."; \
		sleep 5; \
	done
	@echo " ✓ Flink jobs RUNNING"
	@echo ""
	@echo "── Step 2/6 Verify ingest ──────────────────"
	@echo " (ingest runs as a Docker service; a local process is also supported)"
	@if docker compose ps ingest 2>/dev/null | grep -q "Up"; then \
		echo " Docker ingest service is running — OK"; \
	elif pgrep -f "ingest.main" > /dev/null 2>&1; then \
		echo " host ingest already running (PID $$(pgrep -f 'ingest.main'))"; \
	else \
		echo " Starting ingest in background → logs at /tmp/ticksense-ingest.log"; \
		uv run python -m ingest.main >> /tmp/ticksense-ingest.log 2>&1 & \
		echo " ingest started (PID $$!)"; \
	fi
	@echo ""
	@echo "── Step 3/6 Wait for normalized data ──────"
	@echo " (book_ticker: Flink first checkpoint ~60 s)"
	@echo " (ohlcv_1m: 1-minute window must close + checkpoint ~120 s)"
	@_ts=$$(date +%s); \
	until docker compose exec -T trino trino \
		--execute "SELECT count(*) FROM iceberg.normalized.book_ticker" \
		2>/dev/null | grep -qE "[1-9]"; do \
		if [ $$(( $$(date +%s) - $$_ts )) -ge 180 ]; then \
			echo " ERROR: normalized.book_ticker still empty after 3 min — is ingest running?"; \
			exit 1; \
		fi; \
		echo " normalized.book_ticker still empty — retry in 15 s..."; \
		sleep 15; \
	done
	@echo " ✓ normalized.book_ticker has rows"
	@_ts=$$(date +%s); \
	until docker compose exec -T trino trino \
		--execute "SELECT count(*) FROM iceberg.normalized.ohlcv_1m WHERE window_start >= NOW() - INTERVAL '2' HOUR" \
		2>/dev/null | grep -qE "[1-9]"; do \
		if [ $$(( $$(date +%s) - $$_ts )) -ge 300 ]; then \
			echo " ERROR: no recent OHLCV bars after 5 min — check: docker compose logs flink-jobmanager"; \
			exit 1; \
		fi; \
		echo " normalized.ohlcv_1m has no recent data (waiting for 1-min window to close) — retry in 15 s..."; \
		sleep 15; \
	done
	@echo " ✓ normalized.ohlcv_1m has recent rows"
	@echo ""
	@echo "── Step 4/6 Run dbt ───────────────────────"
	@echo " (WHY: the Docker dbt-runner ran once at startup before Flink had data;"
	@echo " mart_liquidity / mart_ohlcv are empty until we re-run dbt now)"
	$(DBT) run $(DBTOPTS)
	@echo " ✓ dbt run complete"
	@echo ""
	@echo "── Step 5/6 Verify marts + API ohlcv ready ─"
	@docker compose exec -T trino trino \
		--execute "SELECT count(*) FROM iceberg.marts.mart_liquidity" \
		2>/dev/null | grep -qE "[1-9]" || \
		(echo " ERROR: mart_liquidity still empty after dbt run — check dbt logs" && exit 1)
	@echo " ✓ mart_liquidity has rows"
	@docker compose exec -T trino trino \
		--execute "SELECT count(*) FROM iceberg.marts.mart_ohlcv" \
		2>/dev/null | grep -qE "[1-9]" || \
		(echo " ERROR: mart_ohlcv still empty after dbt run — check dbt logs" && exit 1)
	@echo " ✓ mart_ohlcv has rows"
	@echo " Waiting for /ready (ReadModel ohlcv populated, poller refreshes every 60 s)..."
	@until curl -sf http://localhost:8000/ready 2>/dev/null; do \
		echo " API not ready — retry in 10 s..."; \
		sleep 10; \
	done
	@echo " ✓ API ready (ReadModel populated)"
	@echo ""
	@echo "── Step 6/6 k6 load test ──────────────────"
	@echo " (k6 connects to api:8000 on ticksense_default Docker network)"
	@echo " (ingest keeps running after test — kill with: pkill -f ingest.main)"
	@echo ""
	$(MAKE) load-gen

# ── Ingest ────────────────────────────────────────────────────────────────────

ingest:
9 changes: 7 additions & 2 deletions README.md
@@ -310,7 +310,12 @@ ingest/ Binance WebSocket ingestion → Kafka
client.py Async WS client, reconnect, backoff
main.py Entry point: asyncio.gather over symbols

api/ FastAPI query layer over Trino
api/ FastAPI query layer over Trino + Iceberg
src/api/
read_model.py in-process ReadModel (hot-path data, no Trino on reads)
poller.py background task: refreshes ReadModel every 30–60 s
routers/ hot-path endpoints read model; cold-path (history) hits Trino
models/ Pydantic response models per domain
replay/ Kafka offset replay producer
flink/ PyFlink streaming jobs (Phase 2)
airflow/ Orchestration DAGs (Phase 5)
@@ -357,5 +362,5 @@ See [docs/ROADMAP.md](docs/ROADMAP.md) for the full phase plan.
| 2 — Flink Processing | ✅ | Normalize, dedup, OHLCV windows → Iceberg silver (e2e verified 2026-05-15) |
| 3 — CDC + Replay | ✅ | Debezium CDC, Flink upsert job, replay CLI (e2e verified 2026-05-16) |
| 4 — Analytics Layer | ✅ | dbt + Trino, FastAPI, Prometheus, Grafana (e2e verified 2026-05-16) |
| 5 — Ops + Observability | | Airflow, Great Expectations, SLA alerts |
| 5 — Ops + Observability | | Airflow, Great Expectations, SLA alerts (load test: p(95)=10ms, 0% errors — 2026-05-18; Airflow/Spark pending E2E) |
| 6 — Demo + Blog + Web | — | ticksense.ai, demo video, blog posts |
1 change: 1 addition & 0 deletions api/pyproject.toml
@@ -12,6 +12,7 @@ dependencies = [
"trino>=0.328.0",
"prometheus-client>=0.20.0",
"anyio>=4.0",
"cachetools>=5.3",
]

[build-system]
3 changes: 3 additions & 0 deletions api/src/api/dependencies.py
@@ -1,7 +1,10 @@
from functools import lru_cache

from .config import settings
from .trino_client import TrinoClient


@lru_cache(maxsize=1)
def get_client() -> TrinoClient:
    return TrinoClient(
        host=settings.trino_host,