diff --git a/.gitignore b/.gitignore index c0bc651..cf3bb95 100644 --- a/.gitignore +++ b/.gitignore @@ -71,5 +71,4 @@ credentials/ # Agents .agents/ .claude/ -AGENTS.md -CLAUDE.md \ No newline at end of file +AGENTS.md \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..9d1f2bf --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,136 @@ +# CLAUDE.md — btick + +## Project Overview + +btick is a **Bitcoin price oracle service** for prediction market settlement. It aggregates real-time BTC/USD price data from four exchanges (Binance, Coinbase, Kraken, OKX) and produces a canonical, manipulation-resistant median price with sub-second precision. + +**Tech stack:** Go 1.23, PostgreSQL 17+ with TimescaleDB 2.24+, gorilla/websocket, pgx/v5, shopspring/decimal. + +## Quick Reference Commands + +```bash +# Build & run +go build -o btick ./cmd/btick +go run ./cmd/btick +go run ./cmd/btick -config custom.yaml + +# Tests +go test ./... # All tests +go test -v ./internal/engine/ # Verbose, single package +go test -run TestFoo ./internal/pkg/ # Single test + +# Docker +docker build -t btick . +docker run -e DATABASE_URL=... -p 8080:8080 btick +``` + +## Project Structure + +``` +cmd/btick/main.go # Entry point, goroutine orchestration, graceful shutdown +internal/ + adapter/ # Exchange WebSocket connectors (Binance, Coinbase, Kraken, OKX) + base.go # BaseAdapter: connection lifecycle, reconnect with backoff + api/ # REST + WebSocket API server + handlers.go # REST endpoints: /v1/price/latest, /v1/snapshots, /v1/ticks, /v1/health + websocket.go # WebSocket broadcast at /ws/price + config/ # YAML config loading with env var expansion + domain/ # Immutable domain types (RawEvent, CanonicalTick, Snapshot1s, etc.) + engine/ # Core pricing logic: median calculation, outlier rejection, snapshots + snapshot.go # SnapshotEngine: 1s snapshots, carry-forward, canonical tick generation + multi.go # MultiEngine: aggregates per-symbol engines for the API layer + persistence.go # Tick/snapshot DB persistence + normalizer/ # Event deduplication (sync.Map sharded by source), UUID v7 assignment + storage/ # PostgreSQL/TimescaleDB layer (pgx, no ORM) + postgres.go # Pool creation, migrations + queries.go # Read queries + writer.go # Batch writes (raw events, ticks, snapshots) + pruner.go # Data retention cleanup + metrics/ # In-memory Prometheus-compatible metrics at /metrics +migrations/ + 001_init.sql # Idempotent schema: raw_ticks, canonical_ticks, snapshots_1s hypertables +docs/ # API.md, ARCHITECTURE.md, DATABASE.md, DEPLOYMENT.md, OPTIMIZATION.md, openapi.yaml +``` + +## Architecture & Data Flow + +``` +Per symbol (BTC/USD, ETH/USD, ...): + Exchanges → Adapters[sym] → Normalizer[sym] → Fan-out[sym] → Engine[sym] + ↓ ↓ + shared Writer MultiEngine → API/WebSocket + ↓ + PostgreSQL (TimescaleDB) +``` + +- **Multi-symbol**: each symbol gets an independent pipeline (adapters, normalizer, engine). +- Config supports both legacy single-symbol (`canonical_symbol` + `sources`) and new multi-symbol (`symbols[]`) format. +- Each adapter runs in its own goroutine with auto-reconnect and exponential backoff. +- Normalizer deduplicates by `(source, trade_id)` and assigns UUID v7 IDs. It stamps the configured canonical symbol on all events. +- Fan-out uses **non-blocking channel sends** — full channels drop events rather than block. +- Engine produces 1-second snapshots with median price from healthy sources. +- Canonical ticks are emitted only on price changes. +- `MultiEngine` merges snapshot/tick channels from all per-symbol engines and routes `LatestState(symbol)` to the correct engine. +- REST endpoints accept `?symbol=` query param (defaults to first configured symbol). +- WebSocket messages include a `symbol` field. + +## Key Conventions + +### Code Style +- **Standard Go conventions**: `gofmt`, `go vet`, idiomatic naming (CamelCase types, camelCase unexported). +- **Short variable names**: `evt`, `cfg`, `db`, `mu`, `wg`, `ctx`. +- **Packages**: lowercase, single word (`adapter`, `engine`, `domain`). +- **Interfaces**: named with `-er` suffix or explicit nouns (`Store`, `MessageHandler`); defined on consumer side. + +### Error Handling +- Every error is explicitly checked: `if err != nil { ... }`. +- Errors are wrapped with context: `fmt.Errorf("context: %w", err)`. +- Structured logging on errors via `slog` with relevant fields. + +### Logging +- Framework: `log/slog` with structured JSON output in production. +- Levels: Debug, Info, Warn, Error. +- Add context with `.With()`: component, source, error details. + +### Testing +- Go standard `testing` package only — no external test frameworks. +- Table-driven tests are preferred. +- Mocks defined locally in test files (e.g., `mockStore`). +- Test helpers use `slog.NewTextHandler` at ErrorLevel to suppress noise. +- Use `httptest` for HTTP handler tests. +- ~188 test functions across all packages. + +### Database +- **Direct pgx** — no ORM. +- **Split connection pools**: ingest pool (writes, 12 conns) and query pool (reads, 8 conns). +- **Idempotent migrations**: `IF NOT EXISTS`, `DO $$ ... EXCEPTION` blocks. +- **TimescaleDB hypertables** with automatic compression policies. +- **shopspring/decimal** for all price arithmetic — never use `float64` for money. + +### Concurrency +- `safeGo()` wrapper for goroutines: panic recovery + graceful shutdown. +- Channels are non-blocking (drop on full, never block producers). +- Drop counters logged every 1000 events. +- Context cancellation propagates shutdown through all goroutines. + +## Configuration + +Config loaded from `config.yaml` (see `config.yaml.example`). Environment variable expansion supported via `${VAR}` syntax. `DATABASE_URL` env var overrides config DSN. + +Key sections: `server`, `database`, `sources` (per-exchange), `pricing` (median mode, outlier %, freshness), `storage` (retention, batching), `health`. + +## CI/CD + +- **GitHub Actions** (`.github/workflows/build-timescaledb.yml`): builds custom TimescaleDB Docker image to GHCR. +- **No automated test CI** — run `go test ./...` locally before pushing. +- **Deployment**: Docker or Railway. + +## Dependencies (direct) + +| Module | Purpose | +|--------|---------| +| `github.com/google/uuid` | UUID v7 generation | +| `github.com/gorilla/websocket` | WebSocket client/server | +| `github.com/jackc/pgx/v5` | PostgreSQL driver | +| `github.com/shopspring/decimal` | Exact decimal arithmetic | +| `gopkg.in/yaml.v3` | YAML config parsing | diff --git a/cmd/btick/main.go b/cmd/btick/main.go index 394c3b5..8f20730 100644 --- a/cmd/btick/main.go +++ b/cmd/btick/main.go @@ -62,6 +62,11 @@ func main() { os.Exit(1) } + if len(cfg.Symbols) == 0 { + logger.Error("no symbols configured") + os.Exit(1) + } + // Context with signal handling ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -100,113 +105,126 @@ func main() { apiDB = ingestDB } - // Channels - // Adapters -> Normalizer - rawCh := make(chan domain.RawEvent, 10000) - // Normalizer -> (Writer + SnapshotEngine) - normalizedCh := make(chan domain.RawEvent, 10000) - // Fan-out from normalizedCh to writer and engine - writerCh := make(chan domain.RawEvent, 10000) - engineCh := make(chan domain.RawEvent, 10000) - var wg sync.WaitGroup - // Start feed adapters - for _, src := range cfg.Sources { - if !src.Enabled { - logger.Info("source disabled, skipping", "source", src.Name) - continue - } + // Shared writer channel — all symbols' raw events go to the same DB. + writerCh := make(chan domain.RawEvent, 10000) - s := src - safeGo(&wg, cancel, logger, "adapter-"+s.Name, func() { - startAdapter(ctx, s, rawCh, logger) + // Raw event writer (shared across symbols) + if ingestDB != nil { + writer := storage.NewWriter(ingestDB, + cfg.Storage.BatchInsertMaxRows, + cfg.Storage.BatchInsertMaxDelay(), + logger, + ) + safeGo(&wg, cancel, logger, "writer", func() { + writer.Run(ctx, writerCh) }) } - // Normalizer - norm := normalizer.New(rawCh, normalizedCh, logger) - safeGo(&wg, cancel, logger, "normalizer", func() { - norm.Run(ctx) - }) + // Build per-symbol pipelines. + engines := make(map[string]*engine.SnapshotEngine, len(cfg.Symbols)) + symbols := make([]string, 0, len(cfg.Symbols)) - // Fan-out: normalizedCh -> writerCh + engineCh - wg.Add(1) - go func() { - defer wg.Done() - defer close(writerCh) - defer close(engineCh) - var writerDrops, engineDrops int64 - for { - select { - case <-ctx.Done(): - if writerDrops > 0 || engineDrops > 0 { - logger.Warn("fan-out final drop counts", - "writer_drops", writerDrops, - "engine_drops", engineDrops, - ) - } - return - case evt, ok := <-normalizedCh: - if !ok { - return - } + for _, sym := range cfg.Symbols { + canonical := sym.Canonical + symbols = append(symbols, canonical) + symLogger := logger.With("symbol", canonical) + + // Channels for this symbol's pipeline. + rawCh := make(chan domain.RawEvent, 10000) + normalizedCh := make(chan domain.RawEvent, 10000) + engineCh := make(chan domain.RawEvent, 10000) + + // Collect source names for dedup shard pre-init. + sourceNames := make([]string, 0, len(sym.Sources)) + for _, src := range sym.Sources { + sourceNames = append(sourceNames, src.Name) + } + + // Start feed adapters for this symbol. + for _, src := range sym.Sources { + if !src.Enabled { + symLogger.Info("source disabled, skipping", "source", src.Name) + continue + } + s := src + safeGo(&wg, cancel, logger, "adapter-"+canonical+"-"+s.Name, func() { + startAdapter(ctx, s, rawCh, symLogger) + }) + } + + // Normalizer for this symbol. + norm := normalizer.New(rawCh, normalizedCh, canonical, sourceNames, symLogger) + safeGo(&wg, cancel, logger, "normalizer-"+canonical, func() { + norm.Run(ctx) + }) + + // Fan-out: normalizedCh → shared writerCh + per-symbol engineCh. + wg.Add(1) + go func(sym string) { + defer wg.Done() + defer close(engineCh) + var writerDrops, engineDrops int64 + for { select { - case writerCh <- evt: - default: - writerDrops++ - metrics.IncChannelDrop("writer") - if writerDrops%1000 == 1 { - logger.Warn("writer channel full, dropping event", - "source", evt.Source, - "total_drops", writerDrops, + case <-ctx.Done(): + if writerDrops > 0 || engineDrops > 0 { + symLogger.Warn("fan-out final drop counts", + "writer_drops", writerDrops, + "engine_drops", engineDrops, ) } - } - select { - case engineCh <- evt: - default: - engineDrops++ - metrics.IncChannelDrop("engine") - if engineDrops%1000 == 1 { - logger.Warn("engine channel full, dropping event", - "source", evt.Source, - "total_drops", engineDrops, - ) + return + case evt, ok := <-normalizedCh: + if !ok { + return + } + select { + case writerCh <- evt: + default: + writerDrops++ + metrics.IncChannelDrop("writer") + if writerDrops%1000 == 1 { + symLogger.Warn("writer channel full, dropping event", + "source", evt.Source, + "total_drops", writerDrops, + ) + } + } + select { + case engineCh <- evt: + default: + engineDrops++ + metrics.IncChannelDrop("engine") + if engineDrops%1000 == 1 { + symLogger.Warn("engine channel full, dropping event", + "source", evt.Source, + "total_drops", engineDrops, + ) + } } } } - } - }() + }(canonical) - // Raw event writer - if ingestDB != nil { - writer := storage.NewWriter(ingestDB, - cfg.Storage.BatchInsertMaxRows, - cfg.Storage.BatchInsertMaxDelay(), - logger, + // Snapshot engine for this symbol. + eng := engine.NewSnapshotEngine( + cfg.Pricing, + canonical, + cfg.Health.CanonicalStaleAfter(), + ingestDB, + engineCh, + symLogger, ) - safeGo(&wg, cancel, logger, "writer", func() { - writer.Run(ctx, writerCh) + engines[canonical] = eng + safeGo(&wg, cancel, logger, "snapshot-engine-"+canonical, func() { + eng.Run(ctx) }) } - // Snapshot engine - eng := engine.NewSnapshotEngine( - cfg.Pricing, - cfg.CanonicalSymbol, - cfg.Health.CanonicalStaleAfter(), - ingestDB, - engineCh, - logger, - ) - safeGo(&wg, cancel, logger, "snapshot-engine", func() { - eng.Run(ctx) - }) - - // Note: canonical ticks are written to DB by the snapshot engine directly - // and broadcast to WebSocket clients by the API server's broadcast loop. - // No separate tick writer goroutine is needed. + // MultiEngine aggregates all per-symbol engines for the API. + multi := engine.NewMultiEngine(engines, symbols) // Retention pruner if ingestDB != nil { @@ -228,7 +246,7 @@ func main() { } // API server - srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, eng, logger) + srv := api.NewServer(cfg.Server.HTTPAddr, cfg.Server.WSPath, cfg.Server.WS, cfg.Pricing, apiDB, multi, logger) safeGo(&wg, cancel, logger, "api-server", func() { if err := srv.Run(ctx); err != nil { logger.Error("API server error", "error", err) @@ -238,6 +256,7 @@ func main() { logger.Info("all components running", "http", cfg.Server.HTTPAddr, "ws", cfg.Server.WSPath, + "symbols", symbols, ) // Wait for shutdown diff --git a/config.yaml.example b/config.yaml.example index 5937101..a349e1f 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -1,4 +1,52 @@ -canonical_symbol: BTC/USD +# Multi-symbol configuration. +# Each symbol defines its own set of exchange sources. +symbols: + - canonical: BTC/USD + sources: + - name: binance + enabled: true + ws_url: "wss://stream.binance.com:9443/stream?streams=btcusdt@trade/btcusdt@bookTicker" + native_symbol: btcusdt + use_book_ticker_fallback: true + ping_interval_sec: 15 + max_conn_lifetime_sec: 86000 + + - name: coinbase + enabled: true + ws_url: "wss://ws-feed.exchange.coinbase.com" + native_symbol: BTC-USD + ping_interval_sec: 25 + max_conn_lifetime_sec: 0 + + - name: kraken + enabled: true + ws_url: "wss://ws.kraken.com/v2" + native_symbol: BTC/USD + use_ticker_fallback: true + ping_interval_sec: 30 + max_conn_lifetime_sec: 0 + + - name: okx + enabled: true + ws_url: "wss://ws.okx.com:8443/ws/v5/public" + native_symbol: BTC-USDT + ping_interval_sec: 20 + max_conn_lifetime_sec: 0 + + # Example: add more symbols by duplicating the block above. + # - canonical: ETH/USD + # sources: + # - name: binance + # enabled: true + # ws_url: "wss://stream.binance.com:9443/stream?streams=ethusdt@trade/ethusdt@bookTicker" + # native_symbol: ethusdt + # use_book_ticker_fallback: true + # ping_interval_sec: 15 + # - name: coinbase + # enabled: true + # ws_url: "wss://ws-feed.exchange.coinbase.com" + # native_symbol: ETH-USD + # ping_interval_sec: 25 server: http_addr: ":8080" @@ -8,6 +56,8 @@ server: heartbeat_interval_sec: 5 ping_interval_sec: 30 read_deadline_sec: 60 + max_clients: 1000 + slow_client_max_drops: 500 database: dsn: "${DATABASE_URL}" @@ -15,37 +65,6 @@ database: query_max_conns: 8 run_migrations: true -sources: - - name: binance - enabled: true - ws_url: "wss://stream.binance.com:9443/stream?streams=btcusdt@trade/btcusdt@bookTicker" - native_symbol: btcusdt - use_book_ticker_fallback: true - ping_interval_sec: 15 - max_conn_lifetime_sec: 86000 - - - name: coinbase - enabled: true - ws_url: "wss://ws-feed.exchange.coinbase.com" - native_symbol: BTC-USD - ping_interval_sec: 25 - max_conn_lifetime_sec: 0 - - - name: kraken - enabled: true - ws_url: "wss://ws.kraken.com/v2" - native_symbol: BTC/USD - use_ticker_fallback: true - ping_interval_sec: 30 - max_conn_lifetime_sec: 0 - - - name: okx - enabled: true - ws_url: "wss://ws.okx.com:8443/ws/v5/public" - native_symbol: BTC-USDT - ping_interval_sec: 20 - max_conn_lifetime_sec: 0 - pricing: mode: multi_venue_median minimum_healthy_sources: 2 diff --git a/internal/api/benchmark_test.go b/internal/api/benchmark_test.go new file mode 100644 index 0000000..5f95226 --- /dev/null +++ b/internal/api/benchmark_test.go @@ -0,0 +1,242 @@ +package api + +import ( + "encoding/json" + "io" + "log/slog" + "strconv" + "testing" + "time" + + "github.com/justar9/btick/internal/config" + "github.com/justar9/btick/internal/domain" + "github.com/shopspring/decimal" +) + +func benchLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) +} + +func benchHub(clients int) *WSHub { + hub := NewWSHub(benchLogger(), config.WSConfig{ + SendBufferSize: 256, + SlowClientMaxDrops: 10000, // high limit to avoid eviction in bench + }, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) + + // Add mock clients with buffered channels. + hub.mu.Lock() + for i := 0; i < clients; i++ { + c := &wsClient{ + sendCh: make(chan []byte, 256), + subs: newSubscriptions(), + logger: hub.logger, + connectedAt: time.Now().UTC(), + } + hub.clients[c] = struct{}{} + } + hub.mu.Unlock() + + return hub +} + +func benchMessage() WSMessage { + return WSMessage{ + Type: "latest_price", + Symbol: "BTC/USD", + TS: time.Now().UTC().Format(time.RFC3339Nano), + Price: decimal.NewFromInt(84_150).String(), + Basis: "median_trade", + QualityScore: "0.95", + SourceCount: 4, + SourcesUsed: []string{"binance", "coinbase", "kraken", "okx"}, + } +} + +// ============================================================================= +// Broadcast Benchmarks — varying client counts +// ============================================================================= + +func BenchmarkBroadcast_1Client(b *testing.B) { + benchmarkBroadcastN(b, 1) +} + +func BenchmarkBroadcast_10Clients(b *testing.B) { + benchmarkBroadcastN(b, 10) +} + +func BenchmarkBroadcast_100Clients(b *testing.B) { + benchmarkBroadcastN(b, 100) +} + +func BenchmarkBroadcast_500Clients(b *testing.B) { + benchmarkBroadcastN(b, 500) +} + +func BenchmarkBroadcast_1000Clients(b *testing.B) { + benchmarkBroadcastN(b, 1000) +} + +func benchmarkBroadcastN(b *testing.B, n int) { + hub := benchHub(n) + msg := benchMessage() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hub.Broadcast(msg) + } + + b.StopTimer() + // Drain client channels to avoid accumulation. + hub.mu.RLock() + for c := range hub.clients { + for len(c.sendCh) > 0 { + <-c.sendCh + } + } + hub.mu.RUnlock() +} + +// ============================================================================= +// Broadcast with subscription filtering +// ============================================================================= + +func BenchmarkBroadcast_100Clients_Filtered(b *testing.B) { + hub := benchHub(100) + + // Half of clients unsubscribe from latest_price + hub.mu.RLock() + i := 0 + for c := range hub.clients { + if i%2 == 0 { + c.subs.set("latest_price", false) + } + i++ + } + hub.mu.RUnlock() + + msg := benchMessage() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hub.Broadcast(msg) + } +} + +// ============================================================================= +// Broadcast with slow clients (channel full → drops) +// ============================================================================= + +func BenchmarkBroadcast_100Clients_SlowMix(b *testing.B) { + hub := NewWSHub(benchLogger(), config.WSConfig{ + SendBufferSize: 4, // tiny buffer + SlowClientMaxDrops: 100000, + }, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) + + hub.mu.Lock() + for i := 0; i < 100; i++ { + c := &wsClient{ + sendCh: make(chan []byte, 4), + subs: newSubscriptions(), + logger: hub.logger, + connectedAt: time.Now().UTC(), + } + hub.clients[c] = struct{}{} + } + hub.mu.Unlock() + + msg := benchMessage() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + hub.Broadcast(msg) + } +} + +// ============================================================================= +// JSON Marshal benchmark (message serialization cost) +// ============================================================================= + +func BenchmarkWSMessage_Marshal(b *testing.B) { + msg := benchMessage() + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, _ = json.Marshal(msg) + } +} + +// ============================================================================= +// Snapshot message construction benchmark +// ============================================================================= + +func BenchmarkSnapshotToWSMessage(b *testing.B) { + snap := domain.Snapshot1s{ + TSSecond: time.Now().UTC().Truncate(time.Second), + CanonicalSymbol: "BTC/USD", + CanonicalPrice: decimal.NewFromInt(84_150), + Basis: "median_trade", + QualityScore: decimal.NewFromFloat(0.95), + SourceCount: 4, + SourcesUsed: []string{"binance", "coinbase", "kraken", "okx"}, + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = WSMessage{ + Type: "snapshot_1s", + Symbol: snap.CanonicalSymbol, + TS: snap.TSSecond.Format(time.RFC3339Nano), + Price: snap.CanonicalPrice.String(), + Basis: snap.Basis, + IsStale: snap.IsStale, + QualityScore: snap.QualityScore.String(), + SourceCount: snap.SourceCount, + SourcesUsed: snap.SourcesUsed, + } + } +} + +// ============================================================================= +// Broadcast throughput benchmark (messages/sec) +// ============================================================================= + +func BenchmarkBroadcast_Throughput_100Clients(b *testing.B) { + hub := benchHub(100) + sources := []string{"binance", "coinbase", "kraken", "okx"} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msg := WSMessage{ + Type: "latest_price", + Symbol: "BTC/USD", + TS: time.Now().UTC().Format(time.RFC3339Nano), + Price: strconv.Itoa(84_000 + i%500), + Basis: "median_trade", + QualityScore: "0.95", + SourceCount: 4, + SourcesUsed: sources, + } + hub.Broadcast(msg) + } + + b.StopTimer() + hub.mu.RLock() + for c := range hub.clients { + for len(c.sendCh) > 0 { + <-c.sendCh + } + } + hub.mu.RUnlock() +} diff --git a/internal/api/handlers.go b/internal/api/handlers.go index d472888..275bc0f 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -12,8 +12,21 @@ import ( "github.com/shopspring/decimal" ) +// resolveSymbol returns the canonical symbol from query param, falling back to the first configured symbol. +func (s *Server) resolveSymbol(r *http.Request) string { + if sym := r.URL.Query().Get("symbol"); sym != "" { + return sym + } + syms := s.engine.Symbols() + if len(syms) > 0 { + return syms[0] + } + return "" +} + func (s *Server) handleLatest(w http.ResponseWriter, r *http.Request) { - latest := s.engine.LatestState() + sym := s.resolveSymbol(r) + latest := s.engine.LatestState(sym) if latest == nil { http.Error(w, `{"error":"no data yet"}`, http.StatusServiceUnavailable) return @@ -198,7 +211,8 @@ func (s *Server) handleRaw(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { - latest := s.engine.LatestState() + sym := s.resolveSymbol(r) + latest := s.engine.LatestState(sym) status := "ok" if latest == nil { diff --git a/internal/api/handlers_test.go b/internal/api/handlers_test.go index 76a43e8..8118733 100644 --- a/internal/api/handlers_test.go +++ b/internal/api/handlers_test.go @@ -69,7 +69,8 @@ func newMockEngine(state *domain.LatestState) *mockEngine { } } -func (m *mockEngine) LatestState() *domain.LatestState { return m.latestState } +func (m *mockEngine) LatestState(_ string) *domain.LatestState { return m.latestState } +func (m *mockEngine) Symbols() []string { return []string{"BTC/USD"} } func (m *mockEngine) SnapshotCh() <-chan domain.Snapshot1s { return m.snapshotCh } @@ -84,7 +85,7 @@ func testServer(store Store, eng Engine) *Server { wsPath: "/ws/price", db: store, engine: eng, - wsHub: NewWSHub(testLogger(), config.WSConfig{}, nil), + wsHub: NewWSHub(testLogger(), config.WSConfig{}, nil, []string{"BTC/USD"}), logger: testLogger(), settlementWindow: 5 * time.Second, minHealthySources: 2, diff --git a/internal/api/server.go b/internal/api/server.go index 6a68d0d..07d1246 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -24,7 +24,8 @@ type Store interface { // Engine abstracts the snapshot engine used by API handlers. type Engine interface { - LatestState() *domain.LatestState + LatestState(symbol string) *domain.LatestState + Symbols() []string SnapshotCh() <-chan domain.Snapshot1s TickCh() <-chan domain.CanonicalTick } @@ -56,7 +57,7 @@ func NewServer(httpAddr, wsPath string, wsCfg config.WSConfig, pricingCfg config wsPath: wsPath, db: normalizeStore(db), engine: eng, - wsHub: NewWSHub(logger, wsCfg, eng.LatestState), + wsHub: NewWSHub(logger, wsCfg, eng.LatestState, eng.Symbols()), logger: logger.With("component", "api"), settlementWindow: pricingCfg.SettlementReaggregationWindow(), minHealthySources: pricingCfg.MinimumHealthySources, @@ -147,6 +148,7 @@ func (s *Server) broadcastLoop(ctx context.Context) { } s.wsHub.Broadcast(WSMessage{ Type: "snapshot_1s", + Symbol: snap.CanonicalSymbol, TS: snap.TSSecond.Format(time.RFC3339Nano), Price: snap.CanonicalPrice.String(), Basis: snap.Basis, @@ -161,6 +163,7 @@ func (s *Server) broadcastLoop(ctx context.Context) { } s.wsHub.Broadcast(WSMessage{ Type: "latest_price", + Symbol: tick.CanonicalSymbol, TS: tick.TSEvent.Format(time.RFC3339Nano), Price: tick.CanonicalPrice.String(), Basis: tick.Basis, diff --git a/internal/api/websocket.go b/internal/api/websocket.go index 84d4c8c..5b2c9d2 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -19,6 +19,7 @@ import ( type WSMessage struct { Type string `json:"type"` Seq uint64 `json:"seq,omitempty"` + Symbol string `json:"symbol,omitempty"` TS string `json:"ts"` Price string `json:"price,omitempty"` Basis string `json:"basis,omitempty"` @@ -86,17 +87,19 @@ type WSHub struct { logger *slog.Logger wsCfg config.WSConfig seq atomic.Uint64 - getState func() *domain.LatestState + getState func(symbol string) *domain.LatestState + symbols []string marshal func(v any) ([]byte, error) // overridable for testing } type wsClient struct { - conn *websocket.Conn - sendCh chan []byte - closeOnce sync.Once - subs *subscriptions - dropCount atomic.Int64 - logger *slog.Logger + conn *websocket.Conn + sendCh chan []byte + closeOnce sync.Once + subs *subscriptions + dropCount atomic.Int64 + logger *slog.Logger + connectedAt time.Time } var upgrader = websocket.Upgrader{ @@ -105,12 +108,13 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -func NewWSHub(logger *slog.Logger, wsCfg config.WSConfig, getState func() *domain.LatestState) *WSHub { +func NewWSHub(logger *slog.Logger, wsCfg config.WSConfig, getState func(string) *domain.LatestState, symbols []string) *WSHub { return &WSHub{ clients: make(map[*wsClient]struct{}), logger: logger.With("component", "ws_hub"), wsCfg: wsCfg, getState: getState, + symbols: symbols, } } @@ -154,6 +158,18 @@ func (h *WSHub) sendHandshake(conn *websocket.Conn, msg WSMessage) bool { } func (h *WSHub) HandleWS(w http.ResponseWriter, r *http.Request) { + // Early check before upgrading (avoids wasting an upgrade on a full hub). + h.mu.RLock() + currentCount := len(h.clients) + h.mu.RUnlock() + + if currentCount >= h.wsCfg.MaxClientCount() { + h.logger.Warn("ws connection rejected: max clients reached", "max", h.wsCfg.MaxClientCount()) + metrics.IncWSRejected() + http.Error(w, `{"error":"too many connections"}`, http.StatusServiceUnavailable) + return + } + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { h.logger.Error("ws upgrade failed", "error", err) @@ -161,13 +177,22 @@ func (h *WSHub) HandleWS(w http.ResponseWriter, r *http.Request) { } client := &wsClient{ - conn: conn, - sendCh: make(chan []byte, h.wsCfg.SendBuffer()), - subs: newSubscriptions(), - logger: h.logger, + conn: conn, + sendCh: make(chan []byte, h.wsCfg.SendBuffer()), + subs: newSubscriptions(), + logger: h.logger, + connectedAt: time.Now().UTC(), } + // Atomically check limit and add under the same lock to prevent overcount. h.mu.Lock() + if len(h.clients) >= h.wsCfg.MaxClientCount() { + h.mu.Unlock() + h.logger.Warn("ws connection rejected: max clients reached (post-upgrade)", "max", h.wsCfg.MaxClientCount()) + metrics.IncWSRejected() + _ = conn.Close() + return + } h.clients[client] = struct{}{} clientCount := len(h.clients) h.mu.Unlock() @@ -192,11 +217,18 @@ func (h *WSHub) HandleWS(w http.ResponseWriter, r *http.Request) { return } - // Send initial state. + // Send initial state for each configured symbol. if h.getState != nil { - if state := h.getState(); state != nil { + sentAny := false + for _, sym := range h.symbols { + state := h.getState(sym) + if state == nil { + continue + } + sentAny = true initMsg := WSMessage{ Type: "latest_price", + Symbol: state.Symbol, TS: state.TS.Format(time.RFC3339Nano), Price: state.Price.String(), Basis: state.Basis, @@ -207,7 +239,7 @@ func (h *WSHub) HandleWS(w http.ResponseWriter, r *http.Request) { Message: "initial_state", } if !h.sendHandshake(conn, initMsg) { - h.logger.Warn("ws handshake failed: initial_state") + h.logger.Warn("ws handshake failed: initial_state", "symbol", sym) h.mu.Lock() delete(h.clients, client) clientCount = len(h.clients) @@ -216,7 +248,8 @@ func (h *WSHub) HandleWS(w http.ResponseWriter, r *http.Request) { _ = conn.Close() return } - } else { + } + if !sentAny { noData := WSMessage{ Type: "latest_price", TS: time.Now().UTC().Format(time.RFC3339Nano), @@ -333,9 +366,13 @@ func (h *WSHub) Broadcast(msg WSMessage) { return } - h.mu.RLock() - defer h.mu.RUnlock() + maxDrops := int64(h.wsCfg.SlowClientMaxDropCount()) + broadcastStart := time.Now() + + metrics.IncWSBroadcast() + h.mu.RLock() + var evict []*wsClient for c := range h.clients { if !c.subs.wants(msg.Type) { continue @@ -345,7 +382,9 @@ func (h *WSHub) Broadcast(msg WSMessage) { default: drops := c.dropCount.Add(1) metrics.IncWSDrop() - if drops%100 == 1 { + if drops >= maxDrops { + evict = append(evict, c) + } else if drops%100 == 1 { c.logger.Warn("ws client too slow, dropping message", "type", msg.Type, "total_drops", drops, @@ -353,4 +392,42 @@ func (h *WSHub) Broadcast(msg WSMessage) { } } } + h.mu.RUnlock() + + metrics.ObserveWSBroadcastDuration(time.Since(broadcastStart)) + + // Evict slow clients outside the read lock. + if len(evict) > 0 { + h.evictClients(evict) + } +} + +// evictClients forcefully disconnects slow clients that exceeded the drop threshold. +func (h *WSHub) evictClients(clients []*wsClient) { + h.mu.Lock() + for _, c := range clients { + if _, ok := h.clients[c]; !ok { + continue // already removed + } + delete(h.clients, c) + c.logger.Warn("evicting slow ws client", + "total_drops", c.dropCount.Load(), + "connected_for", time.Since(c.connectedAt).Round(time.Second), + ) + close(c.sendCh) + if c.conn != nil { + _ = c.conn.Close() + } + metrics.IncWSEvicted() + } + clientCount := len(h.clients) + h.mu.Unlock() + metrics.SetWSClients(clientCount) +} + +// ClientCount returns the current number of connected clients. +func (h *WSHub) ClientCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) } diff --git a/internal/api/websocket_test.go b/internal/api/websocket_test.go index 9058e0e..5f01bf5 100644 --- a/internal/api/websocket_test.go +++ b/internal/api/websocket_test.go @@ -24,11 +24,11 @@ func testLogger() *slog.Logger { } func testHub() *WSHub { - return NewWSHub(testLogger(), config.WSConfig{}, func() *domain.LatestState { return nil }) + return NewWSHub(testLogger(), config.WSConfig{}, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) } func testHubWithState(state *domain.LatestState) *WSHub { - return NewWSHub(testLogger(), config.WSConfig{}, func() *domain.LatestState { return state }) + return NewWSHub(testLogger(), config.WSConfig{}, func(_ string) *domain.LatestState { return state }, []string{"BTC/USD"}) } func mustSetReadDeadline(t *testing.T, conn *websocket.Conn, d time.Duration) { @@ -363,7 +363,7 @@ func TestWSHub_InitialState_NoData(t *testing.T) { func TestWSHub_InitialState_NilGetState(t *testing.T) { // getState callback is nil — no initial state should be sent - hub := NewWSHub(testLogger(), config.WSConfig{}, nil) + hub := NewWSHub(testLogger(), config.WSConfig{}, nil, []string{"BTC/USD"}) server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) defer server.Close() @@ -589,7 +589,7 @@ func TestWSHub_Heartbeat(t *testing.T) { wsCfg := config.WSConfig{ HeartbeatIntervalS: 1, // 1 second for test speed } - hub := NewWSHub(testLogger(), wsCfg, func() *domain.LatestState { return nil }) + hub := NewWSHub(testLogger(), wsCfg, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) defer server.Close() @@ -633,7 +633,7 @@ func TestWSHub_Heartbeat_Unsubscribe(t *testing.T) { wsCfg := config.WSConfig{ HeartbeatIntervalS: 1, } - hub := NewWSHub(testLogger(), wsCfg, func() *domain.LatestState { return nil }) + hub := NewWSHub(testLogger(), wsCfg, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) defer server.Close() @@ -724,7 +724,7 @@ func TestWSHub_PingPong(t *testing.T) { HeartbeatIntervalS: 60, // don't interfere ReadDeadlineS: 5, } - hub := NewWSHub(testLogger(), wsCfg, func() *domain.LatestState { return nil }) + hub := NewWSHub(testLogger(), wsCfg, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) defer server.Close() @@ -826,7 +826,10 @@ func TestWSHub_HighFrequencyBroadcast(t *testing.T) { t.Skip("skipping stress test in short mode") } - hub := testHub() + // Use a high drop limit to prevent slow-client eviction during the stress test. + hub := NewWSHub(testLogger(), config.WSConfig{ + SlowClientMaxDrops: 100000, + }, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) defer server.Close() @@ -1134,6 +1137,111 @@ func TestWSHub_BroadcastDuringClientChurn(t *testing.T) { // Test passes if no panic } +// ============================================================================= +// Connection Limit Tests +// ============================================================================= + +func TestWSHub_MaxConnectionLimit(t *testing.T) { + hub := NewWSHub(testLogger(), config.WSConfig{ + MaxClients: 3, + }, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) + + server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) + defer server.Close() + + wsURL := "ws" + server.URL[4:] + + // Connect 3 clients (should succeed) + conns := make([]*websocket.Conn, 0, 3) + for i := 0; i < 3; i++ { + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial %d error: %v", i, err) + } + conns = append(conns, conn) + drainMessages(t, conn, 2) // welcome + no_data_yet + } + + time.Sleep(50 * time.Millisecond) + + if hub.ClientCount() != 3 { + t.Fatalf("expected 3 clients, got %d", hub.ClientCount()) + } + + // 4th connection should be rejected (HTTP 503) + _, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err == nil { + t.Fatal("expected 4th connection to be rejected") + } + if resp != nil && resp.StatusCode != http.StatusServiceUnavailable { + t.Errorf("expected 503, got %d", resp.StatusCode) + } + + // Close one, then 4th should succeed + _ = conns[0].Close() + time.Sleep(100 * time.Millisecond) + + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial after slot freed: %v", err) + } + _ = conn.Close() + + for _, c := range conns[1:] { + _ = c.Close() + } +} + +// ============================================================================= +// Slow Client Eviction Tests +// ============================================================================= + +func TestWSHub_SlowClientEviction(t *testing.T) { + hub := NewWSHub(testLogger(), config.WSConfig{ + SendBufferSize: 4, // tiny buffer + SlowClientMaxDrops: 10, // evict after 10 drops + }, func(_ string) *domain.LatestState { return nil }, []string{"BTC/USD"}) + + server := httptest.NewServer(http.HandlerFunc(hub.HandleWS)) + defer server.Close() + + wsURL := "ws" + server.URL[4:] + + // Connect slow client (don't read from it) + slowConn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial error: %v", err) + } + defer func() { _ = slowConn.Close() }() + + time.Sleep(50 * time.Millisecond) + + if hub.ClientCount() != 1 { + t.Fatalf("expected 1 client, got %d", hub.ClientCount()) + } + + // Broadcast enough messages to fill buffer and trigger eviction + for i := 0; i < 20; i++ { + hub.Broadcast(WSMessage{ + Type: "latest_price", + Price: "84100.00", + }) + } + + time.Sleep(100 * time.Millisecond) + + if hub.ClientCount() != 0 { + t.Errorf("expected slow client to be evicted, got %d clients", hub.ClientCount()) + } +} + +func TestWSHub_ClientCount(t *testing.T) { + hub := testHub() + if hub.ClientCount() != 0 { + t.Errorf("expected 0 clients, got %d", hub.ClientCount()) + } +} + // ============================================================================= // Helpers // ============================================================================= diff --git a/internal/config/config.go b/internal/config/config.go index bd9d1ce..2eb30c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,14 +7,25 @@ import ( "gopkg.in/yaml.v3" ) +// SymbolConfig defines a tradable symbol and its exchange sources. +type SymbolConfig struct { + Canonical string `yaml:"canonical"` + Sources []SourceConfig `yaml:"sources"` +} + type Config struct { + // Multi-symbol configuration (preferred). + Symbols []SymbolConfig `yaml:"symbols"` + + // Legacy single-symbol fields — migrated into Symbols on load. CanonicalSymbol string `yaml:"canonical_symbol"` - Server ServerConfig `yaml:"server"` - Database DatabaseConfig `yaml:"database"` Sources []SourceConfig `yaml:"sources"` - Pricing PricingConfig `yaml:"pricing"` - Storage StorageConfig `yaml:"storage"` - Health HealthConfig `yaml:"health"` + + Server ServerConfig `yaml:"server"` + Database DatabaseConfig `yaml:"database"` + Pricing PricingConfig `yaml:"pricing"` + Storage StorageConfig `yaml:"storage"` + Health HealthConfig `yaml:"health"` } type ServerConfig struct { @@ -28,6 +39,8 @@ type WSConfig struct { HeartbeatIntervalS int `yaml:"heartbeat_interval_sec"` PingIntervalS int `yaml:"ping_interval_sec"` ReadDeadlineS int `yaml:"read_deadline_sec"` + MaxClients int `yaml:"max_clients"` + SlowClientMaxDrops int `yaml:"slow_client_max_drops"` } func (w WSConfig) SendBuffer() int { @@ -58,6 +71,20 @@ func (w WSConfig) ReadDeadline() time.Duration { return time.Duration(w.ReadDeadlineS) * time.Second } +func (w WSConfig) MaxClientCount() int { + if w.MaxClients <= 0 { + return 1000 + } + return w.MaxClients +} + +func (w WSConfig) SlowClientMaxDropCount() int { + if w.SlowClientMaxDrops <= 0 { + return 500 + } + return w.SlowClientMaxDrops +} + type DatabaseConfig struct { DSN string `yaml:"dsn"` MaxConns int32 `yaml:"max_conns"` @@ -203,6 +230,18 @@ func Load(path string) (*Config, error) { if dbURL := os.Getenv("DATABASE_URL"); dbURL != "" { cfg.Database.DSN = dbURL } + + // Backward compat: migrate legacy single-symbol config into Symbols slice. + if len(cfg.Symbols) == 0 && len(cfg.Sources) > 0 { + canonical := cfg.CanonicalSymbol + if canonical == "" { + canonical = "BTC/USD" + } + cfg.Symbols = []SymbolConfig{{ + Canonical: canonical, + Sources: cfg.Sources, + }} + } if cfg.Server.HTTPAddr == "" { cfg.Server.HTTPAddr = ":8080" } diff --git a/internal/engine/benchmark_test.go b/internal/engine/benchmark_test.go index 2b1851c..f4b45d2 100644 --- a/internal/engine/benchmark_test.go +++ b/internal/engine/benchmark_test.go @@ -15,6 +15,10 @@ import ( "github.com/justar9/btick/internal/normalizer" ) +// ============================================================================= +// Core Tick Emission Benchmarks +// ============================================================================= + func BenchmarkEmitTick(b *testing.B) { eng := newBenchmarkEngine() events := []domain.RawEvent{ @@ -35,6 +39,193 @@ func BenchmarkEmitTick(b *testing.B) { } } +func BenchmarkEmitTick_SamePrice(b *testing.B) { + eng := newBenchmarkEngine() + evt := benchmarkTradeEvent("binance-same", decimal.NewFromInt(84_200)) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + eng.updateVenueState(evt) + // Same price → no tick emitted (deduplicated), drain if present + select { + case <-eng.TickCh(): + default: + } + } +} + +// ============================================================================= +// Median Computation Benchmarks +// ============================================================================= + +func BenchmarkComputeMedian_3Venues(b *testing.B) { + prices := []decimal.Decimal{ + decimal.NewFromInt(84_000), + decimal.NewFromInt(84_150), + decimal.NewFromInt(84_300), + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + computeMedian(prices) + } +} + +func BenchmarkComputeMedian_10Venues(b *testing.B) { + prices := make([]decimal.Decimal, 10) + for i := range prices { + prices[i] = decimal.NewFromInt(int64(84_000 + i*50)) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + computeMedian(prices) + } +} + +func BenchmarkComputeMedian_50Venues(b *testing.B) { + prices := make([]decimal.Decimal, 50) + for i := range prices { + prices[i] = decimal.NewFromInt(int64(84_000 + i*10)) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + computeMedian(prices) + } +} + +// ============================================================================= +// Canonical Computation Benchmarks (includes outlier rejection) +// ============================================================================= + +func BenchmarkComputeCanonical_4Venues(b *testing.B) { + eng := &SnapshotEngine{} + eng.cfg.Mode = "multi_venue_median" + eng.cfg.MinimumHealthySources = 2 + eng.cfg.OutlierRejectPct = 1.0 + + refs := []domain.VenueRefPrice{ + {Source: "binance", RefPrice: decimal.NewFromInt(84_100), Basis: "trade", AgeMs: 50}, + {Source: "coinbase", RefPrice: decimal.NewFromInt(84_200), Basis: "trade", AgeMs: 80}, + {Source: "kraken", RefPrice: decimal.NewFromInt(84_150), Basis: "trade", AgeMs: 100}, + {Source: "okx", RefPrice: decimal.NewFromInt(84_180), Basis: "trade", AgeMs: 60}, + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeCanonical(refs) + } +} + +func BenchmarkComputeCanonical_WithOutlier(b *testing.B) { + eng := &SnapshotEngine{} + eng.cfg.Mode = "multi_venue_median" + eng.cfg.MinimumHealthySources = 2 + eng.cfg.OutlierRejectPct = 1.0 + + refs := []domain.VenueRefPrice{ + {Source: "binance", RefPrice: decimal.NewFromInt(84_100), Basis: "trade", AgeMs: 50}, + {Source: "coinbase", RefPrice: decimal.NewFromInt(84_200), Basis: "trade", AgeMs: 80}, + {Source: "kraken", RefPrice: decimal.NewFromInt(84_150), Basis: "trade", AgeMs: 100}, + {Source: "okx", RefPrice: decimal.NewFromInt(90_000), Basis: "trade", AgeMs: 60}, // outlier + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeCanonical(refs) + } +} + +func BenchmarkComputeCanonical_MixedBasis(b *testing.B) { + eng := &SnapshotEngine{} + eng.cfg.Mode = "multi_venue_median" + eng.cfg.MinimumHealthySources = 2 + eng.cfg.OutlierRejectPct = 1.0 + + refs := []domain.VenueRefPrice{ + {Source: "binance", RefPrice: decimal.NewFromInt(84_100), Basis: "trade", AgeMs: 50}, + {Source: "coinbase", RefPrice: decimal.NewFromInt(84_200), Basis: "midpoint", AgeMs: 80}, + {Source: "kraken", RefPrice: decimal.NewFromInt(84_150), Basis: "trade", AgeMs: 100}, + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeCanonical(refs) + } +} + +// ============================================================================= +// Venue Reference Computation Benchmarks +// ============================================================================= + +func BenchmarkComputeVenueRefs_4Sources(b *testing.B) { + eng := newBenchmarkEngineNVenues(4) + now := time.Now().UTC() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeVenueRefs(now) + } +} + +func BenchmarkComputeVenueRefs_10Sources(b *testing.B) { + eng := newBenchmarkEngineNVenues(10) + now := time.Now().UTC() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeVenueRefs(now) + } +} + +// ============================================================================= +// Quality Score Benchmarks +// ============================================================================= + +func BenchmarkComputeQuality_Fresh(b *testing.B) { + eng := &SnapshotEngine{} + eng.cfg.CarryForwardMaxSeconds = 10 + + refs := []domain.VenueRefPrice{ + {Source: "binance", Basis: "trade", AgeMs: 50}, + {Source: "coinbase", Basis: "trade", AgeMs: 80}, + {Source: "kraken", Basis: "trade", AgeMs: 100}, + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeQuality(refs, false, 0) + } +} + +func BenchmarkComputeQuality_Stale(b *testing.B) { + eng := &SnapshotEngine{} + eng.cfg.CarryForwardMaxSeconds = 10 + + refs := []domain.VenueRefPrice{ + {Source: "binance", Basis: "trade", AgeMs: 50}, + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + eng.computeQuality(refs, true, 5) + } +} + +// ============================================================================= +// Full Pipeline Benchmarks +// ============================================================================= + func BenchmarkPipeline(b *testing.B) { rawCh := make(chan domain.RawEvent, 1) normalizedCh := make(chan domain.RawEvent, 1) @@ -43,7 +234,7 @@ func BenchmarkPipeline(b *testing.B) { defer cancel() logger := benchmarkDiscardLogger() - n := normalizer.New(rawCh, normalizedCh, logger) + n := normalizer.New(rawCh, normalizedCh, "BTC/USD", []string{"binance"}, logger) eng := newBenchmarkEngine() normalizerDone := make(chan struct{}) @@ -79,6 +270,141 @@ func BenchmarkPipeline(b *testing.B) { <-bridgeDone } +func BenchmarkPipeline_HighVolume(b *testing.B) { + rawCh := make(chan domain.RawEvent, 256) + normalizedCh := make(chan domain.RawEvent, 256) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := benchmarkDiscardLogger() + n := normalizer.New(rawCh, normalizedCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, logger) + eng := newBenchmarkEngine() + + normalizerDone := make(chan struct{}) + go func() { + defer close(normalizerDone) + n.Run(ctx) + }() + + bridgeDone := make(chan struct{}) + go func() { + defer close(bridgeDone) + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-normalizedCh: + if !ok { + return + } + eng.updateVenueState(evt) + } + } + }() + + sources := []string{"binance", "coinbase", "kraken", "okx"} + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + src := sources[i%len(sources)] + price := decimal.NewFromInt(int64(84_000 + (i%10)*50)) + now := time.Now().UTC() + rawCh <- domain.RawEvent{ + Source: src, + SymbolNative: "BTCUSDT", + EventType: "trade", + ExchangeTS: now, + RecvTS: now.Add(10 * time.Millisecond), + Price: price, + TradeID: src + "-" + strconv.Itoa(i), + } + // Drain ticks — not every event produces one (price dedup) + select { + case <-eng.TickCh(): + default: + } + } + + b.StopTimer() + cancel() + <-normalizerDone + <-bridgeDone +} + +// ============================================================================= +// UpdateVenueState Benchmarks (lock contention) +// ============================================================================= + +func BenchmarkUpdateVenueState_Trade(b *testing.B) { + eng := newBenchmarkEngine() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + evt := domain.RawEvent{ + Source: "binance", + EventType: "trade", + ExchangeTS: time.Now().UTC(), + RecvTS: time.Now().UTC(), + Price: decimal.NewFromInt(int64(84_000 + i%100)), + TradeID: "t-" + strconv.Itoa(i), + } + eng.updateVenueState(evt) + // Drain tick channel + select { + case <-eng.TickCh(): + default: + } + } +} + +func BenchmarkUpdateVenueState_Ticker(b *testing.B) { + eng := newBenchmarkEngine() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + evt := domain.RawEvent{ + Source: "binance", + EventType: "ticker", + ExchangeTS: time.Now().UTC(), + RecvTS: time.Now().UTC(), + Bid: decimal.NewFromInt(int64(84_000 + i%100)), + Ask: decimal.NewFromInt(int64(84_010 + i%100)), + } + eng.updateVenueState(evt) + } +} + +// ============================================================================= +// LatestState Contention Benchmark +// ============================================================================= + +func BenchmarkLatestState_ReadContention(b *testing.B) { + eng := newBenchmarkEngine() + // Seed a state + eng.updateVenueState(benchmarkTradeEvent("seed", decimal.NewFromInt(84_100))) + select { + case <-eng.TickCh(): + default: + } + + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = eng.LatestState() + } + }) +} + +// ============================================================================= +// Helpers +// ============================================================================= + func newBenchmarkEngine() *SnapshotEngine { cfg := config.PricingConfig{ Mode: "multi_venue_median", @@ -107,6 +433,31 @@ func newBenchmarkEngine() *SnapshotEngine { return eng } +func newBenchmarkEngineNVenues(n int) *SnapshotEngine { + cfg := config.PricingConfig{ + Mode: "multi_venue_median", + MinimumHealthySources: 2, + TradeFreshnessWindowMs: int((30 * time.Minute) / time.Millisecond), + QuoteFreshnessWindowMs: int((30 * time.Minute) / time.Millisecond), + OutlierRejectPct: 1.0, + CarryForwardMaxSeconds: 10, + } + + now := time.Now().UTC() + eng := NewSnapshotEngine(cfg, "BTC/USD", time.Hour, nil, nil, benchmarkDiscardLogger()) + for i := 0; i < n; i++ { + name := "venue-" + strconv.Itoa(i) + eng.venueStates[name] = &venueState{ + lastTrade: &tradeInfo{ + price: decimal.NewFromInt(int64(84_000 + i*50)), + ts: now, + }, + } + } + + return eng +} + func benchmarkTradeEvent(tradeID string, price decimal.Decimal) domain.RawEvent { now := time.Now().UTC() return domain.RawEvent{ diff --git a/internal/engine/multi.go b/internal/engine/multi.go new file mode 100644 index 0000000..84c798e --- /dev/null +++ b/internal/engine/multi.go @@ -0,0 +1,80 @@ +package engine + +import ( + "sync" + + "github.com/justar9/btick/internal/domain" +) + +// MultiEngine aggregates multiple per-symbol SnapshotEngines behind a single +// interface that the API server can consume. +type MultiEngine struct { + engines map[string]*SnapshotEngine // keyed by canonical symbol + symbols []string // ordered list of symbols + + snapshotCh chan domain.Snapshot1s + tickCh chan domain.CanonicalTick +} + +// NewMultiEngine creates a MultiEngine from a set of per-symbol engines. +// It merges their snapshot and tick channels into single output channels. +func NewMultiEngine(engines map[string]*SnapshotEngine, symbols []string) *MultiEngine { + m := &MultiEngine{ + engines: engines, + symbols: symbols, + snapshotCh: make(chan domain.Snapshot1s, 100*len(engines)), + tickCh: make(chan domain.CanonicalTick, 1000*len(engines)), + } + + // Merge per-engine channels into the combined output channels. + var wg sync.WaitGroup + for _, eng := range engines { + e := eng + wg.Add(2) + go func() { + defer wg.Done() + for snap := range e.SnapshotCh() { + m.snapshotCh <- snap + } + }() + go func() { + defer wg.Done() + for tick := range e.TickCh() { + m.tickCh <- tick + } + }() + } + + // Close merged channels once all sources are done. + go func() { + wg.Wait() + close(m.snapshotCh) + close(m.tickCh) + }() + + return m +} + +// LatestState returns the latest price state for the given symbol. +// Returns nil if symbol is unknown or no data yet. +func (m *MultiEngine) LatestState(symbol string) *domain.LatestState { + if eng, ok := m.engines[symbol]; ok { + return eng.LatestState() + } + return nil +} + +// Symbols returns the ordered list of configured canonical symbols. +func (m *MultiEngine) Symbols() []string { + return m.symbols +} + +// SnapshotCh returns the merged snapshot channel for all symbols. +func (m *MultiEngine) SnapshotCh() <-chan domain.Snapshot1s { + return m.snapshotCh +} + +// TickCh returns the merged tick channel for all symbols. +func (m *MultiEngine) TickCh() <-chan domain.CanonicalTick { + return m.tickCh +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 49cc4b9..ea3dc2e 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -20,10 +20,14 @@ type registry struct { wsClientsGauge atomic.Int64 wsDropsTotal atomic.Uint64 + wsEvictedTotal atomic.Uint64 + wsRejectedTotal atomic.Uint64 + wsBroadcastsTotal atomic.Uint64 snapshotLagSeconds atomic.Uint64 writerFlushDuration histogram writerBatchSize histogram pipelineLatencyMillis histogram + wsBroadcastDuration histogram } type histogram struct { @@ -49,6 +53,10 @@ func newRegistry() *registry { buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 5000}, counts: make([]uint64, 9), }, + wsBroadcastDuration: histogram{ + buckets: []float64{0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5}, + counts: make([]uint64, 8), + }, } } @@ -80,6 +88,22 @@ func IncWSDrop() { defaultRegistry.wsDropsTotal.Add(1) } +func IncWSEvicted() { + defaultRegistry.wsEvictedTotal.Add(1) +} + +func IncWSRejected() { + defaultRegistry.wsRejectedTotal.Add(1) +} + +func IncWSBroadcast() { + defaultRegistry.wsBroadcastsTotal.Add(1) +} + +func ObserveWSBroadcastDuration(d time.Duration) { + defaultRegistry.wsBroadcastDuration.observe(d.Seconds()) +} + func Handler() http.Handler { return defaultRegistry } @@ -126,6 +150,25 @@ func (r *registry) ServeHTTP(w http.ResponseWriter, req *http.Request) { "Total dropped WebSocket broadcast messages.", float64(r.wsDropsTotal.Load()), ) + r.writeCounter(&b, + "btick_ws_evicted_total", + "Total WebSocket clients evicted for being too slow.", + float64(r.wsEvictedTotal.Load()), + ) + r.writeCounter(&b, + "btick_ws_rejected_total", + "Total WebSocket connections rejected due to max client limit.", + float64(r.wsRejectedTotal.Load()), + ) + r.writeCounter(&b, + "btick_ws_broadcasts_total", + "Total WebSocket broadcast operations.", + float64(r.wsBroadcastsTotal.Load()), + ) + r.wsBroadcastDuration.writePrometheus(&b, + "btick_ws_broadcast_duration_seconds", + "Duration of WebSocket broadcast operations in seconds.", + ) _, _ = w.Write([]byte(b.String())) } diff --git a/internal/normalizer/normalizer.go b/internal/normalizer/normalizer.go index 07cbd10..a3873a8 100644 --- a/internal/normalizer/normalizer.go +++ b/internal/normalizer/normalizer.go @@ -24,23 +24,27 @@ type dedupShard struct { // Normalizer receives raw events from adapters, assigns UUIDs, deduplicates, and forwards. type Normalizer struct { - inCh <-chan domain.RawEvent - outCh chan<- domain.RawEvent - logger *slog.Logger + inCh <-chan domain.RawEvent + outCh chan<- domain.RawEvent + canonicalSymbol string + logger *slog.Logger // Dedup: per-source bounded LRU of (source, trade_id) shards sync.Map // map[string]*dedupShard maxSeen int } -func New(inCh <-chan domain.RawEvent, outCh chan<- domain.RawEvent, logger *slog.Logger) *Normalizer { +// New creates a Normalizer. canonicalSymbol is stamped on every outgoing event. +// Pass source names to pre-initialize dedup shards for those sources. +func New(inCh <-chan domain.RawEvent, outCh chan<- domain.RawEvent, canonicalSymbol string, sources []string, logger *slog.Logger) *Normalizer { n := &Normalizer{ - inCh: inCh, - outCh: outCh, - logger: logger.With("component", "normalizer"), - maxSeen: defaultMaxSeen, + inCh: inCh, + outCh: outCh, + canonicalSymbol: canonicalSymbol, + logger: logger.With("component", "normalizer", "symbol", canonicalSymbol), + maxSeen: defaultMaxSeen, } - for _, source := range [...]string{"binance", "coinbase", "kraken", "okx"} { + for _, source := range sources { n.shards.Store(source, newDedupShard(n.maxSeen)) } return n @@ -74,8 +78,8 @@ func (n *Normalizer) process(evt domain.RawEvent) { } } - // Map symbol to canonical - evt.SymbolCanonical = mapCanonicalSymbol(evt.Source, evt.SymbolNative) + // Stamp canonical symbol + evt.SymbolCanonical = n.canonicalSymbol select { case n.outCh <- evt: @@ -143,22 +147,3 @@ func dedupSource(key string) string { return key } -func mapCanonicalSymbol(source, native string) string { - // All our configured sources map to BTC/USD - switch source { - case "binance": - // btcusdt -> BTC/USD (treating USDT ≈ USD for canonical purposes) - return "BTC/USD" - case "coinbase": - // BTC-USD -> BTC/USD - return "BTC/USD" - case "kraken": - // BTC/USD -> BTC/USD - return "BTC/USD" - case "okx": - // BTC-USDT -> BTC/USD (treating USDT ≈ USD for canonical purposes) - return "BTC/USD" - default: - return "BTC/USD" - } -} diff --git a/internal/normalizer/normalizer_test.go b/internal/normalizer/normalizer_test.go index 703fbd1..7f51780 100644 --- a/internal/normalizer/normalizer_test.go +++ b/internal/normalizer/normalizer_test.go @@ -21,7 +21,7 @@ func testLogger() *slog.Logger { func TestNormalizer_AssignsUUID(t *testing.T) { inCh := make(chan domain.RawEvent, 1) outCh := make(chan domain.RawEvent, 1) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -54,7 +54,7 @@ func TestNormalizer_AssignsUUID(t *testing.T) { func TestNormalizer_DeduplicatesTrades(t *testing.T) { inCh := make(chan domain.RawEvent, 10) outCh := make(chan domain.RawEvent, 10) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -91,7 +91,7 @@ func TestNormalizer_DeduplicatesTrades(t *testing.T) { func TestNormalizer_DifferentSourcesSameTradeID(t *testing.T) { inCh := make(chan domain.RawEvent, 10) outCh := make(chan domain.RawEvent, 10) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -134,7 +134,7 @@ loop1: func TestNormalizer_TickerEventsNotDeduplicated(t *testing.T) { inCh := make(chan domain.RawEvent, 10) outCh := make(chan domain.RawEvent, 10) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -172,20 +172,17 @@ func TestNormalizer_MapsCanonicalSymbol(t *testing.T) { tests := []struct { source string nativeSymbol string - wantCanon string + canonical string }{ {"binance", "BTCUSDT", "BTC/USD"}, - {"coinbase", "BTC-USD", "BTC/USD"}, - {"kraken", "BTC/USD", "BTC/USD"}, - {"okx", "BTC-USDT", "BTC/USD"}, - {"unknown", "XYZ", "BTC/USD"}, + {"coinbase", "ETH-USD", "ETH/USD"}, } for _, tt := range tests { - t.Run(tt.source, func(t *testing.T) { + t.Run(tt.source+"_"+tt.canonical, func(t *testing.T) { inCh := make(chan domain.RawEvent, 1) outCh := make(chan domain.RawEvent, 1) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, tt.canonical, []string{tt.source}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -203,8 +200,8 @@ func TestNormalizer_MapsCanonicalSymbol(t *testing.T) { select { case out := <-outCh: - if out.SymbolCanonical != tt.wantCanon { - t.Errorf("expected canonical %s, got %s", tt.wantCanon, out.SymbolCanonical) + if out.SymbolCanonical != tt.canonical { + t.Errorf("expected canonical %s, got %s", tt.canonical, out.SymbolCanonical) } case <-time.After(100 * time.Millisecond): t.Fatal("timeout") @@ -270,7 +267,7 @@ func TestNormalizer_RingBufferEviction(t *testing.T) { func TestNormalizer_ContextCancellation(t *testing.T) { inCh := make(chan domain.RawEvent, 1) outCh := make(chan domain.RawEvent, 1) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) @@ -293,7 +290,7 @@ func TestNormalizer_ContextCancellation(t *testing.T) { func TestNormalizer_InputChannelClosed(t *testing.T) { inCh := make(chan domain.RawEvent, 1) outCh := make(chan domain.RawEvent, 1) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx := context.Background() @@ -316,7 +313,7 @@ func TestNormalizer_InputChannelClosed(t *testing.T) { func TestNormalizer_OutputChannelFull(t *testing.T) { inCh := make(chan domain.RawEvent, 10) outCh := make(chan domain.RawEvent, 1) // small buffer - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -354,7 +351,7 @@ func TestNormalizer_OutputChannelFull(t *testing.T) { func TestNormalizer_ConcurrentProcessing(t *testing.T) { inCh := make(chan domain.RawEvent, 1000) outCh := make(chan domain.RawEvent, 1000) - n := New(inCh, outCh, testLogger()) + n := New(inCh, outCh, "BTC/USD", []string{"binance", "coinbase", "kraken", "okx"}, testLogger()) ctx, cancel := context.WithCancel(context.Background()) @@ -405,29 +402,6 @@ done: } } -func TestMapCanonicalSymbol(t *testing.T) { - tests := []struct { - source string - native string - want string - }{ - {"binance", "BTCUSDT", "BTC/USD"}, - {"binance", "btcusdt", "BTC/USD"}, - {"coinbase", "BTC-USD", "BTC/USD"}, - {"kraken", "BTC/USD", "BTC/USD"}, - {"okx", "BTC-USDT", "BTC/USD"}, - {"unknown", "ANYTHING", "BTC/USD"}, - } - - for _, tt := range tests { - t.Run(tt.source+"_"+tt.native, func(t *testing.T) { - got := mapCanonicalSymbol(tt.source, tt.native) - if got != tt.want { - t.Errorf("mapCanonicalSymbol(%s, %s) = %s, want %s", tt.source, tt.native, got, tt.want) - } - }) - } -} func TestIsDuplicate(t *testing.T) { n := &Normalizer{