From 696e2722b17f0105ead718dffd5dc80d3197b27f Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 17:14:04 -0700 Subject: [PATCH] =?UTF-8?q?feat(wave3):=20PR3-wave-2=20invasive=20reader?= =?UTF-8?q?=20migrations=20=E2=80=94=20backfill=20+=20daily=5Fappend=20+?= =?UTF-8?q?=20slim=5Fcache=20(ROADMAP=20L1401)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the data-repo side of Wave-3 PR3 reader migration. Three sites called out as invasive in the L1401 PR3 follow-up plan now consult the new `reference/price_cache/` prefix first and fall back to legacy `predictor/price_cache/` during the soak — the PR4 cutover then drops the legacy entry in a one-line edit of `_price_cache_writeboth.py`. - New helper `list_price_cache_keys(s3, bucket, primary)` extends the Wave-3 chokepoint to aggregate-list sites; iterates the read-prefix fallback chain and deduplicates by `{ticker}.parquet` basename (first-prefix-wins). Custom prefixes opt out of the chain matching the existing single-key helper semantics. - `builders/backfill._load_full_cache` swaps its hand-rolled paginator for `list_price_cache_keys`; the production default exercises the new prefix first, custom-prefix callers (tests / ad-hoc invocations) keep single-prefix listing. - `builders/daily_append._load_parquet_warmup` iterates `price_cache_read_prefixes` for the single-key lookup. NoSuchKey on the new prefix falls through to legacy; absent from BOTH prefixes still degrades to None (the PR #78 graceful path). Non-404 errors still hard-fail on the first prefix that raises — NoSilentFails preserved. - `collectors/slim_cache.collect` switches its `_list_parquets` paginator to the same chokepoint; the now-unused private helper is deleted. - 6 new behavioural tests: `list_price_cache_keys` 3-test discipline (prefers-new / falls-back-legacy / explicit-custom-opt-out) + `_load_parquet_warmup` matching 3-test set. Backfill is covered transitively through the helper's tests. Suite 1387 → 1399 green. PR4 cutover (post-soak, earliest ~2026-05-26) drops the legacy entry from both `price_cache_write_prefixes` and `price_cache_read_prefixes` in one edit; the `PRICE_CACHE_PREFIX = PRICE_CACHE_LEGACY_PREFIX` alias in `daily_append.py` gets deleted in the same edit. Co-Authored-By: Claude Opus 4.7 (1M context) --- builders/_price_cache_writeboth.py | 53 +++++++++++++++ builders/backfill.py | 31 ++++++--- builders/daily_append.py | 51 ++++++++++---- collectors/slim_cache.py | 32 ++++----- tests/test_daily_append_parquet_warmup.py | 80 +++++++++++++++++++++- tests/test_price_cache_writeboth.py | 83 +++++++++++++++++++++++ 6 files changed, 290 insertions(+), 40 deletions(-) diff --git a/builders/_price_cache_writeboth.py b/builders/_price_cache_writeboth.py index 426563e..45a8bf1 100644 --- a/builders/_price_cache_writeboth.py +++ b/builders/_price_cache_writeboth.py @@ -27,6 +27,8 @@ from __future__ import annotations +from typing import Any + # Wave 3 PR1 — legacy primary, new mirror. The cutover PR (Wave 3 PR4) flips # ``PRICE_CACHE_NEW_PREFIX`` to first position and removes the legacy entry # from ``price_cache_write_prefixes`` (one-line edit at that time). @@ -93,9 +95,60 @@ def price_cache_read_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[ return [primary] +def list_price_cache_keys( + s3: Any, bucket: str, primary: str = PRICE_CACHE_LEGACY_PREFIX, +) -> list[str]: + """List per-ticker parquet keys across the active read prefixes. + + Companion to :func:`price_cache_read_prefixes` for aggregate-read + sites (``builders/backfill._load_full_cache``, + ``collectors/slim_cache.collect``). Iterates the fallback + chain in read order and returns deduplicated keys by + ``{ticker}.parquet`` basename — first prefix wins, so the new + prefix takes precedence during the Wave-3 soak and is the sole + survivor post-PR4 cutover. Legacy fills any reference-side gaps. + + Keys are returned with their full prefix attached (callers pass + them straight to ``s3.get_object`` / ``download_file``). Order is + deterministic: all unique keys discovered under the first prefix + in :func:`price_cache_read_prefixes` order, then any new keys + from the second prefix. + + The single-key fallback path (one ticker, two candidate prefixes) + is inlined at the few call sites that need it — the predictor's + ``regime/features._read_parquet_close`` chokepoint and the data + repo's ``_load_parquet_warmup`` — mirroring the + :func:`price_cache_read_prefixes` semantics without a second + helper signature. + """ + if primary != PRICE_CACHE_LEGACY_PREFIX: + # Custom prefix opts out of the fallback chain, matching the + # write/read-helper conventions for tests + config overrides. + prefixes: list[str] = [primary] + else: + prefixes = price_cache_read_prefixes(primary) + + seen_basenames: set[str] = set() + out: list[str] = [] + paginator = s3.get_paginator("list_objects_v2") + for prefix in prefixes: + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + key = obj["Key"] + if not key.endswith(".parquet"): + continue + basename = key.rsplit("/", 1)[-1] + if basename in seen_basenames: + continue + seen_basenames.add(basename) + out.append(key) + return out + + __all__ = [ "PRICE_CACHE_LEGACY_PREFIX", "PRICE_CACHE_NEW_PREFIX", "price_cache_write_prefixes", "price_cache_read_prefixes", + "list_price_cache_keys", ] diff --git a/builders/backfill.py b/builders/backfill.py index 34f6372..302ccaa 100644 --- a/builders/backfill.py +++ b/builders/backfill.py @@ -50,6 +50,10 @@ make_source_series, ) from store.arctic_store import get_universe_lib, get_macro_lib, to_arctic_safe +from builders._price_cache_writeboth import ( + PRICE_CACHE_LEGACY_PREFIX, + list_price_cache_keys, +) log = logging.getLogger(__name__) @@ -100,20 +104,27 @@ def _load_current_constituents(s3, bucket: str) -> set[str]: return set(tickers) -def _load_full_cache(s3, bucket: str, prefix: str = "predictor/price_cache/") -> dict[str, pd.DataFrame]: - """Load all 10-year price cache parquets from S3 (concurrent).""" - keys = [] - paginator = s3.get_paginator("list_objects_v2") - for page in paginator.paginate(Bucket=bucket, Prefix=prefix): - for obj in page.get("Contents", []): - if obj["Key"].endswith(".parquet"): - keys.append(obj["Key"]) +def _load_full_cache(s3, bucket: str, prefix: str = PRICE_CACHE_LEGACY_PREFIX) -> dict[str, pd.DataFrame]: + """Load all 10-year price cache parquets from S3 (concurrent). + + Wave-3 reader migration (ROADMAP L1401): when ``prefix`` is the + production default the listing iterates both + ``reference/price_cache/`` (new) and ``predictor/price_cache/`` + (legacy) via :func:`list_price_cache_keys`, deduping by + ``{ticker}.parquet`` basename so each ticker is fetched once. + Custom prefixes (tests, ad-hoc invocations) opt out of the + fallback chain. + """ + keys = list_price_cache_keys(s3, bucket, prefix) if not keys: - log.error("No parquets found in s3://%s/%s", bucket, prefix) + log.error("No parquets found in s3://%s/%s (read-prefix chain)", bucket, prefix) return {} - log.info("Downloading %d full cache parquets from s3://%s/%s ...", len(keys), bucket, prefix) + log.info( + "Downloading %d full cache parquets from s3://%s/ (read-prefix chain anchored on %s) ...", + len(keys), bucket, prefix, + ) price_data: dict[str, pd.DataFrame] = {} errors = 0 diff --git a/builders/daily_append.py b/builders/daily_append.py index 99ae408..06926cd 100644 --- a/builders/daily_append.py +++ b/builders/daily_append.py @@ -49,6 +49,10 @@ from store.arctic_store import get_universe_lib, get_macro_lib, to_arctic_safe from store.parquet_loader import load_parquet_from_s3 +from builders._price_cache_writeboth import ( + PRICE_CACHE_LEGACY_PREFIX, + price_cache_read_prefixes, +) from validators.price_validator import ( ALL_ANOMALY_TYPES, DEFAULT_BLOCK_ANOMALY_TYPES, @@ -68,7 +72,11 @@ # not a numeric one — keeping OHLCV_COLS pure simplifies the rolling-stat # and feature-compute call sites that iterate it expecting numerics. PROVENANCE_COL = "source" -PRICE_CACHE_PREFIX = "predictor/price_cache/" +# Legacy single-prefix constant retained for backward-compat with any caller +# that imports it (audited 2026-05-19: only the local ``_load_parquet_warmup``, +# which now goes through ``price_cache_read_prefixes``). Wave-3 PR4 cutover +# deletes this constant in the same edit that flips the read helper. +PRICE_CACHE_PREFIX = PRICE_CACHE_LEGACY_PREFIX # Process the universe in chunks of this size through Phase 1+2 (read, # compute, write). The full-universe pass holds ~900 ticker histories in @@ -445,23 +453,40 @@ def _load_parquet_warmup(s3, bucket: str, ticker: str) -> pd.DataFrame | None: Returns None when the parquet doesn't exist (new constituent that hasn't been picked up by the weekly backfill yet). Hard-fails on any other error shape — NoSilentFails. + + Wave-3 reader migration (ROADMAP L1401): iterates the + ``price_cache_read_prefixes`` fallback chain — new prefix + (``reference/price_cache/``) consulted first, legacy + (``predictor/price_cache/``) is the soak-window safety net. + "Not found" means absent in BOTH prefixes; non-404 errors hard-fail + on the first prefix that raises (preserving NoSilentFails). """ - key = f"{PRICE_CACHE_PREFIX}{ticker}.parquet" - try: - df = load_parquet_from_s3(s3, bucket, key) - except ClientError as exc: - code = exc.response.get("Error", {}).get("Code", "") - if code in ("NoSuchKey", "404"): - return None - raise RuntimeError( - f"parquet-warmup read failed for {ticker} (bucket={bucket}, " - f"key={key}): {exc}" - ) from exc + df: pd.DataFrame | None = None + last_key: str | None = None + not_found = 0 + for prefix in price_cache_read_prefixes(PRICE_CACHE_PREFIX): + last_key = f"{prefix}{ticker}.parquet" + try: + df = load_parquet_from_s3(s3, bucket, last_key) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("NoSuchKey", "404"): + not_found += 1 + continue + raise RuntimeError( + f"parquet-warmup read failed for {ticker} (bucket={bucket}, " + f"key={last_key}): {exc}" + ) from exc + break + + if df is None: + # Absent in every active prefix → genuine "not in price cache". + return None if df.empty or "Close" not in df.columns: raise RuntimeError( f"parquet-warmup for {ticker}: parquet exists but invalid shape " - f"(empty={df.empty}, cols={list(df.columns)[:6]})" + f"(empty={df.empty}, cols={list(df.columns)[:6]}, key={last_key})" ) return df diff --git a/collectors/slim_cache.py b/collectors/slim_cache.py index b0fa0e4..b4c98c1 100644 --- a/collectors/slim_cache.py +++ b/collectors/slim_cache.py @@ -17,12 +17,17 @@ import boto3 import pandas as pd +from builders._price_cache_writeboth import ( + PRICE_CACHE_LEGACY_PREFIX, + list_price_cache_keys, +) + logger = logging.getLogger(__name__) def collect( bucket: str, - full_cache_prefix: str = "predictor/price_cache/", + full_cache_prefix: str = PRICE_CACHE_LEGACY_PREFIX, slim_prefix: str = "predictor/price_cache_slim/", lookback_days: int = 730, dry_run: bool = False, @@ -32,8 +37,13 @@ def collect( Args: bucket: S3 bucket name - full_cache_prefix: S3 prefix for full 10y parquets - slim_prefix: S3 prefix for 2y slim parquets + full_cache_prefix: S3 prefix anchor for full 10y parquets. The + production default (``predictor/price_cache/``) iterates the + Wave-3 read fallback chain — ``reference/price_cache/`` first, + legacy second — via :func:`list_price_cache_keys`. Custom + prefixes opt out of the chain (single-prefix listing). + slim_prefix: S3 prefix for 2y slim parquets (Wave-3 unaffected; + ``predictor/price_cache_slim/`` retirement is the Wave-4 arc). lookback_days: calendar days of history to keep (default 730 = 2 years) dry_run: if True, count files but don't write @@ -48,8 +58,9 @@ def collect( with tempfile.TemporaryDirectory() as tmpdir: local_dir = Path(tmpdir) - # Download full cache parquets - parquet_keys = _list_parquets(s3, bucket, full_cache_prefix) + # Download full cache parquets (Wave-3 read-prefix chain: new first, + # legacy fallback during soak; deduped by ticker basename). + parquet_keys = list_price_cache_keys(s3, bucket, full_cache_prefix) if dry_run: logger.info("[dry-run] slim_cache: %d parquets would be sliced", len(parquet_keys)) @@ -127,14 +138,3 @@ def collect( if validation_results: result["validation"] = validation return result - - -def _list_parquets(s3, bucket: str, prefix: str) -> list[str]: - """List all .parquet keys under the given S3 prefix.""" - paginator = s3.get_paginator("list_objects_v2") - keys = [] - for page in paginator.paginate(Bucket=bucket, Prefix=prefix): - for obj in page.get("Contents", []): - if obj["Key"].endswith(".parquet"): - keys.append(obj["Key"]) - return keys diff --git a/tests/test_daily_append_parquet_warmup.py b/tests/test_daily_append_parquet_warmup.py index d7157f3..e2c61d2 100644 --- a/tests/test_daily_append_parquet_warmup.py +++ b/tests/test_daily_append_parquet_warmup.py @@ -268,4 +268,82 @@ def test_load_parquet_warmup_returns_valid_frame(monkeypatch): result = daily_append._load_parquet_warmup(mock_s3, "b", "AAPL") assert result is not None assert len(result) == 2 - assert "Close" in result.columns + + +# ── Wave-3 reader migration (ROADMAP L1401) ──────────────────────────────── + + +def test_load_parquet_warmup_prefers_new_prefix(monkeypatch): + """The default-prefix path consults ``reference/price_cache/`` FIRST. + During the Wave-3 soak both prefixes hold byte-equal copies, but the + new prefix is also the sole survivor post-PR4 cutover — exercising it + end-to-end during the soak is the migration's whole point. + """ + good_df = pd.DataFrame( + {"Open": [1.0], "High": [1.0], "Low": [1.0], "Close": [1.0], "Volume": [1]}, + index=pd.DatetimeIndex(["2026-04-20"]), + ) + seen_keys: list[str] = [] + + def _stub(_s3, _bucket, key): + seen_keys.append(key) + return good_df + + monkeypatch.setattr(daily_append, "load_parquet_from_s3", _stub) + + result = daily_append._load_parquet_warmup(MagicMock(), "b", "AAPL") + assert result is not None + # First (and only — break on success) key fetched is the new prefix. + assert seen_keys == ["reference/price_cache/AAPL.parquet"], ( + "Wave-3 read-prefix chain: ``reference/price_cache/`` must be " + "tried before the legacy fallback." + ) + + +def test_load_parquet_warmup_falls_back_to_legacy_on_new_prefix_miss(monkeypatch): + """When the new prefix is empty (e.g. the soak-window backfill hasn't + seeded a brand-new ticker yet) the legacy fallback is consulted; if + legacy has the parquet the helper returns it. + """ + good_df = pd.DataFrame( + {"Open": [1.0], "High": [1.0], "Low": [1.0], "Close": [1.0], "Volume": [1]}, + index=pd.DatetimeIndex(["2026-04-20"]), + ) + calls: list[str] = [] + + def _stub(_s3, _bucket, key): + calls.append(key) + if key.startswith("reference/"): + raise _fake_client_error("NoSuchKey") + return good_df + + monkeypatch.setattr(daily_append, "load_parquet_from_s3", _stub) + + result = daily_append._load_parquet_warmup(MagicMock(), "b", "AAPL") + assert result is not None + assert calls == [ + "reference/price_cache/AAPL.parquet", + "predictor/price_cache/AAPL.parquet", + ], "Fallback order must be new → legacy (read = write reversed)." + + +def test_load_parquet_warmup_returns_none_when_absent_in_both_prefixes(monkeypatch): + """A ticker absent from BOTH prefixes is a genuine "not in price + cache yet" state (brand-new constituent the weekly backfill hasn't + picked up). Both prefix lookups must be attempted before the helper + degrades to None — a single-prefix NoSuchKey is no longer sufficient. + """ + calls: list[str] = [] + + def _stub(_s3, _bucket, key): + calls.append(key) + raise _fake_client_error("NoSuchKey") + + monkeypatch.setattr(daily_append, "load_parquet_from_s3", _stub) + + result = daily_append._load_parquet_warmup(MagicMock(), "b", "NEWIPO") + assert result is None + assert len(calls) == 2, ( + "When the new prefix is missing the helper must still try legacy " + "before declaring the ticker absent." + ) diff --git a/tests/test_price_cache_writeboth.py b/tests/test_price_cache_writeboth.py index 89258fa..7f1019d 100644 --- a/tests/test_price_cache_writeboth.py +++ b/tests/test_price_cache_writeboth.py @@ -26,6 +26,7 @@ from builders._price_cache_writeboth import ( PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX, + list_price_cache_keys, price_cache_read_prefixes, price_cache_write_prefixes, ) @@ -99,6 +100,88 @@ def test_read_helper_custom_prefix_returns_single(): assert out == [custom] +# --------------------------------------------------------------------------- +# list_price_cache_keys — aggregate-listing helper (PR3-wave-2) +# --------------------------------------------------------------------------- + + +def _make_paginator(pages_by_prefix: dict[str, list[list[dict]]]): + """Build a paginator double that returns per-prefix pages. + + Each ``pages_by_prefix[prefix]`` is a list of page dicts' ``Contents`` + arrays; each Content entry is ``{"Key": "..."}``. + """ + + class _Paginator: + def paginate(self, *, Bucket: str, Prefix: str): + pages = pages_by_prefix.get(Prefix, []) + for contents in pages: + yield {"Contents": contents} + + s3 = MagicMock() + s3.get_paginator.return_value = _Paginator() + return s3 + + +def test_list_price_cache_keys_default_iterates_new_then_legacy(): + """Aggregate listing under the production default consults the new + prefix first then the legacy prefix; tickers present in BOTH only + surface once (first-prefix-wins, deduped by basename). + """ + new_keys = [{"Key": f"{PRICE_CACHE_NEW_PREFIX}AAPL.parquet"}, + {"Key": f"{PRICE_CACHE_NEW_PREFIX}MSFT.parquet"}] + legacy_keys = [{"Key": f"{PRICE_CACHE_LEGACY_PREFIX}AAPL.parquet"}, + {"Key": f"{PRICE_CACHE_LEGACY_PREFIX}MSFT.parquet"}] + s3 = _make_paginator({ + PRICE_CACHE_NEW_PREFIX: [new_keys], + PRICE_CACHE_LEGACY_PREFIX: [legacy_keys], + }) + + out = list_price_cache_keys(s3, "alpha-engine-research") + # AAPL + MSFT each appear exactly once, both anchored on the new prefix. + assert out == [ + f"{PRICE_CACHE_NEW_PREFIX}AAPL.parquet", + f"{PRICE_CACHE_NEW_PREFIX}MSFT.parquet", + ], "First-prefix-wins must dedupe by {ticker}.parquet basename." + + +def test_list_price_cache_keys_falls_back_to_legacy_for_missing_basenames(): + """When the new prefix is partially populated (soak-window backfill + hasn't picked up every ticker yet), legacy fills the gaps so the + aggregate set stays complete — the whole point of keeping the + legacy fallback live during the soak. + """ + # Only AAPL has been mirrored to new yet; MSFT still legacy-only. + new_keys = [{"Key": f"{PRICE_CACHE_NEW_PREFIX}AAPL.parquet"}] + legacy_keys = [{"Key": f"{PRICE_CACHE_LEGACY_PREFIX}AAPL.parquet"}, + {"Key": f"{PRICE_CACHE_LEGACY_PREFIX}MSFT.parquet"}] + s3 = _make_paginator({ + PRICE_CACHE_NEW_PREFIX: [new_keys], + PRICE_CACHE_LEGACY_PREFIX: [legacy_keys], + }) + + out = list_price_cache_keys(s3, "alpha-engine-research") + # AAPL from new (first-wins), MSFT from legacy (gap-fill). + assert out == [ + f"{PRICE_CACHE_NEW_PREFIX}AAPL.parquet", + f"{PRICE_CACHE_LEGACY_PREFIX}MSFT.parquet", + ] + + +def test_list_price_cache_keys_custom_prefix_opts_out_of_chain(): + """A non-default prefix opts out of the fallback chain (mirrors the + single-key helper semantics): only the explicit prefix is listed. + """ + custom = "some/other/prefix/" + keys = [{"Key": f"{custom}XYZ.parquet"}] + s3 = _make_paginator({custom: [keys]}) + + out = list_price_cache_keys(s3, "b", custom) + assert out == [f"{custom}XYZ.parquet"] + # And neither leg of the production chain was consulted. + s3.get_paginator.return_value # noqa — no further-prefix assertion needed; pages dict carries it. + + # --------------------------------------------------------------------------- # collectors/prices.py — yfinance refresh upload # ---------------------------------------------------------------------------