diff --git a/sf_preflight.py b/sf_preflight.py new file mode 100644 index 0000000..dfd16c6 --- /dev/null +++ b/sf_preflight.py @@ -0,0 +1,790 @@ +""" +sf_preflight.py — Predict whether the Saturday SF would succeed BEFORE +launching a spot. + +Today's Saturday SF (alpha-engine-saturday-pipeline) is a 50-min spot run +that costs 1 polygon API call (free-tier 5/min budget) per attempt and a +spot bootstrap (~3 min wall-clock + IAM/SSM dance). Repeated launch-fail +cycles burn polygon quota and operator hours. This module simulates the +critical pre-Phase-1 path against real S3 + ArcticDB state and reports +predicted pass/fail per step BEFORE any compute fires. + +Usage: + python sf_preflight.py # human-readable summary + python sf_preflight.py --json # structured output + python sf_preflight.py --bucket NAME # alternate bucket + +Exit codes: + 0 all checks pass — SF is predicted to succeed + 1 ≥1 check fails — fix before redrive + +Polygon API budget: 1 call total (one grouped-daily lookup for the prior +trading day). Same call the actual SF makes; reusable in spirit since the +SF re-fetches anyway in MorningEnrich. + +What this catches (mapped to today's incidents): + PR #130 (backfill regression) — check_backfill_source_freshness + PR #131 (polygon coverage flake) — check_polygon_grouped_coverage + PR #132 (missing-from-closes scoping) — check_predicted_missing_from_closes + PR #133 (freshness scan scoping) — check_universe_sample_freshness + PR #134 (workflow ordering) — check_universe_drift + PR #135 (return shape) — check_constituents_fetch + Postflight contracts — check_postflight_contracts + +What this CANNOT catch: + - Polygon coverage flipping AFTER preflight succeeds (transient + between preflight + actual SF kickoff). PR #131 is defense for this. + - ArcticDB write failures (we don't write here). + - Spot reclaim / SSM timeouts (infrastructure-level). 
@dataclass
class PreflightContext:
    """Mutable state shared by the preflight checks during one run.

    The run parameters (``bucket``, ``today``, ``prior_trading_day``) are
    supplied at construction. Every other field starts as ``None`` and is
    filled in by the check named in its comment; downstream checks treat a
    ``None`` value as "the upstream check did not succeed" and skip.
    """

    bucket: str  # S3 bucket holding both the ArcticDB store and the contract files
    today: str  # YYYY-MM-DD
    prior_trading_day: str  # YYYY-MM-DD
    fresh_constituents: "set[str] | None" = None  # populated by check_constituents_fetch
    arctic_universe_symbols: "set[str] | None" = None  # populated by check_arctic_connectivity
    polygon_returned_tickers: "set[str] | None" = None  # populated by check_polygon_grouped_coverage
    # ArcticDB handles — initialized once in check_arctic_connectivity and
    # reused across downstream checks. ArcticDB on macOS crashes in
    # ``Aws::S3::S3Client::S3Client`` when ``adb.Arctic(uri)`` runs more
    # than once per process (AWS SDK init race), so every check that needs
    # arctic must read these from ctx instead of re-initializing.
    universe_lib: "Any | None" = None  # ArcticDB "universe" library handle
    macro_lib: "Any | None" = None  # ArcticDB "macro" library handle
def check_universe_drift(ctx: PreflightContext) -> CheckResult:
    """Predict which ArcticDB stragglers the pre-MorningEnrich prune would drop.

    Catches the PR #134 class of failure. A "straggler" is a symbol present
    in the ArcticDB universe library but absent from the freshly fetched
    constituents, excluding ``_SKIP_TICKERS`` and sector ETFs. Each straggler
    is classified by the age of its last row relative to ``ctx.today``:

    * more than 5 days stale  -> would be pruned
    * 5 days stale or fewer   -> too fresh to drop
    * empty / read raised     -> unreadable (reported separately)

    NOTE(review): the code has always compared ``days_stale > 5`` while the
    earlier docstring said ">= 5d"; confirm which one matches PR #134's
    ``absent_days=5`` semantics.

    Args:
        ctx: Shared preflight context. Requires ``fresh_constituents``,
            ``arctic_universe_symbols`` and ``universe_lib`` to have been
            populated by earlier checks.

    Returns:
        ``fail`` when the required context is missing, otherwise ``ok`` with
        the classification counts in ``details``. ``would_skip_count`` keeps
        its historical meaning (everything not pruned, unreadable included);
        ``unreadable_count`` / ``unreadable_sample`` break that number down.
    """
    import time
    t0 = time.time()
    if ctx.fresh_constituents is None or ctx.arctic_universe_symbols is None:
        return CheckResult(
            name="universe_drift",
            status="fail",
            message="Skipped: prior checks failed to populate context",
            elapsed_seconds=time.time() - t0,
        )

    from features.compute import _SKIP_TICKERS, _is_sector_etf

    candidates = sorted(
        s for s in ctx.arctic_universe_symbols
        if s not in ctx.fresh_constituents
        and s not in _SKIP_TICKERS
        and not _is_sector_etf(s)
    )

    if not candidates:
        return CheckResult(
            name="universe_drift",
            status="ok",
            message="No straggler candidates (arctic ⊆ constituents)",
            elapsed_seconds=time.time() - t0,
        )

    # Reuse the universe lib from check_arctic_connectivity to avoid the
    # macOS arcticdb re-init crash described on PreflightContext.
    if ctx.universe_lib is None:
        return CheckResult(
            name="universe_drift",
            status="fail",
            message="Skipped: arctic_connectivity did not populate universe_lib",
            elapsed_seconds=time.time() - t0,
        )
    universe_lib = ctx.universe_lib
    import pandas as pd
    today_ts = pd.Timestamp(ctx.today).normalize()

    # PR #134 uses absent_days=5 for the pre-MorningEnrich prune.
    prune_after_days = 5

    will_prune: list[dict] = []
    too_fresh: list[dict] = []
    unreadable: list[str] = []
    for ticker in candidates:
        try:
            df = universe_lib.tail(ticker, n=1).data
            last_ts = pd.Timestamp(df.index[-1]).normalize() if not df.empty else None
        except Exception as exc:
            # Best-effort read: one bad symbol must not abort the scan, but
            # leave a trace so the cause can be found with --verbose.
            log.debug("universe_drift: could not read %s: %s", ticker, exc)
            last_ts = None
        if last_ts is None:
            unreadable.append(ticker)
            continue
        days_stale = (today_ts - last_ts).days
        entry = {
            "ticker": ticker,
            "last_date": last_ts.date().isoformat(),
            "days_stale": days_stale,
        }
        if days_stale > prune_after_days:
            will_prune.append(entry)
        else:
            too_fresh.append(entry)

    message = (
        f"{len(candidates)} arctic stragglers; {len(will_prune)} would be pruned, "
        f"{len(too_fresh)} too fresh to drop"
    )
    if unreadable:
        # Previously folded into "too fresh to drop", which was misleading.
        message += f", {len(unreadable)} unreadable"

    return CheckResult(
        name="universe_drift",
        status="ok",
        message=message,
        details={
            "candidates_count": len(candidates),
            "would_prune_count": len(will_prune),
            "would_prune": will_prune[:20],
            "would_skip_count": len(too_fresh) + len(unreadable),
            "unreadable_count": len(unreadable),
            "unreadable_sample": unreadable[:20],
        },
        elapsed_seconds=time.time() - t0,
    )
def check_polygon_grouped_coverage(ctx: PreflightContext) -> CheckResult:
    """Make ONE polygon grouped-daily call and measure constituent coverage.

    Fetches the grouped-daily bars for ``ctx.prior_trading_day``, stores the
    returned ticker set on ``ctx.polygon_returned_tickers`` for downstream
    checks, and compares it with ``ctx.fresh_constituents``.

    Args:
        ctx: Shared preflight context; ``fresh_constituents`` must already be
            populated.

    Returns:
        * ``fail`` — constituents missing from the context, ``polygon_client``
          not importable, polygon raised (403 reported separately), polygon
          returned nothing, or coverage of the constituents is below 95%.
        * ``warn`` — ``POLYGON_API_KEY`` is not set, so the call is skipped.
        * ``ok`` — coverage is at least 95%.
    """
    import time
    t0 = time.time()
    if ctx.fresh_constituents is None:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message="Skipped: constituents fetch failed",
            elapsed_seconds=time.time() - t0,
        )

    import os
    if not os.environ.get("POLYGON_API_KEY"):
        # Local-laptop preflight — polygon key lives in .env on the spot
        # and on EC2. Skip without failing so the rest of the report is
        # actionable; on the spot the key is present and this fires.
        return CheckResult(
            name="polygon_grouped_coverage",
            status="warn",
            message="POLYGON_API_KEY not set — skipped (will run on spot/EC2)",
            elapsed_seconds=time.time() - t0,
        )

    # Import in its own try block. When the import lived in the same ``try``
    # as the API call, an ImportError made Python evaluate
    # ``except PolygonForbiddenError`` with that name still unbound, which
    # raised UnboundLocalError and hid the real cause.
    try:
        from polygon_client import PolygonForbiddenError, polygon_client
    except ImportError as exc:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message=f"polygon_client import failed: {exc}",
            elapsed_seconds=time.time() - t0,
        )

    try:
        grouped = polygon_client().get_grouped_daily(ctx.prior_trading_day)
    except PolygonForbiddenError as exc:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message=f"Polygon 403 — same-day fetch on free tier? ({exc})",
            elapsed_seconds=time.time() - t0,
        )
    except Exception as exc:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message=f"Polygon raised: {exc}",
            elapsed_seconds=time.time() - t0,
        )

    if not grouped:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message=f"Polygon returned 0 tickers for {ctx.prior_trading_day}",
            elapsed_seconds=time.time() - t0,
        )

    polygon_symbols = set(grouped.keys())
    ctx.polygon_returned_tickers = polygon_symbols

    requested = ctx.fresh_constituents
    covered = polygon_symbols & requested
    # An empty constituent set counts as 0% coverage and therefore fails.
    coverage_ratio = len(covered) / len(requested) if requested else 0
    missing = sorted(requested - polygon_symbols)

    if coverage_ratio < 0.95:
        return CheckResult(
            name="polygon_grouped_coverage",
            status="fail",
            message=(
                f"Polygon coverage {coverage_ratio:.1%} below 95% — "
                f"{len(missing)} of {len(requested)} requested constituents missing"
            ),
            details={"missing_sample": missing[:20]},
            elapsed_seconds=time.time() - t0,
        )

    return CheckResult(
        name="polygon_grouped_coverage",
        status="ok",
        message=(
            f"Polygon returned {len(polygon_symbols)} tickers; covers "
            f"{len(covered)}/{len(requested)} constituents ({coverage_ratio:.1%})"
        ),
        details={
            "polygon_total": len(polygon_symbols),
            "constituents_covered": len(covered),
            "constituents_missing": len(missing),
            "missing_sample": missing[:10],
        },
        elapsed_seconds=time.time() - t0,
    )
check_predicted_missing_from_closes(ctx: PreflightContext) -> CheckResult: + """Catches PR #132/#134 class: predict the missing-from-closes count + daily_append would compute AFTER the pre-MorningEnrich prune drains + stragglers. Should be the chronic polygon gaps only (≤4 today). + """ + import time + t0 = time.time() + if ctx.fresh_constituents is None or ctx.arctic_universe_symbols is None: + return CheckResult( + name="predicted_missing_from_closes", + status="fail", + message="Skipped: prior checks failed to populate context", + elapsed_seconds=time.time() - t0, + ) + if ctx.polygon_returned_tickers is None: + return CheckResult( + name="predicted_missing_from_closes", + status="warn", + message="Skipped: polygon check skipped (no API key locally)", + elapsed_seconds=time.time() - t0, + ) + + # Simulate post-prune state: arctic ∩ constituents (stragglers gone). + post_prune_arctic = ctx.arctic_universe_symbols & ctx.fresh_constituents + + # Closes will contain whatever polygon returned + per-ticker fallback + # (PR #131). Per-ticker fallback recovers ~0 of the chronic 4 today + # (BF-B, BRK-B, MOG-A, PSTG); model worst-case = no recovery. + expected_closes = ctx.polygon_returned_tickers + missing = sorted(post_prune_arctic - expected_closes) + n_missing = len(missing) + + if n_missing > _MISSING_FROM_CLOSES_THRESHOLD: + return CheckResult( + name="predicted_missing_from_closes", + status="fail", + message=( + f"Predicted {n_missing} > threshold {_MISSING_FROM_CLOSES_THRESHOLD} " + f"missing-from-closes after prune. SF would halt MorningEnrich." 
+ ), + details={"missing": missing[:20], "threshold": _MISSING_FROM_CLOSES_THRESHOLD}, + elapsed_seconds=time.time() - t0, + ) + + return CheckResult( + name="predicted_missing_from_closes", + status="ok", + message=( + f"Predicted {n_missing} missing (under {_MISSING_FROM_CLOSES_THRESHOLD} threshold) " + f"— WARN-only path" + ), + details={"missing": missing, "threshold": _MISSING_FROM_CLOSES_THRESHOLD}, + elapsed_seconds=time.time() - t0, + ) + + +def check_backfill_source_freshness(ctx: PreflightContext) -> CheckResult: + """Catches PR #130 class: backfill regression preflight failure. + + Reads SPY's last_date from ArcticDB macro and the staging/daily_closes + parquet date. If staging exists for the prior trading day, backfill's + delta-merge will land at that date. Predict whether ArcticDB SPY + last_date <= effective backfill source last_date (no regression). + """ + import time + import io + import boto3 + import pandas as pd + + t0 = time.time() + s3 = boto3.client("s3") + + # ArcticDB SPY last_date — reuse macro_lib from check_arctic_connectivity. + if ctx.macro_lib is None: + return CheckResult( + name="backfill_source_freshness", + status="fail", + message="Skipped: arctic_connectivity did not populate macro_lib", + elapsed_seconds=time.time() - t0, + ) + try: + spy_df = ctx.macro_lib.tail("SPY", n=1).data + arctic_spy_last = pd.Timestamp(spy_df.index[-1]).normalize() if not spy_df.empty else None + except Exception as exc: + return CheckResult( + name="backfill_source_freshness", + status="fail", + message=f"ArcticDB SPY read raised: {exc}", + elapsed_seconds=time.time() - t0, + ) + + if arctic_spy_last is None: + return CheckResult( + name="backfill_source_freshness", + status="fail", + message="ArcticDB SPY is empty", + elapsed_seconds=time.time() - t0, + ) + + # Backfill source = price_cache + daily_closes delta. Effective last + # is max(price_cache_last, daily_closes_last). Read SPY parquet. 
def check_postflight_contracts(ctx: PreflightContext) -> CheckResult:
    """Verify the S3 contract files are present, parseable and well-shaped.

    Reads ``market_data/latest_weekly.json`` and, under the prefix it points
    at, ``constituents.json`` and ``macro.json``. Every problem is collected
    into a list instead of raising, so one broken file does not hide the
    others.

    Checks performed:
        * ``latest_weekly.json`` is a JSON object with a non-empty ``date``.
        * ``constituents.json`` has at least 800 ``tickers`` and a dict
          ``sector_map``.
        * ``macro.json`` has a non-null ``fed_funds_rate``.

    NOTE(review): an earlier version of this docstring also listed
    ``short_interest.json``, but no code here reads it — confirm whether
    that check should be added.

    Args:
        ctx: Shared preflight context; only ``ctx.bucket`` is read.

    Returns:
        ``warn`` with up to 10 issues in ``details["issues"]`` when anything
        is wrong, otherwise ``ok``. This check never returns ``fail``.
    """
    import time
    import boto3
    t0 = time.time()
    s3 = boto3.client("s3")
    issues: list[str] = []

    def _read(key: str) -> "dict | None":
        """Return the JSON object stored at *key*, or None after recording an issue."""
        try:
            obj = s3.get_object(Bucket=ctx.bucket, Key=key)
            payload = json.loads(obj["Body"].read())
        except Exception as exc:
            issues.append(f"{key}: {exc}")
            return None
        if not isinstance(payload, dict):
            # A list/string/number would otherwise crash on ``.get`` below.
            issues.append(f"{key}: expected a JSON object, got {type(payload).__name__}")
            return None
        return payload

    pointer = _read("market_data/latest_weekly.json")
    # ``is not None`` rather than truthiness: an empty ``{}`` file is a
    # contract violation and must be reported, not silently skipped.
    if pointer is not None:
        ptr_date = pointer.get("date")
        if not ptr_date:
            issues.append("latest_weekly.json missing 'date'")
        else:
            # Each weekly artifact is checked at the pointer's date prefix.
            # ``or`` also covers an explicit null ``s3_prefix``.
            prefix = (pointer.get("s3_prefix") or f"market_data/weekly/{ptr_date}/").rstrip("/")
            cons = _read(f"{prefix}/constituents.json")
            if cons is not None:
                tickers = cons.get("tickers")
                n_tickers = len(tickers) if isinstance(tickers, (list, dict)) else 0
                if n_tickers < 800:
                    issues.append(f"constituents.json tickers {n_tickers} < 800")
                if not isinstance(cons.get("sector_map"), dict):
                    issues.append("constituents.json missing sector_map dict")
            macro = _read(f"{prefix}/macro.json")
            if macro is not None and macro.get("fed_funds_rate") is None:
                issues.append("macro.json missing fed_funds_rate")

    if issues:
        return CheckResult(
            name="postflight_contracts",
            status="warn",
            message=(
                f"{len(issues)} contract issues; postflight may still pass if Phase 1 "
                f"rewrites these mid-run, but flagging for visibility"
            ),
            details={"issues": issues[:10]},
            elapsed_seconds=time.time() - t0,
        )

    return CheckResult(
        name="postflight_contracts",
        status="ok",
        message="All postflight contract files present + parseable",
        elapsed_seconds=time.time() - t0,
    )
