Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions regime/drawdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,73 @@ def seed_state(
nav_peak=nav_peak,
observations_seen=0,
)


# ── Forensic batch view (weekly substrate block) ─────────────────────────

def block_from_history(
spy_close: "pd.Series",
*,
nav_history: "pd.Series | None" = None,
tunables: DrawdownTunables | None = None,
) -> dict[str, Any] | None:
"""Hysteresis-correct *current* drawdown state, replayed from price
history — the forensic weekly-substrate view.

The weekly substrate block must be reproducible from the price cache
alone (same philosophy as the HMM refitting from history each week),
so this **replays the full SPY series through ``step()``** to get
the hysteresis-correct current tier (NOT ``seed_state``'s
conservative cold-start shortcut). The excess (book-vs-market) leg
is a **point-in-time** read of the latest NAV-vs-SPY gap — the NAV
series is executor-produced + short, so its hysteresis history is
not reproducible here; a point-in-time forensic read is the honest
surface.

Returns the same ``spy``/``excess`` sub-block shape as the daily
artifact (so the substrate ``drawdown`` key is uniform weekly vs
daily), or ``None`` when there is no SPY history (caller omits the
block — S3-contract-safe, additive).
"""
t = tunables or DrawdownTunables()
s = spy_close.dropna()
if s.empty:
return None
st: DrawdownState | None = None
for i, px in enumerate(s.to_list()):
st, _ = step(
st, spy_close=float(px), trading_day=str(i),
calendar_date=str(i), run_id="hist", tunables=t,
)
assert st is not None # s is non-empty ⇒ at least one step ran
spy_dd = (
float(s.iloc[-1]) / st.spy_peak - 1.0 if st.spy_peak else 0.0
)
spy_block = {
"tier": st.spy_tier,
"drawdown": float(spy_dd),
"peak": None if st.spy_peak is None else float(st.spy_peak),
"regime_contribution": spy_tier_to_regime(st.spy_tier),
}
excess_block: dict[str, Any] = {
"available": False, "tier": "risk_on",
"nav_drawdown": None, "excess_depth": None,
"regime_contribution": None,
}
if nav_history is not None:
nv = nav_history.dropna()
if len(nv) >= 2:
nav_peak = float(nv.cummax().iloc[-1])
nav_dd = (
float(nv.iloc[-1]) / nav_peak - 1.0 if nav_peak else 0.0
)
excess_mag = max(spy_dd - nav_dd, 0.0)
ex_tier = _advance_excess_tier("risk_on", excess_mag, t)
excess_block = {
"available": True,
"tier": ex_tier,
"nav_drawdown": float(nav_dd),
"excess_depth": float(excess_mag),
"regime_contribution": excess_tier_to_regime(ex_tier),
}
return {"spy": spy_block, "excess": excess_block}
33 changes: 33 additions & 0 deletions regime/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,38 @@ def produce_regime_substrate(

current = current_features_from_history(history)

# ── Drawdown regime leg — weekly forensic block (additive) ───────────
# regime-drawdown-hysteresis-260518.md §4.1. Reproducible from the
# price cache (replayed through the hysteresis), mirroring the HMM
# refit-from-history discipline. Best-effort: a failure here writes
# the substrate WITHOUT the drawdown key (S3-contract-safe, ADD-only)
# — the brief/macro-agent fall back to the HMM-only path. The excess
# leg is live-only (point-in-time eod_pnl is not reproducible for an
# ``as_of`` historical backfill).
drawdown_block = None
try:
from regime.drawdown import block_from_history, read_eod_pnl_nav
from regime.features import _read_parquet_close

spy_close = _read_parquet_close(
"SPY", s3_client=s3_client, bucket=bucket,
prefix=price_cache_prefix,
)
if as_of is not None and len(spy_close):
spy_close = spy_close[spy_close.index <= as_of]
nav_hist = (
read_eod_pnl_nav(s3_client, bucket=bucket)
if as_of is None else None
)
drawdown_block = block_from_history(spy_close, nav_history=nav_hist)
except Exception as _dd_err: # noqa: BLE001 — additive, never block
logger.warning(
"[regime_substrate] drawdown block assembly failed (%s) — "
"substrate written without it (additive, S3-contract-safe).",
_dd_err,
)
drawdown_block = None

payload = build_regime_substrate(
feature_history=hmm_input, # only the cols HMM saw at fit time
current_features=current,
Expand All @@ -169,6 +201,7 @@ def produce_regime_substrate(
trading_day=trading_day,
fit_window_start=str(hmm_input.index.min().date()),
fit_window_end=str(hmm_input.index.max().date()),
drawdown_block=drawdown_block,
)

if not write:
Expand Down
39 changes: 39 additions & 0 deletions tests/test_regime_drawdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DrawdownState,
DrawdownTunables,
bear_stretches,
block_from_history,
compose_effective_regime,
dump_state,
load_state,
Expand Down Expand Up @@ -387,6 +388,44 @@ def put_object(self, **kw):
assert ctx.drawdown_effective_regime is None


# ── block_from_history (forensic weekly view) ────────────────────────────

def test_block_from_history_hysteresis_correct_replay() -> None:
# Rally to 120, fall to 102 (-15%), recover to 117 (-2.5%). A
# hysteresis-correct replay latches risk_off at -10% and only
# releases via the exit bands — at -2.5% it is back to risk_on
# (unlike seed_state's point-in-time, this honours the path).
idx = pd.date_range("2020-01-01", periods=8, freq="W")
spy = pd.Series([100, 110, 120, 108, 102, 108, 114, 117], index=idx)
blk = block_from_history(spy)
assert blk["spy"]["peak"] == 120.0
assert blk["spy"]["tier"] == "risk_on" # recovered past exit band
assert blk["spy"]["drawdown"] == pytest.approx(117 / 120 - 1.0)
assert blk["excess"]["available"] is False


def test_block_from_history_still_in_drawdown() -> None:
idx = pd.date_range("2020-01-01", periods=4, freq="W")
spy = pd.Series([100, 120, 110, 104], index=idx) # -13% from 120
blk = block_from_history(spy)
assert blk["spy"]["tier"] == "risk_off"
assert blk["spy"]["regime_contribution"] == "bear"


def test_block_from_history_excess_point_in_time() -> None:
idx = pd.date_range("2020-01-01", periods=4, freq="W")
spy = pd.Series([100, 100, 100, 100], index=idx) # SPY flat at peak
nav = pd.Series([100, 100, 100, 93], index=idx) # book -7% ⇒ excess 7%
blk = block_from_history(spy, nav_history=nav)
assert blk["excess"]["available"] is True
assert blk["excess"]["tier"] == "alpha_bleed"
assert blk["excess"]["regime_contribution"] == "bear"


def test_block_from_history_empty_returns_none() -> None:
assert block_from_history(pd.Series(dtype=float)) is None


# ── additive substrate hook ──────────────────────────────────────────────

pytest.importorskip("hmmlearn")
Expand Down
40 changes: 40 additions & 0 deletions tests/test_regime_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,43 @@ def test_default_hmm_features_include_pin_feature() -> None:
"""spy_20d_return must be in DEFAULT_HMM_FEATURE_COLUMNS — otherwise
every default-invocation Lambda call would fail the pin check."""
assert "spy_20d_return" in DEFAULT_HMM_FEATURE_COLUMNS


def test_produce_assembles_weekly_drawdown_block() -> None:
"""The weekly handler now fills the additive drawdown block (PR:
weekly-handler wiring) so the substrate the macro agent reads
carries `drawdown` + the composed `effective_regime`."""
s3 = _FakeS3()
_seed_two_regime_price_cache(s3, "test-bucket", DEFAULT_PRICE_CACHE_PREFIX)
result = produce_regime_substrate(
s3_client=s3, bucket="test-bucket", write=False,
)
payload = result["payload"]
assert "drawdown" in payload
assert payload["drawdown"]["spy"]["tier"] in {
"risk_on", "caution", "risk_off",
}
# No eod_pnl.csv seeded ⇒ excess leg gracefully unavailable.
assert payload["drawdown"]["excess"]["available"] is False
eff = payload["effective_regime"]["effective_regime"]
assert eff in {"bull", "neutral", "caution", "bear"}


def test_produce_substrate_survives_drawdown_block_failure(monkeypatch) -> None:
"""A drawdown-assembly failure must NOT block the substrate — the
key is simply omitted (additive, S3-contract-safe)."""
import regime.drawdown as dd

s3 = _FakeS3()
_seed_two_regime_price_cache(s3, "test-bucket", DEFAULT_PRICE_CACHE_PREFIX)
monkeypatch.setattr(
dd, "block_from_history",
lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")),
)
result = produce_regime_substrate(
s3_client=s3, bucket="test-bucket", write=False,
)
payload = result["payload"]
assert "drawdown" not in payload # omitted, not crashed
assert "effective_regime" not in payload
assert payload["hmm"]["argmax"] in {"bear", "neutral", "bull"}
Loading