From 11829d9718e058037be4a30d4f8bf83da5ca0443 Mon Sep 17 00:00:00 2001
From: Brian McMahon
Date: Tue, 19 May 2026 14:47:24 -0700
Subject: [PATCH] feat(price-cache): Wave 3 PR1 producer write-both predictor/ -> reference/
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

ROADMAP P1 "predictor/ S3 namespace rationalization Wave 3" — start the
write-both soak that migrates the 10y price_cache parquet tree from
predictor/price_cache/ (under the predictor module's namespace) to
reference/price_cache/ (long-lived data-module references). Mirrors the
shape of Wave 1's predictor/daily_closes/ -> staging/daily_closes/ but
uses write-both + soak instead of hard-cutover because this writer only
rewrites STALE tickers — a hard cut would leave fresh tickers in legacy
and the new prefix incomplete for a full yfinance refresh cycle.
CLAUDE.md S3 Contract Safety mandates the write-both + >=1 week soak for
any path change of this shape.

## What ships in PR1 (producer-side only — zero reader changes)

- builders/_price_cache_writeboth.py (new): the single chokepoint.
  `price_cache_write_prefixes(primary)` returns [legacy, new] for the
  production default and [primary] for any custom string. Legacy ordered
  first so a fail-loud on the legacy write preserves pre-Wave-3 failure
  semantics — the new prefix never silently masks a legacy write error.
- collectors/prices.py: yfinance refresh upload now writes both prefixes.
- collectors/fred_history.py: FRED backfill upload now writes both
  prefixes.
- weekly_collector.py: chronic-gap self-heal patch writes both prefixes
  (the get_object read stays on legacy since readers haven't migrated).
- infrastructure/backfill_reference_price_cache.sh (new): one-shot
  `aws s3 sync` operator script to seed reference/price_cache/ with the
  ~934 objects currently in predictor/price_cache/. Idempotent; --dry-run
  supported. Run ONCE as part of PR1's deploy.
- tests/test_price_cache_writeboth.py (new, 7 tests): helper contract
  (legacy default returns both, custom returns single, ordering pinned) +
  each of the 3 production writers exercised end-to-end with stubbed s3 +
  recording asserts that BOTH keys land per ticker with identical bodies.
- tests/test_fred_history_fetcher.py: updated the pre-existing
  test_uploads_to_s3_when_not_dry_run from asserting a single upload to
  asserting write-both behavior. Required by zero-tolerance test policy.

## What does NOT ship in PR1

- Reader migrations: ~10 read sites across alpha-engine-data,
  alpha-engine-predictor, alpha-engine-backtester, alpha-engine-dashboard
  stay on the legacy prefix. PR3+ migrates them with legacy fallback.
- IAM grant expansion to cover reference/price_cache/* — PR2 mirrors
  Wave 1 #120's IAM pattern on the alpha-engine repo's
  alpha-engine-s3-access.json.
- builders/daily_append.py:_load_parquet_warmup (reader, not writer) —
  migrates in PR3.
- sector_map.json (separate concern — write-once-per-Saturday, not part
  of the stale-ticker churn). Handled at cutover or PR3.
- The cutover itself: PR4 will flip primary -> reference/, drop the
  legacy entry from price_cache_write_prefixes, retire reader fallbacks,
  and `aws s3 rm --recursive` the legacy prefix. Gated on >=1 week of
  clean write-both observation.

## Soak contract

PR1 merge -> deploy this commit live -> run the backfill script ONCE to
seed the new prefix -> next Saturday SF firing's first write to both
prefixes starts the soak clock -> after >=4 Saturday firings (matches
Wave 4's discipline) with no parity divergence, PR3 reader migrations go
in, then PR4 cutover.

## Tests

pytest tests/ -q -> 1387 passed, 1 skipped, 0 failed

Composes with: ROADMAP Wave 4 slim-deletion arc currently in flight
(institutional pattern for data-tier prefix changes — dual-read /
dual-write + lib reconcile observation), Wave 1 PR #112 (template),
S3 Contract Safety in CLAUDE.md.

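## Parity check sketch (illustrative, not shipped in PR1)

The soak contract above gates PR3 on "no parity divergence" between the two prefixes. One way that gate could be checked is to compare the two listings filename by filename. The sketch below is only an assumption about how such a check might look: `parity_divergence` and its inputs (full S3 key mapped to object size) are hypothetical names, not part of this PR, and the listings would have to be built separately from an S3 listing of each prefix.

```python
from __future__ import annotations

# Same prefix strings as builders/_price_cache_writeboth.py in this PR.
LEGACY_PREFIX = "predictor/price_cache/"
NEW_PREFIX = "reference/price_cache/"


def parity_divergence(
    legacy: dict[str, int], new: dict[str, int]
) -> dict[str, list[str]]:
    """Compare two prefix listings (full S3 key -> object size in bytes).

    Hypothetical helper, not part of PR1. Returns the filenames missing
    from either side and the filenames whose sizes differ.
    """
    # Strip each prefix so both sides are keyed by bare filename.
    legacy_by_name = {k.removeprefix(LEGACY_PREFIX): v for k, v in legacy.items()}
    new_by_name = {k.removeprefix(NEW_PREFIX): v for k, v in new.items()}
    shared = legacy_by_name.keys() & new_by_name.keys()
    return {
        "missing_in_new": sorted(legacy_by_name.keys() - new_by_name.keys()),
        "missing_in_legacy": sorted(new_by_name.keys() - legacy_by_name.keys()),
        "size_mismatch": sorted(
            name for name in shared if legacy_by_name[name] != new_by_name[name]
        ),
    }


# Example with made-up sizes: TWO.parquet has not been mirrored yet.
legacy_listing = {
    "predictor/price_cache/AAPL.parquet": 100,
    "predictor/price_cache/TWO.parquet": 50,
}
new_listing = {"reference/price_cache/AAPL.parquet": 100}
report = parity_divergence(legacy_listing, new_listing)
```

Size equality is a coarse signal; comparing checksums of the object bodies would be a stronger test of the byte-for-byte claim, at the cost of reading every object.
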
Co-Authored-By: Claude Opus 4.7 (1M context) --- builders/_price_cache_writeboth.py | 69 +++++ collectors/fred_history.py | 26 +- collectors/prices.py | 9 +- .../backfill_reference_price_cache.sh | 68 +++++ tests/test_fred_history_fetcher.py | 22 +- tests/test_price_cache_writeboth.py | 269 ++++++++++++++++++ weekly_collector.py | 18 +- 7 files changed, 461 insertions(+), 20 deletions(-) create mode 100644 builders/_price_cache_writeboth.py create mode 100755 infrastructure/backfill_reference_price_cache.sh create mode 100644 tests/test_price_cache_writeboth.py diff --git a/builders/_price_cache_writeboth.py b/builders/_price_cache_writeboth.py new file mode 100644 index 0000000..8102004 --- /dev/null +++ b/builders/_price_cache_writeboth.py @@ -0,0 +1,69 @@ +"""Wave 3 PR1 — additive write-both helper for the price_cache prefix migration. + +ROADMAP P1 "`predictor/` S3 namespace rationalization Wave 3": migrate the 10y +``predictor/price_cache/*.parquet`` tree to ``reference/price_cache/`` so the +``predictor/`` namespace ends up owning ONLY predictor-module outputs +(``weights/``, ``predictions/``, ``metrics/``) and ``reference/`` collects +long-lived data-module references. Mirrors the Wave 1 ``predictor/daily_closes/`` +→ ``staging/daily_closes/`` arc but uses write-both + soak (instead of Wave 1's +hard cutover) because this writer rewrites only STALE tickers — a hard cut +would leave fresh tickers in legacy and the new prefix incomplete for an +entire yfinance refresh cycle. CLAUDE.md S3 Contract Safety mandates the +write-both for any path change. + +Soak contract: + Wave 3 PR1 (this PR) → both prefixes written, every reader stays on legacy. + ≥1 week soak after PR1's first Saturday SF lands writes to both prefixes. + Wave 3 PR3+ → reader migrations with legacy fallback in 4 repos. + Wave 3 PR4 (cutover) → swap primary to ``reference/``, drop legacy from this + helper's return list, retire fallbacks, ``aws s3 rm`` legacy prefix. 
+ +The helper is the single chokepoint for write-both — three writers call into +it (``collectors/prices.py`` yfinance refresh, ``collectors/fred_history.py`` +FRED backfill, ``weekly_collector.py`` chronic-gap self-heal patch). Adding +a future writer requires no per-call-site discipline beyond wrapping the +upload in ``for prefix in price_cache_write_prefixes(s3_prefix): ...``. +""" + +from __future__ import annotations + +# Wave 3 PR1 — legacy primary, new mirror. The cutover PR (Wave 3 PR4) flips +# ``PRICE_CACHE_NEW_PREFIX`` to first position and removes the legacy entry +# from ``price_cache_write_prefixes`` (one-line edit at that time). +PRICE_CACHE_LEGACY_PREFIX = "predictor/price_cache/" +PRICE_CACHE_NEW_PREFIX = "reference/price_cache/" + + +def price_cache_write_prefixes(primary: str = PRICE_CACHE_LEGACY_PREFIX) -> list[str]: + """Return the active write prefixes for ticker parquet uploads. + + During the Wave 3 write-both soak (this PR through Wave 3 PR4 cutover): + + * ``primary == "predictor/price_cache/"`` (the production default) → + ``["predictor/price_cache/", "reference/price_cache/"]`` — every + ticker-parquet write hits both prefixes byte-for-byte. + * ``primary`` is any other string (custom prefix from a test or a + config override) → ``[primary]`` — single-write fallback. Tests + that need to assert a single-prefix write pass an explicit custom + prefix; production paths use the default and get write-both. + + Callers wrap their upload in:: + + for prefix in price_cache_write_prefixes(s3_prefix): + s3.upload_file(local_path, bucket, f"{prefix}{ticker}.parquet") + + The order is deterministic (legacy first, new second) so a fail-fast + upload error on the legacy prefix preserves the existing pre-Wave-3 + failure semantics — the new prefix never silently masks a legacy + write failure. 
+ """ + if primary == PRICE_CACHE_LEGACY_PREFIX: + return [PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX] + return [primary] + + +__all__ = [ + "PRICE_CACHE_LEGACY_PREFIX", + "PRICE_CACHE_NEW_PREFIX", + "price_cache_write_prefixes", +] diff --git a/collectors/fred_history.py b/collectors/fred_history.py index 837584b..98547e7 100644 --- a/collectors/fred_history.py +++ b/collectors/fred_history.py @@ -34,6 +34,8 @@ import pandas as pd import requests +from builders._price_cache_writeboth import price_cache_write_prefixes + logger = logging.getLogger(__name__) _FRED_BASE = "https://api.stlouisfed.org/fred/series/observations" @@ -247,16 +249,20 @@ def backfill_to_s3( "last_date": ohlcv.index.max().date().isoformat(), } if not dry_run: - s3.upload_file( - str(parquet_path), - bucket, - f"{s3_prefix}{ticker}.parquet", - ) - logger.info( - "Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)", - bucket, s3_prefix, ticker, len(ohlcv), - results[ticker]["first_date"], results[ticker]["last_date"], - ) + # Wave 3 PR1: write-both to legacy ``predictor/price_cache/`` + # + new ``reference/price_cache/`` (see + # builders/_price_cache_writeboth.py for soak contract) + for prefix in price_cache_write_prefixes(s3_prefix): + s3.upload_file( + str(parquet_path), + bucket, + f"{prefix}{ticker}.parquet", + ) + logger.info( + "Uploaded s3://%s/%s%s.parquet (%d rows, %s → %s)", + bucket, prefix, ticker, len(ohlcv), + results[ticker]["first_date"], results[ticker]["last_date"], + ) except Exception as e: logger.error("Backfill failed for %s (%s): %s", ticker, series_id, e) results[ticker] = {"status": "error", "error": str(e)} diff --git a/collectors/prices.py b/collectors/prices.py index e5b2bb0..c1fcaa0 100644 --- a/collectors/prices.py +++ b/collectors/prices.py @@ -28,6 +28,8 @@ import pandas as pd import yfinance as yf +from builders._price_cache_writeboth import price_cache_write_prefixes + logger = logging.getLogger(__name__) # Tickers that require a leading caret in yfinance 
(not available on polygon) @@ -218,10 +220,13 @@ def _refresh_stale( new_df.index = idx new_df = new_df.sort_index() - # Write locally and upload + # Write locally and upload (Wave 3 PR1: write-both to legacy + # ``predictor/price_cache/`` + new ``reference/price_cache/``; + # see builders/_price_cache_writeboth.py for soak contract) parquet_path = local_dir / f"{ticker}.parquet" new_df.to_parquet(parquet_path, engine="pyarrow", compression="snappy") - s3.upload_file(str(parquet_path), bucket, f"{s3_prefix}{ticker}.parquet") + for prefix in price_cache_write_prefixes(s3_prefix): + s3.upload_file(str(parquet_path), bucket, f"{prefix}{ticker}.parquet") refreshed += 1 except Exception as e: diff --git a/infrastructure/backfill_reference_price_cache.sh b/infrastructure/backfill_reference_price_cache.sh new file mode 100755 index 0000000..9bf6381 --- /dev/null +++ b/infrastructure/backfill_reference_price_cache.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +# infrastructure/backfill_reference_price_cache.sh — Wave 3 PR1 one-shot seed. +# +# Copies every object under ``s3://alpha-engine-research/predictor/price_cache/`` +# to ``s3://alpha-engine-research/reference/price_cache/`` byte-for-byte. Run +# ONCE as part of the PR1 deploy — the producer write-both in +# ``collectors/prices.py`` + ``collectors/fred_history.py`` + +# ``weekly_collector.py`` only mirrors writes for STALE tickers, so the new +# prefix needs this initial seed before the write-both soak clock starts. +# +# Idempotent. ``aws s3 sync`` skips objects that already match by size + +# last-modified, so re-running is safe and incremental. 
+# +# Usage: +# bash infrastructure/backfill_reference_price_cache.sh # real copy +# bash infrastructure/backfill_reference_price_cache.sh --dry-run # plan only + +set -euo pipefail + +BUCKET="${ALPHA_ENGINE_BUCKET:-alpha-engine-research}" +LEGACY_PREFIX="predictor/price_cache/" +NEW_PREFIX="reference/price_cache/" + +DRY_RUN="" +if [[ "${1:-}" == "--dry-run" ]]; then + DRY_RUN="--dryrun" + echo "[backfill] DRY-RUN: planning copy without writing" +fi + +echo "[backfill] Wave 3 PR1 seed: ${LEGACY_PREFIX} -> ${NEW_PREFIX} on s3://${BUCKET}" + +# Pre-flight: legacy must exist + have objects +LEGACY_COUNT=$(aws s3 ls "s3://${BUCKET}/${LEGACY_PREFIX}" --recursive --summarize \ + | awk '/Total Objects:/ {print $3}') +if [[ -z "${LEGACY_COUNT}" || "${LEGACY_COUNT}" -lt 100 ]]; then + echo "[backfill] ERROR: legacy prefix has ${LEGACY_COUNT:-0} objects (expected >100)." >&2 + echo "[backfill] refusing to seed an empty/sparse mirror." >&2 + exit 1 +fi +echo "[backfill] legacy: ${LEGACY_COUNT} objects" + +# aws s3 sync: copy with delta-only semantics (skips objects matching by +# size + mtime). The first run copies everything; subsequent runs are +# fast no-ops unless the legacy prefix has been refreshed. +aws s3 sync \ + "s3://${BUCKET}/${LEGACY_PREFIX}" \ + "s3://${BUCKET}/${NEW_PREFIX}" \ + --only-show-errors \ + ${DRY_RUN} + +if [[ -n "${DRY_RUN}" ]]; then + echo "[backfill] DRY-RUN complete — no objects written." + exit 0 +fi + +# Post-flight: parity check on object counts +NEW_COUNT=$(aws s3 ls "s3://${BUCKET}/${NEW_PREFIX}" --recursive --summarize \ + | awk '/Total Objects:/ {print $3}') +echo "[backfill] new prefix: ${NEW_COUNT} objects (legacy: ${LEGACY_COUNT})" + +if [[ "${NEW_COUNT}" -lt "${LEGACY_COUNT}" ]]; then + echo "[backfill] WARN: new prefix has fewer objects than legacy." >&2 + echo "[backfill] delta=${LEGACY_COUNT} - ${NEW_COUNT}. Re-run to converge." >&2 + exit 2 +fi + +echo "[backfill] OK — reference/price_cache/ seeded. 
Soak clock starts on the" +echo "[backfill] next Saturday SF firing's first write to both prefixes." diff --git a/tests/test_fred_history_fetcher.py b/tests/test_fred_history_fetcher.py index 69b019f..f4294d6 100644 --- a/tests/test_fred_history_fetcher.py +++ b/tests/test_fred_history_fetcher.py @@ -273,6 +273,9 @@ def maybe_fail(series_id, period_years=10, api_key=None): assert result["per_ticker"]["HYOAS"]["status"] == "ok" def test_uploads_to_s3_when_not_dry_run(self): + """Wave 3 PR1: each FRED-sourced ticker writes to BOTH legacy + (``predictor/price_cache/``) and new (``reference/price_cache/``) + prefixes. PR4 cutover will flip this back to a single key.""" with self._patch_fetch_returning(), \ patch("collectors.fred_history.boto3.client") as mock_boto: mock_s3 = MagicMock() @@ -283,8 +286,17 @@ def test_uploads_to_s3_when_not_dry_run(self): dry_run=False, ) assert result["dry_run"] is False - # upload_file called with (local_path, bucket, key) - assert mock_s3.upload_file.called - call = mock_s3.upload_file.call_args - assert call[0][1] == "test-bucket" - assert call[0][2] == "predictor/price_cache/TWO.parquet" + # upload_file called twice — once per write-both prefix + assert mock_s3.upload_file.call_count == 2 + keys = sorted( + call.args[2] for call in mock_s3.upload_file.call_args_list + ) + assert keys == [ + "predictor/price_cache/TWO.parquet", + "reference/price_cache/TWO.parquet", + ] + # Both writes hit the same bucket + same local file + buckets = {call.args[1] for call in mock_s3.upload_file.call_args_list} + assert buckets == {"test-bucket"} + local_paths = {call.args[0] for call in mock_s3.upload_file.call_args_list} + assert len(local_paths) == 1 # same local parquet body for both uploads diff --git a/tests/test_price_cache_writeboth.py b/tests/test_price_cache_writeboth.py new file mode 100644 index 0000000..ae658b6 --- /dev/null +++ b/tests/test_price_cache_writeboth.py @@ -0,0 +1,269 @@ +"""Wave 3 PR1 — producer-side write-both regression 
suite. + +Covers: + * The ``price_cache_write_prefixes`` helper itself (legacy default → both, + custom prefix → single). + * Each of the three production writers (``collectors/prices.py``, + ``collectors/fred_history.py``, ``weekly_collector._patch_chronic_gap_ticker`` + via its module-scoped chronic-gap path) calls into the helper and ends up + putting ticker parquets at BOTH the legacy and new prefix, with identical + bodies and key shape. + +These tests pin the Wave 3 write-both contract so a future "delete the legacy +write" refactor can't quietly skip a writer — every active prod path is +exercised. PR4 cutover edits the helper + flips these tests to expect a +single-prefix write. +""" + +from __future__ import annotations + +import io +from unittest.mock import MagicMock + +import pandas as pd +import pytest + +from builders._price_cache_writeboth import ( + PRICE_CACHE_LEGACY_PREFIX, + PRICE_CACHE_NEW_PREFIX, + price_cache_write_prefixes, +) + + +# --------------------------------------------------------------------------- +# Helper contract +# --------------------------------------------------------------------------- + + +def test_default_returns_both_prefixes_legacy_first(): + """Production default → write-both with legacy ordered first. + + Order matters: legacy first so a permission/quota failure on the legacy + prefix preserves pre-Wave-3 fail-loud semantics — the new prefix never + silently masks a legacy write failure. 
+ """ + out = price_cache_write_prefixes() + assert out == [PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX] + + +def test_explicit_legacy_returns_both_prefixes(): + """Callers that pass the legacy prefix explicitly get the same result as + callers that use the default — protects against config-layer regressions + where ``s3_prefix`` is read from yaml and matches the legacy string.""" + out = price_cache_write_prefixes(PRICE_CACHE_LEGACY_PREFIX) + assert out == [PRICE_CACHE_LEGACY_PREFIX, PRICE_CACHE_NEW_PREFIX] + + +def test_custom_prefix_returns_single(): + """A test/config-override prefix (anything other than legacy) gets + single-write behavior — Wave 3 write-both only mirrors the legacy + production path.""" + custom = "some/other/prefix/" + out = price_cache_write_prefixes(custom) + assert out == [custom] + + +def test_new_prefix_is_not_legacy(): + """Sanity guard against a future copy-paste regression that aliases the + two constants.""" + assert PRICE_CACHE_NEW_PREFIX != PRICE_CACHE_LEGACY_PREFIX + assert PRICE_CACHE_NEW_PREFIX.startswith("reference/") + assert PRICE_CACHE_LEGACY_PREFIX.startswith("predictor/") + + +# --------------------------------------------------------------------------- +# collectors/prices.py — yfinance refresh upload +# --------------------------------------------------------------------------- + + +def test_prices_refresh_uploads_to_both_prefixes(monkeypatch, tmp_path): + """``_refresh_stale`` ends each successful per-ticker branch with + an ``s3.upload_file`` — Wave 3 wraps that in a write-both loop. 
We exercise + the success path with stubbed yfinance + a recording S3 client and assert + BOTH keys land for every refreshed ticker.""" + from collectors import prices + + # Stub yfinance to return a deterministic single-ticker frame + idx = pd.date_range("2026-04-01", periods=10, freq="B") + fake_df = pd.DataFrame( + {"Open": 100.0, "High": 101.0, "Low": 99.0, "Close": 100.5, "Volume": 1_000}, + index=idx, + ) + + def fake_download(**_kwargs): + return fake_df.copy() + + monkeypatch.setattr(prices.yf, "download", fake_download) + + recorded: list[tuple[str, str]] = [] + + class _RecordingS3: + def upload_file(self, _local, bucket, key): + recorded.append((bucket, key)) + + s3 = _RecordingS3() + refreshed, failed = prices._refresh_stale( + s3=s3, + bucket="test-bucket", + s3_prefix=PRICE_CACHE_LEGACY_PREFIX, + stale=["AAPL"], + fetch_period="10y", + batch_size=10, + ) + + assert refreshed == 1 + assert failed == [] + # Both prefixes hit, same ticker, same bucket + keys = sorted(k for _b, k in recorded) + assert keys == [ + f"{PRICE_CACHE_LEGACY_PREFIX}AAPL.parquet", + f"{PRICE_CACHE_NEW_PREFIX}AAPL.parquet", + ] + assert all(b == "test-bucket" for b, _ in recorded) + + +# --------------------------------------------------------------------------- +# collectors/fred_history.py — FRED backfill upload +# --------------------------------------------------------------------------- + + +def test_fred_backfill_uploads_to_both_prefixes(monkeypatch): + """``backfill_to_s3`` uploads each FRED-sourced ticker parquet via + ``s3.upload_file``. 
Wave 3 wraps that in a write-both loop.""" + from collectors import fred_history + + # Stub the FRED HTTP path — return a deterministic OHLCV frame + idx = pd.date_range("2020-01-01", periods=20, freq="B") + fake_ohlcv = pd.DataFrame( + { + "Open": 1.0, "High": 1.0, "Low": 1.0, "Close": 1.0, + "Adj_Close": 1.0, "Volume": 0, "VWAP": None, "source": "fred", + }, + index=idx, + ) + monkeypatch.setattr( + fred_history, "fetch_fred_history", lambda *_args, **_kw: fake_ohlcv, + ) + monkeypatch.setattr( + fred_history, "fred_history_to_ohlcv", lambda df: df, + ) + + recorded: list[tuple[str, str]] = [] + + class _RecordingS3: + def upload_file(self, _local, bucket, key): + recorded.append((bucket, key)) + + monkeypatch.setattr( + fred_history.boto3, "client", lambda _svc: _RecordingS3(), + ) + + out = fred_history.backfill_to_s3( + bucket="test-bucket", + s3_prefix=PRICE_CACHE_LEGACY_PREFIX, + tickers=["TWO"], + period_years=5, + dry_run=False, + ) + + assert out["status"] == "ok" + assert out["refreshed"] == 1 + keys = sorted(k for _b, k in recorded) + assert keys == [ + f"{PRICE_CACHE_LEGACY_PREFIX}TWO.parquet", + f"{PRICE_CACHE_NEW_PREFIX}TWO.parquet", + ] + + +# --------------------------------------------------------------------------- +# weekly_collector — chronic-gap self-heal patch +# --------------------------------------------------------------------------- + + +def test_weekly_chronic_gap_self_heal_writes_both_prefixes(monkeypatch): + """``_self_heal_chronic_polygon_gaps`` reads the legacy parquet for the + existing-rows union, then PUTs the combined frame back. 
Wave 3 PR1 sends + the PUT to both prefixes with identical body bytes; the GET stays on + legacy until reader migration (PR3+).""" + import weekly_collector as wc + + target_date = "2026-05-12" + target_ts = pd.Timestamp(target_date).normalize() + + # yfinance.download is locally re-imported inside the helper as ``_yf``; + # monkeypatching the module attribute pre-call rebinds the name yfinance + # resolves to. + import yfinance as yf + + idx = pd.bdate_range(target_ts - pd.Timedelta(days=10), target_ts) + new_rows_df = pd.DataFrame( + {"Open": 1.0, "High": 1.0, "Low": 1.0, "Close": 1.0, "Volume": 100}, + index=idx, + ) + + def _fake_download(*_a, **_kw): + return new_rows_df.copy() + + monkeypatch.setattr(yf, "download", _fake_download) + + # ArcticDB universe lib: tail returns a stale last_date so the heal + # branch runs. + class _StaleTail: + data = pd.DataFrame(index=[pd.Timestamp("2026-04-01")]) + + class _FakeUniverseLib: + def tail(self, _ticker, n=1): + return _StaleTail() + + monkeypatch.setattr( + "store.arctic_store.get_universe_lib", + lambda _bucket: _FakeUniverseLib(), + ) + + # S3 client: NoSuchKey on the legacy GET (no prior parquet), recording + # put_object so we can assert write-both. + put_calls: list[dict] = [] + + class _FakeS3Exceptions: + class NoSuchKey(Exception): + pass + + class _FakeS3: + exceptions = _FakeS3Exceptions + + def get_object(self, **_kw): + raise _FakeS3Exceptions.NoSuchKey("no prior parquet") + + def put_object(self, **kw): + put_calls.append(kw) + + monkeypatch.setattr(wc.boto3, "client", lambda _svc: _FakeS3()) + + # builders.backfill is called after the put — stub so the test doesn't + # touch ArcticDB / S3 again. 
+ monkeypatch.setattr( + "builders.backfill.backfill", lambda **_kw: {"status": "ok"}, + ) + + summary = wc._self_heal_chronic_polygon_gaps( + bucket="test-bucket", + target_date=target_date, + chronic_tickers=["PSTG"], + dry_run=False, + ) + + assert summary["errors"] == [], summary + assert len(summary["healed"]) == 1, summary + + # Both prefixes hit with the same ticker key, bodies identical + pcache_keys = sorted( + c["Key"] for c in put_calls if c["Key"].endswith("/PSTG.parquet") + ) + assert pcache_keys == [ + f"{PRICE_CACHE_LEGACY_PREFIX}PSTG.parquet", + f"{PRICE_CACHE_NEW_PREFIX}PSTG.parquet", + ], pcache_keys + + bodies = [c["Body"] for c in put_calls if c["Key"].endswith("/PSTG.parquet")] + assert len(bodies) == 2 + assert bodies[0] == bodies[1] diff --git a/weekly_collector.py b/weekly_collector.py index 11db7da..3c846ae 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -74,6 +74,9 @@ def _load_dotenv() -> None: ) from collectors import constituents, prices, slim_cache, macro, universe_returns, signal_returns, alternative, daily_closes, fundamentals, short_interest +from builders._price_cache_writeboth import ( + price_cache_write_prefixes as _price_cache_write_prefixes, +) logger = logging.getLogger(__name__) @@ -645,7 +648,9 @@ def _self_heal_chronic_polygon_gaps( after the first heal lands). 3. Else yfinance-fetch ``[last_date+1, target_date]`` OHLCV. 4. Append the new rows to ``predictor/price_cache/{ticker}.parquet`` - (dedupe by date keep="last" so repeated heals are idempotent). + (dedupe by date keep="last" so repeated heals are idempotent). Wave 3 + PR1 mirrors the put to ``reference/price_cache/{ticker}.parquet`` via + the ``_price_cache_write_prefixes`` helper. 5. Invoke ``builders.backfill(ticker_filter=ticker)`` — reuses the per-ticker compute_features + ArcticDB write path so the new rows get the same feature schema as every other ticker. 
@@ -733,6 +738,10 @@ def _self_heal_chronic_polygon_gaps( ohlcv_cols = ["Open", "High", "Low", "Close", "Volume"] new_rows = yf_df[[c for c in ohlcv_cols if c in yf_df.columns]].copy() + # Wave 3 PR1: read from legacy (single source of truth during the + # write-both soak — readers haven't migrated yet); write to both + # legacy + new prefix on the put back (see + # builders/_price_cache_writeboth.py for soak contract) pcache_key = f"predictor/price_cache/{ticker}.parquet" try: obj = s3.get_object(Bucket=bucket, Key=pcache_key) @@ -748,8 +757,11 @@ def _self_heal_chronic_polygon_gaps( if not dry_run: buf = _io.BytesIO() combined_pcache.to_parquet(buf, engine="pyarrow", compression="snappy") - buf.seek(0) - s3.put_object(Bucket=bucket, Key=pcache_key, Body=buf.getvalue()) + body = buf.getvalue() + for _prefix in _price_cache_write_prefixes(): + s3.put_object( + Bucket=bucket, Key=f"{_prefix}{ticker}.parquet", Body=body + ) from builders.backfill import backfill as _backfill _backfill(bucket=bucket, ticker_filter=ticker, dry_run=False)