Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 67 additions & 78 deletions collectors/universe_returns.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,39 @@ def collect(
db_path: str,
signals_prefix: str = "signals",
sector_map_key: str = "data/sector_map.json",
max_lookback_trading_days: int = 90,
dry_run: bool = False,
) -> dict:
"""
Populate universe_returns table with forward returns for all ~900 S&P stocks.
Populate universe_returns table with forward returns for every trading day.

Reads signal dates from S3, identifies dates missing from universe_returns,
fetches grouped-daily prices via polygon.io, and inserts rows.
Enumerates NYSE trading days directly from the trading calendar (not from
signal folders in S3). For each trading day whose 5d forward window has
closed, fetches polygon.io grouped-daily prices at t0 + t+5d, computes
return_5d per ticker and writes rows keyed by the trading day.

This decouples universe_returns from research's signal cadence. The table
is now "5d forward returns, one row per ticker per trading day," which
is the natural grain for evaluation downstream — the backtester's
_scanner_lift / _team_lift / _cio_lift joins on eval_date and the
scanner/team/cio eval rows will always find a matching trading-day row
here regardless of whether research happened to run that week.

Args:
bucket: S3 bucket name
bucket: S3 bucket name (research.db location)
db_path: path to local research.db
signals_prefix: S3 prefix for signals (e.g. "signals")
signals_prefix: deprecated, kept for API compatibility with the
previous signal-folder-driven enumeration. Unused.
sector_map_key: S3 key for sector map JSON
max_lookback_trading_days: how far back to walk. Default 90 trading
days (~18 calendar weeks) which is enough for rolling evaluator
windows and well past any single weekly run's catch-up needs.
dry_run: if True, compute but don't write to DB

Returns:
dict with status, dates_processed, rows_inserted, errors
"""
del signals_prefix # deprecated; kept in the signature for call-site compat
from polygon_client import polygon_client

