diff --git a/collectors/daily_closes.py b/collectors/daily_closes.py index 88b5327..40290fa 100644 --- a/collectors/daily_closes.py +++ b/collectors/daily_closes.py @@ -5,8 +5,10 @@ The predictor inference Lambda uses these to bridge the gap between the weekly slim cache and today's prices, avoiding a full 2-year yfinance fetch. -Data source priority: polygon.io grouped-daily (1 API call for all US stocks), -then yfinance batch download for any tickers polygon missed. +Data source priority: + 1. polygon.io grouped-daily (1 API call, covers all US stocks + ETFs) + 2. FRED (covers the 4 index tickers not on polygon free tier: VIX, VIX3M, TNX, IRX) + 3. yfinance batch download (fallback for whatever remains) Schema: index=ticker (str), columns=[date, Open, High, Low, Close, Adj_Close, Volume, VWAP] """ @@ -15,17 +17,33 @@ import io import logging +import os import time from datetime import datetime, timezone import boto3 import pandas as pd +import requests logger = logging.getLogger(__name__) _YFINANCE_BATCH_SIZE = 100 _YFINANCE_BATCH_DELAY = 2 # seconds between batches +_FRED_BASE = "https://api.stlouisfed.org/fred/series/observations" +_FRED_TIMEOUT = 15 + +# Map our ArcticDB ticker key (after stripping ^) to FRED series id. +# Both yfinance (^VIX, ^TNX, ...) and FRED (VIXCLS, DGS10, ...) publish +# these in the same scale (raw index level for VIX/VIX3M, percent for +# TNX/IRX), so no conversion is needed before appending to ArcticDB. +_FRED_INDEX_MAP = { + "VIX": "VIXCLS", + "VIX3M": "VXVCLS", + "TNX": "DGS10", + "IRX": "DTB3", +} + def collect( bucket: str, @@ -97,7 +115,20 @@ def collect( except Exception as e: logger.warning("Polygon grouped-daily failed: %s — falling back to yfinance", e) - # ── Step 2: yfinance fallback for missing tickers ──────────────────────── + # ── Step 2: FRED fallback for index tickers ────────────────────────────── + # VIX/VIX3M/TNX/IRX are not on polygon free tier. 
def _fetch_fred_closes(
    tickers: list[str],
    date_str: str,
    records: list[dict],
) -> int:
    """Fetch the latest close for index tickers from FRED.

    Serves the index symbols listed in ``_FRED_INDEX_MAP`` (VIX, VIX3M, TNX,
    IRX). For each one, the five most recent observations are requested and
    the newest non-missing value (FRED reports missing values as ``"."``) is
    appended to ``records`` as a flat bar: Open/High/Low/Close/Adj_Close/VWAP
    all equal the close and Volume is 0. The record is stamped with
    ``date_str`` regardless of the observation's own date.

    Args:
        tickers: Requested symbols, with or without a leading ``^``. Symbols
            without a FRED mapping are ignored.
        date_str: Date written into the ``date`` field of every record.
        records: Output list, mutated in place.

    Returns:
        Number of records appended. 0 when ``FRED_API_KEY`` is not set.

    Failures for an individual series are logged and skipped (best-effort),
    so the caller can fall through to its next data source.
    """
    api_key = os.environ.get("FRED_API_KEY", "")
    if not api_key:
        logger.warning("FRED_API_KEY not set — skipping FRED fallback for %d tickers", len(tickers))
        return 0

    # Tickers that already have a record. Guards against duplicate index keys
    # when the same symbol is requested twice (e.g. "^VIX" and "VIX") or was
    # captured by an earlier source.
    captured = {r.get("ticker") for r in records}

    count = 0
    for ticker in tickers:
        store_ticker = ticker.lstrip("^")
        series_id = _FRED_INDEX_MAP.get(store_ticker)
        if not series_id or store_ticker in captured:
            continue
        try:
            params = {
                "series_id": series_id,
                "api_key": api_key,
                "file_type": "json",
                "sort_order": "desc",
                "limit": 5,
            }
            resp = requests.get(_FRED_BASE, params=params, timeout=_FRED_TIMEOUT)
            resp.raise_for_status()
            obs = resp.json().get("observations", [])
            # Newest first (sort_order=desc); "." marks a missing observation.
            latest = next((o for o in obs if o.get("value", ".") != "."), None)
            if latest is None:
                logger.warning("FRED %s → %s: no non-missing observation", store_ticker, series_id)
                continue
            close = round(float(latest["value"]), 4)
            logger.debug(
                "FRED %s → %s: using observation dated %s",
                store_ticker, series_id, latest.get("date"),
            )
            # FRED publishes a single daily level, so emit a flat bar.
            records.append({
                "ticker": store_ticker,
                "date": date_str,
                "Open": close,
                "High": close,
                "Low": close,
                "Close": close,
                "Adj_Close": close,
                "Volume": 0,
                "VWAP": close,
            })
            captured.add(store_ticker)
            count += 1
        except Exception as e:
            # requests embeds the full request URL — including the api_key
            # query parameter — in HTTP/connection error messages. Redact it
            # so the secret never reaches the logs. (api_key is non-empty
            # here, so str.replace is safe.)
            logger.warning(
                "FRED fetch failed for %s (%s): %s",
                store_ticker, series_id, str(e).replace(api_key, "***"),
            )

    logger.info("FRED fallback: %d/%d index tickers captured", count, len(tickers))
    return count
Appending them here lets builders/daily_append.py update the + # ArcticDB macro library every weekday — pre-ArcticDB, the predictor Lambda + # fetched these from yfinance on each run; post-migration, the write path + # moved here. ETFs come from polygon; indices (^-prefix) fall through to + # FRED then yfinance in daily_closes.collect. + MACRO_DAILY_TICKERS = [ + "SPY", "GLD", "USO", + "XLB", "XLC", "XLE", "XLF", "XLI", "XLK", + "XLP", "XLRE", "XLU", "XLV", "XLY", + "^VIX", "^VIX3M", "^TNX", "^IRX", + ] + tickers = list(dict.fromkeys(tickers + MACRO_DAILY_TICKERS)) + logger.info("=" * 60) logger.info("COLLECTING: daily closes") logger.info("=" * 60)