From d6e8d5fa95a2f5167c7ee351f12b40fcaaccd4db Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 17:20:20 -0700 Subject: [PATCH] =?UTF-8?q?feat(wave3):=20PR3-wave-2=20predictor=20reader?= =?UTF-8?q?=20migrations=20=E2=80=94=20runner=20+=20dead-constant=20+=20se?= =?UTF-8?q?ctor=5Fmap=20(ROADMAP=20L1401)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the predictor-repo side of Wave-3 PR3 reader migration. The deployed Lambda chokepoint shipped 2026-05-19 via #181 (regime/features ._read_parquet_close); this PR closes the analysis-tool sites the L1401 PR3 follow-up plan flagged + a dead module-level constant. - ``config.PRICE_CACHE_KEY = "predictor/price_cache/{ticker}.parquet"`` deleted. ``git grep PRICE_CACHE_KEY`` returned zero references outside the constant itself — classic dead-code scaffolding. A new regression-pin test ``test_config_no_longer_exports_price_cache_key _constant`` prevents a future ``from config import PRICE_CACHE_KEY`` from silently bypassing the Wave-3 read-prefix chain. - ``analysis/triple_barrier_cutover_runner._load_prices_from_s3`` iterates a per-callsite read-prefix chain (new → legacy) for the per-ticker parquet lookup. Inlines the same shape #181 used in ``regime/features._read_parquet_close`` rather than importing — keeps analysis tools decoupled from Lambda-deployed modules. - ``analysis/triple_barrier_cutover_runner._load_sector_map_from_s3`` prepends ``reference/price_cache/sector_map.json`` to the existing fallback list (data/sector_map.json → predictor/price_cache/sector _map.json). sector_map.json now lives at THREE keys post alpha-engine-data #272's write-both gap fix; the reader prefers the new home but cascades through both legacy locations for the soak. - New test file ``tests/test_triple_barrier_cutover_runner_wave3 _readers.py`` — 7 tests covering: prefers-new, falls-back-legacy, absent-in-both (price-cache 3-test discipline); sector_map prefers- new, cascades-through-data-then-predictor, returns-empty-on-total- miss; PRICE_CACHE_KEY deletion guard. Suite 1113 → 1120 green. Discovery (surfaced + handled in main checkout, NOT in this PR): ``scripts/momentum_ic_study_21d.py`` is named in the L1401 PR3-wave-2 plan but ``git status`` confirms it is **untracked / never committed** — a local-only operator analysis script. Migrated in the main repo working copy so it survives PR4 cutover, but not added to git here. Brian's call whether to git-track it later; the migration is reversible either way. Composes with: alpha-engine-predictor #181 (the deployed Lambda chokepoint this mirrors), alpha-engine-data #272 (sector_map.json 3-key write-both that this reads from), alpha-engine-data #273 (sibling PR3-wave-2 — data-repo invasive sites). Co-Authored-By: Claude Opus 4.7 (1M context) --- analysis/triple_barrier_cutover_runner.py | 36 +++- config.py | 8 +- ...le_barrier_cutover_runner_wave3_readers.py | 195 ++++++++++++++++++ 3 files changed, 231 insertions(+), 8 deletions(-) create mode 100644 tests/test_triple_barrier_cutover_runner_wave3_readers.py diff --git a/analysis/triple_barrier_cutover_runner.py b/analysis/triple_barrier_cutover_runner.py index 4f54fc1..bdb7fa0 100644 --- a/analysis/triple_barrier_cutover_runner.py +++ b/analysis/triple_barrier_cutover_runner.py @@ -134,13 +134,26 @@ def _load_prices_from_s3( s3 = s3_client or boto3.client("s3") prices: dict = {} + # Wave-3 reader migration (ROADMAP L1401): try ``reference/price_cache/`` + # first, fall back to legacy ``predictor/price_cache/`` for the soak + # window. Mirrors regime/features._read_parquet_close's chain (PR #181) + # without importing from a deployed-Lambda module — analysis tools stay + # decoupled from Lambda concerns. + price_cache_prefixes: tuple[str, ...] = ( + "reference/price_cache/", "predictor/price_cache/", + ) for ticker in tickers: - key = f"predictor/price_cache/{ticker}.parquet" - try: - obj = s3.get_object(Bucket=bucket, Key=key) - df = pd.read_parquet(BytesIO(obj["Body"].read())) - except Exception as e: - log.debug("Skipping price load for %s: %s", ticker, e) + df = None + for prefix in price_cache_prefixes: + key = f"{prefix}{ticker}.parquet" + try: + obj = s3.get_object(Bucket=bucket, Key=key) + df = pd.read_parquet(BytesIO(obj["Body"].read())) + break + except Exception as e: + last_exc = e + if df is None: + log.debug("Skipping price load for %s: %s", ticker, last_exc) continue if df is None or df.empty or "Close" not in df.columns: continue @@ -161,7 +174,16 @@ def _load_sector_map_from_s3(bucket: str, s3_client=None) -> dict: import boto3 s3 = s3_client or boto3.client("s3") - for key in ("data/sector_map.json", "predictor/price_cache/sector_map.json"): + # sector_map.json now lives at 3 keys post alpha-engine-data #272 + # (write-both gap fix that landed alongside Wave-3 PR3-wave-1): the + # constituents collector writes to data/, predictor/, AND the new + # reference/ home. Read order = ``reference/`` first (post-PR4 sole + # survivor), then the two legacy keys we've historically tolerated. + for key in ( + "reference/price_cache/sector_map.json", + "data/sector_map.json", + "predictor/price_cache/sector_map.json", + ): try: obj = s3.get_object(Bucket=bucket, Key=key) return json.loads(obj["Body"].read()) diff --git a/config.py b/config.py index 134e2aa..f5ff437 100644 --- a/config.py +++ b/config.py @@ -64,7 +64,13 @@ def _load() -> dict: METRICS_KEY = "predictor/metrics/latest.json" -PRICE_CACHE_KEY = "predictor/price_cache/{ticker}.parquet" +# ``PRICE_CACHE_KEY`` (was ``"predictor/price_cache/{ticker}.parquet"``) was +# deleted 2026-05-19 — Wave-3 PR3-wave-2 audit found zero references across +# the predictor tree (`git grep PRICE_CACHE_KEY` returned only this site). +# The active per-prefix constants live with their readers — see +# ``regime/features.DEFAULT_PRICE_CACHE_PREFIX`` + the Wave-3 fallback chain +# added in #181. Keeping a stale module-level constant invites a future +# `from config import PRICE_CACHE_KEY` that bypasses the read-prefix chain. # ── Features ────────────────────────────────────────────────────────────────── # Must stay in sync with data/feature_engineer.py::compute_features(). diff --git a/tests/test_triple_barrier_cutover_runner_wave3_readers.py b/tests/test_triple_barrier_cutover_runner_wave3_readers.py new file mode 100644 index 0000000..9551771 --- /dev/null +++ b/tests/test_triple_barrier_cutover_runner_wave3_readers.py @@ -0,0 +1,195 @@ +"""Wave-3 PR3-wave-2 reader-migration regression suite for +``analysis/triple_barrier_cutover_runner.py``. + +Pins the per-ticker price-cache read + sector_map fallback chain so a +future "drop the legacy entry" refactor (PR4 cutover) only edits the +fallback tuples in one place — these tests then flip to expect a single +prefix. + +Composes with: + * alpha-engine-data #272 (sector_map.json write-both gap fix — now at + 3 keys: data/, predictor/, AND reference/). + * alpha-engine-predictor #181 (the regime/features._read_parquet_close + chokepoint — mirrors this file's inline fallback chain). +""" +from __future__ import annotations + +import json +import os +import sys +from io import BytesIO +from unittest.mock import MagicMock + +import pandas as pd +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from analysis.triple_barrier_cutover_runner import ( + _load_prices_from_s3, + _load_sector_map_from_s3, +) + + +def _fake_get_object_factory(payload_by_key: dict): + """Build an ``s3.get_object`` stub that returns Body bytes per key, + or raises ``NoSuchKey``-shaped ``Exception`` on miss.""" + + class _Body: + def __init__(self, b: bytes): + self._b = b + + def read(self) -> bytes: + return self._b + + def _get_object(*, Bucket: str, Key: str): + if Key not in payload_by_key: + raise FileNotFoundError(f"NoSuchKey: {Key}") + return {"Body": _Body(payload_by_key[Key])} + + return _get_object + + +def _df_to_parquet_bytes(df: pd.DataFrame) -> bytes: + buf = BytesIO() + df.to_parquet(buf, engine="pyarrow") + return buf.getvalue() + + +def _tiny_close_frame() -> pd.DataFrame: + return pd.DataFrame( + {"Close": [100.0, 101.0, 102.0]}, + index=pd.DatetimeIndex(["2026-04-20", "2026-04-21", "2026-04-22"]), + ) + + +# ── Wave-3 price-cache read chain ────────────────────────────────────────── + + +def test_load_prices_prefers_reference_prefix_over_legacy(): + """The new ``reference/price_cache/`` prefix MUST be consulted first. + During the Wave-3 soak both prefixes hold byte-equal copies; reading + from new exercises the migration end-to-end. + """ + payload = { + "reference/price_cache/AAPL.parquet": _df_to_parquet_bytes(_tiny_close_frame()), + "predictor/price_cache/AAPL.parquet": _df_to_parquet_bytes(_tiny_close_frame()), + } + s3 = MagicMock() + seen_keys: list[str] = [] + + def _stub(*, Bucket: str, Key: str): + seen_keys.append(Key) + return _fake_get_object_factory(payload)(Bucket=Bucket, Key=Key) + + s3.get_object = _stub + + out = _load_prices_from_s3("b", ["AAPL"], s3_client=s3) + assert "AAPL" in out + assert seen_keys[0] == "reference/price_cache/AAPL.parquet", ( + "Wave-3 read order: new prefix first." + ) + # Success on first prefix should short-circuit; legacy not consulted. + assert "predictor/price_cache/AAPL.parquet" not in seen_keys + + +def test_load_prices_falls_back_to_legacy_when_reference_missing(): + """Legacy fallback fills gaps during the soak (or partial backfills).""" + payload = { + "predictor/price_cache/MSFT.parquet": _df_to_parquet_bytes(_tiny_close_frame()), + } + s3 = MagicMock() + seen_keys: list[str] = [] + + def _stub(*, Bucket: str, Key: str): + seen_keys.append(Key) + return _fake_get_object_factory(payload)(Bucket=Bucket, Key=Key) + + s3.get_object = _stub + + out = _load_prices_from_s3("b", ["MSFT"], s3_client=s3) + assert "MSFT" in out + assert seen_keys == [ + "reference/price_cache/MSFT.parquet", + "predictor/price_cache/MSFT.parquet", + ], "Fallback order: new → legacy." + + +def test_load_prices_skips_ticker_absent_in_both_prefixes(): + """A ticker missing from BOTH prefixes is silently dropped per the + pre-Wave-3 contract (``caller's downstream realized-alpha join + filters those rows``).""" + s3 = MagicMock() + s3.get_object = _fake_get_object_factory({}) + + out = _load_prices_from_s3("b", ["GHOST"], s3_client=s3) + assert "GHOST" not in out + assert out == {} + + +# ── sector_map.json 3-key fallback ───────────────────────────────────────── + + +def test_sector_map_prefers_reference_key(): + """Post data #272 sector_map.json is written to 3 keys (data/, + predictor/, reference/). The reader prefers the new home — when + reference/ has the file it's returned without consulting the + legacy keys. + """ + payload = { + "reference/price_cache/sector_map.json": json.dumps({"AAPL": "XLK"}).encode(), + "data/sector_map.json": json.dumps({"AAPL": "XLY-STALE"}).encode(), + "predictor/price_cache/sector_map.json": json.dumps({"AAPL": "XLF-STALE"}).encode(), + } + s3 = MagicMock() + s3.get_object = _fake_get_object_factory(payload) + + out = _load_sector_map_from_s3("b", s3_client=s3) + assert out == {"AAPL": "XLK"}, "reference/ wins when present." + + +def test_sector_map_falls_back_to_data_then_legacy_predictor(): + """When reference/ is missing the reader cascades through the + historical fallbacks (data/sector_map.json → predictor/...).""" + # reference absent → data/ wins + payload = { + "data/sector_map.json": json.dumps({"AAPL": "XLY"}).encode(), + "predictor/price_cache/sector_map.json": json.dumps({"AAPL": "XLF-STALE"}).encode(), + } + s3 = MagicMock() + s3.get_object = _fake_get_object_factory(payload) + assert _load_sector_map_from_s3("b", s3_client=s3) == {"AAPL": "XLY"} + + # reference + data absent → legacy predictor/ wins + payload2 = { + "predictor/price_cache/sector_map.json": json.dumps({"AAPL": "XLF"}).encode(), + } + s3.get_object = _fake_get_object_factory(payload2) + assert _load_sector_map_from_s3("b", s3_client=s3) == {"AAPL": "XLF"} + + +def test_sector_map_returns_empty_when_no_key_present(): + """All three keys absent → empty dict + WARN log; caller falls back + to SPY benchmark per the existing contract.""" + s3 = MagicMock() + s3.get_object = _fake_get_object_factory({}) + + out = _load_sector_map_from_s3("b", s3_client=s3) + assert out == {} + + +# ── Dead-constant guard ──────────────────────────────────────────────────── + + +def test_config_no_longer_exports_price_cache_key_constant(): + """``PRICE_CACHE_KEY`` had zero callers as of 2026-05-19 — deleting + it prevents a future ``from config import PRICE_CACHE_KEY`` that + would silently bypass the Wave-3 read-prefix chain.""" + import config as predictor_config + + assert not hasattr(predictor_config, "PRICE_CACHE_KEY"), ( + "PRICE_CACHE_KEY was deleted in Wave-3 PR3-wave-2. If you " + "need a price-cache S3 key template, use the read-prefix " + "chain in ``regime/features._read_parquet_close`` or its " + "analysis-tool mirrors — never a bare module-level constant." + )