Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 72 additions & 17 deletions analysis/exit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
- Capture ratio: realized return / MFE (are we capturing gains?)
- Stop efficiency: |realized loss| / MAE (are stops placed well?)

Requires daily OHLCV price data during the hold period. Reads from
predictor/price_cache_slim parquets in S3 (no external API calls).
Requires daily OHLCV price data during the hold period. Reads from the
ArcticDB universe library (primary, via alpha_engine_lib), falling back to
the predictor/price_cache_slim then predictor/price_cache parquets in S3
(no external API calls). Wave-4 migration: the slim leg is parity-observed
and removed in PR4.

Data source: trades table in trades.db (roundtrip trades with entry_trade_id).
"""
Expand All @@ -21,6 +24,9 @@

import pandas as pd

from alpha_engine_lib.arcticdb import load_universe_ohlcv
from alpha_engine_lib.reconcile import reconcile_frame_dicts

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -193,24 +199,73 @@ def _load_price_cache(tickers: list[str], bucket: str = "alpha-engine-research")
Silently skips tickers that don't have cache files.
"""
import io
import json
import boto3

# Wave-4 (predictor/price_cache_slim deletion): the ArcticDB universe
# lib is primary for traded tickers (all equities + SPY, which are
# universe members — exit_timing never needs macro/index symbols, so
# no macro-lib read here). The slim -> price_cache(10y) parquet chain
# is the fallback. While slim still exists we dual-read it for the
# parity ParityReport (grep ``WAVE4_PARITY_METRIC exit_timing``) so
# PR4's deletion is data-driven. The slim leg is removed in PR4;
# predictor/price_cache (10y) stays — that is Wave-3's scope.
tickers = list(tickers)
s3 = boto3.client("s3")
cache = {}

arctic: dict[str, pd.DataFrame] = {}
try:
arctic = load_universe_ohlcv(bucket, symbols=tickers)
except Exception as exc: # noqa: BLE001 - fall back to parquet chain
logger.warning(
"ArcticDB universe read for exit_timing failed: %s", exc
)

def _read_parquet(prefix: str, ticker: str):
    """Best-effort read of ``{prefix}/{ticker}.parquet`` from S3.

    Returns the DataFrame with a datetime index, or ``None`` when the
    object is empty or when fetching/parsing/indexing raises. Failures
    are deliberately non-fatal so callers can fall through to the next
    source, but they are logged at DEBUG so a systematic problem is
    distinguishable from a ticker that simply has no cache file.
    """
    key = f"{prefix}/{ticker}.parquet"
    try:
        resp = s3.get_object(Bucket=bucket, Key=key)
        df = pd.read_parquet(io.BytesIO(resp["Body"].read()))
        if df.empty:
            return None
        if not isinstance(df.index, pd.DatetimeIndex):
            # Prefer an explicit "Date" column as the index when present.
            if "Date" in df.columns:
                df = df.set_index("Date")
            df.index = pd.to_datetime(df.index)
        return df
    except Exception as exc:  # noqa: BLE001 - best-effort read, skip ticker
        logger.debug("price cache read skipped for %s: %s", key, exc)
        return None

# Parity observation: compare slim vs ArcticDB over the tickers
# ArcticDB returned (set asymmetry expected — some traded tickers may
# only exist in the parquet cache; logged, not fatal).
if arctic:
slim_for_parity = {}
for ticker in arctic:
d = _read_parquet("predictor/price_cache_slim", ticker)
if d is not None:
slim_for_parity[ticker] = d
if slim_for_parity:
report = reconcile_frame_dicts(
slim_for_parity,
{k: arctic[k] for k in slim_for_parity},
value_cols=("Close",),
require_ticker_match=False,
)
logger.info("exit_timing slim<->arctic %s", report.summary())
logger.info(
"WAVE4_PARITY_METRIC exit_timing %s",
json.dumps(report.as_metrics()),
)