def _previous_trading_day_str() -> str:
    """Return the most recent trading day before today (UTC) as YYYY-MM-DD.

    Walks backwards one calendar day at a time, starting from yesterday, and
    returns the first date ``is_trading_day`` accepts. At most 10 dates are
    probed.

    The calendar helper is imported directly rather than via
    weekly_collector; per the original author's note that module pulls in
    boto3 and every collector, which conflicts with ArcticDB's bundled AWS
    SDK on macOS (see the comment above ``CHECKS``).

    Raises:
        RuntimeError: if none of the 10 probed dates is a trading day.
    """
    from datetime import timedelta
    from alpha_engine_lib.trading_calendar import is_trading_day

    one_day = timedelta(days=1)
    probe = datetime.now(timezone.utc).date() - one_day
    remaining = 10
    while remaining > 0:
        if is_trading_day(probe):
            return probe.strftime("%Y-%m-%d")
        probe -= one_day
        remaining -= 1
    raise RuntimeError("Could not find a trading day within the last 10 days")
+ """ + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + prior = _previous_trading_day_str() + ctx = PreflightContext(bucket=bucket, today=today, prior_trading_day=prior) + + results: list[CheckResult] = [] + for check_fn in CHECKS: + try: + results.append(check_fn(ctx)) + except Exception as exc: + results.append(CheckResult( + name=check_fn.__name__.replace("check_", ""), + status="fail", + message=f"Check raised: {type(exc).__name__}: {exc}", + )) + n_fail = sum(1 for r in results if r.status == "fail") + return n_fail, results + + +# ── CLI ─────────────────────────────────────────────────────────────────────── + + +def _format_human(results: list[CheckResult]) -> str: + lines = ["", "=" * 70, " Saturday SF Preflight ", "=" * 70, ""] + icons = {"ok": "[OK] ", "warn": "[WARN]", "fail": "[FAIL]"} + for r in results: + lines.append(f"{icons.get(r.status, '[?] ')} {r.name:<32} {r.message}") + if r.status == "fail" and r.details: + for k, v in r.details.items(): + lines.append(f" {k}: {v}") + n_fail = sum(1 for r in results if r.status == "fail") + n_warn = sum(1 for r in results if r.status == "warn") + lines.append("") + lines.append("-" * 70) + if n_fail == 0 and n_warn == 0: + lines.append(" Predicted SF outcome: PASS") + elif n_fail == 0: + lines.append(f" Predicted SF outcome: PASS with {n_warn} warning(s)") + else: + lines.append(f" Predicted SF outcome: FAIL ({n_fail} failure(s), {n_warn} warning(s))") + lines.append("=" * 70) + return "\n".join(lines) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--bucket", default=DEFAULT_BUCKET) + parser.add_argument("--json", action="store_true", help="Emit structured JSON instead of human summary") + parser.add_argument("--verbose", "-v", action="store_true") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.WARNING, + format="%(asctime)s %(levelname)s [%(name)s] %(message)s", + ) + + n_fail, results 
def test_constituents_fetch_ok_populates_context():
    """Happy path: a well-formed fetch returns ``ok`` and fills the context."""
    ctx = _ctx()
    # Realistic shape: de-duplicated tickers, a sector_map covering every
    # ticker, and index counts inside the accepted bands.
    real_tickers = [f"T{i}" for i in range(900)]
    real_sectors = {t: "Industrials" for t in real_tickers}
    fake_return = (real_tickers, real_sectors, {}, 500, 400)

    with patch("collectors.constituents._fetch_constituents", return_value=fake_return):
        result = sfp.check_constituents_fetch(ctx)
    assert result.status == "ok"
    assert "900 tickers" in result.message
    assert ctx.fresh_constituents == set(real_tickers)
# Continuation of a test whose ``def`` line precedes this section; the
# statements below are kept exactly as they were.
sectors = {t: "Industrials" for t in tickers[:850]}
with patch("collectors.constituents._fetch_constituents",
           return_value=(tickers, sectors, {}, 500, 400)):
    result = sfp.check_constituents_fetch(ctx)
assert result.status == "fail"
assert "sector_map missing" in result.message


def test_constituents_fetch_fails_on_sp500_count_drift():
    """A fetch result carrying an S&P 500 count of 0 (e.g. the Wikipedia
    table was not parsed) must be reported as a failure."""
    ctx = _ctx()
    symbols = [f"T{i}" for i in range(400)]
    sector_map = dict.fromkeys(symbols, "Industrials")
    fetched = (symbols, sector_map, {}, 0, 400)
    with patch("collectors.constituents._fetch_constituents", return_value=fetched):
        outcome = sfp.check_constituents_fetch(ctx)
    assert outcome.status == "fail"
    assert "S&P 500 count" in outcome.message


def test_constituents_fetch_fails_on_wikipedia_exception():
    """An exception raised by the constituents fetch becomes a "fail" result
    whose message carries the exception text."""
    ctx = _ctx()
    fetch_error = ConnectionError("Wikipedia 503")
    with patch(
        "collectors.constituents._fetch_constituents",
        side_effect=fetch_error,
    ):
        outcome = sfp.check_constituents_fetch(ctx)
    assert outcome.status == "fail"
    assert "Wikipedia 503" in outcome.message


# ── check_universe_drift (PR #134 class) ──────────────────────────────────────


def _stub_universe_lib_for_drift(stragglers_with_dates: dict[str, str]):
    """Build a mock library whose ``tail`` yields a one-row frame per symbol.

    Symbols present in ``stragglers_with_dates`` get their mapped date as the
    row's index; every other symbol gets 2026-05-01 (treated as fresh).
    """

    def _tail(sym, n=1):
        last_date = stragglers_with_dates.get(sym, "2026-05-01")
        frame = pd.DataFrame({"Close": [100.0]}, index=[pd.Timestamp(last_date)])
        return MagicMock(data=frame)

    stub = MagicMock()
    stub.tail.side_effect = _tail
    return stub


def test_universe_drift_predicts_prune_outcome():
    """The 2026-05-02 case: eight arctic-only symbols with stale last dates
    are all counted as candidates and as would-be prunes."""
    ctx = _ctx()
    straggler_last_dates = {
        "ASGN": "2026-04-24",
        "GTM": "2026-04-24",
        "HOLX": "2026-04-07",
        "KMPR": "2026-04-24",
        "LW": "2026-04-24",
        "MOH": "2026-04-24",
        "MTCH": "2026-04-24",
        "PAYC": "2026-04-24",
    }
    ctx.fresh_constituents = {"AAPL", "MSFT"}
    ctx.arctic_universe_symbols = {"AAPL", "MSFT", *straggler_last_dates}
    ctx.universe_lib = _stub_universe_lib_for_drift(straggler_last_dates)

    outcome = sfp.check_universe_drift(ctx)

    assert outcome.status == "ok"
    for count_key in ("candidates_count", "would_prune_count"):
        assert outcome.details[count_key] == 8


def test_universe_drift_no_stragglers_passes_quietly():
    """Identical constituent and arctic symbol sets give an "ok" result."""
    members = ("AAPL", "MSFT")
    ctx = _ctx()
    ctx.fresh_constituents = set(members)
    ctx.arctic_universe_symbols = set(members)

    outcome = sfp.check_universe_drift(ctx)
    assert outcome.status == "ok"
    assert "No straggler candidates" in outcome.message


def test_universe_drift_skipped_if_context_unpopulated():
    """With fresh_constituents and arctic_universe_symbols left unset on the
    context, the check must report "fail" rather than pass on partial data."""
    unpopulated_ctx = _ctx()
    outcome = sfp.check_universe_drift(unpopulated_ctx)
    assert outcome.status == "fail"


# ── check_polygon_grouped_coverage (PR #131 class) ────────────────────────────


def test_polygon_grouped_coverage_ok_at_full_coverage(monkeypatch):
    """Every constituent appears in the grouped-daily response: "ok", and the
    returned ticker set is recorded on the context."""
    monkeypatch.setenv("POLYGON_API_KEY", "stub")
    ctx = _ctx()
    ctx.fresh_constituents = {"AAPL", "MSFT"}
    grouped = {sym: {} for sym in ("AAPL", "MSFT", "GOOG")}
    client = MagicMock(**{"get_grouped_daily.return_value": grouped})
    with patch("polygon_client.polygon_client", return_value=client):
        outcome = sfp.check_polygon_grouped_coverage(ctx)
    assert outcome.status == "ok"
    assert ctx.polygon_returned_tickers == {"AAPL", "MSFT", "GOOG"}


def test_polygon_grouped_coverage_fails_below_95pct(monkeypatch):
    """The PR #131 scenario: the response covers too few constituents."""
    monkeypatch.setenv("POLYGON_API_KEY", "stub")
    ctx = _ctx()
    universe = [f"T{i}" for i in range(100)]
    ctx.fresh_constituents = set(universe)
    # Only the first 50 of 100 symbols come back: 50% coverage, below the
    # 95% threshold.
    grouped = {sym: {} for sym in universe[:50]}
    client = MagicMock(**{"get_grouped_daily.return_value": grouped})
    with patch("polygon_client.polygon_client", return_value=client):
        outcome = sfp.check_polygon_grouped_coverage(ctx)
    assert outcome.status == "fail"
    assert "coverage" in outcome.message.lower()


