Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 97 additions & 5 deletions collectors/daily_closes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
The predictor inference Lambda uses these to bridge the gap between the
weekly slim cache and today's prices, avoiding a full 2-year yfinance fetch.

Data source priority: polygon.io grouped-daily (1 API call for all US stocks),
then yfinance batch download for any tickers polygon missed.
Data source priority:
1. polygon.io grouped-daily (1 API call, covers all US stocks + ETFs)
2. FRED (covers the 4 index tickers not on polygon free tier: VIX, VIX3M, TNX, IRX)
3. yfinance batch download (fallback for whatever remains)

Schema: index=ticker (str), columns=[date, Open, High, Low, Close, Adj_Close, Volume, VWAP]
"""
Expand All @@ -15,17 +17,33 @@

import io
import logging
import os
import time
from datetime import datetime, timezone

import boto3
import pandas as pd
import requests

logger = logging.getLogger(__name__)

_YFINANCE_BATCH_SIZE = 100
_YFINANCE_BATCH_DELAY = 2 # seconds between batches

_FRED_BASE = "https://api.stlouisfed.org/fred/series/observations"
_FRED_TIMEOUT = 15

# Map our ArcticDB ticker key (after stripping ^) to FRED series id.
# Both yfinance (^VIX, ^TNX, ...) and FRED (VIXCLS, DGS10, ...) publish
# these in the same scale (raw index level for VIX/VIX3M, percent for
# TNX/IRX), so no conversion is needed before appending to ArcticDB.
_FRED_INDEX_MAP = {
"VIX": "VIXCLS",
"VIX3M": "VXVCLS",
"TNX": "DGS10",
"IRX": "DTB3",
}


def collect(
bucket: str,
Expand Down Expand Up @@ -97,7 +115,20 @@ def collect(
except Exception as e:
logger.warning("Polygon grouped-daily failed: %s — falling back to yfinance", e)

# ── Step 2: yfinance fallback for missing tickers ────────────────────────
# ── Step 2: FRED fallback for index tickers ──────────────────────────────
# VIX/VIX3M/TNX/IRX are not on polygon free tier. FRED has same-scale
# equivalents (VIXCLS/VXVCLS/DGS10/DTB3) that typically publish T-1 values
# by the time the daily pipeline runs at 6:05 AM PT.
captured_tickers = {r["ticker"] for r in records}
fred_missing = [
t for t in tickers
if t.lstrip("^") not in captured_tickers and t.lstrip("^") in _FRED_INDEX_MAP
]
fred_count = 0
if fred_missing:
fred_count = _fetch_fred_closes(fred_missing, run_date, records)

# ── Step 3: yfinance fallback for anything still missing ─────────────────
captured_tickers = {r["ticker"] for r in records}
missing = [t for t in tickers if t.lstrip("^") not in captured_tickers]
yfinance_count = 0
Expand All @@ -110,14 +141,17 @@ def collect(
return {"status": "error", "error": "no data fetched", "tickers_captured": 0}

closes_df = pd.DataFrame(records).set_index("ticker")
logger.info("Daily closes: %d tickers for %s (polygon=%d, yfinance=%d)",
len(closes_df), run_date, polygon_count, yfinance_count)
logger.info(
"Daily closes: %d tickers for %s (polygon=%d, fred=%d, yfinance=%d)",
len(closes_df), run_date, polygon_count, fred_count, yfinance_count,
)

if dry_run:
return {
"status": "ok_dry_run",
"tickers_captured": len(closes_df),
"polygon": polygon_count,
"fred": fred_count,
"yfinance": yfinance_count,
}

Expand All @@ -137,13 +171,71 @@ def collect(
"status": "ok",
"tickers_captured": len(closes_df),
"polygon": polygon_count,
"fred": fred_count,
"yfinance": yfinance_count,
}
except Exception as e:
logger.error("Failed to write daily closes: %s", e)
return {"status": "error", "error": str(e), "tickers_captured": len(closes_df)}


def _fetch_fred_closes(
tickers: list[str],
date_str: str,
records: list[dict],
) -> int:
"""Fetch the latest close for index tickers from FRED.

Serves the 4 index symbols not on polygon free tier (VIX, VIX3M, TNX, IRX).
Takes the most recent non-missing observation for each series — typically
T-1 when the daily pipeline runs at 6:05 AM PT.
"""
api_key = os.environ.get("FRED_API_KEY", "")
if not api_key:
logger.warning("FRED_API_KEY not set — skipping FRED fallback for %d tickers", len(tickers))
return 0

count = 0
for ticker in tickers:
store_ticker = ticker.lstrip("^")
series_id = _FRED_INDEX_MAP.get(store_ticker)
if not series_id:
continue
try:
params = {
"series_id": series_id,
"api_key": api_key,
"file_type": "json",
"sort_order": "desc",
"limit": 5,
}
resp = requests.get(_FRED_BASE, params=params, timeout=_FRED_TIMEOUT)
resp.raise_for_status()
obs = resp.json().get("observations", [])
latest = next((o for o in obs if o.get("value", ".") != "."), None)
if latest is None:
logger.warning("FRED %s → %s: no non-missing observation", store_ticker, series_id)
continue
close = float(latest["value"])
records.append({
"ticker": store_ticker,
"date": date_str,
"Open": round(close, 4),
"High": round(close, 4),
"Low": round(close, 4),
"Close": round(close, 4),
"Adj_Close": round(close, 4),
"Volume": 0,
"VWAP": round(close, 4),
})
count += 1
except Exception as e:
logger.warning("FRED fetch failed for %s (%s): %s", store_ticker, series_id, e)

logger.info("FRED fallback: %d/%d index tickers captured", count, len(tickers))
return count


def _fetch_yfinance_closes(
tickers: list[str],
date_str: str,
Expand Down
15 changes: 15 additions & 0 deletions weekly_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,21 @@ def _run_daily(config: dict, args: argparse.Namespace) -> dict:
results["status"] = "failed"
return results

# Macro symbols are not S&P constituents but are core daily predictor inputs
# (vix_level, vix_term_slope, yield_10y, yield_curve_slope, sector-relative
# features). Appending them here lets builders/daily_append.py update the
# ArcticDB macro library every weekday — pre-ArcticDB, the predictor Lambda
# fetched these from yfinance on each run; post-migration, the write path
# moved here. ETFs come from polygon; indices (^-prefix) fall through to
# FRED then yfinance in daily_closes.collect.
MACRO_DAILY_TICKERS = [
"SPY", "GLD", "USO",
"XLB", "XLC", "XLE", "XLF", "XLI", "XLK",
"XLP", "XLRE", "XLU", "XLV", "XLY",
"^VIX", "^VIX3M", "^TNX", "^IRX",
]
tickers = list(dict.fromkeys(tickers + MACRO_DAILY_TICKERS))

logger.info("=" * 60)
logger.info("COLLECTING: daily closes")
logger.info("=" * 60)
Expand Down
Loading