cache = dict(arctic)
# Fallback parquet chain for any ticker ArcticDB did not return.
for ticker in tickers:
# Try slim cache first (smaller, 2y), then full cache (10y)
if ticker in cache:
continue
for prefix in ("predictor/price_cache_slim", "predictor/price_cache"):
key = f"{prefix}/{ticker}.parquet"
try:
resp = s3.get_object(Bucket=bucket, Key=key)
df = pd.read_parquet(io.BytesIO(resp["Body"].read()))
if not df.empty:
if not isinstance(df.index, pd.DatetimeIndex):
if "Date" in df.columns:
df = df.set_index("Date")
df.index = pd.to_datetime(df.index)
cache[ticker] = df
break
except Exception:
continue
df = _read_parquet(prefix, ticker)
if df is not None:
cache[ticker] = df
break
return cache
2 changes: 1 addition & 1 deletion lambda_concordance/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ RUN microdnf install -y git && microdnf clean all
# integration. No [arcticdb] / [rag] — concordance reads decision
# artifacts from S3 only.
COPY lambda_concordance/requirements-concordance.txt requirements-concordance.txt
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \
pip install --no-cache-dir -r requirements-concordance.txt && \
rm -rf /root/.cache/pip

Expand Down
2 changes: 1 addition & 1 deletion lambda_counterfactual/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN microdnf install -y git && microdnf clean all
# flow-doctor's logging integration. No [arcticdb] / [rag] — counter-
# factual reads decision artifacts from S3 only.
COPY lambda_counterfactual/requirements-counterfactual.txt requirements-counterfactual.txt
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \
pip install --no-cache-dir -r requirements-counterfactual.txt && \
rm -rf /root/.cache/pip

