# docs: add comprehensive CLAUDE.md for AI assistant context #2
Changes from all commits: `21eaaf1`, `7bd17a6`, `cfdab6c`, `60c597b`
**`.gitignore`** (un-ignores `CLAUDE.md`):

```diff
@@ -71,5 +71,4 @@ credentials/
 # Agents
 .agents/
 .claude/
-CLAUDE.md
 AGENTS.md
```
**`CLAUDE.md`** (new file, +136 lines):

# CLAUDE.md — btick

## Project Overview

btick is a **Bitcoin price oracle service** for prediction market settlement. It aggregates real-time BTC/USD price data from four exchanges (Binance, Coinbase, Kraken, OKX) and produces a canonical, manipulation-resistant median price with sub-second precision.
**Tech stack:** Go 1.23, PostgreSQL 17+ with TimescaleDB 2.24+, gorilla/websocket, pgx/v5, shopspring/decimal.
## Quick Reference Commands

```bash
# Build & run
go build -o btick ./cmd/btick
go run ./cmd/btick
go run ./cmd/btick -config custom.yaml

# Tests
go test ./...                          # All tests
go test -v ./internal/engine/          # Verbose, single package
go test -run TestFoo ./internal/pkg/   # Single test

# Docker
docker build -t btick .
docker run -e DATABASE_URL=... -p 8080:8080 btick
```
## Project Structure

```
cmd/btick/main.go        # Entry point, goroutine orchestration, graceful shutdown
internal/
  adapter/               # Exchange WebSocket connectors (Binance, Coinbase, Kraken, OKX)
    base.go              # BaseAdapter: connection lifecycle, reconnect with backoff
  api/                   # REST + WebSocket API server
    handlers.go          # REST endpoints: /v1/price/latest, /v1/snapshots, /v1/ticks, /v1/health
    websocket.go         # WebSocket broadcast at /ws/price
  config/                # YAML config loading with env var expansion
  domain/                # Immutable domain types (RawEvent, CanonicalTick, Snapshot1s, etc.)
  engine/                # Core pricing logic: median calculation, outlier rejection, snapshots
    snapshot.go          # SnapshotEngine: 1s snapshots, carry-forward, canonical tick generation
    multi.go             # MultiEngine: aggregates per-symbol engines for the API layer
    persistence.go       # Tick/snapshot DB persistence
  normalizer/            # Event deduplication (sync.Map sharded by source), UUID v7 assignment
  storage/               # PostgreSQL/TimescaleDB layer (pgx, no ORM)
    postgres.go          # Pool creation, migrations
    queries.go           # Read queries
    writer.go            # Batch writes (raw events, ticks, snapshots)
    pruner.go            # Data retention cleanup
  metrics/               # In-memory Prometheus-compatible metrics at /metrics
migrations/
  001_init.sql           # Idempotent schema: raw_ticks, canonical_ticks, snapshots_1s hypertables
docs/                    # API.md, ARCHITECTURE.md, DATABASE.md, DEPLOYMENT.md, OPTIMIZATION.md, openapi.yaml
```
## Architecture & Data Flow

```
Per symbol (BTC/USD, ETH/USD, ...):
  Exchanges → Adapters[sym] → Normalizer[sym] → Fan-out[sym] → Engine[sym]
                                                    ↓              ↓
                                             shared Writer    MultiEngine → API/WebSocket
                                                    ↓
                                       PostgreSQL (TimescaleDB)
```
- **Multi-symbol**: each symbol gets an independent pipeline (adapters, normalizer, engine).
- Config supports both the legacy single-symbol format (`canonical_symbol` + `sources`) and the new multi-symbol format (`symbols[]`).
- Each adapter runs in its own goroutine with auto-reconnect and exponential backoff.
- Normalizer deduplicates by `(source, trade_id)` and assigns UUID v7 IDs. It stamps the configured canonical symbol on all events.
- Fan-out uses **non-blocking channel sends** — full channels drop events rather than block.
- Engine produces 1-second snapshots with the median price from healthy sources.
- Canonical ticks are emitted only on price changes.
- `MultiEngine` merges snapshot/tick channels from all per-symbol engines and routes `LatestState(symbol)` to the correct engine.
- REST endpoints accept a `?symbol=` query param (defaults to the first configured symbol).
- WebSocket messages include a `symbol` field.
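The median step can be sketched as follows. This is a minimal, dependency-free illustration: the `median` helper and the use of `int64` minor units (e.g. cents) are assumptions of this sketch, not the project's actual API — the real engine works on `shopspring/decimal` values.

```go
package main

import (
	"fmt"
	"sort"
)

// median returns the median of prices expressed in integer minor units.
// Hypothetical helper for illustration only; the production engine uses
// shopspring/decimal, and int64 here just keeps the sketch stdlib-only.
func median(prices []int64) int64 {
	if len(prices) == 0 {
		return 0
	}
	// Copy before sorting so the caller's slice is untouched.
	s := append([]int64(nil), prices...)
	sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
	n := len(s)
	if n%2 == 1 {
		return s[n/2]
	}
	return (s[n/2-1] + s[n/2]) / 2
}

func main() {
	// Four exchange quotes in cents → one canonical median price.
	fmt.Println(median([]int64{6500012, 6500045, 6499990, 6500030})) // 6500021
}
```

With an even number of healthy sources (four exchanges, all fresh), the median is the mean of the two middle quotes, so a single manipulated feed cannot move the canonical price outside the band of the honest ones.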
## Key Conventions

### Code Style
- **Standard Go conventions**: `gofmt`, `go vet`, idiomatic naming (CamelCase exported, camelCase unexported).
- **Short variable names**: `evt`, `cfg`, `db`, `mu`, `wg`, `ctx`.
- **Packages**: lowercase, single word (`adapter`, `engine`, `domain`).
- **Interfaces**: named with `-er` suffix or explicit nouns (`Store`, `MessageHandler`); defined on the consumer side.
### Error Handling
- Every error is explicitly checked: `if err != nil { ... }`.
- Errors are wrapped with context: `fmt.Errorf("context: %w", err)`.
- Structured logging on errors via `slog` with relevant fields.
### Logging
- Framework: `log/slog` with structured JSON output in production.
- Levels: Debug, Info, Warn, Error.
- Add context with `.With()`: component, source, error details.
### Testing
- Go standard `testing` package only — no external test frameworks.
- Table-driven tests are preferred.
- Mocks are defined locally in test files (e.g., `mockStore`).
- Test helpers use `slog.NewTextHandler` at ErrorLevel to suppress noise.
- Use `httptest` for HTTP handler tests.
- ~188 test functions across all packages.
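A sketch of the preferred table-driven shape. `withinBand` is a hypothetical helper invented for this example (a basis-points outlier check on integer minor units), not an actual btick function:

```go
package main

import "testing"

// withinBand reports whether price is within maxBps basis points of median.
// Prices are in integer minor units; hypothetical helper for illustration.
func withinBand(price, median, maxBps int64) bool {
	diff := price - median
	if diff < 0 {
		diff = -diff
	}
	// Cross-multiplied to stay in integer arithmetic: diff/median <= maxBps/10000.
	return diff*10000 <= median*maxBps
}

func TestWithinBand(t *testing.T) {
	cases := []struct {
		name                  string
		price, median, maxBps int64
		want                  bool
	}{
		{"exact match", 6500000, 6500000, 50, true},
		{"inside band", 6501000, 6500000, 50, true},
		{"outside band", 6600000, 6500000, 50, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := withinBand(tc.price, tc.median, tc.maxBps); got != tc.want {
				t.Errorf("withinBand(%d, %d, %d) = %v, want %v",
					tc.price, tc.median, tc.maxBps, got, tc.want)
			}
		})
	}
}
```

Each case gets its own `t.Run` subtest, so failures report the case name and single cases can be selected with `go test -run TestWithinBand/outside_band`.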
### Database
- **Direct pgx** — no ORM.
- **Split connection pools**: ingest pool (writes, 12 conns) and query pool (reads, 8 conns).
- **Idempotent migrations**: `IF NOT EXISTS`, `DO $$ ... EXCEPTION` blocks.
- **TimescaleDB hypertables** with automatic compression policies.
- **shopspring/decimal** for all price arithmetic — never use `float64` for money.
### Concurrency
- `safeGo()` wrapper for goroutines: panic recovery + graceful shutdown.
- Channels are non-blocking (drop on full, never block producers).
- Drop counters are logged every 1000 events.
- Context cancellation propagates shutdown through all goroutines.
## Configuration

Config is loaded from `config.yaml` (see `config.yaml.example`). Environment variable expansion is supported via `${VAR}` syntax. The `DATABASE_URL` env var overrides the config DSN.

Key sections: `server`, `database`, `symbols` (per-symbol `sources`; legacy `canonical_symbol` + top-level `sources` still supported), `pricing` (median mode, outlier %, freshness), `storage` (retention, batching), `health`.
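A hypothetical multi-symbol config sketch — every field name below is inferred from the section names above, not taken from the real schema, so treat `config.yaml.example` as authoritative:

```yaml
# Illustrative only: field names are assumptions; see config.yaml.example.
server:
  http_addr: ":8080"
database:
  dsn: ${DATABASE_URL}
symbols:
  - canonical: BTC/USD
    sources:
      - name: binance
        enabled: true
      - name: kraken
        enabled: true
pricing:
  median_mode: true
storage:
  retention: 72h
health: {}
```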
**`cmd/btick/main.go`** (reconstructed from the flattened side-by-side view as a unified diff):

```diff
@@ -62,6 +62,11 @@ func main() {
 		os.Exit(1)
 	}
 
+	if len(cfg.Symbols) == 0 {
+		logger.Error("no symbols configured")
+		os.Exit(1)
+	}
+
 	// Context with signal handling
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -100,113 +105,126 @@ func main() {
 		apiDB = ingestDB
 	}
 
-	// Channels
-	// Adapters -> Normalizer
-	rawCh := make(chan domain.RawEvent, 10000)
-	// Normalizer -> (Writer + SnapshotEngine)
-	normalizedCh := make(chan domain.RawEvent, 10000)
-	// Fan-out from normalizedCh to writer and engine
-	writerCh := make(chan domain.RawEvent, 10000)
-	engineCh := make(chan domain.RawEvent, 10000)
+	// Shared writer channel — all symbols' raw events go to the same DB.
+	writerCh := make(chan domain.RawEvent, 10000)
 
 	var wg sync.WaitGroup
 
-	// Start feed adapters
-	for _, src := range cfg.Sources {
-		if !src.Enabled {
-			logger.Info("source disabled, skipping", "source", src.Name)
-			continue
-		}
-		s := src
-		safeGo(&wg, cancel, logger, "adapter-"+s.Name, func() {
-			startAdapter(ctx, s, rawCh, logger)
-		})
-	}
-
-	// Normalizer
-	norm := normalizer.New(rawCh, normalizedCh, logger)
-	safeGo(&wg, cancel, logger, "normalizer", func() {
-		norm.Run(ctx)
-	})
-
-	// Fan-out: normalizedCh -> writerCh + engineCh
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		defer close(writerCh)
-		defer close(engineCh)
-		var writerDrops, engineDrops int64
-		for {
-			select {
-			case <-ctx.Done():
-				if writerDrops > 0 || engineDrops > 0 {
-					logger.Warn("fan-out final drop counts",
-						"writer_drops", writerDrops,
-						"engine_drops", engineDrops,
-					)
-				}
-				return
-			case evt, ok := <-normalizedCh:
-				if !ok {
-					return
-				}
-				select {
-				case writerCh <- evt:
-				default:
-					writerDrops++
-					metrics.IncChannelDrop("writer")
-					if writerDrops%1000 == 1 {
-						logger.Warn("writer channel full, dropping event",
-							"source", evt.Source,
-							"total_drops", writerDrops,
-						)
-					}
-				}
-				select {
-				case engineCh <- evt:
-				default:
-					engineDrops++
-					metrics.IncChannelDrop("engine")
-					if engineDrops%1000 == 1 {
-						logger.Warn("engine channel full, dropping event",
-							"source", evt.Source,
-							"total_drops", engineDrops,
-						)
-					}
-				}
-			}
-		}
-	}()
-
-	// Raw event writer
-	if ingestDB != nil {
-		writer := storage.NewWriter(ingestDB,
-			cfg.Storage.BatchInsertMaxRows,
-			cfg.Storage.BatchInsertMaxDelay(),
-			logger,
-		)
-		safeGo(&wg, cancel, logger, "writer", func() {
-			writer.Run(ctx, writerCh)
-		})
-	}
-
-	// Snapshot engine
-	eng := engine.NewSnapshotEngine(
-		cfg.Pricing,
-		cfg.CanonicalSymbol,
-		cfg.Health.CanonicalStaleAfter(),
-		ingestDB,
-		engineCh,
-		logger,
-	)
-	safeGo(&wg, cancel, logger, "snapshot-engine", func() {
-		eng.Run(ctx)
-	})
-
-	// Note: canonical ticks are written to DB by the snapshot engine directly
-	// and broadcast to WebSocket clients by the API server's broadcast loop.
-	// No separate tick writer goroutine is needed.
+	// Raw event writer (shared across symbols)
+	if ingestDB != nil {
+		writer := storage.NewWriter(ingestDB,
+			cfg.Storage.BatchInsertMaxRows,
+			cfg.Storage.BatchInsertMaxDelay(),
+			logger,
+		)
+		safeGo(&wg, cancel, logger, "writer", func() {
+			writer.Run(ctx, writerCh)
+		})
+	}
+
+	// Build per-symbol pipelines.
+	engines := make(map[string]*engine.SnapshotEngine, len(cfg.Symbols))
+	symbols := make([]string, 0, len(cfg.Symbols))
+
+	for _, sym := range cfg.Symbols {
+		canonical := sym.Canonical
+		symbols = append(symbols, canonical)
+		symLogger := logger.With("symbol", canonical)
+
+		// Channels for this symbol's pipeline.
+		rawCh := make(chan domain.RawEvent, 10000)
+		normalizedCh := make(chan domain.RawEvent, 10000)
+		engineCh := make(chan domain.RawEvent, 10000)
+
+		// Collect source names for dedup shard pre-init.
+		sourceNames := make([]string, 0, len(sym.Sources))
+		for _, src := range sym.Sources {
+			sourceNames = append(sourceNames, src.Name)
+		}
+
+		// Start feed adapters for this symbol.
+		for _, src := range sym.Sources {
+			if !src.Enabled {
+				symLogger.Info("source disabled, skipping", "source", src.Name)
+				continue
+			}
+			s := src
+			safeGo(&wg, cancel, logger, "adapter-"+canonical+"-"+s.Name, func() {
+				startAdapter(ctx, s, rawCh, symLogger)
+			})
+		}
+
+		// Normalizer for this symbol.
+		norm := normalizer.New(rawCh, normalizedCh, canonical, sourceNames, symLogger)
+		safeGo(&wg, cancel, logger, "normalizer-"+canonical, func() {
+			norm.Run(ctx)
+		})
+
+		// Fan-out: normalizedCh → shared writerCh + per-symbol engineCh.
+		wg.Add(1)
+		go func(sym string) {
+			defer wg.Done()
+			defer close(engineCh)
+			var writerDrops, engineDrops int64
+			for {
+				select {
+				case <-ctx.Done():
+					if writerDrops > 0 || engineDrops > 0 {
+						symLogger.Warn("fan-out final drop counts",
+							"writer_drops", writerDrops,
+							"engine_drops", engineDrops,
+						)
+					}
+					return
+				case evt, ok := <-normalizedCh:
+					if !ok {
+						return
+					}
+					select {
+					case writerCh <- evt:
+					default:
+						writerDrops++
+						metrics.IncChannelDrop("writer")
+						if writerDrops%1000 == 1 {
+							symLogger.Warn("writer channel full, dropping event",
+								"source", evt.Source,
+								"total_drops", writerDrops,
+							)
+						}
+					}
+					select {
+					case engineCh <- evt:
+					default:
+						engineDrops++
+						metrics.IncChannelDrop("engine")
+						if engineDrops%1000 == 1 {
+							symLogger.Warn("engine channel full, dropping event",
+								"source", evt.Source,
+								"total_drops", engineDrops,
+							)
+						}
+					}
+				}
+			}
+		}(canonical)
+
+		// Snapshot engine for this symbol.
+		eng := engine.NewSnapshotEngine(
+			cfg.Pricing,
+			canonical,
+			cfg.Health.CanonicalStaleAfter(),
+			ingestDB,
+			engineCh,
+			symLogger,
+		)
+		engines[canonical] = eng
+		safeGo(&wg, cancel, logger, "snapshot-engine-"+canonical, func() {
+			eng.Run(ctx)
+		})
+	}
+
+	// MultiEngine aggregates all per-symbol engines for the API.
+	multi := engine.NewMultiEngine(engines, symbols)
 
 	// Retention pruner
 	if ingestDB != nil {
@@ -228,7 +246,7 @@ func main() {
 	}
 
 	// API server
-	srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, eng, logger)
+	srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, multi, logger)
 	safeGo(&wg, cancel, logger, "api-server", func() {
 		if err := srv.Run(ctx); err != nil {
 			logger.Error("API server error", "error", err)
@@ -238,6 +256,7 @@ func main() {
 	logger.Info("all components running",
 		"http", cfg.Server.HTTPAddr,
 		"ws", cfg.Server.WSPath,
+		"symbols", symbols,
 	)
 
 	// Wait for shutdown
```
**Review comment:**
PR description/title indicates this is a docs-only change (adding CLAUDE.md and un-ignoring it), but the diff includes substantial functional changes (multi-symbol pipelines, new Engine interface, WebSocket connection limiting/eviction, new metrics). Please update the PR title/description to reflect the actual scope so reviewers can assess risk appropriately.