Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions regime/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@
DEFAULT_S3_BUCKET: str = "alpha-engine-research"
DEFAULT_PRICE_CACHE_PREFIX: str = "predictor/price_cache/"

# Wave-3 reader migration (ROADMAP L1401): producer write-both PR1
# (alpha-engine-data#270, shipped 2026-05-19) seeded the new
# ``reference/price_cache/`` prefix. During the ≥1-week soak window
# (~2026-05-19 → 2026-05-26) every reader should consult the new
# prefix first and fall back to legacy on miss. ``_read_parquet_close``
# below iterates this list when the caller uses the legacy default;
# explicit-prefix callers (tests, custom configs) opt out and get
# single-prefix semantics. Wave-3 PR4 cutover drops the legacy entry
# in a one-line edit here.
_PRICE_CACHE_NEW_PREFIX: str = "reference/price_cache/"
_PRICE_CACHE_FALLBACK_PREFIXES: tuple[str, ...] = (
_PRICE_CACHE_NEW_PREFIX, DEFAULT_PRICE_CACHE_PREFIX,
)


# Source tickers expected as ``{prefix}{ticker}.parquet`` keys. Index
# symbols (VIX, VIX3M, TNX) require a leading caret in yfinance (^VIX,
Expand All @@ -69,14 +83,35 @@ def _read_parquet_close(
) -> pd.Series:
"""Read ``{prefix}{ticker}.parquet`` from S3 and return its Close
column as a Series indexed by date. Returns an empty Series when
the key is missing — callers decide whether absence is fatal."""
key = f"{prefix}{ticker}.parquet"
try:
obj = s3_client.get_object(Bucket=bucket, Key=key)
except Exception as e:
the key is missing — callers decide whether absence is fatal.

When ``prefix == DEFAULT_PRICE_CACHE_PREFIX`` (the production
default), iterates the Wave-3 fallback list (new → legacy). When
``prefix`` is anything else (test/override) we treat it as an
explicit single-prefix read.
"""
# Pick the active read order. Default → fallback chain; custom →
# single-prefix.
if prefix == DEFAULT_PRICE_CACHE_PREFIX:
candidate_prefixes: tuple[str, ...] = _PRICE_CACHE_FALLBACK_PREFIXES
else:
candidate_prefixes = (prefix,)

obj = None
last_key: str | None = None
last_exc: Exception | None = None
for cand in candidate_prefixes:
last_key = f"{cand}{ticker}.parquet"
try:
obj = s3_client.get_object(Bucket=bucket, Key=last_key)
break
except Exception as e:
last_exc = e

if obj is None:
logger.warning(
"[regime_features] missing %s at s3://%s/%s (%s) — returning empty series",
ticker, bucket, key, type(e).__name__,
ticker, bucket, last_key, type(last_exc).__name__ if last_exc else "unknown",
)
return pd.Series(name=ticker, dtype="float64")

Expand Down
74 changes: 74 additions & 0 deletions tests/test_regime_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,77 @@ def test_source_tickers_match_data_side_convention() -> None:
convention."""
for ticker in SOURCE_TICKERS:
assert not ticker.startswith("^"), f"{ticker} should not have ^ prefix"


# ── Wave-3 reader migration (ROADMAP L1401) ─────────────────────────────────


def test_wave3_reader_prefers_new_prefix_over_legacy() -> None:
    """A default-prefix read must be served from ``reference/price_cache/``.

    While the producer write-both soak is running (alpha-engine-data#270,
    shipped 2026-05-19; ≥1-week soak) both prefixes are expected to hold
    byte-equal copies. The reader has to hit the new prefix first so the
    new location is exercised end-to-end before PR4 removes legacy.
    """
    from regime.features import _PRICE_CACHE_NEW_PREFIX, _read_parquet_close

    fake_s3 = _FakeS3()
    # Deliberately different Close values per prefix: 100 under the NEW
    # prefix, 999 under LEGACY. A reader that picked legacy would
    # surface 999.
    seeded_closes = (
        (_PRICE_CACHE_NEW_PREFIX, 100.0),
        (DEFAULT_PRICE_CACHE_PREFIX, 999.0),
    )
    for key_prefix, close_value in seeded_closes:
        frame = pd.DataFrame(
            {"Close": [close_value]},
            index=pd.to_datetime(["2026-05-19"]),
        )
        fake_s3.put_parquet("bkt", f"{key_prefix}SPY.parquet", frame)

    closes = _read_parquet_close(
        "SPY",
        s3_client=fake_s3,
        bucket="bkt",
        prefix=DEFAULT_PRICE_CACHE_PREFIX,
    )
    assert closes.iloc[0] == 100.0, "reader picked legacy when new was present"


def test_wave3_reader_falls_back_to_legacy_when_new_missing() -> None:
    """A default-prefix read must fall back to legacy on a NEW-prefix miss.

    Early in the soak window the NEW prefix can lag behind for a fresh
    ticker, so the legacy copy has to be returned gracefully.
    """
    from regime.features import _read_parquet_close

    fake_s3 = _FakeS3()
    # Seed the legacy key only — nothing is written under the NEW prefix.
    legacy_key = f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet"
    legacy_frame = pd.DataFrame(
        {"Close": [123.0]},
        index=pd.to_datetime(["2026-05-19"]),
    )
    fake_s3.put_parquet("bkt", legacy_key, legacy_frame)

    closes = _read_parquet_close(
        "SPY",
        s3_client=fake_s3,
        bucket="bkt",
        prefix=DEFAULT_PRICE_CACHE_PREFIX,
    )
    assert closes.iloc[0] == 123.0


def test_wave3_reader_explicit_custom_prefix_does_not_fall_back() -> None:
    """A non-default prefix is read as-is, with no fallback chain.

    Test and config-override callers that pass their own prefix opt out
    of the fallback and get single-prefix semantics. Mirrors the
    write-side ``price_cache_write_prefixes`` opt-out.
    """
    from regime.features import _read_parquet_close

    fake_s3 = _FakeS3()
    # Only the legacy default is populated; the custom prefix stays empty.
    legacy_frame = pd.DataFrame(
        {"Close": [100.0]},
        index=pd.to_datetime(["2026-05-19"]),
    )
    fake_s3.put_parquet(
        "bkt",
        f"{DEFAULT_PRICE_CACHE_PREFIX}SPY.parquet",
        legacy_frame,
    )

    closes = _read_parquet_close(
        "SPY",
        s3_client=fake_s3,
        bucket="bkt",
        prefix="custom/explicit/",  # NOT the default
    )
    # No SPY.parquet under the custom prefix → empty result, and the
    # legacy copy must not be consulted.
    assert closes.empty
Loading