Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 16 additions & 36 deletions inference/stages/load_prices.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,16 @@ def _safe_last_date(idx: "pd.Index") -> "pd.Timestamp | None":
return pd.Timestamp(last).normalize()


def _verify_arctic_fresh(universe_lib, macro_lib, date_str: str) -> None:
def _verify_arctic_fresh(universe_lib, date_str: str) -> None:
"""Assert ArcticDB's SPY close series carries the most recent NYSE
session that has actually closed as of wall-clock now.

Reads SPY from ``universe`` ArcticDB library (L1346 (b) retirement,
2026-05-24). Pre-fix used ``macro_lib.read("SPY")`` which was the
only SPY source in ArcticDB until alpha-engine-data #245 (MERGED
2026-05-15) lifted SPY to a full ``universe`` member via
``_UNIVERSE_EXTRA = frozenset({"SPY"})`` written by BOTH backfill
(Saturday) AND daily_append (weekday). Both stores now carry SPY's
Close; reading from ``universe`` retires the macro-special-case
that this entry's L1346 closes-when criterion targeted. ``macro_lib``
parameter retained for backwards-compatibility with callers; will
drop on the next signature-changing refactor. Falls back to
``macro_lib`` if ``universe.SPY`` is absent — defensive transition
posture during the cross-repo retirement arc.
Reads SPY from ``universe`` ArcticDB library. SPY is written there
by both `builders/backfill.py` (Saturday) and `builders/daily_append.py`
(weekday) via the `_UNIVERSE_EXTRA = frozenset({"SPY"})` write path
shipped in alpha-engine-data #245 (2026-05-15). Pre-#245 the freshness
gate read from `macro.SPY` instead; a transitional `macro_lib` fallback
was kept for one cross-repo soak cycle and retired here.

Uses ``alpha_engine_lib.trading_calendar.last_closed_trading_day()`` —
the same primitive the system-wide ``alpha_engine_lib.dates.{trading_days_stale,
Expand All @@ -99,38 +93,24 @@ def _verify_arctic_fresh(universe_lib, macro_lib, date_str: str) -> None:

Raises PipelineAbort on stale/missing SPY.
"""
# L1346 (b) — prefer universe.SPY (post-PR #245). Fall back to macro.SPY
# if universe.SPY is absent (defensive transition during the cross-repo
# retirement arc; after one weekday SF on the v0.X universe-SPY-write
# path, this fallback can be deleted along with the macro_lib parameter).
df = None
read_source = "universe"
try:
df = universe_lib.read("SPY", columns=["Close"]).data
except Exception:
# Fall through to macro fallback below.
df = None
if df is None or df.empty:
try:
df = macro_lib.read("SPY", columns=["Close"]).data
read_source = "macro (fallback)"
except Exception as exc:
raise PipelineAbort(
f"ArcticDB SPY unreadable from BOTH universe AND macro: {exc} "
f"— DataPhase1 did not run or both libraries are broken."
) from exc
except Exception as exc:
raise PipelineAbort(
f"ArcticDB universe.SPY unreadable: {exc} — DataPhase1 did "
f"not run or the universe library is broken."
) from exc

last_date = _safe_last_date(df.index)
if last_date is None:
raise PipelineAbort(
f"ArcticDB SPY (from {read_source}) has no rows — "
f"DataPhase1 has never written."
"ArcticDB universe.SPY has no rows — DataPhase1 has never written."
)

