From f48dac08a8316c9befac89ada437e960a7d95c0f Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 19 May 2026 12:28:14 -0700 Subject: [PATCH] feat(compute): feature price+macro source -> ArcticDB (slim fallback, parity) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR1b of the Wave-4 predictor/price_cache_slim deletion arc — the riskier consumer (feeds the ENTIRE feature-compute pipeline + _extract_macro). Consumes lib v0.20.0 (PR0a-2: load_macro_series). features/compute._load_prices_and_macro's price source moves from a single load_slim_cache read to a composed ArcticDB read via the new _load_price_source() helper: - universe lib (load_universe_ohlcv) -> equities + SPY - macro lib (load_macro_series) -> VIX/VIX3M/TNX/IRX/GLD/USO + XL* sector ETFs (discovered via open_macro_lib().list_symbols(), filtered startswith XL; the heterogeneous non-price 'features' key is excluded by the explicit-symbols contract) - union = the slim-cache equivalent that _extract_macro + the feature pipeline consume unchanged. slim cache RETAINED as a whole-set fallback — feature compute cannot run blind (returns None only if BOTH sources fail -> empty, preserving the existing no-data contract). SOTA observation: while both exist, every run dual-reads + emits reconcile as_metrics() JSON (grep WAVE4_PARITY_METRIC compute). require_ticker_match=False — slim legitimately carries symbols the universe lib does not, so set asymmetry is logged for visibility while passed reflects value fidelity over the overlap. Dual-read + slim removed in PR4. Pin bump 0.19.0 -> 0.20.0 (requirements.txt + Dockerfile lockstep). No Dockerfile-extra change needed: features/compute.py is NOT Lambda-packaged (Dockerfile does not COPY features/); runs only via weekly_collector on EC2 spot, whose requirements already carry the [arcticdb] extra. +5 tests (compose / slim fallback / parity emit / both-fail / empty); full data suite 1380 passing; Wave-4 anti-drift guard still holds (compute.py keeps slim fallback -> stays in WAVE4_INVENTORY until PR4). Co-Authored-By: Claude Opus 4.7 (1M context) --- Dockerfile | 2 +- features/compute.py | 89 +++++++++++++++++++++-- requirements.txt | 2 +- tests/test_compute_price_source.py | 113 +++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 7 deletions(-) create mode 100644 tests/test_compute_price_source.py diff --git a/Dockerfile b/Dockerfile index 9714e91..75bd0ac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ RUN microdnf install -y git && microdnf clean all # requirements file so the [flow_doctor]-only install above isn't # overridden by the [arcticdb,flow_doctor,rag] extras pinned for EC2. COPY requirements.txt ${LAMBDA_TASK_ROOT}/ -RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.19.0" && \ +RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \ grep -vE "^#|^$|^pytest|^python-dotenv|^boto3|^botocore|^s3transfer|^alpha-engine-lib" requirements.txt > /tmp/req-lambda.txt && \ pip install --no-cache-dir -r /tmp/req-lambda.txt && \ rm -rf /root/.cache/pip /tmp/req-lambda.txt diff --git a/features/compute.py b/features/compute.py index 761808a..9c2d137 100644 --- a/features/compute.py +++ b/features/compute.py @@ -132,6 +132,12 @@ def _load_sector_map(s3, bucket: str) -> dict[str, str]: load_parquet_from_s3 as _load_parquet_from_s3, load_slim_cache as _load_slim_cache, ) +from alpha_engine_lib.arcticdb import ( + load_universe_ohlcv, + load_macro_series, + open_macro_lib, +) +from alpha_engine_lib.reconcile import reconcile_frame_dicts def _safe_last_date(idx: pd.Index) -> pd.Timestamp | None: @@ -347,23 +353,96 @@ def _extract_macro( return macro +def _load_price_source(s3, bucket: str) -> dict | None: + """The ~full-universe price+macro symbol set — ArcticDB primary, + slim-cache fallback, parity-observed. + + Wave 4 of the predictor/price_cache_slim deletion arc, riskier sibling + of the macro-breadth migration: this feeds the ENTIRE feature-compute + pipeline (price_data) AND _extract_macro. The slim cache historically + carried equities + SPY + the index/macro series (VIX/VIX3M/TNX/IRX/ + GLD/USO) + the XL* sector ETFs in one flat dict. Those tenants are + split across two ArcticDB libs: + + - universe lib -> equities + SPY (load_universe_ohlcv) + - macro lib -> VIX.../XL* series (load_macro_series) + + so the ArcticDB-equivalent is the union of both reads. slim is kept as + a fallback for the whole set (feature compute cannot run blind) and, + while both still exist, every run dual-reads and emits a reconcile + ParityReport (grep ``WAVE4_PARITY_METRIC compute``) so PR4's slim + deletion is a data-driven cutover. The slim side is removed in PR4. + + require_ticker_match is False for the emitted report: the slim cache + legitimately carries some symbols the universe lib does not, so set + asymmetry is expected — it is logged in the metric fields for + visibility while ``passed`` reflects value fidelity over the overlap. + + Returns None only if BOTH sources fail (caller then returns empty, + preserving the existing no-data contract). + """ + combined: dict | None = None + try: + prices = load_universe_ohlcv(bucket) # equities + SPY + macro_syms = set(_MACRO_SLIM_KEYS.values()) + try: + mlib = open_macro_lib(bucket) + macro_syms |= { + s for s in mlib.list_symbols() if s.startswith("XL") + } + except Exception as exc: # noqa: BLE001 - XL* discovery best-effort + log.warning("macro-lib symbol listing failed: %s", exc) + macro_frames = load_macro_series(bucket, macro_syms) + merged = {**prices, **macro_frames} + combined = merged or None + except Exception as exc: # noqa: BLE001 - fall back, don't run blind + log.warning("ArcticDB universe/macro read failed: %s", exc) + + try: + slim_data = _load_slim_cache(s3, bucket) + except Exception as exc: # noqa: BLE001 - parity/fallback only + log.warning("slim cache read (parity/fallback) failed: %s", exc) + slim_data = None + + if combined and slim_data: + report = reconcile_frame_dicts( + slim_data, combined, value_cols=("Close",), + require_ticker_match=False, + ) + log.info("compute slim<->arctic %s", report.summary()) + log.info( + "WAVE4_PARITY_METRIC compute %s", json.dumps(report.as_metrics()) + ) + + if combined: + return combined + if slim_data: + log.warning( + "feature compute falling back to slim cache — ArcticDB " + "unavailable (Wave-4 migration fallback path)" + ) + return slim_data + return None + + def _load_prices_and_macro( s3, bucket: str, date_str: str, ) -> tuple[dict[str, pd.DataFrame], dict[str, pd.Series]]: """ - Load price data and macro series from S3 slim cache + daily delta. + Load price data and macro series — ArcticDB primary, slim fallback + (see _load_price_source) + daily delta. Trusts upstream data quality — DailyData collects fresh prices, Saturday DataPhase1 handles splits during full price refresh. No yfinance calls; no external API dependencies. """ - slim_data = _load_slim_cache(s3, bucket) - if not slim_data: + source = _load_price_source(s3, bucket) + if not source: return {}, {} - price_data = dict(slim_data) + price_data = dict(source) price_data, _split_tickers = _apply_daily_delta(s3, bucket, date_str, price_data) - macro = _extract_macro(price_data, slim_data) + macro = _extract_macro(price_data, source) return price_data, macro diff --git a/requirements.txt b/requirements.txt index 60e4950..6d98315 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,4 @@ arcticdb>=6.11 # previously listed above as direct deps; kept those direct lines for now to # avoid breaking pinning during the migration. Drop the duplicate direct # pgvector/psycopg2-binary pins once the migration soaks. -alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.19.0 +alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0 diff --git a/tests/test_compute_price_source.py b/tests/test_compute_price_source.py new file mode 100644 index 0000000..6c1cd17 --- /dev/null +++ b/tests/test_compute_price_source.py @@ -0,0 +1,113 @@ +""" +Wave-4 PR1b — features.compute._load_price_source. + +The riskier consumer migration: ArcticDB (universe lib + macro lib) is the +primary price+macro source, slim cache is the fallback, and a parity +ParityReport is emitted every run while both exist (grep +``WAVE4_PARITY_METRIC compute``). Covers the composed-read, fallback, and +observation paths. +""" + +from __future__ import annotations + +import pandas as pd +import pytest + +from features import compute + + +def _frame(n=10, start=100.0): + idx = pd.date_range("2026-04-01", periods=n, freq="D") + return pd.DataFrame( + {"Close": [float(start + i) for i in range(n)], "Volume": [1] * n}, + index=idx, + ) + + +class _FakeMacroLib: + def __init__(self, symbols): + self._symbols = symbols + + def list_symbols(self): + return self._symbols + + +def _stub_arctic(monkeypatch, *, universe, macro_frames, macro_symbols): + monkeypatch.setattr(compute, "load_universe_ohlcv", lambda bucket: dict(universe)) + monkeypatch.setattr( + compute, "open_macro_lib", lambda bucket: _FakeMacroLib(macro_symbols) + ) + monkeypatch.setattr( + compute, "load_macro_series", lambda bucket, syms: dict(macro_frames) + ) + + +def test_composes_universe_and_macro_when_arcticdb_available(monkeypatch): + """Equities+SPY from universe lib UNIONED with VIX../XL* from macro lib.""" + universe = {"AAPL": _frame(), "SPY": _frame(start=500)} + macro_frames = {"VIX": _frame(start=18), "XLK": _frame(start=200)} + _stub_arctic( + monkeypatch, universe=universe, macro_frames=macro_frames, + macro_symbols=["VIX", "XLK", "features"], # 'features' must be ignored + ) + monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: {}) + + out = compute._load_price_source(s3=None, bucket="b") + assert set(out) == {"AAPL", "SPY", "VIX", "XLK"} + + +def test_falls_back_to_slim_when_arcticdb_fails(monkeypatch, caplog): + def _boom(bucket): + raise RuntimeError("ArcticDB down") + + monkeypatch.setattr(compute, "load_universe_ohlcv", _boom) + slim = {"AAPL": _frame(), "VIX": _frame(start=18)} + monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim) + + with caplog.at_level("WARNING"): + out = compute._load_price_source(s3=None, bucket="b") + assert set(out) == {"AAPL", "VIX"} + assert any("falling back to slim cache" in r.message for r in caplog.records) + + +def test_parity_metric_emitted_when_both_present(monkeypatch, caplog): + universe = {"AAPL": _frame()} + macro_frames = {"VIX": _frame(start=18)} + _stub_arctic( + monkeypatch, universe=universe, macro_frames=macro_frames, + macro_symbols=["VIX"], + ) + # slim carries the same data + an extra symbol the universe lib lacks; + # require_ticker_match=False -> set asymmetry is reported, not fatal. + slim = {"AAPL": _frame(), "VIX": _frame(start=18), "OLDSYM": _frame()} + monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim) + + with caplog.at_level("INFO"): + compute._load_price_source(s3=None, bucket="b") + + lines = [ + r.message for r in caplog.records + if "WAVE4_PARITY_METRIC compute" in r.message + ] + assert len(lines) == 1 + import json + + payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC compute ", 1)[1]) + assert payload["max_abs_value_delta"] == 0.0 # overlap identical + assert payload["passed"] is True # value fidelity holds + assert "OLDSYM" in payload["only_in_a"] # asymmetry visible + + +def test_returns_none_when_both_sources_fail(monkeypatch): + def _boom(*a, **k): + raise RuntimeError("down") + + monkeypatch.setattr(compute, "load_universe_ohlcv", _boom) + monkeypatch.setattr(compute, "_load_slim_cache", _boom) + assert compute._load_price_source(s3=None, bucket="b") is None + + +def test_load_prices_and_macro_empty_when_no_source(monkeypatch): + monkeypatch.setattr(compute, "_load_price_source", lambda s3, b: None) + prices, macro = compute._load_prices_and_macro(None, "b", "2026-04-10") + assert prices == {} and macro == {}