Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions inference/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ class PipelineContext:
# degrades the veto to its legacy (Wire 4 / discrete) behavior.
regime_forced_bear: bool = False

# ── Drawdown regime leg (set by regime_fast_signal stage; observe-only) ──
# Composed effective regime from the deterministic drawdown leg
# (regime-drawdown-hysteresis-260518.md). PR 2 = producer/observe
# only: NO consumer reads this yet (the executor/predictor-veto
# consumer PRs, gated `drawdown_regime_enabled` default-off, will).
# Stamped from the drawdown artifact so the consumer PRs read it
# in-process, mirroring `regime_forced_bear`.
drawdown_effective_regime: Optional[str] = None

def near_timeout(self) -> bool:
"""Check if we're nearing the Lambda soft timeout."""
elapsed = _time.monotonic() - self.start_ts
Expand Down
140 changes: 140 additions & 0 deletions inference/stages/regime_fast_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
STATE_KEY = "regime/bocpd_state.json"
FAST_SIGNAL_PREFIX = "regime/fast_signal"

# Drawdown regime leg (regime-drawdown-hysteresis-260518.md). Rolling
# online state + dated/forensic eval artifact + latest sidecar, mirroring
# the fast-signal keys. PR 2 = producer/observe only.
DRAWDOWN_STATE_KEY = "regime/drawdown_state.json"
DRAWDOWN_PREFIX = "regime/drawdown"


def _emit_metrics(art: dict) -> None:
"""Best-effort CloudWatch gauges. Mirrors run_inference's
Expand Down Expand Up @@ -58,6 +64,133 @@ def _emit_metrics(art: dict) -> None:
)


def _advance_drawdown(ctx: PipelineContext, s3, dual, run_id: str) -> None:
"""Advance the deterministic drawdown regime leg by one day.

PR 2 (regime-drawdown-hysteresis-260518.md): producer + **observe
only** — persists rolling state + a forensic artifact + latest
sidecar, stamps ``ctx.drawdown_effective_regime`` for the future
consumer PRs, and logs the counterfactual. NO consumer reads it yet
(the executor / predictor-veto PRs, gated ``drawdown_regime_enabled``
default-off, will). Fully self-contained + non-raising: a failure
here must never affect the fast-signal path or predictions.

HMM argmax is composed at the *weekly* substrate layer (PR 1's
``build_regime_substrate`` hook); the daily effective regime composes
the legs available intraday — the drawdown leg + Stage-F
``forced_bear`` (most-protective-wins).
"""
try:
import json as _json

from regime.drawdown import (
compose_effective_regime,
dump_state,
load_state,
read_eod_pnl_nav,
seed_state,
step as dd_step,
)
from alpha_engine_lib.eval_artifacts import (
eval_artifact_key,
eval_latest_key,
)

spy_series = ctx.macro.get("SPY") if ctx.macro else None
if spy_series is None or len(spy_series.dropna()) == 0:
log.warning(
"drawdown leg: ctx.macro['SPY'] missing/empty — skipping "
"this run (SPY leg needs the close series). "
"Drawdown state unchanged.",
)
return

nav_series = read_eod_pnl_nav(s3, bucket=ctx.bucket) # None ⇒ degrade

# ── Load prior state (missing/corrupt/schema ⇒ history seed) ─────
# Single robust handler (no fragile except-on-getattr): any read
# failure re-seeds from history; NoSuchKey is the benign cold
# start, anything else is warned for investigation.
prev = None
try:
obj = s3.get_object(Bucket=ctx.bucket, Key=DRAWDOWN_STATE_KEY)
prev = load_state(_json.loads(obj["Body"].read()))
except Exception as exc: # noqa: BLE001 — missing/corrupt/schema
prev = seed_state(spy_series, nav_history=nav_series)
if type(exc).__name__ == "NoSuchKey":
log.warning(
"drawdown leg: no prior state at s3://%s/%s — COLD "
"START, seeded from history (true trailing peak; "
"conservative initial tier).",
ctx.bucket, DRAWDOWN_STATE_KEY,
)
else:
log.warning(
"drawdown leg: prior state unreadable (%s) — "
"re-seeding from history. Investigate if this recurs.",
exc,
)

spy_close = float(spy_series.dropna().iloc[-1])
nav_close = (
float(nav_series.iloc[-1]) if nav_series is not None else None
)

new_state, art = dd_step(
prev,
spy_close=spy_close,
nav=nav_close,
trading_day=dual.trading_day,
calendar_date=dual.calendar_date,
run_id=run_id,
)

composed = compose_effective_regime(
spy_tier=new_state.spy_tier,
excess_tier=(
new_state.excess_tier
if art["excess"]["available"] else None
),
forced_bear=bool(ctx.regime_forced_bear),
)
art["effective_regime"] = composed
# Observe-only stamp — no consumer reads this in PR 2.
ctx.drawdown_effective_regime = composed["effective_regime"]

_s3_put_json(
s3, ctx.bucket, DRAWDOWN_STATE_KEY,
json.dumps(dump_state(new_state)),
)
_s3_put_json(
s3, ctx.bucket,
eval_artifact_key(DRAWDOWN_PREFIX, run_id),
json.dumps(art, default=str),
)
_s3_put_json(
s3, ctx.bucket,
eval_latest_key(DRAWDOWN_PREFIX),
json.dumps(art, default=str),
)

log.info(
"drawdown leg: trading_day=%s spy_tier=%s spy_dd=%.4f "
"excess_tier=%s(avail=%s) forced_bear=%s → effective_regime=%s "
"observed=%s [OBSERVE-ONLY — no consumer in PR 2]",
art["trading_day"], art["spy"]["tier"],
art["spy"]["drawdown"] if art["spy"]["drawdown"] is not None
else float("nan"),
art["excess"]["tier"], art["excess"]["available"],
ctx.regime_forced_bear, composed["effective_regime"],
art["observed"],
)
except Exception as exc: # noqa: BLE001 — non-critical, never raise
log.warning(
"drawdown leg failed: %s — predictions + fast-signal "
"unaffected (observe-only). Drawdown state may be stale this "
"run.", exc, exc_info=True,
)


def run(ctx: PipelineContext) -> None:
"""Advance the daily fast signal. Observe-only in F1."""
if ctx.dry_run or ctx.local:
Expand Down Expand Up @@ -162,6 +295,13 @@ def run(ctx: PipelineContext) -> None:
art["consecutive_change_days"], art["consecutive_clear_days"],
art["warmup"], art["observed"],
)

# ── Drawdown regime leg (PR 2, observe-only) ─────────────────────
# Rides the same daily rail (post-run_inference: ctx.macro["SPY"]
# is populated; ctx.regime_forced_bear was just stamped above).
# Fully self-contained + non-raising so it never affects the
# fast-signal success path or predictions.
_advance_drawdown(ctx, s3, dual, run_id)
except Exception as exc: # noqa: BLE001 — non-critical, never block predictions
log.warning(
"regime_fast_signal stage failed: %s — predictions unaffected "
Expand Down
59 changes: 59 additions & 0 deletions regime/drawdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,62 @@ def read_eod_pnl_nav(
)
return None
return series


# ── Cold-start seeding ───────────────────────────────────────────────────

def seed_state(
spy_close_history: "pd.Series",
*,
nav_history: "pd.Series | None" = None,
tunables: DrawdownTunables | None = None,
) -> DrawdownState:
"""Derive a cold-start state from price/NAV history.

