From e409d97ebc3e4d7bf78d852947170ea12f613aad Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 16:32:37 -0700 Subject: [PATCH] feat(wave3): price_cache_read_prefixes helper + sf_preflight reader migration + sector_map.json write-both gap fix (ROADMAP L1401) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wave-3 reader-side follow-on to producer write-both PR1 (alpha-engine-data#270, shipped 2026-05-19). Adds the read-side companion helper, migrates the simplest reader site (sf_preflight), and patches a PR1 gap: sector_map.json was only being written to `data/` + `predictor/price_cache/` — readers that hit `reference/price_cache/sector_map.json` first (e.g. alpha-engine-backtester#230) would see a stale snapshot after PR4 deletes legacy. Added - `builders/_price_cache_writeboth.price_cache_read_prefixes()` — companion to `price_cache_write_prefixes`. Read order = WRITE order REVERSED: `[reference/, predictor/]` so the new prefix is consulted first, legacy is the soak-window fallback. PR4 cutover edits the helper + the write helper in lockstep. - Re-exported alongside the existing helper. Reader migrated - `sf_preflight.py` backfill-source-freshness check — iterates the two prefixes via the helper instead of hardcoding the legacy key. Net behavior change: a missing legacy key alone no longer fails the check (post-PR4 state); only when BOTH prefixes are unreadable does the check report `fail`. Producer-side gap fix - `collectors/constituents.py` — sector_map.json now written to 3 keys: `data/` + `predictor/price_cache/` + (NEW) `reference/price_cache/`. PR1 #270's write-both helper scoped only the ticker-parquet writes (yfinance / FRED / chronic-gap), so the separately-emitted sector_map.json from this collector wasn't reaching the new prefix on an ongoing basis. The one-shot backfill 2026-05-19 22:13Z made `reference/price_cache/sector_map.json` fresh as of that moment; without this fix it would have gone stale after the next weekly producer run. Sites NOT migrated in this PR (audit/scoping) - `builders/backfill.py` (`_load_full_cache` — LIST + bulk read) - `builders/daily_append.py` (`PRICE_CACHE_PREFIX` constant + many per-ticker GETs) - `collectors/slim_cache.py` (default-arg propagation) - More invasive; warrant a focused follow-up PR with the helper already in place. Each site should iterate `price_cache_read_prefixes()` and break on first hit. Audit miss caught - `collectors/constituents.py:114` initially read as a Wave-3 reader site (per the L1401 list) but is actually a WRITER site — the L1401 entry was mislabeled. The catch surfaced the missing 3rd write target above; net better state than the original scope. Tests (+4 new, suite 1387 → 1391 green) - `test_read_helper_default_returns_new_first_legacy_second` — pins new-first read order. - `test_read_helper_explicit_legacy_returns_new_first_legacy_second` — explicit-legacy is identical to default (production semantic). - `test_read_helper_custom_prefix_returns_single` — test/override opt-out matches the write-side semantic. - `test_sector_map_writes_to_all_three_paths` — pins the 3-key write contract on `constituents.collect()`; byte-equal bodies. Composes with - alpha-engine-data#270 (producer write-both, this PR's prerequisite). - alpha-engine#197 (IAM ARN add for `reference/price_cache/`). - alpha-engine-backtester#230 (Wave-3 backtester reader, sibling PR). - Wave-3 PR4 cutover (drops the fallback branch in both helpers; the one-line edit at that time). Co-Authored-By: Claude Opus 4.7 (1M context) --- builders/_price_cache_writeboth.py | 32 ++++++++++++++++++ collectors/constituents.py | 19 +++++++++-- sf_preflight.py | 25 ++++++++++---- tests/test_constituents_sector_map.py | 47 +++++++++++++++++++++++++++ tests/test_price_cache_writeboth.py | 28 ++++++++++++++++ 5 files changed, 142 insertions(+), 9 deletions(-) diff --git a/builders/_price_cache_writeboth.py b/builders/_price_cache_writeboth.py index 8102004..426563e 100644 --- a/builders/_price_cache_writeboth.py +++ b/builders/_price_cache_writeboth.py @@ -62,8 +62,40 @@ def price_cache_write_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list return [primary] +def price_cache_read_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[str]: + """Return active READ prefixes in fallback order. + + Companion to :func:`price_cache_write_prefixes` — the Wave-3 reader + migration (PR3+) iterates this in fallback order so the new prefix + is consulted first and the legacy prefix is the safety net during + the soak window. + + * ``primary == "predictor/price_cache/"`` (the production default) → + ``["reference/price_cache/", "predictor/price_cache/"]``: try + the new prefix first (post-PR4 it's the sole survivor), fall + back to legacy on miss for the soak period. + * ``primary`` is any other string → ``[primary]`` (single-read + fallback, mirrors the test-friendly write-side semantics). + + Callers wrap their reads in:: + + for prefix in price_cache_read_prefixes(s3_prefix): + try: + return s3.get_object(Bucket=bucket, Key=f"{prefix}{name}") + except ClientError: + continue + + At Wave-3 PR4 cutover the legacy entry is removed here in the same + edit that flips the write helper — one-line change in each. + """ + if primary == PRICE_CACHE_LEGACY_PREFIX: + return [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + return [primary] + + __all__ = [ "PRICE_CACHE_LEGACY_PREFIX", "PRICE_CACHE_NEW_PREFIX", "price_cache_write_prefixes", + "price_cache_read_prefixes", ] diff --git a/collectors/constituents.py b/collectors/constituents.py index 4efe76a..55515ed 100644 --- a/collectors/constituents.py +++ b/collectors/constituents.py @@ -109,14 +109,27 @@ def collect( ) logger.info("Wrote constituents.json to s3://%s/%s (%d tickers)", bucket, key, len(tickers)) - # Write sector_map.json to canonical data path + legacy predictor path + # Write sector_map.json to canonical data path + legacy predictor + # path + Wave-3 new reference/ path. The new ``reference/`` write + # was missing from PR1 #270 because that PR scoped only the + # ticker-parquet write paths (yfinance / FRED / chronic-gap); + # sector_map.json travels through this separate constituents + # collector and needs its own write-both update so the Wave-3 + # readers don't see a stale ``reference/`` copy after PR4 retires + # legacy. sector_map_body = json.dumps(sector_etf_map, indent=2, sort_keys=True) - for sector_map_key in ["data/sector_map.json", "predictor/price_cache/sector_map.json"]: + for sector_map_key in ( + "data/sector_map.json", + "predictor/price_cache/sector_map.json", + "reference/price_cache/sector_map.json", + ): s3.put_object( Bucket=bucket, Key=sector_map_key, Body=sector_map_body, ContentType="application/json", ) - logger.info("Wrote sector_map.json to data/ and predictor/ paths") + logger.info( + "Wrote sector_map.json to data/, predictor/, and reference/ paths", + ) # tickers is included in the return so callers don't need an S3 round-trip # to re-read what they just wrote. Pre-MorningEnrich preflight (PR #134) diff --git a/sf_preflight.py b/sf_preflight.py index cd32723..fbe8985 100644 --- a/sf_preflight.py +++ b/sf_preflight.py @@ -584,17 +584,30 @@ def check_backfill_source_freshness(ctx: PreflightContext) -> CheckResult: # Backfill source = price_cache + daily_closes delta. Effective last # is max(price_cache_last, daily_closes_last). Read SPY parquet. - try: - obj = s3.get_object(Bucket=ctx.bucket, Key="predictor/price_cache/SPY.parquet") - df = pd.read_parquet(io.BytesIO(obj["Body"].read())) - cache_last = pd.Timestamp(df.index[-1]).normalize() - except Exception as exc: + # + # Wave-3 reader migration (ROADMAP L1401): try the new + # ``reference/price_cache/`` prefix first, fall back to legacy + # ``predictor/price_cache/`` during the producer write-both soak + # (PR1 #270 shipped 2026-05-19; soak ≥1 week to ~2026-05-26). + from builders._price_cache_writeboth import price_cache_read_prefixes + + df = None + last_exc: Exception | None = None + for prefix in price_cache_read_prefixes(): + try: + obj = s3.get_object(Bucket=ctx.bucket, Key=f"{prefix}SPY.parquet") + df = pd.read_parquet(io.BytesIO(obj["Body"].read())) + break + except Exception as exc: + last_exc = exc + if df is None: return CheckResult( name="backfill_source_freshness", status="fail", - message=f"price_cache SPY read raised: {exc}", + message=f"price_cache SPY read raised (both prefixes): {last_exc}", elapsed_seconds=time.time() - t0, ) + cache_last = pd.Timestamp(df.index[-1]).normalize() # Daily delta — staging/daily_closes/{prior_trading_day}.parquet. try: diff --git a/tests/test_constituents_sector_map.py b/tests/test_constituents_sector_map.py index 445d5bb..b9f5199 100644 --- a/tests/test_constituents_sector_map.py +++ b/tests/test_constituents_sector_map.py @@ -448,3 +448,50 @@ def fake_get(url, **kwargs): "the count is only useful as observability, not as a tickers source" ) assert set(result["tickers"]) == {"AAPL", "MSFT", "JHG", "WSO"} + + +def test_sector_map_writes_to_all_three_paths() -> None: + """Wave-3 PR3 (ROADMAP L1401): sector_map.json must be written to: + + 1. ``data/sector_map.json`` — canonical \"new\" data path. + 2. ``predictor/price_cache/sector_map.json`` — legacy path (retired + in PR4). + 3. ``reference/price_cache/sector_map.json`` — Wave-3 new home for + the predictor/price_cache/ migration. PR1 #270 missed this + write (it scoped only the ticker-parquet writes); without it, + readers that hit ``reference/`` first see a stale snapshot + after PR4 deletes legacy. + """ + from unittest.mock import MagicMock + + sp500_html = _fake_html( + ["AAPL"], ["Information Technology"], + ) + sp400_html = _fake_html(["JHG"], ["Financials"]) + + def fake_get(url, **kwargs): + return _FakeResp(sp500_html if "500" in url else sp400_html) + + put_calls: list[dict] = [] + fake_s3 = MagicMock() + fake_s3.put_object.side_effect = lambda **kw: put_calls.append(kw) + + fake_boto3 = MagicMock() + fake_boto3.client.return_value = fake_s3 + + with patch("collectors.constituents.requests.get", side_effect=fake_get), \ + patch("collectors.constituents.boto3", fake_boto3): + constituents.collect(bucket="any", dry_run=False) + + sector_map_writes = [ + c for c in put_calls if c["Key"].endswith("sector_map.json") + ] + written_keys = {c["Key"] for c in sector_map_writes} + assert written_keys == { + "data/sector_map.json", + "predictor/price_cache/sector_map.json", + "reference/price_cache/sector_map.json", + }, written_keys + # Bodies must be byte-equal — readers can pick any path safely. + bodies = [c["Body"] for c in sector_map_writes] + assert len(set(bodies)) == 1, "sector_map.json bodies diverge across paths" diff --git a/tests/test_price_cache_writeboth.py b/tests/test_price_cache_writeboth.py index ae658b6..89258fa 100644 --- a/tests/test_price_cache_writeboth.py +++ b/tests/test_price_cache_writeboth.py @@ -26,6 +26,7 @@ from builders._price_cache_writeboth import ( PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX, + price_cache_read_prefixes, price_cache_write_prefixes, ) @@ -71,6 +72,33 @@ def test_new_prefix_is_not_legacy(): assert PRICE_CACHE_LEGACY_PREFIX.startswith("predictor/") +# --------------------------------------------------------------------------- +# Read-side helper (Wave-3 PR3 reader migration) +# --------------------------------------------------------------------------- + + +def test_read_helper_default_returns_new_first_legacy_second(): + """The read order is the WRITE order REVERSED — new prefix consulted + first so consumers see the post-PR4 home as soon as the soak begins; + legacy is the fallback during the soak window only. + """ + out = price_cache_read_prefixes() + assert out == [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + + +def test_read_helper_explicit_legacy_returns_new_first_legacy_second(): + out = price_cache_read_prefixes(PRICE_CACHE_LEGACY_PREFIX) + assert out == [PRICE_CACHE_NEW_PREFIX, PRICE_CACHE_LEGACY_PREFIX] + + +def test_read_helper_custom_prefix_returns_single(): + """Test/config-override prefix opts out of the fallback chain — mirrors + the write-side single-prefix semantics.""" + custom = "some/other/prefix/" + out = price_cache_read_prefixes(custom) + assert out == [custom] + + # --------------------------------------------------------------------------- # collectors/prices.py — yfinance refresh upload # ---------------------------------------------------------------------------