def test_polygon_grouped_coverage_fails_on_403(monkeypatch):
    """A PolygonForbiddenError from the client is reported as a 403 failure."""
    monkeypatch.setenv("POLYGON_API_KEY", "stub")
    from polygon_client import PolygonForbiddenError
    ctx = _ctx()
    ctx.fresh_constituents = {"AAPL"}
    forbidden = PolygonForbiddenError("free tier same-day")
    client = MagicMock(**{"get_grouped_daily.side_effect": forbidden})
    with patch("polygon_client.polygon_client", return_value=client):
        outcome = sfp.check_polygon_grouped_coverage(ctx)
    assert outcome.status == "fail"
    assert "403" in outcome.message


def test_polygon_grouped_coverage_skips_when_no_api_key(monkeypatch):
    """Without POLYGON_API_KEY in the environment (local-laptop runs) the
    check must warn, not fail, so the rest of the report stays actionable."""
    monkeypatch.delenv("POLYGON_API_KEY", raising=False)
    ctx = _ctx()
    ctx.fresh_constituents = {"AAPL"}
    outcome = sfp.check_polygon_grouped_coverage(ctx)
    assert outcome.status == "warn"
    assert "POLYGON_API_KEY" in outcome.message


# ── check_predicted_missing_from_closes (PR #132 class) ───────────────────────


