From 27823a7866497446aaccfef0202e82b8e3f1fa2a Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 16:36:51 -0700 Subject: [PATCH] feat(wave3): try reference/price_cache/ first in regime/features._read_parquet_close (ROADMAP L1401) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wave-3 reader-side migration for the regime substrate fetcher (`regime/features.fetch_macro_feature_history` + its daily sibling + all callers in `regime/handler.py`, `regime/retrospective_eval_handler.py`, the backfill scripts). Companion to producer write-both PR1 (alpha-engine-data#270, shipped 2026-05-19). The legacy single-key read is funneled through one chokepoint — `_read_parquet_close(ticker, ..., prefix)` — so this PR is a surgical change there: - When `prefix == DEFAULT_PRICE_CACHE_PREFIX` (the production default used by handler.py + retrospective_eval_handler.py + the backfill scripts), iterate `(reference/price_cache/, predictor/price_cache/)` in order and return the first hit. New prefix consulted first; legacy is the soak-window fallback. - When `prefix` is anything else (a custom test/override prefix), single-prefix semantics — opt-out from the fallback chain. Mirrors the write-side `price_cache_write_prefixes` opt-out. After Wave-3 PR4 retires legacy + runs `aws s3 rm --recursive predictor/price_cache/`, the fallback entry in `_PRICE_CACHE_FALLBACK_PREFIXES` becomes dead and is dropped in the same one-line cutover edit. Tests (+3 new, suite 13 → 16 in test_regime_features.py) - `test_wave3_reader_prefers_new_prefix_over_legacy` — plants both prefixes with distinct Close values; asserts the new prefix wins. - `test_wave3_reader_falls_back_to_legacy_when_new_missing` — early-soak ticker only in legacy; reader must serve it. - `test_wave3_reader_explicit_custom_prefix_does_not_fall_back` — custom prefix opts out; empty result when its key is missing. 
Existing tests unaffected — they seed to `DEFAULT_PRICE_CACHE_PREFIX` (legacy) and the fallback chain still finds the data on the second attempt. Composes with - alpha-engine-data#270 (producer write-both, prerequisite). - alpha-engine#197 (IAM ARN add for `reference/price_cache/`). - alpha-engine-data#272 + alpha-engine-backtester#230 (sibling Wave-3 reader migrations). - Wave-3 PR4 cutover (drops the fallback entry). Co-Authored-By: Claude Opus 4.7 (1M context) --- regime/features.py | 47 +++++++++++++++++++--- tests/test_regime_features.py | 74 +++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/regime/features.py b/regime/features.py index 19b4f64..6c9df53 100644 --- a/regime/features.py +++ b/regime/features.py @@ -49,6 +49,20 @@ DEFAULT_S3_BUCKET: str = "alpha-engine-research" DEFAULT_PRICE_CACHE_PREFIX: str = "predictor/price_cache/" +# Wave-3 reader migration (ROADMAP L1401): producer write-both PR1 +# (alpha-engine-data#270, shipped 2026-05-19) seeded the new +# ``reference/price_cache/`` prefix. During the ≥1-week soak window +# (~2026-05-19 → 2026-05-26) every reader should consult the new +# prefix first and fall back to legacy on miss. ``_read_parquet_close`` +# below iterates this list when the caller uses the legacy default; +# explicit-prefix callers (tests, custom configs) opt out and get +# single-prefix semantics. Wave-3 PR4 cutover drops the legacy entry +# in a one-line edit here. +_PRICE_CACHE_NEW_PREFIX: str = "reference/price_cache/" +_PRICE_CACHE_FALLBACK_PREFIXES: tuple[str, ...] = ( + _PRICE_CACHE_NEW_PREFIX, DEFAULT_PRICE_CACHE_PREFIX, +) + # Source tickers expected as ``{prefix}{ticker}.parquet`` keys. Index # symbols (VIX, VIX3M, TNX) require a leading caret in yfinance (^VIX, @@ -69,14 +83,35 @@ def _read_parquet_close( ) -> pd.Series: """Read ``{prefix}{ticker}.parquet`` from S3 and return its Close column as a Series indexed by date. 
Returns an empty Series when - the key is missing — callers decide whether absence is fatal.""" - key = f"{prefix}{ticker}.parquet" - try: - obj = s3_client.get_object(Bucket=bucket, Key=key) - except Exception as e: + the key is missing — callers decide whether absence is fatal. + + When ``prefix == DEFAULT_PRICE_CACHE_PREFIX`` (the production + default), iterates the Wave-3 fallback list (new → legacy). When + ``prefix`` is anything else (test/override) we treat it as an + explicit single-prefix read. + """ + # Pick the active read order. Default → fallback chain; custom → + # single-prefix. + if prefix == DEFAULT_PRICE_CACHE_PREFIX: + candidate_prefixes: tuple[str, ...] = _PRICE_CACHE_FALLBACK_PREFIXES + else: + candidate_prefixes = (prefix,) + + obj = None + last_key: str | None = None + last_exc: Exception | None = None + for cand in candidate_prefixes: + last_key = f"{cand}{ticker}.parquet" + try: + obj = s3_client.get_object(Bucket=bucket, Key=last_key) + break + except Exception as e: + last_exc = e + + if obj is None: logger.warning( "[regime_features] missing %s at s3://%s/%s (%s) — returning empty series", - ticker, bucket, key, type(e).__name__, + ticker, bucket, last_key, type(last_exc).__name__ if last_exc else "unknown", ) return pd.Series(name=ticker, dtype="float64") diff --git a/tests/test_regime_features.py b/tests/test_regime_features.py index 623e977..5b87b6b 100644 --- a/tests/test_regime_features.py +++ b/tests/test_regime_features.py @@ -190,3 +190,77 @@ def test_source_tickers_match_data_side_convention() -> None: convention.""" for ticker in SOURCE_TICKERS: assert not ticker.startswith("^"), f"{ticker} should not have ^ prefix" + + +# ── Wave-3 reader migration (ROADMAP L1401) ───────────────────────────────── + + +def test_wave3_reader_prefers_new_prefix_over_legacy() -> None: + """During the producer write-both soak (alpha-engine-data#270 shipped + 2026-05-19; ≥1-week soak), both prefixes hold byte-equal copies. 
The + reader MUST consult ``reference/price_cache/`` first so the new home + is exercised end-to-end before PR4 deletes legacy.""" + from regime.features import _PRICE_CACHE_NEW_PREFIX, _read_parquet_close + + s3 = _FakeS3() + # Seed the NEW prefix with one Close value, LEGACY with a clearly- + # different one. If the reader picked legacy, we'd see 999. + new_df = pd.DataFrame( + {"Close": [100.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + legacy_df = pd.DataFrame( + {"Close": [999.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + s3.put_parquet("bkt", f"{_PRICE_CACHE_NEW_PREFIX}SPY.parquet", new_df) + s3.put_parquet("bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", legacy_df) + + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix=DEFAULT_PRICE_CACHE_PREFIX, + ) + assert s.iloc[0] == 100.0, "reader picked legacy when new was present" + + +def test_wave3_reader_falls_back_to_legacy_when_new_missing() -> None: + """Early in the soak window the NEW prefix can lag a fresh ticker. + The reader must fall back to legacy gracefully.""" + from regime.features import _read_parquet_close + + s3 = _FakeS3() + legacy_df = pd.DataFrame( + {"Close": [123.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + # Only legacy seeded (no NEW key). + s3.put_parquet("bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", legacy_df) + + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix=DEFAULT_PRICE_CACHE_PREFIX, + ) + assert s.iloc[0] == 123.0 + + +def test_wave3_reader_explicit_custom_prefix_does_not_fall_back() -> None: + """Test/config-override callers that pass a non-default prefix opt + out of the fallback chain — single-prefix semantics. Mirrors the + write-side ``price_cache_write_prefixes`` opt-out.""" + from regime.features import _read_parquet_close + + s3 = _FakeS3() + # Seed only the legacy default — the custom prefix has nothing. 
+ s3.put_parquet( + "bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", + pd.DataFrame( + {"Close": [100.0]}, + index=pd.to_datetime(["2026-05-19"]), + ), + ) + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix="custom/explicit/", # NOT the default + ) + # Custom prefix had no SPY.parquet → empty (no fallback to legacy). + assert s.empty