Expand Down
2 changes: 1 addition & 1 deletion lambda_health/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ COPY requirements-health.txt .
# why the health Lambda image stayed frozen at its 2026-04-08 build and
# re-raised the already-fixed 2026-05-11 false-positive retrain alert.
RUN microdnf install -y git && microdnf clean all
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0" && \
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \
pip install --no-cache-dir -r requirements-health.txt && \
rm -rf /root/.cache/pip

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ cvxpy>=1.4,<1.8
# integration tests); cross-repo lockstep — bumping in sync with
# alpha-engine's lib pin to avoid the silent-downgrade pattern flagged
# in feedback_lib_pin_lockstep_cross_repo_composition.
alpha-engine-lib[arcticdb,flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.16.0
alpha-engine-lib[arcticdb,flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0
111 changes: 111 additions & 0 deletions tests/test_exit_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,114 @@ def broken_connect(_path):
result = compute_exit_timing(str(db))
assert result["status"] == "error"
assert "simulated query failure" in result["error"]


# ── Wave-4: _load_price_cache ArcticDB primary / parquet fallback / parity ────

import io as _io # noqa: E402

from analysis.exit_timing import _load_price_cache # noqa: E402


def _pf(n=8, start=100.0):
    """Build a small daily OHLCV frame for the price-cache tests.

    Open/High/Low are constant at ``start``, Close rises by 1.0 per row
    from ``start``, Volume is 1, and the index is ``n`` consecutive days
    beginning 2026-03-01.
    """
    columns = {name: [start] * n for name in ("Open", "High", "Low")}
    columns["Close"] = [float(start + step) for step in range(n)]
    columns["Volume"] = [1] * n
    days = pd.date_range("2026-03-01", periods=n, freq="D")
    return pd.DataFrame(columns, index=days)


class _FakeS3:
    """Minimal S3 client stand-in backed by a ``{key: DataFrame}`` map.

    ``get_object`` serializes the stored frame to parquet bytes on demand
    and raises ``RuntimeError`` for keys that are not in the map.
    """

    def __init__(self, store):
        self._store = store

    def get_object(self, Bucket, Key):
        # Bucket is accepted for signature compatibility and ignored.
        if Key in self._store:
            payload = _io.BytesIO()
            self._store[Key].to_parquet(payload)
            payload.seek(0)  # rewind so the caller's .read() sees the bytes
            return {"Body": payload}
        raise RuntimeError(f"NoSuchKey {Key}")


def test_load_price_cache_arcticdb_primary_no_parquet_needed(monkeypatch):
    """ArcticDB supplies every ticker; slim parquets are read for parity only.

    The slim copies are identical to the ArcticDB frames, so exactly one
    ``WAVE4_PARITY_METRIC exit_timing`` line must be logged and its JSON
    payload must report a pass with zero value delta.
    """
    import json

    monkeypatch.setattr(
        "analysis.exit_timing.load_universe_ohlcv",
        lambda bucket, symbols: {"AAPL": _pf(), "SPY": _pf(start=500)},
    )
    # S3 store empty except slim parity copies (identical -> parity PASS)
    store = {
        "predictor/price_cache_slim/AAPL.parquet": _pf(),
        "predictor/price_cache_slim/SPY.parquet": _pf(start=500),
    }
    monkeypatch.setattr("boto3.client", lambda svc: _FakeS3(store))

    with _capture_logs("analysis.exit_timing") as recs:
        cache = _load_price_cache(["AAPL", "SPY"])

    assert set(cache) == {"AAPL", "SPY"}
    lines = [r for r in recs if "WAVE4_PARITY_METRIC exit_timing" in r]
    assert len(lines) == 1
    # Everything after the metric marker is the JSON metrics payload.
    payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC exit_timing ", 1)[1])
    assert payload["passed"] is True
    assert payload["max_abs_value_delta"] == 0.0


def test_load_price_cache_falls_back_to_parquet_when_arctic_empty(monkeypatch):
    """An empty ArcticDB result sends every ticker down the parquet chain."""

    def _empty_universe(bucket, symbols):
        return {}

    monkeypatch.setattr(
        "analysis.exit_timing.load_universe_ohlcv", _empty_universe
    )
    parquet_objects = {
        "predictor/price_cache_slim/AAPL.parquet": _pf(),
        # MSFT is only present under the 10y price_cache prefix.
        "predictor/price_cache/MSFT.parquet": _pf(start=300),
    }
    monkeypatch.setattr(
        "boto3.client", lambda service: _FakeS3(parquet_objects)
    )

    loaded = _load_price_cache(["AAPL", "MSFT", "GONE"])

    # AAPL comes from the slim leg, MSFT from price_cache; GONE has no object.
    assert set(loaded) == {"AAPL", "MSFT"}


def test_load_price_cache_arctic_failure_is_caught(monkeypatch):
    """A raising ArcticDB read must not propagate; parquet serves the ticker."""

    def _arctic_unavailable(bucket, symbols):
        raise RuntimeError("ArcticDB down")

    parquet_objects = {"predictor/price_cache/AAPL.parquet": _pf()}
    monkeypatch.setattr(
        "analysis.exit_timing.load_universe_ohlcv", _arctic_unavailable
    )
    monkeypatch.setattr(
        "boto3.client", lambda service: _FakeS3(parquet_objects)
    )

    loaded = _load_price_cache(["AAPL"])

    # Reaching this line means the failure was swallowed and the fallback ran.
    assert set(loaded) == {"AAPL"}


import contextlib # noqa: E402
import logging # noqa: E402


@contextlib.contextmanager
def _capture_logs(logger_name):
    """Yield a list that collects formatted messages logged to ``logger_name``.

    While the context is active the logger's level is forced to INFO and
    a collecting handler is attached; on exit the handler is removed and
    the previous level is restored.
    """
    messages: list[str] = []

    class _ListHandler(logging.Handler):
        def __init__(self, sink):
            super().__init__()
            self._sink = sink

        def emit(self, record):
            self._sink.append(record.getMessage())

    target = logging.getLogger(logger_name)
    previous_level = target.level
    collector = _ListHandler(messages)
    target.addHandler(collector)
    target.setLevel(logging.INFO)
    try:
        yield messages
    finally:
        target.removeHandler(collector)
        target.setLevel(previous_level)
17 changes: 9 additions & 8 deletions tests/test_flow_doctor_wiring.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,13 @@ def test_requirements_pins_lib_to_stable_tag(self):
# Either tagged version, or unpinned via @main (we explicitly
# forbid @main here — it floats and breaks reproducible builds).
assert "@main" not in text, "alpha-engine-lib must be pinned to a tag, not @main"
assert "@v0.16.0" in text, (
"alpha-engine-lib should pin to v0.16.0 (load_latest_eval_artifact "
"+ list_eval_artifacts canonical readers — required transitively "
"by the executor's signal_reader at test time for simulate-via-"
"deciders parity. Lockstep with the alpha-engine executor pin "
"bumped 2026-05-14 for the Stage D' Wire 2 sizing wire + the "
"T2 stratified-Sortino runner; update this test if the pin "
"moves further forward."
assert "@v0.20.0" in text, (
"alpha-engine-lib should pin to v0.20.0 (Wave-4 predictor/"
"price_cache_slim deletion: load_universe_ohlcv added in v0.19.0, "
"load_macro_series + shared read-core in v0.20.0 — exit_timing's "
"_load_price_cache reads the ArcticDB universe lib via "
"load_universe_ohlcv with a slim->price_cache parquet fallback "
"and a reconcile parity emit. Bumped from v0.16.0 2026-05-19; "
"still carries the v0.16.0 eval_artifacts canonical readers "
"transitively. Update this test if the pin moves further forward."
)
Loading