From b6d83caa6ae6f74d5de9a96f3bf2e4abb9830a5b Mon Sep 17 00:00:00 2001
From: Brian McMahon
Date: Thu, 9 Apr 2026 17:27:56 -0700
Subject: [PATCH] Add signal_returns collector + extend universe_returns with 30d
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New collector (collectors/signal_returns.py) seeds and backfills
score_performance and predictor_outcomes tables by JOINing against
universe_returns — zero yfinance calls. Runs after universe_returns
in Phase 1.

universe_returns extended with return_30d, spy_return_30d, beat_spy_30d
columns (one extra polygon grouped-daily call per date).

This enables the backtester's evaluator to run as a pure analysis module
with no external API dependencies.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 collectors/signal_returns.py   | 398 +++++++++++++++++++++++++++++++++
 collectors/universe_returns.py |  32 ++-
 weekly_collector.py            |  24 +-
 3 files changed, 446 insertions(+), 8 deletions(-)
 create mode 100644 collectors/signal_returns.py

diff --git a/collectors/signal_returns.py b/collectors/signal_returns.py
new file mode 100644
index 0000000..b02e2b4
--- /dev/null
+++ b/collectors/signal_returns.py
@@ -0,0 +1,398 @@
+"""
+collectors/signal_returns.py — Seed and backfill signal performance tables.
+
+Seeds score_performance (BUY signal entry prices) and predictor_outcomes
+(predictor prediction records) from S3 artifacts. Backfills forward returns
+by JOINing against universe_returns (already populated by the universe_returns
+collector) — no yfinance or external API calls needed.
+
+Must run AFTER universe_returns in the Phase 1 pipeline.
+
+Target tables in research.db:
+  - score_performance: entry prices + 5d/10d/30d forward returns for BUY signals
+  - predictor_outcomes: prediction records + actual 5d alpha + correctness
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import sqlite3
+from contextlib import closing
+from datetime import date
+
+import boto3
+import pandas as pd
+from botocore.exceptions import ClientError
+
+logger = logging.getLogger(__name__)
+
+
+def collect(
+    bucket: str,
+    db_path: str,
+    signals_prefix: str = "signals",
+    dry_run: bool = False,
+) -> dict:
+    """Seed and backfill signal performance tables in research.db.
+
+    Steps:
+      1. Seed score_performance from S3 signals (entry prices from universe_returns)
+      2. Backfill score_performance returns from universe_returns JOIN
+      3. Seed predictor_outcomes from S3 predictions
+      4. Backfill predictor_outcomes returns from universe_returns JOIN
+
+    Returns dict with status, counts for each step. When the final upload of
+    research.db fails, status is "partial" and "upload_error" holds the reason.
+    """
+    s3 = boto3.client("s3")
+    results = {}
+
+    # Step 1: Seed score_performance
+    results["seed_score_performance"] = _seed_score_performance(
+        s3, bucket, db_path, signals_prefix, dry_run,
+    )
+
+    # Step 2: Backfill score_performance returns via universe_returns JOIN
+    results["backfill_score_returns"] = _backfill_score_returns(db_path, dry_run)
+
+    # Step 3: Seed predictor_outcomes
+    results["seed_predictor_outcomes"] = _seed_predictor_outcomes(
+        s3, bucket, db_path, dry_run,
+    )
+
+    # Step 4: Backfill predictor_outcomes via universe_returns JOIN
+    results["backfill_predictor_returns"] = _backfill_predictor_returns(db_path, dry_run)
+
+    # Upload updated research.db back to S3
+    total_written = sum(r.get("rows_written", 0) for r in results.values())
+    upload_error = None
+    if not dry_run and total_written > 0:
+        try:
+            s3.upload_file(db_path, bucket, "research.db")
+            logger.info("Uploaded research.db to s3://%s/research.db", bucket)
+        except Exception as e:
+            # Local writes were not persisted; report it instead of returning "ok".
+            logger.warning("Failed to upload research.db: %s", e)
+            upload_error = str(e)
+
+    has_errors = upload_error is not None or any(
+        r.get("status") == "error" for r in results.values()
+    )
+    summary = {
+        "status": "partial" if has_errors else ("ok_dry_run" if dry_run else "ok"),
+        "total_written": total_written,
+        **results,
+    }
+    if upload_error is not None:
+        summary["upload_error"] = upload_error
+    return summary
+
+
+# ── Step 1: Seed score_performance ────────────────────────────────────────────
+
+
+def _seed_score_performance(
+    s3, bucket: str, db_path: str, signals_prefix: str, dry_run: bool,
+) -> dict:
+    """Insert BUY-rated signals into score_performance with entry prices from universe_returns."""
+    try:
+        # closing() releases the connection even if a step raises, so a failed
+        # run never leaves an open handle on the research.db that gets uploaded.
+        with closing(sqlite3.connect(db_path)) as conn:
+            _ensure_score_performance_schema(conn)
+
+            # (symbol, score_date) pairs already stored or queued in this run
+            seen = {
+                (r[0], r[1]) for r in
+                conn.execute("SELECT symbol, score_date FROM score_performance").fetchall()
+            }
+
+            # List signal dates from S3
+            signal_dates = _list_signal_dates(s3, bucket, signals_prefix)
+
+            rows_to_insert = []
+            for sig_date in signal_dates:
+                try:
+                    obj = s3.get_object(Bucket=bucket, Key=f"{signals_prefix}/{sig_date}/signals.json")
+                    signals = json.loads(obj["Body"].read())
+                except (ClientError, json.JSONDecodeError):
+                    continue
+                if not isinstance(signals, dict):
+                    continue
+
+                candidates = [
+                    (s.get("ticker"), s.get("score", 0), s.get("rating", ""))
+                    for s in signals.get("universe", [])
+                ]
+                # v1 format fallback
+                sigs = signals.get("signals", {})
+                if isinstance(sigs, dict):
+                    candidates += [
+                        (ticker, s.get("score", 0), s.get("rating", ""))
+                        for ticker, s in sigs.items()
+                    ]
+
+                for ticker, score, rating in candidates:
+                    key = (ticker, sig_date)
+                    if not ticker or rating != "BUY" or score is None or key in seen:
+                        continue
+                    # Mark as seen so a ticker listed in both formats is queued once
+                    seen.add(key)
+                    rows_to_insert.append((ticker, sig_date, score))
+
+            if not rows_to_insert:
+                return {"status": "ok", "rows_written": 0, "note": "all rows already seeded"}
+
+            # Get entry prices from universe_returns (already in the DB)
+            inserted = 0
+            for ticker, sig_date, score in rows_to_insert:
+                # Look up entry price from universe_returns
+                row = conn.execute(
+                    "SELECT close_price FROM universe_returns WHERE ticker = ? AND eval_date = ?",
+                    (ticker, sig_date),
+                ).fetchone()
+                price = row[0] if row else None
+                if price is None:
+                    continue
+
+                if not dry_run:
+                    conn.execute(
+                        "INSERT OR IGNORE INTO score_performance (symbol, score_date, score, price_on_date) VALUES (?, ?, ?, ?)",
+                        (ticker, sig_date, round(float(score), 2), round(price, 2)),
+                    )
+                inserted += 1
+
+            if not dry_run:
+                conn.commit()
+
+        if inserted:
+            logger.info("Seeded %d score_performance rows from %d signal dates", inserted, len(signal_dates))
+        return {"status": "ok", "rows_written": inserted}
+
+    except Exception as e:
+        logger.error("seed_score_performance failed: %s", e)
+        return {"status": "error", "error": str(e), "rows_written": 0}
+
+
+# ── Step 2: Backfill score_performance returns ────────────────────────────────
+
+
+def _backfill_score_returns(db_path: str, dry_run: bool) -> dict:
+    """Backfill 5d/10d/30d returns in score_performance by JOINing universe_returns."""
+    try:
+        with closing(sqlite3.connect(db_path)) as conn:
+            _ensure_score_performance_schema(conn)
+
+            updated = 0
+            for horizon in ("5d", "10d", "30d"):
+                # Find score_performance rows missing this horizon's return
+                pending = pd.read_sql_query(
+                    f"SELECT symbol, score_date, price_on_date FROM score_performance WHERE return_{horizon} IS NULL",
+                    conn,
+                )
+                if pending.empty:
+                    continue
+
+                for _, row in pending.iterrows():
+                    ticker = row["symbol"]
+                    score_date = row["score_date"]
+                    entry_price = row["price_on_date"]
+                    # pandas surfaces SQL NULL as NaN in numeric columns, so test with isna()
+                    if pd.isna(entry_price):
+                        continue
+
+                    # Look up forward return from universe_returns
+                    ur = conn.execute(
+                        f"SELECT return_{horizon}, spy_return_{horizon}, beat_spy_{horizon} FROM universe_returns WHERE ticker = ? AND eval_date = ?",
+                        (ticker, score_date),
+                    ).fetchone()
+
+                    if ur is None or ur[0] is None:
+                        continue
+
+                    stock_return = ur[0]  # already as decimal (e.g., 0.05)
+                    spy_return = ur[1]
+                    beat_spy = ur[2]
+                    exit_price = round(entry_price * (1 + stock_return), 2)
+
+                    if not dry_run:
+                        conn.execute(
+                            f"UPDATE score_performance SET price_{horizon}=?, return_{horizon}=?, spy_{horizon}_return=?, beat_spy_{horizon}=? WHERE symbol=? AND score_date=? AND return_{horizon} IS NULL",
+                            (
+                                exit_price,
+                                round(stock_return * 100, 2),  # stored as percentage
+                                round(spy_return * 100, 2) if spy_return is not None else None,
+                                beat_spy,
+                                ticker, score_date,
+                            ),
+                        )
+                    updated += 1
+
+            # Repair: fix beat_spy columns where return exists but beat_spy is NULL
+            for horizon in ("5d", "10d", "30d"):
+                repaired = conn.execute(
+                    f"UPDATE score_performance SET beat_spy_{horizon} = CASE WHEN return_{horizon} > spy_{horizon}_return THEN 1 ELSE 0 END WHERE return_{horizon} IS NOT NULL AND spy_{horizon}_return IS NOT NULL AND beat_spy_{horizon} IS NULL",
+                ).rowcount
+                if repaired:
+                    logger.info("Repaired %d beat_spy_%s values", repaired, horizon)
+
+            if not dry_run:
+                conn.commit()
+
+        if updated:
+            logger.info("Backfilled %d score_performance returns via universe_returns JOIN", updated)
+        return {"status": "ok", "rows_written": updated}
+
+    except Exception as e:
+        logger.error("backfill_score_returns failed: %s", e)
+        return {"status": "error", "error": str(e), "rows_written": 0}
+
+
+# ── Step 3: Seed predictor_outcomes ───────────────────────────────────────────
+
+
+def _seed_predictor_outcomes(s3, bucket: str, db_path: str, dry_run: bool) -> dict:
+    """Seed predictor_outcomes from S3 predictions/*.json files."""
+    try:
+        # Paginate: a single list_objects_v2 call returns at most 1000 keys
+        keys = []
+        paginator = s3.get_paginator("list_objects_v2")
+        for page in paginator.paginate(Bucket=bucket, Prefix="predictor/predictions/", Delimiter="/"):
+            keys.extend(
+                obj["Key"] for obj in page.get("Contents", [])
+                if obj["Key"].endswith(".json") and "latest" not in obj["Key"]
+            )
+
+        if not keys:
+            return {"status": "ok", "rows_written": 0, "note": "no prediction files in S3"}
+
+        with closing(sqlite3.connect(db_path)) as conn:
+            existing = {
+                (r[0], r[1]) for r in
+                conn.execute("SELECT symbol, prediction_date FROM predictor_outcomes").fetchall()
+            }
+
+            inserted = 0
+            for key in keys:
+                try:
+                    obj = s3.get_object(Bucket=bucket, Key=key)
+                    data = json.loads(obj["Body"].read())
+                    pred_date = data.get("date") or key.split("/")[-1].replace(".json", "")
+                    for p in data.get("predictions", []):
+                        ticker = p.get("ticker")
+                        if not ticker or (ticker, pred_date) in existing:
+                            continue
+                        if not dry_run:
+                            conn.execute(
+                                "INSERT INTO predictor_outcomes (symbol, prediction_date, predicted_direction, prediction_confidence, p_up, p_flat, p_down, score_modifier_applied) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+                                (ticker, pred_date, p.get("predicted_direction"), p.get("prediction_confidence"), p.get("p_up"), p.get("p_flat"), p.get("p_down"), 0.0),
+                            )
+                        existing.add((ticker, pred_date))
+                        inserted += 1
+                except (ClientError, json.JSONDecodeError, KeyError) as e:
+                    logger.info("Skipping prediction file %s: %s", key, e)
+
+            if not dry_run:
+                conn.commit()
+
+        if inserted:
+            logger.info("Seeded %d predictor_outcomes rows from %d S3 files", inserted, len(keys))
+        return {"status": "ok", "rows_written": inserted}
+
+    except Exception as e:
+        logger.error("seed_predictor_outcomes failed: %s", e)
+        return {"status": "error", "error": str(e), "rows_written": 0}
+
+
+# ── Step 4: Backfill predictor_outcomes ───────────────────────────────────────
+
+
+def _backfill_predictor_returns(db_path: str, dry_run: bool) -> dict:
+    """Backfill actual_5d_return and correct_5d using universe_returns JOIN."""
+    try:
+        with closing(sqlite3.connect(db_path)) as conn:
+            pending = pd.read_sql_query(
+                "SELECT id, symbol, prediction_date, predicted_direction FROM predictor_outcomes WHERE actual_5d_return IS NULL",
+                conn,
+            )
+            if pending.empty:
+                return {"status": "ok", "rows_written": 0}
+
+            resolved = 0
+            for _, row in pending.iterrows():
+                ticker = row["symbol"]
+                pred_date = row["prediction_date"]
+
+                # Look up 5d return from universe_returns
+                ur = conn.execute(
+                    "SELECT return_5d, spy_return_5d FROM universe_returns WHERE ticker = ? AND eval_date = ?",
+                    (ticker, pred_date),
+                ).fetchone()
+
+                if ur is None or ur[0] is None:
+                    continue
+
+                stock_return = ur[0]  # decimal
+                spy_return = ur[1] if ur[1] is not None else 0
+                actual_alpha = stock_return - spy_return
+
+                direction = row["predicted_direction"]
+                if direction == "UP":
+                    correct = 1 if actual_alpha > 0 else 0
+                elif direction == "DOWN":
+                    correct = 1 if actual_alpha < 0 else 0
+                elif direction == "FLAT":
+                    correct = 1 if abs(actual_alpha) < 0.01 else 0
+                else:
+                    continue
+
+                if not dry_run:
+                    conn.execute(
+                        "UPDATE predictor_outcomes SET actual_5d_return=?, correct_5d=? WHERE symbol=? AND prediction_date=?",
+                        (round(actual_alpha * 100, 4), correct, ticker, pred_date),
+                    )
+                resolved += 1
+
+            if not dry_run:
+                conn.commit()
+
+        if resolved:
+            logger.info("Backfilled %d predictor_outcomes via universe_returns JOIN", resolved)
+        return {"status": "ok", "rows_written": resolved}
+
+    except Exception as e:
+        logger.error("backfill_predictor_returns failed: %s", e)
+        return {"status": "error", "error": str(e), "rows_written": 0}
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+
+def _ensure_score_performance_schema(conn) -> None:
+    """Add 5d/10d/30d return columns to score_performance if they don't exist yet."""
+    cols = {r[1] for r in conn.execute("PRAGMA table_info(score_performance)").fetchall()}
+    for col, col_type in [
+        ("price_5d", "REAL"), ("return_5d", "REAL"), ("spy_5d_return", "REAL"),
+        ("beat_spy_5d", "INTEGER"), ("eval_date_5d", "TEXT"),
+        ("price_10d", "REAL"), ("return_10d", "REAL"), ("spy_10d_return", "REAL"),
+        ("beat_spy_10d", "INTEGER"), ("eval_date_10d", "TEXT"),
+        ("price_30d", "REAL"), ("return_30d", "REAL"), ("spy_30d_return", "REAL"),
+        ("beat_spy_30d", "INTEGER"), ("eval_date_30d", "TEXT"),
+    ]:
+        if col not in cols:
+            conn.execute(f"ALTER TABLE score_performance ADD COLUMN {col} {col_type}")
+    conn.commit()
+
+
+def _list_signal_dates(s3, bucket: str, prefix: str) -> list[str]:
+    """List all signal dates from S3."""
+    dates = []
+    paginator = s3.get_paginator("list_objects_v2")
+    for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/", Delimiter="/"):
+        for cp in page.get("CommonPrefixes", []):
+            part = cp["Prefix"].rstrip("/").rsplit("/", 1)[-1]
+            if len(part) == 10 and part[4] == "-" and part[7] == "-":
+                dates.append(part)
+    dates.sort()
+    return dates
diff --git a/collectors/universe_returns.py b/collectors/universe_returns.py
index d1fb5e1..85a8936 100644
--- a/collectors/universe_returns.py
+++ b/collectors/universe_returns.py
@@ -55,10 +55,13 @@
     close_price REAL,
     return_5d REAL,
     return_10d REAL,
+    return_30d REAL,
     spy_return_5d REAL,
     spy_return_10d REAL,
+    spy_return_30d REAL,
     beat_spy_5d INTEGER,
     beat_spy_10d INTEGER,
+    beat_spy_30d INTEGER,
     sector_etf TEXT,
     sector_etf_return_5d REAL,
     beat_sector_5d INTEGER,
@@ -199,10 +202,15 @@ def _load_sector_map(s3, bucket: str, key: str) -> dict[str, str] | None:
 # -- DB helpers ---------------------------------------------------------------
 
 def _ensure_table(db_path: str) -> None:
-    """Create universe_returns table if it doesn't exist."""
+    """Create universe_returns table if it doesn't exist, and add new columns."""
     conn = sqlite3.connect(db_path)
     try:
         conn.execute(_CREATE_TABLE_SQL)
+        # Add 30d columns if they don't exist (migration for existing DBs)
+        cols = {r[1] for r in conn.execute("PRAGMA table_info(universe_returns)").fetchall()}
+        for col, col_type in [("return_30d", "REAL"), ("spy_return_30d", "REAL"), ("beat_spy_30d", "INTEGER")]:
+            if col not in cols:
+                conn.execute(f"ALTER TABLE universe_returns ADD COLUMN {col} {col_type}")
         conn.commit()
     finally:
         conn.close()
@@ -227,15 +235,15 @@ def _insert_rows(db_path: str, rows: list[dict]) -> int:
             try:
                 conn.execute(
                     "INSERT OR IGNORE INTO universe_returns "
-                    "(ticker, eval_date, sector, close_price, return_5d, return_10d, "
-                    "spy_return_5d, spy_return_10d, beat_spy_5d, beat_spy_10d, "
+                    "(ticker, eval_date, sector, close_price, return_5d, return_10d, return_30d, "
+                    "spy_return_5d, spy_return_10d, spy_return_30d, beat_spy_5d, beat_spy_10d, beat_spy_30d, "
                     "sector_etf, sector_etf_return_5d, beat_sector_5d) "
-                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                     (
                         row["ticker"], row["eval_date"], row["sector"],
-                        row["close_price"], row["return_5d"], row["return_10d"],
-                        row["spy_return_5d"], row["spy_return_10d"],
-                        row["beat_spy_5d"], row["beat_spy_10d"],
+                        row["close_price"], row["return_5d"], row["return_10d"], row["return_30d"],
+                        row["spy_return_5d"], row["spy_return_10d"], row["spy_return_30d"],
+                        row["beat_spy_5d"], row["beat_spy_10d"], row["beat_spy_30d"],
                         row["sector_etf"], row["sector_etf_return_5d"],
                         row["beat_sector_5d"],
                     ),
@@ -260,6 +268,7 @@ def _build_rows_for_date(
     eval_dt = date.fromisoformat(eval_date)
     fwd_5d = _add_business_days(eval_dt, 5)
     fwd_10d = _add_business_days(eval_dt, 10)
+    fwd_30d = _add_business_days(eval_dt, 30)
 
     # Check that forward dates are in the past (returns can be computed)
     today = date.today()
@@ -268,11 +277,13 @@ def _build_rows_for_date(
         return []
 
     has_10d = fwd_10d < today
+    has_30d = fwd_30d < today
 
     # Fetch grouped-daily prices for eval_date and forward dates
     prices_t0 = polygon_client.get_grouped_daily(eval_date)
     prices_5d = polygon_client.get_grouped_daily(str(fwd_5d))
     prices_10d = polygon_client.get_grouped_daily(str(fwd_10d)) if has_10d else {}
+    prices_30d = polygon_client.get_grouped_daily(str(fwd_30d)) if has_30d else {}
 
     if not prices_t0:
         logger.warning("No prices for eval_date %s — may be a non-trading day", eval_date)
@@ -286,9 +297,11 @@ def _build_rows_for_date(
     spy_t0 = prices_t0.get("SPY", {}).get("close")
     spy_5d = prices_5d.get("SPY", {}).get("close")
     spy_10d = prices_10d.get("SPY", {}).get("close") if has_10d else None
+    spy_30d = prices_30d.get("SPY", {}).get("close") if has_30d else None
 
     spy_ret_5d = _pct_return(spy_t0, spy_5d)
     spy_ret_10d = _pct_return(spy_t0, spy_10d) if has_10d else None
+    spy_ret_30d = _pct_return(spy_t0, spy_30d) if has_30d else None
 
     # Sector ETF returns
     sector_etf_returns_5d: dict[str, float | None] = {}
@@ -309,9 +322,11 @@ def _build_rows_for_date(
 
         close_5d = prices_5d.get(ticker, {}).get("close")
         close_10d = prices_10d.get(ticker, {}).get("close") if has_10d else None
+        close_30d = prices_30d.get(ticker, {}).get("close") if has_30d else None
 
         ret_5d = _pct_return(close_t0, close_5d)
         ret_10d = _pct_return(close_t0, close_10d) if has_10d else None
+        ret_30d = _pct_return(close_t0, close_30d) if has_30d else None
 
         # Sector classification
         sector_etf = sector_map.get(ticker) if sector_map else None
@@ -325,10 +340,13 @@ def _build_rows_for_date(
             "close_price": round(close_t0, 2),
             "return_5d": round(ret_5d, 4) if ret_5d is not None else None,
             "return_10d": round(ret_10d, 4) if ret_10d is not None else None,
+            "return_30d": round(ret_30d, 4) if ret_30d is not None else None,
             "spy_return_5d": round(spy_ret_5d, 4) if spy_ret_5d is not None else None,
             "spy_return_10d": round(spy_ret_10d, 4) if spy_ret_10d is not None else None,
+            "spy_return_30d": round(spy_ret_30d, 4) if spy_ret_30d is not None else None,
             "beat_spy_5d": int(ret_5d > spy_ret_5d) if ret_5d is not None and spy_ret_5d is not None else None,
             "beat_spy_10d": int(ret_10d > spy_ret_10d) if ret_10d is not None and spy_ret_10d is not None else None,
+            "beat_spy_30d": int(ret_30d > spy_ret_30d) if ret_30d is not None and spy_ret_30d is not None else None,
             "sector_etf": sector_etf,
             "sector_etf_return_5d": round(etf_ret_5d, 4) if etf_ret_5d is not None else None,
             "beat_sector_5d": int(ret_5d > etf_ret_5d) if ret_5d is not None and etf_ret_5d is not None else None,
diff --git a/weekly_collector.py b/weekly_collector.py
index 2b12aa1..9841839 100644
--- a/weekly_collector.py
+++ b/weekly_collector.py
@@ -34,7 +34,7 @@
 from ssm_secrets import load_secrets
 load_secrets()
 
-from collectors import constituents, prices, slim_cache, macro, universe_returns, alternative, daily_closes, fundamentals
+from collectors import constituents, prices, slim_cache, macro, universe_returns, signal_returns, alternative, daily_closes, fundamentals
 
 logger = logging.getLogger(__name__)
 
@@ -225,6 +225,28 @@ def _run_phase1(config: dict, args: argparse.Namespace) -> dict:
             logger.error("Universe returns collection failed: %s", e)
             results["collectors"]["universe_returns"] = {"status": "error", "error": str(e)}
 
+    # ── 5b. Signal returns (score_performance + predictor_outcomes) ────────────
+    if only in (None, "signal_returns"):
+        logger.info("=" * 60)
+        logger.info("COLLECTING: signal returns (score_performance + predictor_outcomes)")
+        logger.info("=" * 60)
+        # Reuse the same db_path from universe_returns (already pulled from S3)
+        sr_db_path = db_path
+        if sr_db_path:
+            try:
+                sr_result = signal_returns.collect(
+                    bucket=bucket,
+                    db_path=sr_db_path,
+                    signals_prefix=ur_cfg.get("signals_prefix", "signals"),
+                    dry_run=dry_run,
+                )
+                results["collectors"]["signal_returns"] = sr_result
+            except Exception as e:
+                logger.error("Signal returns collection failed: %s", e)
+                results["collectors"]["signal_returns"] = {"status": "error", "error": str(e)}
+        else:
+            results["collectors"]["signal_returns"] = {"status": "skipped", "reason": "no research.db"}
+
     # ── 6. Fundamentals ───────────────────────────────────────────────────────
     if only in (None, "fundamentals"):
         logger.info("=" * 60)