Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN microdnf install -y git && microdnf clean all
# requirements file so the [flow_doctor]-only install above isn't
# overridden by the [arcticdb,flow_doctor,rag] extras pinned for EC2.
COPY requirements.txt ${LAMBDA_TASK_ROOT}/
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.19.0" && \
RUN pip install --no-cache-dir "alpha-engine-lib[flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0" && \
grep -vE "^#|^$|^pytest|^python-dotenv|^boto3|^botocore|^s3transfer|^alpha-engine-lib" requirements.txt > /tmp/req-lambda.txt && \
pip install --no-cache-dir -r /tmp/req-lambda.txt && \
rm -rf /root/.cache/pip /tmp/req-lambda.txt
Expand Down
89 changes: 84 additions & 5 deletions features/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def _load_sector_map(s3, bucket: str) -> dict[str, str]:
load_parquet_from_s3 as _load_parquet_from_s3,
load_slim_cache as _load_slim_cache,
)
from alpha_engine_lib.arcticdb import (
load_universe_ohlcv,
load_macro_series,
open_macro_lib,
)
from alpha_engine_lib.reconcile import reconcile_frame_dicts


def _safe_last_date(idx: pd.Index) -> pd.Timestamp | None:
Expand Down Expand Up @@ -347,23 +353,96 @@ def _extract_macro(
return macro


def _load_price_source(s3, bucket: str) -> dict | None:
"""The ~full-universe price+macro symbol set — ArcticDB primary,
slim-cache fallback, parity-observed.

Wave 4 of the predictor/price_cache_slim deletion arc, riskier sibling
of the macro-breadth migration: this feeds the ENTIRE feature-compute
pipeline (price_data) AND _extract_macro. The slim cache historically
carried equities + SPY + the index/macro series (VIX/VIX3M/TNX/IRX/
GLD/USO) + the XL* sector ETFs in one flat dict. Those tenants are
split across two ArcticDB libs:

- universe lib -> equities + SPY (load_universe_ohlcv)
- macro lib -> VIX.../XL* series (load_macro_series)

so the ArcticDB-equivalent is the union of both reads. slim is kept as
a fallback for the whole set (feature compute cannot run blind) and,
while both still exist, every run dual-reads and emits a reconcile
ParityReport (grep ``WAVE4_PARITY_METRIC compute``) so PR4's slim
deletion is a data-driven cutover. The slim side is removed in PR4.

require_ticker_match is False for the emitted report: the slim cache
legitimately carries some symbols the universe lib does not, so set
asymmetry is expected — it is logged in the metric fields for
visibility while ``passed`` reflects value fidelity over the overlap.

Returns None only if BOTH sources fail (caller then returns empty,
preserving the existing no-data contract).
"""
combined: dict | None = None
try:
prices = load_universe_ohlcv(bucket) # equities + SPY
macro_syms = set(_MACRO_SLIM_KEYS.values())
try:
mlib = open_macro_lib(bucket)
macro_syms |= {
s for s in mlib.list_symbols() if s.startswith("XL")
}
except Exception as exc: # noqa: BLE001 - XL* discovery best-effort
log.warning("macro-lib symbol listing failed: %s", exc)
macro_frames = load_macro_series(bucket, macro_syms)
merged = {**prices, **macro_frames}
combined = merged or None
except Exception as exc: # noqa: BLE001 - fall back, don't run blind
log.warning("ArcticDB universe/macro read failed: %s", exc)

try:
slim_data = _load_slim_cache(s3, bucket)
except Exception as exc: # noqa: BLE001 - parity/fallback only
log.warning("slim cache read (parity/fallback) failed: %s", exc)
slim_data = None

if combined and slim_data:
report = reconcile_frame_dicts(
slim_data, combined, value_cols=("Close",),
require_ticker_match=False,
)
log.info("compute slim<->arctic %s", report.summary())
log.info(
"WAVE4_PARITY_METRIC compute %s", json.dumps(report.as_metrics())
)

if combined:
return combined
if slim_data:
log.warning(
"feature compute falling back to slim cache — ArcticDB "
"unavailable (Wave-4 migration fallback path)"
)
return slim_data
return None


def _load_prices_and_macro(
s3, bucket: str, date_str: str,
) -> tuple[dict[str, pd.DataFrame], dict[str, pd.Series]]:
"""
Load price data and macro series from S3 slim cache + daily delta.
Load price data and macro series — ArcticDB primary, slim fallback
(see _load_price_source) + daily delta.

Trusts upstream data quality — DailyData collects fresh prices,
Saturday DataPhase1 handles splits during full price refresh.
No yfinance calls; no external API dependencies.
"""
slim_data = _load_slim_cache(s3, bucket)
if not slim_data:
source = _load_price_source(s3, bucket)
if not source:
return {}, {}

price_data = dict(slim_data)
price_data = dict(source)
price_data, _split_tickers = _apply_daily_delta(s3, bucket, date_str, price_data)
macro = _extract_macro(price_data, slim_data)
macro = _extract_macro(price_data, source)

return price_data, macro

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ arcticdb>=6.11
# previously listed above as direct deps; kept those direct lines for now to
# avoid breaking pinning during the migration. Drop the duplicate direct
# pgvector/psycopg2-binary pins once the migration soaks.
alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.19.0
alpha-engine-lib[arcticdb,flow_doctor,rag] @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0
113 changes: 113 additions & 0 deletions tests/test_compute_price_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Wave-4 PR1b — features.compute._load_price_source.

The riskier consumer migration: ArcticDB (universe lib + macro lib) is the
primary price+macro source, slim cache is the fallback, and a parity
ParityReport is emitted every run while both exist (grep
``WAVE4_PARITY_METRIC compute``). Covers the composed-read, fallback, and
observation paths.
"""

from __future__ import annotations

import pandas as pd
import pytest

from features import compute


def _frame(n=10, start=100.0):
idx = pd.date_range("2026-04-01", periods=n, freq="D")
return pd.DataFrame(
{"Close": [float(start + i) for i in range(n)], "Volume": [1] * n},
index=idx,
)


class _FakeMacroLib:
def __init__(self, symbols):
self._symbols = symbols

def list_symbols(self):
return self._symbols


def _stub_arctic(monkeypatch, *, universe, macro_frames, macro_symbols):
monkeypatch.setattr(compute, "load_universe_ohlcv", lambda bucket: dict(universe))
monkeypatch.setattr(
compute, "open_macro_lib", lambda bucket: _FakeMacroLib(macro_symbols)
)
monkeypatch.setattr(
compute, "load_macro_series", lambda bucket, syms: dict(macro_frames)
)


def test_composes_universe_and_macro_when_arcticdb_available(monkeypatch):
"""Equities+SPY from universe lib UNIONED with VIX../XL* from macro lib."""
universe = {"AAPL": _frame(), "SPY": _frame(start=500)}
macro_frames = {"VIX": _frame(start=18), "XLK": _frame(start=200)}
_stub_arctic(
monkeypatch, universe=universe, macro_frames=macro_frames,
macro_symbols=["VIX", "XLK", "features"], # 'features' must be ignored
)
monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: {})

out = compute._load_price_source(s3=None, bucket="b")
assert set(out) == {"AAPL", "SPY", "VIX", "XLK"}


def test_falls_back_to_slim_when_arcticdb_fails(monkeypatch, caplog):
def _boom(bucket):
raise RuntimeError("ArcticDB down")

monkeypatch.setattr(compute, "load_universe_ohlcv", _boom)
slim = {"AAPL": _frame(), "VIX": _frame(start=18)}
monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim)

with caplog.at_level("WARNING"):
out = compute._load_price_source(s3=None, bucket="b")
assert set(out) == {"AAPL", "VIX"}
assert any("falling back to slim cache" in r.message for r in caplog.records)


def test_parity_metric_emitted_when_both_present(monkeypatch, caplog):
universe = {"AAPL": _frame()}
macro_frames = {"VIX": _frame(start=18)}
_stub_arctic(
monkeypatch, universe=universe, macro_frames=macro_frames,
macro_symbols=["VIX"],
)
# slim carries the same data + an extra symbol the universe lib lacks;
# require_ticker_match=False -> set asymmetry is reported, not fatal.
slim = {"AAPL": _frame(), "VIX": _frame(start=18), "OLDSYM": _frame()}
monkeypatch.setattr(compute, "_load_slim_cache", lambda s3, b: slim)

with caplog.at_level("INFO"):
compute._load_price_source(s3=None, bucket="b")

lines = [
r.message for r in caplog.records
if "WAVE4_PARITY_METRIC compute" in r.message
]
assert len(lines) == 1
import json

payload = json.loads(lines[0].split("WAVE4_PARITY_METRIC compute ", 1)[1])
assert payload["max_abs_value_delta"] == 0.0 # overlap identical
assert payload["passed"] is True # value fidelity holds
assert "OLDSYM" in payload["only_in_a"] # asymmetry visible


def test_returns_none_when_both_sources_fail(monkeypatch):
def _boom(*a, **k):
raise RuntimeError("down")

monkeypatch.setattr(compute, "load_universe_ohlcv", _boom)
monkeypatch.setattr(compute, "_load_slim_cache", _boom)
assert compute._load_price_source(s3=None, bucket="b") is None


def test_load_prices_and_macro_empty_when_no_source(monkeypatch):
monkeypatch.setattr(compute, "_load_price_source", lambda s3, b: None)
prices, macro = compute._load_prices_and_macro(None, "b", "2026-04-10")
assert prices == {} and macro == {}
Loading