expected_min = pd.Timestamp(last_closed_trading_day()).normalize()
if last_date < expected_min:
raise PipelineAbort(
f"ArcticDB SPY (from {read_source}) last_date={last_date.date()} "
f"ArcticDB universe.SPY last_date={last_date.date()} "
f"is stale for inference date={date_str} (expected last_date >= "
f"{expected_min.date()}, the most recent closed trading session). "
f"The post-close daily-data job likely failed to update — "
Expand Down Expand Up @@ -295,8 +275,8 @@ def run(ctx: PipelineContext) -> None:

# ── Freshness gate: SPY must have today's close in ArcticDB ──────────────
if not ctx.dry_run:
universe_lib, macro_lib = _connect_arctic(ctx.bucket)
_verify_arctic_fresh(universe_lib, macro_lib, ctx.date_str)
universe_lib, _macro_lib = _connect_arctic(ctx.bucket)
_verify_arctic_fresh(universe_lib, ctx.date_str)

# ── Per-ticker price data age (downstream telemetry) ─────────────────────
if ctx.price_data:
Expand Down
80 changes: 40 additions & 40 deletions tests/test_bad_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class TestArcticFreshnessGate:
post-close it expects today. Tests freeze wall-clock time to cover both.
"""

def _macro_lib_mock(self, last_date=None, raise_exc=None):
def _universe_lib_mock(self, last_date=None, raise_exc=None):
from unittest.mock import MagicMock
lib = MagicMock()
if raise_exc is not None:
Expand Down Expand Up @@ -149,17 +149,17 @@ def test_missing_spy_raises_pipeline_abort(self):
from inference.stages.load_prices import _verify_arctic_fresh
from inference.pipeline import PipelineAbort

lib = self._macro_lib_mock(raise_exc=Exception("SymbolNotFound"))
lib = self._universe_lib_mock(raise_exc=Exception("SymbolNotFound"))
with pytest.raises(PipelineAbort, match="unreadable"):
_verify_arctic_fresh(lib, lib, "2026-04-20")
_verify_arctic_fresh(lib, "2026-04-20")

def test_empty_spy_raises_pipeline_abort(self):
from inference.stages.load_prices import _verify_arctic_fresh
from inference.pipeline import PipelineAbort

lib = self._macro_lib_mock(last_date=None)
lib = self._universe_lib_mock(last_date=None)
with pytest.raises(PipelineAbort, match="no rows"):
_verify_arctic_fresh(lib, lib, "2026-04-20")
_verify_arctic_fresh(lib, "2026-04-20")

def test_preclose_monday_with_friday_spy_passes(self, monkeypatch):
"""Morning SF (Mon 9 AM ET, pre-close) with SPY@Fri = the normal happy path."""
Expand All @@ -168,8 +168,8 @@ def test_preclose_monday_with_friday_spy_passes(self, monkeypatch):
from inference.stages.load_prices import _verify_arctic_fresh

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 9, 0, tzinfo=ZoneInfo("America/New_York")))
lib = self._macro_lib_mock(last_date="2026-04-17")
_verify_arctic_fresh(lib, lib, "2026-04-20") # should not raise
lib = self._universe_lib_mock(last_date="2026-04-17")
_verify_arctic_fresh(lib, "2026-04-20") # should not raise

def test_postclose_monday_with_monday_spy_passes(self, monkeypatch):
"""Post-close rerun with SPY@today = also a valid configuration."""
Expand All @@ -178,8 +178,8 @@ def test_postclose_monday_with_monday_spy_passes(self, monkeypatch):
from inference.stages.load_prices import _verify_arctic_fresh

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 16, 30, tzinfo=ZoneInfo("America/New_York")))
lib = self._macro_lib_mock(last_date="2026-04-20")
_verify_arctic_fresh(lib, lib, "2026-04-20") # should not raise
lib = self._universe_lib_mock(last_date="2026-04-20")
_verify_arctic_fresh(lib, "2026-04-20") # should not raise

def test_preclose_monday_with_thursday_spy_raises(self, monkeypatch):
"""SPY older than the most recent closed session should abort."""
Expand All @@ -189,9 +189,9 @@ def test_preclose_monday_with_thursday_spy_raises(self, monkeypatch):
from inference.pipeline import PipelineAbort

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 9, 0, tzinfo=ZoneInfo("America/New_York")))
lib = self._macro_lib_mock(last_date="2026-04-16")
lib = self._universe_lib_mock(last_date="2026-04-16")
with pytest.raises(PipelineAbort, match="stale"):
_verify_arctic_fresh(lib, lib, "2026-04-20")
_verify_arctic_fresh(lib, "2026-04-20")

def test_postclose_monday_with_friday_spy_raises(self, monkeypatch):
"""Post-close, SPY must include today — Fri is stale for a Mon post-close run."""
Expand All @@ -201,19 +201,18 @@ def test_postclose_monday_with_friday_spy_raises(self, monkeypatch):
from inference.pipeline import PipelineAbort

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 16, 30, tzinfo=ZoneInfo("America/New_York")))
lib = self._macro_lib_mock(last_date="2026-04-17")
lib = self._universe_lib_mock(last_date="2026-04-17")
with pytest.raises(PipelineAbort, match="stale"):
_verify_arctic_fresh(lib, lib, "2026-04-20")
_verify_arctic_fresh(lib, "2026-04-20")


class TestUniverseSPYFreshnessReadL1346:
"""L1346 (b) — predictor freshness gate now prefers universe.SPY over
macro.SPY (post-PR #245 _UNIVERSE_EXTRA write path).
"""L1346 (b) — predictor freshness gate reads universe.SPY only.

Pins:
1. universe.SPY present → reads from universe (no macro touch)
2. universe.SPY absent → falls back to macro.SPY (transition posture)
3. Both absent → PipelineAbort
Pre-#245 the gate read from macro.SPY; #245 (2026-05-15) lifted SPY
to a full universe member via `_UNIVERSE_EXTRA = frozenset({"SPY"})`.
A transitional macro-fallback was kept for one cross-repo soak cycle
and retired here — universe.SPY is now the sole source.
"""

def _make_lib(self, last_date=None, raise_exc=None):
Expand Down Expand Up @@ -241,36 +240,37 @@ def now(cls, tz=None):

monkeypatch.setattr(tc, "datetime", _FrozenDatetime)

def test_universe_spy_present_macro_untouched(self, monkeypatch):
"""When universe.SPY has fresh data, macro is NEVER read."""
def test_universe_spy_present_passes(self, monkeypatch):
"""When universe.SPY has fresh data, the gate clears with no fallback."""
from datetime import datetime
from zoneinfo import ZoneInfo
from inference.stages.load_prices import _verify_arctic_fresh

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 9, 0, tzinfo=ZoneInfo("America/New_York")))
universe_lib = self._make_lib(last_date="2026-04-17")
macro_lib = self._make_lib(raise_exc=Exception("MUST NOT BE READ"))
_verify_arctic_fresh(universe_lib, macro_lib, "2026-04-20")
macro_lib.read.assert_not_called()
_verify_arctic_fresh(universe_lib, "2026-04-20")
universe_lib.read.assert_called_once()

