Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 16 additions & 42 deletions analysis/exit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

Requires daily OHLCV price data during the hold period. Reads from the
ArcticDB universe library (primary, via alpha_engine_lib), falling back to
the predictor/price_cache_slim then predictor/price_cache parquets in S3
(no external API calls). Wave-4 migration: the slim leg is parity-observed
and removed in PR4.
the predictor/price_cache (10y) parquets in S3 (no external API calls).
Wave-4: the predictor/price_cache_slim leg was removed after the parity
observation confirmed slim<->ArcticDB equivalence.

Data source: trades table in trades.db (roundtrip trades with entry_trade_id).
"""
Expand All @@ -25,7 +25,6 @@
import pandas as pd

from alpha_engine_lib.arcticdb import load_universe_ohlcv
from alpha_engine_lib.reconcile import reconcile_frame_dicts

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,24 +198,23 @@ def _load_price_cache(tickers: list[str], bucket: str = "alpha-engine-research")
Silently skips tickers that don't have cache files.
"""
import io
import json
import boto3

# Wave-4 (predictor/price_cache_slim deletion): the ArcticDB universe
# lib is primary for traded tickers (all equities + SPY, which are
# universe members — exit_timing never needs macro/index symbols, so
# no macro-lib read here). The slim -> price_cache(10y) parquet chain
# is the fallback. While slim still exists we dual-read it for the
# parity ParityReport (grep ``WAVE4_PARITY_METRIC exit_timing``) so
# PR4's deletion is data-driven. The slim leg is removed in PR4;
# predictor/price_cache (10y) stays — that is Wave-3's scope.
# Wave-4 terminal state (predictor/price_cache_slim deleted): the
# ArcticDB universe lib is the source for traded tickers (all equities
# + SPY, which are universe members — exit_timing never needs macro/
# index symbols, so no macro-lib read). predictor/price_cache (the 10y
# full parquet — Wave-3's scope, untouched here) is the sole fallback
# for any ticker ArcticDB does not return. The slim leg + parity
# dual-read were removed after the 5/23 observation confirmed
# slim<->ArcticDB equivalence.
tickers = list(tickers)
s3 = boto3.client("s3")

arctic: dict[str, pd.DataFrame] = {}
try:
arctic = load_universe_ohlcv(bucket, symbols=tickers)
except Exception as exc: # noqa: BLE001 - fall back to parquet chain
except Exception as exc: # noqa: BLE001 - fall back to price_cache
logger.warning(
"ArcticDB universe read for exit_timing failed: %s", exc
)
Expand All @@ -236,36 +234,12 @@ def _read_parquet(prefix: str, ticker: str):
except Exception:
return None

# Parity observation: compare slim vs ArcticDB over the tickers
# ArcticDB returned (set asymmetry expected — some traded tickers may
# only exist in the parquet cache; logged, not fatal).
if arctic:
slim_for_parity = {}
for ticker in arctic:
d = _read_parquet("predictor/price_cache_slim", ticker)
if d is not None:
slim_for_parity[ticker] = d
if slim_for_parity:
report = reconcile_frame_dicts(
slim_for_parity,
{k: arctic[k] for k in slim_for_parity},
value_cols=("Close",),
require_ticker_match=False,
)
logger.info("exit_timing slim<->arctic %s", report.summary())
logger.info(
"WAVE4_PARITY_METRIC exit_timing %s",
json.dumps(report.as_metrics()),
)

cache = dict(arctic)
# Fallback parquet chain for any ticker ArcticDB did not return.
# Fallback: predictor/price_cache (10y) for any ticker ArcticDB missed.
for ticker in tickers:
if ticker in cache:
continue
for prefix in ("predictor/price_cache_slim", "predictor/price_cache"):
df = _read_parquet(prefix, ticker)
if df is not None:
cache[ticker] = df
break
df = _read_parquet("predictor/price_cache", ticker)
if df is not None:
cache[ticker] = df
return cache
11 changes: 7 additions & 4 deletions infrastructure/spot_backtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,20 @@ else
echo " WARNING: predictor.yaml not found — predictor backtest will be skipped"
fi

# Bootstrap predictor data cache (slim cache parquets + sector_map required for backtest)
echo "==> Downloading predictor slim cache from S3 (~25 MB)..."
# Bootstrap predictor data cache: only sector_map.json is consumed
# (predictor_backtest.load_sector_map). The former price_cache_slim sync
# was Wave-4 dead staging — predictor_backtest loads prices+features from
# ArcticDB (load_universe_from_arctic), never the local cache parquets;
# verified no data/cache/*.parquet reader exists. Removed in Wave-4 PR4.
echo "==> Downloading predictor sector_map from S3..."
run_remote bash -s <<'CACHE'
set -euo pipefail
CACHE_DIR="/home/ec2-user/alpha-engine-predictor/data/cache"
mkdir -p "$CACHE_DIR"
if command -v aws &>/dev/null; then
aws s3 cp s3://alpha-engine-research/predictor/price_cache/sector_map.json "$CACHE_DIR/sector_map.json" 2>/dev/null || true
aws s3 sync s3://alpha-engine-research/predictor/price_cache_slim/ "$CACHE_DIR/" --quiet 2>/dev/null || true
fi
echo "Predictor cache dir: $(ls "$CACHE_DIR"/*.parquet 2>/dev/null | wc -l) parquet files"
echo "Predictor cache dir: sector_map.json $([ -f "$CACHE_DIR/sector_map.json" ] && echo present || echo MISSING)"
CACHE

