diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d7b3ce..f1d2bbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,22 @@ All notable changes to Augur are recorded in this file. Format follows [Keep a C ## [Unreleased] +### Added — Deterministic Formatters + +- `src/augur_format/augur_format/deterministic/json_feed.py` — `to_canonical_json` emits UTF-8 JSON bytes with stable key ordering (top-level, signal block, related-market block), six-decimal float rounding (configurable), and Z-suffix UTC timestamps. Byte-identical across invocations. +- `src/augur_format/augur_format/deterministic/severity.py` — pure `derive_severity` mapping magnitude × confidence against per-liquidity-tier thresholds to `{high, medium, low}`. Formula lives in code so consumers can reproduce locally. +- `src/augur_format/augur_format/deterministic/markdown.py` — Jinja2 `MarkdownFormatter` rendering five per-signal-type templates that extend `_base.md.j2`. Templates ship inside the wheel via the hatch `include = ["augur_format/**/*.j2"]` rule. +- `src/augur_format/augur_format/validate/` — `ConsumerEnumValidator` rejects briefs whose `actionable_for` contains values outside `ConsumerType`; `load_schema` reads exported JSON schemas from `schemas/` for debug-build validation. +- `src/augur_format/augur_format/transport/webhook.py` — `WebhookFormatter` POSTs canonical JSON, wrapped Markdown, or Slack Block Kit payloads to configured destinations with exponential-backoff retry on 5xx/429 and drop on 4xx. Auth headers sourced from env vars at delivery time. +- `src/augur_format/augur_format/transport/websocket.py` — `WebSocketBroadcaster` with `SIGNAL`, `HEARTBEAT`, `STORM_START`, `STORM_END` frame types; oldest-drop under full per-connection queues for timeliness under pressure. +- `src/augur_format/augur_format/routing/` — `ConsumerRegistry.from_toml` loads `config/consumers.toml` and exposes per-category routing; `SignalRouter` maps `SignalContext` to the consumer set, surfacing suppressed consumers for `llm_assisted` interpretation mode. 
+- `src/augur_format/augur_format/llm/models.py` — `IntelligenceBrief` contract declared in this phase for completeness. The gated LLM formatter in the next phase instantiates the model; the JSON schema ships at `schemas/IntelligenceBrief-1.0.0.json`. +- `config/formatters.toml` mirrors `phase-3 §12.2` with JSON, Markdown, Webhook, and WebSocket blocks validated against `FormatterConfig`. + +### Operational Handoff — Deterministic Formatters + +After merge, operators can subscribe clients to the WebSocket broadcaster for live signal frames, wire webhook targets (Slack or generic JSON/Markdown) to push brief deliveries, and route signals to consumers via the `ConsumerRegistry` loaded from `config/consumers.toml`. The canonical JSON feed is ready for any consumer that validates against `schemas/SignalContext-1.0.0.json`. + ### Added — Labeling Pipeline - `src/augur_labels/` package with Pydantic data contracts for `NewsworthyEvent`, `EventCandidate`, `SourcePublication`, `QualifyingSource`, `LabelDecision`, `AnnotatorIdentity`, and `AgreementReport`. The closed `source_id` literal set (reuters, bloomberg, ap, ft) is load-bearing across adapters, storage, and workflow. diff --git a/config/formatters.toml b/config/formatters.toml new file mode 100644 index 0000000..e7b7a7a --- /dev/null +++ b/config/formatters.toml @@ -0,0 +1,26 @@ +# Deterministic formatter configuration. Schema mirrors phase-3 §12.2 +# verbatim; each block maps onto a Pydantic config sub-model in +# augur_format._config. A malformed file fails at startup via +# augur_signals._config.load_config. 
+ +[json] +float_decimals = 6 +timestamp_format = "iso_z" + +[markdown] +template_dir = "src/augur_format/augur_format/deterministic/templates" +trim_blocks = true +lstrip_blocks = true + +[webhook] +initial_retry_delay_seconds = 1 +max_retry_delay_seconds = 60 +max_retries = 5 +delivery_timeout_seconds = 10 + +[websocket] +bind = "0.0.0.0" +port = 8765 +heartbeat_interval_seconds = 30 +heartbeat_timeout_seconds = 90 +per_connection_buffer = 64 diff --git a/schemas/IntelligenceBrief-1.0.0.json b/schemas/IntelligenceBrief-1.0.0.json new file mode 100644 index 0000000..65637ea --- /dev/null +++ b/schemas/IntelligenceBrief-1.0.0.json @@ -0,0 +1,90 @@ +{ + "$defs": { + "ConsumerType": { + "description": "Registered consumers of the brief feed per docs/contracts/consumer-registry.md.", + "enum": [ + "macro_research_agent", + "geopolitical_research_agent", + "crypto_research_agent", + "financial_news_desk", + "regulatory_news_desk", + "dashboard" + ], + "title": "ConsumerType", + "type": "string" + } + }, + "additionalProperties": false, + "description": "Gated LLM formatter output contract.\n\n``actionable_for`` is constrained to the ConsumerType registry in\ndocs/contracts/consumer-registry.md via the Pydantic field type;\nthe closed-enum validator rechecks this at the formatter boundary\nso even dynamically-constructed instances fail loud on unknown\nvalues.", + "properties": { + "actionable_for": { + "items": { + "$ref": "#/$defs/ConsumerType" + }, + "title": "Actionable For", + "type": "array" + }, + "body_markdown": { + "title": "Body Markdown", + "type": "string" + }, + "brief_id": { + "title": "Brief Id", + "type": "string" + }, + "forbidden_token_check": { + "const": "passed", + "default": "passed", + "title": "Forbidden Token Check", + "type": "string" + }, + "headline": { + "title": "Headline", + "type": "string" + }, + "interpretation_mode": { + "const": "llm_assisted", + "default": "llm_assisted", + "title": "Interpretation Mode", + "type": "string" + }, + 
"model": { + "title": "Model", + "type": "string" + }, + "prompt_hash": { + "title": "Prompt Hash", + "type": "string" + }, + "schema_version": { + "const": "1.0.0", + "default": "1.0.0", + "title": "Schema Version", + "type": "string" + }, + "severity": { + "enum": [ + "high", + "medium", + "low" + ], + "title": "Severity", + "type": "string" + }, + "signal_id": { + "title": "Signal Id", + "type": "string" + } + }, + "required": [ + "brief_id", + "signal_id", + "headline", + "body_markdown", + "severity", + "model", + "prompt_hash" + ], + "title": "IntelligenceBrief", + "type": "object" +} diff --git a/scripts/export_schemas.py b/scripts/export_schemas.py index ba3f8aa..c6cb45d 100644 --- a/scripts/export_schemas.py +++ b/scripts/export_schemas.py @@ -25,6 +25,7 @@ from pydantic import BaseModel +from augur_format.llm.models import IntelligenceBrief from augur_signals.models import ( FeatureVector, MarketSignal, @@ -41,6 +42,7 @@ (FeatureVector, "1.0.0"), (MarketSignal, "1.0.0"), (SignalContext, "1.0.0"), + (IntelligenceBrief, "1.0.0"), ] diff --git a/src/augur_format/augur_format/_config.py b/src/augur_format/augur_format/_config.py new file mode 100644 index 0000000..9bfa314 --- /dev/null +++ b/src/augur_format/augur_format/_config.py @@ -0,0 +1,68 @@ +"""Configuration models for deterministic formatters. + +Mirrors config/formatters.toml block-for-block. Loaded at engine +startup via augur_signals._config.load_config; a missing required +value or malformed block fails loudly rather than coercing. 
+""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + + +class JsonConfig(BaseModel): + """Canonical JSON formatter parameters.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + float_decimals: int = Field(default=6, ge=0, le=18) + timestamp_format: Literal["iso_z"] = "iso_z" + + +class MarkdownConfig(BaseModel): + """Jinja2 rendering parameters.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + template_dir: str = "src/augur_format/augur_format/deterministic/templates" + trim_blocks: bool = True + lstrip_blocks: bool = True + + +class WebhookConfig(BaseModel): + """Webhook delivery retry and timeout settings.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + initial_retry_delay_seconds: float = Field(default=1.0, gt=0.0) + max_retry_delay_seconds: float = Field(default=60.0, gt=0.0) + max_retries: int = Field(default=5, gt=0) + delivery_timeout_seconds: float = Field(default=10.0, gt=0.0) + + +class WebSocketConfig(BaseModel): + """WebSocket transport bind, heartbeat, and per-connection buffer.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + bind: str = "0.0.0.0" # noqa: S104 — documented default bind for the WS server + port: int = Field(default=8765, gt=0, le=65_535) + heartbeat_interval_seconds: int = Field(default=30, gt=0) + heartbeat_timeout_seconds: int = Field(default=90, gt=0) + per_connection_buffer: int = Field(default=64, gt=0) + + +class FormatterConfig(BaseModel): + """Top-level formatter configuration loaded from config/formatters.toml.""" + + model_config = ConfigDict(frozen=True, extra="forbid", populate_by_name=True) + + # Field aliased so the TOML block is [json] per the documented + # schema, while the Python attribute is ``canonical_json`` to avoid + # shadowing BaseModel.json. 
# Emission order for the top-level SignalContext keys. Keys that are not
# listed here are omitted from the canonical payload.
CANONICAL_KEY_ORDER: tuple[str, ...] = (
    "signal",
    "market_question",
    "resolution_criteria",
    "resolution_source",
    "closes_at",
    "related_markets",
    "investigation_prompts",
    "interpretation_mode",
    "schema_version",
)

# Emission order for the nested ``signal`` block.
SIGNAL_KEY_ORDER: tuple[str, ...] = (
    "signal_id",
    "market_id",
    "platform",
    "signal_type",
    "magnitude",
    "direction",
    "confidence",
    "fdr_adjusted",
    "detected_at",
    "window_seconds",
    "liquidity_tier",
    "manipulation_flags",
    "related_market_ids",
    "raw_features",
    "schema_version",
)

# Emission order for each entry of ``related_markets``.
RELATED_KEY_ORDER: tuple[str, ...] = (
    "market_id",
    "question",
    "current_price",
    "delta_24h",
    "volume_24h",
    "relationship_type",
    "relationship_strength",
)


def to_canonical_json(context: SignalContext, *, float_decimals: int = 6) -> bytes:
    """Return the canonical JSON bytes for *context*.

    Args:
        context: The SignalContext to serialize. Only its
            ``model_dump(mode="json")`` output is consulted.
        float_decimals: Decimal places each float field is rounded to
            before serialization. Must be applied consistently across
            producers and consumers so equality comparison survives
            the round-trip.

    Returns:
        UTF-8 encoded JSON bytes with no whitespace between separators
        and stable key ordering. Keys absent from the ``*_KEY_ORDER``
        tuples are not emitted.

    Raises:
        KeyError: If the dumped context has no ``signal`` block.
        ValueError: If any float is NaN or infinite. Those values have
            no JSON representation, so emitting them would hand
            consumers bytes that strict parsers reject.
    """
    dumped = context.model_dump(mode="json")
    payload: dict[str, Any] = _ordered_dict(dumped, CANONICAL_KEY_ORDER, float_decimals)
    # Re-assigning an existing key keeps its position, so the nested
    # blocks are re-ordered in place without disturbing top-level order.
    payload["signal"] = _ordered_dict(dumped["signal"], SIGNAL_KEY_ORDER, float_decimals)
    payload["related_markets"] = [
        _ordered_dict(rm, RELATED_KEY_ORDER, float_decimals)
        for rm in dumped.get("related_markets", [])
    ]
    return json.dumps(
        payload,
        default=_json_default,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=False,
        # json.dumps would otherwise write the bare tokens NaN/Infinity,
        # which are not valid JSON.
        allow_nan=False,
    ).encode("utf-8")


def _ordered_dict(
    source: Mapping[str, Any],
    key_order: tuple[str, ...],
    float_decimals: int,
) -> dict[str, Any]:
    """Project *source* onto *key_order*, rounding floats along the way.

    Keys missing from *source* are skipped; keys of *source* that are not
    in *key_order* are dropped.
    """
    return {key: _round_floats(source[key], float_decimals) for key in key_order if key in source}


def _round_floats(value: Any, float_decimals: int) -> Any:
    """Recursively round floats inside *value* and sort nested dict keys."""
    if isinstance(value, float):
        rounded = round(value, float_decimals)
        # round() keeps the sign of tiny negatives, yielding -0.0, which
        # json.dumps writes as "-0.0". Collapse it to 0.0 so two values
        # that round to the same zero also serialize to the same bytes.
        return 0.0 if rounded == 0 else rounded
    if isinstance(value, list):
        return [_round_floats(v, float_decimals) for v in value]
    if isinstance(value, dict):
        # Sort nested dict keys so producers with variable insertion
        # order (e.g. raw_features populated conditionally by dedup
        # and cluster-merge paths) still emit byte-identical JSON for
        # the same logical payload.
        return {k: _round_floats(value[k], float_decimals) for k in sorted(value)}
    return value


def _json_default(obj: Any) -> Any:
    """Fallback encoder for objects json.dumps cannot serialize natively.

    Only datetimes are supported; anything else raises TypeError.
    """
    if isinstance(obj, datetime):
        iso = obj.isoformat()
        # isoformat() renders a UTC offset as "+00:00"; canonicalize to "Z".
        return iso.replace("+00:00", "Z")
    raise TypeError(f"cannot serialize {type(obj).__name__}")
+ """ + template_name = f"{context.signal.signal_type.value}.md.j2" + template = self._env.get_template(template_name) + return template.render( + signal=context.signal, + market_question=context.market_question, + resolution_criteria=context.resolution_criteria, + resolution_source=context.resolution_source, + closes_at=context.closes_at, + related_markets=context.related_markets, + investigation_prompts=context.investigation_prompts, + interpretation_mode=context.interpretation_mode.value, + schema_version=context.schema_version, + severity=severity, + ) diff --git a/src/augur_format/augur_format/deterministic/severity.py b/src/augur_format/augur_format/deterministic/severity.py new file mode 100644 index 0000000..b47364a --- /dev/null +++ b/src/augur_format/augur_format/deterministic/severity.py @@ -0,0 +1,50 @@ +"""Deterministic severity derivation. + +Severity is ``magnitude * confidence`` scored against per-tier +thresholds. The formula is pure code (not configuration) so every +consumer can reproduce the mapping locally without a network round +trip. Changing the thresholds requires a schema-version bump on the +IntelligenceBrief contract since downstream routing depends on stable +severity output. + +Threshold table +--------------- + +================ ====== ======= ====== +liquidity_tier high medium low +================ ====== ======= ====== +high > 0.6 > 0.3 ≤ 0.3 +mid > 0.7 ≤ 0.7 ≤ 0.7 +low — — always +================ ====== ======= ====== +""" + +from __future__ import annotations + +from typing import Literal + +from augur_signals.models import MarketSignal + +Severity = Literal["high", "medium", "low"] + + +def derive_severity(signal: MarketSignal) -> Severity: + """Return the deterministic severity label for *signal*. + + The score is ``magnitude * confidence`` (both in [0, 1]); the + threshold applied depends on the liquidity tier. 
Low-tier markets + always emit "low" severity — the sample size on low-tier reliability + curves is too thin to justify higher confidence in a human channel. + """ + score = signal.magnitude * signal.confidence + if signal.liquidity_tier == "high": + if score > 0.6: + return "high" + if score > 0.3: + return "medium" + return "low" + if signal.liquidity_tier == "mid": + if score > 0.7: + return "medium" + return "low" + return "low" diff --git a/src/augur_format/augur_format/deterministic/templates/_base.md.j2 b/src/augur_format/augur_format/deterministic/templates/_base.md.j2 new file mode 100644 index 0000000..d1013b8 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/_base.md.j2 @@ -0,0 +1,53 @@ +# {{ signal.signal_type | replace("_", " ") | title }} on {{ signal.market_id }} + +**Severity:** {{ severity }} +**Confidence:** {{ "%.2f" | format(signal.confidence) }} +**Liquidity tier:** {{ signal.liquidity_tier }} +**Detected:** {{ signal.detected_at.isoformat() }} + +{% block extra_summary %}{% endblock %} +## Signal Summary + +- Market: {{ market_question }} +- Resolves: {{ closes_at.isoformat() }} +- Resolution source: {{ resolution_source }} +- Movement magnitude: {{ "%.3f" | format(signal.magnitude) }} (direction: {{ signal.direction }}) +- Detection window: {{ (signal.window_seconds // 60) }} minutes + +## Resolution Criteria + +> {{ resolution_criteria }} + +## Related Markets + +{% if related_markets %} +{% for rm in related_markets %} +- **{{ rm.market_id }}** ({{ rm.relationship_type }}, strength {{ "%.2f" | format(rm.relationship_strength) }}): price {{ "%.3f" | format(rm.current_price) }} (24h delta {{ "%.3f" | format(rm.delta_24h) }}, volume {{ rm.volume_24h | int }}) +{% endfor %} +{% else %} +No related markets in the curated taxonomy. 
+{% endif %} + +## Investigation Prompts + +{% if investigation_prompts %} +{% for p in investigation_prompts %} +- {{ p }} +{% endfor %} +{% else %} +No investigation prompts configured for this (signal_type, category) tuple. +{% endif %} +{% if signal.manipulation_flags %} + +## Manipulation Flags + +The following manipulation signatures matched this signal. Consumer suppression policy applies; see `docs/methodology/manipulation-taxonomy.md`. + +{% for f in signal.manipulation_flags %} +- `{{ f }}` +{% endfor %} +{% endif %} + +--- + +*Augur - interpretation_mode: {{ interpretation_mode }} - schema {{ schema_version }} - signal_id {{ signal.signal_id }}* diff --git a/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 b/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 new file mode 100644 index 0000000..a84345c --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 @@ -0,0 +1,10 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Bid/ask depth ratio: {{ "%.3f" | format(signal.raw_features.get("bid_ask_ratio", 0.5)) }} +- Total liquidity (USD): {{ signal.raw_features.get("liquidity", 0) | int }} +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 b/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 new file mode 100644 index 0000000..d120a77 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 @@ -0,0 +1,12 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Spearman rho (current window): {{ "%.3f" | format(signal.raw_features.get("spearman_rho", 0.0)) }} +- Historical Fisher-z: {{ "%.3f" | format(signal.raw_features.get("historical_z", 0.0)) }} +- BH-FDR p-value: {{ "%.4f" 
| format(signal.raw_features.get("p_value", 1.0)) }} +- Related market: `{{ signal.raw_features.get("related_market_id", "") }}` +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/deterministic/templates/price_velocity.md.j2 b/src/augur_format/augur_format/deterministic/templates/price_velocity.md.j2 new file mode 100644 index 0000000..77c2928 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/price_velocity.md.j2 @@ -0,0 +1,9 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Posterior P(changepoint within last 5 obs): {{ "%.3f" | format(signal.raw_features.posterior_p_change|default(signal.raw_features.get("posterior_p_change", 0.0))) }} +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/deterministic/templates/regime_shift.md.j2 b/src/augur_format/augur_format/deterministic/templates/regime_shift.md.j2 new file mode 100644 index 0000000..81b28be --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/regime_shift.md.j2 @@ -0,0 +1,11 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Positive CUSUM: {{ "%.3f" | format(signal.raw_features.get("positive_cusum", 0.0)) }} +- Negative CUSUM: {{ "%.3f" | format(signal.raw_features.get("negative_cusum", 0.0)) }} +- Threshold: {{ "%.3f" | format(signal.raw_features.get("threshold", 0.0)) }} +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/deterministic/templates/volume_spike.md.j2 b/src/augur_format/augur_format/deterministic/templates/volume_spike.md.j2 new file mode 100644 index 0000000..fac7d48 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/volume_spike.md.j2 @@ -0,0 +1,11 @@ +{% extends "_base.md.j2" %} + +{% 
block extra_summary %} +## Detector-Specific Detail + +- Volume z-score: {{ "%.3f" | format(signal.raw_features.get("z_score", 0.0)) }} +- EWMA baseline: {{ "%.3f" | format(signal.raw_features.get("ewma_mean", 1.0)) }} +- Volume ratio (1h): {{ "%.3f" | format(signal.raw_features.get("volume_ratio_1h", 1.0)) }} +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/llm/models.py b/src/augur_format/augur_format/llm/models.py new file mode 100644 index 0000000..1d7d276 --- /dev/null +++ b/src/augur_format/augur_format/llm/models.py @@ -0,0 +1,41 @@ +"""IntelligenceBrief — the contract emitted by the gated LLM formatter. + +The schema lives in the formatter package because it is the +formatter's output contract, even though the deterministic pathway +in this phase does not produce briefs. The secondary LLM formatter +in the next phase instantiates IntelligenceBrief values that pass +the forbidden-token linter and the ConsumerType enum gate. +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + +from augur_signals.models import ConsumerType + + +class IntelligenceBrief(BaseModel): + """Gated LLM formatter output contract. + + ``actionable_for`` is constrained to the ConsumerType registry in + docs/contracts/consumer-registry.md via the Pydantic field type; + the closed-enum validator rechecks this at the formatter boundary + so even dynamically-constructed instances fail loud on unknown + values. 
+ """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + brief_id: str + signal_id: str + headline: str + body_markdown: str + severity: Literal["high", "medium", "low"] + actionable_for: list[ConsumerType] = Field(default_factory=list) + interpretation_mode: Literal["llm_assisted"] = "llm_assisted" + model: str + prompt_hash: str + forbidden_token_check: Literal["passed"] = "passed" # noqa: S105 + schema_version: Literal["1.0.0"] = "1.0.0" diff --git a/src/augur_format/augur_format/routing/__init__.py b/src/augur_format/augur_format/routing/__init__.py new file mode 100644 index 0000000..08920d7 --- /dev/null +++ b/src/augur_format/augur_format/routing/__init__.py @@ -0,0 +1,3 @@ +"""Consumer registry and signal router.""" + +from __future__ import annotations diff --git a/src/augur_format/augur_format/routing/consumer_registry.py b/src/augur_format/augur_format/routing/consumer_registry.py new file mode 100644 index 0000000..0511e71 --- /dev/null +++ b/src/augur_format/augur_format/routing/consumer_registry.py @@ -0,0 +1,66 @@ +"""Consumer registry loader. + +Reads ``config/consumers.toml`` (seeded in the workspace bootstrap) +and exposes the per-category consumer routing plus per-consumer +transport configuration. The router consumes the registry to decide +which consumers should receive a given signal. +""" + +from __future__ import annotations + +import tomllib +from dataclasses import dataclass, field +from pathlib import Path + +from augur_signals.models import ConsumerType + + +@dataclass(frozen=True, slots=True) +class CategoryRouting: + """Default consumers for a market category.""" + + category: str + consumers: tuple[ConsumerType, ...] 
class ConsumerRegistry:
    """Immutable-by-convention lookup from market category to consumers.

    Instances are normally built with :meth:`from_toml`; the constructor
    accepts an already-parsed mapping and stores a shallow copy of it.
    """

    def __init__(self, routing: dict[str, tuple[ConsumerType, ...]]) -> None:
        # Shallow copy: later changes to the caller's dict do not leak in.
        self._routing = dict(routing)

    def consumers_for_category(self, category: str) -> tuple[ConsumerType, ...]:
        """Return the default consumers for *category*.

        A category without its own entry resolves to the ``default``
        entry, and to the dashboard alone when no ``default`` entry
        exists either.
        """
        fallback = (ConsumerType.DASHBOARD,)
        try:
            return self._routing[category]
        except KeyError:
            return self._routing.get("default", fallback)

    def known_categories(self) -> frozenset[str]:
        """Return every category name held by the registry."""
        return frozenset(self._routing)

    @classmethod
    def from_toml(cls, path: Path) -> ConsumerRegistry:
        """Build a registry from the ``[categories]`` table of a TOML file.

        Each ``consumers`` entry is converted with ``ConsumerType(...)``.
        A ``default`` category routing to the dashboard is added when the
        file does not define one.
        """
        with path.open("rb") as stream:
            document = tomllib.load(stream)
        table: dict[str, tuple[ConsumerType, ...]] = {
            name: tuple(ConsumerType(item) for item in spec.get("consumers", []))
            for name, spec in document.get("categories", {}).items()
        }
        if "default" not in table:
            table["default"] = (ConsumerType.DASHBOARD,)
        return cls(table)
class SignalRouter:
    """Route SignalContext into consumer sets.

    The router looks up the market's category, asks the registry for
    that category's consumers, and for ``llm_assisted`` contexts splits
    them into those that opted in and those that are suppressed.
    """

    def __init__(
        self,
        registry: ConsumerRegistry,
        market_categories: dict[str, str] | None = None,
        llm_assisted_consumers: frozenset[ConsumerType] | None = None,
    ) -> None:
        """Create a router.

        Args:
            registry: Source of the category-to-consumers mapping.
            market_categories: Optional initial ``market_id -> category``
                mapping. It is copied, so the caller's dict is never
                mutated by :meth:`register_market_category`.
            llm_assisted_consumers: Consumers that opt in to
                ``llm_assisted`` briefs. ``None`` selects the default
                (dashboard only). An explicit empty set is honoured and
                means that no consumer accepts such briefs.
        """
        self._registry = registry
        self._market_categories = dict(market_categories or {})
        # Consumers that opt in to llm_assisted briefs. Dashboard is
        # the documented default from consumer-registry.md §Why Each
        # Consumer Exists. Test against None rather than truthiness:
        # an empty frozenset is falsy but is a valid "nobody opted in"
        # configuration that must not be replaced by the default.
        if llm_assisted_consumers is None:
            self._llm_assisted = frozenset({ConsumerType.DASHBOARD})
        else:
            self._llm_assisted = frozenset(llm_assisted_consumers)

    def register_market_category(self, market_id: str, category: str) -> None:
        """Record (or overwrite) the category used to route *market_id*."""
        self._market_categories[market_id] = category

    def route(self, context: SignalContext) -> RoutingDecision:
        """Return the consumer set for *context*.

        Markets with no registered category are routed as ``default``.
        Consumers whose subscription excludes the context's
        interpretation_mode are reported under ``suppressed`` so
        operational metrics can count the drops.
        """
        category = self._market_categories.get(context.signal.market_id, "default")
        candidates = self._registry.consumers_for_category(category)
        if context.interpretation_mode == InterpretationMode.LLM_ASSISTED:
            allowed = tuple(c for c in candidates if c in self._llm_assisted)
            suppressed = tuple(c for c in candidates if c not in self._llm_assisted)
            return RoutingDecision(consumers=allowed, suppressed=suppressed)
        return RoutingDecision(consumers=candidates)
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class DeliveryBackoff: + """Backoff schedule for webhook delivery.""" + + initial_seconds: float = 1.0 + max_seconds: float = 60.0 + max_retries: int = 5 + + +class DeliveryRetryExhaustedError(RuntimeError): + """Raised when every delivery attempt fails.""" + + def __init__(self, attempts: int, last_error: BaseException) -> None: + super().__init__(f"webhook retry exhausted after {attempts} attempts: {last_error!r}") + self.attempts = attempts + self.last_error = last_error + + +async def deliver_with_backoff[T]( + factory: Callable[[], Awaitable[T]], + policy: DeliveryBackoff, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, +) -> tuple[T, int]: + """Invoke *factory* with exponential backoff. + + Returns ``(result, attempts)`` where ``attempts`` is the 1-based + count of attempts up to and including the successful call so the + caller can surface the actual attempt count in operational + telemetry rather than hardcoding policy.max_retries. 
+ """ + delay = policy.initial_seconds + last_error: BaseException | None = None + for attempt in range(1, policy.max_retries + 1): + try: + result = await factory() + except Exception as err: + last_error = err + if attempt == policy.max_retries: + break + await sleep(delay) + delay = min(delay * 2.0, policy.max_seconds) + else: + return result, attempt + if last_error is None: # pragma: no cover + raise RuntimeError("delivery retry loop exited without capturing an error") + raise DeliveryRetryExhaustedError(attempts=policy.max_retries, last_error=last_error) diff --git a/src/augur_format/augur_format/transport/webhook.py b/src/augur_format/augur_format/transport/webhook.py new file mode 100644 index 0000000..f722c48 --- /dev/null +++ b/src/augur_format/augur_format/transport/webhook.py @@ -0,0 +1,194 @@ +"""Webhook adapter with JSON, Markdown, and Slack Block Kit formats. + +Each WebhookTarget declares the URL, format, authorized consumer +types, and optional auth-header env var. The adapter POSTs the +formatted payload to the URL with exponential-backoff retry on 5xx / +429 / connection errors and drop on 4xx (logged as a configuration +error). Failed deliveries emit an error log with target_id and +signal_id for operational correlation. 
class WebhookTarget(BaseModel):
    """One configured webhook destination.

    Consumer-type gating and LLM-assisted opt-in live on the
    SignalRouter and the LLM formatter gate respectively; neither
    belongs on the delivery target, where there is no call site.
    Phase-4 re-introduces ``accepts_llm_assisted`` when the gated
    formatter needs per-target opt-in.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # Identifier echoed back in DeliveryResult for correlation.
    target_id: str
    url: HttpUrl
    # Selects the body renderer: canonical JSON, wrapped Markdown, or Slack blocks.
    format: WebhookFormat
    # Name of the environment variable holding the Authorization header value.
    auth_header_env: str | None = None


@dataclass(frozen=True, slots=True)
class DeliveryResult:
    """Outcome of one webhook POST."""

    target_id: str
    # None when every attempt failed and no final response was kept.
    status_code: int | None
    attempts: int
    delivered: bool
    # "ok", "http_<status>", or the repr of the last error.
    reason: str


class WebhookFormatter:
    """POST payloads to configured webhook targets with retry."""

    def __init__(
        self,
        client: httpx.AsyncClient,
        config: WebhookConfig | None = None,
        markdown: MarkdownFormatter | None = None,
    ) -> None:
        self._client = client
        self._config = config or WebhookConfig()
        self._markdown = markdown or MarkdownFormatter()

    def _backoff(self) -> DeliveryBackoff:
        """Translate the webhook config into a retry policy."""
        return DeliveryBackoff(
            initial_seconds=self._config.initial_retry_delay_seconds,
            max_seconds=self._config.max_retry_delay_seconds,
            max_retries=self._config.max_retries,
        )

    @staticmethod
    def _escape_mrkdwn(text: str) -> str:
        """Escape the three characters Slack treats as mrkdwn control characters.

        Unescaped ``<``/``>`` in externally sourced text would let a
        market question inject links or ``<!channel>`` style mentions.
        ``&`` is replaced first so the entities produced for ``<`` and
        ``>`` are not escaped a second time.
        """
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

    def _render_body(self, context: SignalContext, target: WebhookTarget) -> bytes:
        """Render the request body for *target*'s configured format."""
        if target.format == "json":
            return to_canonical_json(context)
        severity = derive_severity(context.signal)
        if target.format == "markdown":
            rendered = self._markdown.format(context, severity=severity)
            return json.dumps({"text": rendered}).encode("utf-8")
        # slack_blocks
        return json.dumps(self._slack_blocks(context, severity)).encode("utf-8")

    def _slack_blocks(self, context: SignalContext, severity: str) -> dict[str, Any]:
        """Build the Slack Block Kit payload for one signal.

        Free-text fields are escaped for mrkdwn; the header block is
        ``plain_text`` and needs no escaping.
        """
        signal = context.signal
        blocks: list[dict[str, Any]] = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": (f"{signal.signal_type.value} | {severity} | {signal.confidence:.2f}"),
                },
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"*Market:* {self._escape_mrkdwn(str(context.market_question))}\n"
                        "*Resolution criteria:* "
                        f"{self._escape_mrkdwn(str(context.resolution_criteria))}"
                    ),
                },
            },
        ]
        if context.related_markets:
            related_text = "\n".join(
                # The template itself holds no control characters, so
                # escaping the rendered line escapes only the field values.
                self._escape_mrkdwn(
                    f"- *{rm.market_id}* ({rm.relationship_type}): {rm.current_price:.3f}"
                )
                for rm in context.related_markets
            )
            blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": related_text}})
        if context.investigation_prompts:
            prompts_text = "\n".join(
                f"- {self._escape_mrkdwn(str(p))}" for p in context.investigation_prompts
            )
            blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": prompts_text}})
        if signal.manipulation_flags:
            flags_text = ", ".join(f.value for f in signal.manipulation_flags)
            blocks.append(
                {
                    "type": "context",
                    "elements": [
                        {
                            "type": "mrkdwn",
                            "text": f"Manipulation flags: {flags_text}",
                        }
                    ],
                }
            )
        blocks.append(
            {
                "type": "context",
                "elements": [
                    {
                        "type": "mrkdwn",
                        "text": (
                            f"Augur · {context.interpretation_mode.value} "
                            f"· schema {context.schema_version} "
                            f"· {signal.signal_id}"
                        ),
                    }
                ],
            }
        )
        return {"blocks": blocks}

    def _headers(self, target: WebhookTarget) -> dict[str, str]:
        """Build request headers, reading the auth value from the environment.

        When ``auth_header_env`` is configured but the variable is unset
        or empty, the request is sent without an Authorization header.
        """
        headers = {"Content-Type": "application/json"}
        if target.auth_header_env:
            value = os.environ.get(target.auth_header_env)
            if value:
                headers["Authorization"] = value
        return headers

    async def deliver(self, context: SignalContext, target: WebhookTarget) -> DeliveryResult:
        """POST *context* to *target* and report the outcome.

        5xx and 429 responses are raised inside the retried call so the
        backoff helper retries them, as it does any other exception.
        Every other response is returned after a single attempt. Only a
        2xx status counts as delivered: a 3xx means the POST was
        redirected and not processed by the endpoint that answered.
        """
        body = self._render_body(context, target)
        headers = self._headers(target)

        async def _call() -> httpx.Response:
            response = await self._client.post(
                str(target.url),
                content=body,
                headers=headers,
                timeout=self._config.delivery_timeout_seconds,
            )
            if response.status_code >= 500 or response.status_code == 429:
                response.raise_for_status()
            return response

        try:
            response, attempts = await deliver_with_backoff(_call, self._backoff())
        except DeliveryRetryExhaustedError as err:
            return DeliveryResult(
                target_id=target.target_id,
                status_code=None,
                attempts=err.attempts,
                delivered=False,
                reason=repr(err.last_error),
            )
        delivered = 200 <= response.status_code < 300
        return DeliveryResult(
            target_id=target.target_id,
            status_code=response.status_code,
            attempts=attempts,
            delivered=delivered,
            reason="ok" if delivered else f"http_{response.status_code}",
        )