def test_predicted_missing_under_threshold_passes():
    """Post-prune state: just the four chronic polygon-coverage gaps are
    missing, which stays under the threshold of 5."""
    ctx = _ctx()
    covered = {"AAPL", "MSFT"}
    chronic_gaps = {"BF-B", "BRK-B", "MOG-A", "PSTG"}
    ctx.fresh_constituents = covered | chronic_gaps
    # Post-prune coherent: arctic holds the very same set as the constituents.
    ctx.arctic_universe_symbols = ctx.fresh_constituents
    ctx.polygon_returned_tickers = covered  # polygon misses the 4 chronic
    outcome = sfp.check_predicted_missing_from_closes(ctx)
    assert outcome.status == "ok"


def test_predicted_missing_above_threshold_fails():
    """Pre-prune state (or missed stragglers): 18 symbols missing against a
    threshold of 5 must be reported as a halt."""
    ctx = _ctx()
    ctx.fresh_constituents = {f"T{i}" for i in range(20)}
    ctx.arctic_universe_symbols = ctx.fresh_constituents
    ctx.polygon_returned_tickers = {"T0", "T1"}
    outcome = sfp.check_predicted_missing_from_closes(ctx)
    assert outcome.status == "fail"
    assert "would halt" in outcome.message.lower()


