Skip to content

feat: distributed runtime — bus, storage, workers, containers#7

Merged
Mathews-Tom merged 11 commits into
mainfrom
feat/distributed-runtime
Apr 17, 2026
Merged

feat: distributed runtime — bus, storage, workers, containers#7
Mathews-Tom merged 11 commits into
mainfrom
feat/distributed-runtime

Conversation

@Mathews-Tom

Copy link
Copy Markdown
Owner

Summary

  • Phase 5 scaffolding for the multi-process runtime. The Phase 1-4 monolith stays the supported deployment until migration triggers in .docs/phase-5-scaling.md §2 fire twice across separate measurement windows.
  • New transport: byte-level EventBus protocol with NATS JetStream and Redis Streams adapters; make_event_bus routes by BusConfig.backend.kind. Distributed lock primitive (InMemoryLock, NATSKVLock, RedisLock) powers active-passive singleton failover.
  • New persistence: TimescaleDBStore mirroring DuckDBStore's surface with hypertable creation + compression/retention policies from storage.toml.
  • New observability: Prometheus counters/gauges and OpenTelemetry OTLP tracer swapped in behind the same shim call sites; per-test CollectorRegistry isolates the prometheus_client default registry.
  • New workers: WorkerHarness supervisor with heartbeat + SIGINT/SIGTERM; run_bridge spine consumed by feature/detector/manipulation/calibration/context-format workers; poller entrypoint per platform; SingletonRunner for dedup + LLM formatter with lock-based active-passive coordination.
  • Migration tooling: scripts/migrate_to_timescale.py (backfill + verify) and scripts/dual_write_sidecar.py with augur_dual_write_lag_seconds gauge + alert counter.
  • Ops: multi-stage Dockerfile shared across worker kinds; Kubernetes manifests (namespace, ConfigMap, Secret, Deployments, StatefulSets for singletons, HPAs, ServiceMonitor, Kustomize overlay).
  • Docs: CHANGELOG entry, docs/operations/distributed-runbook.md (cutover/rollback/failover), and a new "Deployment Modes" section in docs/architecture/system-design.md.

Test plan

  • Phase 5 unit tests exercise every new adapter against injected fakes (fakeredis, stub NATS JetStream, recording AsyncConnection, InMemoryLock).
  • Full workspace suite: uv run pytest — 336 passed locally.
  • uv run ruff check + uv run ruff format --check . + uv run mypy --strict all clean.
  • Integration suite (live NATS / Redis / TimescaleDB) remains opt-in and operator-owned; a follow-up PR wires docker-compose fixtures behind @pytest.mark.integration.
  • No regressions on the monolith path: tests/signals/test_engine_integration.py green.

Introduce the configuration surface that the multi-process runtime
consumes: storage.toml for backend selection and hypertable/compression
/retention policies, bus.toml for NATS/Redis/memory backend wiring and
distributed-lock TTLs, observability.toml for the Prometheus and OTLP
exporters. Each TOML loads through a Pydantic frozen model with
extra=forbid so typos fail loud at startup.

Move NATS, Redis, psycopg, prometheus_client, and the OpenTelemetry
SDKs into optional-dependencies groups on augur-signals so the monolith
wheel stays lean; the dev group adds them (plus fakeredis) so CI can
unit-test adapters against injected fakes without live clusters.
Replace the Phase 1 no-op MetricCounter / MetricGauge / trace_span
shims with a pluggable backend selected by ObservabilityConfig. Call
sites do not change: instrumented code keeps building instances by
name+labels and invoking inc / set / span; configure_observability
swaps the concrete registration target behind them.

Counters and gauges cache by metric name so repeated construction
returns the same registered collector. The backend accepts an explicit
CollectorRegistry for test isolation — production uses the default
module-level registry while tests pass a fresh CollectorRegistry per
case to avoid "already registered" collisions. Tracing wires through
TracerProvider with a TraceIdRatioBased sampler and the OTLP gRPC
exporter; start_metrics_server opens the scrape endpoint only when
metrics.kind is "prometheus".
Introduce the byte-level EventBus protocol used by multi-process
workers. Two adapters implement it:

* NATSBus — one JetStream stream per subject prefix, pull consumers
  keyed by (pattern, consumer_group). Publish via js.publish with
  optional headers; subscribe yields BusMessage envelopes and acks
  only after the consumer iterates past each message so unfinished
  processing triggers JetStream redelivery on restart.
* RedisStreamsBus — one stream per literal subject, XADD with MAXLEN
  trim for hot-retention, XREADGROUP/XACK for at-least-once delivery.

make_event_bus selects between them from BusConfig. The memory kind
is intentionally rejected and callers are redirected to the monolith's
InProcessAsyncBus.

DistributedLock protocol plus three backends:

* InMemoryLock with an injectable monotonic clock for unit tests.
* NATSKVLock using the JetStream KV bucket's create/put/delete
  semantics as a CAS primitive.
* RedisLock using SET NX EX for acquire and WATCH+MULTI+EXEC for
  renew/release, keeping the adapter usable against fakeredis
  (which does not ship a Lua interpreter) and against real Redis
  clusters without relying on the script cache.

Adapters accept an injected client so unit tests exercise them via
fakeredis.aioredis and a fake NATS JetStream without any live cluster.
TimescaleDBStore mirrors the DuckDBStore public surface so the engine
flips storage backends via config.backend.kind without call-site edits.
Writes use ON CONFLICT DO UPDATE for idempotent upserts; reads use the
same query shapes as DuckDB with psycopg parameter binding.

