From c39d00977bb8cad807c0a79b0f73c9254d18c2fa Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Sat, 9 May 2026 05:49:09 -0700 Subject: [PATCH] feat(visibility): surface per-collector errors to Flow Doctor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Saturday SF DataPhase1 PARTIAL run on 2026-05-09 fired no per-failure Flow Doctor alert. The arcticdb backfill regression was stored in the result dict but never logged at ERROR level — only main()'s generic "Weekly collection finished with non-ok status=partial" summary fires, which produces a single dedup signature across every partial run and contains no actual error text for Flow Doctor's LLM diagnose pipeline. Fix: _finalize() now calls alpha_engine_lib.collector_results.report_collector_errors(), which emits one logger.error() per error-status entry with the collector name + original message. Each emitted record carries a distinct dedup signature, restoring per-failure alert granularity. - Pin alpha-engine-lib v0.5.1 → v0.6.2 (helper landed in lib PR #34) - Wire call inside _finalize after status computation, before manifest write — fires for every code path that finalizes (phase 1, phase 2, daily, morning enrich) and runs even if postflight raises afterward - New wiring test test_collector_error_visibility.py pins the call so a future refactor can't silently drop it 552 unit tests pass locally. Co-Authored-By: Claude Opus 4.7 (1M context) --- requirements.txt | 2 +- tests/test_collector_error_visibility.py | 92 ++++++++++++++++++++++++ weekly_collector.py | 9 +++ 3 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 tests/test_collector_error_visibility.py diff --git a/requirements.txt b/requirements.txt index 21bbc95..757bccc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,4 @@ arcticdb>=6.11 # previously listed above as direct deps; kept those direct lines for now to # avoid breaking pinning during the migration. Drop the duplicate direct # pgvector/psycopg2-binary pins once the migration soaks. -alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.5.1 +alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.6.2 diff --git a/tests/test_collector_error_visibility.py b/tests/test_collector_error_visibility.py new file mode 100644 index 0000000..e4cc08a --- /dev/null +++ b/tests/test_collector_error_visibility.py @@ -0,0 +1,92 @@ +"""Verify _finalize surfaces per-collector errors to logging.ERROR. + +Surfaced 2026-05-09 from a Saturday SF DataPhase1 PARTIAL run where the +arcticdb backfill regression preflight failure was stored in the result +dict but never logged at ERROR level — only main()'s generic "non-ok +status" summary fired, which produces a single dedup signature across +every partial run and contains no actual error text for Flow Doctor's +LLM diagnose pipeline to work with. + +Fix: ``_finalize`` now calls +``alpha_engine_lib.collector_results.report_collector_errors(results["collectors"])`` +which emits one ``logger.error()`` per error-status entry with the +collector name + original message. This wiring test pins that call site +so a future refactor can't silently drop it. +""" + +from __future__ import annotations + +import logging + +import pytest + +from weekly_collector import _finalize + + +def test_finalize_logs_each_collector_error(caplog: pytest.LogCaptureFixture): + """Per-collector error messages are visible at ERROR level. + + Two distinct collector failures must produce two distinct ERROR + records — Flow Doctor's dedup keys off the rendered message, so + one alert per failure is the load-bearing property. + """ + results = { + "phase": 1, + "collectors": { + "constituents": {"status": "ok"}, + "arcticdb": { + "status": "error", + "error": "Backfill regression preflight failed: 38 symbols would regress", + }, + "fundamentals": {"status": "error", "error": "Polygon 429 rate limit"}, + }, + } + with caplog.at_level(logging.ERROR): + _finalize( + results, + bucket="test-bucket", + market_prefix="market_data/", + run_date="2026-05-09", + dry_run=True, # skip _write_manifest / _write_validation_json + postflight + only=None, + ) + + error_records = [r for r in caplog.records if r.levelno == logging.ERROR] + messages = [r.getMessage() for r in error_records] + + assert any( + "collector arcticdb failed: Backfill regression preflight failed" in m + for m in messages + ), f"arcticdb error not logged at ERROR. messages={messages}" + assert any( + "collector fundamentals failed: Polygon 429" in m for m in messages + ), f"fundamentals error not logged at ERROR. messages={messages}" + assert results["status"] == "partial" + + +def test_finalize_silent_on_all_ok(caplog: pytest.LogCaptureFixture): + """All-ok run must not emit any ERROR-level records. + + Spurious ERROR logs would dedup-spam Flow Doctor's daily caps. + """ + results = { + "phase": 1, + "collectors": { + "constituents": {"status": "ok"}, + "prices": {"status": "ok"}, + "macro": {"status": "ok_dry_run"}, + }, + } + with caplog.at_level(logging.ERROR): + _finalize( + results, + bucket="test-bucket", + market_prefix="market_data/", + run_date="2026-05-09", + dry_run=True, + only=None, + ) + + assert results["status"] == "ok" + error_records = [r for r in caplog.records if r.levelno == logging.ERROR] + assert error_records == [], f"unexpected ERROR records on all-ok run: {error_records}" diff --git a/weekly_collector.py b/weekly_collector.py index 5ece04c..19c37d7 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -973,6 +973,15 @@ def _finalize( else: results["status"] = "partial" + # Surface per-collector errors to Flow Doctor's ERROR-level handler. + # Without this, the only logger.error() that fires on a partial run is + # main()'s generic "non-ok status" summary line — single dedup signature + # across every partial run, no diagnose-context error text. The helper + # emits one logger.error per error-status entry with the collector name + # + original message, restoring per-failure alert granularity. + from alpha_engine_lib.collector_results import report_collector_errors + report_collector_errors(results["collectors"]) + if not dry_run and only is None: _write_manifest(bucket, market_prefix, run_date, results) _write_validation_json(bucket, market_prefix, run_date, results)