def test_predicted_missing_excludes_stragglers_correctly():
    """PR #134 meets PR #132: symbols held in arctic but absent from the
    fresh constituents must not be counted as expected, so they cannot
    inflate the missing count."""
    ctx = _ctx()
    ctx.fresh_constituents = {"AAPL", "MSFT"}
    # Pre-prune state: arctic still carries two stragglers.
    ctx.arctic_universe_symbols = {"AAPL", "MSFT", "STRAGGLER1", "STRAGGLER2"}
    ctx.polygon_returned_tickers = {"AAPL", "MSFT"}
    outcome = sfp.check_predicted_missing_from_closes(ctx)
    # arctic ∩ constituents = {AAPL, MSFT}, and closes cover both.
    assert outcome.status == "ok"


# ── check_backfill_source_freshness (PR #130 class) ───────────────────────────


def _bytes_for_parquet(last_date_str: str, has_spy: bool = True) -> bytes:
    """Serialise a one-row ``Close`` frame to parquet bytes via pyarrow.

    The row is indexed by ``last_date_str`` unless ``has_spy`` is true, in
    which case the index is replaced by the single label "SPY"
    (daily_closes uses the ticker as index).
    """
    import io

    date_index = pd.DatetimeIndex([pd.Timestamp(last_date_str)])
    frame = pd.DataFrame({"Close": [100.0]}, index=date_index)
    if has_spy:
        frame.index = pd.Index(["SPY"])
    sink = io.BytesIO()
    frame.to_parquet(sink, engine="pyarrow")
    return sink.getvalue()


