Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions collectors/universe_returns.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ def collect(
_ensure_table(db_path)
existing = _get_existing_dates(db_path)

# Drop non-trading-day signal folders. Research runs that predate the
# next_trading_day stamping fix (alpha-engine-research 9a94e34, 2026-04-13)
# wrote signals/{Sat,Sun}/... folders, which have no market data. Attempting
# to process them produces rows with NULL return_5d that then get stuck in
# the `existing` set and prevent real reprocessing. Keeping the filter
# here is also a defense against future misstamping.
eval_dates = [d for d in eval_dates if _is_trading_day(d)]

dates_to_process = [d for d in eval_dates if d not in existing]
if not dates_to_process:
logger.info("All %d eval_dates already in universe_returns", len(eval_dates))
Expand Down Expand Up @@ -261,24 +269,45 @@ def _ensure_table(db_path: str) -> None:


def _get_existing_dates(db_path: str) -> set[str]:
    """Return the eval_dates that already have a non-NULL return_5d.

    A date counts as "existing" as soon as at least one of its rows has
    return_5d set; it is not required that every row be populated. Dates
    whose rows all have NULL return_5d (e.g. the 5d forward window hadn't
    closed when the collector ran) are deliberately left out of the result
    so that they get reprocessed and the returns can be backfilled.

    Args:
        db_path: Path of the SQLite database containing ``universe_returns``.

    Returns:
        The distinct eval_date values having at least one row whose
        return_5d is not NULL.
    """
    conn = sqlite3.connect(db_path)
    try:
        # Single filtered query: rows with NULL return_5d must not mark a
        # date as done, otherwise it would never be picked up again.
        rows = conn.execute(
            "SELECT DISTINCT eval_date FROM universe_returns "
            "WHERE return_5d IS NOT NULL"
        ).fetchall()
        return {r[0] for r in rows}
    finally:
        # Always release the connection, even if the query raises.
        conn.close()


def _is_trading_day(date_str: str) -> bool:
    """Report whether an ISO ``YYYY-MM-DD`` date falls on Monday–Friday.

    Only the day of the week is checked; market holidays are not taken into
    account, so a holiday that lands on a weekday still yields True.
    """
    parsed = date.fromisoformat(date_str)
    # date.weekday() numbers Monday as 0 through Sunday as 6, so 5 and 6
    # are the weekend.
    return parsed.weekday() not in (5, 6)


def _insert_rows(db_path: str, rows: list[dict]) -> int:
"""Insert rows into universe_returns, skipping duplicates."""
"""Insert rows into universe_returns; reprocessed dates overwrite stale rows.

Uses INSERT OR REPLACE so a date that was previously inserted with NULL
forward-return columns (because the 5d window hadn't closed yet) gets
its returns filled in on reprocessing. The previous INSERT OR IGNORE
behaviour left those NULL rows stuck forever.
"""
conn = sqlite3.connect(db_path)
try:
inserted = 0
for row in rows:
try:
conn.execute(
"INSERT OR IGNORE INTO universe_returns "
"INSERT OR REPLACE INTO universe_returns "
"(ticker, eval_date, sector, close_price, return_5d, return_10d, return_30d, "
"spy_return_5d, spy_return_10d, spy_return_30d, beat_spy_5d, beat_spy_10d, beat_spy_30d, "
"sector_etf, sector_etf_return_5d, beat_sector_5d) "
Expand Down
Loading