From b3269a55183f7cbb1bed6a963679b871bbbcc7a2 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 12:46:04 -0700 Subject: [PATCH] feat(exit_timing): ArcticDB-primary price load (slim parity + price_cache fallback) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR2 of the Wave-4 predictor/price_cache_slim deletion arc — backtester consumer. Consumes lib v0.20.0. analysis/exit_timing._load_price_cache: the traded-ticker price load (equities + SPY from trades.db — all universe members; exit_timing never needs macro/index symbols, so no macro-lib read) moves to lib load_universe_ohlcv as primary. The slim -> price_cache(10y) parquet chain is retained as fallback for any ticker ArcticDB does not return. While slim still exists, dual-read + emit reconcile as_metrics() JSON (grep WAVE4_PARITY_METRIC exit_timing), require_ticker_match=False (some traded tickers may live only in the parquet cache). slim leg removed in PR4; predictor/price_cache (10y) stays — Wave-3 scope. Stale module docstring (line 11) corrected. Lockstep pin bump 0.16.0 -> 0.20.0 across all 4 sites (requirements.txt + lambda_health/concordance/counterfactual Dockerfiles). The separate test_flow_doctor_wiring stable-tag assertion updated per its own 'update if the pin moves' instruction. +3 tests (arctic-primary / parquet-fallback / arctic-failure-caught); full backtester suite 1687 passing. FINDING (not in scope; surfaced not actioned): spot_backtest.sh:528 'aws s3 sync predictor/price_cache_slim/' bootstraps a local cache for the predictor synthetic backtest, but synthetic/predictor_backtest.py's own docstring states its slim fallbacks were removed 2026-04-16 (ArcticDB canonical) — likely DEAD staging (Wave-5 'tested but uncalled' pattern). Left intact pending verification; flagged for a follow-up before PR4. Co-Authored-By: Claude Opus 4.7 (1M context) --- analysis/exit_timing.py | 89 ++++++++++++++++++++----- lambda_concordance/Dockerfile | 2 +- lambda_counterfactual/Dockerfile | 2 +- lambda_health/Dockerfile | 2 +- requirements.txt | 2 +- tests/test_exit_timing.py | 111 +++++++++++++++++++++++++++++++ tests/test_flow_doctor_wiring.py | 17 ++--- 7 files changed, 196 insertions(+), 29 deletions(-) diff --git a/analysis/exit_timing.py b/analysis/exit_timing.py index 875c789..0041b2e 100644 --- a/analysis/exit_timing.py +++ b/analysis/exit_timing.py @@ -7,8 +7,11 @@ - Capture ratio: realized return / MFE (are we capturing gains?) - Stop efficiency: |realized loss| / MAE (are stops placed well?) -Requires daily OHLCV price data during the hold period. Reads from -predictor/price_cache_slim parquets in S3 (no external API calls). +Requires daily OHLCV price data during the hold period. Reads from the +ArcticDB universe library (primary, via alpha_engine_lib), falling back to +the predictor/price_cache_slim then predictor/price_cache parquets in S3 +(no external API calls). Wave-4 migration: the slim leg is parity-observed +and removed in PR4. Data source: trades table in trades.db (roundtrip trades with entry_trade_id). """ @@ -21,6 +24,9 @@ import pandas as pd +from alpha_engine_lib.arcticdb import load_universe_ohlcv +from alpha_engine_lib.reconcile import reconcile_frame_dicts + logger = logging.getLogger(__name__) @@ -193,24 +199,73 @@ def _load_price_cache(tickers: list[str], bucket: str = "alpha-engine-research") Silently skips tickers that don't have cache files. """ import io + import json import boto3 + # Wave-4 (predictor/price_cache_slim deletion): the ArcticDB universe + # lib is primary for traded tickers (all equities + SPY, which are + # universe members — exit_timing never needs macro/index symbols, so + # no macro-lib read here). The slim -> price_cache(10y) parquet chain + # is the fallback. While slim still exists we dual-read it for the + # parity ParityReport (grep ``WAVE4_PARITY_METRIC exit_timing``) so + # PR4's deletion is data-driven. The slim leg is removed in PR4; + # predictor/price_cache (10y) stays — that is Wave-3's scope. + tickers = list(tickers) s3 = boto3.client("s3") - cache = {} + + arctic: dict[str, pd.DataFrame] = {} + try: + arctic = load_universe_ohlcv(bucket, symbols=tickers) + except Exception as exc: # noqa: BLE001 - fall back to parquet chain + logger.warning( + "ArcticDB universe read for exit_timing failed: %s", exc + ) + + def _read_parquet(prefix: str, ticker: str): + key = f"{prefix}/{ticker}.parquet" + try: + resp = s3.get_object(Bucket=bucket, Key=key) + df = pd.read_parquet(io.BytesIO(resp["Body"].read())) + if df.empty: + return None + if not isinstance(df.index, pd.DatetimeIndex): + if "Date" in df.columns: + df = df.set_index("Date") + df.index = pd.to_datetime(df.index) + return df + except Exception: + return None + + # Parity observation: compare slim vs ArcticDB over the tickers + # ArcticDB returned (set asymmetry expected — some traded tickers may + # only exist in the parquet cache; logged, not fatal). + if arctic: + slim_for_parity = {} + for ticker in arctic: + d = _read_parquet("predictor/price_cache_slim", ticker) + if d is not None: + slim_for_parity[ticker] = d + if slim_for_parity: + report = reconcile_frame_dicts( + slim_for_parity, + {k: arctic[k] for k in slim_for_parity}, + value_cols=("Close",), + require_ticker_match=False, + ) + logger.info("exit_timing slim<->arctic %s", report.summary()) + logger.info( + "WAVE4_PARITY_METRIC exit_timing %s", + json.dumps(report.as_metrics()), + ) + + cache = dict(arctic) + # Fallback parquet chain for any ticker ArcticDB did not return. for ticker in tickers: - # Try slim cache first (smaller, 2y), then full cache (10y) + if ticker in cache: + continue for prefix in ("predictor/price_cache_slim", "predictor/price_cache"): - key = f"{prefix}/{ticker}.parquet" - try: - resp = s3.get_object(Bucket=bucket, Key=key) - df = pd.read_parquet(io.BytesIO(resp["Body"].read())) - if not df.empty: - if not isinstance(df.index, pd.DatetimeIndex): - if "Date" in df.columns: - df = df.set_index("Date") - df.index = pd.to_datetime(df.index) - cache[ticker] = df - break - except Exception: - continue + df = _read_parquet(prefix, ticker) + if df is not None: + cache[ticker] = df + break return cache diff --git a/lambda_concordance/Dockerfile b/lambda_concordance/Dockerfile index e4fc827..3f6a8c9 100644 --- a/lambda_concordance/Dockerfile +++ b/lambda_concordance/Dockerfile @@ -26,7 +26,7 @@ RUN microdnf install -y git && microdnf clean all # integration. No [arcticdb] / [rag] — concordance reads decision # artifacts from S3 only. COPY lambda_concordance/requirements-concordance.txt requirements-concordance.txt -RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \ +RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \ pip install --no-cache-dir -r requirements-concordance.txt && \ rm -rf /root/.cache/pip diff --git a/lambda_counterfactual/Dockerfile b/lambda_counterfactual/Dockerfile index 5a92657..3adc22c 100644 --- a/lambda_counterfactual/Dockerfile +++ b/lambda_counterfactual/Dockerfile @@ -24,7 +24,7 @@ RUN microdnf install -y git && microdnf clean all # flow-doctor's logging integration. No [arcticdb] / [rag] — counter- # factual reads decision artifacts from S3 only. COPY lambda_counterfactual/requirements-counterfactual.txt requirements-counterfactual.txt -RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \ +RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \ pip install --no-cache-dir -r requirements-counterfactual.txt && \ rm -rf /root/.cache/pip diff --git a/lambda_health/Dockerfile b/lambda_health/Dockerfile index 0a86b12..f2e3c5a 100644 --- a/lambda_health/Dockerfile +++ b/lambda_health/Dockerfile @@ -26,7 +26,7 @@ COPY requirements-health.txt . # why the health Lambda image stayed frozen at its 2026-04-08 build and # re-raised the already-fixed 2026-05-11 false-positive retrain alert. RUN microdnf install -y git && microdnf clean all -RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \ +RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \ pip install --no-cache-dir -r requirements-health.txt && \ rm -rf /root/.cache/pip diff --git a/requirements.txt b/requirements.txt index 3fe6f48..9eb3cd9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,4 +45,4 @@ cvxpy>=1.4,<1.8 # integration tests); cross-repo lockstep — bumping in sync with # alpha-engine's lib pin to avoid the silent-downgrade pattern flagged # in feedback_lib_pin_lockstep_cross_repo_composition. -alpha-engine-lib[arcticdb,flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0 +alpha-engine-lib[arcticdb,flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0 diff --git a/tests/test_exit_timing.py b/tests/test_exit_timing.py index 0d3aca8..3fe2529 100644 --- a/tests/test_exit_timing.py +++ b/tests/test_exit_timing.py @@ -325,3 +325,114 @@ def broken_connect(_path): result = compute_exit_timing(str(db)) assert result["status"] == "error" assert "simulated query failure" in result["error"] + + +# ── Wave-4: _load_price_cache ArcticDB primary / parquet fallback / parity ──── + +import io as _io # noqa: E402 + +from analysis.exit_timing import _load_price_cache # noqa: E402 + + +def _pf(n=8, start=100.0): + idx = pd.date_range("2026-03-01", periods=n, freq="D") + return pd.DataFrame( + {"Open": [start] * n, "High": [start] * n, "Low": [start] * n, + "Close": [float(start + i) for i in range(n)], "Volume": [1] * n}, + index=idx, + ) + + +class _FakeS3: + """get_object serving parquet bytes from a {key: DataFrame} map.""" + + def __init__(self, store): + self._store = store + + def get_object(self, Bucket, Key): + if Key not in self._store: + raise RuntimeError(f"NoSuchKey {Key}") + buf = _io.BytesIO() + self._store[Key].to_parquet(buf) + buf.seek(0) + return {"Body": buf} + + +def test_load_price_cache_arcticdb_primary_no_parquet_needed(monkeypatch): + monkeypatch.setattr( + "analysis.exit_timing.load_universe_ohlcv", + lambda bucket, symbols: {"AAPL": _pf(), "SPY": _pf(start=500)}, + ) + # S3 store empty except slim parity copies (identical -> parity PASS) + store = { + "predictor/price_cache_slim/AAPL.parquet": _pf(), + "predictor/price_cache_slim/SPY.parquet": _pf(start=500), + } + monkeypatch.setattr( + "boto3.client", lambda svc: _FakeS3(store) + ) + + import logging + with _capture_logs("analysis.exit_timing") as recs: + cache = _load_price_cache(["AAPL", "SPY"]) + + assert set(cache) == {"AAPL", "SPY"} + lines = [r for r in recs if "WAVE4_PARITY_METRIC exit_timing" in r] + assert len(lines) == 1 + import json + payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC exit_timing ", 1)[1]) + assert payload["passed"] is True + assert payload["max_abs_value_delta"] == 0.0 + + +def test_load_price_cache_falls_back_to_parquet_when_arctic_empty(monkeypatch): + monkeypatch.setattr( + "analysis.exit_timing.load_universe_ohlcv", + lambda bucket, symbols: {}, + ) + store = { + "predictor/price_cache_slim/AAPL.parquet": _pf(), + "predictor/price_cache/MSFT.parquet": _pf(start=300), # 10y leg + } + monkeypatch.setattr( + "boto3.client", lambda svc: _FakeS3(store) + ) + cache = _load_price_cache(["AAPL", "MSFT", "GONE"]) + assert set(cache) == {"AAPL", "MSFT"} # slim + price_cache legs; GONE absent + + +def test_load_price_cache_arctic_failure_is_caught(monkeypatch): + def _boom(bucket, symbols): + raise RuntimeError("ArcticDB down") + + monkeypatch.setattr("analysis.exit_timing.load_universe_ohlcv", _boom) + store = {"predictor/price_cache/AAPL.parquet": _pf()} + monkeypatch.setattr( + "boto3.client", lambda svc: _FakeS3(store) + ) + cache = _load_price_cache(["AAPL"]) + assert set(cache) == {"AAPL"} # graceful fallback, no raise + + +import contextlib # noqa: E402 +import logging # noqa: E402 + + +@contextlib.contextmanager +def _capture_logs(logger_name): + recs: list[str] = [] + + class _H(logging.Handler): + def emit(self, record): + recs.append(record.getMessage()) + + lg = logging.getLogger(logger_name) + h = _H() + lg.addHandler(h) + old = lg.level + lg.setLevel(logging.INFO) + try: + yield recs + finally: + lg.removeHandler(h) + lg.setLevel(old) diff --git a/tests/test_flow_doctor_wiring.py b/tests/test_flow_doctor_wiring.py index 009bbe4..2b5665c 100644 --- a/tests/test_flow_doctor_wiring.py +++ b/tests/test_flow_doctor_wiring.py @@ -374,12 +374,13 @@ def test_requirements_pins_lib_to_stable_tag(self): # Either tagged version, or unpinned via @main (we explicitly # forbid @main here — it floats and breaks reproducible builds). assert "@main" not in text, "alpha-engine-lib must be pinned to a tag, not @main" - assert "@v0.16.0" in text, ( - "alpha-engine-lib should pin to v0.16.0 (load_latest_eval_artifact " - "+ list_eval_artifacts canonical readers — required transitively " - "by the executor's signal_reader at test time for simulate-via-" - "deciders parity. Lockstep with the alpha-engine executor pin " - "bumped 2026-05-14 for the Stage D' Wire 2 sizing wire + the " - "T2 stratified-Sortino runner; update this test if the pin " - "moves further forward." + assert "@v0.20.0" in text, ( + "alpha-engine-lib should pin to v0.20.0 (Wave-4 predictor/" + "price_cache_slim deletion: load_universe_ohlcv added in v0.19.0, " + "load_macro_series + shared read-core in v0.20.0 — exit_timing's " + "_load_price_cache reads the ArcticDB universe lib via " + "load_universe_ohlcv with a slim->price_cache parquet fallback " + "and a reconcile parity emit. Bumped from v0.16.0 2026-05-19; " + "still carries the v0.16.0 eval_artifacts canonical readers " + "transitively. Update this test if the pin moves further forward." )