8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -46,3 +46,11 @@ spark/tmp/

# Jupyter
.ipynb_checkpoints/

# Content / scripts (not part of the codebase)
docs/ARTICLES*.md
docs/BLOG_IDEAS.md
docs/SCRIPT*.md
docs/VIDEO_SCRIPT.md
*.script.md
content/
41 changes: 41 additions & 0 deletions .skills/production-python-service/SKILL.md
@@ -22,6 +22,47 @@ description: Use when implementing any Python service: FastAPI endpoints, Kafka
- `/health` and `/ready` on every service
- `/metrics` (Prometheus format) on every long-running service

## APIRouter structure (mandatory once project has >1 module)

```
src/<package>/
  main.py           # assembler only: middleware + include_router, no business logic
  dependencies.py   # shared Depends factories (get_client, get_db) — defined ONCE here
  routers/
    users.py        # router = APIRouter(prefix="/users", tags=["users"])
    todos.py
  models/
    users.py        # Pydantic request/response models per domain
```

Rules:
- `main.py` only calls `app.include_router(...)` — no route definitions, no business logic
- `dependencies.py` is the single source of truth for all `Depends()` factories
- Each router imports `get_client` / `get_db` from `dependencies`, never defines its own
- Set `prefix` and `tags` on the `APIRouter`, not on `include_router` (keeps each file self-contained)
- Test overrides via `app.dependency_overrides[get_client] = lambda: MockClient()`

```python
# dependencies.py
from .config import settings
from .trino_client import TrinoClient

def get_client() -> TrinoClient:
    return TrinoClient(host=settings.trino_host, port=settings.trino_port, ...)

# routers/ohlcv.py
from ..dependencies import get_client

router = APIRouter(prefix="/ohlcv", tags=["ohlcv"])

@router.get("/{symbol}", response_model=OHLCVResponse)
async def get_ohlcv(symbol: str, client: TrinoClient = Depends(get_client)) -> OHLCVResponse:
    ...

# main.py
app.include_router(ohlcv.router)
```

## Configuration pattern

```python
26 changes: 22 additions & 4 deletions Makefile
@@ -1,6 +1,7 @@
.PHONY: up down restart logs lint format typecheck test coverage \
	build flink-submit flink-logs flink-venv ingest ingest-logs \
-	dbt-compile dbt-run dbt-test replay-sample watch-cdc help
+	dbt-compile dbt-run dbt-test dbt-freshness replay-sample watch-cdc \
+	load-gen help

# ── Local stack ──────────────────────────────────────────────────────────────

@@ -49,14 +50,20 @@ coverage:

# ── dbt ───────────────────────────────────────────────────────────────────────

+DBT = uv run --with dbt-trino dbt --no-send-anonymous-usage-stats
+DBTOPTS = --project-dir dbt --profiles-dir dbt
+
dbt-compile:
-	uv run dbt compile --project-dir dbt
+	$(DBT) compile $(DBTOPTS)

dbt-run:
-	uv run dbt run --project-dir dbt
+	$(DBT) run $(DBTOPTS)

dbt-test:
-	uv run dbt test --project-dir dbt
+	$(DBT) test $(DBTOPTS)
+
+dbt-freshness:
+	$(DBT) source freshness $(DBTOPTS)

# ── Utilities ─────────────────────────────────────────────────────────────────

@@ -66,6 +73,17 @@ replay-sample:
build:
	docker compose build --no-cache

# ── Load testing ──────────────────────────────────────────────────────────────

load-gen:
	docker run --rm \
		--network ticksense_default \
		-v $(CURDIR)/k6:/scripts:ro \
		-e K6_PROMETHEUS_RW_SERVER_URL=http://prometheus:9090/api/v1/write \
		-e 'K6_PROMETHEUS_RW_TREND_STATS=p(50),p(95),p(99),max' \
		grafana/k6:latest \
		run --out experimental-prometheus-rw /scripts/script.js

# ── Ingest ────────────────────────────────────────────────────────────────────

ingest:
5 changes: 4 additions & 1 deletion README.md
@@ -94,6 +94,9 @@ Once healthy, services are available at:
| Flink UI | http://localhost:8081 |
| Trino UI | http://localhost:8082 |
| Iceberg REST | http://localhost:8181/v1/config |
| **TickSense API** | **http://localhost:8000** (docs: `/docs`) |
| **Prometheus** | **http://localhost:9090** |
| **Grafana** | **http://localhost:3000** (admin / admin) |

---

@@ -353,6 +356,6 @@ See [docs/ROADMAP.md](docs/ROADMAP.md) for the full phase plan.
| 1 — Local Stack + Ingest | ✅ | docker-compose, Binance WS → Kafka (57 tests, 96% cov) |
| 2 — Flink Processing | ✅ | Normalize, dedup, OHLCV windows → Iceberg silver (e2e verified 2026-05-15) |
| 3 — CDC + Replay | ✅ | Debezium CDC, Flink upsert job, replay CLI (e2e verified 2026-05-16) |
-| 4 — Analytics Layer | | dbt models, FastAPI |
+| 4 — Analytics Layer | | dbt + Trino, FastAPI, Prometheus, Grafana (e2e verified 2026-05-16) |
| 5 — Ops + Observability | — | Airflow, Great Expectations, SLA alerts |
| 6 — Demo + Blog + Web | — | ticksense.ai, demo video, blog posts |
17 changes: 17 additions & 0 deletions api/Dockerfile
@@ -0,0 +1,17 @@
FROM python:3.13-slim

RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

RUN pip install uv --no-cache-dir

WORKDIR /app

COPY pyproject.toml uv.lock ./
COPY api/ ./api/

RUN uv sync --package ticksense-api --no-dev

EXPOSE 8000
CMD ["uv", "run", "--package", "ticksense-api", \
     "uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
8 changes: 8 additions & 0 deletions api/pyproject.toml
@@ -11,4 +11,12 @@ dependencies = [
"structlog>=24.2",
"trino>=0.328.0",
"prometheus-client>=0.20.0",
"anyio>=4.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/api"]
14 changes: 14 additions & 0 deletions api/src/api/config.py
@@ -0,0 +1,14 @@
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    trino_host: str = "localhost"
    trino_port: int = 8082
    trino_catalog: str = "iceberg"
    trino_user: str = "api"
    log_level: str = "INFO"

    model_config = SettingsConfigDict(env_file=".env", frozen=True, extra="ignore")


settings = Settings()
11 changes: 11 additions & 0 deletions api/src/api/dependencies.py
@@ -0,0 +1,11 @@
from .config import settings
from .trino_client import TrinoClient


def get_client() -> TrinoClient:
    return TrinoClient(
        host=settings.trino_host,
        port=settings.trino_port,
        user=settings.trino_user,
        catalog=settings.trino_catalog,
    )
60 changes: 60 additions & 0 deletions api/src/api/main.py
@@ -0,0 +1,60 @@
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from starlette.requests import Request

from .config import settings
from .dependencies import get_client
from .metrics import PrometheusMiddleware, metrics_endpoint
from .poller import run_poller
from .routers import liquidity, ohlcv, pipeline, symbols

structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(settings.log_level))
log = structlog.get_logger()


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
    task = asyncio.create_task(run_poller(get_client()))
    try:
        yield
    finally:
        task.cancel()


app = FastAPI(
    title="TickSense API",
    description="Real-time crypto market analytics over Iceberg + Trino.",
    version="0.1.0",
    lifespan=lifespan,
)

app.add_middleware(PrometheusMiddleware)

app.include_router(ohlcv.router)
app.include_router(liquidity.router)
app.include_router(pipeline.router)
app.include_router(symbols.router)
app.add_route("/metrics", metrics_endpoint)


@app.get("/health", tags=["ops"])
async def health() -> JSONResponse:
    return JSONResponse({"status": "ok"})


@app.get("/ready", tags=["ops"])
async def ready() -> JSONResponse:
    if await get_client().ping():
        return JSONResponse({"status": "ready"})
    return JSONResponse({"status": "unavailable"}, status_code=503)


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    log.error("unhandled_exception", path=request.url.path, error=str(exc))
    return JSONResponse({"detail": "Internal server error"}, status_code=500)
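
Review note on `lifespan` in `api/src/api/main.py`: the hook cancels the poller task but does not await it, so shutdown can proceed while the task is still unwinding. A minimal sketch of one common alternative follows, using only the standard library. `run_poller_stub` and `lifespan_sketch` are hypothetical stand-ins, not code from this PR, and the real `run_poller` may need different cleanup.

```python
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager, suppress


async def run_poller_stub(ticks: list[int]) -> None:
    """Hypothetical stand-in for run_poller: records a tick until cancelled."""
    while True:
        ticks.append(len(ticks))
        await asyncio.sleep(0.01)


@asynccontextmanager
async def lifespan_sketch(ticks: list[int]) -> AsyncGenerator[asyncio.Task[None], None]:
    task = asyncio.create_task(run_poller_stub(ticks))
    try:
        yield task
    finally:
        task.cancel()
        # Await the task so the cancellation is actually delivered and the
        # task has finished before the event loop is torn down.
        with suppress(asyncio.CancelledError):
            await task


async def demo() -> tuple[bool, int]:
    ticks: list[int] = []
    async with lifespan_sketch(ticks) as task:
        await asyncio.sleep(0.05)  # simulate the application serving requests
    return task.cancelled(), len(ticks)


poller_cancelled, tick_count = asyncio.run(demo())
```

Whether the extra `await` matters here depends on what `run_poller` holds open (connections, cursors); if it holds nothing, the existing `task.cancel()` alone may be sufficient.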
31 changes: 31 additions & 0 deletions api/src/api/market_metrics.py
@@ -0,0 +1,31 @@
from prometheus_client import Gauge

MID_PRICE = Gauge(
    "market_mid_price_usd",
    "Current mid price in USD",
    ["symbol", "exchange"],
)

SPREAD_BPS = Gauge(
    "market_spread_bps",
    "Bid-ask spread in basis points",
    ["symbol", "exchange"],
)

IMBALANCE = Gauge(
    "market_bid_ask_imbalance",
    "Order book imbalance (-1 = full sell pressure, +1 = full buy pressure)",
    ["symbol", "exchange"],
)

STALENESS = Gauge(
    "market_staleness_seconds",
    "Seconds since last market event was ingested",
    ["symbol", "exchange"],
)

HEALTH_SCORE = Gauge(
    "pipeline_health_score",
    "Pipeline health score (0 = dead, 1 = healthy)",
    ["symbol", "exchange"],
)
62 changes: 62 additions & 0 deletions api/src/api/metrics.py
@@ -0,0 +1,62 @@
import time
from collections.abc import Awaitable, Callable

from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

REQUEST_COUNT: Counter = Counter(
    "api_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
)

REQUEST_DURATION: Histogram = Histogram(
    "api_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

_STATIC_SEGMENTS = {
    "ohlcv",
    "spread",
    "liquidity",
    "symbols",
    "pipeline",
    "lag",
    "health",
    "ready",
    "metrics",
}


def _normalize_path(path: str) -> str:
    parts = path.strip("/").split("/")
    return "/" + "/".join(p if p in _STATIC_SEGMENTS else "{param}" for p in parts)


class PrometheusMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        endpoint = _normalize_path(request.url.path)
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status_code=str(response.status_code),
        ).inc()
        REQUEST_DURATION.labels(method=request.method, endpoint=endpoint).observe(duration)
        response.headers["X-Response-Time-Ms"] = f"{duration * 1000:.1f}"
        return response


async def metrics_endpoint(_: Request) -> Response:
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
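
Review note on `_normalize_path`: the function keeps the `endpoint` label low-cardinality by replacing every path segment that is not in `_STATIC_SEGMENTS` with `{param}`. The sketch below copies the function and the set from the diff and runs it on a few sample paths. The sample paths are illustrative assumptions; the actual routes are defined in the routers, which are not shown in this part of the diff.

```python
# Copied from api/src/api/metrics.py in this diff.
_STATIC_SEGMENTS = {
    "ohlcv", "spread", "liquidity", "symbols", "pipeline",
    "lag", "health", "ready", "metrics",
}


def _normalize_path(path: str) -> str:
    parts = path.strip("/").split("/")
    return "/" + "/".join(p if p in _STATIC_SEGMENTS else "{param}" for p in parts)


# Hypothetical request paths, chosen only to exercise the function.
samples = ["/ohlcv/BTCUSDT", "/pipeline/lag", "/health", "/docs", "/"]
normalized = {path: _normalize_path(path) for path in samples}

for path, label in normalized.items():
    print(f"{path!r:20} -> {label!r}")
```

Two consequences are worth knowing when reading dashboards: any path whose first segment is not in the set (for example `/docs`) and the root path `/` both collapse to `/{param}`, and a parameter value that happens to equal a static segment name is kept verbatim rather than replaced.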
Empty file added api/src/api/models/__init__.py
31 changes: 31 additions & 0 deletions api/src/api/models/liquidity.py
@@ -0,0 +1,31 @@
from datetime import datetime

from pydantic import BaseModel


class SpreadResponse(BaseModel):
    exchange: str
    symbol: str
    latest_ts: datetime
    best_bid_price: float
    best_ask_price: float
    spread: float
    mid_price: float
    spread_bps: float


class LiquidityResponse(BaseModel):
    exchange: str
    symbol: str
    latest_ts: datetime
    best_bid_price: float
    best_ask_price: float
    spread: float
    mid_price: float
    spread_bps: float
    best_bid_qty: float
    best_ask_qty: float
    imbalance: float
    market_signal: str
    staleness_seconds: int
    freshness_status: str
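
Review note on the derived fields in `SpreadResponse` and `LiquidityResponse`: the diff does not show where `spread`, `mid_price`, `spread_bps`, and `imbalance` are computed (presumably upstream of the API). The sketch below uses the conventional definitions as an assumption; it is not taken from this PR. The imbalance formula is chosen to match the `-1` to `+1` range described for the `market_bid_ask_imbalance` gauge. `TopOfBook` and `derive_quote_fields` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TopOfBook:
    """Hypothetical input: best bid/ask price and quantity for one symbol."""
    best_bid_price: float
    best_ask_price: float
    best_bid_qty: float
    best_ask_qty: float


def derive_quote_fields(q: TopOfBook) -> dict[str, float]:
    # Assumed conventional definitions, not confirmed by the diff.
    spread = q.best_ask_price - q.best_bid_price
    mid_price = (q.best_ask_price + q.best_bid_price) / 2
    spread_bps = spread * 10_000 / mid_price  # 1 bp = 0.01% of mid price
    total_qty = q.best_bid_qty + q.best_ask_qty
    # +1 when only bids rest at the top of book, -1 when only asks do.
    imbalance = (q.best_bid_qty - q.best_ask_qty) / total_qty if total_qty else 0.0
    return {
        "spread": spread,
        "mid_price": mid_price,
        "spread_bps": spread_bps,
        "imbalance": imbalance,
    }


fields = derive_quote_fields(
    TopOfBook(best_bid_price=99.0, best_ask_price=101.0, best_bid_qty=3.0, best_ask_qty=1.0)
)
print(fields)
```

With a bid of 99 and an ask of 101 the spread is 2 on a mid of 100, which is 200 basis points; three units bid against one unit asked gives an imbalance of 0.5 under this definition.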