try:
Expand All @@ -101,77 +116,32 @@ def collect(
logger.warning("Polygon client init failed: %s", e)
return {"status": "error", "error": str(e)}

# List signal dates from S3
s3 = boto3.client("s3")
eval_dates = _list_signal_dates(s3, bucket, signals_prefix)
if not eval_dates:
logger.warning("No signal dates found in s3://%s/%s/", bucket, signals_prefix)
return {"status": "ok", "dates_processed": 0, "rows_inserted": 0, "skipped": 0}

# Load sector map
sector_map = _load_sector_map(s3, bucket, sector_map_key)

# Ensure table exists
_ensure_table(db_path)
existing = _get_existing_dates(db_path)

# Drop non-trading-day signal folders. Research runs before the
# next_trading_day stamping fix (alpha-engine-research 9a94e34, 2026-04-13)
# wrote signals/{Sat,Sun}/... which have no market data — attempting to
# process them produces rows with NULL return_5d that then get stuck in
# the `existing` set and prevent real reprocessing. Keeping the filter
# here is also a defense against future misstamping.
eval_dates = [d for d in eval_dates if _is_trading_day(d)]

dates_to_process = [d for d in eval_dates if d not in existing]
if not dates_to_process:
logger.info("All %d eval_dates already in universe_returns", len(eval_dates))
return {
"status": "ok" if not dry_run else "ok_dry_run",
"dates_processed": 0,
"rows_inserted": 0,
"skipped": len(eval_dates),
"deferred": 0,
}

# Pre-filter dates whose 5d forward window has not yet closed. These are
# legitimately not computable yet (we need 5 trading days of forward
# prices to measure returns) and will be picked up in a future run — they
# are NOT errors. Classifying them as errors produced a spurious `partial`
# status that got silently swallowed by the old soft-fail path.
today = date.today()
deferred = [
d for d in dates_to_process
if _add_business_days(date.fromisoformat(d), 5) >= today
]
deferred_set = set(deferred)
dates_to_process = [d for d in dates_to_process if d not in deferred_set]

if deferred:
logger.info(
"Deferring %d eval_dates — 5d forward window not yet complete "
"(will be picked up in a future run): %s",
len(deferred),
deferred if len(deferred) <= 5 else f"{deferred[:3]}...+{len(deferred)-3} more",
)
dates_to_process = _trading_days_to_process(
today, max_lookback_trading_days, existing
)

if not dates_to_process:
logger.info(
"No eval_dates ready to process (%d already in DB, %d deferred)",
len(existing), len(deferred),
"All trading days in the last %d lookback already have return_5d populated",
max_lookback_trading_days,
)
return {
"status": "ok" if not dry_run else "ok_dry_run",
"dates_processed": 0,
"rows_inserted": 0,
"skipped": len(existing),
"deferred": len(deferred),
}

logger.info(
"Processing %d eval_dates for universe_returns "
"(%d already exist, %d deferred)",
len(dates_to_process), len(existing), len(deferred),
"Processing %d trading days (lookback=%d, %d already populated)",
len(dates_to_process), max_lookback_trading_days, len(existing),
)

total_inserted = 0
Expand Down Expand Up @@ -205,8 +175,9 @@ def collect(

# Any real error (exception or "no rows computed" after pre-filter) is a
# hard failure under the no-silent-fails rule. The old `partial` path was
# being dropped by the Step Function. Deferred dates (5d forward window
# not yet complete) are NOT errors — they were pre-filtered out above.
# being dropped by the Step Function. The trading-day enumerator only
# yields dates whose 5d forward window has closed, so there is no
# "deferred" concept — every enqueued date is expected to succeed.
if errors:
status = "error"
elif dry_run:
Expand All @@ -218,25 +189,48 @@ def collect(
"status": status,
"dates_processed": len(dates_to_process),
"rows_inserted": total_inserted,
"deferred": len(deferred),
"errors": errors[:20],
}


# -- Signal date listing -----------------------------------------------------
# -- Trading-day enumeration -------------------------------------------------

def _list_signal_dates(s3, bucket: str, prefix: str) -> list[str]:
"""List all signal dates from S3 (YYYY-MM-DD directories under prefix/)."""
dates = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/", Delimiter="/"):
for cp in page.get("CommonPrefixes", []):
# prefix/2026-03-28/ -> 2026-03-28
part = cp["Prefix"].rstrip("/").rsplit("/", 1)[-1]
if len(part) == 10 and part[4] == "-" and part[7] == "-":
dates.append(part)
dates.sort()
return dates
def _trading_days_to_process(
    today: date,
    max_lookback: int,
    existing: set[str],
) -> list[str]:
    """Enumerate NYSE trading days whose 5d forward window has closed.

    Walks backwards one calendar day at a time, starting at ``today``
    (inclusive), and counts every day the trading calendar reports as a
    trading day until ``max_lookback`` trading days have been seen. A trading
    day is included in the result when:
      - ``_add_business_days(d, 5)`` is strictly before ``today`` (the 5d
        forward window has closed, so return_5d is computable), AND
      - its ISO date is not already in ``existing``.

    Trading days whose window is still open count toward ``max_lookback``
    even though they are never returned.

    Args:
        today: reference date the backwards walk starts from.
        max_lookback: maximum number of trading days to examine. Values
            <= 0 yield an empty list.
        existing: ISO dates to skip (the caller passes the dates already
            present in the DB).

    Returns:
        ISO dates sorted chronologically.
    """
    from trading_calendar import is_trading_day as nyse_is_trading_day

    # Sanity fence: cap the walk at max_lookback * 3 + 30 calendar days so a
    # broken is_trading_day implementation (e.g. one that always returns
    # False) cannot loop forever.
    calendar_budget = max_lookback * 3 + 30

    out: list[str] = []
    d = today
    trading_days_seen = 0
    while trading_days_seen < max_lookback and calendar_budget > 0:
        if nyse_is_trading_day(d):
            trading_days_seen += 1
            iso = d.isoformat()
            # Cheap set lookup first; only compute the forward date for days
            # that are not already covered.
            # NOTE(review): _add_business_days is defined elsewhere in this
            # file. If it skips weekends only, a market holiday inside the
            # window makes this check pass one trading day early — confirm.
            if iso not in existing and _add_business_days(d, 5) < today:
                out.append(iso)
        d -= timedelta(days=1)
        calendar_budget -= 1

    if trading_days_seen < max_lookback:
        # The fence tripped before the requested lookback was covered.
        # Surface it rather than silently returning a truncated list.
        logger.warning(
            "Trading-day walk stopped after exhausting its calendar budget: "
            "saw %d of %d requested trading days",
            trading_days_seen, max_lookback,
        )

    out.sort()
    return out


# -- Sector map loading ------------------------------------------------------
Expand Down Expand Up @@ -288,11 +282,6 @@ def _get_existing_dates(db_path: str) -> set[str]:
conn.close()


def _is_trading_day(date_str: str) -> bool:
"""True if YYYY-MM-DD is Mon–Fri. Does not account for market holidays."""
return date.fromisoformat(date_str).weekday() < 5


def _insert_rows(db_path: str, rows: list[dict]) -> int:
"""Insert rows into universe_returns; reprocessed dates overwrite stale rows.

Expand Down
Loading