feat: distributed runtime — bus, storage, workers, containers#7
Merged
Introduce the configuration surface that the multi-process runtime consumes: storage.toml for backend selection and hypertable/compression/retention policies, bus.toml for NATS/Redis/memory backend wiring and distributed-lock TTLs, observability.toml for the Prometheus and OTLP exporters. Each TOML loads through a Pydantic frozen model with extra=forbid so typos fail loud at startup. Move NATS, Redis, psycopg, prometheus_client, and the OpenTelemetry SDKs into optional-dependencies groups on augur-signals so the monolith wheel stays lean; the dev group adds them (plus fakeredis) so CI can unit-test adapters against injected fakes without live clusters.
Replace the Phase 1 no-op MetricCounter / MetricGauge / trace_span shims with a pluggable backend selected by ObservabilityConfig. Call sites do not change: instrumented code keeps building instances by name+labels and invoking inc / set / span; configure_observability swaps the concrete registration target behind them. Counters and gauges cache by metric name so repeated construction returns the same registered collector. The backend accepts an explicit CollectorRegistry for test isolation — production uses the default module-level registry while tests pass a fresh CollectorRegistry per case to avoid "already registered" collisions. Tracing wires through TracerProvider with a TraceIdRatioBased sampler and the OTLP gRPC exporter; start_metrics_server opens the scrape endpoint only when metrics.kind is "prometheus".
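A rough sketch of the caching and registry-injection idea, using only the standard library. `Registry` is a stand-in for a collector registry that rejects duplicate names, and `configure_backend` is a simplified, hypothetical analogue of configure_observability; none of these signatures are taken from the PR.

```python
from typing import Dict, Optional, Tuple


class Registry:
    """Stand-in registry: registering the same metric name twice is an error."""

    def __init__(self) -> None:
        self._collectors: Dict[str, object] = {}

    def register(self, name: str, collector: object) -> None:
        if name in self._collectors:
            raise ValueError(f"already registered: {name}")
        self._collectors[name] = collector


class _Counter:
    """Concrete collector holding one value per label set."""

    def __init__(self) -> None:
        self.values: Dict[Tuple[Tuple[str, str], ...], float] = {}

    def inc(self, labels: Dict[str, str], amount: float) -> None:
        key = tuple(sorted(labels.items()))
        self.values[key] = self.values.get(key, 0.0) + amount


class Backend:
    """Caches collectors by metric name so each name registers exactly once."""

    def __init__(self, registry: Registry) -> None:
        self._registry = registry
        self._counters: Dict[str, _Counter] = {}

    def counter(self, name: str) -> _Counter:
        cached = self._counters.get(name)
        if cached is None:
            cached = _Counter()
            self._registry.register(name, cached)
            self._counters[name] = cached
        return cached


_backend: Optional[Backend] = None


def configure_backend(registry: Registry) -> Backend:
    """Swap the registration target; call sites are unaffected."""
    global _backend
    _backend = Backend(registry)
    return _backend


class MetricCounter:
    """Call-site facade built by name + labels; resolves the backend on inc()."""

    def __init__(self, name: str, **labels: str) -> None:
        self._name = name
        self._labels = labels

    def inc(self, amount: float = 1.0) -> None:
        if _backend is None:
            return  # behaves as a no-op shim until a backend is configured
        _backend.counter(self._name).inc(self._labels, amount)
```

A test would call `configure_backend(Registry())` at the start of each case, so two cases that both touch the same metric name never collide.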
Introduce the byte-level EventBus protocol used by multi-process workers. Two adapters implement it:
* NATSBus: one JetStream stream per subject prefix, pull consumers keyed by (pattern, consumer_group). Publish via js.publish with optional headers; subscribe yields BusMessage envelopes and acks only after the consumer iterates past each message, so unfinished processing triggers JetStream redelivery on restart.
* RedisStreamsBus: one stream per literal subject, XADD with MAXLEN trim for hot-retention, XREADGROUP/XACK for at-least-once delivery.

make_event_bus selects between them from BusConfig. The memory kind is intentionally rejected and callers are redirected to the monolith's InProcessAsyncBus.

DistributedLock protocol plus three backends:
* InMemoryLock with an injectable monotonic clock for unit tests.
* NATSKVLock using the JetStream KV bucket's create/put/delete semantics as a CAS primitive.
* RedisLock using SET NX EX for acquire and WATCH+MULTI+EXEC for renew/release, keeping the adapter usable against fakeredis (which does not ship a Lua interpreter) and against real Redis clusters without relying on the script cache.

Adapters accept an injected client so unit tests exercise them via fakeredis.aioredis and a fake NATS JetStream without any live cluster.
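To make the injectable-clock idea concrete, here is a minimal synchronous sketch of an in-memory lease lock. The method names, the `ttl_seconds` parameter, and the synchronous style are assumptions for illustration; the PR's InMemoryLock may well be async and shaped differently.

```python
import time
from typing import Callable, Dict, Tuple


class InMemoryLock:
    """Lease-style lock whose notion of time comes from an injected clock."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (holder_id, expires_at)
        self._leases: Dict[str, Tuple[str, float]] = {}

    def acquire(self, key: str, holder_id: str, ttl_seconds: float) -> bool:
        """Succeed only if no unexpired lease exists (like SET NX EX)."""
        now = self._clock()
        current = self._leases.get(key)
        if current is not None and current[1] > now:
            return False
        self._leases[key] = (holder_id, now + ttl_seconds)
        return True

    def renew(self, key: str, holder_id: str, ttl_seconds: float) -> bool:
        """Extend the lease only if this holder still owns an unexpired lease."""
        now = self._clock()
        current = self._leases.get(key)
        if current is None or current[0] != holder_id or current[1] <= now:
            return False
        self._leases[key] = (holder_id, now + ttl_seconds)
        return True

    def release(self, key: str, holder_id: str) -> bool:
        """Delete the lease only if this holder owns it."""
        current = self._leases.get(key)
        if current is None or current[0] != holder_id:
            return False
        del self._leases[key]
        return True
```

A unit test can pass `clock=lambda: now[0]` and advance `now[0]` by hand, which exercises expiry and takeover without sleeping.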
TimescaleDBStore mirrors the DuckDBStore public surface so the engine flips storage backends via config.backend.kind without call-site edits. Writes use ON CONFLICT DO UPDATE for idempotent upserts; reads use the same query shapes as DuckDB with psycopg parameter binding. initialize() issues the schema DDL, calls create_hypertable on snapshots, features, and signals with chunk intervals and segment-by columns from StorageConfig, and attaches compression + retention policies per HypertableSpec — zero-day values skip the policy. The adapter takes its AsyncConnection via constructor injection so unit tests exercise every branch against a recording stub; CI runs the integration suite against a real TimescaleDB container in a follow-up. make_storage factory routes by backend.kind: duckdb returns the existing DuckDBStore, timescaledb opens an AsyncConnection from the configured env var (unless tests inject one) and returns the new adapter.
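The "zero-day values skip the policy" rule might look roughly like the planner below, which only builds SQL strings and never touches a database. The `HypertableSpec` fields and the `ts` time column are assumptions, not the PR's actual schema, and the step that enables compression on the table before a compression policy is attached is deliberately omitted.

```python
import re
from dataclasses import dataclass
from typing import List

# Conservative identifier allowlist, since the names are inlined into SQL.
_IDENT = re.compile(r"[a-z_][a-z0-9_]*")


@dataclass(frozen=True)
class HypertableSpec:
    """Hypothetical per-table policy settings; 0 days disables a policy."""

    table: str
    time_column: str
    chunk_interval_days: int
    compress_after_days: int
    retain_days: int


def _ident(name: str) -> str:
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def plan_policies(spec: HypertableSpec) -> List[str]:
    """Return the TimescaleDB statements implied by one spec."""
    table = _ident(spec.table)
    column = _ident(spec.time_column)
    statements = [
        f"SELECT create_hypertable('{table}', '{column}', "
        f"chunk_time_interval => INTERVAL '{int(spec.chunk_interval_days)} days', "
        f"if_not_exists => TRUE)"
    ]
    if spec.compress_after_days > 0:  # zero-day value skips the policy
        statements.append(
            f"SELECT add_compression_policy('{table}', "
            f"INTERVAL '{int(spec.compress_after_days)} days')"
        )
    if spec.retain_days > 0:  # zero-day value skips the policy
        statements.append(
            f"SELECT add_retention_policy('{table}', "
            f"INTERVAL '{int(spec.retain_days)} days')"
        )
    return statements
```

Keeping the planning step pure makes it easy to assert on the emitted statements with a recording stub, in the spirit of the constructor-injected AsyncConnection described above.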
WorkerHarness supervises one replica: connect to the event bus, fire an optional heartbeat task, drive the worker main coroutine, shut down cleanly on SIGINT or SIGTERM or when the heartbeat stops returning True. Stateless workers pass NoHeartbeat; singletons plug in a lock-renewing emitter that signals failover on lost lease. Metric counters and gauges emit through the observability backend automatically. run_bridge is the one supervisor stateless workers build on: subscribe then deserialize then shard-filter then transform then serialize then publish. Feature, detector, manipulation, calibration, and context workers share this spine. A separate poller entrypoint fans snapshots from AdaptivePoller into the snapshot subjects. The shard filter uses FNV-1a modulo replica_count so per-market state stays on the same replica even after redeploy, and the subject helpers centralize the subject taxonomy.
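A minimal sketch of the shard filter, assuming the 64-bit FNV-1a variant over the UTF-8 bytes of a market identifier. The bit width, the choice of key, and the `owns_market` helper name are assumptions; the source states only that FNV-1a modulo replica_count is used.

```python
FNV64_OFFSET_BASIS = 0xCBF29CE484222325
FNV64_PRIME = 0x100000001B3
_MASK64 = 0xFFFFFFFFFFFFFFFF


def fnv1a_64(data: bytes) -> int:
    """64-bit FNV-1a: XOR each byte in, then multiply by the prime."""
    h = FNV64_OFFSET_BASIS
    for byte in data:
        h ^= byte
        h = (h * FNV64_PRIME) & _MASK64
    return h


def owns_market(market_id: str, replica_index: int, replica_count: int) -> bool:
    """True if this replica is the one responsible for `market_id`."""
    if replica_count <= 0 or not 0 <= replica_index < replica_count:
        raise ValueError("replica_index must be in [0, replica_count)")
    return fnv1a_64(market_id.encode("utf-8")) % replica_count == replica_index
```

A fixed hash such as FNV-1a matters here because Python's built-in `hash()` for strings is randomized per process by default, so it would not keep a market on the same replica across restarts.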
SingletonRunner wraps one dedup-or-llm replica around a distributed lock. Boot: try acquire; if we win we become active and the harness runs our main coroutine with a SingletonHeartbeat renewing the lock every config.lock.renew_interval_seconds. Losing the renew flips the harness stop flag — the orchestrator restarts the process and the race re-runs, giving the surviving replica another chance. Passive peers wait in acquire_active_role polling the lock at wait_tick_seconds; max_wait_ticks is a test-only escape hatch so the passive loop is not unbounded under pytest. Failover telemetry: augur_failover_total counter increments whenever a heartbeat observes the lock was stolen, and augur_singleton_lock_holder gauge exposes the current holder per singleton_kind label. Tested end-to-end against InMemoryLock: happy-path activation, passive timeout, heartbeat detecting lost lock, and a two-runner sequence where the passive picks up after the active exits and releases.
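The passive wait loop and the two-runner handover can be sketched as follows. `TinyLock` is a hypothetical TTL-free stand-in for the distributed lock, and the `acquire_active_role` signature shown here is an assumption; only the names wait_tick_seconds and max_wait_ticks come from the description above.

```python
import asyncio
from typing import Optional, Tuple


class TinyLock:
    """Hypothetical single-key async lock without TTLs, for illustration only."""

    def __init__(self) -> None:
        self.holder: Optional[str] = None

    async def acquire(self, holder_id: str) -> bool:
        if self.holder is None:
            self.holder = holder_id
            return True
        return False

    async def release(self, holder_id: str) -> None:
        if self.holder == holder_id:
            self.holder = None


async def acquire_active_role(
    lock: TinyLock,
    holder_id: str,
    wait_tick_seconds: float,
    max_wait_ticks: Optional[int] = None,
) -> bool:
    """Poll until the lock is won; give up after max_wait_ticks if it is set."""
    ticks = 0
    while True:
        if await lock.acquire(holder_id):
            return True
        if max_wait_ticks is not None and ticks >= max_wait_ticks:
            return False  # bounded exit so tests do not loop forever
        ticks += 1
        await asyncio.sleep(wait_tick_seconds)


async def demo_failover() -> Tuple[bool, bool, bool, Optional[str]]:
    """Active wins, passive times out, then passive takes over after release."""
    lock = TinyLock()
    active_won = await acquire_active_role(lock, "pod-0", 0.001)
    passive_timed_out = not await acquire_active_role(
        lock, "pod-1", 0.001, max_wait_ticks=3
    )
    passive = asyncio.create_task(acquire_active_role(lock, "pod-1", 0.001))
    await asyncio.sleep(0.005)  # passive is polling while pod-0 still holds
    await lock.release("pod-0")  # active exits and releases
    passive_won = await passive
    return active_won, passive_timed_out, passive_won, lock.holder
```

Running `asyncio.run(demo_failover())` walks through the same sequence the tests describe: activation, passive timeout, and the passive replica picking up once the active one releases.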
…ments The distributed-runtime modules landed with reStructuredText-style double backticks. The rest of the codebase uses single backticks in docstrings and comments, so fold them to match.
Multi-stage Dockerfile builds a single image used by every worker kind. The runtime stage copies the uv venv, source, and config into a non-root user; CMD defaults to the monolith engine and is overridden per worker in the Kubernetes manifests.

K8s manifests under ops/deploy/:
* namespace, ConfigMap (config/*.toml bundle), and Secret scaffolding for AUGUR_TIMESCALE_URL / REDIS_URL / NATS_CREDENTIALS_FILE / ANTHROPIC_API_KEY.
* Deployments for each stateless worker (poller per platform, feature, detector, manipulation, calibration, context-format), each exposing /metrics on port 9090.
* StatefulSets for the active-passive singletons (dedup, llm) so each replica gets a stable pod name forwarded to the distributed lock as holder_id.
* Services: augur-websocket public endpoint; headless services for the singleton StatefulSets.
* HorizontalPodAutoscalers on CPU + augur_bus_consume_lag_seconds.
* ServiceMonitor for the prometheus-operator scrape setup.
* kustomization.yaml wires everything and pins the image tag.
CRITICAL:
- bus/redis_streams.py + bus/nats.py: defer XACK/msg.ack() to the
start of the next iteration so consumers that break mid-async-for
leave the in-flight message pending for redelivery. Previously the
ack fired immediately after the yield returned, so under clean
shutdown the last yielded message was acked only if the consumer
iterated forward, and in run_bridge the ack fired *before* the
downstream publish completed.
- scripts/migrate_to_timescale.py: reject any Parquet column not in
the snapshots allowlist before building dynamic SQL, and report
cur.rowcount instead of len(records) so ON CONFLICT DO NOTHING
skips do not silently pass the row-count parity check.
- storage/timescaledb_store.py: compress_segmentby takes a column-
list identifier, not a quoted string. Split, validate each column
against the SQL-ident allowlist, and inline the unquoted list.
HIGH:
- bus/nats.py NATSKVLock: renew/release use kv.update/kv.delete with
the observed revision so a stale replica recovering from a network
blip cannot overwrite the current holder's value or delete a key
the new holder already claimed.
- storage/timescaledb_store.py: schema_version INSERT switches to
INSERT ... SELECT WHERE NOT EXISTS so applied_at stays a migration
audit record rather than a last-boot marker, matching DuckDB.
- _observability.py: OTel TracerProvider is reused across
configure_observability calls because the SDK refuses to replace
it, eliminating silent 'Overriding current TracerProvider' warnings
in test runs.
- ops/deploy/hpa.yaml: remove HPA autoscale on feature/detector
deployments; shard-sensitive workers cannot tolerate dynamic
replica counts without re-sharding, so operators resize them by
editing 'replicas' directly.
- ops/deploy/{pollers,stateless-workers}.yaml: every Deployment now
declares readinessProbe and livenessProbe on the /metrics port so
rolling updates and service-mesh routing respect worker readiness.
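The revision-guarded renew/release from the NATSKVLock item above can be illustrated against an in-memory key-value store. `FakeKV` only loosely mirrors a revisioned KV bucket (create, update with an expected revision, delete with an expected revision); its method names, the `expire` helper, and the `KVLock` wrapper are assumptions for illustration and are not the nats-py API or the PR's code.

```python
from typing import Dict, Optional, Tuple


class RevisionMismatch(Exception):
    """Raised when a write does not match the key's current revision."""


class FakeKV:
    """Hypothetical revisioned KV store with compare-and-set writes."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, int]] = {}  # key -> (value, revision)
        self._revision = 0

    def create(self, key: str, value: str) -> int:
        if key in self._data:
            raise RevisionMismatch(key)
        self._revision += 1
        self._data[key] = (value, self._revision)
        return self._revision

    def update(self, key: str, value: str, last: int) -> int:
        current = self._data.get(key)
        if current is None or current[1] != last:
            raise RevisionMismatch(key)
        self._revision += 1
        self._data[key] = (value, self._revision)
        return self._revision

    def delete(self, key: str, last: int) -> None:
        current = self._data.get(key)
        if current is None or current[1] != last:
            raise RevisionMismatch(key)
        del self._data[key]

    def expire(self, key: str) -> None:
        """Simulate the bucket dropping a key whose TTL elapsed."""
        self._data.pop(key, None)

    def get(self, key: str) -> Optional[str]:
        current = self._data.get(key)
        return None if current is None else current[0]


class KVLock:
    """Lock handle that remembers the revision it last wrote."""

    def __init__(self, kv: FakeKV, key: str, holder_id: str) -> None:
        self._kv, self._key, self._holder_id = kv, key, holder_id
        self._revision: Optional[int] = None

    def acquire(self) -> bool:
        try:
            self._revision = self._kv.create(self._key, self._holder_id)
        except RevisionMismatch:
            return False
        return True

    def renew(self) -> bool:
        if self._revision is None:
            return False
        try:
            self._revision = self._kv.update(
                self._key, self._holder_id, last=self._revision
            )
        except RevisionMismatch:
            self._revision = None  # someone else owns the key now
            return False
        return True

    def release(self) -> bool:
        if self._revision is None:
            return False
        try:
            self._kv.delete(self._key, last=self._revision)
        except RevisionMismatch:
            return False
        finally:
            self._revision = None
        return True
```

In this sketch a replica that lost its lease during a network blip fails both `renew()` and `release()`, because the revision it observed no longer matches, so the new holder's entry is left untouched.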
Summary
- .docs/phase-5-scaling.md §2 fire twice across separate measurement windows.
- `EventBus` protocol with NATS JetStream and Redis Streams adapters; `make_event_bus` routes by `BusConfig.backend.kind`. Distributed lock primitive (`InMemoryLock`, `NATSKVLock`, `RedisLock`) powers active-passive singleton failover.
- `TimescaleDBStore` mirroring `DuckDBStore`'s surface with hypertable creation + compression/retention policies from `storage.toml`.
- `CollectorRegistry` isolates the prometheus_client default registry.
- `WorkerHarness` supervisor with heartbeat + SIGINT/SIGTERM; `run_bridge` spine consumed by feature/detector/manipulation/calibration/context-format workers; poller entrypoint per platform; `SingletonRunner` for dedup + LLM formatter with lock-based active-passive coordination.
- `scripts/migrate_to_timescale.py` (backfill + verify) and `scripts/dual_write_sidecar.py` with `augur_dual_write_lag_seconds` gauge + alert counter.
- `docs/operations/distributed-runbook.md` (cutover/rollback/failover), and a new "Deployment Modes" section in `docs/architecture/system-design.md`.

Test plan
- fakeredis, stub NATS JetStream, recording AsyncConnection, InMemoryLock).
- `uv run pytest`: 336 passed locally.
- `uv run ruff check` + `uv run ruff format --check .` + `uv run mypy --strict` all clean.
- `@pytest.mark.integration`.
- `tests/signals/test_engine_integration.py` green.