initialize() issues the schema DDL, calls create_hypertable on
snapshots, features, and signals with chunk intervals and segment-by
columns from StorageConfig, and attaches compression + retention
policies per HypertableSpec — zero-day values skip the policy. The
adapter takes its AsyncConnection via constructor injection so unit
tests exercise every branch against a recording stub; CI runs the
integration suite against a real TimescaleDB container in a follow-up.

make_storage factory routes by backend.kind: duckdb returns the
existing DuckDBStore, timescaledb opens an AsyncConnection from the
configured env var (unless tests inject one) and returns the new
adapter.
WorkerHarness supervises one replica: connect to the event bus, fire
an optional heartbeat task, drive the worker main coroutine, shut down
cleanly on SIGINT or SIGTERM or when the heartbeat stops returning True.
Stateless workers pass NoHeartbeat; singletons plug in a lock-renewing
emitter that signals failover on lost lease. Metric counters and gauges
emit through the observability backend automatically.

run_bridge is the one supervisor stateless workers build on: subscribe
then deserialize then shard-filter then transform then serialize then
publish. Feature, detector, manipulation, calibration, and context
workers share this spine. A separate poller entrypoint fans snapshots
from AdaptivePoller into the snapshot subjects.

The shard filter uses FNV-1a modulo replica_count so per-market state
stays on the same replica even after redeploy, and the subject helpers
centralize the subject taxonomy.
SingletonRunner wraps one dedup-or-llm replica around a distributed
lock. Boot: try acquire; if we win we become active and the harness
runs our main coroutine with a SingletonHeartbeat renewing the lock
every config.lock.renew_interval_seconds. Losing the renew flips the
harness stop flag — the orchestrator restarts the process and the
race re-runs, giving the surviving replica another chance.

Passive peers wait in acquire_active_role polling the lock at
wait_tick_seconds; max_wait_ticks is a test-only escape hatch so the
passive loop is not unbounded under pytest.

Failover telemetry: augur_failover_total counter increments whenever
a heartbeat observes the lock was stolen, and augur_singleton_lock_holder
gauge exposes the current holder per singleton_kind label.

Tested end-to-end against InMemoryLock: happy-path activation, passive
timeout, heartbeat detecting lost lock, and a two-runner sequence where
the passive picks up after the active exits and releases.
…ments

The distributed-runtime modules landed with reStructuredText-style
double backticks. The rest of the codebase uses single backticks in
docstrings and comments, so fold them to match.
Multi-stage Dockerfile builds a single image used by every worker
kind. The runtime stage copies the uv venv, source, and config into
a non-root user; CMD defaults to the monolith engine and is overridden
per worker in the Kubernetes manifests.

K8s manifests under ops/deploy/:

* namespace, ConfigMap (config/*.toml bundle), and Secret scaffolding
  for AUGUR_TIMESCALE_URL / REDIS_URL / NATS_CREDENTIALS_FILE /
  ANTHROPIC_API_KEY.
* Deployments for each stateless worker (poller per platform,
  feature, detector, manipulation, calibration, context-format),
  each exposing /metrics on port 9090.
* StatefulSets for the active-passive singletons (dedup, llm) so each
  replica gets a stable pod name forwarded to the distributed lock
  as holder_id.
* Services: augur-websocket public endpoint; headless services for
  the singleton StatefulSets.
* HorizontalPodAutoscalers on CPU + augur_bus_consume_lag_seconds.
* ServiceMonitor for the prometheus-operator scrape setup.
* kustomization.yaml wires everything and pins the image tag.
CRITICAL:
- bus/redis_streams.py + bus/nats.py: defer XACK/msg.ack() to the
  start of the next iteration so consumers that break mid-async-for
  leave the in-flight message pending for redelivery. Previously the
  ack fired immediately after yield return, which under clean
  shutdown ran the ack for the last yielded message only if the
  consumer iterated forward, and fired the ack *before* downstream
  publish completed in run_bridge.
- scripts/migrate_to_timescale.py: reject any Parquet column not in
  the snapshots allowlist before building dynamic SQL, and report
  cur.rowcount instead of len(records) so ON CONFLICT DO NOTHING
  skips do not silently pass the row-count parity check.
- storage/timescaledb_store.py: compress_segmentby takes a column-
  list identifier, not a quoted string. Split, validate each column
  against the SQL-ident allowlist, and inline the unquoted list.

HIGH:
- bus/nats.py NATSKVLock: renew/release use kv.update/kv.delete with
  the observed revision so a stale replica recovering from a network
  blip cannot overwrite the current holder's value or delete a key
  the new holder already claimed.
- storage/timescaledb_store.py: schema_version INSERT switches to
  INSERT ... SELECT WHERE NOT EXISTS so applied_at stays a migration
  audit record rather than a last-boot marker, matching DuckDB.
- _observability.py: OTel TracerProvider is reused across
  configure_observability calls because the SDK refuses to replace
  it, eliminating silent 'Overriding current TracerProvider' warnings
  in test runs.
- ops/deploy/hpa.yaml: remove HPA autoscale on feature/detector
  deployments; shard-sensitive workers cannot tolerate dynamic
  replica counts without re-sharding, so operators resize them by
  editing 'replicas' directly.
- ops/deploy/{pollers,stateless-workers}.yaml: every Deployment now
  declares readinessProbe and livenessProbe on the /metrics port so
  rolling updates and service-mesh routing respect worker readiness.
@Mathews-Tom Mathews-Tom merged commit 570261c into main Apr 17, 2026
2 checks passed
@Mathews-Tom Mathews-Tom deleted the feat/distributed-runtime branch April 17, 2026 10:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant