diff --git a/analysis/exit_timing.py b/analysis/exit_timing.py index 0041b2e..b92376c 100644 --- a/analysis/exit_timing.py +++ b/analysis/exit_timing.py @@ -9,9 +9,9 @@ Requires daily OHLCV price data during the hold period. Reads from the ArcticDB universe library (primary, via alpha_engine_lib), falling back to -the predictor/price_cache_slim then predictor/price_cache parquets in S3 -(no external API calls). Wave-4 migration: the slim leg is parity-observed -and removed in PR4. +the predictor/price_cache (10y) parquets in S3 (no external API calls). +Wave-4: the predictor/price_cache_slim leg was removed after the parity +observation confirmed slim<->ArcticDB equivalence. Data source: trades table in trades.db (roundtrip trades with entry_trade_id). """ @@ -25,7 +25,6 @@ import pandas as pd from alpha_engine_lib.arcticdb import load_universe_ohlcv -from alpha_engine_lib.reconcile import reconcile_frame_dicts logger = logging.getLogger(__name__) @@ -199,24 +198,23 @@ def _load_price_cache(tickers: list[str], bucket: str = "alpha-engine-research") Silently skips tickers that don't have cache files. """ import io - import json import boto3 - # Wave-4 (predictor/price_cache_slim deletion): the ArcticDB universe - # lib is primary for traded tickers (all equities + SPY, which are - # universe members — exit_timing never needs macro/index symbols, so - # no macro-lib read here). The slim -> price_cache(10y) parquet chain - # is the fallback. While slim still exists we dual-read it for the - # parity ParityReport (grep ``WAVE4_PARITY_METRIC exit_timing``) so - # PR4's deletion is data-driven. The slim leg is removed in PR4; - # predictor/price_cache (10y) stays — that is Wave-3's scope. + # Wave-4 terminal state (predictor/price_cache_slim deleted): the + # ArcticDB universe lib is the source for traded tickers (all equities + # + SPY, which are universe members — exit_timing never needs macro/ + # index symbols, so no macro-lib read). predictor/price_cache (the 10y + # full parquet — Wave-3's scope, untouched here) is the sole fallback + # for any ticker ArcticDB does not return. The slim leg + parity + # dual-read were removed after the 5/23 observation confirmed + # slim<->ArcticDB equivalence. tickers = list(tickers) s3 = boto3.client("s3") arctic: dict[str, pd.DataFrame] = {} try: arctic = load_universe_ohlcv(bucket, symbols=tickers) - except Exception as exc: # noqa: BLE001 - fall back to parquet chain + except Exception as exc: # noqa: BLE001 - fall back to price_cache logger.warning( "ArcticDB universe read for exit_timing failed: %s", exc ) @@ -236,36 +234,12 @@ def _read_parquet(prefix: str, ticker: str): except Exception: return None - # Parity observation: compare slim vs ArcticDB over the tickers - # ArcticDB returned (set asymmetry expected — some traded tickers may - # only exist in the parquet cache; logged, not fatal). - if arctic: - slim_for_parity = {} - for ticker in arctic: - d = _read_parquet("predictor/price_cache_slim", ticker) - if d is not None: - slim_for_parity[ticker] = d - if slim_for_parity: - report = reconcile_frame_dicts( - slim_for_parity, - {k: arctic[k] for k in slim_for_parity}, - value_cols=("Close",), - require_ticker_match=False, - ) - logger.info("exit_timing slim<->arctic %s", report.summary()) - logger.info( - "WAVE4_PARITY_METRIC exit_timing %s", - json.dumps(report.as_metrics()), - ) - cache = dict(arctic) - # Fallback parquet chain for any ticker ArcticDB did not return. + # Fallback: predictor/price_cache (10y) for any ticker ArcticDB missed. for ticker in tickers: if ticker in cache: continue - for prefix in ("predictor/price_cache_slim", "predictor/price_cache"): - df = _read_parquet(prefix, ticker) - if df is not None: - cache[ticker] = df - break + df = _read_parquet("predictor/price_cache", ticker) + if df is not None: + cache[ticker] = df return cache diff --git a/infrastructure/spot_backtest.sh b/infrastructure/spot_backtest.sh index a8d741b..c0d4f9e 100755 --- a/infrastructure/spot_backtest.sh +++ b/infrastructure/spot_backtest.sh @@ -517,17 +517,20 @@ else echo " WARNING: predictor.yaml not found — predictor backtest will be skipped" fi -# Bootstrap predictor data cache (slim cache parquets + sector_map required for backtest) -echo "==> Downloading predictor slim cache from S3 (~25 MB)..." +# Bootstrap predictor data cache: only sector_map.json is consumed +# (predictor_backtest.load_sector_map). The former price_cache_slim sync +# was Wave-4 dead staging — predictor_backtest loads prices+features from +# ArcticDB (load_universe_from_arctic), never the local cache parquets; +# verified no data/cache/*.parquet reader exists. Removed in Wave-4 PR4. +echo "==> Downloading predictor sector_map from S3..." run_remote bash -s <<'CACHE' set -euo pipefail CACHE_DIR="/home/ec2-user/alpha-engine-predictor/data/cache" mkdir -p "$CACHE_DIR" if command -v aws &>/dev/null; then aws s3 cp s3://alpha-engine-research/predictor/price_cache/sector_map.json "$CACHE_DIR/sector_map.json" 2>/dev/null || true - aws s3 sync s3://alpha-engine-research/predictor/price_cache_slim/ "$CACHE_DIR/" --quiet 2>/dev/null || true fi -echo "Predictor cache dir: $(ls "$CACHE_DIR"/*.parquet 2>/dev/null | wc -l) parquet files" +echo "Predictor cache dir: sector_map.json $([ -f "$CACHE_DIR/sector_map.json" ] && echo present || echo MISSING)" CACHE # ── Build env export command ───────────────────────────────────────────────── diff --git a/tests/test_exit_timing.py b/tests/test_exit_timing.py index 3fe2529..0b0a1b2 100644 --- a/tests/test_exit_timing.py +++ b/tests/test_exit_timing.py @@ -359,46 +359,32 @@ def get_object(self, Bucket, Key): def test_load_price_cache_arcticdb_primary_no_parquet_needed(monkeypatch): + """Wave-4 terminal: ArcticDB returns all tickers -> no parquet read, + no slim, no parity emit.""" monkeypatch.setattr( "analysis.exit_timing.load_universe_ohlcv", lambda bucket, symbols: {"AAPL": _pf(), "SPY": _pf(start=500)}, ) - # S3 store empty except slim parity copies (identical -> parity PASS) - store = { - "predictor/price_cache_slim/AAPL.parquet": _pf(), - "predictor/price_cache_slim/SPY.parquet": _pf(start=500), - } - monkeypatch.setattr( - "boto3.client", lambda svc: _FakeS3(store) - ) - - import logging - with _capture_logs("analysis.exit_timing") as recs: - cache = _load_price_cache(["AAPL", "SPY"]) - + monkeypatch.setattr("boto3.client", lambda svc: _FakeS3({})) + cache = _load_price_cache(["AAPL", "SPY"]) assert set(cache) == {"AAPL", "SPY"} - lines = [r for r in recs if "WAVE4_PARITY_METRIC exit_timing" in r] - assert len(lines) == 1 - import json - payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC exit_timing ", 1)[1]) - assert payload["passed"] is True - assert payload["max_abs_value_delta"] == 0.0 -def test_load_price_cache_falls_back_to_parquet_when_arctic_empty(monkeypatch): +def test_load_price_cache_falls_back_to_price_cache_when_arctic_empty(monkeypatch): + """Sole fallback is predictor/price_cache (10y) — the slim leg is gone.""" monkeypatch.setattr( "analysis.exit_timing.load_universe_ohlcv", lambda bucket, symbols: {}, ) store = { + # slim is deleted: a slim-only ticker is now unrecoverable. "predictor/price_cache_slim/AAPL.parquet": _pf(), - "predictor/price_cache/MSFT.parquet": _pf(start=300), # 10y leg + "predictor/price_cache/MSFT.parquet": _pf(start=300), } - monkeypatch.setattr( - "boto3.client", lambda svc: _FakeS3(store) - ) + monkeypatch.setattr("boto3.client", lambda svc: _FakeS3(store)) cache = _load_price_cache(["AAPL", "MSFT", "GONE"]) - assert set(cache) == {"AAPL", "MSFT"} # slim + price_cache legs; GONE absent + # Only the price_cache(10y) leg is consulted now -> MSFT only. + assert set(cache) == {"MSFT"} def test_load_price_cache_arctic_failure_is_caught(monkeypatch): @@ -407,32 +393,6 @@ def _boom(bucket, symbols): monkeypatch.setattr("analysis.exit_timing.load_universe_ohlcv", _boom) store = {"predictor/price_cache/AAPL.parquet": _pf()} - monkeypatch.setattr( - "boto3.client", lambda svc: _FakeS3(store) - ) + monkeypatch.setattr("boto3.client", lambda svc: _FakeS3(store)) cache = _load_price_cache(["AAPL"]) assert set(cache) == {"AAPL"} # graceful fallback, no raise - - -import contextlib # noqa: E402 -import logging # noqa: E402 - - -@contextlib.contextmanager -def _capture_logs(logger_name): - recs: list[str] = [] - - class _H(logging.Handler): - def emit(self, record): - recs.append(record.getMessage()) - - lg = logging.getLogger(logger_name) - h = _H() - lg.addHandler(h) - old = lg.level - lg.setLevel(logging.INFO) - try: - yield recs - finally: - lg.removeHandler(h) - lg.setLevel(old)