diff --git a/builders/_price_cache_writeboth.py b/builders/_price_cache_writeboth.py index 8102004..426563e 100644 --- a/builders/_price_cache_writeboth.py +++ b/builders/_price_cache_writeboth.py @@ -62,8 +62,40 @@ def price_cache_write_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list return [primary] +def price_cache_read_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[str]: + """Return active READ prefixes in fallback order. + + Companion to :func:`price_cache_write_prefixes` — the Wave-3 reader + migration (PR3+) iterates this in fallback order so the new prefix + is consulted first and the legacy prefix is the safety net during + the soak window. + + * ``primary == "predictor/price_cache/"`` (the production default) → + ``["reference/price_cache/", "predictor/price_cache/"]``: try + the new prefix first (post-PR4 it's the sole survivor), fall + back to legacy on miss for the soak period. + * ``primary`` is any other string → ``[primary]`` (single-read + fallback, mirrors the test-friendly write-side semantics). + + Callers wrap their reads in:: + + for prefix in price_cache_read_prefixes(s3_prefix): + try: + return s3.get_object(Bucket=bucket, Key=f"{prefix}{name}") + except ClientError: + continue + + At Wave-3 PR4 cutover the legacy entry is removed here in the same + edit that flips the write helper — one-line change in each. + """ + if primary == PRICE_CACHE_LEGACY_PREFIX: + return [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + return [primary] + + __all__ = [ "PRICE_CACHE_LEGACY_PREFIX", "PRICE_CACHE_NEW_PREFIX", "price_cache_write_prefixes", + "price_cache_read_prefixes", ] diff --git a/collectors/constituents.py b/collectors/constituents.py index 4efe76a..55515ed 100644 --- a/collectors/constituents.py +++ b/collectors/constituents.py @@ -109,14 +109,27 @@ def collect( ) logger.info("Wrote constituents.json to s3://%s/%s (%d tickers)", bucket, key, len(tickers)) - # Write sector_map.json to canonical data path + legacy predictor path + # Write sector_map.json to canonical data path + legacy predictor + # path + Wave-3 new reference/ path. The new ``reference/`` write + # was missing from PR1 #270 because that PR scoped only the + # ticker-parquet write paths (yfinance / FRED / chronic-gap); + # sector_map.json travels through this separate constituents + # collector and needs its own write-both update so the Wave-3 + # readers don't see a stale ``reference/`` copy after PR4 retires + # legacy. sector_map_body = json.dumps(sector_etf_map, indent=2, sort_keys=True) - for sector_map_key in ["data/sector_map.json", "predictor/price_cache/sector_map.json"]: + for sector_map_key in ( + "data/sector_map.json", + "predictor/price_cache/sector_map.json", + "reference/price_cache/sector_map.json", + ): s3.put_object( Bucket=bucket, Key=sector_map_key, Body=sector_map_body, ContentType="application/json", ) - logger.info("Wrote sector_map.json to data/ and predictor/ paths") + logger.info( + "Wrote sector_map.json to data/, predictor/, and reference/ paths", + ) # tickers is included in the return so callers don't need an S3 round-trip # to re-read what they just wrote. Pre-MorningEnrich preflight (PR #134) diff --git a/sf_preflight.py b/sf_preflight.py index cd32723..fbe8985 100644 --- a/sf_preflight.py +++ b/sf_preflight.py @@ -584,17 +584,30 @@ def check_backfill_source_freshness(ctx: PreflightContext) -> CheckResult: # Backfill source = price_cache + daily_closes delta. Effective last # is max(price_cache_last, daily_closes_last). Read SPY parquet. - try: - obj = s3.get_object(Bucket=ctx.bucket, Key="predictor/price_cache/SPY.parquet") - df = pd.read_parquet(io.BytesIO(obj["Body"].read())) - cache_last = pd.Timestamp(df.index[-1]).normalize() - except Exception as exc: + # + # Wave-3 reader migration (ROADMAP L1401): try the new + # ``reference/price_cache/`` prefix first, fall back to legacy + # ``predictor/price_cache/`` during the producer write-both soak + # (PR1 #270 shipped 2026-05-19; soak ≥1 week to ~2026-05-26). + from builders._price_cache_writeboth import price_cache_read_prefixes + + df = None + last_exc: Exception | None = None + for prefix in price_cache_read_prefixes(): + try: + obj = s3.get_object(Bucket=ctx.bucket, Key=f"{prefix}SPY.parquet") + df = pd.read_parquet(io.BytesIO(obj["Body"].read())) + break + except Exception as exc: + last_exc = exc + if df is None: return CheckResult( name="backfill_source_freshness", status="fail", - message=f"price_cache SPY read raised: {exc}", + message=f"price_cache SPY read raised (both prefixes): {last_exc}", elapsed_seconds=time.time() - t0, ) + cache_last = pd.Timestamp(df.index[-1]).normalize() # Daily delta — staging/daily_closes/{prior_trading_day}.parquet. try: diff --git a/tests/test_constituents_sector_map.py b/tests/test_constituents_sector_map.py index 445d5bb..b9f5199 100644 --- a/tests/test_constituents_sector_map.py +++ b/tests/test_constituents_sector_map.py @@ -448,3 +448,50 @@ def fake_get(url, **kwargs): "the count is only useful as observability, not as a tickers source" ) assert set(result["tickers"]) == {"AAPL", "MSFT", "JHG", "WSO"} + + +def test_sector_map_writes_to_all_three_paths() -> None: + """Wave-3 PR3 (ROADMAP L1401): sector_map.json must be written to: + + 1. ``data/sector_map.json`` — canonical \"new\" data path. + 2. ``predictor/price_cache/sector_map.json`` — legacy path (retired + in PR4). + 3. ``reference/price_cache/sector_map.json`` — Wave-3 new home for + the predictor/price_cache/ migration. PR1 #270 missed this + write (it scoped only the ticker-parquet writes); without it, + readers that hit ``reference/`` first see a stale snapshot + after PR4 deletes legacy. + """ + from unittest.mock import MagicMock + + sp500_html = _fake_html( + ["AAPL"], ["Information Technology"], + ) + sp400_html = _fake_html(["JHG"], ["Financials"]) + + def fake_get(url, **kwargs): + return _FakeResp(sp500_html if "500" in url else sp400_html) + + put_calls: list[dict] = [] + fake_s3 = MagicMock() + fake_s3.put_object.side_effect = lambda **kw: put_calls.append(kw) + + fake_boto3 = MagicMock() + fake_boto3.client.return_value = fake_s3 + + with patch("collectors.constituents.requests.get", side_effect=fake_get), \ + patch("collectors.constituents.boto3", fake_boto3): + constituents.collect(bucket="any", dry_run=False) + + sector_map_writes = [ + c for c in put_calls if c["Key"].endswith("sector_map.json") + ] + written_keys = {c["Key"] for c in sector_map_writes} + assert written_keys == { + "data/sector_map.json", + "predictor/price_cache/sector_map.json", + "reference/price_cache/sector_map.json", + }, written_keys + # Bodies must be byte-equal — readers can pick any path safely. + bodies = [c["Body"] for c in sector_map_writes] + assert len(set(bodies)) == 1, "sector_map.json bodies diverge across paths" diff --git a/tests/test_price_cache_writeboth.py b/tests/test_price_cache_writeboth.py index ae658b6..89258fa 100644 --- a/tests/test_price_cache_writeboth.py +++ b/tests/test_price_cache_writeboth.py @@ -26,6 +26,7 @@ from builders._price_cache_writeboth import ( PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX, + price_cache_read_prefixes, price_cache_write_prefixes, ) @@ -71,6 +72,33 @@ def test_new_prefix_is_not_legacy(): assert PRICE_CACHE_LEGACY_PREFIX.startswith("predictor/") +# --------------------------------------------------------------------------- +# Read-side helper (Wave-3 PR3 reader migration) +# --------------------------------------------------------------------------- + + +def test_read_helper_default_returns_new_first_legacy_second(): + """The read order is the WRITE order REVERSED — new prefix consulted + first so consumers see the post-PR4 home as soon as the soak begins; + legacy is the fallback during the soak window only. + """ + out = price_cache_read_prefixes() + assert out == [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + + +def test_read_helper_explicit_legacy_returns_new_first_legacy_second(): + out = price_cache_read_prefixes(PRICE_CACHE_LEGACY_PREFIX) + assert out == [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + + +def test_read_helper_custom_prefix_returns_single(): + """Test/config-override prefix opts out of the fallback chain — mirrors + the write-side single-prefix semantics.""" + custom = "some/other/prefix/" + out = price_cache_read_prefixes(custom) + assert out == [custom] + + # --------------------------------------------------------------------------- # collectors/prices.py — yfinance refresh upload # ---------------------------------------------------------------------------