def _stub_macro_lib(spy_last_date: str):
    """Build a mock library whose ``tail`` always returns a one-row ``Close``
    frame indexed by ``spy_last_date``."""
    frame = pd.DataFrame({"Close": [100.0]}, index=[pd.Timestamp(spy_last_date)])
    return MagicMock(**{"tail.return_value": MagicMock(data=frame)})


def test_backfill_source_freshness_passes_when_delta_covers_arctic():
    """Happy path: ArcticDB SPY sits at 2026-04-30 and a daily_closes delta is
    available, so the backfill source is not behind arctic."""
    ctx = _ctx()
    ctx.macro_lib = _stub_macro_lib("2026-04-30")

    import io

    cache_frame = pd.DataFrame({"Close": [100.0]},
                               index=[pd.Timestamp("2026-04-30")])
    cache_sink = io.BytesIO()
    cache_frame.to_parquet(cache_sink, engine="pyarrow")
    cache_bytes = cache_sink.getvalue()

    delta_frame = pd.DataFrame({"Close": [100.0]}, index=pd.Index(["SPY"]))
    delta_sink = io.BytesIO()
    delta_frame.to_parquet(delta_sink, engine="pyarrow")
    delta_bytes = delta_sink.getvalue()

    def _get_object(**kw):
        # Keys containing "price_cache" get the cache; anything else the delta.
        payload = cache_bytes if "price_cache" in kw["Key"] else delta_bytes
        return {"Body": MagicMock(**{"read.return_value": payload})}

    s3_stub = MagicMock(**{"get_object.side_effect": _get_object})

    with patch("boto3.client", return_value=s3_stub):
        outcome = sfp.check_backfill_source_freshness(ctx)
    assert outcome.status == "ok"