# ── Build env export command ─────────────────────────────────────────────────
Expand Down
64 changes: 12 additions & 52 deletions tests/test_exit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,46 +359,32 @@ def get_object(self, Bucket, Key):


def test_load_price_cache_arcticdb_primary_no_parquet_needed(monkeypatch):
"""Wave-4 terminal: ArcticDB returns all tickers -> no parquet read,
no slim, no parity emit."""
monkeypatch.setattr(
"analysis.exit_timing.load_universe_ohlcv",
lambda bucket, symbols: {"AAPL": _pf(), "SPY": _pf(start=500)},
)
# S3 store empty except slim parity copies (identical -> parity PASS)
store = {
"predictor/price_cache_slim/AAPL.parquet": _pf(),
"predictor/price_cache_slim/SPY.parquet": _pf(start=500),
}
monkeypatch.setattr(
"boto3.client", lambda svc: _FakeS3(store)
)

import logging
with _capture_logs("analysis.exit_timing") as recs:
cache = _load_price_cache(["AAPL", "SPY"])

monkeypatch.setattr("boto3.client", lambda svc: _FakeS3({}))
cache = _load_price_cache(["AAPL", "SPY"])
assert set(cache) == {"AAPL", "SPY"}
lines = [r for r in recs if "WAVE4_PARITY_METRIC exit_timing" in r]
assert len(lines) == 1
import json
payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC exit_timing ", 1)[1])
assert payload["passed"] is True
assert payload["max_abs_value_delta"] == 0.0


def test_load_price_cache_falls_back_to_parquet_when_arctic_empty(monkeypatch):
def test_load_price_cache_falls_back_to_price_cache_when_arctic_empty(monkeypatch):
"""Sole fallback is predictor/price_cache (10y) — the slim leg is gone."""
monkeypatch.setattr(
"analysis.exit_timing.load_universe_ohlcv",
lambda bucket, symbols: {},
)
store = {
# slim is deleted: a slim-only ticker is now unrecoverable.
"predictor/price_cache_slim/AAPL.parquet": _pf(),
"predictor/price_cache/MSFT.parquet": _pf(start=300), # 10y leg
"predictor/price_cache/MSFT.parquet": _pf(start=300),
}
monkeypatch.setattr(
"boto3.client", lambda svc: _FakeS3(store)
)
monkeypatch.setattr("boto3.client", lambda svc: _FakeS3(store))
cache = _load_price_cache(["AAPL", "MSFT", "GONE"])
assert set(cache) == {"AAPL", "MSFT"} # slim + price_cache legs; GONE absent
# Only the price_cache(10y) leg is consulted now -> MSFT only.
assert set(cache) == {"MSFT"}


def test_load_price_cache_arctic_failure_is_caught(monkeypatch):
Expand All @@ -407,32 +393,6 @@ def _boom(bucket, symbols):

monkeypatch.setattr("analysis.exit_timing.load_universe_ohlcv", _boom)
store = {"predictor/price_cache/AAPL.parquet": _pf()}
monkeypatch.setattr(
"boto3.client", lambda svc: _FakeS3(store)
)
monkeypatch.setattr("boto3.client", lambda svc: _FakeS3(store))
cache = _load_price_cache(["AAPL"])
assert set(cache) == {"AAPL"} # graceful fallback, no raise


import contextlib # noqa: E402
import logging # noqa: E402


@contextlib.contextmanager
def _capture_logs(logger_name):
    """Collect the messages logged to ``logger_name`` inside a ``with`` body.

    Yields a list of strings that grows as records arrive; each entry is
    the record's fully formatted message (``record.getMessage()``).

    While the context is active the named logger's level is set to
    ``logging.INFO``. On exit (normal or via exception) the temporary
    handler is detached and the logger's previous level is restored.
    """
    captured: list[str] = []

    class _ListHandler(logging.Handler):
        # Minimal handler: store the rendered message, nothing else.
        def emit(self, record):
            captured.append(record.getMessage())

    target = logging.getLogger(logger_name)
    previous_level = target.level
    handler = _ListHandler()
    target.addHandler(handler)
    target.setLevel(logging.INFO)
    try:
        yield captured
    finally:
        # Always undo both changes so other tests see an untouched logger.
        target.removeHandler(handler)
        target.setLevel(previous_level)
Loading