diff --git a/collectors/universe_returns.py b/collectors/universe_returns.py index b52f0f2..683f690 100644 --- a/collectors/universe_returns.py +++ b/collectors/universe_returns.py @@ -75,24 +75,39 @@ def collect( db_path: str, signals_prefix: str = "signals", sector_map_key: str = "data/sector_map.json", + max_lookback_trading_days: int = 90, dry_run: bool = False, ) -> dict: """ - Populate universe_returns table with forward returns for all ~900 S&P stocks. + Populate universe_returns table with forward returns for every trading day. - Reads signal dates from S3, identifies dates missing from universe_returns, - fetches grouped-daily prices via polygon.io, and inserts rows. + Enumerates NYSE trading days directly from the trading calendar (not from + signal folders in S3). For each trading day whose 5d forward window has + closed, fetches polygon.io grouped-daily prices at t0 + t+5d, computes + return_5d per ticker and writes rows keyed by the trading day. + + This decouples universe_returns from research's signal cadence. The table + is now "5d forward returns, one row per ticker per trading day," which + is the natural grain for evaluation downstream — the backtester's + _scanner_lift / _team_lift / _cio_lift joins on eval_date and the + scanner/team/cio eval rows will always find a matching trading-day row + here regardless of whether research happened to run that week. Args: - bucket: S3 bucket name + bucket: S3 bucket name (research.db location) db_path: path to local research.db - signals_prefix: S3 prefix for signals (e.g. "signals") + signals_prefix: deprecated, kept for API compatibility with the + previous signal-folder-driven enumeration. Unused. sector_map_key: S3 key for sector map JSON + max_lookback_trading_days: how far back to walk. Default 90 trading + days (~18 calendar weeks) which is enough for rolling evaluator + windows and well past any single weekly run's catch-up needs. 
dry_run: if True, compute but don't write to DB Returns: dict with status, dates_processed, rows_inserted, errors """ + del signals_prefix # deprecated; kept in the signature for call-site compat from polygon_client import polygon_client try: @@ -101,77 +116,32 @@ def collect( logger.warning("Polygon client init failed: %s", e) return {"status": "error", "error": str(e)} - # List signal dates from S3 s3 = boto3.client("s3") - eval_dates = _list_signal_dates(s3, bucket, signals_prefix) - if not eval_dates: - logger.warning("No signal dates found in s3://%s/%s/", bucket, signals_prefix) - return {"status": "ok", "dates_processed": 0, "rows_inserted": 0, "skipped": 0} - - # Load sector map sector_map = _load_sector_map(s3, bucket, sector_map_key) - # Ensure table exists _ensure_table(db_path) existing = _get_existing_dates(db_path) - # Drop non-trading-day signal folders. Research runs before the - # next_trading_day stamping fix (alpha-engine-research 9a94e34, 2026-04-13) - # wrote signals/{Sat,Sun}/... which have no market data — attempting to - # process them produces rows with NULL return_5d that then get stuck in - # the `existing` set and prevent real reprocessing. Keeping the filter - # here is also a defense against future misstamping. - eval_dates = [d for d in eval_dates if _is_trading_day(d)] - - dates_to_process = [d for d in eval_dates if d not in existing] - if not dates_to_process: - logger.info("All %d eval_dates already in universe_returns", len(eval_dates)) - return { - "status": "ok" if not dry_run else "ok_dry_run", - "dates_processed": 0, - "rows_inserted": 0, - "skipped": len(eval_dates), - "deferred": 0, - } - - # Pre-filter dates whose 5d forward window has not yet closed. These are - # legitimately not computable yet (we need 5 trading days of forward - # prices to measure returns) and will be picked up in a future run — they - # are NOT errors. 
Classifying them as errors produced a spurious `partial` - # status that got silently swallowed by the old soft-fail path. today = date.today() - deferred = [ - d for d in dates_to_process - if _add_business_days(date.fromisoformat(d), 5) >= today - ] - deferred_set = set(deferred) - dates_to_process = [d for d in dates_to_process if d not in deferred_set] - - if deferred: - logger.info( - "Deferring %d eval_dates — 5d forward window not yet complete " - "(will be picked up in a future run): %s", - len(deferred), - deferred if len(deferred) <= 5 else f"{deferred[:3]}...+{len(deferred)-3} more", - ) + dates_to_process = _trading_days_to_process( + today, max_lookback_trading_days, existing + ) if not dates_to_process: logger.info( - "No eval_dates ready to process (%d already in DB, %d deferred)", - len(existing), len(deferred), + "All trading days in the last %d lookback already have return_5d populated", + max_lookback_trading_days, ) return { "status": "ok" if not dry_run else "ok_dry_run", "dates_processed": 0, "rows_inserted": 0, "skipped": len(existing), - "deferred": len(deferred), } logger.info( - "Processing %d eval_dates for universe_returns " - "(%d already exist, %d deferred)", - len(dates_to_process), len(existing), len(deferred), + "Processing %d trading days (lookback=%d, %d already populated)", + len(dates_to_process), max_lookback_trading_days, len(existing), ) total_inserted = 0 @@ -205,8 +175,9 @@ def collect( # Any real error (exception or "no rows computed" after pre-filter) is a # hard failure under the no-silent-fails rule. The old `partial` path was - # being dropped by the Step Function. Deferred dates (5d forward window - # not yet complete) are NOT errors — they were pre-filtered out above. + # being dropped by the Step Function. The trading-day enumerator only + # yields dates whose 5d forward window has closed, so there is no + # "deferred" concept — every enqueued date is expected to succeed. 
def _trading_days_to_process(
    today: date,
    max_lookback: int,
    existing: set[str],
) -> list[str]:
    """Enumerate NYSE trading days whose 5d forward window has closed.

    Walks backwards one calendar day at a time, starting at ``today``, until
    ``max_lookback`` trading days have been seen (``today`` itself counts
    when it is a trading day). A trading day is included in the result when:
      - ``_add_business_days(day, 5)`` is strictly before ``today`` (so
        return_5d is computable), AND
      - its ISO date is not already in ``existing``.

    Args:
        today: the date the walk starts from (inclusive).
        max_lookback: maximum number of trading days to visit. Values <= 0
            yield an empty list.
        existing: ISO dates (YYYY-MM-DD) to skip because the caller already
            has them.

    Returns:
        ISO date strings sorted chronologically (oldest first).
    """
    # Imported lazily, as in the original; the trading-calendar module is
    # what decides which days count as trading days.
    from trading_calendar import is_trading_day as nyse_is_trading_day

    out: list[str] = []
    d = today
    trading_days_seen = 0
    # Sanity fence: cap the walk at max_lookback * 3 + 30 calendar days.
    # With five trading days per seven calendar days a healthy calendar needs
    # only ~1.4 calendar days per trading day, so this budget is never hit
    # unless is_trading_day is broken or has no data for the range. It exists
    # purely to guarantee the loop terminates.
    calendar_budget = max_lookback * 3 + 30
    while trading_days_seen < max_lookback and calendar_budget > 0:
        if nyse_is_trading_day(d):
            trading_days_seen += 1
            iso = d.isoformat()
            # NOTE(review): _add_business_days is defined elsewhere in this
            # module. If it counts weekdays only, a market holiday inside
            # the forward window makes fwd_5d earlier than the true fifth
            # trading day -- confirm it agrees with the trading calendar.
            fwd_5d = _add_business_days(d, 5)
            if fwd_5d < today and iso not in existing:
                out.append(iso)
        d -= timedelta(days=1)
        calendar_budget -= 1

    if trading_days_seen < max_lookback:
        # The loop can only end here by exhausting the calendar budget.
        # Surface it: otherwise the caller sees a short/empty list and
        # reports "nothing to do", hiding a calendar problem.
        logger.warning(
            "Trading-day walk exhausted its calendar budget after seeing "
            "%d of %d trading days (walked back from %s); "
            "trading calendar may be broken or out of range",
            trading_days_seen, max_lookback, today.isoformat(),
        )

    # Dates were collected newest-first; return them oldest-first.
    out.sort()
    return out