diff --git a/regime/features.py b/regime/features.py index 19b4f64..6c9df53 100644 --- a/regime/features.py +++ b/regime/features.py @@ -49,6 +49,20 @@ DEFAULT_S3_BUCKET: str = "alpha-engine-research" DEFAULT_PRICE_CACHE_PREFIX: str = "predictor/price_cache/" +# Wave-3 reader migration (ROADMAP L1401): producer write-both PR1 +# (alpha-engine-data#270, shipped 2026-05-19) seeded the new +# ``reference/price_cache/`` prefix. During the ≥1-week soak window +# (~2026-05-19 → 2026-05-26) every reader should consult the new +# prefix first and fall back to legacy on miss. ``_read_parquet_close`` +# below iterates this list when the caller uses the legacy default; +# explicit-prefix callers (tests, custom configs) opt out and get +# single-prefix semantics. Wave-3 PR4 cutover drops the legacy entry +# in a one-line edit here. +_PRICE_CACHE_NEW_PREFIX: str = "reference/price_cache/" +_PRICE_CACHE_FALLBACK_PREFIXES: tuple[str, ...] = ( + _PRICE_CACHE_NEW_PREFIX, DEFAULT_PRICE_CACHE_PREFIX, +) + # Source tickers expected as ``{prefix}{ticker}.parquet`` keys. Index # symbols (VIX, VIX3M, TNX) require a leading caret in yfinance (^VIX, @@ -69,14 +83,35 @@ def _read_parquet_close( ) -> pd.Series: """Read ``{prefix}{ticker}.parquet`` from S3 and return its Close column as a Series indexed by date. Returns an empty Series when - the key is missing — callers decide whether absence is fatal.""" - key = f"{prefix}{ticker}.parquet" - try: - obj = s3_client.get_object(Bucket=bucket, Key=key) - except Exception as e: + the key is missing — callers decide whether absence is fatal. + + When ``prefix == DEFAULT_PRICE_CACHE_PREFIX`` (the production + default), iterates the Wave-3 fallback list (new → legacy). When + ``prefix`` is anything else (test/override) we treat it as an + explicit single-prefix read. + """ + # Pick the active read order. Default → fallback chain; custom → + # single-prefix. + if prefix == DEFAULT_PRICE_CACHE_PREFIX: + candidate_prefixes: tuple[str, ...] = _PRICE_CACHE_FALLBACK_PREFIXES + else: + candidate_prefixes = (prefix,) + + obj = None + last_key: str | None = None + last_exc: Exception | None = None + for cand in candidate_prefixes: + last_key = f"{cand}{ticker}.parquet" + try: + obj = s3_client.get_object(Bucket=bucket, Key=last_key) + break + except Exception as e: + last_exc = e + + if obj is None: logger.warning( "[regime_features] missing %s at s3://%s/%s (%s) — returning empty series", - ticker, bucket, key, type(e).__name__, + ticker, bucket, last_key, type(last_exc).__name__ if last_exc else "unknown", ) return pd.Series(name=ticker, dtype="float64") diff --git a/tests/test_regime_features.py b/tests/test_regime_features.py index 623e977..5b87b6b 100644 --- a/tests/test_regime_features.py +++ b/tests/test_regime_features.py @@ -190,3 +190,77 @@ def test_source_tickers_match_data_side_convention() -> None: convention.""" for ticker in SOURCE_TICKERS: assert not ticker.startswith("^"), f"{ticker} should not have ^ prefix" + + +# ── Wave-3 reader migration (ROADMAP L1401) ───────────────────────────────── + + +def test_wave3_reader_prefers_new_prefix_over_legacy() -> None: + """During the producer write-both soak (alpha-engine-data#270 shipped + 2026-05-19; ≥1-week soak), both prefixes hold byte-equal copies. The + reader MUST consult ``reference/price_cache/`` first so the new home + is exercised end-to-end before PR4 deletes legacy.""" + from regime.features import _PRICE_CACHE_NEW_PREFIX, _read_parquet_close + + s3 = _FakeS3() + # Seed the NEW prefix with one Close value, LEGACY with a clearly- + # different one. If the reader picked legacy, we'd see 999. + new_df = pd.DataFrame( + {"Close": [100.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + legacy_df = pd.DataFrame( + {"Close": [999.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + s3.put_parquet("bkt", f"{_PRICE_CACHE_NEW_PREFIX}SPY.parquet", new_df) + s3.put_parquet("bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", legacy_df) + + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix=DEFAULT_PRICE_CACHE_PREFIX, + ) + assert s.iloc[0] == 100.0, "reader picked legacy when new was present" + + +def test_wave3_reader_falls_back_to_legacy_when_new_missing() -> None: + """Early in the soak window the NEW prefix can lag a fresh ticker. + The reader must fall back to legacy gracefully.""" + from regime.features import _read_parquet_close + + s3 = _FakeS3() + legacy_df = pd.DataFrame( + {"Close": [123.0]}, + index=pd.to_datetime(["2026-05-19"]), + ) + # Only legacy seeded (no NEW key). + s3.put_parquet("bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", legacy_df) + + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix=DEFAULT_PRICE_CACHE_PREFIX, + ) + assert s.iloc[0] == 123.0 + + +def test_wave3_reader_explicit_custom_prefix_does_not_fall_back() -> None: + """Test/config-override callers that pass a non-default prefix opt + out of the fallback chain — single-prefix semantics. Mirrors the + write-side ``price_cache_write_prefixes`` opt-out.""" + from regime.features import _read_parquet_close + + s3 = _FakeS3() + # Seed only the legacy default — the custom prefix has nothing. + s3.put_parquet( + "bkt", f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet", + pd.DataFrame( + {"Close": [100.0]}, + index=pd.to_datetime(["2026-05-19"]), + ), + ) + s = _read_parquet_close( + "SPY", s3_client=s3, bucket="bkt", + prefix="custom/explicit/", # NOT the default + ) + # Custom prefix had no SPY.parquet → empty (no fallback to legacy). + assert s.empty