diff --git a/WAVE4_SLIM_DELETION_RUNBOOK.md b/WAVE4_SLIM_DELETION_RUNBOOK.md new file mode 100644 index 0000000..6bdf1c1 --- /dev/null +++ b/WAVE4_SLIM_DELETION_RUNBOOK.md @@ -0,0 +1,72 @@ +# Wave-4 — `predictor/price_cache_slim/` S3 prefix deletion runbook + +**Status: GATED — do NOT execute until the gate below passes.** + +This PR removes the slim *code* (writer, `load_slim_cache` API, consumer +fallbacks). The destructive S3 prefix deletion is a separate, manual, +gated step documented here so it is reviewable and auditable (it is **not** +run by CI or any pipeline). + +## Gate (all must hold before deletion) + +1. **This PR merged** — slim writer gone, every consumer ArcticDB-only + (data macro-breadth + feature-compute; backtester exit_timing #226; + dashboard health-check retired #88). +2. **≥1 clean Saturday-SF parity observation.** Before this PR merges, the + migrated consumers still ran with slim + emitted + `WAVE4_PARITY_METRIC {breadth,compute,exit_timing}` JSON lines. Pull the + most recent Saturday SF logs (first observable **2026-05-23**) and + confirm, for each of the three streams: + - `passed: true` + - `max_abs_value_delta` ≤ epsilon (effectively `0.0`) + - `only_in_b` empty (nothing the ArcticDB read lacked vs slim); + `only_in_a` (slim-only legacy symbols) is expected and acceptable. + If any stream shows value divergence, **stop** — investigate before + deleting the fallback's data. +3. **No remaining live reader.** `spot_backtest.sh:528`'s slim `aws s3 + sync` was verified dead (predictor_backtest.py loads from ArcticDB; + only `sector_map.json` is read from the cache dir) and is removed in + the backtester PR4. Re-confirm no new consumer via the terminal guard + `tests/test_wave4_slim_arctic_parity.py`. + +## Procedure (single-dev, paper-trading; bounded-reversible) + +``` +# 1. Pre-deletion byte-equal backup (Wave-5 precedent). 
+aws s3 cp --recursive \ + s3://alpha-engine-research/predictor/price_cache_slim/ \ + s3://alpha-engine-research/backups/price_cache_slim.pre-deletion-260523/ \ + --only-show-errors + +# 2. Verify the backup is complete (object count + total bytes match; this is a size check, not a per-object checksum). +aws s3 ls --recursive --summarize \ + s3://alpha-engine-research/predictor/price_cache_slim/ \ + | tail -2 +aws s3 ls --recursive --summarize \ + s3://alpha-engine-research/backups/price_cache_slim.pre-deletion-260523/ \ + | tail -2 +# Object count + Total Size MUST match before proceeding. + +# 3. Delete the prefix. +aws s3 rm --recursive \ + s3://alpha-engine-research/predictor/price_cache_slim/ --only-show-errors + +# 4. Confirm empty. +aws s3 ls s3://alpha-engine-research/predictor/price_cache_slim/ \ + | wc -l # -> 0 +``` + +## Rollback + +`git revert` the Wave-4 PR(s) + `aws s3 cp --recursive` the backup prefix +back to `predictor/price_cache_slim/`. The slim writer resumes on the next +weekly SF. Worst case from a missed divergence in this single-dev +paper-trading context: degraded features for ~one week until noticed — no +capital or data-loss risk (per the CLAUDE.md severity posture). + +## Follow-ups (cosmetic, batch after deletion) + +- Dashboard architecture-page slim *labels* (`public/pages/2_Architecture.py`, + `pages/10_Architecture.py`) — descriptive topology text only. +- Comment-only slim mentions in `builders/backfill.py`, + `validators/price_validator.py` — historical context, harmless. 
diff --git a/collectors/macro.py b/collectors/macro.py index d5f6c3e..03f1cfb 100644 --- a/collectors/macro.py +++ b/collectors/macro.py @@ -26,9 +26,7 @@ import requests import yfinance as yf -from store.parquet_loader import load_slim_cache from alpha_engine_lib.arcticdb import load_universe_ohlcv -from alpha_engine_lib.reconcile import reconcile_frame_dicts logger = logging.getLogger(__name__) @@ -48,55 +46,24 @@ def _load_breadth_prices(bucket: str) -> Optional[dict]: - """Load the ~900-ticker price set for breadth — ArcticDB primary, - slim-cache fallback, parity-observed. - - Wave 4 of the predictor/price_cache_slim deletion arc. ArcticDB (via the - lib ``load_universe_ohlcv`` slim-equivalent reader) is the single source - of truth; the legacy ``predictor/price_cache_slim/`` parquet read is kept - as a fallback so breadth cannot break if ArcticDB is unavailable. While - both sources still exist we dual-read and emit a ``reconcile`` ParityReport - so the eventual slim deletion (PR4) is a data-driven cutover, not an - eyeballed one. The slim side — and this dual-read — are removed in PR4. - - Returns ``None`` only if BOTH sources fail (caller then omits the breadth - key, preserving the existing no-null contract). + """Load the ~900-ticker price set for breadth from the ArcticDB + universe library. + + Wave-4 terminal state: ``predictor/price_cache_slim/`` is deleted; + ArcticDB (via the lib ``load_universe_ohlcv`` reader) is the sole + source. The 5/23 parity observation confirmed slim<->ArcticDB + equivalence before the slim fallback + dual-read were removed here. + + Returns ``None`` if the ArcticDB read fails (caller then omits the + breadth key — the existing no-null contract; Research has its own + fallback). This matches the pre-Wave-4 behaviour when the single + price source was unavailable. 
""" - arctic_prices = None try: - arctic_prices = load_universe_ohlcv(bucket) - except Exception as exc: # noqa: BLE001 - fall back, don't break breadth + return load_universe_ohlcv(bucket) or None + except Exception as exc: # noqa: BLE001 - omit breadth, don't write null logger.warning("ArcticDB universe read for breadth failed: %s", exc) - - slim_prices = None - try: - slim_prices = load_slim_cache(boto3.client("s3"), bucket) - except Exception as exc: # noqa: BLE001 - parity/fallback only - logger.warning( - "Slim cache read for breadth (parity/fallback) failed: %s", exc - ) - - # SOTA observation: while both exist, emit the quantitative parity metric - # every run. Grep ``WAVE4_PARITY_METRIC breadth`` over the observation - # window before PR4 retires slim. - if arctic_prices and slim_prices: - report = reconcile_frame_dicts( - slim_prices, arctic_prices, value_cols=("Close",) - ) - logger.info("breadth slim<->arctic %s", report.summary()) - logger.info( - "WAVE4_PARITY_METRIC breadth %s", json.dumps(report.as_metrics()) - ) - - if arctic_prices: - return arctic_prices - if slim_prices: - logger.warning( - "breadth falling back to slim cache — ArcticDB universe " - "unavailable (Wave-4 migration fallback path)" - ) - return slim_prices - return None + return None def collect( diff --git a/collectors/slim_cache.py b/collectors/slim_cache.py deleted file mode 100644 index b0fa0e4..0000000 --- a/collectors/slim_cache.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -slim_cache.py — Write 2-year slices of each price cache parquet to S3. - -Extracted from alpha-engine-predictor/training/train_handler.py:write_slim_cache(). - -The predictor inference Lambda downloads these slim parquets at 6:15 AM PT instead -of fetching 2 years from yfinance — reducing daily yfinance calls from ~450,000 -rows to at most a few hundred (the Mon-Fri delta from daily_closes/). 
-""" - -from __future__ import annotations - -import logging -import tempfile -from pathlib import Path - -import boto3 -import pandas as pd - -logger = logging.getLogger(__name__) - - -def collect( - bucket: str, - full_cache_prefix: str = "predictor/price_cache/", - slim_prefix: str = "predictor/price_cache_slim/", - lookback_days: int = 730, - dry_run: bool = False, -) -> dict: - """ - Download full price cache from S3, write 2-year slices to the slim prefix. - - Args: - bucket: S3 bucket name - full_cache_prefix: S3 prefix for full 10y parquets - slim_prefix: S3 prefix for 2y slim parquets - lookback_days: calendar days of history to keep (default 730 = 2 years) - dry_run: if True, count files but don't write - - Returns: - dict with status, written count, failed count, validation summary - """ - from validators.price_validator import validate_parquet - - s3 = boto3.client("s3") - cutoff = pd.Timestamp.now().normalize() - pd.Timedelta(days=lookback_days) - - with tempfile.TemporaryDirectory() as tmpdir: - local_dir = Path(tmpdir) - - # Download full cache parquets - parquet_keys = _list_parquets(s3, bucket, full_cache_prefix) - - if dry_run: - logger.info("[dry-run] slim_cache: %d parquets would be sliced", len(parquet_keys)) - return {"status": "ok_dry_run", "count": len(parquet_keys)} - - logger.info( - "Writing slim cache: %d parquets → s3://%s/%s (cutoff %s)", - len(parquet_keys), bucket, slim_prefix, cutoff.date(), - ) - - written = 0 - failed = 0 - validation_results: list[dict] = [] - - for s3_key in parquet_keys: - filename = s3_key.split("/")[-1] - if filename == "sector_map.json": - continue - - ticker = filename.replace(".parquet", "") - local_path = local_dir / filename - try: - s3.download_file(bucket, s3_key, str(local_path)) - - df = pd.read_parquet(local_path) - df.index = pd.to_datetime(df.index) - if df.index.tz is not None: - df.index = df.index.tz_convert("UTC").tz_localize(None) - - slim_df = df[df.index >= cutoff] - if slim_df.empty: - 
local_path.unlink(missing_ok=True) - continue - - # Validate the 2-year slice (what inference actually reads) - validation_results.append(validate_parquet(slim_df, ticker)) - - slim_path = local_dir / f"_slim_{filename}" - slim_df.to_parquet(slim_path, engine="pyarrow", compression="snappy") - - slim_key = f"{slim_prefix}{filename}" - s3.upload_file(str(slim_path), bucket, slim_key) - written += 1 - - # Cleanup - slim_path.unlink(missing_ok=True) - local_path.unlink(missing_ok=True) - - except Exception as e: - logger.warning("Slim cache write failed for %s: %s", filename, e) - failed += 1 - local_path.unlink(missing_ok=True) - - if failed > 0: - fail_pct = failed / max(len(parquet_keys), 1) * 100 - logger.warning("Slim cache: %d/%d failed (%.1f%%)", failed, len(parquet_keys), fail_pct) - - # Build validation summary - anomaly_tickers = [r for r in validation_results if r["status"] != "clean"] - validation = { - "total_validated": len(validation_results), - "clean": len(validation_results) - len(anomaly_tickers), - "anomalies": len(anomaly_tickers), - "anomaly_details": anomaly_tickers[:20], - } - if anomaly_tickers: - logger.warning("Slim cache validation: %d/%d tickers have anomalies", len(anomaly_tickers), len(validation_results)) - for r in anomaly_tickers[:10]: - logger.warning(" %s: %s", r["ticker"], "; ".join(r["anomalies"])) - else: - logger.info("Slim cache validation: all %d tickers clean", len(validation_results)) - - logger.info("Slim cache: %d / %d uploaded to s3://%s/%s", written, len(parquet_keys), bucket, slim_prefix) - result = {"status": "ok" if failed == 0 else "partial", "written": written, "failed": failed} - if validation_results: - result["validation"] = validation - return result - - -def _list_parquets(s3, bucket: str, prefix: str) -> list[str]: - """List all .parquet keys under the given S3 prefix.""" - paginator = s3.get_paginator("list_objects_v2") - keys = [] - for page in paginator.paginate(Bucket=bucket, Prefix=prefix): - for obj in 
page.get("Contents", []): - if obj["Key"].endswith(".parquet"): - keys.append(obj["Key"]) - return keys diff --git a/features/compute.py b/features/compute.py index 9c2d137..ad82d28 100644 --- a/features/compute.py +++ b/features/compute.py @@ -128,16 +128,12 @@ def _load_sector_map(s3, bucket: str) -> dict[str, str]: # normalized DataFrame shape without importing private helpers. Slim cache # (2y) is sufficient here — features only use the latest row and 2y gives # enough warmup for every indicator. -from store.parquet_loader import ( - load_parquet_from_s3 as _load_parquet_from_s3, - load_slim_cache as _load_slim_cache, -) +from store.parquet_loader import load_parquet_from_s3 as _load_parquet_from_s3 from alpha_engine_lib.arcticdb import ( load_universe_ohlcv, load_macro_series, open_macro_lib, ) -from alpha_engine_lib.reconcile import reconcile_frame_dicts def _safe_last_date(idx: pd.Index) -> pd.Timestamp | None: @@ -354,75 +350,42 @@ def _extract_macro( def _load_price_source(s3, bucket: str) -> dict | None: - """The ~full-universe price+macro symbol set — ArcticDB primary, - slim-cache fallback, parity-observed. + """The ~full-universe price+macro symbol set from ArcticDB. - Wave 4 of the predictor/price_cache_slim deletion arc, riskier sibling - of the macro-breadth migration: this feeds the ENTIRE feature-compute - pipeline (price_data) AND _extract_macro. The slim cache historically - carried equities + SPY + the index/macro series (VIX/VIX3M/TNX/IRX/ - GLD/USO) + the XL* sector ETFs in one flat dict. Those tenants are - split across two ArcticDB libs: + Wave-4 terminal state (predictor/price_cache_slim deleted). This feeds + the ENTIRE feature-compute pipeline (price_data) AND _extract_macro. 
+ The set is the union of two ArcticDB libraries — the slim cache that + formerly carried them in one flat parquet dict no longer exists: - universe lib -> equities + SPY (load_universe_ohlcv) - - macro lib -> VIX.../XL* series (load_macro_series) - - so the ArcticDB-equivalent is the union of both reads. slim is kept as - a fallback for the whole set (feature compute cannot run blind) and, - while both still exist, every run dual-reads and emits a reconcile - ParityReport (grep ``WAVE4_PARITY_METRIC compute``) so PR4's slim - deletion is a data-driven cutover. The slim side is removed in PR4. - - require_ticker_match is False for the emitted report: the slim cache - legitimately carries some symbols the universe lib does not, so set - asymmetry is expected — it is logged in the metric fields for - visibility while ``passed`` reflects value fidelity over the overlap. - - Returns None only if BOTH sources fail (caller then returns empty, - preserving the existing no-data contract). + - macro lib -> VIX/VIX3M/TNX/IRX/GLD/USO + XL* sector ETFs + (load_macro_series; XL* discovered via + open_macro_lib().list_symbols()) + + The 5/23 parity observation (WAVE4_PARITY_METRIC compute) confirmed + slim<->ArcticDB equivalence over the overlap before the slim fallback + + dual-read were removed here. + + Returns None if the ArcticDB read fails (caller then returns empty — + the existing no-data contract; matches the pre-Wave-4 behaviour when + the single price source was unavailable). ``s3`` is retained in the + signature for caller compatibility but is no longer used. 
""" - combined: dict | None = None try: prices = load_universe_ohlcv(bucket) # equities + SPY macro_syms = set(_MACRO_SLIM_KEYS.values()) try: mlib = open_macro_lib(bucket) macro_syms |= { - s for s in mlib.list_symbols() if s.startswith("XL") + sym for sym in mlib.list_symbols() if sym.startswith("XL") } except Exception as exc: # noqa: BLE001 - XL* discovery best-effort log.warning("macro-lib symbol listing failed: %s", exc) macro_frames = load_macro_series(bucket, macro_syms) - merged = {**prices, **macro_frames} - combined = merged or None - except Exception as exc: # noqa: BLE001 - fall back, don't run blind + return {**prices, **macro_frames} or None + except Exception as exc: # noqa: BLE001 - return empty, don't run blind log.warning("ArcticDB universe/macro read failed: %s", exc) - - try: - slim_data = _load_slim_cache(s3, bucket) - except Exception as exc: # noqa: BLE001 - parity/fallback only - log.warning("slim cache read (parity/fallback) failed: %s", exc) - slim_data = None - - if combined and slim_data: - report = reconcile_frame_dicts( - slim_data, combined, value_cols=("Close",), - require_ticker_match=False, - ) - log.info("compute slim<->arctic %s", report.summary()) - log.info( - "WAVE4_PARITY_METRIC compute %s", json.dumps(report.as_metrics()) - ) - - if combined: - return combined - if slim_data: - log.warning( - "feature compute falling back to slim cache — ArcticDB " - "unavailable (Wave-4 migration fallback path)" - ) - return slim_data - return None + return None def _load_prices_and_macro( diff --git a/store/parquet_loader.py b/store/parquet_loader.py index 4b8307d..c64c527 100644 --- a/store/parquet_loader.py +++ b/store/parquet_loader.py @@ -1,23 +1,22 @@ """ -store/parquet_loader.py — Shared S3 parquet / slim-cache loading helpers. +store/parquet_loader.py — Shared single-parquet S3 loader helper. -Extracted from features/compute.py so that non-feature callers (e.g. 
the -macro collector's breadth computation) can reuse the same normalized -DataFrame shape without importing private helpers out of features.*. +Extracted from features/compute.py so that non-feature callers can reuse +the same normalized DataFrame shape without importing private helpers out +of features.*. The bulk slim-cache loader (load_slim_cache) was removed in +the Wave-4 predictor/price_cache_slim deletion — all price reads now go +through the ArcticDB universe/macro libs (alpha_engine_lib.arcticdb). """ from __future__ import annotations import io import logging -from concurrent.futures import ThreadPoolExecutor, as_completed import pandas as pd log = logging.getLogger(__name__) -SLIM_CACHE_PREFIX = "predictor/price_cache_slim/" - def load_parquet_from_s3(s3, bucket: str, key: str) -> pd.DataFrame: """Download a single parquet from S3 and return a normalized DataFrame. @@ -43,53 +42,3 @@ def load_parquet_from_s3(s3, bucket: str, key: str) -> pd.DataFrame: if isinstance(df.index, pd.DatetimeIndex) and not df.index.is_monotonic_increasing: df = df.sort_index() return df - - -def load_slim_cache( - s3, - bucket: str, - prefix: str = SLIM_CACHE_PREFIX, - max_workers: int = 20, -) -> dict[str, pd.DataFrame]: - """Load every parquet under `prefix` into a ticker -> DataFrame dict. - - Returns an empty dict if the prefix is empty. Individual ticker failures - are logged and skipped; the caller decides how to handle a partial load. 
- """ - keys: list[str] = [] - paginator = s3.get_paginator("list_objects_v2") - for page in paginator.paginate(Bucket=bucket, Prefix=prefix): - for obj in page.get("Contents", []): - if obj["Key"].endswith(".parquet"): - keys.append(obj["Key"]) - - if not keys: - log.warning("No parquets found in s3://%s/%s", bucket, prefix) - return {} - - log.info("Downloading %d slim cache parquets...", len(keys)) - - price_data: dict[str, pd.DataFrame] = {} - errors = 0 - - def _download(key: str) -> tuple[str, pd.DataFrame | None]: - ticker = key.split("/")[-1].replace(".parquet", "") - try: - df = load_parquet_from_s3(s3, bucket, key) - if df.empty: - return ticker, None - return ticker, df - except Exception: - return ticker, None - - with ThreadPoolExecutor(max_workers=max_workers) as pool: - futures = {pool.submit(_download, k): k for k in keys} - for fut in as_completed(futures): - ticker, df = fut.result() - if df is not None: - price_data[ticker] = df - else: - errors += 1 - - log.info("Slim cache loaded: %d tickers OK, %d errors", len(price_data), errors) - return price_data diff --git a/tests/test_compute_price_source.py b/tests/test_compute_price_source.py index 6c1cd17..cd4ef3f 100644 --- a/tests/test_compute_price_source.py +++ b/tests/test_compute_price_source.py @@ -50,60 +50,23 @@ def test_composes_universe_and_macro_when_arcticdb_available(monkeypatch): monkeypatch, universe=universe, macro_frames=macro_frames, macro_symbols=["VIX", "XLK", "features"], # 'features' must be ignored ) - monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: {}) out = compute._load_price_source(s3=None, bucket="b") assert set(out) == {"AAPL", "SPY", "VIX", "XLK"} -def test_falls_back_to_slim_when_arcticdb_fails(monkeypatch, caplog): - def _boom(bucket): +def test_returns_none_when_arcticdb_fails(monkeypatch): + """No slim fallback post Wave-4: an ArcticDB failure -> None (caller + returns empty; matches pre-Wave-4 single-source-unavailable behaviour).""" + def _boom(*a, 
**k): raise RuntimeError("ArcticDB down") monkeypatch.setattr(compute, "load_universe_ohlcv", _boom) - slim = {"AAPL": _frame(), "VIX": _frame(start=18)} - monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim) - - with caplog.at_level("WARNING"): - out = compute._load_price_source(s3=None, bucket="b") - assert set(out) == {"AAPL", "VIX"} - assert any("falling back to slim cache" in r.message for r in caplog.records) - - -def test_parity_metric_emitted_when_both_present(monkeypatch, caplog): - universe = {"AAPL": _frame()} - macro_frames = {"VIX": _frame(start=18)} - _stub_arctic( - monkeypatch, universe=universe, macro_frames=macro_frames, - macro_symbols=["VIX"], - ) - # slim carries the same data + an extra symbol the universe lib lacks; - # require_ticker_match=False -> set asymmetry is reported, not fatal. - slim = {"AAPL": _frame(), "VIX": _frame(start=18), "OLDSYM": _frame()} - monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim) - - with caplog.at_level("INFO"): - compute._load_price_source(s3=None, bucket="b") - - lines = [ - r.message for r in caplog.records - if "WAVE4_PARITY_METRIC compute" in r.message - ] - assert len(lines) == 1 - import json - - payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC compute ", 1)[1]) - assert payload["max_abs_value_delta"] == 0.0 # overlap identical - assert payload["passed"] is True # value fidelity holds - assert "OLDSYM" in payload["only_in_a"] # asymmetry visible - + assert compute._load_price_source(s3=None, bucket="b") is None -def test_returns_none_when_both_sources_fail(monkeypatch): - def _boom(*a, **k): - raise RuntimeError("down") - monkeypatch.setattr(compute, "load_universe_ohlcv", _boom) - monkeypatch.setattr(compute, "_load_slim_cache", _boom) +def test_returns_none_when_arcticdb_empty(monkeypatch): + _stub_arctic(monkeypatch, universe={}, macro_frames={}, macro_symbols=[]) assert compute._load_price_source(s3=None, bucket="b") is None diff --git 
a/tests/test_macro_breadth.py b/tests/test_macro_breadth.py index 4b74495..c6fba7a 100644 --- a/tests/test_macro_breadth.py +++ b/tests/test_macro_breadth.py @@ -72,62 +72,7 @@ def put_object(self, **kwargs): assert body["breadth"] is not None -def test_breadth_key_omitted_when_no_price_data_and_slim_cache_empty(monkeypatch): - """The critical regression: breadth must NEVER be serialized as null.""" - _stub_fetchers(monkeypatch) - - # Both sources empty (no ArcticDB symbols, no slim parquets in S3) - monkeypatch.setattr(macro, "load_universe_ohlcv", lambda *a, **k: {}) - monkeypatch.setattr(macro, "load_slim_cache", lambda s3, bucket: {}) - - written = {} - - class _FakeS3: - def put_object(self, **kwargs): - written.update(kwargs) - - monkeypatch.setattr(macro.boto3, "client", lambda service: _FakeS3()) - - result = macro.collect( - bucket="test-bucket", - run_date="2026-04-11", - ) - - assert result["status"] == "ok" - import json - body = json.loads(written["Body"]) - # breadth key must be ABSENT — not present with a null value. 
- assert "breadth" not in body - - -def test_breadth_key_omitted_when_slim_cache_load_raises(monkeypatch): - _stub_fetchers(monkeypatch) - - def _boom(s3, bucket): - raise RuntimeError("S3 unreachable") - - def _arctic_boom(*a, **k): - raise RuntimeError("ArcticDB unreachable") - - monkeypatch.setattr(macro, "load_universe_ohlcv", _arctic_boom) - monkeypatch.setattr(macro, "load_slim_cache", _boom) - - written = {} - - class _FakeS3: - def put_object(self, **kwargs): - written.update(kwargs) - - monkeypatch.setattr(macro.boto3, "client", lambda service: _FakeS3()) - - result = macro.collect(bucket="test-bucket", run_date="2026-04-11") - assert result["status"] == "ok" - import json - body = json.loads(written["Body"]) - assert "breadth" not in body - - -# ── Wave-4 migration: ArcticDB primary / slim fallback / parity emit ───────── +# ── Wave-4 terminal state: breadth reads ArcticDB only (slim deleted) ──────── def _universe(n=220): @@ -151,49 +96,35 @@ def put_object(self, **kwargs): return json.loads(written["Body"]) -def test_breadth_uses_arcticdb_when_available(monkeypatch): - """ArcticDB is primary — breadth computed from it even if slim is empty.""" +def test_breadth_uses_arcticdb(monkeypatch): + """ArcticDB universe lib is the sole price source for breadth.""" _stub_fetchers(monkeypatch) monkeypatch.setattr(macro, "load_universe_ohlcv", lambda *a, **k: _universe()) - monkeypatch.setattr(macro, "load_slim_cache", lambda s3, bucket: {}) body = _collect_body(monkeypatch) assert isinstance(body["breadth"], dict) assert body["breadth"]["n_stocks"] == 2 -def test_breadth_falls_back_to_slim_when_arcticdb_unavailable(monkeypatch, caplog): - """ArcticDB read fails -> slim fallback keeps breadth working.""" +def test_breadth_key_omitted_when_arcticdb_empty(monkeypatch): + """The critical regression: breadth must NEVER be serialized as null — + an empty ArcticDB read omits the key (Research has its own fallback).""" _stub_fetchers(monkeypatch) + 
monkeypatch.setattr(macro, "load_universe_ohlcv", lambda *a, **k: {}) - def _arctic_boom(*a, **k): - raise RuntimeError("ArcticDB unreachable") - - monkeypatch.setattr(macro, "load_universe_ohlcv", _arctic_boom) - monkeypatch.setattr(macro, "load_slim_cache", lambda s3, bucket: _universe()) - - with caplog.at_level("WARNING"): - body = _collect_body(monkeypatch) - assert isinstance(body["breadth"], dict) - assert any("falling back to slim cache" in r.message for r in caplog.records) + body = _collect_body(monkeypatch) + assert "breadth" not in body # absent, not null -def test_parity_metric_emitted_when_both_sources_present(monkeypatch, caplog): - """SOTA observation: dual-read emits a JSON ParityReport every run.""" +def test_breadth_key_omitted_when_arcticdb_raises(monkeypatch): + """ArcticDB unavailable -> breadth key omitted (no slim fallback post + Wave-4; matches pre-Wave-4 single-source-unavailable behaviour).""" _stub_fetchers(monkeypatch) - monkeypatch.setattr(macro, "load_universe_ohlcv", lambda *a, **k: _universe()) - monkeypatch.setattr(macro, "load_slim_cache", lambda s3, bucket: _universe()) - with caplog.at_level("INFO"): - body = _collect_body(monkeypatch) + def _arctic_boom(*a, **k): + raise RuntimeError("ArcticDB unreachable") - assert isinstance(body["breadth"], dict) - metric_lines = [ - r.message for r in caplog.records - if "WAVE4_PARITY_METRIC breadth" in r.message - ] - assert len(metric_lines) == 1 - import json - payload = json.loads(metric_lines[0].split("WAVE4_PARITY_METRIC breadth ", 1)[1]) - assert payload["passed"] is True # identical fixtures -> parity holds - assert payload["max_abs_value_delta"] == 0.0 + monkeypatch.setattr(macro, "load_universe_ohlcv", _arctic_boom) + + body = _collect_body(monkeypatch) + assert "breadth" not in body diff --git a/tests/test_wave4_slim_arctic_parity.py b/tests/test_wave4_slim_arctic_parity.py index 1ccfee7..3fa4bc2 100644 --- a/tests/test_wave4_slim_arctic_parity.py +++ 
b/tests/test_wave4_slim_arctic_parity.py @@ -1,231 +1,66 @@ """ -Wave 4 (predictor/price_cache_slim deletion) — parity harness + consumer lock. - -PR0b of the arc. **No production path is switched here.** This file does two -things: - -1. **Parity harness wiring + teeth** — proves the lib v0.19.0 substrate - (``alpha_engine_lib.arcticdb.load_universe_ohlcv`` + - ``alpha_engine_lib.reconcile.reconcile_frame_dicts``) is importable in the - data repo and that the cutover gate correctly PASSES when the ArcticDB - read matches the slim-cache read and FAILS on any value divergence. The - *live* S3-vs-ArcticDB observation is the PR4 Saturday-SF gate; this is the - offline proof that the gate machinery is sound. - -2. **Consumer-set lock (anti-drift guard)** — pins the exact set of - ``predictor/price_cache_slim/`` touch-points so a future change that adds - a new slim consumer fails this test until the Wave-4 inventory below is - updated. Mirrors the orphaned-producer / prefix-invariant guard pattern. - -AUDIT CORRECTION (2026-05-19): the ROADMAP entry claimed "3 active -production callers" (data ``collectors/macro.py``, backtester -``analysis/exit_timing.py``, dashboard ``health_checker.py``). The data-repo -audit for this PR found a **fourth, ROADMAP-missed** data-read consumer — -``features/compute.py::_load_prices_and_macro`` — which is the price+macro -base for the entire feature-compute pipeline (slim -> _apply_daily_delta -> -_extract_macro), a heavier consumer than macro-breadth. The canonical -inventory below is the corrected source of truth for PR1-PR4. +Wave-4 terminal-state guard — the predictor/price_cache_slim tier is gone. + +History: PR0b seeded this file as the migration scaffolding (a slim<->ArcticDB +parity harness + a consumer-set anti-drift lock) while consumers were moved +off slim with a fallback (PR1a macro-breadth, PR1b feature-compute, PR2 +backtester exit_timing) and the cutover was observed via WAVE4_PARITY_METRIC. 
+After the 5/23 parity observation confirmed equivalence, PR4 deleted the slim +writer, the load_slim_cache API, the consumer fallbacks, and the +``predictor/price_cache_slim/`` S3 prefix. + +The parity-harness tests are retired with the tier they guarded (lib's own +``test_reconcile`` / ``test_arcticdb`` still cover the substrate). What +remains is a **permanent regression guard**: the slim tier must never come +back. ArcticDB universe/macro libs are the single source of truth. """ from __future__ import annotations -import subprocess -import sys -import types +import re from pathlib import Path -import pandas as pd -import pytest - -from alpha_engine_lib.reconcile import reconcile_frame_dicts - _REPO = Path(__file__).resolve().parent.parent +# Functional slim surface — a callable definition/call or an import of the +# deleted module. The bare prefix string is tolerated in removal-marker +# comments/docstrings (e.g. "predictor/price_cache_slim/ deleted (Wave-4)"); +# what must never return is executable slim machinery. +_SLIM_CODE_RE = re.compile( + r"\b(def\s+(load|build)_slim_cache" + r"|(load|build)_slim_cache\s*\(" + r"|import\s+slim_cache" + r"|from\s+collectors\s+import[^\n]*\bslim_cache\b" + r"|from\s+collectors\.slim_cache\b" + r"|from\s+store\.parquet_loader\s+import[^\n]*\bload_slim_cache\b)" +) + + +def _production_py_files(): + for p in _REPO.rglob("*.py"): + rel = p.relative_to(_REPO).as_posix() + if rel.startswith("tests/") or "/.claude/" in f"/{rel}": + continue + yield rel, p -# ── Canonical Wave-4 consumer-set lock ─────────────────────────────────────── -# -# Every production touch-point of the predictor/price_cache_slim/ tier. -# Cross-repo entries are documented (not testable from this repo); the -# data-repo entries are enforced by test_consumer_set_has_not_drifted below. 
- -WAVE4_INVENTORY = { - # producer/writer — DELETED in PR4 - "writer": [ - "collectors/slim_cache.py", # builds & uploads the 2y slices - "weekly_collector.py", # invokes slim_cache.collect() - ], - # loader API — DELETED in PR4 (after all readers migrated) - "loader_api": [ - "store/parquet_loader.py", # load_slim_cache + SLIM_CACHE_PREFIX - ], - # data-read consumers — MIGRATE to ArcticDB (lib load_universe_ohlcv) in PR1 - "data_read_consumers": [ - "collectors/macro.py", # :84 _compute_market_breadth (ROADMAP-known) - "features/compute.py", # :360 _load_prices_and_macro (ROADMAP-MISSED) - ], - # doc/comment-only — cosmetic cleanup in PR4, no behaviour - "doc_only": [ - "builders/backfill.py", - "validators/price_validator.py", - ], - # cross-repo — handled by their own PRs, NOT testable from data repo: - # backtester analysis/exit_timing.py:201 -> migrate (has price_cache fallback) [PR2] - # dashboard health_checker.py:166 -> RETIRE freshness check, not migrate [PR3] -} - -# Files allowed to mention slim in the data tree = every category above plus -# this guard test itself. Drift outside this set must fail loudly. -_ALLOWED_SLIM_FILES = { - *WAVE4_INVENTORY["writer"], - *WAVE4_INVENTORY["loader_api"], - *WAVE4_INVENTORY["data_read_consumers"], - *WAVE4_INVENTORY["doc_only"], - "features/compute.py", # also carries a docstring mention (line 17) -} - - -# ── 1. 
Parity harness wiring + teeth ───────────────────────────────────────── - - -def _slim_shaped(frames: dict) -> dict: - """A load_slim_cache()-shaped dict: ticker -> tz-naive DatetimeIndex df.""" - return {k: v.copy() for k, v in frames.items()} - - -def _install_arctic_stub(monkeypatch, frames: dict): - """Stub arcticdb so lib.load_universe_ohlcv reads `frames`.""" - - class _Res: - def __init__(self, data): - self.data = data - - class _Lib: - def list_symbols(self): - return list(frames) - - def read(self, sym, date_range=None, columns=None): - df = frames[sym] - if date_range is not None: - lo, hi = date_range - df = df[(df.index >= lo) & (df.index <= hi)] - if columns is not None: - df = df[list(columns)] - return _Res(df.copy()) - - class _Arctic: - def get_library(self, name): - return _Lib() - - mod = types.ModuleType("arcticdb") - mod.Arctic = lambda uri: _Arctic() - monkeypatch.setitem(sys.modules, "arcticdb", mod) - - -def _build_universe(n=120): - idx = pd.bdate_range(end=pd.Timestamp("2026-05-15"), periods=n) - return { - "AAA": pd.DataFrame( - {"Close": [float(v) for v in range(100, 100 + n)], "Volume": [1] * n}, - index=idx, - ), - "BBB": pd.DataFrame( - {"Close": [float(v) for v in range(200, 200 + n)], "Volume": [2] * n}, - index=idx, - ), - } - - -def test_parity_gate_passes_when_arctic_matches_slim(monkeypatch): - """The real lib reader vs a slim-shaped dict of the same data -> PASS.""" - from alpha_engine_lib import arcticdb as ae_arctic - - universe = _build_universe() - slim = _slim_shaped(universe) - _install_arctic_stub(monkeypatch, universe) - - arctic = ae_arctic.load_universe_ohlcv( - "alpha-engine-research", lookback_days=3650, end="2026-05-15" - ) - - report = reconcile_frame_dicts(slim, arctic, value_cols=("Close",)) - assert report.passed, report.summary() - assert report.ticker_sets_match - assert report.max_abs_value_delta == 0.0 - # JSON-able so PR4's Saturday-SF gate can emit it to the metrics surface. 
- assert report.as_metrics()["passed"] is True - - -def test_parity_gate_has_teeth_on_value_divergence(monkeypatch): - """A single perturbed cell must fail the gate and be located.""" - from alpha_engine_lib import arcticdb as ae_arctic - - universe = _build_universe() - slim = _slim_shaped(universe) - universe["BBB"] = universe["BBB"].copy() - universe["BBB"].iloc[-1, universe["BBB"].columns.get_loc("Close")] += 0.01 - _install_arctic_stub(monkeypatch, universe) - - arctic = ae_arctic.load_universe_ohlcv( - "alpha-engine-research", lookback_days=3650, end="2026-05-15" - ) - - report = reconcile_frame_dicts(slim, arctic, value_cols=("Close",), epsilon=1e-6) - assert not report.passed - assert report.n_cells_over_epsilon == 1 - assert report.worst_cell[0] == "BBB" - - -def test_boundary_rowcount_delta_is_reported_but_not_fatal(monkeypatch): - """Slim 2y tail vs ArcticDB date_range read differ at the edge while - agreeing on the overlap — the institutional nuance, exercised.""" - from alpha_engine_lib import arcticdb as ae_arctic - - universe = _build_universe() - # slim only keeps the last 60 rows; arctic read returns all 120 - slim = {k: v.iloc[-60:].copy() for k, v in universe.items()} - _install_arctic_stub(monkeypatch, universe) - - arctic = ae_arctic.load_universe_ohlcv( - "alpha-engine-research", lookback_days=3650, end="2026-05-15" - ) - report = reconcile_frame_dicts(slim, arctic, value_cols=("Close",)) - assert not report.rowcounts_match # delta reported - assert report.passed # overlap agrees -> gate PASS - strict = reconcile_frame_dicts( - slim, arctic, value_cols=("Close",), require_rowcount_match=True +def test_slim_cache_module_deleted(): + assert not (_REPO / "collectors" / "slim_cache.py").exists(), ( + "collectors/slim_cache.py must stay deleted (Wave-4). The slim " + "writer is gone; ArcticDB universe/macro libs are canonical." ) - assert not strict.passed # strict mode available - -# ── 2. 
Consumer-set lock (anti-drift guard) ────────────────────────────────── - -def test_consumer_set_has_not_drifted(): - """Fail if a data-repo file references the slim tier and is not in the - locked Wave-4 inventory. Forces WAVE4_INVENTORY to stay the single - source of truth for the migration arc.""" - out = subprocess.run( - ["git", "-C", str(_REPO), "grep", "-lE", - r"price_cache_slim|load_slim_cache|build_slim_cache", "--", "*.py"], - capture_output=True, text=True, - ).stdout - - found = set() - for line in out.splitlines(): - line = line.strip() - if not line or line.startswith("tests/") or "/.claude/" in line: - continue - found.add(line) - - unexpected = found - _ALLOWED_SLIM_FILES - assert not unexpected, ( - f"New slim-cache touch-point(s) not in the Wave-4 inventory: " - f"{sorted(unexpected)}. Add to WAVE4_INVENTORY (and decide: " - f"migrate-to-ArcticDB consumer, or doc-only?) before merging." +def test_no_functional_slim_surface_in_production(): + """No slim writer/loader definition, call, or import anywhere in + production code. Regression guard — the tier must not return.""" + offenders = [] + for rel, path in _production_py_files(): + for i, line in enumerate(path.read_text().splitlines(), 1): + if _SLIM_CODE_RE.search(line): + offenders.append(f"{rel}:{i}: {line.strip()[:100]}") + assert not offenders, ( + "Functional slim-cache surface re-introduced (Wave-4 deleted it — " + "use alpha_engine_lib.arcticdb load_universe_ohlcv / " + "load_macro_series instead):\n" + "\n".join(offenders) ) - - # Inventory entries must still exist (catch a rename that silently - # drops a consumer from the migration plan). - for rel in sorted(_ALLOWED_SLIM_FILES): - assert (_REPO / rel).exists(), f"Inventory file vanished: {rel}" diff --git a/weekly_collector.py b/weekly_collector.py index 11db7da..d80abb4 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -1,7 +1,7 @@ """ weekly_collector.py — Centralized weekly data collection for Alpha Engine. 
-Phase 1 (before research): constituents, prices, slim cache, macro, universe returns. +Phase 1 (before research): constituents, prices, macro, universe returns. Phase 2 (after research): alternative data for promoted tickers. Phase 1 runs on EC2 via SSM RunCommand (price refresh takes 15-25 min). @@ -73,7 +73,7 @@ def _load_dotenv() -> None: exclude_patterns=_FLOW_DOCTOR_EXCLUDE_PATTERNS, ) -from collectors import constituents, prices, slim_cache, macro, universe_returns, signal_returns, alternative, daily_closes, fundamentals, short_interest +from collectors import constituents, prices, macro, universe_returns, signal_returns, alternative, daily_closes, fundamentals, short_interest logger = logging.getLogger(__name__) @@ -133,7 +133,7 @@ def run_weekly(config: dict, args: argparse.Namespace) -> dict: def _run_phase1(config: dict, args: argparse.Namespace) -> dict: - """Phase 1: constituents, prices, slim cache, macro, universe returns.""" + """Phase 1: constituents, prices, macro, universe returns.""" bucket = config["bucket"] price_cfg = config.get("price_cache", {}) market_prefix = config.get("market_data", {}).get("s3_prefix", "market_data/") @@ -211,23 +211,10 @@ def _run_phase1(config: dict, args: argparse.Namespace) -> dict: logger.error("Price cache refresh failed: %s", e) results["collectors"]["prices"] = {"status": "error", "error": str(e)} - # ── 3. 
Slim cache ──────────────────────────────────────────────────────── - if only in (None, "slim"): - logger.info("=" * 60) - logger.info("COLLECTING: slim cache") - logger.info("=" * 60) - try: - slim_result = slim_cache.collect( - bucket=bucket, - full_cache_prefix=price_cfg.get("s3_prefix", "predictor/price_cache/"), - slim_prefix=price_cfg.get("slim_prefix", "predictor/price_cache_slim/"), - lookback_days=price_cfg.get("slim_lookback_days", 730), - dry_run=dry_run, - ) - results["collectors"]["slim_cache"] = slim_result - except Exception as e: - logger.error("Slim cache write failed: %s", e) - results["collectors"]["slim_cache"] = {"status": "error", "error": str(e)} + # ── 3. Slim cache — REMOVED (Wave-4) ───────────────────────────────────── + # predictor/price_cache_slim/ writer removed: every consumer (data macro-breadth + # + feature compute, backtester exit_timing) reads the ArcticDB universe/ + # macro libs directly. S3 prefix deletion is gated: WAVE4_SLIM_DELETION_RUNBOOK.md. # ── 4. Macro data ──────────────────────────────────────────────────────── if only in (None, "macro"): @@ -1577,7 +1564,7 @@ def _parse_args() -> argparse.Namespace: ) parser.add_argument( "--only", - choices=["constituents", "prices", "slim", "macro", "short_interest", "universe_returns", "alternative", "daily_closes", "features", "arcticdb"], + choices=["constituents", "prices", "macro", "short_interest", "universe_returns", "alternative", "daily_closes", "features", "arcticdb"], help="Run a single collector instead of all", ) parser.add_argument(