From ab8a56a082764c958b523db895da7045b3f05734 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 13:53:30 +0530 Subject: [PATCH 01/11] chore(format): add jinja2, httpx, websockets deps plus formatter config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit augur-format needs Jinja2 for the Markdown templates, httpx for the webhook adapter, and websockets for the WebSocket transport. Declare all three in the workspace member's pyproject plus depend on augur-signals so formatters can import SignalContext and MarketSignal directly. config/formatters.toml mirrors phase-3 §12.2 block for block: canonical JSON parameters (float decimals, timestamp format), Markdown render settings (template dir, trim/lstrip blocks), webhook retry policy (initial/max delay, max attempts, per-delivery timeout), and WebSocket transport settings (bind, port, heartbeat interval/timeout, per-connection buffer). FormatterConfig composes the per-block Pydantic sub-models with frozen, extra=forbid, and bounded numeric fields. The [json] TOML block maps to ``canonical_json`` on the Python side via a field alias so the attribute does not shadow BaseModel.json. --- config/formatters.toml | 26 +++++++++ src/augur_format/augur_format/_config.py | 68 ++++++++++++++++++++++++ src/augur_format/pyproject.toml | 3 ++ uv.lock | 51 ++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 config/formatters.toml create mode 100644 src/augur_format/augur_format/_config.py diff --git a/config/formatters.toml b/config/formatters.toml new file mode 100644 index 0000000..e7b7a7a --- /dev/null +++ b/config/formatters.toml @@ -0,0 +1,26 @@ +# Deterministic formatter configuration. Schema mirrors phase-3 §12.2 +# verbatim; each block maps onto a Pydantic config sub-model in +# augur_format._config. A malformed file fails at startup via +# augur_signals._config.load_config. 
+ +[json] +float_decimals = 6 +timestamp_format = "iso_z" + +[markdown] +template_dir = "src/augur_format/augur_format/deterministic/templates" +trim_blocks = true +lstrip_blocks = true + +[webhook] +initial_retry_delay_seconds = 1 +max_retry_delay_seconds = 60 +max_retries = 5 +delivery_timeout_seconds = 10 + +[websocket] +bind = "0.0.0.0" +port = 8765 +heartbeat_interval_seconds = 30 +heartbeat_timeout_seconds = 90 +per_connection_buffer = 64 diff --git a/src/augur_format/augur_format/_config.py b/src/augur_format/augur_format/_config.py new file mode 100644 index 0000000..9bfa314 --- /dev/null +++ b/src/augur_format/augur_format/_config.py @@ -0,0 +1,68 @@ +"""Configuration models for deterministic formatters. + +Mirrors config/formatters.toml block-for-block. Loaded at engine +startup via augur_signals._config.load_config; a missing required +value or malformed block fails loudly rather than coercing. +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + + +class JsonConfig(BaseModel): + """Canonical JSON formatter parameters.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + float_decimals: int = Field(default=6, ge=0, le=18) + timestamp_format: Literal["iso_z"] = "iso_z" + + +class MarkdownConfig(BaseModel): + """Jinja2 rendering parameters.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + template_dir: str = "src/augur_format/augur_format/deterministic/templates" + trim_blocks: bool = True + lstrip_blocks: bool = True + + +class WebhookConfig(BaseModel): + """Webhook delivery retry and timeout settings.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + initial_retry_delay_seconds: float = Field(default=1.0, gt=0.0) + max_retry_delay_seconds: float = Field(default=60.0, gt=0.0) + max_retries: int = Field(default=5, gt=0) + delivery_timeout_seconds: float = Field(default=10.0, gt=0.0) + + +class WebSocketConfig(BaseModel): + """WebSocket 
transport bind, heartbeat, and per-connection buffer.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + bind: str = "0.0.0.0" # noqa: S104 — documented default bind for the WS server + port: int = Field(default=8765, gt=0, le=65_535) + heartbeat_interval_seconds: int = Field(default=30, gt=0) + heartbeat_timeout_seconds: int = Field(default=90, gt=0) + per_connection_buffer: int = Field(default=64, gt=0) + + +class FormatterConfig(BaseModel): + """Top-level formatter configuration loaded from config/formatters.toml.""" + + model_config = ConfigDict(frozen=True, extra="forbid", populate_by_name=True) + + # Field aliased so the TOML block is [json] per the documented + # schema, while the Python attribute is ``canonical_json`` to avoid + # shadowing BaseModel.json. + canonical_json: JsonConfig = Field(default_factory=JsonConfig, alias="json") + markdown: MarkdownConfig = Field(default_factory=MarkdownConfig) + webhook: WebhookConfig = Field(default_factory=WebhookConfig) + websocket: WebSocketConfig = Field(default_factory=WebSocketConfig) diff --git a/src/augur_format/pyproject.toml b/src/augur_format/pyproject.toml index c2b3d52..d0d09b2 100644 --- a/src/augur_format/pyproject.toml +++ b/src/augur_format/pyproject.toml @@ -7,6 +7,9 @@ requires-python = ">=3.12" dependencies = [ "pydantic>=2.7", "jinja2>=3.1", + "httpx>=0.27", + "websockets>=13.0", + "augur-signals", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 80de3e3..32be738 100644 --- a/uv.lock +++ b/uv.lock @@ -211,8 +211,11 @@ name = "augur-format" version = "0.0.0" source = { editable = "src/augur_format" } dependencies = [ + { name = "augur-signals" }, + { name = "httpx" }, { name = "jinja2" }, { name = "pydantic" }, + { name = "websockets" }, ] [package.optional-dependencies] @@ -226,9 +229,12 @@ llm-local = [ [package.metadata] requires-dist = [ { name = "anthropic", marker = "extra == 'llm-cloud'", specifier = ">=0.30" }, + { name = "augur-signals", editable = 
"src/augur_signals" }, + { name = "httpx", specifier = ">=0.27" }, { name = "jinja2", specifier = ">=3.1" }, { name = "ollama", marker = "extra == 'llm-local'", specifier = ">=0.3" }, { name = "pydantic", specifier = ">=2.7" }, + { name = "websockets", specifier = ">=13.0" }, ] provides-extras = ["llm-local", "llm-cloud"] @@ -1612,6 +1618,51 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/8d/edd0bd910ff803c308ee9a6b7778621af0d10252219ad9f19ef4d4982a61/virtualenv-21.2.4-py3-none-any.whl", hash = "sha256:29d21e941795206138d0f22f4e45ff7050e5da6c6472299fb7103318763861ac", size = 5831232 }, ] +[[package]] +name = "websockets" +version = "16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/04/24/4b2031d72e840ce4c1ccb255f693b15c334757fc50023e4db9537080b8c4/websockets-16.0.tar.gz", hash = "sha256:5f6261a5e56e8d5c42a4497b364ea24d94d9563e8fbd44e78ac40879c60179b5", size = 179346 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/7b/bac442e6b96c9d25092695578dda82403c77936104b5682307bd4deb1ad4/websockets-16.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:71c989cbf3254fbd5e84d3bff31e4da39c43f884e64f2551d14bb3c186230f00", size = 177365 }, + { url = "https://files.pythonhosted.org/packages/b0/fe/136ccece61bd690d9c1f715baaeefd953bb2360134de73519d5df19d29ca/websockets-16.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:8b6e209ffee39ff1b6d0fa7bfef6de950c60dfb91b8fcead17da4ee539121a79", size = 175038 }, + { url = "https://files.pythonhosted.org/packages/40/1e/9771421ac2286eaab95b8575b0cb701ae3663abf8b5e1f64f1fd90d0a673/websockets-16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:86890e837d61574c92a97496d590968b23c2ef0aeb8a9bc9421d174cd378ae39", size = 175328 }, + { url = 
"https://files.pythonhosted.org/packages/18/29/71729b4671f21e1eaa5d6573031ab810ad2936c8175f03f97f3ff164c802/websockets-16.0-cp312-cp312-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:9b5aca38b67492ef518a8ab76851862488a478602229112c4b0d58d63a7a4d5c", size = 184915 }, + { url = "https://files.pythonhosted.org/packages/97/bb/21c36b7dbbafc85d2d480cd65df02a1dc93bf76d97147605a8e27ff9409d/websockets-16.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e0334872c0a37b606418ac52f6ab9cfd17317ac26365f7f65e203e2d0d0d359f", size = 186152 }, + { url = "https://files.pythonhosted.org/packages/4a/34/9bf8df0c0cf88fa7bfe36678dc7b02970c9a7d5e065a3099292db87b1be2/websockets-16.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a0b31e0b424cc6b5a04b8838bbaec1688834b2383256688cf47eb97412531da1", size = 185583 }, + { url = "https://files.pythonhosted.org/packages/47/88/4dd516068e1a3d6ab3c7c183288404cd424a9a02d585efbac226cb61ff2d/websockets-16.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:485c49116d0af10ac698623c513c1cc01c9446c058a4e61e3bf6c19dff7335a2", size = 184880 }, + { url = "https://files.pythonhosted.org/packages/91/d6/7d4553ad4bf1c0421e1ebd4b18de5d9098383b5caa1d937b63df8d04b565/websockets-16.0-cp312-cp312-win32.whl", hash = "sha256:eaded469f5e5b7294e2bdca0ab06becb6756ea86894a47806456089298813c89", size = 178261 }, + { url = "https://files.pythonhosted.org/packages/c3/f0/f3a17365441ed1c27f850a80b2bc680a0fa9505d733fe152fdf5e98c1c0b/websockets-16.0-cp312-cp312-win_amd64.whl", hash = "sha256:5569417dc80977fc8c2d43a86f78e0a5a22fee17565d78621b6bb264a115d4ea", size = 178693 }, + { url = "https://files.pythonhosted.org/packages/cc/9c/baa8456050d1c1b08dd0ec7346026668cbc6f145ab4e314d707bb845bf0d/websockets-16.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:878b336ac47938b474c8f982ac2f7266a540adc3fa4ad74ae96fea9823a02cc9", size = 177364 }, + { url = 
"https://files.pythonhosted.org/packages/7e/0c/8811fc53e9bcff68fe7de2bcbe75116a8d959ac699a3200f4847a8925210/websockets-16.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:52a0fec0e6c8d9a784c2c78276a48a2bdf099e4ccc2a4cad53b27718dbfd0230", size = 175039 }, + { url = "https://files.pythonhosted.org/packages/aa/82/39a5f910cb99ec0b59e482971238c845af9220d3ab9fa76dd9162cda9d62/websockets-16.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e6578ed5b6981005df1860a56e3617f14a6c307e6a71b4fff8c48fdc50f3ed2c", size = 175323 }, + { url = "https://files.pythonhosted.org/packages/bd/28/0a25ee5342eb5d5f297d992a77e56892ecb65e7854c7898fb7d35e9b33bd/websockets-16.0-cp313-cp313-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:95724e638f0f9c350bb1c2b0a7ad0e83d9cc0c9259f3ea94e40d7b02a2179ae5", size = 184975 }, + { url = "https://files.pythonhosted.org/packages/f9/66/27ea52741752f5107c2e41fda05e8395a682a1e11c4e592a809a90c6a506/websockets-16.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c0204dc62a89dc9d50d682412c10b3542d748260d743500a85c13cd1ee4bde82", size = 186203 }, + { url = "https://files.pythonhosted.org/packages/37/e5/8e32857371406a757816a2b471939d51c463509be73fa538216ea52b792a/websockets-16.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:52ac480f44d32970d66763115edea932f1c5b1312de36df06d6b219f6741eed8", size = 185653 }, + { url = "https://files.pythonhosted.org/packages/9b/67/f926bac29882894669368dc73f4da900fcdf47955d0a0185d60103df5737/websockets-16.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6e5a82b677f8f6f59e8dfc34ec06ca6b5b48bc4fcda346acd093694cc2c24d8f", size = 184920 }, + { url = "https://files.pythonhosted.org/packages/3c/a1/3d6ccdcd125b0a42a311bcd15a7f705d688f73b2a22d8cf1c0875d35d34a/websockets-16.0-cp313-cp313-win32.whl", hash = "sha256:abf050a199613f64c886ea10f38b47770a65154dc37181bfaff70c160f45315a", size = 178255 }, + { url = 
"https://files.pythonhosted.org/packages/6b/ae/90366304d7c2ce80f9b826096a9e9048b4bb760e44d3b873bb272cba696b/websockets-16.0-cp313-cp313-win_amd64.whl", hash = "sha256:3425ac5cf448801335d6fdc7ae1eb22072055417a96cc6b31b3861f455fbc156", size = 178689 }, + { url = "https://files.pythonhosted.org/packages/f3/1d/e88022630271f5bd349ed82417136281931e558d628dd52c4d8621b4a0b2/websockets-16.0-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:8cc451a50f2aee53042ac52d2d053d08bf89bcb31ae799cb4487587661c038a0", size = 177406 }, + { url = "https://files.pythonhosted.org/packages/f2/78/e63be1bf0724eeb4616efb1ae1c9044f7c3953b7957799abb5915bffd38e/websockets-16.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:daa3b6ff70a9241cf6c7fc9e949d41232d9d7d26fd3522b1ad2b4d62487e9904", size = 175085 }, + { url = "https://files.pythonhosted.org/packages/bb/f4/d3c9220d818ee955ae390cf319a7c7a467beceb24f05ee7aaaa2414345ba/websockets-16.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:fd3cb4adb94a2a6e2b7c0d8d05cb94e6f1c81a0cf9dc2694fb65c7e8d94c42e4", size = 175328 }, + { url = "https://files.pythonhosted.org/packages/63/bc/d3e208028de777087e6fb2b122051a6ff7bbcca0d6df9d9c2bf1dd869ae9/websockets-16.0-cp314-cp314-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:781caf5e8eee67f663126490c2f96f40906594cb86b408a703630f95550a8c3e", size = 185044 }, + { url = "https://files.pythonhosted.org/packages/ad/6e/9a0927ac24bd33a0a9af834d89e0abc7cfd8e13bed17a86407a66773cc0e/websockets-16.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:caab51a72c51973ca21fa8a18bd8165e1a0183f1ac7066a182ff27107b71e1a4", size = 186279 }, + { url = "https://files.pythonhosted.org/packages/b9/ca/bf1c68440d7a868180e11be653c85959502efd3a709323230314fda6e0b3/websockets-16.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:19c4dc84098e523fd63711e563077d39e90ec6702aff4b5d9e344a60cb3c0cb1", size = 185711 }, + { url = 
"https://files.pythonhosted.org/packages/c4/f8/fdc34643a989561f217bb477cbc47a3a07212cbda91c0e4389c43c296ebf/websockets-16.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:a5e18a238a2b2249c9a9235466b90e96ae4795672598a58772dd806edc7ac6d3", size = 184982 }, + { url = "https://files.pythonhosted.org/packages/dd/d1/574fa27e233764dbac9c52730d63fcf2823b16f0856b3329fc6268d6ae4f/websockets-16.0-cp314-cp314-win32.whl", hash = "sha256:a069d734c4a043182729edd3e9f247c3b2a4035415a9172fd0f1b71658a320a8", size = 177915 }, + { url = "https://files.pythonhosted.org/packages/8a/f1/ae6b937bf3126b5134ce1f482365fde31a357c784ac51852978768b5eff4/websockets-16.0-cp314-cp314-win_amd64.whl", hash = "sha256:c0ee0e63f23914732c6d7e0cce24915c48f3f1512ec1d079ed01fc629dab269d", size = 178381 }, + { url = "https://files.pythonhosted.org/packages/06/9b/f791d1db48403e1f0a27577a6beb37afae94254a8c6f08be4a23e4930bc0/websockets-16.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:a35539cacc3febb22b8f4d4a99cc79b104226a756aa7400adc722e83b0d03244", size = 177737 }, + { url = "https://files.pythonhosted.org/packages/bd/40/53ad02341fa33b3ce489023f635367a4ac98b73570102ad2cdd770dacc9a/websockets-16.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b784ca5de850f4ce93ec85d3269d24d4c82f22b7212023c974c401d4980ebc5e", size = 175268 }, + { url = "https://files.pythonhosted.org/packages/74/9b/6158d4e459b984f949dcbbb0c5d270154c7618e11c01029b9bbd1bb4c4f9/websockets-16.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:569d01a4e7fba956c5ae4fc988f0d4e187900f5497ce46339c996dbf24f17641", size = 175486 }, + { url = "https://files.pythonhosted.org/packages/e5/2d/7583b30208b639c8090206f95073646c2c9ffd66f44df967981a64f849ad/websockets-16.0-cp314-cp314t-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:50f23cdd8343b984957e4077839841146f67a3d31ab0d00e6b824e74c5b2f6e8", size = 185331 }, + { url = 
"https://files.pythonhosted.org/packages/45/b0/cce3784eb519b7b5ad680d14b9673a31ab8dcb7aad8b64d81709d2430aa8/websockets-16.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:152284a83a00c59b759697b7f9e9cddf4e3c7861dd0d964b472b70f78f89e80e", size = 186501 }, + { url = "https://files.pythonhosted.org/packages/19/60/b8ebe4c7e89fb5f6cdf080623c9d92789a53636950f7abacfc33fe2b3135/websockets-16.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:bc59589ab64b0022385f429b94697348a6a234e8ce22544e3681b2e9331b5944", size = 186062 }, + { url = "https://files.pythonhosted.org/packages/88/a8/a080593f89b0138b6cba1b28f8df5673b5506f72879322288b031337c0b8/websockets-16.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:32da954ffa2814258030e5a57bc73a3635463238e797c7375dc8091327434206", size = 185356 }, + { url = "https://files.pythonhosted.org/packages/c2/b6/b9afed2afadddaf5ebb2afa801abf4b0868f42f8539bfe4b071b5266c9fe/websockets-16.0-cp314-cp314t-win32.whl", hash = "sha256:5a4b4cc550cb665dd8a47f868c8d04c8230f857363ad3c9caf7a0c3bf8c61ca6", size = 178085 }, + { url = "https://files.pythonhosted.org/packages/9f/3e/28135a24e384493fa804216b79a6a6759a38cc4ff59118787b9fb693df93/websockets-16.0-cp314-cp314t-win_amd64.whl", hash = "sha256:b14dc141ed6d2dde437cddb216004bcac6a1df0935d79656387bd41632ba0bbd", size = 178531 }, + { url = "https://files.pythonhosted.org/packages/6f/28/258ebab549c2bf3e64d2b0217b973467394a9cea8c42f70418ca2c5d0d2e/websockets-16.0-py3-none-any.whl", hash = "sha256:1637db62fad1dc833276dded54215f2c7fa46912301a24bd94d45d46a011ceec", size = 171598 }, +] + [[package]] name = "yarl" version = "1.23.0" From 4dbe34e8124f5bc5dadaa7185cb99931e791bc00 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 13:55:08 +0530 Subject: [PATCH 02/11] feat(format): canonical json formatter with stable key ordering to_canonical_json is the load-bearing contract the JSON feed consumers bind to. 
It turns a SignalContext into UTF-8 JSON bytes that are byte-identical across invocations on the same input, which lets consumers hash outputs and rely on stable comparisons. Three invariants make the output canonical: stable key ordering via explicit tuples (top-level, signal block, related-market block), float rounding to six decimals (configurable via float_decimals), and UTC timestamps with a Z suffix. Pydantic emits +00:00 offsets; the _json_default hook normalizes those to Z so hash equality survives the round-trip. The formatter never silently coerces; non-serializable types raise a TypeError rather than falling back to str or None. Callers are expected to keep SignalContext's schema narrow enough that default Pydantic json-mode dump produces only primitives plus datetimes. Eight tests cover the invariants: 1000-call determinism, six-decimal default rounding, custom float_decimals parameter, Z-suffix timestamp normalization, top-level key ordering (via json.loads preserving insertion order), signal block key ordering, related-market rounding, and manipulation-flag enum preservation. --- .../augur_format/deterministic/json_feed.py | 113 +++++++++++++++ tests/format/test_json_canonical.py | 133 ++++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 src/augur_format/augur_format/deterministic/json_feed.py create mode 100644 tests/format/test_json_canonical.py diff --git a/src/augur_format/augur_format/deterministic/json_feed.py b/src/augur_format/augur_format/deterministic/json_feed.py new file mode 100644 index 0000000..74cffa9 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/json_feed.py @@ -0,0 +1,113 @@ +"""Canonical JSON formatter for SignalContext. + +Serializes a SignalContext with stable key ordering, float rounding, +and ISO-8601 UTC timestamps with a ``Z`` suffix. The determinism +contract: same SignalContext in, byte-identical JSON out across any +number of invocations. 
Consumers can hash the bytes and rely on +stable equality. +""" + +from __future__ import annotations + +import json +from collections.abc import Mapping +from datetime import datetime +from typing import Any + +from augur_signals.models import SignalContext + +CANONICAL_KEY_ORDER: tuple[str, ...] = ( + "signal", + "market_question", + "resolution_criteria", + "resolution_source", + "closes_at", + "related_markets", + "investigation_prompts", + "interpretation_mode", + "schema_version", +) + +SIGNAL_KEY_ORDER: tuple[str, ...] = ( + "signal_id", + "market_id", + "platform", + "signal_type", + "magnitude", + "direction", + "confidence", + "fdr_adjusted", + "detected_at", + "window_seconds", + "liquidity_tier", + "manipulation_flags", + "related_market_ids", + "raw_features", + "schema_version", +) + +RELATED_KEY_ORDER: tuple[str, ...] = ( + "market_id", + "question", + "current_price", + "delta_24h", + "volume_24h", + "relationship_type", + "relationship_strength", +) + + +def to_canonical_json(context: SignalContext, *, float_decimals: int = 6) -> bytes: + """Return the canonical JSON bytes for *context*. + + Args: + context: The SignalContext to serialize. + float_decimals: Decimal places each float field is rounded to + before serialization. Must be applied consistently across + producers and consumers so equality comparison survives + the round-trip. + + Returns: + UTF-8 encoded JSON bytes with no whitespace between separators + and stable key ordering. 
+ """ + dumped = context.model_dump(mode="json") + payload: dict[str, Any] = _ordered_dict(dumped, CANONICAL_KEY_ORDER, float_decimals) + payload["signal"] = _ordered_dict(dumped["signal"], SIGNAL_KEY_ORDER, float_decimals) + payload["related_markets"] = [ + _ordered_dict(rm, RELATED_KEY_ORDER, float_decimals) + for rm in dumped.get("related_markets", []) + ] + return json.dumps( + payload, + default=_json_default, + ensure_ascii=False, + separators=(",", ":"), + sort_keys=False, + ).encode("utf-8") + + +def _ordered_dict( + source: Mapping[str, Any], + key_order: tuple[str, ...], + float_decimals: int, +) -> dict[str, Any]: + return {key: _round_floats(source[key], float_decimals) for key in key_order if key in source} + + +def _round_floats(value: Any, float_decimals: int) -> Any: + if isinstance(value, float): + return round(value, float_decimals) + if isinstance(value, list): + return [_round_floats(v, float_decimals) for v in value] + if isinstance(value, dict): + return {k: _round_floats(v, float_decimals) for k, v in value.items()} + return value + + +def _json_default(obj: Any) -> Any: + if isinstance(obj, datetime): + iso = obj.isoformat() + # Pydantic emits "+00:00"; canonicalize to "Z". 
+ return iso.replace("+00:00", "Z") + raise TypeError(f"cannot serialize {type(obj).__name__}") diff --git a/tests/format/test_json_canonical.py b/tests/format/test_json_canonical.py new file mode 100644 index 0000000..5dd1e4d --- /dev/null +++ b/tests/format/test_json_canonical.py @@ -0,0 +1,133 @@ +"""Tests for the canonical JSON formatter.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime + +import pytest + +from augur_format.deterministic.json_feed import ( + CANONICAL_KEY_ORDER, + SIGNAL_KEY_ORDER, + to_canonical_json, +) +from augur_signals.models import ( + InterpretationMode, + ManipulationFlag, + MarketSignal, + RelatedMarketState, + SignalContext, + SignalType, + new_signal_id, +) + + +def _signal() -> MarketSignal: + return MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8765432, + direction=1, + confidence=0.7219876, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + manipulation_flags=[ManipulationFlag.SIZE_VS_DEPTH_OUTLIER], + related_market_ids=["kalshi_fed_holds"], + raw_features={ + "posterior_p_change": 0.9123456789, + "calibration_provenance": "price_velocity_bocpd_beta_v1@identity_v0", + }, + ) + + +def _context() -> SignalContext: + return SignalContext( + signal=_signal(), + market_question="Will the Fed raise rates in June 2026?", + resolution_criteria="YES resolves if target range rises.", + resolution_source="Federal Reserve press release", + closes_at=datetime(2026, 6, 15, 18, 0, tzinfo=UTC), + related_markets=[ + RelatedMarketState( + market_id="kalshi_fed_holds", + question="Will the Fed hold rates in June 2026?", + current_price=0.44123, + delta_24h=-0.0235432, + volume_24h=85_000.0, + relationship_type="inverse", + relationship_strength=0.9, + ) + ], + investigation_prompts=["Check FOMC calendar.", "Check governor 
speeches."], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +def test_byte_identical_across_1000_calls() -> None: + ctx = _context() + outputs = [to_canonical_json(ctx) for _ in range(1000)] + assert all(o == outputs[0] for o in outputs) + + +@pytest.mark.unit +def test_floats_rounded_to_six_decimals() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + # 0.9123456789 must round to 0.912346 at 6 decimals. + provenance_payload = payload["signal"]["raw_features"] + assert provenance_payload["posterior_p_change"] == 0.912346 + + +@pytest.mark.unit +def test_custom_decimals_parameter_rounds_accordingly() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx, float_decimals=2)) + # 0.7219876 -> 0.72 at two decimals. + assert payload["signal"]["confidence"] == 0.72 + + +@pytest.mark.unit +def test_timestamps_use_z_suffix() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert payload["signal"]["detected_at"].endswith("Z") + assert "+00:00" not in payload["signal"]["detected_at"] + assert payload["closes_at"].endswith("Z") + + +@pytest.mark.unit +def test_top_level_key_order_matches_canonical_tuple() -> None: + ctx = _context() + # json.loads preserves insertion order of keys; the outer dict's + # key sequence is the canonical key ordering. 
+ payload = json.loads(to_canonical_json(ctx)) + assert list(payload.keys()) == list(CANONICAL_KEY_ORDER) + + +@pytest.mark.unit +def test_signal_key_order_matches_signal_tuple() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert list(payload["signal"].keys()) == list(SIGNAL_KEY_ORDER) + + +@pytest.mark.unit +def test_related_market_fields_rounded() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + rm = payload["related_markets"][0] + # 0.44123 stays; -0.0235432 rounds to -0.023543 + assert rm["delta_24h"] == -0.023543 + + +@pytest.mark.unit +def test_manipulation_flags_preserved_as_enum_values() -> None: + ctx = _context() + payload = json.loads(to_canonical_json(ctx)) + assert payload["signal"]["manipulation_flags"] == ["size_vs_depth_outlier"] From 1c3eef6fc7a02264366fc58bffa779c3fc325d11 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 13:56:27 +0530 Subject: [PATCH 03/11] feat(format): deterministic severity derivation from magnitude, confidence, tier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit derive_severity maps a MarketSignal to one of {high, medium, low} purely as a function of magnitude * confidence scored against per-liquidity-tier thresholds. The formula lives in code, not configuration, so every consumer can reproduce the mapping locally without calling the producer. Changing the thresholds requires a schema-version bump on the IntelligenceBrief contract because downstream routing depends on stable severity output. Threshold semantics: high tier: > 0.6 high, > 0.3 medium, else low mid tier: > 0.7 medium, else low low tier: always low — the sample size on low-tier reliability curves is too thin to justify a higher confidence label in a human channel. Eight tests cover each tier, the two boundary thresholds (0.6 for high-tier, 0.7 for mid-tier, both strict inequality), the always-low behavior on low-tier, and the purity contract. 
--- .../augur_format/deterministic/severity.py | 50 ++++++++++++ tests/format/test_severity.py | 76 +++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 src/augur_format/augur_format/deterministic/severity.py create mode 100644 tests/format/test_severity.py diff --git a/src/augur_format/augur_format/deterministic/severity.py b/src/augur_format/augur_format/deterministic/severity.py new file mode 100644 index 0000000..b47364a --- /dev/null +++ b/src/augur_format/augur_format/deterministic/severity.py @@ -0,0 +1,50 @@ +"""Deterministic severity derivation. + +Severity is ``magnitude * confidence`` scored against per-tier +thresholds. The formula is pure code (not configuration) so every +consumer can reproduce the mapping locally without a network round +trip. Changing the thresholds requires a schema-version bump on the +IntelligenceBrief contract since downstream routing depends on stable +severity output. + +Threshold table +--------------- + +================ ====== ======= ====== +liquidity_tier high medium low +================ ====== ======= ====== +high > 0.6 > 0.3 ≤ 0.3 +mid — > 0.7 ≤ 0.7 +low — — always +================ ====== ======= ====== +""" + +from __future__ import annotations + +from typing import Literal + +from augur_signals.models import MarketSignal + +Severity = Literal["high", "medium", "low"] + + +def derive_severity(signal: MarketSignal) -> Severity: + """Return the deterministic severity label for *signal*. + + The score is ``magnitude * confidence`` (both in [0, 1]); the + threshold applied depends on the liquidity tier. Low-tier markets + always emit "low" severity — the sample size on low-tier reliability + curves is too thin to justify higher confidence in a human channel. 
+ """ + score = signal.magnitude * signal.confidence + if signal.liquidity_tier == "high": + if score > 0.6: + return "high" + if score > 0.3: + return "medium" + return "low" + if signal.liquidity_tier == "mid": + if score > 0.7: + return "medium" + return "low" + return "low" diff --git a/tests/format/test_severity.py b/tests/format/test_severity.py new file mode 100644 index 0000000..40d683e --- /dev/null +++ b/tests/format/test_severity.py @@ -0,0 +1,76 @@ +"""Tests for the severity derivation function.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest + +from augur_format.deterministic.severity import derive_severity +from augur_signals.models import MarketSignal, SignalType, new_signal_id + + +def _signal( + magnitude: float = 0.5, + confidence: float = 0.5, + liquidity_tier: str = "high", +) -> MarketSignal: + return MarketSignal( + signal_id=new_signal_id(), + market_id="m", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=magnitude, + direction=1, + confidence=confidence, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier=liquidity_tier, # type: ignore[arg-type] + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + + +@pytest.mark.unit +def test_high_tier_above_06_is_high_severity() -> None: + assert derive_severity(_signal(magnitude=0.9, confidence=0.8)) == "high" + + +@pytest.mark.unit +def test_high_tier_between_03_and_06_is_medium() -> None: + assert derive_severity(_signal(magnitude=0.6, confidence=0.6)) == "medium" + + +@pytest.mark.unit +def test_high_tier_at_or_below_03_is_low() -> None: + # 0.5 * 0.6 = 0.3 → not > 0.3 → low. 
+ assert derive_severity(_signal(magnitude=0.5, confidence=0.6)) == "low" + + +@pytest.mark.unit +def test_mid_tier_above_07_is_medium() -> None: + assert derive_severity(_signal(magnitude=0.9, confidence=0.9, liquidity_tier="mid")) == "medium" + + +@pytest.mark.unit +def test_mid_tier_at_or_below_07_is_low() -> None: + assert derive_severity(_signal(magnitude=0.7, confidence=0.7, liquidity_tier="mid")) == "low" + + +@pytest.mark.unit +def test_low_tier_always_low_regardless_of_score() -> None: + assert derive_severity(_signal(magnitude=1.0, confidence=1.0, liquidity_tier="low")) == "low" + + +@pytest.mark.unit +def test_high_tier_boundary_at_06_is_medium_not_high() -> None: + # 0.6 * 1.0 = 0.6 → not > 0.6 → medium. + assert derive_severity(_signal(magnitude=0.6, confidence=1.0)) == "medium" + + +@pytest.mark.unit +def test_derive_is_pure() -> None: + sig = _signal(magnitude=0.9, confidence=0.9) + first = derive_severity(sig) + second = derive_severity(sig) + assert first == second From 20485299780bb77f4ba55a975282014e3ef7aa74 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 13:57:36 +0530 Subject: [PATCH 04/11] feat(format): closed-enum and schema validators at the formatter boundary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConsumerEnumValidator.validate_brief is the gate every formatter passes brief payloads through before emission. A brief whose actionable_for list contains any value outside the ConsumerType registry in docs/contracts/consumer-registry.md is rejected — loud, not coerced. The validator also catches non-string members and actionable_for fields that are not lists, both of which would silently pass an enum check against stringified values. validate_consumer_types is the lower-level primitive the LLM formatter gate in the secondary layer will consume; it returns the offending subset in the caller's input order so error messages can reference the original list positions. 
schema_check.load_schema reads exported JSON schemas from schemas/ for debug-build validation. A missing schema raises SchemaNotFoundError rather than returning an empty dict so absent exports surface immediately rather than masquerading as permissive validation. The function is designed for debug and integration pathways; production formatters skip JSON schema validation for throughput. Nine tests cover the happy path, single-offender flagging, non-string members, non-list actionable_for, missing field as empty, and both paths through load_schema. --- .../augur_format/validate/__init__.py | 3 + .../augur_format/validate/enum_check.py | 79 ++++++++++++++++++ .../augur_format/validate/schema_check.py | 39 +++++++++ tests/format/test_enum_validator.py | 81 +++++++++++++++++++ 4 files changed, 202 insertions(+) create mode 100644 src/augur_format/augur_format/validate/__init__.py create mode 100644 src/augur_format/augur_format/validate/enum_check.py create mode 100644 src/augur_format/augur_format/validate/schema_check.py create mode 100644 tests/format/test_enum_validator.py diff --git a/src/augur_format/augur_format/validate/__init__.py b/src/augur_format/augur_format/validate/__init__.py new file mode 100644 index 0000000..8a82381 --- /dev/null +++ b/src/augur_format/augur_format/validate/__init__.py @@ -0,0 +1,3 @@ +"""Formatter-boundary validators (closed enum, schema).""" + +from __future__ import annotations diff --git a/src/augur_format/augur_format/validate/enum_check.py b/src/augur_format/augur_format/validate/enum_check.py new file mode 100644 index 0000000..a6dc0aa --- /dev/null +++ b/src/augur_format/augur_format/validate/enum_check.py @@ -0,0 +1,79 @@ +"""Closed-enum validators for the formatter boundary. + +Briefs emitted by any formatter (deterministic today, LLM in the +gated secondary layer) carry an ``actionable_for`` list that must +contain only values from the ConsumerType registry in +docs/contracts/consumer-registry.md. 
def validate_consumer_types(values: Sequence[str]) -> list[str]:
    """Collect the entries of *values* that are not ConsumerType members.

    Each entry is compared against the ``.value`` of every ConsumerType
    member. The result keeps the caller's input order (duplicates
    included), so an empty list means every entry is registered.
    """
    registered = frozenset(member.value for member in ConsumerType)
    unknown: list[str] = []
    for candidate in values:
        if candidate in registered:
            continue
        unknown.append(candidate)
    return unknown
class SchemaNotFoundError(RuntimeError):
    """Raised when the requested schema is absent from schemas/."""


def load_schema(
    model_name: str,
    version: str,
    root: Path | None = None,
) -> dict[str, object]:
    """Load ``schemas/{model_name}-{version}.json``.

    Args:
        model_name: Name of the exported model, e.g. ``"MarketSignal"``.
        version: Schema version string, e.g. ``"1.0.0"``.
        root: Directory to read from. ``None`` falls back to the
            module-level ``DEFAULT_SCHEMAS_DIR``.

    Returns:
        The parsed JSON document.

    Raises:
        SchemaNotFoundError: If the target file does not exist. Missing
            schemas raise rather than returning a permissive empty dict;
            a missing schema indicates the export step did not run or
            the wrong version was requested, both of which would mask
            contract drift at the formatter boundary. The underlying
            ``OSError`` is attached as ``__cause__``.
    """
    schemas_dir = DEFAULT_SCHEMAS_DIR if root is None else root
    target = schemas_dir / f"{model_name}-{version}.json"
    # Open directly instead of exists()-then-open(): a file removed between
    # the two calls would otherwise escape as a raw FileNotFoundError.
    # NotADirectoryError is included because Path.exists() also reports
    # False when a parent path component is a regular file.
    try:
        handle = target.open(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as err:
        raise SchemaNotFoundError(f"schema not found: {target}") from err
    with handle:
        data: dict[str, object] = json.load(handle)
    return data
["dashboard", 42]}) + assert not result.valid + + +@pytest.mark.unit +def test_validator_rejects_actionable_for_not_a_list() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief({"actionable_for": "dashboard"}) + assert not result.valid + + +@pytest.mark.unit +def test_validator_missing_field_treated_as_empty_list() -> None: + validator = ConsumerEnumValidator() + result = validator.validate_brief({}) + assert result.valid + + +@pytest.mark.unit +def test_load_schema_raises_on_missing(tmp_path: Path) -> None: + with pytest.raises(SchemaNotFoundError): + load_schema("DoesNotExist", "1.0.0", root=tmp_path) + + +@pytest.mark.unit +def test_load_schema_reads_known_schema() -> None: + schema = load_schema("MarketSignal", "1.0.0") + assert schema["title"] == "MarketSignal" From 040ff2941b50b752e813e3b6eac4143a4ddf5347 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:00:41 +0530 Subject: [PATCH 05/11] feat(format): jinja2 markdown templates with per-signal-type detector detail The shared _base.md.j2 template renders the summary block (severity, confidence, liquidity tier, detected_at), signal summary, resolution criteria, related markets, investigation prompts, manipulation flags (conditional on non-empty), and a provenance footer with interpretation_mode, schema_version, and signal_id. Five per-signal- type templates extend the base and inject detector-specific raw_feature fields through the extra_summary block. MarkdownFormatter wraps Jinja2's Environment with trim_blocks, lstrip_blocks, and keep_trailing_newline set so output stays deterministic across render calls. The template directory is discovered via __file__ so the renderer works both in editable installs and in built wheels. The hatch build config explicitly includes **/*.j2 so templates ship inside the wheel alongside the .py modules. 
Eight tests cover: every signal type renders to the five distinct templates, required fields appear in the output, the manipulation flag block is conditional on non-empty flags, related markets render as bullets (with fallback text when empty), investigation prompts render as bullets (with fallback text when the list is empty), and the formatter is deterministic across 100 invocations. --- .../augur_format/deterministic/markdown.py | 55 +++++++ .../deterministic/templates/_base.md.j2 | 53 ++++++ .../templates/book_imbalance.md.j2 | 10 ++ .../templates/cross_market_divergence.md.j2 | 12 ++ .../templates/price_velocity.md.j2 | 9 + .../templates/regime_shift.md.j2 | 11 ++ .../templates/volume_spike.md.j2 | 11 ++ src/augur_format/pyproject.toml | 1 + tests/format/test_markdown_templates.py | 154 ++++++++++++++++++ 9 files changed, 316 insertions(+) create mode 100644 src/augur_format/augur_format/deterministic/markdown.py create mode 100644 src/augur_format/augur_format/deterministic/templates/_base.md.j2 create mode 100644 src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 create mode 100644 src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 create mode 100644 src/augur_format/augur_format/deterministic/templates/price_velocity.md.j2 create mode 100644 src/augur_format/augur_format/deterministic/templates/regime_shift.md.j2 create mode 100644 src/augur_format/augur_format/deterministic/templates/volume_spike.md.j2 create mode 100644 tests/format/test_markdown_templates.py diff --git a/src/augur_format/augur_format/deterministic/markdown.py b/src/augur_format/augur_format/deterministic/markdown.py new file mode 100644 index 0000000..f11dabc --- /dev/null +++ b/src/augur_format/augur_format/deterministic/markdown.py @@ -0,0 +1,55 @@ +"""Jinja2 Markdown renderer. + +Templates live alongside this module at ``templates/``; one per +signal type plus a shared ``_base.md.j2``. 
class MarkdownFormatter:
    """Jinja2-backed Markdown renderer for SignalContext values."""

    def __init__(self, template_dir: Path | None = None) -> None:
        # Fall back to the ``templates/`` directory next to this module
        # when the caller does not supply a directory.
        search_path = str(template_dir or _DEFAULT_TEMPLATE_DIR)
        self._env = Environment(
            loader=FileSystemLoader(search_path),
            autoescape=select_autoescape(["html"]),
            trim_blocks=True,
            lstrip_blocks=True,
            keep_trailing_newline=True,
        )

    def format(self, context: SignalContext, severity: str) -> str:
        """Render *context* with the template named after its signal type.

        The template file is ``<signal_type value>.md.j2``. Lookup raises
        jinja2.TemplateNotFound when no such file exists; one template
        is expected per SignalType value, so a missing template points
        at drift between the enum and the template directory.
        """
        signal = context.signal
        template = self._env.get_template(f"{signal.signal_type.value}.md.j2")
        # Variables exposed to the template, keyed by the names it uses.
        variables = {
            "signal": signal,
            "market_question": context.market_question,
            "resolution_criteria": context.resolution_criteria,
            "resolution_source": context.resolution_source,
            "closes_at": context.closes_at,
            "related_markets": context.related_markets,
            "investigation_prompts": context.investigation_prompts,
            "interpretation_mode": context.interpretation_mode.value,
            "schema_version": context.schema_version,
            "severity": severity,
        }
        return template.render(**variables)
+{% endif %} + +## Investigation Prompts + +{% if investigation_prompts %} +{% for p in investigation_prompts %} +- {{ p }} +{% endfor %} +{% else %} +No investigation prompts configured for this (signal_type, category) tuple. +{% endif %} +{% if signal.manipulation_flags %} + +## Manipulation Flags + +The following manipulation signatures matched this signal. Consumer suppression policy applies; see `docs/methodology/manipulation-taxonomy.md`. + +{% for f in signal.manipulation_flags %} +- `{{ f }}` +{% endfor %} +{% endif %} + +--- + +*Augur - interpretation_mode: {{ interpretation_mode }} - schema {{ schema_version }} - signal_id {{ signal.signal_id }}* diff --git a/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 b/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 new file mode 100644 index 0000000..a84345c --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/book_imbalance.md.j2 @@ -0,0 +1,10 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Bid/ask depth ratio: {{ "%.3f" | format(signal.raw_features.get("bid_ask_ratio", 0.5)) }} +- Total liquidity (USD): {{ signal.raw_features.get("liquidity", 0) | int }} +- Calibration provenance: {{ signal.raw_features.calibration_provenance }} + +{% endblock %} diff --git a/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 b/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 new file mode 100644 index 0000000..d120a77 --- /dev/null +++ b/src/augur_format/augur_format/deterministic/templates/cross_market_divergence.md.j2 @@ -0,0 +1,12 @@ +{% extends "_base.md.j2" %} + +{% block extra_summary %} +## Detector-Specific Detail + +- Spearman rho (current window): {{ "%.3f" | format(signal.raw_features.get("spearman_rho", 0.0)) }} +- Historical Fisher-z: {{ "%.3f" | format(signal.raw_features.get("historical_z", 0.0)) }} +- BH-FDR p-value: {{ "%.4f" 
{% extends "_base.md.j2" %}
{# Detector detail for price_velocity signals; fills the base template's extra_summary block. #}

{% block extra_summary %}
## Detector-Specific Detail

- Posterior P(changepoint within last 5 obs): {{ "%.3f" | format(signal.raw_features.get("posterior_p_change", 0.0)) }}
- Calibration provenance: {{ signal.raw_features.calibration_provenance }}

{% endblock %}
def _context(
    signal_type: SignalType = SignalType.PRICE_VELOCITY,
    raw_features: dict[str, float | str] | None = None,
    manipulation_flags: list[ManipulationFlag] | None = None,
    related: list[RelatedMarketState] | None = None,
    prompts: list[str] | None = None,
) -> SignalContext:
    """Build a SignalContext fixture for the Markdown formatter tests.

    The default ``raw_features`` carry the keys read by all five
    per-signal-type templates, so one fixture renders for any signal
    type. Entries in *raw_features* override or extend those defaults.
    Passing ``prompts=None`` selects the module-level ``_UNSET_PROMPTS``;
    an explicit empty list is passed through unchanged.
    """
    features: dict[str, float | str] = {
        "calibration_provenance": f"{signal_type.value}_detector@identity_v0",
        "posterior_p_change": 0.82,
        "z_score": 2.3,
        "spearman_rho": -0.45,
        "positive_cusum": 3.1,
        "negative_cusum": -0.2,
        "threshold": 2.5,
        "bid_ask_ratio": 0.75,
        "liquidity": 12000.0,
        "ewma_mean": 1.2,
        "volume_ratio_1h": 3.5,
        "historical_z": 1.8,
        "p_value": 0.01,
        "related_market_id": "kalshi_fed_holds",
    }
    # Caller overrides win; None or an empty dict leaves the defaults as-is.
    features.update(raw_features or {})

    market_signal = MarketSignal(
        signal_id=new_signal_id(),
        market_id="kalshi_fed",
        platform="kalshi",
        signal_type=signal_type,
        magnitude=0.8,
        direction=1,
        confidence=0.72,
        fdr_adjusted=True,
        detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC),
        window_seconds=300,
        liquidity_tier="high",
        manipulation_flags=manipulation_flags or [],
        raw_features=features,
    )

    # Only None selects the default prompts, so the empty-list fallback
    # text in the template stays testable.
    chosen_prompts = prompts if prompts is not None else _UNSET_PROMPTS
    return SignalContext(
        signal=market_signal,
        market_question="Will the Fed raise rates?",
        resolution_criteria="YES if rate rises.",
        resolution_source="Federal Reserve press release",
        closes_at=datetime(2026, 6, 15, tzinfo=UTC),
        related_markets=related or [],
        investigation_prompts=chosen_prompts,
        interpretation_mode=InterpretationMode.DETERMINISTIC,
    )
in md + + +@pytest.mark.unit +def test_manipulation_flag_block_appears_when_flags_present( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format( + _context(manipulation_flags=[ManipulationFlag.SIZE_VS_DEPTH_OUTLIER]), + severity="medium", + ) + assert "Manipulation Flags" in md + assert "size_vs_depth_outlier" in md + + +@pytest.mark.unit +def test_manipulation_flag_block_absent_when_empty( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format(_context(), severity="medium") + assert "Manipulation Flags" not in md + + +@pytest.mark.unit +def test_related_markets_render_as_bullets(formatter: MarkdownFormatter) -> None: + related = [ + RelatedMarketState( + market_id="kalshi_fed_holds", + question="Will the Fed hold?", + current_price=0.42, + delta_24h=-0.03, + volume_24h=80_000.0, + relationship_type="inverse", + relationship_strength=0.9, + ) + ] + md = formatter.format(_context(related=related), severity="medium") + assert "**kalshi_fed_holds** (inverse," in md + assert "No related markets" not in md + + +@pytest.mark.unit +def test_fallback_text_when_no_related_markets(formatter: MarkdownFormatter) -> None: + md = formatter.format(_context(), severity="medium") + assert "No related markets in the curated taxonomy." in md + + +@pytest.mark.unit +def test_fallback_text_when_no_investigation_prompts( + formatter: MarkdownFormatter, +) -> None: + md = formatter.format(_context(prompts=[]), severity="medium") + assert "No investigation prompts configured for this (signal_type, category) tuple." 
in md + + +@pytest.mark.unit +def test_markdown_deterministic_across_calls(formatter: MarkdownFormatter) -> None: + ctx = _context() + outputs = {formatter.format(ctx, severity="medium") for _ in range(100)} + assert len(outputs) == 1 From 9e14201e2b8bc19cf4415867ccfe0e24c91c45bd Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:01:36 +0530 Subject: [PATCH 06/11] feat(format): intelligencebrief contract and schema export MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IntelligenceBrief is the payload the gated LLM formatter emits in the secondary layer. The contract is declared here in the formatter package because it is the formatter's output, even though the deterministic pathway this phase does not produce briefs. actionable_for is typed as list[ConsumerType] so Pydantic validates the closed-enum membership at construction; the boundary validator rechecks dynamically-constructed instances. forbidden_token_check defaults to "passed" — the gated formatter must run the linter and construct the brief only after it succeeds, so the Literal documents the invariant without making the field configurable at construction. interpretation_mode is locked to "llm_assisted" for the same reason; every instance of this model represents LLM-generated output. scripts/export_schemas.py registers IntelligenceBrief-1.0.0 alongside the four Phase-1 schemas. schemas/IntelligenceBrief-1.0.0.json ships in the repo so consumers can validate against the contract before the secondary formatter lands. 
--- schemas/IntelligenceBrief-1.0.0.json | 90 +++++++++++++++++++++ scripts/export_schemas.py | 2 + src/augur_format/augur_format/llm/models.py | 41 ++++++++++ 3 files changed, 133 insertions(+) create mode 100644 schemas/IntelligenceBrief-1.0.0.json create mode 100644 src/augur_format/augur_format/llm/models.py diff --git a/schemas/IntelligenceBrief-1.0.0.json b/schemas/IntelligenceBrief-1.0.0.json new file mode 100644 index 0000000..65637ea --- /dev/null +++ b/schemas/IntelligenceBrief-1.0.0.json @@ -0,0 +1,90 @@ +{ + "$defs": { + "ConsumerType": { + "description": "Registered consumers of the brief feed per docs/contracts/consumer-registry.md.", + "enum": [ + "macro_research_agent", + "geopolitical_research_agent", + "crypto_research_agent", + "financial_news_desk", + "regulatory_news_desk", + "dashboard" + ], + "title": "ConsumerType", + "type": "string" + } + }, + "additionalProperties": false, + "description": "Gated LLM formatter output contract.\n\n``actionable_for`` is constrained to the ConsumerType registry in\ndocs/contracts/consumer-registry.md via the Pydantic field type;\nthe closed-enum validator rechecks this at the formatter boundary\nso even dynamically-constructed instances fail loud on unknown\nvalues.", + "properties": { + "actionable_for": { + "items": { + "$ref": "#/$defs/ConsumerType" + }, + "title": "Actionable For", + "type": "array" + }, + "body_markdown": { + "title": "Body Markdown", + "type": "string" + }, + "brief_id": { + "title": "Brief Id", + "type": "string" + }, + "forbidden_token_check": { + "const": "passed", + "default": "passed", + "title": "Forbidden Token Check", + "type": "string" + }, + "headline": { + "title": "Headline", + "type": "string" + }, + "interpretation_mode": { + "const": "llm_assisted", + "default": "llm_assisted", + "title": "Interpretation Mode", + "type": "string" + }, + "model": { + "title": "Model", + "type": "string" + }, + "prompt_hash": { + "title": "Prompt Hash", + "type": "string" + }, + 
"schema_version": { + "const": "1.0.0", + "default": "1.0.0", + "title": "Schema Version", + "type": "string" + }, + "severity": { + "enum": [ + "high", + "medium", + "low" + ], + "title": "Severity", + "type": "string" + }, + "signal_id": { + "title": "Signal Id", + "type": "string" + } + }, + "required": [ + "brief_id", + "signal_id", + "headline", + "body_markdown", + "severity", + "model", + "prompt_hash" + ], + "title": "IntelligenceBrief", + "type": "object" +} diff --git a/scripts/export_schemas.py b/scripts/export_schemas.py index ba3f8aa..c6cb45d 100644 --- a/scripts/export_schemas.py +++ b/scripts/export_schemas.py @@ -25,6 +25,7 @@ from pydantic import BaseModel +from augur_format.llm.models import IntelligenceBrief from augur_signals.models import ( FeatureVector, MarketSignal, @@ -41,6 +42,7 @@ (FeatureVector, "1.0.0"), (MarketSignal, "1.0.0"), (SignalContext, "1.0.0"), + (IntelligenceBrief, "1.0.0"), ] diff --git a/src/augur_format/augur_format/llm/models.py b/src/augur_format/augur_format/llm/models.py new file mode 100644 index 0000000..1d7d276 --- /dev/null +++ b/src/augur_format/augur_format/llm/models.py @@ -0,0 +1,41 @@ +"""IntelligenceBrief — the contract emitted by the gated LLM formatter. + +The schema lives in the formatter package because it is the +formatter's output contract, even though the deterministic pathway +in this phase does not produce briefs. The secondary LLM formatter +in the next phase instantiates IntelligenceBrief values that pass +the forbidden-token linter and the ConsumerType enum gate. +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + +from augur_signals.models import ConsumerType + + +class IntelligenceBrief(BaseModel): + """Gated LLM formatter output contract. 
+ + ``actionable_for`` is constrained to the ConsumerType registry in + docs/contracts/consumer-registry.md via the Pydantic field type; + the closed-enum validator rechecks this at the formatter boundary + so even dynamically-constructed instances fail loud on unknown + values. + """ + + model_config = ConfigDict(frozen=True, extra="forbid") + + brief_id: str + signal_id: str + headline: str + body_markdown: str + severity: Literal["high", "medium", "low"] + actionable_for: list[ConsumerType] = Field(default_factory=list) + interpretation_mode: Literal["llm_assisted"] = "llm_assisted" + model: str + prompt_hash: str + forbidden_token_check: Literal["passed"] = "passed" # noqa: S105 + schema_version: Literal["1.0.0"] = "1.0.0" From c7592143f62a9869431b56edd9c56e5df60a1709 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:03:28 +0530 Subject: [PATCH 07/11] feat(format): webhook adapter with slack block kit, markdown, and json formats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WebhookFormatter POSTs SignalContext payloads to configured destinations. Three content formats: canonical JSON bytes from the to_canonical_json primitive, Markdown wrapped in {"text": ...} for simple webhook consumers, and Slack Block Kit JSON for direct Slack integrations. The Slack block layout renders a header (signal_type | severity | confidence), a section with the market question and resolution criteria, optional sections for related markets and investigation prompts, and context blocks for manipulation flags and the provenance footer. The formatter is fully deterministic from a SignalContext; no LLM. deliver_with_backoff implements the retry schedule from phase-3 §6.4 (1 s initial, 60 s cap, 5 attempts). The adapter retries on 5xx and 429, drops on any other 4xx (treated as configuration errors that retries cannot recover). 
Each DeliveryResult records status_code, attempt count, and reason so the engine can track augur_webhook_delivery_failures by target_id. Auth headers are read from the target's auth_header_env env var at delivery time so credentials never land in config files. The pattern matches every other adapter in the codebase. Six tests cover the retry-exhaustion path, 2xx happy path, 5xx exhaustion via MockTransport, 4xx drop-without-retry, Slack block format shape, and env-var-sourced auth header application. --- .../augur_format/transport/__init__.py | 3 + .../augur_format/transport/retry.py | 52 +++++ .../augur_format/transport/webhook.py | 185 +++++++++++++++++ tests/format/test_webhook.py | 189 ++++++++++++++++++ 4 files changed, 429 insertions(+) create mode 100644 src/augur_format/augur_format/transport/__init__.py create mode 100644 src/augur_format/augur_format/transport/retry.py create mode 100644 src/augur_format/augur_format/transport/webhook.py create mode 100644 tests/format/test_webhook.py diff --git a/src/augur_format/augur_format/transport/__init__.py b/src/augur_format/augur_format/transport/__init__.py new file mode 100644 index 0000000..695489d --- /dev/null +++ b/src/augur_format/augur_format/transport/__init__.py @@ -0,0 +1,3 @@ +"""Webhook and WebSocket transport adapters.""" + +from __future__ import annotations diff --git a/src/augur_format/augur_format/transport/retry.py b/src/augur_format/augur_format/transport/retry.py new file mode 100644 index 0000000..f0cce08 --- /dev/null +++ b/src/augur_format/augur_format/transport/retry.py @@ -0,0 +1,52 @@ +"""Exponential backoff for webhook delivery. + +Parameters match phase-3 §6.4: 1 s initial, 60 s cap, 5 attempts. +The helper takes an awaitable factory (fresh awaitable per attempt) +and an injectable sleep so tests can avoid real-time delays. 
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class DeliveryBackoff: + """Backoff schedule for webhook delivery.""" + + initial_seconds: float = 1.0 + max_seconds: float = 60.0 + max_retries: int = 5 + + +class DeliveryRetryExhaustedError(RuntimeError): + """Raised when every delivery attempt fails.""" + + def __init__(self, attempts: int, last_error: BaseException) -> None: + super().__init__(f"webhook retry exhausted after {attempts} attempts: {last_error!r}") + self.attempts = attempts + self.last_error = last_error + + +async def deliver_with_backoff[T]( + factory: Callable[[], Awaitable[T]], + policy: DeliveryBackoff, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, +) -> T: + """Invoke *factory* with exponential backoff.""" + delay = policy.initial_seconds + last_error: BaseException | None = None + for attempt in range(1, policy.max_retries + 1): + try: + return await factory() + except Exception as err: + last_error = err + if attempt == policy.max_retries: + break + await sleep(delay) + delay = min(delay * 2.0, policy.max_seconds) + if last_error is None: # pragma: no cover + raise RuntimeError("delivery retry loop exited without capturing an error") + raise DeliveryRetryExhaustedError(attempts=policy.max_retries, last_error=last_error) diff --git a/src/augur_format/augur_format/transport/webhook.py b/src/augur_format/augur_format/transport/webhook.py new file mode 100644 index 0000000..8e910be --- /dev/null +++ b/src/augur_format/augur_format/transport/webhook.py @@ -0,0 +1,185 @@ +"""Webhook adapter with JSON, Markdown, and Slack Block Kit formats. + +Each WebhookTarget declares the URL, format, authorized consumer +types, and optional auth-header env var. 
The adapter POSTs the +formatted payload to the URL with exponential-backoff retry on 5xx / +429 / connection errors and drop on 4xx (treated as a configuration +error). Failed deliveries are reported through DeliveryResult (target_id, +status_code, reason) so the caller can log and correlate them. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from typing import Any, Literal + +import httpx +from pydantic import BaseModel, ConfigDict, Field, HttpUrl + +from augur_format._config import WebhookConfig +from augur_format.deterministic.json_feed import to_canonical_json +from augur_format.deterministic.markdown import MarkdownFormatter +from augur_format.deterministic.severity import derive_severity +from augur_format.transport.retry import DeliveryBackoff, deliver_with_backoff +from augur_signals.models import ConsumerType, SignalContext + +WebhookFormat = Literal["json", "markdown", "slack_blocks"] + + +class WebhookTarget(BaseModel): + """One configured webhook destination.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + target_id: str + url: HttpUrl + format: WebhookFormat + consumer_types: list[ConsumerType] = Field(default_factory=list) + auth_header_env: str | None = None + accepts_llm_assisted: bool = False + + +@dataclass(frozen=True, slots=True) +class DeliveryResult: + """Outcome of one webhook POST.""" + + target_id: str + status_code: int | None + attempts: int + delivered: bool + reason: str + + +class WebhookFormatter: + """POST payloads to configured webhook targets with retry.""" + + def __init__( + self, + client: httpx.AsyncClient, + config: WebhookConfig | None = None, + markdown: MarkdownFormatter | None = None, + ) -> None: + self._client = client + self._config = config or WebhookConfig() + self._markdown = markdown or MarkdownFormatter() + + def _backoff(self) -> DeliveryBackoff: + return DeliveryBackoff( + initial_seconds=self._config.initial_retry_delay_seconds,
max_seconds=self._config.max_retry_delay_seconds, + max_retries=self._config.max_retries, + ) + + def _render_body(self, context: SignalContext, target: WebhookTarget) -> bytes: + if target.format == "json": + return to_canonical_json(context) + severity = derive_severity(context.signal) + if target.format == "markdown": + rendered = self._markdown.format(context, severity=severity) + return json.dumps({"text": rendered}).encode("utf-8") + # slack_blocks + return json.dumps(self._slack_blocks(context, severity)).encode("utf-8") + + def _slack_blocks(self, context: SignalContext, severity: str) -> dict[str, Any]: + signal = context.signal + blocks: list[dict[str, Any]] = [ + { + "type": "header", + "text": { + "type": "plain_text", + "text": (f"{signal.signal_type.value} | {severity} | {signal.confidence:.2f}"), + }, + }, + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": ( + f"*Market:* {context.market_question}\n" + f"*Resolution criteria:* {context.resolution_criteria}" + ), + }, + }, + ] + if context.related_markets: + related_text = "\n".join( + f"- *{rm.market_id}* ({rm.relationship_type}): {rm.current_price:.3f}" + for rm in context.related_markets + ) + blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": related_text}}) + if context.investigation_prompts: + prompts_text = "\n".join(f"- {p}" for p in context.investigation_prompts) + blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": prompts_text}}) + if signal.manipulation_flags: + flags_text = ", ".join(f.value for f in signal.manipulation_flags) + blocks.append( + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": f"Manipulation flags: {flags_text}", + } + ], + } + ) + blocks.append( + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": ( + f"Augur · {context.interpretation_mode.value} " + f"· schema {context.schema_version} " + f"· {signal.signal_id}" + ), + } + ], + } + ) + return {"blocks": blocks} + + def 
_headers(self, target: WebhookTarget) -> dict[str, str]: + headers = {"Content-Type": "application/json"} + if target.auth_header_env: + value = os.environ.get(target.auth_header_env) + if value: + headers["Authorization"] = value + return headers + + async def deliver(self, context: SignalContext, target: WebhookTarget) -> DeliveryResult: + body = self._render_body(context, target) + headers = self._headers(target) + + async def _call() -> httpx.Response: + response = await self._client.post( + str(target.url), + content=body, + headers=headers, + timeout=self._config.delivery_timeout_seconds, + ) + if response.status_code >= 500 or response.status_code == 429: + response.raise_for_status() + return response + + try: + response = await deliver_with_backoff(_call, self._backoff()) + except Exception as err: + return DeliveryResult( + target_id=target.target_id, + status_code=None, + attempts=self._config.max_retries, + delivered=False, + reason=repr(err), + ) + delivered = 200 <= response.status_code < 400 + return DeliveryResult( + target_id=target.target_id, + status_code=response.status_code, + attempts=1, + delivered=delivered, + reason="ok" if delivered else f"http_{response.status_code}", + ) diff --git a/tests/format/test_webhook.py b/tests/format/test_webhook.py new file mode 100644 index 0000000..8d8692c --- /dev/null +++ b/tests/format/test_webhook.py @@ -0,0 +1,189 @@ +"""Tests for the webhook adapter.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime + +import httpx +import pytest + +from augur_format._config import WebhookConfig +from augur_format.transport.retry import ( + DeliveryBackoff, + DeliveryRetryExhaustedError, + deliver_with_backoff, +) +from augur_format.transport.webhook import ( + WebhookFormatter, + WebhookTarget, +) +from augur_signals.models import ( + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context() -> SignalContext: + signal = 
MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="Will the Fed raise rates?", + resolution_criteria="YES if rate rises.", + resolution_source="Federal Reserve", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=["Check FOMC calendar."], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +async def test_retry_exhaustion_raises() -> None: + async def failing() -> None: + raise ConnectionError("no route") + + async def fake_sleep(_: float) -> None: + return None + + with pytest.raises(DeliveryRetryExhaustedError) as excinfo: + await deliver_with_backoff( + failing, DeliveryBackoff(initial_seconds=0.0, max_retries=3), sleep=fake_sleep + ) + assert excinfo.value.attempts == 3 + + +@pytest.mark.unit +async def test_delivery_succeeds_on_2xx() -> None: + calls: list[dict[str, object]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + calls.append({"url": str(request.url), "body": request.content}) + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert result.delivered + assert result.status_code == 200 + assert len(calls) == 1 + + +@pytest.mark.unit +async def test_delivery_fails_on_5xx_after_retries() -> None: + count = {"n": 0} + + def handler(request: 
httpx.Request) -> httpx.Response: + count["n"] += 1 + return httpx.Response(500) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter( + client, + WebhookConfig( + initial_retry_delay_seconds=0.001, + max_retry_delay_seconds=0.001, + max_retries=3, + ), + ) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert not result.delivered + assert count["n"] == 3 + + +@pytest.mark.unit +async def test_delivery_drops_on_4xx() -> None: + count = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + count["n"] += 1 + return httpx.Response(400) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig(max_retries=3)) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + ) + result = await formatter.deliver(_context(), target) + assert not result.delivered + assert count["n"] == 1 # no retry on 4xx + assert result.status_code == 400 + + +@pytest.mark.unit +async def test_slack_blocks_format_is_valid_block_kit() -> None: + captured: list[bytes] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(request.content) + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="slack", + url="https://hooks.slack.com/services/TEST", # type: ignore[arg-type] + format="slack_blocks", + ) + result = await formatter.deliver(_context(), target) + assert result.delivered + payload = json.loads(captured[0]) + assert "blocks" in payload + assert any(b["type"] == 
"header" for b in payload["blocks"]) + # Confidence should be formatted to two decimals in header text. + header = next(b for b in payload["blocks"] if b["type"] == "header") + assert "0.70" in header["text"]["text"] + + +@pytest.mark.unit +async def test_auth_header_from_env_applied(monkeypatch: pytest.MonkeyPatch) -> None: + captured: list[dict[str, str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured.append(dict(request.headers)) + return httpx.Response(200) + + monkeypatch.setenv("AUGUR_TEST_WEBHOOK_AUTH", "Bearer secret-xyz") + transport = httpx.MockTransport(handler) + async with httpx.AsyncClient(transport=transport) as client: + formatter = WebhookFormatter(client, WebhookConfig()) + target = WebhookTarget( + target_id="t1", + url="https://hooks.example.com/augur", # type: ignore[arg-type] + format="json", + auth_header_env="AUGUR_TEST_WEBHOOK_AUTH", + ) + await formatter.deliver(_context(), target) + assert captured[0].get("authorization") == "Bearer secret-xyz" From 1f8f7b8128298c16a41b592756bfdd7ee992ada9 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:04:59 +0530 Subject: [PATCH 08/11] feat(format): websocket transport with structured frames and heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WebSocketBroadcaster fans SignalContext frames out to subscribed clients through bounded per-connection queues. Four frame types mirror phase-3 §7: SIGNAL carries the canonical SignalContext JSON as its payload; HEARTBEAT fires at the configured interval; STORM_START and STORM_END map onto the dedup layer's storm transitions so clients can switch rendering behavior without polling the server. The broadcaster is an in-process primitive that adapts to a real websockets server easily: the deployment layer wraps each accepted connection in a ClientSubscription, forwards subscription.queue.get() to the wire, and drops the subscription on client disconnect. 
The in-process layer keeps the algorithmic surface testable without spinning up real TCP sockets. Under pressure the broadcaster drops the oldest queued frame to preserve timeliness — matching the dedup doc's rationale that the latest signal is usually the most informative. The dropped counter on each subscription lets operators surface slow-client incidents. HeartbeatScheduler is a small helper that answers "should the broadcaster emit a heartbeat now?" from a caller-supplied ``now``; the engine owns the scheduling loop, which keeps every timing decision deterministic and testable. Nine tests cover: signal-frame payload roundtrip through canonical JSON, heartbeat/no-payload shape, storm frame type mapping, Z-suffix timestamps, fan-out to subscribers, consumer-type filter, oldest-drop under full queues, heartbeat scheduler boundary, and buffer-size validation. --- .../augur_format/transport/websocket.py | 165 ++++++++++++++++++ tests/format/test_websocket.py | 135 ++++++++++++++ 2 files changed, 300 insertions(+) create mode 100644 src/augur_format/augur_format/transport/websocket.py create mode 100644 tests/format/test_websocket.py diff --git a/src/augur_format/augur_format/transport/websocket.py b/src/augur_format/augur_format/transport/websocket.py new file mode 100644 index 0000000..b408adf --- /dev/null +++ b/src/augur_format/augur_format/transport/websocket.py @@ -0,0 +1,165 @@ +"""WebSocket transport with structured frames. + +Frame types mirror phase-3 §7: SIGNAL payloads carry the canonical +SignalContext JSON; HEARTBEAT frames arrive at the configured +interval; STORM_START and STORM_END signal the dedup layer's storm +transitions. Broadcast is fan-out across connected clients with a +per-connection bounded queue; a slow client's oldest frames are dropped rather than +stalling the broadcast loop.
+""" + +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncIterator, Callable +from dataclasses import dataclass, field +from datetime import datetime +from enum import StrEnum +from typing import Any +from uuid import uuid4 + +from augur_format.deterministic.json_feed import to_canonical_json +from augur_signals.models import SignalContext + + +class FrameType(StrEnum): + """Closed frame-type enum for the WebSocket protocol.""" + + SIGNAL = "signal" + HEARTBEAT = "heartbeat" + STORM_START = "storm_start" + STORM_END = "storm_end" + + +@dataclass(frozen=True, slots=True) +class WebSocketFrame: + """One message on the wire.""" + + frame_type: FrameType + frame_id: str + ts: datetime + payload: dict[str, Any] | None = None + + def to_json(self) -> bytes: + body: dict[str, Any] = { + "frame_type": self.frame_type.value, + "frame_id": self.frame_id, + "ts": self.ts.isoformat().replace("+00:00", "Z"), + } + if self.payload is not None: + body["payload"] = self.payload + return json.dumps(body, separators=(",", ":")).encode("utf-8") + + +def signal_frame(context: SignalContext, now: datetime) -> WebSocketFrame: + """Build a SIGNAL frame whose payload is the canonical SignalContext JSON.""" + return WebSocketFrame( + frame_type=FrameType.SIGNAL, + frame_id=str(uuid4()), + ts=now, + payload=json.loads(to_canonical_json(context).decode("utf-8")), + ) + + +def heartbeat_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.HEARTBEAT, frame_id=str(uuid4()), ts=now) + + +def storm_start_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.STORM_START, frame_id=str(uuid4()), ts=now) + + +def storm_end_frame(now: datetime) -> WebSocketFrame: + return WebSocketFrame(frame_type=FrameType.STORM_END, frame_id=str(uuid4()), ts=now) + + +@dataclass(slots=True) +class ClientSubscription: + """One connected client's send queue and filter.""" + + queue: 
asyncio.Queue[WebSocketFrame] + consumer_type: str | None = None + dropped: int = 0 + + +class WebSocketBroadcaster: + """In-process broadcaster; adapts to a real websockets server easily. + + The broadcaster manages per-client queues. A ``publish`` call + enqueues the frame for every subscriber whose consumer_type + matches (or whose subscription is unfiltered). Queues are bounded + by ``per_connection_buffer``; enqueue on a full queue drops the + oldest frame to preserve timeliness, matching the dedup/storm + doc's rationale for LIFO under pressure. + """ + + def __init__(self, per_connection_buffer: int = 64) -> None: + if per_connection_buffer <= 0: + raise ValueError("per_connection_buffer must be positive") + self._buffer = per_connection_buffer + self._subscriptions: list[ClientSubscription] = [] + + def subscribe(self, consumer_type: str | None = None) -> ClientSubscription: + sub = ClientSubscription( + queue=asyncio.Queue(maxsize=self._buffer), + consumer_type=consumer_type, + ) + self._subscriptions.append(sub) + return sub + + def unsubscribe(self, subscription: ClientSubscription) -> None: + if subscription in self._subscriptions: + self._subscriptions.remove(subscription) + + def subscriber_count(self) -> int: + return len(self._subscriptions) + + async def publish( + self, + frame: WebSocketFrame, + *, + consumer_type_filter: Callable[[str | None], bool] | None = None, + ) -> None: + for sub in list(self._subscriptions): + if consumer_type_filter is not None and not consumer_type_filter(sub.consumer_type): + continue + if sub.queue.full(): + # Drop the oldest to keep the newest — timeliness matters + # more than completeness under storm conditions. 
+ try: + sub.queue.get_nowait() + except asyncio.QueueEmpty: + pass + sub.dropped += 1 + await sub.queue.put(frame) + + async def stream(self, subscription: ClientSubscription) -> AsyncIterator[WebSocketFrame]: + """Yield frames queued for *subscription* until cancelled.""" + try: + while True: + yield await subscription.queue.get() + finally: + self.unsubscribe(subscription) + + +@dataclass(frozen=True, slots=True) +class HeartbeatScheduler: + """Decides when a heartbeat frame is due at the configured interval. + + Exposed as a helper rather than a long-lived task so the engine + owns the lifecycle; tests invoke ``should_emit`` and ``record`` + directly against a controllable clock. + """ + + interval_seconds: int = 30 + _last_sent: list[datetime] = field(default_factory=list) + + def should_emit(self, now: datetime) -> bool: + if not self._last_sent: + return True + elapsed = (now - self._last_sent[-1]).total_seconds() + return elapsed >= self.interval_seconds + + def record(self, now: datetime) -> None: + self._last_sent.append(now) diff --git a/tests/format/test_websocket.py b/tests/format/test_websocket.py new file mode 100644 index 0000000..36de812 --- /dev/null +++ b/tests/format/test_websocket.py @@ -0,0 +1,135 @@ +"""Tests for the WebSocket broadcaster and frame helpers.""" + +from __future__ import annotations + +import asyncio +import json +from datetime import UTC, datetime, timedelta + +import pytest + +from augur_format.transport.websocket import ( + FrameType, + HeartbeatScheduler, + WebSocketBroadcaster, + heartbeat_frame, + signal_frame, + storm_end_frame, + storm_start_frame, +) +from augur_signals.models import ( + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context() -> SignalContext: + signal = MarketSignal( + signal_id=new_signal_id(), + market_id="kalshi_fed", + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15,
12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="q", + resolution_criteria="c", + resolution_source="s", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=[], + interpretation_mode=InterpretationMode.DETERMINISTIC, + ) + + +@pytest.mark.unit +def test_signal_frame_payload_contains_canonical_signal_context() -> None: + ctx = _context() + frame = signal_frame(ctx, datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + assert frame.frame_type == FrameType.SIGNAL + assert frame.payload is not None + assert frame.payload["signal"]["signal_id"] == ctx.signal.signal_id + + +@pytest.mark.unit +def test_heartbeat_frame_has_no_payload() -> None: + frame = heartbeat_frame(datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + assert frame.frame_type == FrameType.HEARTBEAT + assert frame.payload is None + + +@pytest.mark.unit +def test_storm_frames_emit_expected_types() -> None: + now = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + assert storm_start_frame(now).frame_type == FrameType.STORM_START + assert storm_end_frame(now).frame_type == FrameType.STORM_END + + +@pytest.mark.unit +def test_frame_to_json_uses_z_suffix() -> None: + frame = heartbeat_frame(datetime(2026, 3, 15, 12, 0, tzinfo=UTC)) + body = json.loads(frame.to_json()) + assert body["ts"].endswith("Z") + assert "payload" not in body + + +@pytest.mark.asyncio +async def test_broadcaster_fans_out_to_subscribers() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=8) + sub = broadcaster.subscribe() + await broadcaster.publish(heartbeat_frame(datetime(2026, 3, 15, tzinfo=UTC))) + frame = await asyncio.wait_for(sub.queue.get(), timeout=0.1) + assert frame.frame_type == FrameType.HEARTBEAT + + +@pytest.mark.asyncio +async def test_broadcaster_filters_by_consumer_type() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=4) + 
dashboard = broadcaster.subscribe(consumer_type="dashboard") + macro = broadcaster.subscribe(consumer_type="macro_research_agent") + frame = heartbeat_frame(datetime(2026, 3, 15, tzinfo=UTC)) + await broadcaster.publish( + frame, + consumer_type_filter=lambda ct: ct == "dashboard", + ) + assert dashboard.queue.qsize() == 1 + assert macro.queue.qsize() == 0 + + +@pytest.mark.asyncio +async def test_broadcaster_drops_oldest_on_full_queue() -> None: + broadcaster = WebSocketBroadcaster(per_connection_buffer=2) + sub = broadcaster.subscribe() + now = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + for _ in range(5): + await broadcaster.publish(heartbeat_frame(now)) + # Queue holds only the last 2; dropped counter tracks the overflow. + assert sub.queue.qsize() == 2 + assert sub.dropped >= 3 + + +@pytest.mark.unit +def test_heartbeat_scheduler_emits_after_interval() -> None: + scheduler = HeartbeatScheduler(interval_seconds=30) + t0 = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + assert scheduler.should_emit(t0) + scheduler.record(t0) + assert not scheduler.should_emit(t0 + timedelta(seconds=10)) + assert scheduler.should_emit(t0 + timedelta(seconds=30)) + + +@pytest.mark.unit +def test_broadcaster_rejects_invalid_buffer() -> None: + with pytest.raises(ValueError, match="positive"): + WebSocketBroadcaster(per_connection_buffer=0) From 0313dd6ca7260bcf980b441c55326f4021338615 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:06:18 +0530 Subject: [PATCH 09/11] feat(format): consumer registry and signal router MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConsumerRegistry.from_toml reads config/consumers.toml (shipped in the workspace bootstrap) and exposes the per-category consumer routing as typed ConsumerType tuples. Unknown categories fall through to the ``default`` entry, which mirrors docs/contracts/consumer-registry.md §Routing's fallback rule without hardcoding the dashboard default into code. 
SignalRouter maps a SignalContext to the consumer set by looking up the market's category in the registry. When the context's interpretation_mode is LLM_ASSISTED, consumers that have not explicitly opted in are reported under suppressed rather than consumers — the deterministic pathway in this phase does not exercise this branch, but the gated secondary formatter will. The default opt-in set is {DASHBOARD} per the consumer-registry document's §Why Each Consumer Exists. Six tests cover: registry TOML load against the shipped config/consumers.toml, fall-through to default for unknown categories, default consumers on unregistered markets, per-market category routing, llm_assisted suppression surfacing, and register_market_category idempotency. --- .../augur_format/routing/__init__.py | 3 + .../augur_format/routing/consumer_registry.py | 66 ++++++++++++ .../augur_format/routing/router.py | 63 +++++++++++ tests/format/test_routing.py | 101 ++++++++++++++++++ 4 files changed, 233 insertions(+) create mode 100644 src/augur_format/augur_format/routing/__init__.py create mode 100644 src/augur_format/augur_format/routing/consumer_registry.py create mode 100644 src/augur_format/augur_format/routing/router.py create mode 100644 tests/format/test_routing.py diff --git a/src/augur_format/augur_format/routing/__init__.py b/src/augur_format/augur_format/routing/__init__.py new file mode 100644 index 0000000..08920d7 --- /dev/null +++ b/src/augur_format/augur_format/routing/__init__.py @@ -0,0 +1,3 @@ +"""Consumer registry and signal router.""" + +from __future__ import annotations diff --git a/src/augur_format/augur_format/routing/consumer_registry.py b/src/augur_format/augur_format/routing/consumer_registry.py new file mode 100644 index 0000000..0511e71 --- /dev/null +++ b/src/augur_format/augur_format/routing/consumer_registry.py @@ -0,0 +1,66 @@ +"""Consumer registry loader. 
+ +Reads ``config/consumers.toml`` (seeded in the workspace bootstrap) +and exposes the per-category consumer routing; per-consumer +transport configuration is not loaded here. The router consumes the registry to decide +which consumers should receive a given signal. +""" + +from __future__ import annotations + +import tomllib +from dataclasses import dataclass, field +from pathlib import Path + +from augur_signals.models import ConsumerType + + +@dataclass(frozen=True, slots=True) +class CategoryRouting: + """Default consumers for a market category.""" + + category: str + consumers: tuple[ConsumerType, ...] + + +class ConsumerRegistry: + """Read-only registry loaded from config/consumers.toml.""" + + def __init__(self, routing: dict[str, tuple[ConsumerType, ...]]) -> None: + self._routing = dict(routing) + + def consumers_for_category(self, category: str) -> tuple[ConsumerType, ...]: + """Return the default consumers for *category*. + + Unknown categories fall through to ``default`` — matching the + Routing Table in docs/contracts/consumer-registry.md.
+ """ + if category in self._routing: + return self._routing[category] + return self._routing.get("default", (ConsumerType.DASHBOARD,)) + + def known_categories(self) -> frozenset[str]: + return frozenset(self._routing.keys()) + + @classmethod + def from_toml(cls, path: Path) -> ConsumerRegistry: + with path.open("rb") as handle: + raw = tomllib.load(handle) + categories_raw = raw.get("categories", {}) + routing: dict[str, tuple[ConsumerType, ...]] = {} + for category, entry in categories_raw.items(): + consumers = tuple(ConsumerType(value) for value in entry.get("consumers", [])) + routing[category] = consumers + if "default" not in routing: + routing["default"] = (ConsumerType.DASHBOARD,) + return cls(routing) + + +@dataclass(frozen=True, slots=True) +class CategoryCoverage: + """One entry of the coverage report.""" + + category: str + consumer_count: int + has_default_fallback: bool + consumers: list[str] = field(default_factory=list) diff --git a/src/augur_format/augur_format/routing/router.py b/src/augur_format/augur_format/routing/router.py new file mode 100644 index 0000000..2653fe4 --- /dev/null +++ b/src/augur_format/augur_format/routing/router.py @@ -0,0 +1,63 @@ +"""Signal router — decides which consumers receive each SignalContext. + +The router composes the category-to-consumers mapping from the +ConsumerRegistry with per-consumer suppression policy (for example, +whether a consumer accepts LLM-assisted briefs). In Phase 3 every +brief emitted is deterministic, so the suppression flag is unused +today; it becomes load-bearing once the gated secondary formatter +produces llm_assisted briefs. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass + +from augur_format.routing.consumer_registry import ConsumerRegistry +from augur_signals.models import ( + ConsumerType, + InterpretationMode, + SignalContext, +) + + +@dataclass(frozen=True, slots=True) +class RoutingDecision: + """The set of consumers receiving a given context plus reasons.""" + + consumers: tuple[ConsumerType, ...] + suppressed: tuple[ConsumerType, ...] = () + + +class SignalRouter: + """Route SignalContext into consumer sets.""" + + def __init__( + self, + registry: ConsumerRegistry, + market_categories: dict[str, str] | None = None, + llm_assisted_consumers: frozenset[ConsumerType] | None = None, + ) -> None: + self._registry = registry + self._market_categories = dict(market_categories or {}) + # Consumers that opt in to llm_assisted briefs. Dashboard is + # the documented default from consumer-registry.md §Why Each + # Consumer Exists. + self._llm_assisted = llm_assisted_consumers or frozenset({ConsumerType.DASHBOARD}) + + def register_market_category(self, market_id: str, category: str) -> None: + self._market_categories[market_id] = category + + def route(self, context: SignalContext) -> RoutingDecision: + """Return the consumer set for *context*. + + Consumers whose subscription excludes the context's + interpretation_mode are reported under ``suppressed`` so + operational metrics can count the drops. 
+ """ + category = self._market_categories.get(context.signal.market_id, "default") + candidates = self._registry.consumers_for_category(category) + if context.interpretation_mode == InterpretationMode.LLM_ASSISTED: + allowed = tuple(c for c in candidates if c in self._llm_assisted) + suppressed = tuple(c for c in candidates if c not in self._llm_assisted) + return RoutingDecision(consumers=allowed, suppressed=suppressed) + return RoutingDecision(consumers=candidates) diff --git a/tests/format/test_routing.py b/tests/format/test_routing.py new file mode 100644 index 0000000..44dc8ae --- /dev/null +++ b/tests/format/test_routing.py @@ -0,0 +1,101 @@ +"""Tests for consumer registry and the signal router.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +import pytest + +from augur_format.routing.consumer_registry import ConsumerRegistry +from augur_format.routing.router import SignalRouter +from augur_signals.models import ( + ConsumerType, + InterpretationMode, + MarketSignal, + SignalContext, + SignalType, + new_signal_id, +) + + +def _context( + market_id: str = "kalshi_fed", + interpretation_mode: InterpretationMode = InterpretationMode.DETERMINISTIC, +) -> SignalContext: + signal = MarketSignal( + signal_id=new_signal_id(), + market_id=market_id, + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.7, + fdr_adjusted=True, + detected_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + return SignalContext( + signal=signal, + market_question="q", + resolution_criteria="c", + resolution_source="s", + closes_at=datetime(2026, 6, 15, tzinfo=UTC), + related_markets=[], + investigation_prompts=[], + interpretation_mode=interpretation_mode, + ) + + +@pytest.mark.unit +def test_registry_from_toml_reads_routing() -> None: + registry = 
ConsumerRegistry.from_toml(Path("config/consumers.toml")) + consumers = registry.consumers_for_category("monetary_policy") + assert ConsumerType.MACRO_RESEARCH_AGENT in consumers + assert ConsumerType.FINANCIAL_NEWS_DESK in consumers + assert ConsumerType.DASHBOARD in consumers + + +@pytest.mark.unit +def test_registry_falls_through_to_default_on_unknown_category() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + consumers = registry.consumers_for_category("not-a-real-category") + assert consumers == (ConsumerType.DASHBOARD,) + + +@pytest.mark.unit +def test_router_returns_default_consumers_for_unregistered_market() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry) + decision = router.route(_context()) + assert ConsumerType.DASHBOARD in decision.consumers + + +@pytest.mark.unit +def test_router_applies_market_category() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry, market_categories={"kalshi_fed": "monetary_policy"}) + decision = router.route(_context()) + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.consumers + + +@pytest.mark.unit +def test_router_suppresses_non_llm_consumers_on_llm_assisted_context() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry, market_categories={"kalshi_fed": "monetary_policy"}) + decision = router.route(_context(interpretation_mode=InterpretationMode.LLM_ASSISTED)) + assert ConsumerType.DASHBOARD in decision.consumers + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.suppressed + assert ConsumerType.MACRO_RESEARCH_AGENT not in decision.consumers + + +@pytest.mark.unit +def test_router_register_market_category_is_idempotent() -> None: + registry = ConsumerRegistry.from_toml(Path("config/consumers.toml")) + router = SignalRouter(registry) + router.register_market_category("kalshi_fed", 
"monetary_policy") + router.register_market_category("kalshi_fed", "monetary_policy") + decision = router.route(_context()) + assert ConsumerType.MACRO_RESEARCH_AGENT in decision.consumers From a09ea366b381602d4a781569d3a0d6b00c10bac2 Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:06:51 +0530 Subject: [PATCH 10/11] docs: record deterministic formatters in the changelog --- CHANGELOG.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d7b3ce..f1d2bbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,22 @@ All notable changes to Augur are recorded in this file. Format follows [Keep a C ## [Unreleased] +### Added — Deterministic Formatters + +- `src/augur_format/deterministic/json_feed.py` — `to_canonical_json` emits UTF-8 JSON bytes with stable key ordering (top-level, signal block, related-market block), six-decimal float rounding (configurable), and Z-suffix UTC timestamps. Byte-identical across invocations. +- `src/augur_format/deterministic/severity.py` — pure `derive_severity` mapping magnitude × confidence against per-liquidity-tier thresholds to `{high, medium, low}`. Formula lives in code so consumers can reproduce locally. +- `src/augur_format/deterministic/markdown.py` — Jinja2 `MarkdownFormatter` rendering five per-signal-type templates that extend `_base.md.j2`. Templates ship inside the wheel via the hatch `include = ["augur_format/**/*.j2"]` rule. +- `src/augur_format/validate/` — `ConsumerEnumValidator` rejects briefs whose `actionable_for` contains values outside `ConsumerType`; `load_schema` reads exported JSON schemas from `schemas/` for debug-build validation. +- `src/augur_format/transport/webhook.py` — `WebhookFormatter` POSTs canonical JSON, wrapped Markdown, or Slack Block Kit payloads to configured destinations with exponential-backoff retry on 5xx/429 and drop on 4xx. Auth headers sourced from env vars at delivery time. 
+- `src/augur_format/transport/websocket.py` — `WebSocketBroadcaster` with `SIGNAL`, `HEARTBEAT`, `STORM_START`, `STORM_END` frame types; oldest-drop under full per-connection queues for timeliness under pressure. +- `src/augur_format/routing/` — `ConsumerRegistry.from_toml` loads `config/consumers.toml` and exposes per-category routing; `SignalRouter` maps `SignalContext` to the consumer set, surfacing suppressed consumers for `llm_assisted` interpretation mode. +- `src/augur_format/llm/models.py` — `IntelligenceBrief` contract declared in this phase for completeness. The gated LLM formatter in the next phase instantiates the model; the JSON schema ships at `schemas/IntelligenceBrief-1.0.0.json`. +- `config/formatters.toml` mirrors `phase-3 §12.2` with JSON, Markdown, Webhook, and WebSocket blocks validated against `FormatterConfig`. + +### Operational Handoff — Deterministic Formatters + +After merge operators can subscribe clients to the WebSocket broadcaster for live signal frames, wire webhook targets (Slack or generic JSON/Markdown) to push brief deliveries, and route signals to consumers via the `ConsumerRegistry` loaded from `config/consumers.toml`. The canonical JSON feed is ready for any consumer that validates against `schemas/SignalContext-1.0.0.json`. + ### Added — Labeling Pipeline - `src/augur_labels/` package with Pydantic data contracts for `NewsworthyEvent`, `EventCandidate`, `SourcePublication`, `QualifyingSource`, `LabelDecision`, `AnnotatorIdentity`, and `AgreementReport`. The closed `source_id` literal set (reuters, bloomberg, ap, ft) is load-bearing across adapters, storage, and workflow. 
From 837acc408a26288ed12309723aa65135677313ab Mon Sep 17 00:00:00 2001 From: Mathews-Tom Date: Fri, 17 Apr 2026 14:12:59 +0530 Subject: [PATCH 11/11] fix(format): address pr-review findings in deterministic formatters CRITICAL: _round_floats now sorts nested dict keys so producers with variable raw_features insertion order (the dedup fingerprint and cluster-merge paths that conditionally add merge_provenance / cluster_member_signal_ids) emit byte-identical JSON for the same logical payload. The outer payload was already protected by CANONICAL_KEY_ORDER; nested dicts were not. The 1000-call test could only detect drift within a single process; this fix covers cross-producer drift that the review called out. HIGH: deliver_with_backoff now returns (result, attempts) so DeliveryResult.attempts reflects the actual attempt count instead of hardcoding policy.max_retries on failure and 1 on success. Webhook delivery surfaces DeliveryRetryExhaustedError.attempts and last_error verbatim into the telemetry record. HIGH: HeartbeatScheduler is no longer @dataclass(frozen=True) with a mutable list field; it is now a plain slots dataclass that tracks last-sent as datetime | None. Combining frozen with mutable contents was accidental and semantically misleading. HIGH: WebSocketBroadcaster.publish now serialises the full-queue check-and-drop under a per-subscription asyncio.Lock. Concurrent publishers previously could each drain a slot from the same full queue, dropping multiple frames when oldest-drop intended to drop exactly one. HIGH: WebhookTarget drops the consumer_types and accepts_llm_assisted fields that have no call site today. SignalRouter owns consumer gating; the LLM formatter's opt-in arrives when the gated formatter lands. Per CLAUDE.md no abstractions without two concrete call sites.
--- .../augur_format/deterministic/json_feed.py | 6 ++- .../augur_format/transport/retry.py | 14 +++++-- .../augur_format/transport/webhook.py | 31 +++++++++----- .../augur_format/transport/websocket.py | 42 ++++++++++--------- 4 files changed, 59 insertions(+), 34 deletions(-) diff --git a/src/augur_format/augur_format/deterministic/json_feed.py b/src/augur_format/augur_format/deterministic/json_feed.py index 74cffa9..29e6eca 100644 --- a/src/augur_format/augur_format/deterministic/json_feed.py +++ b/src/augur_format/augur_format/deterministic/json_feed.py @@ -101,7 +101,11 @@ def _round_floats(value: Any, float_decimals: int) -> Any: if isinstance(value, list): return [_round_floats(v, float_decimals) for v in value] if isinstance(value, dict): - return {k: _round_floats(v, float_decimals) for k, v in value.items()} + # Sort nested dict keys so producers with variable insertion + # order (e.g. raw_features populated conditionally by dedup + # and cluster-merge paths) still emit byte-identical JSON for + # the same logical payload. + return {k: _round_floats(value[k], float_decimals) for k in sorted(value)} return value diff --git a/src/augur_format/augur_format/transport/retry.py b/src/augur_format/augur_format/transport/retry.py index f0cce08..e4cfa2f 100644 --- a/src/augur_format/augur_format/transport/retry.py +++ b/src/augur_format/augur_format/transport/retry.py @@ -34,19 +34,27 @@ async def deliver_with_backoff[T]( factory: Callable[[], Awaitable[T]], policy: DeliveryBackoff, sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, -) -> T: - """Invoke *factory* with exponential backoff.""" +) -> tuple[T, int]: + """Invoke *factory* with exponential backoff. + + Returns ``(result, attempts)`` where ``attempts`` is the 1-based + count of attempts up to and including the successful call so the + caller can surface the actual attempt count in operational + telemetry rather than hardcoding policy.max_retries. 
+ """ delay = policy.initial_seconds last_error: BaseException | None = None for attempt in range(1, policy.max_retries + 1): try: - return await factory() + result = await factory() except Exception as err: last_error = err if attempt == policy.max_retries: break await sleep(delay) delay = min(delay * 2.0, policy.max_seconds) + else: + return result, attempt if last_error is None: # pragma: no cover raise RuntimeError("delivery retry loop exited without capturing an error") raise DeliveryRetryExhaustedError(attempts=policy.max_retries, last_error=last_error) diff --git a/src/augur_format/augur_format/transport/webhook.py b/src/augur_format/augur_format/transport/webhook.py index 8e910be..f722c48 100644 --- a/src/augur_format/augur_format/transport/webhook.py +++ b/src/augur_format/augur_format/transport/webhook.py @@ -16,29 +16,38 @@ from typing import Any, Literal import httpx -from pydantic import BaseModel, ConfigDict, Field, HttpUrl +from pydantic import BaseModel, ConfigDict, HttpUrl from augur_format._config import WebhookConfig from augur_format.deterministic.json_feed import to_canonical_json from augur_format.deterministic.markdown import MarkdownFormatter from augur_format.deterministic.severity import derive_severity -from augur_format.transport.retry import DeliveryBackoff, deliver_with_backoff -from augur_signals.models import ConsumerType, SignalContext +from augur_format.transport.retry import ( + DeliveryBackoff, + DeliveryRetryExhaustedError, + deliver_with_backoff, +) +from augur_signals.models import SignalContext WebhookFormat = Literal["json", "markdown", "slack_blocks"] class WebhookTarget(BaseModel): - """One configured webhook destination.""" + """One configured webhook destination. + + Consumer-type gating and LLM-assisted opt-in live on the + SignalRouter and the LLM formatter gate respectively; neither + belongs on the delivery target, where there is no call site. 
+ Phase-4 re-introduces ``accepts_llm_assisted`` when the gated + formatter needs per-target opt-in. + """ model_config = ConfigDict(frozen=True, extra="forbid") target_id: str url: HttpUrl format: WebhookFormat - consumer_types: list[ConsumerType] = Field(default_factory=list) auth_header_env: str | None = None - accepts_llm_assisted: bool = False @dataclass(frozen=True, slots=True) @@ -166,20 +175,20 @@ async def _call() -> httpx.Response: return response try: - response = await deliver_with_backoff(_call, self._backoff()) - except Exception as err: + response, attempts = await deliver_with_backoff(_call, self._backoff()) + except DeliveryRetryExhaustedError as err: return DeliveryResult( target_id=target.target_id, status_code=None, - attempts=self._config.max_retries, + attempts=err.attempts, delivered=False, - reason=repr(err), + reason=repr(err.last_error), ) delivered = 200 <= response.status_code < 400 return DeliveryResult( target_id=target.target_id, status_code=response.status_code, - attempts=1, + attempts=attempts, delivered=delivered, reason="ok" if delivered else f"http_{response.status_code}", ) diff --git a/src/augur_format/augur_format/transport/websocket.py b/src/augur_format/augur_format/transport/websocket.py index b408adf..3fd69f1 100644 --- a/src/augur_format/augur_format/transport/websocket.py +++ b/src/augur_format/augur_format/transport/websocket.py @@ -81,6 +81,9 @@ class ClientSubscription: queue: asyncio.Queue[WebSocketFrame] consumer_type: str | None = None dropped: int = 0 + # Per-subscription lock so concurrent publishers serialise the + # full-queue check-and-drop instead of each draining one slot. 
+ lock: asyncio.Lock = field(default_factory=asyncio.Lock) class WebSocketBroadcaster: @@ -124,15 +127,16 @@ async def publish( for sub in list(self._subscriptions): if consumer_type_filter is not None and not consumer_type_filter(sub.consumer_type): continue - if sub.queue.full(): - # Drop the oldest to keep the newest — timeliness matters - # more than completeness under storm conditions. - try: - sub.queue.get_nowait() - except asyncio.QueueEmpty: - pass - sub.dropped += 1 - await sub.queue.put(frame) + async with sub.lock: + # Serialise check-and-drop so concurrent publishers do + # not each drain a slot from the same full queue. + if sub.queue.full(): + try: + sub.queue.get_nowait() + except asyncio.QueueEmpty: + pass + sub.dropped += 1 + await sub.queue.put(frame) async def stream(self, subscription: ClientSubscription) -> AsyncIterator[WebSocketFrame]: """Yield frames queued for *subscription* until cancelled.""" @@ -143,23 +147,23 @@ async def stream(self, subscription: ClientSubscription) -> AsyncIterator[WebSoc self.unsubscribe(subscription) -@dataclass(frozen=True, slots=True) +@dataclass(slots=True) class HeartbeatScheduler: - """Emits heartbeat frames at the configured interval. + """Answers "should a heartbeat emit now?" against caller-supplied time. - Exposed as a helper rather than a long-lived task so the engine - owns the lifecycle; tests invoke ``tick`` directly against a - controllable clock. + The scheduler is mutable by design — ``record`` tracks the last + emission so ``should_emit`` can gate the next one. Engine code + owns the outer loop and passes ``now`` explicitly so the scheduler + stays backtest-deterministic. 
""" interval_seconds: int = 30 - _last_sent: list[datetime] = field(default_factory=list) + _last_sent: datetime | None = field(default=None) def should_emit(self, now: datetime) -> bool: - if not self._last_sent: + if self._last_sent is None: return True - elapsed = (now - self._last_sent[-1]).total_seconds() - return elapsed >= self.interval_seconds + return (now - self._last_sent).total_seconds() >= self.interval_seconds def record(self, now: datetime) -> None: - self._last_sent.append(now) + self._last_sent = now