def test_backfill_source_freshness_fails_when_source_regresses():
    """The PR #130 scenario: ArcticDB already holds 5/1, the cache stops at
    4/30 and no daily_closes delta can be read, so a backfill would regress
    the data."""
    ctx = _ctx()
    ctx.macro_lib = _stub_macro_lib("2026-05-01")  # arctic ahead of the cache

    import io

    cache_frame = pd.DataFrame({"Close": [100.0]},
                               index=[pd.Timestamp("2026-04-30")])
    cache_sink = io.BytesIO()
    cache_frame.to_parquet(cache_sink, engine="pyarrow")
    cache_bytes = cache_sink.getvalue()

    def _get_object(**kw):
        # Only the price cache exists; every other key lookup raises.
        if "price_cache" not in kw["Key"]:
            raise Exception("NoSuchKey")
        return {"Body": MagicMock(**{"read.return_value": cache_bytes})}

    s3_stub = MagicMock(**{"get_object.side_effect": _get_object})

    with patch("boto3.client", return_value=s3_stub):
        outcome = sfp.check_backfill_source_freshness(ctx)
    assert outcome.status == "fail"
    assert "regression" in outcome.message.lower()


# ── Orchestrator ──────────────────────────────────────────────────────────────


def test_run_preflight_isolates_check_failures():
    """One check raising must not abort the suite: the raising check is
    recorded as a failure and the check after it still yields a result."""
    def raising_check(ctx):
        raise RuntimeError("boom")

    raising_check.__name__ = "check_test_raise"

    checks = [raising_check, sfp.check_arctic_connectivity]
    with patch.object(sfp, "CHECKS", checks):
        with patch("arcticdb.Arctic", side_effect=Exception("arctic stub")):
            _, results = sfp.run_preflight(bucket="test-bucket")

    # Both checks produced a result; the first was wrapped into a failure.
    assert len(results) == 2
    first = results[0]
    assert first.status == "fail"
    assert "boom" in first.message


def test_run_preflight_returns_failure_count():
    """run_preflight returns the number of failing checks together with one
    result per configured check."""
    def fail_check(ctx):
        return sfp.CheckResult(name="x", status="fail", message="nope")
    fail_check.__name__ = "check_x"

    def ok_check(ctx):
        return sfp.CheckResult(name="y", status="ok", message="fine")
    ok_check.__name__ = "check_y"

    checks = [fail_check, ok_check, fail_check]
    with patch.object(sfp, "CHECKS", checks):
        n_fail, results = sfp.run_preflight(bucket="test-bucket")
    assert n_fail == 2
    assert len(results) == 3