32 changes: 32 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,38 @@ All notable changes to Augur are recorded in this file. Format follows [Keep a C

## [Unreleased]

### Added — Distributed Runtime Scaffolding

- `src/augur_signals/bus/base.py` — byte-level `EventBus` protocol plus `BusMessage` envelope. The Phase 1 `InProcessAsyncBus` remains the monolith transport; the new protocol is consumed by multi-process workers via the factory in `bus/factory.py`.
- `src/augur_signals/bus/nats.py` — NATS JetStream adapter. One stream per subject prefix, pull consumers keyed by `(pattern, consumer_group)`, publish with optional headers. `NATSKVLock` uses JetStream KV as the distributed-lock backend.
- `src/augur_signals/bus/redis_streams.py` — Redis Streams adapter with XADD/XREADGROUP/XACK for at-least-once delivery. `RedisLock` uses `SET NX EX` for acquire and `WATCH`/`MULTI`/`EXEC` for CAS renew/release so the adapter works against fakeredis and Redis Cluster alike.
- `src/augur_signals/bus/_lock.py` — `DistributedLock` protocol plus `InMemoryLock` reference implementation with an injectable monotonic clock for deterministic test failover.
- `src/augur_signals/storage/timescaledb_store.py` — TimescaleDB adapter mirroring `DuckDBStore`'s public surface. `initialize()` creates hypertables with configurable chunk intervals, compression segment-by clauses, and retention policies; zero-day values skip the policy.
- `src/augur_signals/storage/factory.py` — picks between DuckDB and TimescaleDB backends via `config/storage.toml` `backend.kind`.
- `src/augur_signals/_observability.py` — Prometheus-backed counters/gauges and an OpenTelemetry OTLP tracer behind the same shim call sites Phase 1 already instruments. Tests pass a fresh `CollectorRegistry` per case for isolation.
- `src/augur_signals/workers/harness.py` — `WorkerHarness` supervisor. Connects the bus, fires the heartbeat task, drives the worker main coroutine, handles SIGINT / SIGTERM, and records `augur_worker_alive` / `augur_worker_processed_total` metrics.
- `src/augur_signals/workers/stateless.py` — `run_bridge` consumer/transform/publisher spine shared by feature, detector, manipulation, calibration, and context-format workers. Shard filter uses FNV-1a modulo replica count for per-market pinning.
- `src/augur_signals/workers/singleton.py` — `SingletonRunner` with `SingletonHeartbeat` renewing a `DistributedLock` on every beat. Lost renewals stop the harness and trigger orchestrator-driven restart, which re-enters the acquire loop so the surviving replica takes over.
- `src/augur_signals/workers/poller.py` / `subjects.py` / `sharding.py` — platform-poller entrypoint, subject naming helpers aligned with `.docs/phase-5-scaling.md §4.3`, and the shared shard-index function.
- `config/storage.toml`, `config/bus.toml`, `config/observability.toml` with `StorageConfig`, `BusConfig`, `ObservabilityConfig` Pydantic loaders (`frozen=True`, `extra="forbid"`).
- `scripts/migrate_to_timescale.py` — `backfill` and `verify` subcommands for Parquet-to-TimescaleDB migration with row-count parity enforcement.
- `scripts/dual_write_sidecar.py` — tee consumer that replays engine writes into TimescaleDB during the dual-write window with `augur_dual_write_lag_seconds` gauge + alert counter.
- `ops/docker/Dockerfile` — multi-stage image shared across worker kinds; Kubernetes manifests under `ops/deploy/` (namespace, ConfigMap, Secret, pollers, stateless worker Deployments, singleton StatefulSets, Services, HPAs, ServiceMonitor, Kustomize overlay).
- `augur-signals` gains optional-dependency groups `bus-nats`, `bus-redis`, `storage-timescale`, `observability`, and `distributed` so the monolith wheel stays lean and the multi-process deployment pulls the full driver set.

### Operational Handoff — Distributed Runtime

After merge, the Phase 1-4 monolith remains the production deployment. Cutover to the multi-process runtime is operator-driven once the growth triggers in `.docs/phase-5-scaling.md §2` fire twice across separate measurement windows:

1. Stand up TimescaleDB; run `scripts/migrate_to_timescale.py backfill --from labels/snapshots_archive`, then `verify` to confirm row-count parity.
2. Start the dual-write sidecar; observe `augur_dual_write_lag_seconds` for ≥7 days below the 10-second threshold.
3. Deploy the message bus (NATS or Redis) and bring up shadow workers (consume only, no publish).
4. Flip workers to active mode one kind at a time, starting with manipulation (smallest blast radius), then feature/detector (per-market shard validation), then the dedup and LLM singletons.
5. Flip `config/storage.toml` `backend.kind` to `timescaledb` and restart the engine; retain the DuckDB archive for 30 days for rollback.
6. After 30 days of stable operation, remove the DuckDB startup path and move the Parquet archive to cold storage.

Live failover integration tests (NATS cluster, Redis Cluster, TimescaleDB with WAL streaming) remain operator-owned — the CI suite exercises the adapters against fakes and stubs. `ops/deploy/` manifests are a starting point; a production rollout layers operator-specific ingress, RBAC, and network policy on top.

### Added — Gated LLM Secondary Formatter

- `src/augur_format/llm/` package — the only location in the codebase where LLM SDK imports live, complementing the CI grep guard over `src/augur_signals/`.
43 changes: 43 additions & 0 deletions config/bus.toml
@@ -0,0 +1,43 @@
# Message bus configuration.
#
# The default backend is "memory" — the single-process InProcessAsyncBus
# used by the monolith engine. Operators flip to "nats" or "redis" when
# they deploy the multi-process runtime; see `.docs/phase-5-scaling.md
# §4` for the decision matrix and operational tradeoffs.

[backend]
# One of: "memory" | "nats" | "redis".
kind = "memory"
# Used by the in-process bus; ignored for nats/redis.
capacity = 256

[nats]
servers = ["nats://localhost:4222"]
# Name of the env var that holds the path to the NATS credentials
# file. The env var is read at startup; the nats-py client opens the
# file it points to.
credentials_file_env = "NATS_CREDENTIALS_FILE"
# JetStream stream name. A single stream carries every augur.* subject.
stream_name = "augur"
# JetStream replication factor. Production clusters run at 3; a
# single-node dev cluster accepts 1.
replication_factor = 3
# Subject prefix for every Augur topic. Downstream subjects follow
# the §4.3 layout: <prefix>.snapshots.<platform>.<market_id>, etc.
subject_prefix = "augur"

[redis]
url_env = "REDIS_URL"
# XADD MAXLEN target per stream. Oldest entries are trimmed past this.
stream_max_length = 100000
# Consumer groups are named "<prefix>.<topic>".
consumer_group_prefix = "augur"
# XREADGROUP block timeout in milliseconds. The consumer loop unblocks
# at this cadence, so graceful shutdown observes stop signals within
# one block interval.
block_ms = 1000

[lock]
# Distributed-lock TTL used by active-passive singleton workers.
# The active holder renews the lock every renew_interval_seconds; if
# renewal misses three intervals the passive peer takes over.
ttl_seconds = 30
renew_interval_seconds = 10
28 changes: 28 additions & 0 deletions config/observability.toml
@@ -0,0 +1,28 @@
# Observability backend configuration.
#
# Phase 1-4 used no-op shims; Phase 5 replaces them with
# prometheus-client counters/gauges and an OpenTelemetry tracer. Call
# sites are unchanged. When the kind fields below are "disabled" the
# backends stay silent — useful for unit tests and backtest runs.

[metrics]
# One of: "disabled" | "prometheus".
kind = "prometheus"
# Bound address for the /metrics HTTP endpoint. The worker-harness
# start-up path opens the listener before running subscriptions.
prometheus_bind = "0.0.0.0"
prometheus_port = 9090

[traces]
# One of: "disabled" | "otlp".
kind = "otlp"
otlp_endpoint = "http://otel-collector:4317"
service_name = "augur"
# Fraction of spans sampled. At 0.0 the tracer is wired but records
# no spans; at 1.0 every span is recorded. 0.1 is the per-service
# default recommended in `.docs/phase-5-scaling.md §7.2`.
sampling_ratio = 0.1

[logs]
level = "INFO"
format = "json"
47 changes: 47 additions & 0 deletions config/storage.toml
@@ -0,0 +1,47 @@
# Storage backend configuration.
#
# The Phase 1-4 monolith runs with backend.kind = "duckdb" and the
# local Parquet archive. Cutover to TimescaleDB flips backend.kind
# to "timescaledb" and restarts the engine; rollback flips it back.
# See `.docs/phase-5-scaling.md §5` for the cutover procedure.

[backend]
# One of: "duckdb" | "timescaledb".
kind = "duckdb"
# Path used when kind == "duckdb".
duckdb_path = "data/augur.duckdb"
# Env var holding the PostgreSQL DSN when kind == "timescaledb".
timescale_url_env = "AUGUR_TIMESCALE_URL"

[connection]
pool_size = 20
max_overflow = 10
pool_timeout_seconds = 30

[migration]
parquet_archive_root = "labels/snapshots_archive"
# Dual-write sidecar alerts when the observed per-table lag exceeds
# this threshold. The 7-day dual-write window is enforced by operators,
# not by the sidecar; it surfaces breaches but does not auto-rollback.
dual_write_lag_alert_seconds = 10

[hypertable]
# Chunk intervals per hypertable. Tuned so the hot-vs-cold boundary
# lines up with the compression policy below.
snapshot_chunk_interval_days = 1
feature_chunk_interval_days = 1
signal_chunk_interval_days = 7

[retention]
# Retention policies per TimescaleDB hypertable. A zero value skips
# the retention policy (rows never drop).
snapshot_retention_days = 0
feature_retention_days = 30
signal_retention_days = 0

[compression]
# Chunks older than this threshold compress automatically. Set to
# zero to disable compression on a given hypertable.
snapshot_compress_after_days = 7
feature_compress_after_days = 7
signal_compress_after_days = 30
20 changes: 20 additions & 0 deletions docs/architecture/system-design.md
@@ -428,3 +428,23 @@ The closed list of phrase strings the LLM formatter rejects. Maintained as a con
6. **Pre-resolution exclusion.** No detector fires within six hours of `closes_at`. Enforced inside each detector's `ingest()`.
7. **Manipulation flags are descriptive, not prescriptive.** Augur attaches flags; consumers apply suppression policy.
8. **Deterministic context primary, LLM secondary.** The canonical machine-consumed output is `SignalContext` JSON. The LLM formatter is gated, opt-in, and routed to human channels only by default.

## Deployment Modes

Augur supports two deployment modes from the same codebase:

### Monolith (Phase 1-4 default)

One `augur_signals.engine` process owns the full pipeline from ingestion to formatter emission. `InProcessAsyncBus` routes between layers; `DuckDBStore` persists; the deterministic formatters run inline. This mode is the supported deployment until the growth triggers in `.docs/phase-5-scaling.md §2` fire twice across separate measurement windows.

### Distributed Runtime (Phase 5)

The engine decomposes into worker processes when scale demands it:

- Pollers (`augur_signals.workers.poller`), one per platform, publish snapshots to `augur.snapshots.<platform>.<market_id>`.
- Stateless workers (feature, detector, manipulation, calibration, context_format) scale horizontally behind an `EventBus` (NATS JetStream or Redis Streams). Per-market sharding uses FNV-1a modulo replica count; each replica sees only its shard.
- Singletons (dedup, llm_formatter) run as active-passive pairs coordinated by a `DistributedLock`. The active instance renews the lock on each heartbeat; a missed renewal stops the harness, the orchestrator-driven restart re-enters the acquire loop, and the surviving replica takes over within `ttl_seconds + renew_interval_seconds`.
- TimescaleDB replaces DuckDB for persistence. Hypertables partition `snapshots`, `features`, and `signals` by time with compression and retention policies attached per `storage.toml`.
- Prometheus + OpenTelemetry replace the Phase 1 no-op shims without any call-site edits; the backend swap happens in `configure_observability`.
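
The per-market shard filter described in the stateless-worker bullet above can be sketched as follows. This is a minimal illustration, assuming the 32-bit FNV-1a variant and UTF-8 encoded market ids; the source does not state which hash width Augur uses, and `fnv1a_32`, `shard_index`, and `owns_market` are hypothetical names, not the functions in `workers/sharding.py`.

```python
FNV_OFFSET_BASIS_32 = 0x811C9DC5
FNV_PRIME_32 = 0x01000193


def fnv1a_32(data: bytes) -> int:
    """Return the 32-bit FNV-1a hash of data (XOR the byte, then multiply)."""
    h = FNV_OFFSET_BASIS_32
    for byte in data:
        h ^= byte
        h = (h * FNV_PRIME_32) & 0xFFFFFFFF
    return h


def shard_index(market_id: str, replica_count: int) -> int:
    """Map a market id to a shard in [0, replica_count)."""
    if replica_count < 1:
        raise ValueError("replica_count must be >= 1")
    return fnv1a_32(market_id.encode("utf-8")) % replica_count


def owns_market(market_id: str, replica_id: int, replica_count: int) -> bool:
    """True when this replica is the single owner of the market."""
    return shard_index(market_id, replica_count) == replica_id
```

Because the hash is deterministic, every replica can evaluate `owns_market` locally and exactly one replica accepts each market for a given replica count.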

The distributed runtime is operator-driven — see `docs/operations/distributed-runbook.md` for cutover, rollback, and failover procedures. The monolith path remains fully supported during and after rollout so operators can revert to DuckDB for 30 days post-cutover.
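
The per-hypertable policies mentioned in the TimescaleDB bullet above could be generated along these lines. This is a sketch under stated assumptions: the time column `ts`, the segment-by column `market_id`, and the `HypertablePolicy` / `policy_statements` names are hypothetical, and the real `TimescaleDBStore.initialize()` may differ. Only the TimescaleDB functions `create_hypertable`, `add_compression_policy`, and `add_retention_policy` are taken as given.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HypertablePolicy:
    table: str
    chunk_interval_days: int
    compress_after_days: int  # 0 disables compression
    retention_days: int       # 0 skips the retention policy


def policy_statements(p, time_column="ts", segment_by="market_id"):
    """Build the SQL statements for one hypertable.

    Table and column names come from trusted config, so plain string
    formatting is used here; never do this with untrusted input.
    """
    stmts = [
        f"SELECT create_hypertable('{p.table}', '{time_column}', "
        f"chunk_time_interval => INTERVAL '{p.chunk_interval_days} days', "
        f"if_not_exists => TRUE);"
    ]
    if p.compress_after_days > 0:
        stmts.append(
            f"ALTER TABLE {p.table} SET (timescaledb.compress, "
            f"timescaledb.compress_segmentby = '{segment_by}');"
        )
        stmts.append(
            f"SELECT add_compression_policy('{p.table}', "
            f"INTERVAL '{p.compress_after_days} days');"
        )
    if p.retention_days > 0:
        stmts.append(
            f"SELECT add_retention_policy('{p.table}', "
            f"INTERVAL '{p.retention_days} days');"
        )
    return stmts


# Values mirror the defaults in config/storage.toml.
for policy in (
    HypertablePolicy("snapshots", 1, 7, 0),
    HypertablePolicy("features", 1, 7, 30),
    HypertablePolicy("signals", 7, 30, 0),
):
    for statement in policy_statements(policy):
        print(statement)
```

With the default config, only `features` receives a retention policy; `snapshots` and `signals` keep every row.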
99 changes: 99 additions & 0 deletions docs/operations/distributed-runbook.md
@@ -0,0 +1,99 @@
# Distributed Runtime Operational Runbook

The Phase 5 multi-process deployment is triggered by growth thresholds in `.docs/phase-5-scaling.md §2`: >80M snapshot rows, P95 backtest latency >30s, or P99 live-write latency >500ms, each observed twice across separate measurement windows. Until the triggers fire, the single-process engine is the supported deployment.

This runbook covers cutover, rollback, failover response, and on-call escalation for the distributed runtime.

## 1. Pre-Cutover Checklist

| Item | How to verify |
| --- | --- |
| TimescaleDB primary provisioned with hypertables | `SELECT * FROM timescaledb_information.hypertables` returns rows for `snapshots`, `features`, `signals` |
| Backfill complete with row-count parity | `scripts/migrate_to_timescale.py verify --start ... --end ...` exits 0 |
| Dual-write sidecar lag <10s for ≥7 days | `augur_dual_write_lag_seconds{table=*}` max-over-time stays under threshold |
| NATS or Redis bus operational; consumer groups created | `augur_bus_consume_lag_seconds` reports a value per `(topic, consumer_group)` |
| Shadow workers running for ≥48h without errors | `augur_worker_alive{worker_kind=*} == 1` continuously |
| Distributed lock backend seeded | `SET augur.lock.dedup ...` responds OK; NATS KV bucket exists |
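
The dual-write lag row in the checklist above amounts to a max-over-time check across a full window. A minimal sketch, assuming lag samples have already been exported as `(timestamp, lag_seconds)` pairs; `lag_gate_passed` is a hypothetical helper, not part of the sidecar or a Prometheus API:

```python
from datetime import datetime, timedelta


def lag_gate_passed(samples, threshold_seconds=10.0, min_window=timedelta(days=7)):
    """Return True when the samples span at least min_window and every
    observed lag stays strictly below threshold_seconds."""
    if not samples:
        return False
    times = [ts for ts, _ in samples]
    if max(times) - min(times) < min_window:
        return False  # not enough history to judge the window
    return max(lag for _, lag in samples) < threshold_seconds


# Hourly samples covering exactly seven days, all at 2 s of lag.
start = datetime(2024, 1, 1)
hourly = [(start + timedelta(hours=h), 2.0) for h in range(7 * 24 + 1)]
print(lag_gate_passed(hourly))
```

A single sample at or above the threshold fails the gate, which matches the "max-over-time stays under threshold" wording of the checklist.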

## 2. Cutover Procedure

1. Announce the freeze window on the ops channel; set the monolith engine to drain mode (`AUGUR_DRAIN=true` env var) and let it finish in-flight cycles.
2. Flip `config/storage.toml`:

```toml
[backend]
kind = "timescaledb" # was "duckdb"
```

3. Apply the updated ConfigMap: `kubectl apply -k ops/deploy/`.
4. Restart the monolith engine (or run `kubectl rollout restart deployment/augur-engine`) — the new process picks up TimescaleDB at startup and opens a connection pool from `AUGUR_TIMESCALE_URL`.
5. Watch `augur_db_query_seconds{table, operation}` for 15 minutes; rollback if P95 exceeds the pre-cutover DuckDB baseline by 2×.
6. Bring workers online in the order recommended in `.docs/phase-5-scaling.md §12.1`: manipulation → feature → detector → calibration → context_format → dedup → LLM.
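
The rollback rule in step 5 can be expressed as a small check. This is a sketch only, assuming query latencies are available as a list of seconds and using the nearest-rank percentile definition; `p95` and `should_roll_back` are hypothetical helpers, and in practice the value would more likely come from a Prometheus histogram query.

```python
def p95(values):
    """Nearest-rank 95th percentile of a non-empty sequence."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    # ceil(0.95 * n) computed in integer arithmetic to avoid float rounding.
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def should_roll_back(post_cutover_latencies, baseline_p95_seconds, factor=2.0):
    """True when the post-cutover P95 exceeds factor x the DuckDB baseline."""
    return p95(post_cutover_latencies) > factor * baseline_p95_seconds


# 100 samples from 1 ms to 100 ms, compared with a 40 ms baseline P95.
latencies = [i / 1000 for i in range(1, 101)]
print(should_roll_back(latencies, baseline_p95_seconds=0.040))
```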

## 3. Rollback Procedure

Rollback is always available for 30 days post-cutover because the Parquet archive is preserved.

1. Flip `config/storage.toml` `backend.kind` back to `"duckdb"` and reapply the ConfigMap.
2. Scale the workers to zero: `kubectl scale deployment,statefulset --all --replicas=0 -n augur`.
3. Start the monolith engine against the DuckDB archive.
4. Announce rollback on the ops channel and file a post-incident ticket with the TimescaleDB query traces that motivated rollback.

After the DuckDB path is removed (day 30+), rollback requires restoring from a TimescaleDB backup — see §5.

## 4. Failover Response

### 4.1 Dedup Singleton

Symptom: `augur_singleton_lock_holder{singleton_kind="dedup"}` drops to 0, or `augur_failover_total{singleton_kind="dedup"}` increments.

Procedure:

1. Confirm the active pod was terminated: `kubectl get pod -n augur -l app.kubernetes.io/component=dedup`.
2. Observe the passive replica acquire the lock within `lock.ttl_seconds + renew_interval_seconds` (default 40s). The metric flips back to 1 with the new pod name.
3. If the lock stays unheld for >2× `ttl_seconds`, manually delete the stale lock:
   - Redis: `redis-cli DEL augur.lock.dedup`
   - NATS: `nats kv delete augur-locks dedup`
4. Force a restart of both replicas so the acquire race runs clean: `kubectl rollout restart -n augur statefulset/augur-dedup`.
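
The takeover timing in step 2 can be reproduced with a TTL lock driven by an injectable clock. The sketch below is illustrative only: `FakeClock`, this `InMemoryLock`, and the pod names `dedup-0` / `dedup-1` are hypothetical and do not reproduce the interface in `bus/_lock.py`. It assumes the default `ttl_seconds = 30` and `renew_interval_seconds = 10`.

```python
class FakeClock:
    """Manually advanced monotonic clock for deterministic tests."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


class InMemoryLock:
    """Single-key TTL lock; the holder must renew before expiry."""

    def __init__(self, ttl_seconds, clock):
        self._ttl = ttl_seconds
        self._clock = clock
        self._holder = None
        self._expires_at = 0.0

    def _expired(self):
        return self._holder is None or self._clock() >= self._expires_at

    def acquire(self, owner):
        if not self._expired() and self._holder != owner:
            return False  # someone else holds a live lock
        self._holder = owner
        self._expires_at = self._clock() + self._ttl
        return True

    def renew(self, owner):
        if self._expired() or self._holder != owner:
            return False  # lost the lock; the caller should stop working
        self._expires_at = self._clock() + self._ttl
        return True


clock = FakeClock()
lock = InMemoryLock(ttl_seconds=30, clock=clock)
assert lock.acquire("dedup-0")      # active replica takes the lock at t=0
clock.advance(10)
assert lock.renew("dedup-0")        # heartbeat at t=10 moves expiry to t=40
clock.advance(29)                   # t=39: active pod is gone, lock still live
assert not lock.acquire("dedup-1")
clock.advance(1)                    # t=40: TTL has elapsed since the last renewal
assert lock.acquire("dedup-1")      # passive replica takes over
assert not lock.renew("dedup-0")    # the old holder can no longer renew
```

One reading of the 40-second bound is that the lock expires at most `ttl_seconds` after the last successful renewal, and the passive replica notices on its next attempt, at most one `renew_interval_seconds` later.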

### 4.2 LLM Formatter

Identical to dedup with `singleton_kind="llm_formatter"`. Briefs in flight at failover time are dropped by design, per Phase 4 guidance, and are not retried.

### 4.3 Stateful Worker (feature / detector)

Stateful workers persist their per-market cursor to TimescaleDB every 60 seconds. On crash:

1. Kubernetes reschedules the pod; the replacement replica reads the last persisted cursor and resumes.
2. If the shard count changed (the operator resized the Deployment and restarted its pods), the new owner replays from the last cursor of the displaced replica. Expect a short backlog as messages re-ack.
3. Monitor `augur_bus_consume_lag_seconds{topic="augur.features.*", consumer_group="feature-*"}` — it should return below 5s within one poll cycle.
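
Step 2 matters because modulo sharding reassigns many markets whenever the replica count changes. The sketch below counts how many markets change owner on a resize; it assumes 32-bit FNV-1a over UTF-8 market ids (the hash width is not stated in the source), and `moved_markets` and the `market-N` ids are hypothetical.

```python
def fnv1a_32(data: bytes) -> int:
    """32-bit FNV-1a hash."""
    h = 0x811C9DC5
    for byte in data:
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF
    return h


def moved_markets(market_ids, old_count, new_count):
    """Return the markets whose owning shard differs after a resize."""
    moved = []
    for market_id in market_ids:
        h = fnv1a_32(market_id.encode("utf-8"))
        if h % old_count != h % new_count:
            moved.append(market_id)
    return moved


markets = [f"market-{i}" for i in range(1000)]
moved = moved_markets(markets, old_count=4, new_count=5)
print(f"{len(moved)} of {len(markets)} markets change owner when scaling 4 -> 5")
```

Every market in `moved` needs its cursor picked up by a different replica, which is the source of the short backlog noted above.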

## 5. Backup and Restore

| Artefact | Cadence | Retention |
| --- | --- | --- |
| TimescaleDB base backup | Daily (pg_basebackup) | 30 days |
| TimescaleDB WAL | Continuous archive to S3-equivalent | 14 days |
| Parquet archive | Written by engine during dual-write | 30 days post-cutover, then 1 year cold |
| Reliability curves | Checkpointed per calibration run | Indefinite |

Restore: restore the most recent `pg_basebackup` base backup onto a replacement host, replay WAL to the desired point in time, rerun `TimescaleDBStore.initialize` to validate hypertable definitions, then swap `AUGUR_TIMESCALE_URL`.

## 6. SLO Response Thresholds

| SLO | Target | Page if |
| --- | --- | --- |
| End-to-end signal latency P95 | <60s | >60s for 5+ min |
| End-to-end signal latency P99 | <120s | >120s for 5+ min |
| Live ingest write P99 | <200ms | >200ms for 2+ min |
| Bus consume lag P95 | <5s | >5s for 5+ min |
| Dedup failover time | <60s | >60s observed |
| LLM brief rejection rate | <5% / hr | >5% for 1 hr |
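
The "Page if" column above describes sustained breaches rather than single spikes. A minimal sketch of that evaluation, assuming time-ordered `(timestamp_seconds, value)` samples; `SloRule` and `should_page` are hypothetical names, and a real deployment would more likely express this as an alerting rule with a `for:` duration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SloRule:
    name: str
    threshold: float        # value above which the SLO is breached
    sustain_seconds: float  # how long the breach must last before paging


def should_page(rule, samples):
    """Page when the value stays above the threshold for sustain_seconds.

    A single sample at or below the threshold resets the breach timer.
    """
    breach_started = None
    for ts, value in samples:
        if value > rule.threshold:
            if breach_started is None:
                breach_started = ts
            if ts - breach_started >= rule.sustain_seconds:
                return True
        else:
            breach_started = None
    return False


# Bus consume lag P95: page if above 5 s for 5 minutes or more.
bus_lag = SloRule("bus_consume_lag_p95", threshold=5.0, sustain_seconds=300)
sustained = [(t, 6.0) for t in range(0, 301, 60)]
print(should_page(bus_lag, sustained))
```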

Pager rotation and escalation are operations-team-owned; the engineering runbook only defines the thresholds.

## 7. Common Investigations

- **High bus lag on one shard**: check `augur_worker_processed_total{worker_kind="feature", replica_id="..."}` per replica. A replica that plateaus while peers advance is likely stuck on a long-running transform.
- **Increasing LLM rejections**: inspect `augur_llm_briefs_rejected_total{reason}` — the label identifies the gate that dropped the brief (forbidden token, schema violation, consumer gate, backend error).
- **TimescaleDB lock contention**: correlate the `augur_db_query_seconds{operation="write"}` tail with `pg_stat_activity`; a handful of long-held WAL-sender sessions usually points to a slow read replica.
25 changes: 25 additions & 0 deletions ops/deploy/configmap.yaml
@@ -0,0 +1,25 @@
# Phase 5 operational config.
#
# Populate `config.*.toml` entries from the repository `config/` files
# at deploy time: `kubectl create configmap augur-config --from-file=config/`
# or sync via Kustomize/Helm. The worker image mounts this ConfigMap at
# /app/config, matching the Dockerfile's `AUGUR_CONFIG_DIR`.
apiVersion: v1
kind: ConfigMap
metadata:
  name: augur-config
  namespace: augur
data:
  # Populated from the repository `config/` directory at deploy time.
  # Each key matches the TOML filename the worker loads at startup.
  storage.toml: ""
  bus.toml: ""
  observability.toml: ""
  polling.toml: ""
  detectors.toml: ""
  dedup.toml: ""
  formatters.toml: ""
  consumers.toml: ""
  llm.toml: ""
  markets.toml: ""
  forbidden_tokens.toml: ""
28 changes: 28 additions & 0 deletions ops/deploy/hpa.yaml
@@ -0,0 +1,28 @@
# HorizontalPodAutoscaler only for stateless workers whose shard key
# is irrelevant. feature/detector workers shard by market_id modulo
# replica_count; dynamic scaling would silently drop or duplicate
# shards because the modulus changes whenever the replica count does. To
# resize those pools the operator edits the Deployment `replicas`
# directly and restarts pods so the new REPLICA_COUNT propagates.
# Singletons do NOT autoscale — their replica count is pinned at 2
# (active + passive).
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: augur-context-format
  namespace: augur
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: augur-context-format
  minReplicas: 4
  maxReplicas: 8
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 65
22 changes: 22 additions & 0 deletions ops/deploy/kustomization.yaml
@@ -0,0 +1,22 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: augur

resources:
  - namespace.yaml
  - configmap.yaml
  - secrets.yaml
  - pollers.yaml
  - stateless-workers.yaml
  - singletons.yaml
  - services.yaml
  - hpa.yaml
  - servicemonitor.yaml

images:
  - name: ghcr.io/aetherforge/augur
    newTag: latest

commonLabels:
  app.kubernetes.io/part-of: augur