status_code=response.status_code, + attempts=attempts, + delivered=delivered, + reason="ok" if delivered else f"http_{response.status_code}", + ) diff --git a/src/augur_format/augur_format/transport/websocket.py b/src/augur_format/augur_format/transport/websocket.py new file mode 100644 index 0000000..3fd69f1 --- /dev/null +++ b/src/augur_format/augur_format/transport/websocket.py @@ -0,0 +1,169 @@ +"""WebSocket transport with structured frames. + +Frame types mirror phase-3 §7: SIGNAL payloads carry the canonical +SignalContext JSON; HEARTBEAT frames arrive at the configured +interval; STORM_START and STORM_END signal the dedup layer's storm +transitions. Broadcast is fan-out across connected clients with a +per-connection bounded queue; slow clients are dropped rather than +stalling the broadcast loop. +""" + +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncIterator, Callable +from dataclasses import dataclass, field +from datetime import datetime +from enum import StrEnum +from typing import Any +from uuid import uuid4 + +from augur_format.deterministic.json_feed import to_canonical_json +from augur_signals.models import SignalContext + + +class FrameType(StrEnum): + """Closed frame-type enum for the WebSocket protocol.""" + + SIGNAL = "signal" + HEARTBEAT = "heartbeat" + STORM_START = "storm_start" + STORM_END = "storm_end" + + +@dataclass(frozen=True, slots=True) +class WebSocketFrame: + """One message on the wire.""" + + frame_type: FrameType + frame_id: str + ts: datetime + payload: dict[str, Any] | None = None + + def to_json(self) -> bytes: + body: dict[str, Any] = { + "frame_type": self.frame_type.value, + "frame_id": self.frame_id, + "ts": self.ts.isoformat().replace("+00:00", "Z"), + } + if self.payload is not None: + body["payload"] = self.payload + return json.dumps(body, separators=(",", ":")).encode("utf-8") + + +def signal_frame(context: SignalContext, now: datetime) -> WebSocketFrame: + """Build 
a SIGNAL frame whose payload is the canonical SignalContext JSON.""" + return WebSocketFrame( + frame_type=FrameType.SIGNAL, + frame_id=str(uuid4()), + ts=now, + payload=json.loads(to_canonical_json(context).decode("utf-8")), + ) + + +def heartbeat_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.HEARTBEAT, frame_id=str(uuid4()), ts=now) + + +def storm_start_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.STORM_START, frame_id=str(uuid4()), ts=now) + + +def storm_end_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.STORM_END, frame_id=str(uuid4()), ts=now) + + +@dataclass(slots=True) +class ClientSubscription: + """One connected client's send queue and filter.""" + + queue: asyncio.Queue[WebSocketFrame] + consumer_type: str | None = None + dropped: int = 0 + # Per-subscription lock so concurrent publishers serialise the + # full-queue check-and-drop instead of each draining one slot. + lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + +class WebSocketBroadcaster: + """In-process broadcaster; adapts to a real websockets server easily. + + The broadcaster manages per-client queues. A ``publish`` call + enqueues the frame for every subscriber whose consumer_type + matches (or whose subscription is unfiltered). Queues are bounded + by ``per_connection_buffer``; enqueue on a full queue drops the + oldest frame to preserve timeliness, matching the dedup/storm + doc's rationale for LIFO under pressure. 
+ """ + + def __init__(self, per_connection_buffer: int = 64) -> None: + if per_connection_buffer <= 0: + raise ValueError("per_connection_buffer must be positive") + self._buffer = per_connection_buffer + self._subscriptions: list[ClientSubscription] = [] + + def subscribe(self, consumer_type: str | None = None) -> ClientSubscription: + sub = ClientSubscription( + queue=asyncio.Queue(maxsize=self._buffer), + consumer_type=consumer_type, + ) + self._subscriptions.append(sub) + return sub + + def unsubscribe(self, subscription: ClientSubscription) -> None: + if subscription in self._subscriptions: + self._subscriptions.remove(subscription) + + def subscriber_count(self) -> int: + return len(self._subscriptions) + + async def publish( + self, + frame: WebSocketFrame, + *, + consumer_type_filter: Callable[[str | None], bool] | None = None, + ) -> None: + for sub in list(self._subscriptions): + if consumer_type_filter is not None and not consumer_type_filter(sub.consumer_type): + continue + async with sub.lock: + # Serialise check-and-drop so concurrent publishers do + # not each drain a slot from the same full queue. + if sub.queue.full(): + try: + sub.queue.get_nowait() + except asyncio.QueueEmpty: + pass + sub.dropped += 1 + await sub.queue.put(frame) + + async def stream(self, subscription: ClientSubscription) -> AsyncIterator[WebSocketFrame]: + """Yield frames queued for *subscription* until cancelled.""" + try: + while True: + yield await subscription.queue.get() + finally: + self.unsubscribe(subscription) + + +@dataclass(slots=True) +class HeartbeatScheduler: + """Answers "should a heartbeat emit now?" against caller-supplied time. + + The scheduler is mutable by design — ``record`` tracks the last + emission so ``should_emit`` can gate the next one. Engine code + owns the outer loop and passes ``now`` explicitly so the scheduler + stays backtest-deterministic. 
+ """ + + interval_seconds: int = 30 + _last_sent: datetime | None = field(default=None) + + def should_emit(self, now: datetime) -> bool: + if self._last_sent is None: + return True + return (now - self._last_sent).total_seconds() >= self.interval_seconds + + def record(self, now: datetime) -> None: + self._last_sent = now diff --git a/src/augur_format/augur_format/validate/__init__.py b/src/augur_format/augur_format/validate/__init__.py new file mode 100644 index 0000000..8a82381 --- /dev/null +++ b/src/augur_format/augur_format/validate/__init__.py @@ -0,0 +1,3 @@ +"""Formatter-boundary validators (closed enum, schema).""" + +from __future__ import annotations diff --git a/src/augur_format/augur_format/validate/enum_check.py b/src/augur_format/augur_format/validate/enum_check.py new file mode 100644 index 0000000..a6dc0aa --- /dev/null +++ b/src/augur_format/augur_format/validate/enum_check.py @@ -0,0 +1,79 @@ +"""Closed-enum validators for the formatter boundary. + +Briefs emitted by any formatter (deterministic today, LLM in the +gated secondary layer) carry an ``actionable_for`` list that must +contain only values from the ConsumerType registry in +docs/contracts/consumer-registry.md. Validation runs at the formatter +boundary; briefs with unknown values are dropped loudly, never +coerced. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from dataclasses import dataclass, field + +from augur_signals.models import ConsumerType + + +@dataclass(frozen=True, slots=True) +class ValidationResult: + """Outcome of a closed-enum validation call.""" + + valid: bool + offending_values: list[str] = field(default_factory=list) + + +def validate_consumer_types(values: Sequence[str]) -> list[str]: + """Return the subset of *values* that are not registered ConsumerType members. + + An empty list means every input is a valid consumer. 
The order of + the offending values matches the caller's input order so error + messages can point at the original list positions. + """ + valid = {c.value for c in ConsumerType} + return [v for v in values if v not in valid] + + +class ConsumerEnumValidator: + """Validator callable used at the formatter boundary. + + The ``strict`` parameter is retained for the secondary LLM + formatter, which may want to downgrade to a warning-and-drop + during backfill; production deterministic output always runs in + strict mode. + """ + + def __init__(self, *, strict: bool = True) -> None: + self._strict = strict + + @property + def strict(self) -> bool: + return self._strict + + def validate_actionable_for(self, values: Sequence[str]) -> ValidationResult: + """Check an ``actionable_for`` list against the ConsumerType registry.""" + offending = validate_consumer_types(values) + return ValidationResult(valid=not offending, offending_values=offending) + + def validate_brief(self, brief: dict[str, object]) -> ValidationResult: + """Validate a brief payload's actionable_for field. + + The input shape mirrors IntelligenceBrief's model_dump output; + a missing actionable_for is treated as empty, not invalid. The + method is primarily used by the LLM formatter gate once that + layer lands; wiring it here keeps the closed-enum boundary in + a single module. 
class SchemaNotFoundError(RuntimeError):
    """Raised when the requested schema is absent from schemas/."""


def load_schema(
    model_name: str,
    version: str,
    root: Path | None = None,
) -> dict[str, object]:
    """Load ``schemas/{model_name}-{version}.json``.

    Args:
        model_name: Name of the exported model, e.g. ``"MarketSignal"``.
        version: Schema version string, e.g. ``"1.0.0"``.
        root: Directory to read from; defaults to ``DEFAULT_SCHEMAS_DIR``.

    Returns:
        The parsed schema document.

    Raises:
        SchemaNotFoundError: when the file does not exist. Missing
            schemas raise rather than returning a permissive empty
            dict; a missing schema indicates the export step did not
            run or the wrong version was requested, both of which would
            mask contract drift at the formatter boundary.
        ValueError: when the file is not valid JSON or its top level is
            not a JSON object.
    """
    schemas_dir = DEFAULT_SCHEMAS_DIR if root is None else root
    target = schemas_dir / f"{model_name}-{version}.json"
    # Open directly instead of checking exists() first, so a file removed
    # between the check and the read still maps to SchemaNotFoundError.
    try:
        with target.open(encoding="utf-8") as handle:
            data = json.load(handle)
    except FileNotFoundError as err:
        raise SchemaNotFoundError(f"schema not found: {target}") from err
    if not isinstance(data, dict):
        # Fail at the boundary rather than letting callers index a list or scalar.
        raise ValueError(f"schema is not a JSON object: {target}")
    return data
result.offending_values + + +@pytest.mark.unit +def test_validator_accepts_all_known_consumers() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief( + { + "actionable_for": [ + "macro_research_agent", + "geopolitical_research_agent", + "dashboard", + ] + } + ) + assert result.valid + assert result.offending_values == [] + + +@pytest.mark.unit +def test_validator_rejects_non_string_members() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief({"actionable_for": ["dashboard", 42]}) + assert not result.valid + + +@pytest.mark.unit +def test_validator_rejects_actionable_for_not_a_list() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief({"actionable_for": "dashboard"}) + assert not result.valid + + +@pytest.mark.unit +def test_validator_missing_field_treated_as_empty_list() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief({}) + assert result.valid + + +@pytest.mark.unit +def test_load_schema_raises_on_missing(tmp_path: Path) -> None: + with pytest.raises(SchemaNotFoundError): + load_schema("DoesNotExist", "1.0.0", root=tmp_path) + + +@pytest.mark.unit +def test_load_schema_reads_known_schema() -> None: + schema = load_schema("MarketSignal", "1.0.0") + assert schema["title"] == "MarketSignal" diff --git a/tests/format/test_json_canonical.py b/tests/format/test_json_canonical.py new file mode 100644 index 0000000..5dd1e4d --- /dev/null +++ b/tests/format/test_json_canonical.py @@ -0,0 +1,133 @@ +"""Tests for the canonical JSON formatter.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime + +import pytest + +from augur_format.deterministic.json_feed import ( + CANONICAL_KEY_ORDER, + SIGNAL_KEY_ORDER, + to_canonical_json, +) +from augur_signals.models import ( + InterpretationMode, + ManipulationFlag, + MarketSignal, + RelatedMarketState, + SignalContext, + SignalType, + new_signal_id, +) + + +def 
_signal() -> MarketSignal: + return MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8765432, + direction=1, + confidence=0.7219876, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + manipulation_flags=[ManipulationFlag.SIZE_VS_DEPTH_OUTLIER], + related_market_ids=["kalshi_fed_holds"], + raw_features={ + "posterior_p_change": 0.9123456789, + "calibration_provenance": "price_velocity_bocpd_beta_v1@identity_v0", + }, + ) + + +def _context() -> SignalContext: + return SignalContext( + signal=_signal(), + market_question="Will the Fed raise rates in June 2026?", + resolution_criteria="YES resolves if target range rises.", + resolution_source="Federal Reserve press release", + closes_at=datetime(2026, 6, 15, 18, 0, tzinfo=UTC), + related_markets=[ + RelatedMarketState( + market_id="kalshi_fed_holds", + question="Will the Fed hold rates in June 2026?", + current_price=0.44123, + delta_24h=-0.0235432, + volume_24h=85_000.0, + relationship_type="inverse", + relationship_strength=0.9, + ) + ], + investigation_prompts=["Check FOMC calendar.", "Check governor speeches."], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +def test_byte_identical_across_1000_calls() -> None: + ctx = _context() + outputs = [to_canonical_json(ctx) for _ in range(1000)] + assert all(o == outputs[0] for o in outputs) + + +@pytest.mark.unit +def test_floats_rounded_to_six_decimals() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + # 0.9123456789 must round to 0.912346 at 6 decimals. 
+ provenance_payload = payload["signal"]["raw_features"] + assert provenance_payload["posterior_p_change"] == 0.912346 + + +@pytest.mark.unit +def test_custom_decimals_parameter_rounds_accordingly() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx, float_decimals=2)) + # 0.7219876 -> 0.72 at two decimals. + assert payload["signal"]["confidence"] == 0.72 + + +@pytest.mark.unit +def test_timestamps_use_z_suffix() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert payload["signal"]["detected_at"].endswith("Z") + assert "+00:00" not in payload["signal"]["detected_at"] + assert payload["closes_at"].endswith("Z") + + +@pytest.mark.unit +def test_top_level_key_order_matches_canonical_tuple() -> None: + ctx = _context() + # json.loads preserves insertion order of keys; the outer dict's + # key sequence is the canonical key ordering. + payload = json.loads(to_canonical_json(ctx)) + assert list(payload.keys()) == list(CANONICAL_KEY_ORDER) + + +@pytest.mark.unit +def test_signal_key_order_matches_signal_tuple() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert list(payload["signal"].keys()) == list(SIGNAL_KEY_ORDER) + + +@pytest.mark.unit +def test_related_market_fields_rounded() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + rm = payload["related_markets"][0] + # 0.44123 stays; -0.0235432 rounds to -0.023543 + assert rm["delta_24h"] == -0.023543 + + +@pytest.mark.unit +def test_manipulation_flags_preserved_as_enum_values() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert payload["signal"]["manipulation_flags"] == ["size_vs_depth_outlier"] diff --git a/tests/format/test_markdown_templates.py b/tests/format/test_markdown_templates.py new file mode 100644 index 0000000..f66436c --- /dev/null +++ b/tests/format/test_markdown_templates.py @@ -0,0 +1,154 @@ +"""Tests for the Markdown formatter.""" + +from __future__ import 
annotations + +from datetime import UTC, datetime + +import pytest + +from augur_format.deterministic.markdown import MarkdownFormatter +from augur_signals.models import ( + InterpretationMode, + ManipulationFlag, + MarketSignal, + RelatedMarketState, + SignalContext, + SignalType, + new_signal_id, +) + +_UNSET_PROMPTS: list[str] = ["Check FOMC calendar."] + + +def _context( + signal_type: SignalType = SignalType.PRICE_VELOCITY, + raw_features: dict[str, float | str] | None = None, + manipulation_flags: list[ManipulationFlag] | None = None, + related: list[RelatedMarketState] | None = None, + prompts: list[str] | None = None, +) -> SignalContext: + rf: dict[str, float | str] = { + "calibration_provenance": f"{signal_type.value}_detector@identity_v0", + "posterior_p_change": 0.82, + "z_score": 2.3, + "spearman_rho": -0.45, + "positive_cusum": 3.1, + "negative_cusum": -0.2, + "threshold": 2.5, + "bid_ask_ratio": 0.75, + "liquidity": 12000.0, + "ewma_mean": 1.2, + "volume_ratio_1h": 3.5, + "historical_z": 1.8, + "p_value": 0.01, + "related_market_id": "kalshi_fed_holds", + } + if raw_features: + rf.update(raw_features) + signal = MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=signal_type, + magnitude=0.8, + direction=1, + confidence=0.72, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + manipulation_flags=manipulation_flags or [], + raw_features=rf, + ) + return SignalContext( + signal=signal, + market_question="Will the Fed raise rates?", + resolution_criteria="YES if rate rises.", + resolution_source="Federal Reserve press release", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=related or [], + investigation_prompts=_UNSET_PROMPTS if prompts is None else prompts, + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.fixture +def formatter() -> MarkdownFormatter: + return MarkdownFormatter() + + 
+@pytest.mark.unit +def test_every_signal_type_renders(formatter: MarkdownFormatter) -> None: + for signal_type in SignalType: + md = formatter.format(_context(signal_type=signal_type), severity="medium") + assert md.startswith("# ") + assert "Signal Summary" in md + + +@pytest.mark.unit +def test_renders_required_fields(formatter: MarkdownFormatter) -> None: + md = formatter.format(_context(), severity="high") + assert "**Severity:** high" in md + assert "Will the Fed raise rates?" in md + assert "Federal Reserve press release" in md + assert "Investigation Prompts" in md + assert "Check FOMC calendar." in md + + +@pytest.mark.unit +def test_manipulation_flag_block_appears_when_flags_present( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format( + _context(manipulation_flags=[ManipulationFlag.SIZE_VS_DEPTH_OUTLIER]), + severity="medium", + ) + assert "Manipulation Flags" in md + assert "size_vs_depth_outlier" in md + + +@pytest.mark.unit +def test_manipulation_flag_block_absent_when_empty( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format(_context(), severity="medium") + assert "Manipulation Flags" not in md + + +@pytest.mark.unit +def test_related_markets_render_as_bullets(formatter: MarkdownFormatter) -> None: + related = [ + RelatedMarketState( + market_id="kalshi_fed_holds", + question="Will the Fed hold?", + current_price=0.42, + delta_24h=-0.03, + volume_24h=80_000.0, + relationship_type="inverse", + relationship_strength=0.9, + ) + ] + md = formatter.format(_context(related=related), severity="medium") + assert "**kalshi_fed_holds** (inverse," in md + assert "No related markets" not in md + + +@pytest.mark.unit +def test_fallback_text_when_no_related_markets(formatter: MarkdownFormatter) -> None: + md = formatter.format(_context(), severity="medium") + assert "No related markets in the curated taxonomy." 
in md + + +@pytest.mark.unit +def test_fallback_text_when_no_investigation_prompts( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format(_context(prompts=[]), severity="medium") + assert "No investigation prompts configured for this (signal_type, category) tuple." in md + + +@pytest.mark.unit +def test_markdown_deterministic_across_calls(formatter: MarkdownFormatter) -> None: + ctx = _context() + outputs = {formatter.format(ctx, severity="medium") for _ in range(100)} + assert len(outputs) == 1 diff --git a/tests/format/test_routing.py b/tests/format/test_routing.py new file mode 100644 index 0000000..44dc8ae --- /dev/null +++ b/tests/format/test_routing.py @@ -0,0 +1,101 @@ +"""Tests for consumer registry and the signal router.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +import pytest + +from augur_format.routing.consumer_registry import ConsumerRegistry +from augur_format.routing.router import SignalRouter +from augur_signals.models import ( + ConsumerType, + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context( + market_id: str = "kalshi_fed", + interpretation_mode: InterpretationMode = InterpretationMode.DETERMINISTIC, +) -> SignalContext: + signal = MarketSignal( + signal_id=new_signal_id(), + market_id=market_id, + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="q", + resolution_criteria="c", + resolution_source="s", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=[], + interpretation_mode=interpretation_mode, + ) + + +@pytest.mark.unit +def test_registry_from_toml_reads_routing() 
-> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + consumers = registry.consumers_for_category("monetary_policy") + assert ConsumerType.MACRO_RESEARCH_AGENT in consumers + assert ConsumerType.FINANCIAL_NEWS_DESK in consumers + assert ConsumerType.DASHBOARD in consumers + + +@pytest.mark.unit +def test_registry_falls_through_to_default_on_unknown_category() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + consumers = registry.consumers_for_category("not-a-real-category") + assert consumers == (ConsumerType.DASHBOARD,) + + +@pytest.mark.unit +def test_router_returns_default_consumers_for_unregistered_market() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry) + decision = router.route(_context()) + assert ConsumerType.DASHBOARD in decision.consumers + + +@pytest.mark.unit +def test_router_applies_market_category() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry, market_categories={"kalshi_fed": "monetary_policy"}) + decision = router.route(_context()) + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.consumers + + +@pytest.mark.unit +def test_router_suppresses_non_llm_consumers_on_llm_assisted_context() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry, market_categories={"kalshi_fed": "monetary_policy"}) + decision = router.route(_context(interpretation_mode=InterpretationMode.LLM_ASSISTED)) + assert ConsumerType.DASHBOARD in decision.consumers + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.suppressed + assert ConsumerType.MACRO_RESEARCH_AGENT not in decision.consumers + + +@pytest.mark.unit +def test_router_register_market_category_is_idempotent() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry) + 
router.register_market_category("kalshi_fed", "monetary_policy") + router.register_market_category("kalshi_fed", "monetary_policy") + decision = router.route(_context()) + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.consumers diff --git a/tests/format/test_severity.py b/tests/format/test_severity.py new file mode 100644 index 0000000..40d683e --- /dev/null +++ b/tests/format/test_severity.py @@ -0,0 +1,76 @@ +"""Tests for the severity derivation function.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest + +from augur_format.deterministic.severity import derive_severity +from augur_signals.models import MarketSignal, SignalType, new_signal_id + + +def _signal( + magnitude: float = 0.5, + confidence: float = 0.5, + liquidity_tier: str = "high", +) -> MarketSignal: + return MarketSignal( + signal_id=new_signal_id(), + market_id="m", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=magnitude, + direction=1, + confidence=confidence, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier=liquidity_tier, # type: ignore[arg-type] + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + + +@pytest.mark.unit +def test_high_tier_above_06_is_high_severity() -> None: + assert derive_severity(_signal(magnitude=0.9, confidence=0.8)) == "high" + + +@pytest.mark.unit +def test_high_tier_between_03_and_06_is_medium() -> None: + assert derive_severity(_signal(magnitude=0.6, confidence=0.6)) == "medium" + + +@pytest.mark.unit +def test_high_tier_at_or_below_03_is_low() -> None: + # 0.5 * 0.6 = 0.3 → not > 0.3 → low. 
+ assert derive_severity(_signal(magnitude=0.5, confidence=0.6)) == "low" + + +@pytest.mark.unit +def test_mid_tier_above_07_is_medium() -> None: + assert derive_severity(_signal(magnitude=0.9, confidence=0.9, liquidity_tier="mid")) == "medium" + + +@pytest.mark.unit +def test_mid_tier_at_or_below_07_is_low() -> None: + assert derive_severity(_signal(magnitude=0.7, confidence=0.7, liquidity_tier="mid")) == "low" + + +@pytest.mark.unit +def test_low_tier_always_low_regardless_of_score() -> None: + assert derive_severity(_signal(magnitude=1.0, confidence=1.0, liquidity_tier="low")) == "low" + + +@pytest.mark.unit +def test_high_tier_boundary_at_06_is_medium_not_high() -> None: + # 0.6 * 1.0 = 0.6 → not > 0.6 → medium. + assert derive_severity(_signal(magnitude=0.6, confidence=1.0)) == "medium" + + +@pytest.mark.unit +def test_derive_is_pure() -> None: + sig = _signal(magnitude=0.9, confidence=0.9) + first = derive_severity(sig) + second = derive_severity(sig) + assert first == second diff --git a/tests/format/test_webhook.py b/tests/format/test_webhook.py new file mode 100644 index 0000000..8d8692c --- /dev/null +++ b/tests/format/test_webhook.py @@ -0,0 +1,189 @@ +"""Tests for the webhook adapter.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime + +import httpx +import pytest + +from augur_format._config import WebhookConfig +from augur_format.transport.retry import ( + DeliveryBackoff, + DeliveryRetryExhaustedError, + deliver_with_backoff, +) +from augur_format.transport.webhook import ( + WebhookFormatter, + WebhookTarget, +) +from augur_signals.models import ( + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context() -> SignalContext: + signal = MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + 
detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="Will the Fed raise rates?", + resolution_criteria="YES if rate rises.", + resolution_source="Federal Reserve", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=["Check FOMC calendar."], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +async def test_retry_exhaustion_raises() -> None: + async def failing() -> None: + raise ConnectionError("no route") + + async def fake_sleep(_: float) -> None: + return None + + with pytest.raises(DeliveryRetryExhaustedError) as excinfo: + await deliver_with_backoff( + failing, DeliveryBackoff(initial_seconds=0.0, max_retries=3), sleep=fake_sleep + ) + assert excinfo.value.attempts == 3 + + +@pytest.mark.unit +async def test_delivery_succeeds_on_2xx() -> None: + calls: list[dict[str, object]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append({"url": str(request.url), "body": request.content}) + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert result.delivered + assert result.status_code == 200 + assert len(calls) == 1 + + +@pytest.mark.unit +async def test_delivery_fails_on_5xx_after_retries() -> None: + count = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + count["n"] += 1 + return httpx.Response(500) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = 
WebhookFormatter( + client, + WebhookConfig( + initial_retry_delay_seconds=0.001, + max_retry_delay_seconds=0.001, + max_retries=3, + ), + ) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert not result.delivered + assert count["n"] == 3 + + +@pytest.mark.unit +async def test_delivery_drops_on_4xx() -> None: + count = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + count["n"] += 1 + return httpx.Response(400) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig(max_retries=3)) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert not result.delivered + assert count["n"] == 1 # no retry on 4xx + assert result.status_code == 400 + + +@pytest.mark.unit +async def test_slack_blocks_format_is_valid_block_kit() -> None: + captured: list[bytes] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request.content) + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="slack", + url="https://hooks.slack.com/services/TEST", # type: ignore[arg-type] + format="slack_blocks", + ) + result = await formatter.deliver(_context(), target) + assert result.delivered + payload = json.loads(captured[0]) + assert "blocks" in payload + assert any(b["type"] == "header" for b in payload["blocks"]) + # Confidence should be formatted to two decimals in header text. 
+ header = next(b for b in payload["blocks"] if b["type"] == "header") + assert "0.70" in header["text"]["text"] + + +@pytest.mark.unit +async def test_auth_header_from_env_applied(monkeypatch: pytest.MonkeyPatch) -> None: + captured: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(dict(request.headers)) + return httpx.Response(200) + + monkeypatch.setenv("AUGUR_TEST_WEBHOOK_AUTH", "Bearer secret-xyz") + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + auth_header_env="AUGUR_TEST_WEBHOOK_AUTH", + ) + await formatter.deliver(_context(), target) + assert captured[0].get("authorization") == "Bearer secret-xyz" diff --git a/tests/format/test_websocket.py b/tests/format/test_websocket.py new file mode 100644 index 0000000..36de812 --- /dev/null +++ b/tests/format/test_websocket.py @@ -0,0 +1,135 @@ +"""Tests for the WebSocket broadcaster and frame helpers.""" + +from __future__ import annotations + +import asyncio +import json +from datetime import UTC, datetime, timedelta + +import pytest + +from augur_format.transport.websocket import ( + FrameType, + HeartbeatScheduler, + WebSocketBroadcaster, + heartbeat_frame, + signal_frame, + storm_end_frame, + storm_start_frame, +) +from augur_signals.models import ( + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context() -> SignalContext: + signal = MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + 
raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="q", + resolution_criteria="c", + resolution_source="s", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=[], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +def test_signal_frame_payload_contains_canonical_signal_context() -> None: + ctx = _context() + frame = signal_frame(ctx, datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + assert frame.frame_type == FrameType.SIGNAL + assert frame.payload is not None + assert frame.payload["signal"]["signal_id"] == ctx.signal.signal_id + + +@pytest.mark.unit +def test_heartbeat_frame_has_no_payload() -> None: + frame = heartbeat_frame(datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + assert frame.frame_type == FrameType.HEARTBEAT + assert frame.payload is None + + +@pytest.mark.unit +def test_storm_frames_emit_expected_types() -> None: + now = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + assert storm_start_frame(now).frame_type == FrameType.STORM_START + assert storm_end_frame(now).frame_type == FrameType.STORM_END + + +@pytest.mark.unit +def test_frame_to_json_uses_z_suffix() -> None: + frame = heartbeat_frame(datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + body = json.loads(frame.to_json()) + assert body["ts"].endswith("Z") + assert "payload" not in body + + +@pytest.mark.asyncio +async def test_broadcaster_fans_out_to_subscribers() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=8) + sub = broadcaster.subscribe() + await broadcaster.publish(heartbeat_frame(datetime(2026, 3, 15, tzinfo=UTC))) + frame = await asyncio.wait_for(sub.queue.get(), timeout=0.1) + assert frame.frame_type == FrameType.HEARTBEAT + + +@pytest.mark.asyncio +async def test_broadcaster_filters_by_consumer_type() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=4) + dashboard = broadcaster.subscribe(consumer_type="dashboard") + macro = 
broadcaster.subscribe(consumer_type="macro_research_agent") + frame = heartbeat_frame(datetime(2026, 3, 15, tzinfo=UTC)) + await broadcaster.publish( + frame, + consumer_type_filter=lambda ct: ct == "dashboard", + ) + assert dashboard.queue.qsize() == 1 + assert macro.queue.qsize() == 0 + + +@pytest.mark.asyncio +async def test_broadcaster_drops_oldest_on_full_queue() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=2) + sub = broadcaster.subscribe() + now = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + for _ in range(5): + await broadcaster.publish(heartbeat_frame(now)) + # Queue holds only the last 2; dropped counter tracks the overflow. + assert sub.queue.qsize() == 2 + assert sub.dropped >= 3 + + +@pytest.mark.unit +def test_heartbeat_scheduler_emits_after_interval() -> None: + scheduler = HeartbeatScheduler(interval_seconds=30) + t0 = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + assert scheduler.should_emit(t0) + scheduler.record(t0) + assert not scheduler.should_emit(t0 + timedelta(seconds=10)) + assert scheduler.should_emit(t0 + timedelta(seconds=30)) + + +@pytest.mark.unit +def test_broadcaster_rejects_invalid_buffer() -> None: + with pytest.raises(ValueError, match="positive"): + WebSocketBroadcaster(per_connection_buffer=0) diff --git a/uv.lock b/uv.lock index 80de3e3..32be738 100644 --- a/uv.lock +++ b/uv.lock @@ -211,8 +211,11 @@ name = "augur-format" version = "0.0.0" source = { editable = "src/augur_format" } dependencies = [ + { name = "augur-signals" }, + { name = "httpx" }, { name = "jinja2" }, { name = "pydantic" }, + { name = "websockets" }, ] [package.optional-dependencies] @@ -226,9 +229,12 @@ llm-local = [ [package.metadata] requires-dist = [ { name = "anthropic", marker = "extra == 'llm-cloud'", specifier = ">=0.30" }, + { name = "augur-signals", editable = "src/augur_signals" }, + { name = "httpx", specifier = ">=0.27" }, { name = "jinja2", specifier = ">=3.1" }, { name = "ollama", marker = "extra == 'llm-local'", specifier = 
">=0.3" }, { name = "pydantic", specifier = ">=2.7" }, + { name = "websockets", specifier = ">=13.0" }, ] provides-extras = ["llm-local", "llm-cloud"] @@ -1612,6 +1618,51 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/8d/edd0bd910ff803c308ee9a6b7778621af0d10252219ad9f19ef4d4982a61/virtualenv-21.2.4-py3-none-any.whl", hash = "sha256:29d21e941795206138d0f22f4e45ff7050e5da6c6472299fb7103318763861ac", size = 5831232 }, ] +[[package]] +name = "websockets" +version = "16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/04/24/4b2031d72e840ce4c1ccb255f693b15c334757fc50023e4db9537080b8c4/websockets-16.0.tar.gz", hash = "sha256:5f6261a5e56e8d5c42a4497b364ea24d94d9563e8fbd44e78ac40879c60179b5", size = 179346 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/7b/bac442e6b96c9d25092695578dda82403c77936104b5682307bd4deb1ad4/websockets-16.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:71c989cbf3254fbd5e84d3bff31e4da39c43f884e64f2551d14bb3c186230f00", size = 177365 }, + { url = "https://files.pythonhosted.org/packages/b0/fe/136ccece61bd690d9c1f715baaeefd953bb2360134de73519d5df19d29ca/websockets-16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b6e209ffee39ff1b6d0fa7bfef6de950c60dfb91b8fcead17da4ee539121a79", size = 175038 }, + { url = "https://files.pythonhosted.org/packages/40/1e/9771421ac2286eaab95b8575b0cb701ae3663abf8b5e1f64f1fd90d0a673/websockets-16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:86890e837d61574c92a97496d590968b23c2ef0aeb8a9bc9421d174cd378ae39", size = 175328 }, + { url = "https://files.pythonhosted.org/packages/18/29/71729b4671f21e1eaa5d6573031ab810ad2936c8175f03f97f3ff164c802/websockets-16.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9b5aca38b67492ef518a8ab76851862488a478602229112c4b0d58d63a7a4d5c", size = 184915 }, + { url = 
"https://files.pythonhosted.org/packages/97/bb/21c36b7dbbafc85d2d480cd65df02a1dc93bf76d97147605a8e27ff9409d/websockets-16.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e0334872c0a37b606418ac52f6ab9cfd17317ac26365f7f65e203e2d0d0d359f", size = 186152 }, + { url = "https://files.pythonhosted.org/packages/4a/34/9bf8df0c0cf88fa7bfe36678dc7b02970c9a7d5e065a3099292db87b1be2/websockets-16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a0b31e0b424cc6b5a04b8838bbaec1688834b2383256688cf47eb97412531da1", size = 185583 }, + { url = "https://files.pythonhosted.org/packages/47/88/4dd516068e1a3d6ab3c7c183288404cd424a9a02d585efbac226cb61ff2d/websockets-16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:485c49116d0af10ac698623c513c1cc01c9446c058a4e61e3bf6c19dff7335a2", size = 184880 }, + { url = "https://files.pythonhosted.org/packages/91/d6/7d4553ad4bf1c0421e1ebd4b18de5d9098383b5caa1d937b63df8d04b565/websockets-16.0-cp312-cp312-win32.whl", hash = "sha256:eaded469f5e5b7294e2bdca0ab06becb6756ea86894a47806456089298813c89", size = 178261 }, + { url = "https://files.pythonhosted.org/packages/c3/f0/f3a17365441ed1c27f850a80b2bc680a0fa9505d733fe152fdf5e98c1c0b/websockets-16.0-cp312-cp312-win_amd64.whl", hash = "sha256:5569417dc80977fc8c2d43a86f78e0a5a22fee17565d78621b6bb264a115d4ea", size = 178693 }, + { url = "https://files.pythonhosted.org/packages/cc/9c/baa8456050d1c1b08dd0ec7346026668cbc6f145ab4e314d707bb845bf0d/websockets-16.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:878b336ac47938b474c8f982ac2f7266a540adc3fa4ad74ae96fea9823a02cc9", size = 177364 }, + { url = "https://files.pythonhosted.org/packages/7e/0c/8811fc53e9bcff68fe7de2bcbe75116a8d959ac699a3200f4847a8925210/websockets-16.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:52a0fec0e6c8d9a784c2c78276a48a2bdf099e4ccc2a4cad53b27718dbfd0230", size = 175039 }, + { url = 
"https://files.pythonhosted.org/packages/aa/82/39a5f910cb99ec0b59e482971238c845af9220d3ab9fa76dd9162cda9d62/websockets-16.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e6578ed5b6981005df1860a56e3617f14a6c307e6a71b4fff8c48fdc50f3ed2c", size = 175323 }, + { url = "https://files.pythonhosted.org/packages/bd/28/0a25ee5342eb5d5f297d992a77e56892ecb65e7854c7898fb7d35e9b33bd/websockets-16.0-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:95724e638f0f9c350bb1c2b0a7ad0e83d9cc0c9259f3ea94e40d7b02a2179ae5", size = 184975 }, + { url = "https://files.pythonhosted.org/packages/f9/66/27ea52741752f5107c2e41fda05e8395a682a1e11c4e592a809a90c6a506/websockets-16.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c0204dc62a89dc9d50d682412c10b3542d748260d743500a85c13cd1ee4bde82", size = 186203 }, + { url = "https://files.pythonhosted.org/packages/37/e5/8e32857371406a757816a2b471939d51c463509be73fa538216ea52b792a/websockets-16.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:52ac480f44d32970d66763115edea932f1c5b1312de36df06d6b219f6741eed8", size = 185653 }, + { url = "https://files.pythonhosted.org/packages/9b/67/f926bac29882894669368dc73f4da900fcdf47955d0a0185d60103df5737/websockets-16.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6e5a82b677f8f6f59e8dfc34ec06ca6b5b48bc4fcda346acd093694cc2c24d8f", size = 184920 }, + { url = "https://files.pythonhosted.org/packages/3c/a1/3d6ccdcd125b0a42a311bcd15a7f705d688f73b2a22d8cf1c0875d35d34a/websockets-16.0-cp313-cp313-win32.whl", hash = "sha256:abf050a199613f64c886ea10f38b47770a65154dc37181bfaff70c160f45315a", size = 178255 }, + { url = "https://files.pythonhosted.org/packages/6b/ae/90366304d7c2ce80f9b826096a9e9048b4bb760e44d3b873bb272cba696b/websockets-16.0-cp313-cp313-win_amd64.whl", hash = "sha256:3425ac5cf448801335d6fdc7ae1eb22072055417a96cc6b31b3861f455fbc156", size = 178689 }, + { url = 
"https://files.pythonhosted.org/packages/f3/1d/e88022630271f5bd349ed82417136281931e558d628dd52c4d8621b4a0b2/websockets-16.0-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:8cc451a50f2aee53042ac52d2d053d08bf89bcb31ae799cb4487587661c038a0", size = 177406 }, + { url = "https://files.pythonhosted.org/packages/f2/78/e63be1bf0724eeb4616efb1ae1c9044f7c3953b7957799abb5915bffd38e/websockets-16.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:daa3b6ff70a9241cf6c7fc9e949d41232d9d7d26fd3522b1ad2b4d62487e9904", size = 175085 }, + { url = "https://files.pythonhosted.org/packages/bb/f4/d3c9220d818ee955ae390cf319a7c7a467beceb24f05ee7aaaa2414345ba/websockets-16.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:fd3cb4adb94a2a6e2b7c0d8d05cb94e6f1c81a0cf9dc2694fb65c7e8d94c42e4", size = 175328 }, + { url = "https://files.pythonhosted.org/packages/63/bc/d3e208028de777087e6fb2b122051a6ff7bbcca0d6df9d9c2bf1dd869ae9/websockets-16.0-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:781caf5e8eee67f663126490c2f96f40906594cb86b408a703630f95550a8c3e", size = 185044 }, + { url = "https://files.pythonhosted.org/packages/ad/6e/9a0927ac24bd33a0a9af834d89e0abc7cfd8e13bed17a86407a66773cc0e/websockets-16.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:caab51a72c51973ca21fa8a18bd8165e1a0183f1ac7066a182ff27107b71e1a4", size = 186279 }, + { url = "https://files.pythonhosted.org/packages/b9/ca/bf1c68440d7a868180e11be653c85959502efd3a709323230314fda6e0b3/websockets-16.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:19c4dc84098e523fd63711e563077d39e90ec6702aff4b5d9e344a60cb3c0cb1", size = 185711 }, + { url = "https://files.pythonhosted.org/packages/c4/f8/fdc34643a989561f217bb477cbc47a3a07212cbda91c0e4389c43c296ebf/websockets-16.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:a5e18a238a2b2249c9a9235466b90e96ae4795672598a58772dd806edc7ac6d3", size = 184982 }, + { url = 
"https://files.pythonhosted.org/packages/dd/d1/574fa27e233764dbac9c52730d63fcf2823b16f0856b3329fc6268d6ae4f/websockets-16.0-cp314-cp314-win32.whl", hash = "sha256:a069d734c4a043182729edd3e9f247c3b2a4035415a9172fd0f1b71658a320a8", size = 177915 }, + { url = "https://files.pythonhosted.org/packages/8a/f1/ae6b937bf3126b5134ce1f482365fde31a357c784ac51852978768b5eff4/websockets-16.0-cp314-cp314-win_amd64.whl", hash = "sha256:c0ee0e63f23914732c6d7e0cce24915c48f3f1512ec1d079ed01fc629dab269d", size = 178381 }, + { url = "https://files.pythonhosted.org/packages/06/9b/f791d1db48403e1f0a27577a6beb37afae94254a8c6f08be4a23e4930bc0/websockets-16.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:a35539cacc3febb22b8f4d4a99cc79b104226a756aa7400adc722e83b0d03244", size = 177737 }, + { url = "https://files.pythonhosted.org/packages/bd/40/53ad02341fa33b3ce489023f635367a4ac98b73570102ad2cdd770dacc9a/websockets-16.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b784ca5de850f4ce93ec85d3269d24d4c82f22b7212023c974c401d4980ebc5e", size = 175268 }, + { url = "https://files.pythonhosted.org/packages/74/9b/6158d4e459b984f949dcbbb0c5d270154c7618e11c01029b9bbd1bb4c4f9/websockets-16.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:569d01a4e7fba956c5ae4fc988f0d4e187900f5497ce46339c996dbf24f17641", size = 175486 }, + { url = "https://files.pythonhosted.org/packages/e5/2d/7583b30208b639c8090206f95073646c2c9ffd66f44df967981a64f849ad/websockets-16.0-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:50f23cdd8343b984957e4077839841146f67a3d31ab0d00e6b824e74c5b2f6e8", size = 185331 }, + { url = "https://files.pythonhosted.org/packages/45/b0/cce3784eb519b7b5ad680d14b9673a31ab8dcb7aad8b64d81709d2430aa8/websockets-16.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:152284a83a00c59b759697b7f9e9cddf4e3c7861dd0d964b472b70f78f89e80e", size = 186501 }, + { url = 
"https://files.pythonhosted.org/packages/19/60/b8ebe4c7e89fb5f6cdf080623c9d92789a53636950f7abacfc33fe2b3135/websockets-16.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:bc59589ab64b0022385f429b94697348a6a234e8ce22544e3681b2e9331b5944", size = 186062 }, + { url = "https://files.pythonhosted.org/packages/88/a8/a080593f89b0138b6cba1b28f8df5673b5506f72879322288b031337c0b8/websockets-16.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:32da954ffa2814258030e5a57bc73a3635463238e797c7375dc8091327434206", size = 185356 }, + { url = "https://files.pythonhosted.org/packages/c2/b6/b9afed2afadddaf5ebb2afa801abf4b0868f42f8539bfe4b071b5266c9fe/websockets-16.0-cp314-cp314t-win32.whl", hash = "sha256:5a4b4cc550cb665dd8a47f868c8d04c8230f857363ad3c9caf7a0c3bf8c61ca6", size = 178085 }, + { url = "https://files.pythonhosted.org/packages/9f/3e/28135a24e384493fa804216b79a6a6759a38cc4ff59118787b9fb693df93/websockets-16.0-cp314-cp314t-win_amd64.whl", hash = "sha256:b14dc141ed6d2dde437cddb216004bcac6a1df0935d79656387bd41632ba0bbd", size = 178531 }, + { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598 }, +] + [[package]] name = "yarl" version = "1.23.0"