diff --git a/tests/test_risk_model_persist.py b/tests/test_risk_model_persist.py new file mode 100644 index 0000000..6bad316 --- /dev/null +++ b/tests/test_risk_model_persist.py @@ -0,0 +1,265 @@ +"""Tests for ``training/risk_model_persist.py`` (ROADMAP C.2b). + +Wires the C.2a math primitives (``risk_model.build_factor_risk_model``) +into the Saturday SF PredictorTraining stage and persists F + D to +``s3://{bucket}/risk_model/{date}/{F,D}.parquet``. Tests pin: + + * empty / sparse data_dir returns ``status=skipped`` with the + appropriate reason (no all-NaN F pollution of the weekly series) + * insufficient loading-column coverage (pre-#324 cache) returns + ``status=skipped`` — auto-activates once C.1 loadings have + accumulated + * happy path persists F.parquet + D.parquet + metadata.json to + the expected S3 prefix + * S3 persist failure is non-fatal (warns + returns payload) + * dry_run path builds the model but skips the S3 write + * graceful read of malformed / unreadable parquets +""" + +from __future__ import annotations + +import json +import tempfile +from pathlib import Path +from unittest.mock import MagicMock + +import numpy as np +import pandas as pd +import pytest + +from training.risk_model_persist import ( + FACTOR_LOADING_COLUMNS, + build_and_persist_risk_model, +) + + +def _write_synthetic_ticker( + data_dir: Path, + ticker: str, + dates: pd.DatetimeIndex, + rng: np.random.Generator, + *, + include_loadings: bool = True, +) -> None: + """Write a synthetic per-ticker parquet with OHLCV + (optionally) + the 8 factor-loading columns.""" + n = len(dates) + close = 100.0 * np.exp(np.cumsum(rng.normal(0, 0.01, size=n))) + cols = { + "Open": close, + "High": close * 1.005, + "Low": close * 0.995, + "Close": close, + "Volume": rng.integers(1_000_000, 10_000_000, size=n), + "VWAP": close, + "source": ["yfinance"] * n, + } + if include_loadings: + for col in FACTOR_LOADING_COLUMNS: + cols[col] = rng.normal(0, 1, size=n) + df = pd.DataFrame(cols, index=dates) + df.to_parquet(data_dir / f"{ticker}.parquet", engine="pyarrow") + + +@pytest.fixture +def synthetic_universe_dir(tmp_path): + """Build a 60-date × 40-ticker synthetic universe cache with all 8 + factor-loading columns populated — enough data to clear both the + minimum-dates and minimum-tickers gates.""" + rng = np.random.default_rng(42) + data_dir = tmp_path / "cache" + data_dir.mkdir() + dates = pd.date_range("2026-01-01", periods=80, freq="D") + for i in range(40): + _write_synthetic_ticker(data_dir, f"TICK{i:02d}", dates, rng) + return data_dir + + +# ── Sparse-data skip paths ────────────────────────────────────────────── + + +class TestSparseDataSkipPaths: + def test_empty_data_dir_returns_skipped(self, tmp_path): + data_dir = tmp_path / "empty" + data_dir.mkdir() + result = build_and_persist_risk_model( + data_dir=data_dir, + bucket="test-bucket", + date_str="2026-05-27", + s3_client=MagicMock(), + ) + assert result["status"] == "skipped" + assert result["n_dates"] == 0 + assert "returns_panel has only 0 dates" in result["reason"] + + def test_below_min_dates_returns_skipped(self, tmp_path): + rng = np.random.default_rng(1) + data_dir = tmp_path / "shallow" + data_dir.mkdir() + # Only 10 dates — below the 60-date threshold + dates = pd.date_range("2026-01-01", periods=10, freq="D") + for i in range(40): + _write_synthetic_ticker(data_dir, f"TICK{i:02d}", dates, rng) + result = build_and_persist_risk_model( + data_dir=data_dir, + bucket="test-bucket", + date_str="2026-05-27", + s3_client=MagicMock(), + ) + assert result["status"] == "skipped" + assert "returns_panel has only" in result["reason"] + + def test_below_min_tickers_with_loadings_returns_skipped(self, tmp_path): + rng = np.random.default_rng(2) + data_dir = tmp_path / "no_loadings" + data_dir.mkdir() + dates = pd.date_range("2026-01-01", periods=80, freq="D") + # 20 tickers WITH loadings, 5 tickers without — below the 30- + # ticker threshold + for i in range(20): + _write_synthetic_ticker(data_dir, f"WITH{i:02d}", dates, rng) + for i in range(5): + _write_synthetic_ticker( + data_dir, f"WITHOUT{i:02d}", dates, rng, + include_loadings=False, + ) + result = build_and_persist_risk_model( + data_dir=data_dir, + bucket="test-bucket", + date_str="2026-05-27", + s3_client=MagicMock(), + ) + assert result["status"] == "skipped" + assert "tickers carry all" in result["reason"] + assert "pre-#324 universe cache" in result["reason"] + + def test_no_loading_columns_returns_skipped(self, tmp_path): + """Every ticker carries OHLCV but ZERO loading columns — + pre-2026-05-26 universe cache shape.""" + rng = np.random.default_rng(3) + data_dir = tmp_path / "pre_c1" + data_dir.mkdir() + dates = pd.date_range("2026-01-01", periods=80, freq="D") + for i in range(40): + _write_synthetic_ticker( + data_dir, f"TICK{i:02d}", dates, rng, + include_loadings=False, + ) + result = build_and_persist_risk_model( + data_dir=data_dir, + bucket="test-bucket", + date_str="2026-05-27", + s3_client=MagicMock(), + ) + assert result["status"] == "skipped" + assert "0 tickers carry all" in result["reason"] + + +# ── Happy path ────────────────────────────────────────────────────────── + + +class TestHappyPath: + def test_persists_F_D_metadata_to_expected_prefix( + self, synthetic_universe_dir + ): + s3 = MagicMock() + result = build_and_persist_risk_model( + data_dir=synthetic_universe_dir, + bucket="alpha-engine-research", + date_str="2026-05-27", + s3_client=s3, + ) + assert result["status"] == "ok" + assert result["F_shape"][0] == result["F_shape"][1] + assert result["D_length"] > 0 + + # 3 put_object calls: F.parquet, D.parquet, metadata.json + keys = [c.kwargs["Key"] for c in s3.put_object.call_args_list] + assert "risk_model/2026-05-27/F.parquet" in keys + assert "risk_model/2026-05-27/D.parquet" in keys + assert "risk_model/2026-05-27/metadata.json" in keys + + # metadata is JSON + meta_call = next( + c for c in s3.put_object.call_args_list + if c.kwargs["Key"].endswith("metadata.json") + ) + meta = json.loads(meta_call.kwargs["Body"]) + assert "n_dates" in meta + assert "K_eff" in meta + + def test_F_is_square_symmetric(self, synthetic_universe_dir): + """F MUST be square (K × K). C.3 will use it in Σ = B·F·Bᵀ + D; + non-square breaks the matmul + the PSD reconstruction.""" + result = build_and_persist_risk_model( + data_dir=synthetic_universe_dir, + bucket="test-bucket", + date_str="2026-05-27", + dry_run=True, + ) + assert result["status"] == "ok" + F_rows, F_cols = result["F_shape"] + assert F_rows == F_cols + + +# ── dry_run skips S3 ─────────────────────────────────────────────────── + + +class TestDryRun: + def test_dry_run_builds_model_but_no_s3(self, synthetic_universe_dir): + s3 = MagicMock() + result = build_and_persist_risk_model( + data_dir=synthetic_universe_dir, + bucket="test-bucket", + date_str="2026-05-27", + dry_run=True, + s3_client=s3, + ) + assert result["status"] == "ok" + s3.put_object.assert_not_called() + + +# ── S3 persist failure ───────────────────────────────────────────────── + + +class TestS3PersistFailure: + def test_s3_failure_does_not_raise( + self, synthetic_universe_dir, caplog + ): + s3 = MagicMock() + s3.put_object.side_effect = RuntimeError("S3 outage") + result = build_and_persist_risk_model( + data_dir=synthetic_universe_dir, + bucket="test-bucket", + date_str="2026-05-27", + s3_client=s3, + ) + # status=ok because the model was built; the write was best-effort + assert result["status"] == "ok" + assert any( + "S3 persist failed" in record.message + for record in caplog.records + ) + + +# ── Malformed parquet handling ───────────────────────────────────────── + + +class TestMalformedParquet: + def test_unreadable_parquet_is_skipped( + self, synthetic_universe_dir, caplog + ): + """A garbage/truncated parquet file in the cache should be + silently skipped — don't let one bad ticker abort the whole + weekly risk-model build.""" + garbage = synthetic_universe_dir / "BADTICK.parquet" + garbage.write_bytes(b"not a parquet") + # Should still succeed with the remaining 40 valid tickers + result = build_and_persist_risk_model( + data_dir=synthetic_universe_dir, + bucket="test-bucket", + date_str="2026-05-27", + dry_run=True, + ) + assert result["status"] == "ok" + assert result["n_tickers"] == 40 # garbage skipped, others survived diff --git a/training/risk_model_persist.py b/training/risk_model_persist.py new file mode 100644 index 0000000..a054130 --- /dev/null +++ b/training/risk_model_persist.py @@ -0,0 +1,289 @@ +"""training/risk_model_persist.py — C.2b production-persistence layer. + +ROADMAP C.2b — builds the Barra-style structural factor risk model +(F = factor-return covariance, D = idiosyncratic variance) from the +universe library's price + factor-loading columns, and persists the +two parquet artifacts weekly to +``s3://{bucket}/risk_model/{date}/{F,D,metadata}.parquet`` from the +``PredictorTraining`` Saturday SF stage. + +The pure-math primitives live in ``risk_model/__init__.py`` (C.2a, +PR #200). This module is the wiring layer — local-parquet read, +DataFrame pivoting, S3 write. + +**Hard sequencing constraint (per C.3):** Σ = B · F · Bᵀ + D is NOT +wired into ``executor.portfolio_optimizer.solve_target_weights`` until +B.5 (α̂-uncertainty cutover gate) passes. Two simultaneous Σ-substrate +changes make backtester regressions untraceable. C.2b ships the +weekly persistence so by the time C.3 reads ``risk_model/{date}/``, +there are ≥4 weeks of F + D accumulated. + +**Graceful degradation.** The 8 factor-loading ``*_zscore`` columns +shipped 2026-05-26 evening in alpha-engine-data #324 — older universe- +library rows don't carry them. The builder is best-effort: if zero +loading columns are present in the local cache, it returns +``status=skipped`` with a clear reason rather than producing an +all-NaN F. The Saturday SF will start emitting real F + D once a +weeks' worth of post-#324 universe data exists. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +from pathlib import Path + +import numpy as np +import pandas as pd + +from risk_model import build_factor_risk_model + +log = logging.getLogger(__name__) + +# Pinned factor-loading column registry. Mirrors the +# alpha-engine-data ``features/registry.py`` ``factor_loading`` +# entries (PR #324). Order is the canonical Barra-style factor order: +# momentum → return → beta → idio_vol → realized_vol → dist_high → value → quality. +FACTOR_LOADING_COLUMNS: tuple[str, ...] = ( + "momentum_20d_zscore", + "return_60d_zscore", + "beta_60d_zscore", + "idio_vol_60d_zscore", + "realized_vol_63d_zscore", + "dist_from_52w_high_zscore", + "pe_ratio_zscore", + "roe_zscore", +) + +# Minimum data thresholds before we attempt the F+D build. If fewer +# tickers carry loading columns, the cross-sectional regression at +# each date will be rank-deficient (K=8 + intercept needs >>9 obs to +# be meaningful). 30 is the same floor risk_model.estimate_factor_ +# covariance defaults to for cov_obs. +_MIN_TICKERS_WITH_LOADINGS = 30 +_MIN_DATES_WITH_RETURNS = 60 + + +def _load_universe_panels( + data_dir: str | os.PathLike, +) -> tuple[pd.DataFrame, dict[pd.Timestamp, pd.DataFrame]]: + """Read every ticker parquet under ``data_dir`` and pivot into: + + * returns_panel: (T × N) DataFrame of close-to-close log returns, + DatetimeIndex × ticker columns. NaNs preserved (tickers with + gaps for a date don't get filled). + * loadings_by_date: ``{date: (N × K) DataFrame}`` — for each + date, the per-ticker factor-loading vector (one row per + ticker that has all ``FACTOR_LOADING_COLUMNS`` non-NaN on + that date). + + Skips macro files (no ``Close`` column expected to behave like a + ticker — macros are not part of the universe). + """ + data_dir = Path(data_dir) + if not data_dir.is_dir(): + raise ValueError(f"data_dir not found: {data_dir}") + + returns_by_ticker: dict[str, pd.Series] = {} + loadings_rows: list[tuple[pd.Timestamp, str, np.ndarray]] = [] + + parquets = sorted(data_dir.glob("*.parquet")) + log.info( + "risk_model_persist: scanning %d parquet files in %s", + len(parquets), data_dir, + ) + + for path in parquets: + ticker = path.stem + try: + df = pd.read_parquet(path) + except Exception as exc: + log.debug("Skip %s: read failed (%s)", ticker, exc) + continue + if "Close" not in df.columns or df.empty: + continue + if not isinstance(df.index, pd.DatetimeIndex): + try: + df.index = pd.to_datetime(df.index) + except Exception: + log.debug("Skip %s: index not DatetimeIndex", ticker) + continue + + # Close → log returns. NaN on first row by construction. + close = df["Close"].astype(float) + log_ret = np.log(close / close.shift(1)) + returns_by_ticker[ticker] = log_ret + + # Extract factor-loading columns. If any of the 8 are missing + # in this ticker's parquet, skip the ticker entirely (rank- + # deficient row at every date). + available = [c for c in FACTOR_LOADING_COLUMNS if c in df.columns] + if len(available) != len(FACTOR_LOADING_COLUMNS): + continue + loadings = df[list(FACTOR_LOADING_COLUMNS)].astype(float) + for date, row in loadings.iterrows(): + if row.isna().any(): + continue + loadings_rows.append((date, ticker, row.values)) + + returns_panel = pd.DataFrame(returns_by_ticker).sort_index() + + # Pivot loadings_rows into a per-date dict of N×K DataFrames. + loadings_by_date: dict[pd.Timestamp, pd.DataFrame] = {} + by_date_buckets: dict[pd.Timestamp, list[tuple[str, np.ndarray]]] = {} + for date, ticker, row in loadings_rows: + by_date_buckets.setdefault(date, []).append((ticker, row)) + for date, items in by_date_buckets.items(): + tickers = [t for t, _ in items] + mat = np.vstack([r for _, r in items]) + loadings_by_date[date] = pd.DataFrame( + mat, + index=tickers, + columns=list(FACTOR_LOADING_COLUMNS), + ) + + return returns_panel, loadings_by_date + + +def _write_parquet_to_s3(s3_client, bucket: str, key: str, df: pd.DataFrame) -> None: + """Serialize ``df`` to in-memory parquet and put it to S3 at + ``s3://{bucket}/{key}``. Caller wraps in best-effort try/except.""" + import io + + buf = io.BytesIO() + df.to_parquet(buf, engine="pyarrow", compression="snappy") + buf.seek(0) + s3_client.put_object( + Bucket=bucket, + Key=key, + Body=buf.getvalue(), + ContentType="application/octet-stream", + ) + + +def build_and_persist_risk_model( + data_dir: str | os.PathLike, + bucket: str, + date_str: str, + *, + dry_run: bool = False, + s3_client=None, +) -> dict: + """Build F + D from the universe-library local cache and persist + to S3. + + Returns a status dict shaped like the sweep stages in + ``alpha-engine-backtester/backtest.py`` (status=ok|skipped, plus + metadata). Non-fatal: callers wrap in try/except so a failure + here doesn't abort the training pipeline. + + Args: + data_dir: local directory of per-ticker parquets (the same + ``tmp_cache`` ``train_handler.main`` populated from + ``download_from_arctic``). + bucket: S3 bucket for the persistence layer. + date_str: Saturday SF run date in ``YYYY-MM-DD``. Used as + the per-week S3 prefix. + dry_run: when True, build the model but skip the S3 write. + s3_client: optional boto3 client override (for tests). + """ + t0 = time.time() + log.info("risk_model_persist: starting (date=%s, bucket=%s)", date_str, bucket) + + returns_panel, loadings_by_date = _load_universe_panels(data_dir) + + # Sanity / coverage gates — return skipped on insufficient data + # rather than producing an all-NaN F that pollutes the weekly + # accumulation series. + if returns_panel.empty or returns_panel.shape[0] < _MIN_DATES_WITH_RETURNS: + reason = ( + f"returns_panel has only {returns_panel.shape[0]} dates " + f"(< {_MIN_DATES_WITH_RETURNS} threshold) — universe cache " + f"may be empty or freshly bootstrapped" + ) + log.warning("risk_model_persist: skipped — %s", reason) + return { + "status": "skipped", + "date": date_str, + "reason": reason, + "n_dates": int(returns_panel.shape[0]), + "n_tickers": int(returns_panel.shape[1]), + "n_dates_with_loadings": 0, + } + + n_tickers_with_loadings = ( + max(df.shape[0] for df in loadings_by_date.values()) + if loadings_by_date else 0 + ) + if n_tickers_with_loadings < _MIN_TICKERS_WITH_LOADINGS: + reason = ( + f"only {n_tickers_with_loadings} tickers carry all " + f"{len(FACTOR_LOADING_COLUMNS)} factor-loading columns " + f"(< {_MIN_TICKERS_WITH_LOADINGS} threshold) — likely " + "pre-#324 universe cache; activates automatically once " + "alpha-engine-data PR #324's loadings have accumulated" + ) + log.warning("risk_model_persist: skipped — %s", reason) + return { + "status": "skipped", + "date": date_str, + "reason": reason, + "n_dates": int(returns_panel.shape[0]), + "n_tickers": int(returns_panel.shape[1]), + "n_dates_with_loadings": len(loadings_by_date), + "max_tickers_per_loading_date": n_tickers_with_loadings, + } + + log.info( + "risk_model_persist: building model — " + "%d dates × %d tickers, %d loading-dates", + returns_panel.shape[0], returns_panel.shape[1], + len(loadings_by_date), + ) + + model = build_factor_risk_model( + returns_panel=returns_panel, + loadings_by_date=loadings_by_date, + ) + + if not dry_run: + import boto3 + + s3 = s3_client or boto3.client("s3") + prefix = f"risk_model/{date_str}" + try: + _write_parquet_to_s3(s3, bucket, f"{prefix}/F.parquet", model["F"]) + _write_parquet_to_s3( + s3, bucket, f"{prefix}/D.parquet", + model["D"].to_frame(name="idiosyncratic_variance"), + ) + s3.put_object( + Bucket=bucket, + Key=f"{prefix}/metadata.json", + Body=json.dumps(model["metadata"], indent=2).encode("utf-8"), + ContentType="application/json", + ) + log.info("risk_model_persist: persisted s3://%s/%s/", bucket, prefix) + except Exception as exc: + log.warning( + "risk_model_persist: S3 persist failed (non-fatal): %s", exc, + ) + + elapsed = time.time() - t0 + log.info( + "risk_model_persist: ok — F shape=%s, D length=%d, %.1fs", + tuple(model["F"].shape), len(model["D"]), elapsed, + ) + return { + "status": "ok", + "date": date_str, + "elapsed_seconds": round(elapsed, 1), + "n_dates": int(returns_panel.shape[0]), + "n_tickers": int(returns_panel.shape[1]), + "n_dates_with_loadings": len(loadings_by_date), + "F_shape": list(model["F"].shape), + "D_length": int(len(model["D"])), + "metadata": model["metadata"], + } diff --git a/training/train_handler.py b/training/train_handler.py index 4930eb5..4274f7f 100644 --- a/training/train_handler.py +++ b/training/train_handler.py @@ -1036,6 +1036,45 @@ def main( except Exception as _sum_err: log.warning("Training summary write failed (non-blocking): %s", _sum_err) + # Step 2d2: Factor-risk-model F + D weekly persistence (ROADMAP C.2b). + # Reads the per-ticker parquets the train_handler already populated + # to ``tmp_cache`` from ArcticDB, extracts close → log returns + the + # 8 ``*_zscore`` factor-loading columns (C.1, alpha-engine-data + # #324), runs the Fama-MacBeth cross-sectional regression + # (C.2a, ``risk_model.build_factor_risk_model``), and persists F + D + # parquets to ``s3://{bucket}/risk_model/{date}/``. Non-blocking: + # failure here doesn't abort the training pipeline. Auto-skips + # when loading-column coverage is insufficient (pre-#324 cache). + # + # The actual wiring of Σ = B · F · Bᵀ + D into + # ``executor.portfolio_optimizer.solve_target_weights`` is C.3 — + # under a hard sequencing constraint per the plan doc: do NOT + # wire until B.5 (α̂-uncertainty cutover gate) passes. Two + # simultaneous Σ-substrate changes make backtester regressions + # untraceable. C.2b ships the weekly persistence so by the time + # C.3 reads ``risk_model/{date}/``, there are ≥4 weeks of F + D + # accumulated. + if not dry_run: + try: + from training.risk_model_persist import ( + build_and_persist_risk_model, + ) + _rm_payload = build_and_persist_risk_model( + data_dir=str(tmp_cache), + bucket=bucket, + date_str=date_str, + ) + result["risk_model"] = _rm_payload + log.info( + "risk_model_persist: status=%s", + _rm_payload.get("status"), + ) + except Exception as _rm_err: + log.warning( + "risk_model_persist failed (non-blocking): %s", + _rm_err, exc_info=True, + ) + # Step 2e: Triple-barrier cutover gate (Stage 3 PR 5 SF wiring). # Runs after model upload + training summary so the gate has access # to the freshly-promoted meta_model_tb.pkl and the most recent ~6