diff --git a/builders/migrate_universe_vwap.py b/builders/migrate_universe_vwap.py new file mode 100644 index 0000000..a997370 --- /dev/null +++ b/builders/migrate_universe_vwap.py @@ -0,0 +1,265 @@ +"""builders/migrate_universe_vwap.py — normalize VWAP column position in ArcticDB universe. + +Background (2026-04-27 EOD-email blackout investigation): +--------------------------------------------------------- +Universe ArcticDB had heterogeneous schemas: + - 816 symbols (~90%): 64 cols, no VWAP at all + - 88 symbols (~10%): 65 cols, VWAP at idx=64 (appended at end) + +``builders/daily_append`` writes via +``OHLCV_COLS = [Open, High, Low, Close, Volume, VWAP]``, so the per-stock row +has VWAP at idx=5 (between Volume and the feature block). ArcticDB's +``update()`` requires the argument's column order to match the stored version +exactly — both schema variants above mismatch the canonical write path, so +every per-stock write failed today with:: + + The columns (names and types) in the argument are not identical to that + of the existing version: UPDATE + -FD + +FD + +FD + +This blocked daily_append step 4 entirely, which left every held ticker +without a 2026-04-27 row in universe_lib, which then hard-failed EOD +reconcile's per-position close lookup. (The structural fix in PR #104 +already decoupled macro/SPY freshness from this — SPY landed cleanly. The +remaining surface is the per-stock universe write.) + +Operational design (yfinance EOD → polygon morning): +---------------------------------------------------- +- yfinance EOD post-close hook writes daily_closes parquet with VWAP=NaN + (yfinance does not expose true volume-weighted VWAP). +- polygon morning enrichment overwrites the parquet with real VWAP values + (true volume-weighted VWAP from polygon grouped-daily). +- daily_append runs end-of-day and writes whatever VWAP is in the parquet + to ArcticDB universe — NaN initially, real values after morning re-run. + +For that flow to work, VWAP must be a first-class column in the universe +schema with a stable position. This migration normalizes every symbol to:: + + [Open, High, Low, Close, Volume, VWAP] + FEATURES + +(canonical OHLCV_COLS order followed by the feature block). NaN-fills VWAP +historically for the 816 symbols that didn't have it. Repositions VWAP for +the 88 symbols that had it appended at the end. + +Idempotent — symbols already in canonical order are skipped. + +Usage:: + + python -m builders.migrate_universe_vwap # dry-run + python -m builders.migrate_universe_vwap --apply # actually write + python -m builders.migrate_universe_vwap --apply --tickers AAPL,MO # subset +""" + +from __future__ import annotations + +import argparse +import json +import logging +from datetime import datetime, timezone + +import boto3 +import numpy as np + +from features.compute import DEFAULT_BUCKET +from store.arctic_store import get_universe_lib + +log = logging.getLogger(__name__) + +OHLCV_COLS_CANONICAL = ["Open", "High", "Low", "Close", "Volume", "VWAP"] +AUDIT_PREFIX = "builders/migrate_universe_vwap_audit/" + + +def _canonical_column_order(existing_cols: list[str]) -> list[str]: + """Return the canonical column ordering for a universe symbol. + + OHLCV_COLS_CANONICAL first (with VWAP inserted at idx=5 if missing), + then every existing non-OHLCV column in its current relative order + (i.e. the feature block stays as-is). Drops nothing. + """ + ohlcv_set = set(OHLCV_COLS_CANONICAL) + feature_block = [c for c in existing_cols if c not in ohlcv_set] + return list(OHLCV_COLS_CANONICAL) + feature_block + + +def _is_canonical(existing_cols: list[str]) -> bool: + """True iff existing column order already matches the canonical layout.""" + if "VWAP" not in existing_cols: + return False + canonical = _canonical_column_order(existing_cols) + return list(existing_cols) == canonical + + +def _write_audit(s3, bucket: str, summary: dict) -> None: + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + key = f"{AUDIT_PREFIX}{ts}.json" + s3.put_object( + Bucket=bucket, + Key=key, + Body=json.dumps(summary, indent=2, default=str).encode("utf-8"), + ContentType="application/json", + ) + log.info("Wrote audit to s3://%s/%s", bucket, key) + + +def migrate_universe_vwap( + *, + bucket: str = DEFAULT_BUCKET, + apply: bool = False, + tickers_override: list[str] | None = None, +) -> dict: + """Normalize universe symbols to canonical [OHLCV+VWAP, FEATURES] ordering. + + Parameters + ---------- + bucket + S3 bucket holding ArcticDB. + apply + If True, actually write the reordered/added-VWAP frames. Default + False (dry-run; counts only). + tickers_override + Subset of symbols to migrate (rest are left alone). Useful for + canary runs and one-off repairs. ``None`` = every symbol in the + universe library. + + Returns + ------- + summary dict with the action plan and outcome. + """ + s3 = boto3.client("s3") + universe_lib = get_universe_lib(bucket) + + arctic_symbols = sorted(universe_lib.list_symbols()) + log.info("ArcticDB universe holds %d symbols", len(arctic_symbols)) + + if tickers_override is not None: + targets = sorted(set(tickers_override) & set(arctic_symbols)) + ignored = sorted(set(tickers_override) - set(arctic_symbols)) + if ignored: + log.warning( + "Skipping %d tickers from --tickers override that aren't in " + "ArcticDB: %s", + len(ignored), ignored, + ) + else: + targets = arctic_symbols + + migrated: list[dict] = [] + already_canonical: list[str] = [] + errors: list[dict] = [] + + for ticker in targets: + try: + df = universe_lib.read(ticker).data + except Exception as exc: + log.error("Could not read %s: %s", ticker, exc) + errors.append({"ticker": ticker, "stage": "read", "error": str(exc)}) + continue + + existing_cols = list(df.columns) + if _is_canonical(existing_cols): + already_canonical.append(ticker) + continue + + # Add VWAP if absent. FLOAT64 matches the daily_append write dtype + # (pandas-inferred from the closes parquet) so future update() calls + # don't trip the dtype-mismatch path. + if "VWAP" not in df.columns: + df["VWAP"] = np.nan # float64 by default + + canonical = _canonical_column_order(list(df.columns)) + df = df[canonical] + + record = { + "ticker": ticker, + "rows": len(df), + "had_vwap": "VWAP" in existing_cols, + "previous_vwap_idx": ( + existing_cols.index("VWAP") if "VWAP" in existing_cols else None + ), + "new_vwap_idx": canonical.index("VWAP"), + } + + if apply: + try: + universe_lib.write(ticker, df, prune_previous_versions=True) + except Exception as exc: + log.error("Failed to write %s: %s", ticker, exc) + errors.append({"ticker": ticker, "stage": "write", "error": str(exc)}) + continue + log.info( + "MIGRATED ticker=%s rows=%d previous_vwap_idx=%s -> new_vwap_idx=%d", + ticker, len(df), record["previous_vwap_idx"], record["new_vwap_idx"], + ) + else: + log.info( + "DRY-RUN would migrate ticker=%s rows=%d previous_vwap_idx=%s -> new_vwap_idx=%d", + ticker, len(df), record["previous_vwap_idx"], record["new_vwap_idx"], + ) + migrated.append(record) + + summary = { + "status": "ok" if not errors else "partial", + "applied": apply, + "arctic_universe_size": len(arctic_symbols), + "targets_count": len(targets), + "migrated_count": len(migrated), + "already_canonical_count": len(already_canonical), + "errors_count": len(errors), + "migrated": migrated, + "already_canonical": already_canonical, + "errors": errors, + } + + log.info( + "migrate_universe_vwap: applied=%s targets=%d migrated=%d " + "already_canonical=%d errors=%d", + apply, len(targets), len(migrated), len(already_canonical), len(errors), + ) + + _write_audit(s3, bucket, summary) + + return summary + + +def main(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + parser.add_argument( + "--apply", + action="store_true", + help="Actually rewrite. Default dry-run.", + ) + parser.add_argument( + "--tickers", + help="Comma-separated subset of tickers to migrate (default: all).", + ) + parser.add_argument( + "--bucket", + default=DEFAULT_BUCKET, + help=f"S3 bucket (default: {DEFAULT_BUCKET})", + ) + args = parser.parse_args() + + tickers_override = ( + [t.strip() for t in args.tickers.split(",") if t.strip()] + if args.tickers + else None + ) + + result = migrate_universe_vwap( + bucket=args.bucket, + apply=args.apply, + tickers_override=tickers_override, + ) + print(json.dumps(result, indent=2, default=str)) + if result["errors_count"] > 0: + raise SystemExit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_migrate_universe_vwap.py b/tests/test_migrate_universe_vwap.py new file mode 100644 index 0000000..308ef13 --- /dev/null +++ b/tests/test_migrate_universe_vwap.py @@ -0,0 +1,263 @@ +"""Tests for builders/migrate_universe_vwap.py.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import numpy as np +import pandas as pd +import pytest + +from builders.migrate_universe_vwap import ( + OHLCV_COLS_CANONICAL, + _canonical_column_order, + _is_canonical, + migrate_universe_vwap, +) + + +def _stock_frame(cols: list[str], rows: int = 5) -> pd.DataFrame: + idx = pd.date_range("2024-01-01", periods=rows, freq="B") + return pd.DataFrame( + {c: np.linspace(1.0, 2.0, rows) for c in cols}, + index=idx, + ) + + +# ── _canonical_column_order / _is_canonical ────────────────────────────────── + + +def test_canonical_order_inserts_vwap_at_idx5(): + """OHLCV_COLS_CANONICAL puts VWAP at index 5.""" + assert OHLCV_COLS_CANONICAL.index("VWAP") == 5 + + +def test_canonical_order_with_features_puts_vwap_idx5(): + """Existing layout with VWAP at end gets relocated to idx=5.""" + existing = ["Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", "VWAP"] + canonical = _canonical_column_order(existing) + assert canonical[5] == "VWAP" + assert canonical[:6] == OHLCV_COLS_CANONICAL + # Feature block preserved in its relative order, with VWAP removed + # from the suffix and re-inserted in the OHLCV block. + assert canonical[6:] == ["rsi_14", "macd_cross"] + + +def test_canonical_order_with_no_vwap_inserts_it(): + existing = ["Open", "High", "Low", "Close", "Volume", "rsi_14", "macd_cross"] + canonical = _canonical_column_order(existing) + assert canonical == [ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "rsi_14", "macd_cross", + ] + + +def test_is_canonical_recognizes_correct_layout(): + assert _is_canonical([ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "rsi_14", "macd_cross", + ]) + + +def test_is_canonical_rejects_appended_vwap(): + assert not _is_canonical([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", "VWAP", + ]) + + +def test_is_canonical_rejects_missing_vwap(): + assert not _is_canonical([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", + ]) + + +# ── migrate_universe_vwap (functional) ────────────────────────────────────── + + +def _patch_libs(monkeypatch, tickers_to_frames: dict[str, pd.DataFrame]): + """Stub out the universe lib + s3 client so the migration runs in-memory.""" + from builders import migrate_universe_vwap as _m + + universe_lib = MagicMock() + universe_lib.list_symbols.return_value = list(tickers_to_frames.keys()) + + # Track the in-memory state across read/write so the test can verify + # that reorder + repeat-call is idempotent. + state = {t: df.copy() for t, df in tickers_to_frames.items()} + + def _read(ticker): + result = MagicMock() + result.data = state[ticker].copy() + return result + + def _write(ticker, df, prune_previous_versions=False): + state[ticker] = df.copy() + return None + + universe_lib.read.side_effect = _read + universe_lib.write.side_effect = _write + + monkeypatch.setattr(_m, "get_universe_lib", lambda *a, **k: universe_lib) + monkeypatch.setattr(_m, "boto3", MagicMock()) + + # Don't actually upload audit JSON + monkeypatch.setattr(_m, "_write_audit", MagicMock()) + + return universe_lib, state + + +def test_migration_dry_run_makes_no_writes(monkeypatch): + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + result = migrate_universe_vwap(apply=False) + assert result["migrated_count"] == 1 + assert universe_lib.write.call_count == 0 + # In-memory state still without VWAP + assert "VWAP" not in state["AAPL"].columns + + +def test_migration_apply_inserts_vwap_at_idx5(monkeypatch): + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + result = migrate_universe_vwap(apply=True) + assert result["migrated_count"] == 1 + assert result["errors_count"] == 0 + assert universe_lib.write.call_count == 1 + final = state["AAPL"] + assert list(final.columns)[:6] == OHLCV_COLS_CANONICAL + assert list(final.columns) == [ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "rsi_14", "macd_cross", + ] + # VWAP starts as float64 NaN + assert final["VWAP"].dtype == np.float64 + assert final["VWAP"].isna().all() + + +def test_migration_apply_relocates_vwap_from_end(monkeypatch): + """Symbols that have VWAP appended at idx=last get it moved to idx=5.""" + frames = { + "MO": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", "VWAP", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + result = migrate_universe_vwap(apply=True) + assert result["migrated_count"] == 1 + final = state["MO"] + assert list(final.columns) == [ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "rsi_14", "macd_cross", + ] + # Reorder must preserve existing VWAP values, not NaN them out + assert not final["VWAP"].isna().any() + + +def test_migration_skips_already_canonical(monkeypatch): + frames = { + "GOOG": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "rsi_14", "macd_cross", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + result = migrate_universe_vwap(apply=True) + assert result["migrated_count"] == 0 + assert result["already_canonical_count"] == 1 + assert universe_lib.write.call_count == 0 + + +def test_migration_idempotent(monkeypatch): + """Running twice must not change the second-run result.""" + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", "macd_cross", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + first = migrate_universe_vwap(apply=True) + second = migrate_universe_vwap(apply=True) + assert first["migrated_count"] == 1 + assert second["migrated_count"] == 0 + assert second["already_canonical_count"] == 1 + + +def test_migration_tickers_override_filters_to_subset(monkeypatch): + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", + ]), + "MSFT": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "rsi_14", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + result = migrate_universe_vwap(apply=True, tickers_override=["AAPL"]) + assert result["migrated_count"] == 1 + assert result["targets_count"] == 1 + assert "VWAP" in state["AAPL"].columns + # MSFT untouched + assert "VWAP" not in state["MSFT"].columns + + +def test_migration_records_errors_without_aborting(monkeypatch): + """One symbol blowing up on write must not stop the rest.""" + from builders import migrate_universe_vwap as _m + + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", "rsi_14", + ]), + "BREAKS": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", "rsi_14", + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + + def _selective_write(ticker, df, prune_previous_versions=False): + if ticker == "BREAKS": + raise RuntimeError("simulated arctic write failure") + state[ticker] = df.copy() + + universe_lib.write.side_effect = _selective_write + + result = migrate_universe_vwap(apply=True) + assert result["migrated_count"] == 1 # AAPL succeeded + assert result["errors_count"] == 1 + assert result["errors"][0]["ticker"] == "BREAKS" + assert result["status"] == "partial" + + +def test_migration_preserves_feature_block_order(monkeypatch): + """The feature block must keep its existing relative ordering — only + the OHLCV+VWAP prefix gets normalized.""" + frames = { + "AAPL": _stock_frame([ + "Open", "High", "Low", "Close", "Volume", + "feat_z", "feat_a", "feat_m", # deliberately not alphabetic + ]), + } + universe_lib, state = _patch_libs(monkeypatch, frames) + migrate_universe_vwap(apply=True) + final = state["AAPL"] + assert list(final.columns) == [ + "Open", "High", "Low", "Close", "Volume", "VWAP", + "feat_z", "feat_a", "feat_m", + ]