def test_universe_spy_absent_falls_back_to_macro(self, monkeypatch):
"""universe.SPY raises (e.g. symbol-not-found) → fallback to macro.SPY."""
from datetime import datetime
from zoneinfo import ZoneInfo
def test_universe_spy_absent_raises_pipeline_abort(self):
"""universe.SPY missing (e.g. symbol-not-found) → hard fail; no macro fallback."""
from inference.stages.load_prices import _verify_arctic_fresh
from inference.pipeline import PipelineAbort

self._freeze_now(monkeypatch, datetime(2026, 4, 20, 9, 0, tzinfo=ZoneInfo("America/New_York")))
universe_lib = self._make_lib(raise_exc=Exception("SymbolNotFound"))
macro_lib = self._make_lib(last_date="2026-04-17")
# Should not raise — fallback path activates
_verify_arctic_fresh(universe_lib, macro_lib, "2026-04-20")
macro_lib.read.assert_called_once()
with pytest.raises(PipelineAbort, match="universe.SPY unreadable"):
_verify_arctic_fresh(universe_lib, "2026-04-20")

def test_both_absent_raises_pipeline_abort(self):
from inference.stages.load_prices import _verify_arctic_fresh
from inference.pipeline import PipelineAbort
def test_signature_takes_universe_lib_only(self):
"""Signature pin — caller must pass exactly (universe_lib, date_str).

universe_lib = self._make_lib(raise_exc=Exception("universe gone"))
macro_lib = self._make_lib(raise_exc=Exception("macro gone"))
with pytest.raises(PipelineAbort, match="BOTH universe AND macro"):
_verify_arctic_fresh(universe_lib, macro_lib, "2026-04-20")
Prevents a regression that re-introduces the macro_lib parameter
(the L1346 closes-when state was 'drop the macro_lib parameter').
"""
import inspect
from inference.stages.load_prices import _verify_arctic_fresh
sig = inspect.signature(_verify_arctic_fresh)
assert list(sig.parameters) == ["universe_lib", "date_str"], (
f"_verify_arctic_fresh signature drifted: {sig.parameters}. "
f"L1346 (b) retired the macro_lib parameter; re-adding it "
f"would re-introduce the dead-defense pattern."
)
Loading