Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ arcticdb>=6.11
# previously listed above as direct deps; kept those direct lines for now to
# avoid breaking pinning during the migration. Drop the duplicate direct
# pgvector/psycopg2-binary pins once the migration soaks.
alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.5.1
alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.6.2
92 changes: 92 additions & 0 deletions tests/test_collector_error_visibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Verify _finalize surfaces per-collector errors to logging.ERROR.

Surfaced 2026-05-09 from a Saturday SF DataPhase1 PARTIAL run where the
arcticdb backfill regression preflight failure was stored in the result
dict but never logged at ERROR level — only main()'s generic "non-ok
status" summary fired, which produces a single dedup signature across
every partial run and contains no actual error text for Flow Doctor's
LLM diagnose pipeline to work with.

Fix: ``_finalize`` now calls
``alpha_engine_lib.collector_results.report_collector_errors(results["collectors"])``
which emits one ``logger.error()`` per error-status entry with the
collector name + original message. This wiring test pins that call site
so a future refactor can't silently drop it.
"""

from __future__ import annotations

import logging

import pytest

from weekly_collector import _finalize


def test_finalize_logs_each_collector_error(caplog: pytest.LogCaptureFixture):
    """Each failing collector yields its own ERROR-level log record.

    The fixture holds one healthy collector and two failing ones. After
    ``_finalize`` runs, the captured ERROR records must mention each
    failing collector by name together with its original error text, and
    the overall run status must be ``"partial"``. Flow Doctor dedups on
    the rendered message, so distinct messages per failure are what keep
    alerts separate.
    """
    failing_collectors = {
        "arcticdb": {
            "status": "error",
            "error": "Backfill regression preflight failed: 38 symbols would regress",
        },
        "fundamentals": {"status": "error", "error": "Polygon 429 rate limit"},
    }
    results = {
        "phase": 1,
        # Healthy entry first, then the two failures (insertion order kept).
        "collectors": {"constituents": {"status": "ok"}, **failing_collectors},
    }
    finalize_kwargs = {
        "bucket": "test-bucket",
        "market_prefix": "market_data/",
        "run_date": "2026-05-09",
        # dry_run=True skips the _write_manifest / _write_validation_json
        # + postflight path, so the test has no external side effects.
        "dry_run": True,
        "only": None,
    }

    with caplog.at_level(logging.ERROR):
        _finalize(results, **finalize_kwargs)

    # Render only the records emitted at exactly ERROR level.
    rendered = [
        record.getMessage()
        for record in caplog.records
        if record.levelno == logging.ERROR
    ]

    arcticdb_needle = "collector arcticdb failed: Backfill regression preflight failed"
    arcticdb_logged = any(arcticdb_needle in text for text in rendered)
    assert arcticdb_logged, f"arcticdb error not logged at ERROR. messages={rendered}"

    fundamentals_needle = "collector fundamentals failed: Polygon 429"
    fundamentals_logged = any(fundamentals_needle in text for text in rendered)
    assert (
        fundamentals_logged
    ), f"fundamentals error not logged at ERROR. messages={rendered}"

    assert results["status"] == "partial"


def test_finalize_silent_on_all_ok(caplog: pytest.LogCaptureFixture):
    """A run with no failing collectors emits zero ERROR-level records.

    Every collector reports ``"ok"`` or ``"ok_dry_run"``; ``_finalize``
    must set the overall status to ``"ok"`` and log nothing at ERROR.
    Stray ERROR lines would dedup-spam Flow Doctor's daily caps.
    """
    collector_statuses = (
        ("constituents", "ok"),
        ("prices", "ok"),
        ("macro", "ok_dry_run"),
    )
    results = {
        "phase": 1,
        "collectors": {
            name: {"status": status} for name, status in collector_statuses
        },
    }
    finalize_kwargs = {
        "bucket": "test-bucket",
        "market_prefix": "market_data/",
        "run_date": "2026-05-09",
        "dry_run": True,
        "only": None,
    }

    with caplog.at_level(logging.ERROR):
        _finalize(results, **finalize_kwargs)

    assert results["status"] == "ok"

    # Collect anything that slipped through at exactly ERROR level.
    error_records = []
    for record in caplog.records:
        if record.levelno == logging.ERROR:
            error_records.append(record)
    assert error_records == [], f"unexpected ERROR records on all-ok run: {error_records}"
9 changes: 9 additions & 0 deletions weekly_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,15 @@ def _finalize(
else:
results["status"] = "partial"

# Surface per-collector errors to Flow Doctor's ERROR-level handler.
# Without this, the only logger.error() that fires on a partial run is
# main()'s generic "non-ok status" summary line — single dedup signature
# across every partial run, no diagnose-context error text. The helper
# emits one logger.error per error-status entry with the collector name
# + original message, restoring per-failure alert granularity.
from alpha_engine_lib.collector_results import report_collector_errors
report_collector_errors(results["collectors"])

if not dry_run and only is None:
_write_manifest(bucket, market_prefix, run_date, results)
_write_validation_json(bucket, market_prefix, run_date, results)
Expand Down
Loading