From 45fc7adbb1cc243eaf974c26b34dd36455d99913 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 15 Apr 2026 16:21:22 -0700 Subject: [PATCH] feat(universe_returns): key on trading days, decouple from signal folders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrites the collector enumeration from "list signal folder dates in S3 and process each" to "walk backwards through NYSE trading days and process any whose 5d forward window has closed." The table's grain is now "one row per ticker per trading day" — the natural grain for downstream evaluation. Why (discussion on 2026-04-15): The backtester's _scanner_lift / _team_lift / _cio_lift joins on (ticker, eval_date) between scanner_evaluations etc. and universe_returns. Before this change, universe_returns eval_dates came from signal folder names — sporadic and timestamped with whatever research happened to stamp (run date, next trading day, etc.). The join's behaviour then depended on whether research happened to run that week and on which date-stamping convention was in effect. After this change, universe_returns stores a row for every NYSE trading day in a 90-day rolling window, regardless of research cadence. The evaluator can match any trading-day eval_date on the scanner/team/cio side; schedule drift or research misfires no longer blow holes in the grade surface. Implementation: - Added _trading_days_to_process(today, max_lookback, existing) that walks backwards through trading days (via the repo-root trading_calendar.is_trading_day for holiday awareness) and yields dates whose +5 business days window has closed and which are not already populated. - collect() now uses the enumerator directly. The signals_prefix arg is kept in the signature for call-site compatibility (weekly_collector.py passes it) but is unused. - Added max_lookback_trading_days arg (default 90) — enough for all rolling evaluator windows with plenty of slack. - Removed _list_signal_dates (dead) and the local _is_trading_day helper (superseded by the holiday-aware shared module function). - Removed the "deferred" concept from the return dict; the enumerator guarantees every enqueued date has a closed window. Builds on PR #38's still-valid changes (INSERT OR REPLACE + NULL-aware _get_existing_dates) which let reprocessed trading days actually overwrite stale NULL-return rows. Test suite: 46/46 unit tests pass. Smoke-tested _trading_days_to_process with today=2026-04-15, max_lookback=15 → returns 9 trading days (3/25 through 4/7, correctly skipping Good Friday 2026-04-03 and recent dates whose 5d window hasn't closed). --- collectors/universe_returns.py | 145 +++++++++++++++------------------ 1 file changed, 67 insertions(+), 78 deletions(-) diff --git a/collectors/universe_returns.py b/collectors/universe_returns.py index b52f0f2..683f690 100644 --- a/collectors/universe_returns.py +++ b/collectors/universe_returns.py @@ -75,24 +75,39 @@ def collect( db_path: str, signals_prefix: str = "signals", sector_map_key: str = "data/sector_map.json", + max_lookback_trading_days: int = 90, dry_run: bool = False, ) -> dict: """ - Populate universe_returns table with forward returns for all ~900 S&P stocks. + Populate universe_returns table with forward returns for every trading day. - Reads signal dates from S3, identifies dates missing from universe_returns, - fetches grouped-daily prices via polygon.io, and inserts rows. + Enumerates NYSE trading days directly from the trading calendar (not from + signal folders in S3). For each trading day whose 5d forward window has + closed, fetches polygon.io grouped-daily prices at t0 + t+5d, computes + return_5d per ticker and writes rows keyed by the trading day. + + This decouples universe_returns from research's signal cadence. The table + is now "5d forward returns, one row per ticker per trading day," which + is the natural grain for evaluation downstream — the backtester's + _scanner_lift / _team_lift / _cio_lift joins on eval_date and the + scanner/team/cio eval rows will always find a matching trading-day row + here regardless of whether research happened to run that week. Args: - bucket: S3 bucket name + bucket: S3 bucket name (research.db location) db_path: path to local research.db - signals_prefix: S3 prefix for signals (e.g. "signals") + signals_prefix: deprecated, kept for API compatibility with the + previous signal-folder-driven enumeration. Unused. sector_map_key: S3 key for sector map JSON + max_lookback_trading_days: how far back to walk. Default 90 trading + days (~18 calendar weeks) which is enough for rolling evaluator + windows and well past any single weekly run's catch-up needs. dry_run: if True, compute but don't write to DB Returns: dict with status, dates_processed, rows_inserted, errors """ + del signals_prefix # deprecated; kept in the signature for call-site compat from polygon_client import polygon_client try: @@ -101,77 +116,32 @@ def collect( logger.warning("Polygon client init failed: %s", e) return {"status": "error", "error": str(e)} - # List signal dates from S3 s3 = boto3.client("s3") - eval_dates = _list_signal_dates(s3, bucket, signals_prefix) - if not eval_dates: - logger.warning("No signal dates found in s3://%s/%s/", bucket, signals_prefix) - return {"status": "ok", "dates_processed": 0, "rows_inserted": 0, "skipped": 0} - - # Load sector map sector_map = _load_sector_map(s3, bucket, sector_map_key) - # Ensure table exists _ensure_table(db_path) existing = _get_existing_dates(db_path) - # Drop non-trading-day signal folders. Research runs before the - # next_trading_day stamping fix (alpha-engine-research 9a94e34, 2026-04-13) - # wrote signals/{Sat,Sun}/... which have no market data — attempting to - # process them produces rows with NULL return_5d that then get stuck in - # the `existing` set and prevent real reprocessing. Keeping the filter - # here is also a defense against future misstamping. - eval_dates = [d for d in eval_dates if _is_trading_day(d)] - - dates_to_process = [d for d in eval_dates if d not in existing] - if not dates_to_process: - logger.info("All %d eval_dates already in universe_returns", len(eval_dates)) - return { - "status": "ok" if not dry_run else "ok_dry_run", - "dates_processed": 0, - "rows_inserted": 0, - "skipped": len(eval_dates), - "deferred": 0, - } - - # Pre-filter dates whose 5d forward window has not yet closed. These are - # legitimately not computable yet (we need 5 trading days of forward - # prices to measure returns) and will be picked up in a future run — they - # are NOT errors. Classifying them as errors produced a spurious `partial` - # status that got silently swallowed by the old soft-fail path. today = date.today() - deferred = [ - d for d in dates_to_process - if _add_business_days(date.fromisoformat(d), 5) >= today - ] - deferred_set = set(deferred) - dates_to_process = [d for d in dates_to_process if d not in deferred_set] - - if deferred: - logger.info( - "Deferring %d eval_dates — 5d forward window not yet complete " - "(will be picked up in a future run): %s", - len(deferred), - deferred if len(deferred) <= 5 else f"{deferred[:3]}...+{len(deferred)-3} more", - ) + dates_to_process = _trading_days_to_process( + today, max_lookback_trading_days, existing + ) if not dates_to_process: logger.info( - "No eval_dates ready to process (%d already in DB, %d deferred)", - len(existing), len(deferred), + "All trading days in the last %d lookback already have return_5d populated", + max_lookback_trading_days, ) return { "status": "ok" if not dry_run else "ok_dry_run", "dates_processed": 0, "rows_inserted": 0, "skipped": len(existing), - "deferred": len(deferred), } logger.info( - "Processing %d eval_dates for universe_returns " - "(%d already exist, %d deferred)", - len(dates_to_process), len(existing), len(deferred), + "Processing %d trading days (lookback=%d, %d already populated)", + len(dates_to_process), max_lookback_trading_days, len(existing), ) total_inserted = 0 @@ -205,8 +175,9 @@ def collect( # Any real error (exception or "no rows computed" after pre-filter) is a # hard failure under the no-silent-fails rule. The old `partial` path was - # being dropped by the Step Function. Deferred dates (5d forward window - # not yet complete) are NOT errors — they were pre-filtered out above. + # being dropped by the Step Function. The trading-day enumerator only + # yields dates whose 5d forward window has closed, so there is no + # "deferred" concept — every enqueued date is expected to succeed. if errors: status = "error" elif dry_run: @@ -218,25 +189,48 @@ def collect( "status": status, "dates_processed": len(dates_to_process), "rows_inserted": total_inserted, - "deferred": len(deferred), "errors": errors[:20], } -# -- Signal date listing ----------------------------------------------------- +# -- Trading-day enumeration ------------------------------------------------- -def _list_signal_dates(s3, bucket: str, prefix: str) -> list[str]: - """List all signal dates from S3 (YYYY-MM-DD directories under prefix/).""" - dates = [] - paginator = s3.get_paginator("list_objects_v2") - for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/", Delimiter="/"): - for cp in page.get("CommonPrefixes", []): - # prefix/2026-03-28/ -> 2026-03-28 - part = cp["Prefix"].rstrip("/").rsplit("/", 1)[-1] - if len(part) == 10 and part[4] == "-" and part[7] == "-": - dates.append(part) - dates.sort() - return dates +def _trading_days_to_process( + today: date, + max_lookback: int, + existing: set[str], +) -> list[str]: + """Enumerate NYSE trading days whose 5d forward window has closed. + + Walks backwards from `today` across up to `max_lookback` trading days + (skipping weekends and NYSE holidays). For each, includes the date in + the result when: + - the 5d forward window has closed (so return_5d is computable), AND + - the date is not already in `existing` (the set of dates that have + return_5d populated in the DB). + + Returns ISO dates sorted chronologically. The trading-calendar module + at the repo root handles holiday awareness (market closures through 2030). + """ + from trading_calendar import is_trading_day as nyse_is_trading_day + + out: list[str] = [] + d = today + trading_days_seen = 0 + # Sanity fence — cap walk at roughly max_lookback * 1.5 in calendar days + # to protect against a broken is_trading_day implementation looping forever. + calendar_budget = max_lookback * 3 + 30 + while trading_days_seen < max_lookback and calendar_budget > 0: + if nyse_is_trading_day(d): + trading_days_seen += 1 + iso = d.isoformat() + fwd_5d = _add_business_days(d, 5) + if fwd_5d < today and iso not in existing: + out.append(iso) + d -= timedelta(days=1) + calendar_budget -= 1 + out.sort() + return out # -- Sector map loading ------------------------------------------------------ @@ -288,11 +282,6 @@ def _get_existing_dates(db_path: str) -> set[str]: conn.close() -def _is_trading_day(date_str: str) -> bool: - """True if YYYY-MM-DD is Mon–Fri. Does not account for market holidays.""" - return date.fromisoformat(date_str).weekday() < 5 - - def _insert_rows(db_path: str, rows: list[dict]) -> int: """Insert rows into universe_returns; reprocessed dates overwrite stale rows.