From 62e28c51c7538d816e2741d8d262569b65a9f163 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Mon, 18 May 2026 17:49:28 -0700 Subject: [PATCH] =?UTF-8?q?feat(regime):=20drawdown=20regime=20leg=20?= =?UTF-8?q?=E2=80=94=20pure=20module=20+=20backfill=20dedup=20+=20additive?= =?UTF-8?q?=20substrate=20hook=20(PR=201,=20producer,=20observe-only)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Leg 3 of the regime ensemble (HMM + BOCPD + deterministic drawdown). Plan: alpha-engine-docs/private/regime-drawdown-hysteresis-260518.md; tracked in alpha-engine-config ROADMAP PR #230 (P1). - regime/drawdown.py: pure-logic module mirroring regime/fast_signal.py. Tiered SPY drawdown (risk_on/caution/risk_off) + book-vs-market excess (risk_on/alpha_bleed), pure-level asymmetric hysteresis (band is the sole debounce; min_persist_days defaults to 1; raising it re-opens F1 calibration per the single-source-of-truth invariant). step() mirrors fast_signal idempotency/cold-start. most_protective + compose_effective_regime is the canonical composition the substrate.py docstring promised but was never implemented. eod_pnl NAV reader with load-bearing graceful-degrade. - scripts/backfill_regime_fast_signal.py: _bear_stretches now delegates to regime.drawdown.bear_stretches — one drawdown reference, not two; the L95/F1-F2 eval reference and the production leg are the SAME code. Semantics-preserving (parity asserted in tests) so F1 calibration stays valid. - regime/substrate.py: optional additive drawdown_block param adds the drawdown + effective_regime payload keys + sidecar effective_regime. None (default) gives zero change for existing consumers (S3-contract-safe, ADD-only). substrate.py stays pure (block assembled by the caller). - tests/test_regime_drawdown.py: 28 cases — bear_stretches legacy parity, tiered hysteresis truth table, excess + NAV graceful-degrade, step idempotency/cold-start, ser/de, composition truth table, eod_pnl degrade matrix, additive-substrate-hook regression. Observe-only: NO consumer reads the new keys; daily-stage wiring + eod_pnl S3 fetch are deferred to the consumer PRs (pure I/O glue, keeps this a clean reviewable producer). 287 regime-suite tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- regime/drawdown.py | 490 +++++++++++++++++++++++++ regime/substrate.py | 29 ++ scripts/backfill_regime_fast_signal.py | 23 +- tests/test_regime_drawdown.py | 356 ++++++++++++++++++ 4 files changed, 885 insertions(+), 13 deletions(-) create mode 100644 regime/drawdown.py create mode 100644 tests/test_regime_drawdown.py diff --git a/regime/drawdown.py b/regime/drawdown.py new file mode 100644 index 0000000..06f7ca1 --- /dev/null +++ b/regime/drawdown.py @@ -0,0 +1,490 @@ +""" +regime/drawdown.py — deterministic drawdown regime leg (3rd ensemble leg). + +Plan: ``regime-drawdown-hysteresis-260518.md``. Pure-logic core (no S3, +no boto3 for the classifier/composition) mirroring ``regime/fast_signal.py`` +so it is testable in isolation; the inference stage / handler wire any +S3 I/O. + +The regime ensemble +------------------- +Institutional regime overlay = an ensemble with conservative +aggregation, not a single model: + +1. **Statistical** — the weekly HMM (``regime/substrate.py``). +2. **Changepoint** — the daily BOCPD ``forced_bear`` (``regime/fast_signal.py``). +3. **Deterministic rule** — drawdown de-risking (THIS module): the + missing, lowest-estimation-risk leg. + +This module owns leg 3 + the **most-protective composition** helper +(promised in ``substrate.py``'s module docstring — "executor/veto take +the max-protection of {continuous path, bear floor}" — but never +implemented until now). + +Two sub-signals, **pure-level asymmetric hysteresis** +----------------------------------------------------- +* **SPY market drawdown** — tiered ``risk_on`` / ``caution`` / ``risk_off``. +* **book-vs-market excess drawdown** — ``risk_on`` / ``alpha_bleed`` + (book bleeding materially worse than SPY ⇒ alpha-negative by the + literal objective ``alpha = portfolio - SPY``). + +Debounce design (settled 2026-05-18, plan §3): the asymmetric +enter/exit **band is the sole debounce** — pure-level, NOT Stage-F's +persistence-day confirmation. Rationale: (1) the F1 backfill ran this +exact pure-level −10/−5 machine on the realized SPY series and it +passed whipsaw decisively; (2) the loss function is asymmetric +(drawdown ≫ opportunity cost) so persistence-days *delay protection*, +the wrong direction; (3) **single-source-of-truth invariant** — the +extracted ``bear_stretches`` here is *both* the production leg and the +L95 / F1-F2 eval reference, so pure-level keeps +``production == reference == one validated implementation``. +``min_persist_days`` is exposed but **defaults to 1 (≡ pure-level)**; +raising it above 1 re-opens the F1 calibration AND must be applied +identically on the eval-reference path or the measurement decouples +from production. + +Sign convention +--------------- +Drawdown ``dd = price / running_peak - 1`` is ``<= 0``. Thresholds are +expressed as positive depth magnitudes (``0.10`` = 10% below peak); the +classifier compares against ``magnitude = -dd``. +""" +from __future__ import annotations + +import logging +from dataclasses import asdict, dataclass +from io import BytesIO +from typing import Any + +import pandas as pd + +logger = logging.getLogger(__name__) + +# Bump on any breaking change to the persisted-state or artifact schema. +DRAWDOWN_SCHEMA_VERSION: int = 1 + + +# ── Tunables ───────────────────────────────────────────────────────────── + +@dataclass(frozen=True) +class DrawdownTunables: + """Hysteresis bands (positive depth magnitudes, fraction of peak). + + Defaults are the institutional convention; the −10/−5 SPY ``risk_off`` + pair is the F1-backfill-validated reference (predictor #167) — do not + change without re-running that calibration. + """ + + # SPY market-drawdown tiers + spy_caution_enter: float = 0.05 # ≥5% below peak ⇒ caution + spy_caution_exit: float = 0.03 # recover to within 3% ⇒ leave caution + spy_risk_off_enter: float = 0.10 # ≥10% below peak ⇒ risk_off + spy_risk_off_exit: float = 0.07 # recover to within 7% ⇒ risk_off→caution + + # Book-vs-market excess drawdown (portfolio_dd − spy_dd, depth magnitude) + excess_enter: float = 0.05 # book ≥5pp deeper than SPY ⇒ alpha_bleed + excess_exit: float = 0.02 # gap back within 2pp ⇒ clear + + # Pure-level by default. >1 re-opens F1 calibration and MUST match the + # eval-reference path (single-source-of-truth invariant, plan §3). + min_persist_days: int = 1 + + def to_dict(self) -> dict[str, float]: + return asdict(self) + + +# ── Canonical regime vocabulary + most-protective composition ──────────── + +# Protection ordering. Higher = more protective (de-risked). +_PROTECTION_ORDER: dict[str, int] = { + "bull": 0, + "neutral": 1, + "caution": 2, + "bear": 3, +} + + +def most_protective(*labels: str | None) -> str: + """Conservative aggregation: the most-protective non-None label. + + The canonical helper the ``substrate.py`` docstring promised. Capital + protection needs no consensus — any leg can escalate. ``None`` legs + (a leg that does not escalate, e.g. drawdown ``risk_on``) are ignored; + all-None ⇒ ``"neutral"`` (no information, no escalation). + """ + present = [str(l) for l in labels if l] + if not present: + return "neutral" + return max(present, key=lambda l: _PROTECTION_ORDER.get(l, 1)) + + +def spy_tier_to_regime(tier: str) -> str | None: + """Map an SPY drawdown tier onto the regime vocabulary (None = no + escalation contribution).""" + return {"risk_on": None, "caution": "caution", "risk_off": "bear"}.get(tier) + + +def excess_tier_to_regime(tier: str) -> str | None: + """Map the book-vs-market excess tier onto the regime vocabulary.""" + return {"risk_on": None, "alpha_bleed": "bear"}.get(tier) + + +def compose_effective_regime( + *, + hmm_argmax: str | None = None, + spy_tier: str | None = None, + excess_tier: str | None = None, + forced_bear: bool = False, +) -> dict[str, Any]: + """Compose the effective regime = most-protective over the ensemble. + + Returns ``{"effective_regime": str, "drivers": {leg: contribution}}``. + The raw legs are preserved so the composed value is always + explainable in audit. ``hmm_argmax`` is bull/neutral/bear (already + the regime vocabulary); the drawdown tiers map via the helpers; a + Stage-F ``forced_bear`` contributes ``"bear"``. + """ + drivers: dict[str, str | None] = { + "hmm": hmm_argmax, + "drawdown_spy": spy_tier_to_regime(spy_tier) if spy_tier else None, + "drawdown_excess": ( + excess_tier_to_regime(excess_tier) if excess_tier else None + ), + "forced_bear": "bear" if forced_bear else None, + } + effective = most_protective(*drivers.values()) + return {"effective_regime": effective, "drivers": drivers} + + +# ── Vectorized bear stretches (THE single source of truth) ─────────────── + +def bear_stretches( + close: pd.Series, + *, + enter: float = 0.10, + exit: float = 0.05, +) -> list[tuple[Any, Any]]: + """SPY peak-to-trough drawdown bear stretches — pure-level hysteresis. + + Extracted verbatim (semantics-preserving) from the script-local + ``scripts/backfill_regime_fast_signal.py::_bear_stretches`` so the + production leg and the L95 / F1-F2 eval reference are the *same* + code. Onset = close ≥ ``enter`` below the trailing all-time high; + the stretch ends when it recovers to within ``exit`` of the running + peak. The ``enter``/``exit`` gap is the sole debounce. + + Returns a list of ``(start_ts, end_ts)`` tuples; an unterminated + final stretch is closed at the last index. + """ + s = close.dropna() + if s.empty: + return [] + peak = s.cummax() + dd = s / peak - 1.0 # ≤ 0 + stretches: list[tuple[Any, Any]] = [] + in_bear = False + start: Any | None = None + for ts, d in dd.items(): + if not in_bear and d <= -enter: + in_bear, start = True, ts + elif in_bear and d >= -exit: + stretches.append((start, ts)) + in_bear = False + if in_bear and start is not None: + stretches.append((start, dd.index[-1])) + return stretches + + +# ── Online tiered hysteresis ───────────────────────────────────────────── + +def _advance_spy_tier(prev: str, magnitude: float, t: DrawdownTunables) -> str: + """One-step SPY tier transition. ``magnitude`` = -dd ≥ 0 (depth). + + Ladder with asymmetric bands: risk_on ⇄ caution ⇄ risk_off. A jump + straight from risk_on to risk_off is allowed on entry; de-escalation + is one rung at a time (risk_off→caution→risk_on) so the exit bands + are honoured. + """ + if prev == "risk_off": + if magnitude <= t.spy_risk_off_exit: + # leave risk_off; may land in caution or, if fully recovered, + # risk_on (still gated by the caution exit band). + return "risk_on" if magnitude <= t.spy_caution_exit else "caution" + return "risk_off" + if prev == "caution": + if magnitude >= t.spy_risk_off_enter: + return "risk_off" + if magnitude <= t.spy_caution_exit: + return "risk_on" + return "caution" + # prev == "risk_on" + if magnitude >= t.spy_risk_off_enter: + return "risk_off" + if magnitude >= t.spy_caution_enter: + return "caution" + return "risk_on" + + +def _advance_excess_tier(prev: str, magnitude: float, t: DrawdownTunables) -> str: + """One-step excess (book-vs-market) tier transition. ``magnitude`` = + max(spy_dd − portfolio_dd, 0) depth: how much deeper the book is.""" + if prev == "alpha_bleed": + return "risk_on" if magnitude <= t.excess_exit else "alpha_bleed" + return "alpha_bleed" if magnitude >= t.excess_enter else "risk_on" + + +@dataclass +class DrawdownState: + """Persisted online state. Serialized to ``regime/drawdown_state.json``.""" + + spy_tier: str = "risk_on" + excess_tier: str = "risk_on" + spy_peak: float | None = None + nav_peak: float | None = None + # Pending-transition bookkeeping for min_persist_days > 1 (no-op at 1). + pending_spy_tier: str | None = None + pending_spy_days: int = 0 + pending_excess_tier: str | None = None + pending_excess_days: int = 0 + last_update_trading_day: str | None = None + observations_seen: int = 0 + schema_version: int = DRAWDOWN_SCHEMA_VERSION + + +def dump_state(st: DrawdownState) -> dict[str, Any]: + d = asdict(st) + # Floats may be numpy — coerce for clean JSON. + for k in ("spy_peak", "nav_peak"): + d[k] = None if d[k] is None else float(d[k]) + return d + + +def load_state(d: dict[str, Any]) -> DrawdownState: + """Rehydrate persisted state. Raises ``ValueError`` on schema + mismatch — the caller treats that as a cold start (per + ``feedback_no_silent_fails``: never silently fabricate a settled + 'no drawdown').""" + ver = d.get("schema_version") + if ver != DRAWDOWN_SCHEMA_VERSION: + raise ValueError( + f"drawdown state schema {ver} != {DRAWDOWN_SCHEMA_VERSION}" + ) + return DrawdownState( + spy_tier=d.get("spy_tier", "risk_on"), + excess_tier=d.get("excess_tier", "risk_on"), + spy_peak=d.get("spy_peak"), + nav_peak=d.get("nav_peak"), + pending_spy_tier=d.get("pending_spy_tier"), + pending_spy_days=int(d.get("pending_spy_days", 0)), + pending_excess_tier=d.get("pending_excess_tier"), + pending_excess_days=int(d.get("pending_excess_days", 0)), + last_update_trading_day=d.get("last_update_trading_day"), + observations_seen=int(d.get("observations_seen", 0)), + schema_version=ver, + ) + + +def _persisted_transition( + current: str, + proposed: str, + pending_tier: str | None, + pending_days: int, + min_persist_days: int, +) -> tuple[str, str | None, int]: + """Apply the optional persistence gate. Default min_persist_days=1 ⇒ + immediate (pure-level). Returns (new_tier, new_pending, new_days).""" + if proposed == current: + return current, None, 0 + if min_persist_days <= 1: + return proposed, None, 0 + if pending_tier == proposed: + pending_days += 1 + else: + pending_tier, pending_days = proposed, 1 + if pending_days >= min_persist_days: + return proposed, None, 0 + return current, pending_tier, pending_days + + +def step( + prev: DrawdownState | None, + *, + spy_close: float, + trading_day: str, + calendar_date: str, + run_id: str, + nav: float | None = None, + tunables: DrawdownTunables | None = None, +) -> tuple[DrawdownState, dict[str, Any]]: + """Advance the drawdown leg by one observation (weekly or daily). + + Mirrors ``fast_signal.step`` contract: returns ``(new_state, + artifact)``; idempotent on ``trading_day`` (re-invocation re-emits + with ``observed=False`` and no peak/tier advance). ``prev=None`` ⇒ + cold start (peaks seed from the first observation). ``nav=None`` ⇒ + the excess leg is unavailable (paper NAV short/gappy / not wired): + the SPY leg still acts; ``excess_tier`` holds ``risk_on`` and + ``excess_available=False``. + """ + t = tunables or DrawdownTunables() + cold_start = prev is None + + if prev is not None and prev.last_update_trading_day == trading_day: + return prev, _artifact( + prev, t, trading_day=trading_day, calendar_date=calendar_date, + run_id=run_id, spy_close=spy_close, nav=nav, + excess_available=nav is not None and prev.nav_peak is not None, + observed=False, cold_start=False, + ) + + st = prev if prev is not None else DrawdownState() + + spy_peak = spy_close if st.spy_peak is None else max(st.spy_peak, spy_close) + spy_dd = spy_close / spy_peak - 1.0 if spy_peak else 0.0 + spy_mag = -spy_dd + + proposed_spy = _advance_spy_tier(st.spy_tier, spy_mag, t) + new_spy_tier, pend_spy, pend_spy_days = _persisted_transition( + st.spy_tier, proposed_spy, st.pending_spy_tier, + st.pending_spy_days, t.min_persist_days, + ) + + # ── Excess (book-vs-market) leg — graceful-degrade when NAV absent ── + excess_available = nav is not None + if excess_available: + nav_peak = nav if st.nav_peak is None else max(st.nav_peak, nav) + nav_dd = nav / nav_peak - 1.0 if nav_peak else 0.0 + # How much DEEPER the book is than the market (positive = worse). + excess_mag = max(spy_dd - nav_dd, 0.0) + proposed_excess = _advance_excess_tier(st.excess_tier, excess_mag, t) + new_excess_tier, pend_ex, pend_ex_days = _persisted_transition( + st.excess_tier, proposed_excess, st.pending_excess_tier, + st.pending_excess_days, t.min_persist_days, + ) + else: + nav_peak = st.nav_peak + nav_dd = None + excess_mag = None + new_excess_tier, pend_ex, pend_ex_days = "risk_on", None, 0 + + new_state = DrawdownState( + spy_tier=new_spy_tier, + excess_tier=new_excess_tier, + spy_peak=spy_peak, + nav_peak=nav_peak, + pending_spy_tier=pend_spy, + pending_spy_days=pend_spy_days, + pending_excess_tier=pend_ex, + pending_excess_days=pend_ex_days, + last_update_trading_day=trading_day, + observations_seen=st.observations_seen + 1, + ) + + art = _artifact( + new_state, t, trading_day=trading_day, calendar_date=calendar_date, + run_id=run_id, spy_close=spy_close, nav=nav, + excess_available=excess_available, + observed=True, cold_start=cold_start, + spy_dd=spy_dd, nav_dd=nav_dd, excess_mag=excess_mag, + ) + return new_state, art + + +def _artifact( + st: DrawdownState, + t: DrawdownTunables, + *, + trading_day: str, + calendar_date: str, + run_id: str, + spy_close: float, + nav: float | None, + excess_available: bool, + observed: bool, + cold_start: bool, + spy_dd: float | None = None, + nav_dd: float | None = None, + excess_mag: float | None = None, +) -> dict[str, Any]: + if spy_dd is None and st.spy_peak: + spy_dd = spy_close / st.spy_peak - 1.0 + return { + "trading_day": trading_day, + "calendar_date": calendar_date, + "run_id": run_id, + "schema_version": DRAWDOWN_SCHEMA_VERSION, + "spy": { + "tier": st.spy_tier, + "drawdown": None if spy_dd is None else float(spy_dd), + "peak": None if st.spy_peak is None else float(st.spy_peak), + "regime_contribution": spy_tier_to_regime(st.spy_tier), + }, + "excess": { + "available": bool(excess_available), + "tier": st.excess_tier, + "nav_drawdown": None if nav_dd is None else float(nav_dd), + "excess_depth": None if excess_mag is None else float(excess_mag), + "regime_contribution": ( + excess_tier_to_regime(st.excess_tier) + if excess_available else None + ), + }, + "observations_seen": int(st.observations_seen), + "observed": bool(observed), + "cold_start": bool(cold_start), + "tunables": t.to_dict(), + } + + +# ── eod_pnl NAV reader (graceful-degrade) ──────────────────────────────── + +def read_eod_pnl_nav( + s3_client: Any, + *, + bucket: str, + key: str = "trades/eod_pnl.csv", +) -> "pd.Series | None": + """Best-effort read of the executor-produced ``trades/eod_pnl.csv`` + NAV series (paper NAV). Graceful-degrade is load-bearing: any + failure (missing / empty / no NAV column / < 2 rows) returns + ``None`` so the excess leg goes unavailable while the SPY leg still + acts — never raises, never blocks the regime path. + + The NAV column is matched case-insensitively (any column whose name + contains 'nav'); a date-like column, if present, becomes the index. + """ + try: + obj = s3_client.get_object(Bucket=bucket, Key=key) + df = pd.read_csv(BytesIO(obj["Body"].read())) + except Exception as exc: # noqa: BLE001 — best-effort, never block + logger.warning( + "drawdown: eod_pnl read failed at s3://%s/%s (%s) — excess " + "leg unavailable; SPY leg unaffected.", bucket, key, + type(exc).__name__, + ) + return None + if df is None or df.empty: + logger.warning("drawdown: eod_pnl empty — excess leg unavailable.") + return None + nav_cols = [c for c in df.columns if "nav" in str(c).lower()] + if not nav_cols: + logger.warning( + "drawdown: eod_pnl has no NAV-like column (cols=%s) — excess " + "leg unavailable.", list(df.columns), + ) + return None + series = pd.to_numeric(df[nav_cols[0]], errors="coerce").dropna() + date_cols = [c for c in df.columns if "date" in str(c).lower()] + if date_cols: + try: + idx = pd.to_datetime(df.loc[series.index, date_cols[0]]) + series.index = idx + except Exception: # noqa: BLE001 — index is a nicety, not required + pass + if len(series) < 2: + logger.warning( + "drawdown: eod_pnl NAV series too short (%d rows) — excess " + "leg unavailable.", len(series), + ) + return None + return series diff --git a/regime/substrate.py b/regime/substrate.py index 83d78ef..802d6f7 100644 --- a/regime/substrate.py +++ b/regime/substrate.py @@ -139,6 +139,7 @@ def build_regime_substrate( fit_window_start: str | None = None, fit_window_end: str | None = None, hmm_weights_s3_key: str | None = None, + drawdown_block: Mapping[str, Any] | None = None, ) -> dict[str, Any]: """Assemble the regime substrate payload. @@ -279,6 +280,27 @@ def build_regime_substrate( "written_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), }, } + + # ─ Drawdown regime leg (additive, optional — S3-contract-safe) ──── + # The block is assembled by the caller (handler/stage) which has S3 + # access to the SPY price cache + eod_pnl; substrate.py stays pure. + # When absent (None), the key is omitted entirely → zero change for + # existing consumers / the macro brief's HMM-only path. + if drawdown_block is not None: + from .drawdown import compose_effective_regime + + payload["drawdown"] = dict(drawdown_block) + spy_leg = drawdown_block.get("spy") or {} + excess_leg = drawdown_block.get("excess") or {} + payload["effective_regime"] = compose_effective_regime( + hmm_argmax=hmm_argmax, + spy_tier=spy_leg.get("tier"), + excess_tier=( + excess_leg.get("tier") + if excess_leg.get("available") else None + ), + ) + return payload @@ -353,6 +375,13 @@ def write_regime_substrate( "regime_change_signal": payload["bocpd"]["change_signal"], "written_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), } + # Additive: surface the composed effective regime when the + # drawdown leg is present (consumers reading latest.json get the + # composed value without resolving the full artifact). + if "effective_regime" in payload: + sidecar["effective_regime"] = payload["effective_regime"][ + "effective_regime" + ] s3_client.put_object( Bucket=bucket, Key=latest_key, diff --git a/scripts/backfill_regime_fast_signal.py b/scripts/backfill_regime_fast_signal.py index 0270413..963ba77 100644 --- a/scripts/backfill_regime_fast_signal.py +++ b/scripts/backfill_regime_fast_signal.py @@ -201,25 +201,22 @@ def _bear_stretches(s3, bucket: str) -> list[tuple[pd.Timestamp, pd.Timestamp]] _read_parquet_close, ) + from regime.drawdown import bear_stretches as _shared_bear_stretches + spy = _read_parquet_close( "SPY", s3_client=s3, bucket=bucket, prefix=DEFAULT_PRICE_CACHE_PREFIX, ).dropna() if spy.empty: log.warning("SPY parquet empty — drawdown reference unavailable.") return None - peak = spy.cummax() - dd = spy / peak - 1.0 # ≤ 0 - stretches: list[tuple[pd.Timestamp, pd.Timestamp]] = [] - in_bear = False - start: pd.Timestamp | None = None - for ts, d in dd.items(): - if not in_bear and d <= -BEAR_DD_ENTER: - in_bear, start = True, ts - elif in_bear and d >= -BEAR_DD_EXIT: - stretches.append((start, ts)) - in_bear = False - if in_bear and start is not None: - stretches.append((start, dd.index[-1])) + # Single source of truth: the production drawdown leg and this + # eval reference are now the SAME code (regime.drawdown). Keeping + # the BEAR_DD_ENTER/EXIT constants here as the explicit reference + # thresholds; raising them re-opens the F1 calibration (plan §3 + # single-source-of-truth invariant). + stretches = _shared_bear_stretches( + spy, enter=BEAR_DD_ENTER, exit=BEAR_DD_EXIT, + ) log.info( "drawdown bear reference: %d stretches (enter -%.0f%%, exit -%.0f%%)", len(stretches), BEAR_DD_ENTER * 100, BEAR_DD_EXIT * 100, diff --git a/tests/test_regime_drawdown.py b/tests/test_regime_drawdown.py new file mode 100644 index 0000000..becd409 --- /dev/null +++ b/tests/test_regime_drawdown.py @@ -0,0 +1,356 @@ +"""Tests for regime/drawdown.py — the deterministic drawdown regime leg. + +Covers: +- ``bear_stretches`` parity vs the pre-extraction inline algorithm + (the single-source-of-truth invariant: production leg == eval ref) +- tiered SPY hysteresis truth table (enter/exit asymmetry, ladder + de-escalation one rung at a time, pure-level immediacy) +- excess (book-vs-market) leg + NAV graceful-degrade +- step() idempotency + cold start +- state (de)serialization round-trip + schema-mismatch → ValueError +- most_protective + compose_effective_regime truth table +- read_eod_pnl_nav graceful-degrade matrix +- additive substrate hook (None ⇒ zero change; present ⇒ block + sidecar) +""" +from __future__ import annotations + +import io +import json + +import numpy as np +import pandas as pd +import pytest + +from regime.drawdown import ( + DRAWDOWN_SCHEMA_VERSION, + DrawdownState, + DrawdownTunables, + bear_stretches, + compose_effective_regime, + dump_state, + load_state, + most_protective, + read_eod_pnl_nav, + step, +) + + +# ── bear_stretches parity (single source of truth) ─────────────────────── + +def _legacy_bear_stretches(spy: pd.Series, enter: float, exit: float): + """The exact pre-extraction inline algorithm from + scripts/backfill_regime_fast_signal.py::_bear_stretches. The shared + implementation must reproduce this byte-for-byte or the F1 + calibration silently decouples from production (plan §3).""" + peak = spy.cummax() + dd = spy / peak - 1.0 + stretches = [] + in_bear = False + start = None + for ts, d in dd.items(): + if not in_bear and d <= -enter: + in_bear, start = True, ts + elif in_bear and d >= -exit: + stretches.append((start, ts)) + in_bear = False + if in_bear and start is not None: + stretches.append((start, dd.index[-1])) + return stretches + + +@pytest.mark.parametrize("seed", [0, 1, 2, 7, 42]) +def test_bear_stretches_matches_legacy(seed: int) -> None: + rng = np.random.default_rng(seed) + idx = pd.date_range("2015-01-01", periods=900, freq="W") + spy = pd.Series(100 * np.exp(np.cumsum(rng.normal(0.001, 0.03, 900))), index=idx) + assert bear_stretches(spy, enter=0.10, exit=0.05) == _legacy_bear_stretches( + spy, 0.10, 0.05 + ) + + +def test_bear_stretches_unterminated_closes_at_last() -> None: + idx = pd.date_range("2020-01-01", periods=5, freq="D") + spy = pd.Series([100, 100, 88, 87, 86], index=idx) # never recovers + out = bear_stretches(spy, enter=0.10, exit=0.05) + assert len(out) == 1 and out[0][1] == idx[-1] + + +def test_bear_stretches_empty_series() -> None: + assert bear_stretches(pd.Series(dtype=float)) == [] + + +# ── tiered SPY hysteresis ──────────────────────────────────────────────── + +def _run_spy(prices, tun=None): + st = None + for i, p in enumerate(prices): + st, art = step( + st, spy_close=float(p), trading_day=f"d{i}", + calendar_date=f"c{i}", run_id="r", tunables=tun, + ) + return st, art + + +def test_spy_tier_enter_thresholds_pure_level() -> None: + # Peak 100. -5% → caution, -10% → risk_off, default pure-level + # (min_persist_days=1) so transitions are immediate. + st, art = _run_spy([100, 97]) # -3% → still risk_on + assert st.spy_tier == "risk_on" + st, art = _run_spy([100, 94]) # -6% → caution + assert st.spy_tier == "caution" + st, art = _run_spy([100, 89]) # -11% → risk_off + assert st.spy_tier == "risk_off" + assert art["spy"]["regime_contribution"] == "bear" + + +def test_spy_asymmetric_exit_and_one_rung_deescalation() -> None: + # Enter risk_off at -12%, then partial recovery must NOT jump + # straight to risk_on — exit bands gate it one rung at a time. + st, _ = _run_spy([100, 88]) # risk_off + assert st.spy_tier == "risk_off" + # recover to -8% (above risk_off_exit -7%? no, 8% deep > 7%) → stay + st, _ = _run_spy([100, 88, 92]) # -8% depth → still risk_off + assert st.spy_tier == "risk_off" + # recover to -6% (≤7% risk_off_exit, but >3% caution_exit) → caution + st, _ = _run_spy([100, 88, 94]) + assert st.spy_tier == "caution" + # full recovery to -2% (≤3% caution_exit) from caution → risk_on + st, _ = _run_spy([100, 88, 94, 98]) + assert st.spy_tier == "risk_on" + + +def test_spy_hysteresis_band_holds_between_thresholds() -> None: + # caution entered at -6%; a drift to -4% stays caution (between + # caution_exit -3% and caution_enter -5%) — the band is the debounce. + st, _ = _run_spy([100, 94, 96]) + assert st.spy_tier == "caution" + + +def test_min_persist_days_gt_1_delays_then_latches() -> None: + tun = DrawdownTunables(min_persist_days=3) + # One -11% day is not enough to latch risk_off when persistence=3. + st, _ = _run_spy([100, 89], tun) + assert st.spy_tier == "risk_on" + assert st.pending_spy_tier == "risk_off" and st.pending_spy_days == 1 + # Three consecutive risk_off-proposing days → latch. + st, _ = _run_spy([100, 89, 89, 89], tun) + assert st.spy_tier == "risk_off" + + +# ── excess (book-vs-market) leg + NAV graceful-degrade ─────────────────── + +def test_excess_unavailable_when_nav_none() -> None: + st, art = step( + None, spy_close=100.0, trading_day="d0", calendar_date="c0", + run_id="r", nav=None, + ) + assert art["excess"]["available"] is False + assert art["excess"]["regime_contribution"] is None + assert st.excess_tier == "risk_on" + + +def test_excess_alpha_bleed_when_book_deeper_than_market() -> None: + # SPY flat at peak (dd 0); NAV draws down 6% → excess depth 6% ≥ 5%. + st = None + st, _ = step(None, spy_close=100.0, trading_day="d0", + calendar_date="c0", run_id="r", nav=100.0) + st, art = step(st, spy_close=100.0, trading_day="d1", + calendar_date="c1", run_id="r", nav=94.0) + assert st.excess_tier == "alpha_bleed" + assert art["excess"]["available"] is True + assert art["excess"]["regime_contribution"] == "bear" + + +def test_excess_not_triggered_when_book_tracks_market() -> None: + st = None + st, _ = step(None, spy_close=100.0, trading_day="d0", + calendar_date="c0", run_id="r", nav=100.0) + # Both down ~6% together → excess gap ~0, no alpha_bleed. + st, _ = step(st, spy_close=94.0, trading_day="d1", + calendar_date="c1", run_id="r", nav=94.0) + assert st.excess_tier == "risk_on" + + +# ── step idempotency + cold start ──────────────────────────────────────── + +def test_step_idempotent_on_same_trading_day() -> None: + st, _ = step(None, spy_close=100.0, trading_day="d0", + calendar_date="c0", run_id="r") + obs_before = st.observations_seen + st2, art = step(st, spy_close=50.0, trading_day="d0", + calendar_date="c0", run_id="r") + assert st2 is st # unchanged + assert art["observed"] is False + assert st2.observations_seen == obs_before + + +def test_cold_start_seeds_peak_and_flags() -> None: + st, art = step(None, spy_close=100.0, trading_day="d0", + calendar_date="c0", run_id="r") + assert art["cold_start"] is True + assert st.spy_peak == 100.0 + assert st.observations_seen == 1 + + +# ── state (de)serialization ────────────────────────────────────────────── + +def test_dump_load_roundtrip() -> None: + st = DrawdownState(spy_tier="caution", spy_peak=123.4, + observations_seen=9, last_update_trading_day="d9") + rt = load_state(json.loads(json.dumps(dump_state(st)))) + assert rt == st + + +def test_load_state_schema_mismatch_raises() -> None: + with pytest.raises(ValueError): + load_state({"schema_version": DRAWDOWN_SCHEMA_VERSION + 99}) + + +# ── composition truth table ────────────────────────────────────────────── + +def test_most_protective_ordering_and_none_handling() -> None: + assert most_protective(None, None) == "neutral" + assert most_protective("bull", None, "caution") == "caution" + assert most_protective("neutral", "bear", "caution") == "bear" + assert most_protective("bull", "neutral") == "neutral" + + +def test_compose_effective_regime_drivers_preserved() -> None: + out = compose_effective_regime( + hmm_argmax="neutral", spy_tier="risk_off", + excess_tier="risk_on", forced_bear=False, + ) + assert out["effective_regime"] == "bear" # drawdown_spy escalates + assert out["drivers"]["hmm"] == "neutral" + assert out["drivers"]["drawdown_spy"] == "bear" + assert out["drivers"]["drawdown_excess"] is None + # forced_bear alone also escalates + out2 = compose_effective_regime(hmm_argmax="bull", forced_bear=True) + assert out2["effective_regime"] == "bear" + + +# ── read_eod_pnl_nav graceful-degrade matrix ───────────────────────────── + +class _S3: + def __init__(self, body: bytes | None = None, raise_exc: bool = False): + self._body, self._raise = body, raise_exc + + def get_object(self, *, Bucket, Key): + if self._raise or self._body is None: + raise RuntimeError("NoSuchKey") + return {"Body": io.BytesIO(self._body)} + + +def test_eod_pnl_happy_path() -> None: + csv = b"date,nav,alpha\n2026-05-01,100\n2026-05-02,101\n2026-05-03,99\n" + s = read_eod_pnl_nav(_S3(csv), bucket="b") + assert s is not None and len(s) == 3 and float(s.iloc[1]) == 101.0 + + +def test_eod_pnl_missing_key_returns_none() -> None: + assert read_eod_pnl_nav(_S3(raise_exc=True), bucket="b") is None + + +def test_eod_pnl_no_nav_column_returns_none() -> None: + assert read_eod_pnl_nav(_S3(b"date,alpha\n2026-05-01,0.1\n"), bucket="b") is None + + +def test_eod_pnl_too_short_returns_none() -> None: + assert read_eod_pnl_nav(_S3(b"date,nav\n2026-05-01,100\n"), bucket="b") is None + + +def test_eod_pnl_empty_returns_none() -> None: + assert read_eod_pnl_nav(_S3(b""), bucket="b") is None + + +# ── additive substrate hook ────────────────────────────────────────────── + +pytest.importorskip("hmmlearn") +pytest.importorskip("scipy") + + +def _hist(n: int = 240, seed: int = 0) -> pd.DataFrame: + rng = np.random.default_rng(seed) + parts = [] + for i, (r, v, h) in enumerate([(0.02, 14.0, 320.0), (-0.02, 28.0, 600.0)]): + parts.append(pd.DataFrame({ + "spy_20d_return": rng.normal(r, 0.03, n // 2), + "vix_level": rng.normal(v, 4.0, n // 2), + "vix_term_slope": rng.normal(0.8 if i == 0 else -0.3, 0.4, n // 2), + "hy_oas_bps": rng.normal(h, 80.0, n // 2), + "yield_curve_slope": rng.normal(0.4 if i == 0 else -0.2, 0.3, n // 2), + "market_breadth": rng.normal(0.7 if i == 0 else 0.35, 0.1, + n // 2).clip(0, 1), + })) + return pd.concat(parts, ignore_index=True) + + +def _fitted_hmm(history: pd.DataFrame): + from regime.hmm import HMMRegimeClassifier + + cols = list(history.columns) + return HMMRegimeClassifier(n_states=3, random_state=0).fit(history, cols) + + +def test_substrate_no_drawdown_block_is_zero_change() -> None: + from regime.substrate import build_regime_substrate + + hist = _hist() + payload = build_regime_substrate( + feature_history=hist, current_features=hist.iloc[-1].to_dict(), + hmm=_fitted_hmm(hist), run_id="2605181200", + calendar_date="2026-05-18", trading_day="2026-05-15", + ) + assert "drawdown" not in payload + assert "effective_regime" not in payload + + +def test_substrate_with_drawdown_block_adds_keys_and_composes() -> None: + from regime.substrate import build_regime_substrate + + hist = _hist() + block = { + "spy": {"tier": "risk_off", "regime_contribution": "bear"}, + "excess": {"available": True, "tier": "risk_on"}, + } + payload = build_regime_substrate( + feature_history=hist, current_features=hist.iloc[-1].to_dict(), + hmm=_fitted_hmm(hist), run_id="2605181200", + calendar_date="2026-05-18", trading_day="2026-05-15", + drawdown_block=block, + ) + assert payload["drawdown"] == block + # SPY risk_off → bear, regardless of the HMM argmax (most-protective). + assert payload["effective_regime"]["effective_regime"] == "bear" + + +def test_substrate_sidecar_carries_effective_regime() -> None: + from regime.substrate import build_regime_substrate, write_regime_substrate + + class _Mem: + def __init__(self) -> None: + self.objs: dict[str, bytes] = {} + + def put_object(self, *, Bucket, Key, Body, ContentType=None): + self.objs[Key] = Body if isinstance(Body, bytes) else Body.encode() + return {} + + hist = _hist() + payload = build_regime_substrate( + feature_history=hist, current_features=hist.iloc[-1].to_dict(), + hmm=_fitted_hmm(hist), run_id="2605181200", + calendar_date="2026-05-18", trading_day="2026-05-15", + drawdown_block={"spy": {"tier": "caution"}, + "excess": {"available": False, "tier": "risk_on"}}, + ) + s3 = _Mem() + res = write_regime_substrate(payload, s3_client=s3, bucket="b", prefix="regime") + sidecar = json.loads(s3.objs[res["latest_key"]]) + # The sidecar must mirror the composed payload value exactly (the + # composition itself — most-protective over the HMM leg too — is + # covered by the composition truth-table tests). + assert ( + sidecar["effective_regime"] + == payload["effective_regime"]["effective_regime"] + ) + assert sidecar["effective_regime"] in {"bull", "neutral", "caution", "bear"}