Merged
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -71,5 +71,4 @@ credentials/
# Agents
.agents/
.claude/
AGENTS.md
CLAUDE.md
AGENTS.md
136 changes: 136 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,136 @@
# CLAUDE.md — btick

## Project Overview

btick is a **Bitcoin price oracle service** for prediction market settlement. It aggregates real-time BTC/USD price data from four exchanges (Binance, Coinbase, Kraken, OKX) and produces a canonical, manipulation-resistant median price with sub-second precision.

**Tech stack:** Go 1.23, PostgreSQL 17+ with TimescaleDB 2.24+, gorilla/websocket, pgx/v5, shopspring/decimal.

Comment on lines +1 to +8

Copilot AI Apr 1, 2026

PR description/title indicates this is a docs-only change (adding CLAUDE.md and un-ignoring it), but the diff includes substantial functional changes (multi-symbol pipelines, new Engine interface, WebSocket connection limiting/eviction, new metrics). Please update the PR title/description to reflect the actual scope so reviewers can assess risk appropriately.

## Quick Reference Commands

```bash
# Build & run
go build -o btick ./cmd/btick
go run ./cmd/btick
go run ./cmd/btick -config custom.yaml

# Tests
go test ./... # All tests
go test -v ./internal/engine/ # Verbose, single package
go test -run TestFoo ./internal/pkg/ # Single test

# Docker
docker build -t btick .
docker run -e DATABASE_URL=... -p 8080:8080 btick
```

## Project Structure

```
cmd/btick/main.go # Entry point, goroutine orchestration, graceful shutdown
internal/
adapter/ # Exchange WebSocket connectors (Binance, Coinbase, Kraken, OKX)
base.go # BaseAdapter: connection lifecycle, reconnect with backoff
api/ # REST + WebSocket API server
handlers.go # REST endpoints: /v1/price/latest, /v1/snapshots, /v1/ticks, /v1/health
websocket.go # WebSocket broadcast at /ws/price
config/ # YAML config loading with env var expansion
domain/ # Immutable domain types (RawEvent, CanonicalTick, Snapshot1s, etc.)
engine/ # Core pricing logic: median calculation, outlier rejection, snapshots
snapshot.go # SnapshotEngine: 1s snapshots, carry-forward, canonical tick generation
multi.go # MultiEngine: aggregates per-symbol engines for the API layer
persistence.go # Tick/snapshot DB persistence
normalizer/ # Event deduplication (sync.Map sharded by source), UUID v7 assignment
storage/ # PostgreSQL/TimescaleDB layer (pgx, no ORM)
postgres.go # Pool creation, migrations
queries.go # Read queries
writer.go # Batch writes (raw events, ticks, snapshots)
pruner.go # Data retention cleanup
metrics/ # In-memory Prometheus-compatible metrics at /metrics
migrations/
001_init.sql # Idempotent schema: raw_ticks, canonical_ticks, snapshots_1s hypertables
docs/ # API.md, ARCHITECTURE.md, DATABASE.md, DEPLOYMENT.md, OPTIMIZATION.md, openapi.yaml
```

## Architecture & Data Flow

```
Per symbol (BTC/USD, ETH/USD, ...):
Exchanges → Adapters[sym] → Normalizer[sym] → Fan-out[sym] → Engine[sym]
↓ ↓
shared Writer MultiEngine → API/WebSocket
PostgreSQL (TimescaleDB)
```

- **Multi-symbol**: each symbol gets an independent pipeline (adapters, normalizer, engine).
- Config supports both legacy single-symbol (`canonical_symbol` + `sources`) and new multi-symbol (`symbols[]`) format.
- Each adapter runs in its own goroutine with auto-reconnect and exponential backoff.
- Normalizer deduplicates by `(source, trade_id)` and assigns UUID v7 IDs. It stamps the configured canonical symbol on all events.
- Fan-out uses **non-blocking channel sends** — full channels drop events rather than block.
- Engine produces 1-second snapshots with median price from healthy sources.
- Canonical ticks are emitted only on price changes.
- `MultiEngine` merges snapshot/tick channels from all per-symbol engines and routes `LatestState(symbol)` to the correct engine.
- REST endpoints accept `?symbol=` query param (defaults to first configured symbol).
- WebSocket messages include a `symbol` field.

## Key Conventions

### Code Style
- **Standard Go conventions**: `gofmt`, `go vet`, idiomatic naming (PascalCase for exported identifiers, camelCase for unexported).
- **Short variable names**: `evt`, `cfg`, `db`, `mu`, `wg`, `ctx`.
- **Packages**: lowercase, single word (`adapter`, `engine`, `domain`).
- **Interfaces**: named with `-er` suffix or explicit nouns (`Store`, `MessageHandler`); defined on consumer side.

### Error Handling
- Every error is explicitly checked: `if err != nil { ... }`.
- Errors are wrapped with context: `fmt.Errorf("context: %w", err)`.
- Structured logging on errors via `slog` with relevant fields.

### Logging
- Framework: `log/slog` with structured JSON output in production.
- Levels: Debug, Info, Warn, Error.
- Add context with `.With()`: component, source, error details.

### Testing
- Go standard `testing` package only — no external test frameworks.
- Table-driven tests are preferred.
- Mocks defined locally in test files (e.g., `mockStore`).
- Test helpers use `slog.NewTextHandler` at ErrorLevel to suppress noise.
- Use `httptest` for HTTP handler tests.
- ~188 test functions across all packages.

### Database
- **Direct pgx** — no ORM.
- **Split connection pools**: ingest pool (writes, 12 conns) and query pool (reads, 8 conns).
- **Idempotent migrations**: `IF NOT EXISTS`, `DO $$ ... EXCEPTION` blocks.
- **TimescaleDB hypertables** with automatic compression policies.
- **shopspring/decimal** for all price arithmetic — never use `float64` for money.

### Concurrency
- `safeGo()` wrapper for goroutines: panic recovery + graceful shutdown.
- Channel sends are non-blocking: a full channel drops the event rather than blocking the producer.
- Drop counts are logged on the first drop and then once every 1000 drops.
- Context cancellation propagates shutdown through all goroutines.

## Configuration

Config loaded from `config.yaml` (see `config.yaml.example`). Environment variable expansion supported via `${VAR}` syntax. `DATABASE_URL` env var overrides config DSN.
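
A minimal sketch of `${VAR}` expansion using the standard library's `os.Expand`. Whether btick uses this exact call is an assumption, and the `database.dsn` key in the sample is illustrative only. Note that `os.Expand` also expands the brace-less `$VAR` form.

```go
package main

import (
	"fmt"
	"os"
)

// expandConfig replaces ${VAR} references in raw using lookup.
// Variables that lookup does not know expand to the empty string.
func expandConfig(raw string, lookup func(string) string) string {
	return os.Expand(raw, lookup)
}

func main() {
	// The YAML keys here are hypothetical, chosen only for the example.
	raw := "database:\n  dsn: ${DATABASE_URL}\n"
	env := map[string]string{"DATABASE_URL": "postgres://localhost/btick"}
	fmt.Print(expandConfig(raw, func(k string) string { return env[k] }))
}
```

In a real loader, `os.Getenv` would typically be passed as the lookup function.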

Key sections: `server`, `database`, `sources` (per-exchange), `pricing` (median mode, outlier %, freshness), `storage` (retention, batching), `health`.

Copilot AI Apr 1, 2026

The configuration summary lists sources as a top-level key section, but the example config and code now prefer symbols (with per-symbol sources). Update this line to mention symbols (and optionally note the legacy canonical_symbol + sources fallback) to avoid misleading readers.

Suggested change
Key sections: `server`, `database`, `sources` (per-exchange), `pricing` (median mode, outlier %, freshness), `storage` (retention, batching), `health`.
Key sections: `server`, `database`, `symbols` (per-symbol `sources`; legacy `canonical_symbol` + top-level `sources` still supported), `pricing` (median mode, outlier %, freshness), `storage` (retention, batching), `health`.


## CI/CD

- **GitHub Actions** (`.github/workflows/build-timescaledb.yml`): builds custom TimescaleDB Docker image to GHCR.
- **No automated test CI** — run `go test ./...` locally before pushing.
- **Deployment**: Docker or Railway.

## Dependencies (direct)

| Module | Purpose |
|--------|---------|
| `github.com/google/uuid` | UUID v7 generation |
| `github.com/gorilla/websocket` | WebSocket client/server |
| `github.com/jackc/pgx/v5` | PostgreSQL driver |
| `github.com/shopspring/decimal` | Exact decimal arithmetic |
| `gopkg.in/yaml.v3` | YAML config parsing |
199 changes: 109 additions & 90 deletions cmd/btick/main.go
@@ -62,6 +62,11 @@ func main() {
os.Exit(1)
}

if len(cfg.Symbols) == 0 {
logger.Error("no symbols configured")
os.Exit(1)
}

// Context with signal handling
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -100,113 +105,126 @@ func main() {
apiDB = ingestDB
}

// Channels
// Adapters -> Normalizer
rawCh := make(chan domain.RawEvent, 10000)
// Normalizer -> (Writer + SnapshotEngine)
normalizedCh := make(chan domain.RawEvent, 10000)
// Fan-out from normalizedCh to writer and engine
writerCh := make(chan domain.RawEvent, 10000)
engineCh := make(chan domain.RawEvent, 10000)

var wg sync.WaitGroup

// Start feed adapters
for _, src := range cfg.Sources {
if !src.Enabled {
logger.Info("source disabled, skipping", "source", src.Name)
continue
}
// Shared writer channel — all symbols' raw events go to the same DB.
writerCh := make(chan domain.RawEvent, 10000)

s := src
safeGo(&wg, cancel, logger, "adapter-"+s.Name, func() {
startAdapter(ctx, s, rawCh, logger)
// Raw event writer (shared across symbols)
if ingestDB != nil {
writer := storage.NewWriter(ingestDB,
cfg.Storage.BatchInsertMaxRows,
cfg.Storage.BatchInsertMaxDelay(),
logger,
)
safeGo(&wg, cancel, logger, "writer", func() {
writer.Run(ctx, writerCh)
})
}

// Normalizer
norm := normalizer.New(rawCh, normalizedCh, logger)
safeGo(&wg, cancel, logger, "normalizer", func() {
norm.Run(ctx)
})
// Build per-symbol pipelines.
engines := make(map[string]*engine.SnapshotEngine, len(cfg.Symbols))
symbols := make([]string, 0, len(cfg.Symbols))

// Fan-out: normalizedCh -> writerCh + engineCh
wg.Add(1)
go func() {
defer wg.Done()
defer close(writerCh)
defer close(engineCh)
var writerDrops, engineDrops int64
for {
select {
case <-ctx.Done():
if writerDrops > 0 || engineDrops > 0 {
logger.Warn("fan-out final drop counts",
"writer_drops", writerDrops,
"engine_drops", engineDrops,
)
}
return
case evt, ok := <-normalizedCh:
if !ok {
return
}
for _, sym := range cfg.Symbols {
canonical := sym.Canonical
symbols = append(symbols, canonical)
symLogger := logger.With("symbol", canonical)

Comment on lines +129 to +133

Copilot AI Apr 1, 2026

canonical := sym.Canonical is used without validation. If a symbol entry omits canonical (empty string), this will create an engine keyed by "" and produce events/WS messages with an empty symbol. Consider validating each symbol config (non-empty canonical, and ideally uniqueness) and failing fast with a clear error before starting goroutines.
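
A fail-fast check along these lines could look like the sketch below. `SymbolConfig` and `validateSymbols` are hypothetical names; only the `Canonical` field is taken from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// SymbolConfig is a hypothetical stand-in for the project's per-symbol config.
type SymbolConfig struct {
	Canonical string
}

// validateSymbols rejects an empty list, empty canonical names, and duplicates.
func validateSymbols(syms []SymbolConfig) error {
	if len(syms) == 0 {
		return errors.New("no symbols configured")
	}
	seen := make(map[string]struct{}, len(syms))
	for i, s := range syms {
		if s.Canonical == "" {
			return fmt.Errorf("symbols[%d]: canonical must not be empty", i)
		}
		if _, dup := seen[s.Canonical]; dup {
			return fmt.Errorf("symbols[%d]: duplicate canonical %q", i, s.Canonical)
		}
		seen[s.Canonical] = struct{}{}
	}
	return nil
}

func main() {
	fmt.Println(validateSymbols([]SymbolConfig{{Canonical: "BTC/USD"}, {Canonical: ""}}))
	fmt.Println(validateSymbols([]SymbolConfig{{Canonical: "BTC/USD"}, {Canonical: "BTC/USD"}}))
	fmt.Println(validateSymbols([]SymbolConfig{{Canonical: "BTC/USD"}, {Canonical: "ETH/USD"}}))
}
```

Running such a check before any goroutine starts keeps a bad config from producing engines keyed by an empty string.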

// Channels for this symbol's pipeline.
rawCh := make(chan domain.RawEvent, 10000)
normalizedCh := make(chan domain.RawEvent, 10000)
engineCh := make(chan domain.RawEvent, 10000)

// Collect source names for dedup shard pre-init.
sourceNames := make([]string, 0, len(sym.Sources))
for _, src := range sym.Sources {
sourceNames = append(sourceNames, src.Name)
}

// Start feed adapters for this symbol.
for _, src := range sym.Sources {
if !src.Enabled {
symLogger.Info("source disabled, skipping", "source", src.Name)
continue
}
s := src
safeGo(&wg, cancel, logger, "adapter-"+canonical+"-"+s.Name, func() {
startAdapter(ctx, s, rawCh, symLogger)
})
}
Comment on lines +129 to +155

Copilot AI Apr 1, 2026

The per-symbol pipeline loop reuses variables declared with := inside the loop body (e.g., rawCh, normalizedCh, engineCh, symLogger). The goroutines started in this loop capture those variables, so subsequent iterations overwrite them and can cause adapters/normalizers/fan-out to publish into the wrong symbol’s channels/logger. Create a per-iteration scope (e.g., wrap the pipeline construction in a helper function or an inner { ... } block) and/or pass the channels/logger into goroutines as parameters so each symbol’s goroutines retain the correct values.


// Normalizer for this symbol.
norm := normalizer.New(rawCh, normalizedCh, canonical, sourceNames, symLogger)
safeGo(&wg, cancel, logger, "normalizer-"+canonical, func() {
norm.Run(ctx)
})

// Fan-out: normalizedCh → shared writerCh + per-symbol engineCh.
wg.Add(1)
go func(sym string) {
defer wg.Done()
defer close(engineCh)
var writerDrops, engineDrops int64
for {
select {
case writerCh <- evt:
default:
writerDrops++
metrics.IncChannelDrop("writer")
if writerDrops%1000 == 1 {
logger.Warn("writer channel full, dropping event",
"source", evt.Source,
"total_drops", writerDrops,
case <-ctx.Done():
if writerDrops > 0 || engineDrops > 0 {
symLogger.Warn("fan-out final drop counts",
"writer_drops", writerDrops,
"engine_drops", engineDrops,
)
}
}
select {
case engineCh <- evt:
default:
engineDrops++
metrics.IncChannelDrop("engine")
if engineDrops%1000 == 1 {
logger.Warn("engine channel full, dropping event",
"source", evt.Source,
"total_drops", engineDrops,
)
return
case evt, ok := <-normalizedCh:
if !ok {
return
}
select {
case writerCh <- evt:
default:
writerDrops++
metrics.IncChannelDrop("writer")
if writerDrops%1000 == 1 {
symLogger.Warn("writer channel full, dropping event",
"source", evt.Source,
"total_drops", writerDrops,
)
}
}
select {
case engineCh <- evt:
default:
engineDrops++
metrics.IncChannelDrop("engine")
if engineDrops%1000 == 1 {
symLogger.Warn("engine channel full, dropping event",
"source", evt.Source,
"total_drops", engineDrops,
)
}
}
}
}
}
}()
}(canonical)

// Raw event writer
if ingestDB != nil {
writer := storage.NewWriter(ingestDB,
cfg.Storage.BatchInsertMaxRows,
cfg.Storage.BatchInsertMaxDelay(),
logger,
// Snapshot engine for this symbol.
eng := engine.NewSnapshotEngine(
cfg.Pricing,
canonical,
cfg.Health.CanonicalStaleAfter(),
ingestDB,
engineCh,
symLogger,
)
safeGo(&wg, cancel, logger, "writer", func() {
writer.Run(ctx, writerCh)
engines[canonical] = eng
safeGo(&wg, cancel, logger, "snapshot-engine-"+canonical, func() {
eng.Run(ctx)
})
}

// Snapshot engine
eng := engine.NewSnapshotEngine(
cfg.Pricing,
cfg.CanonicalSymbol,
cfg.Health.CanonicalStaleAfter(),
ingestDB,
engineCh,
logger,
)
safeGo(&wg, cancel, logger, "snapshot-engine", func() {
eng.Run(ctx)
})

// Note: canonical ticks are written to DB by the snapshot engine directly
// and broadcast to WebSocket clients by the API server's broadcast loop.
// No separate tick writer goroutine is needed.
// MultiEngine aggregates all per-symbol engines for the API.
multi := engine.NewMultiEngine(engines, symbols)

// Retention pruner
if ingestDB != nil {
@@ -228,7 +246,7 @@ func main() {
}

// API server
srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, eng, logger)
srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, multi, logger)
safeGo(&wg, cancel, logger, "api-server", func() {
if err := srv.Run(ctx); err != nil {
logger.Error("API server error", "error", err)
@@ -238,6 +256,7 @@ func main() {
logger.Info("all components running",
"http", cfg.Server.HTTPAddr,
"ws", cfg.Server.WSPath,
"symbols", symbols,
)

// Wait for shutdown