Why this exists: ``step()`` accumulates the running peak online, so a
naive cold start (peak = today's close) would report ~0% drawdown
even if the market is mid-crash — silently fabricating a settled
'no drawdown' (the exact failure ``feedback_no_silent_fails``
warns about). Seeding the **true trailing peak** (history cummax) +
a conservative initial tier (the deepest tier the *current* depth
warrants, via the ENTER thresholds — pure-level needs no prior-tier
history) makes the very first online ``step()`` report the real
drawdown.

Returns a ``DrawdownState`` with ``observations_seen=0`` and
``last_update_trading_day=None`` so the first real ``step()`` advances
(not idempotent-skipped).
"""
t = tunables or DrawdownTunables()
s = spy_close_history.dropna()
if s.empty:
return DrawdownState()
spy_peak = float(s.cummax().iloc[-1])
spy_close = float(s.iloc[-1])
spy_dd = spy_close / spy_peak - 1.0 if spy_peak else 0.0
spy_mag = -spy_dd
if spy_mag >= t.spy_risk_off_enter:
spy_tier = "risk_off"
elif spy_mag >= t.spy_caution_enter:
spy_tier = "caution"
else:
spy_tier = "risk_on"

nav_peak: float | None = None
excess_tier = "risk_on"
if nav_history is not None:
nv = nav_history.dropna()
if not nv.empty:
nav_peak = float(nv.cummax().iloc[-1])
nav_close = float(nv.iloc[-1])
nav_dd = nav_close / nav_peak - 1.0 if nav_peak else 0.0
if max(spy_dd - nav_dd, 0.0) >= t.excess_enter:
excess_tier = "alpha_bleed"

return DrawdownState(
spy_tier=spy_tier,
excess_tier=excess_tier,
spy_peak=spy_peak,
nav_peak=nav_peak,
observations_seen=0,
)
124 changes: 124 additions & 0 deletions tests/test_regime_drawdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
load_state,
most_protective,
read_eod_pnl_nav,
seed_state,
step,
)

Expand Down Expand Up @@ -263,6 +264,129 @@ def test_eod_pnl_empty_returns_none() -> None:
assert read_eod_pnl_nav(_S3(b""), bucket="b") is None


# ── cold-start seed_state ────────────────────────────────────────────────

def test_seed_state_uses_true_trailing_peak_not_today() -> None:
# Rallied to 120 then fell to 102 (-15% from the trailing peak). A
# naive cold start (peak=today) would report ~0%; seed_state must
# use the 120 peak and start in risk_off.
idx = pd.date_range("2020-01-01", periods=6, freq="W")
spy = pd.Series([100, 110, 120, 115, 108, 102], index=idx)
st = seed_state(spy)
assert st.spy_peak == 120.0
assert st.spy_tier == "risk_off" # -15% ≥ 10% enter
# First real step from the seed reports the true drawdown, not 0.
st2, art = step(st, spy_close=102.0, trading_day="d0",
calendar_date="c0", run_id="r")
assert art["spy"]["drawdown"] == pytest.approx(102 / 120 - 1.0)
assert st2.spy_tier == "risk_off"


def test_seed_state_conservative_tier_thresholds() -> None:
idx = pd.date_range("2020-01-01", periods=3, freq="W")
assert seed_state(pd.Series([100, 100, 97], index=idx)).spy_tier == "risk_on"
assert seed_state(pd.Series([100, 100, 94], index=idx)).spy_tier == "caution"
assert seed_state(pd.Series([100, 100, 89], index=idx)).spy_tier == "risk_off"


def test_seed_state_nav_excess_seeding() -> None:
idx = pd.date_range("2020-01-01", periods=3, freq="W")
spy = pd.Series([100, 100, 100], index=idx) # SPY flat at peak
nav = pd.Series([100, 100, 93], index=idx) # book -7% ⇒ excess 7%
st = seed_state(spy, nav_history=nav)
assert st.nav_peak == 100.0 and st.excess_tier == "alpha_bleed"


def test_seed_state_empty_history() -> None:
assert seed_state(pd.Series(dtype=float)) == DrawdownState()


# ── daily stage: _advance_drawdown (observe-only) ────────────────────────

def test_stage_advances_drawdown_observe_only(monkeypatch) -> None:
"""The fast-signal stage also advances the drawdown leg: stamps
ctx.drawdown_effective_regime + persists state/artifact/sidecar,
while leaving predictions + the fast-signal path untouched."""
import inference.stages.regime_fast_signal as stg
from inference.pipeline import PipelineContext

class _NoSuchKey(Exception):
pass

puts: dict[str, str] = {}

class _FakeS3:
class exceptions:
NoSuchKey = _NoSuchKey

def get_object(self, Bucket, Key): # noqa: N803 — cold start both
raise _NoSuchKey()

def put_object(self, **kw):
pass

monkeypatch.setitem(
__import__("sys").modules, "boto3",
type("B", (), {"client": staticmethod(lambda svc: _FakeS3())}),
)
monkeypatch.setattr(stg, "_s3_put_json",
lambda s3, b, k, body: puts.__setitem__(k, body))
monkeypatch.setattr(stg, "_emit_metrics", lambda art: None)

ctx = PipelineContext(date_str="2026-05-15", dry_run=False, local=False,
bucket="alpha-engine-research")
ctx.regime_intensity_z = -2.5
# -16% from the 120 trailing peak ⇒ drawdown leg → risk_off → bear.
ctx.macro = {"SPY": pd.Series(
[100.0, 110.0, 120.0, 112.0, 105.0, 101.0],
index=pd.date_range("2026-04-01", periods=6, freq="W"),
)}
assert ctx.drawdown_effective_regime is None # default

stg.run(ctx)

assert ctx.drawdown_effective_regime == "bear" # risk_off escalates
assert stg.DRAWDOWN_STATE_KEY in puts
assert any("regime/drawdown/" in k for k in puts) # artifact + latest
# Fast-signal path unaffected (cold-start warmup ⇒ forced_bear False).
assert ctx.regime_forced_bear is False


def test_stage_drawdown_graceful_when_spy_missing(monkeypatch) -> None:
"""No SPY in ctx.macro ⇒ drawdown leg skips cleanly; fast-signal +
predictions unaffected, ctx.drawdown_effective_regime stays None."""
import inference.stages.regime_fast_signal as stg
from inference.pipeline import PipelineContext

class _NoSuchKey(Exception):
pass

class _FakeS3:
class exceptions:
NoSuchKey = _NoSuchKey

def get_object(self, Bucket, Key): # noqa: N803
raise _NoSuchKey()

def put_object(self, **kw):
pass

monkeypatch.setitem(
__import__("sys").modules, "boto3",
type("B", (), {"client": staticmethod(lambda svc: _FakeS3())}),
)
monkeypatch.setattr(stg, "_s3_put_json", lambda *a, **k: None)
monkeypatch.setattr(stg, "_emit_metrics", lambda art: None)

ctx = PipelineContext(date_str="2026-05-15", bucket="alpha-engine-research")
ctx.regime_intensity_z = 0.1
ctx.macro = {} # no SPY

stg.run(ctx) # must not raise

assert ctx.drawdown_effective_regime is None


# ── additive substrate hook ──────────────────────────────────────────────

pytest.importorskip("hmmlearn")
Expand Down
Loading