Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions collectors/daily_closes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ def collect(
# Check if already written for this date
key = f"{s3_prefix}{run_date}.parquet"
if not dry_run:
from botocore.exceptions import ClientError
try:
s3.head_object(Bucket=bucket, Key=key)
logger.info("Daily closes already exist for %s — skipping", run_date)
return {"status": "ok", "tickers_captured": 0, "skipped": True}
except Exception:
pass # Not found — proceed
except ClientError as exc:
err_code = exc.response.get("Error", {}).get("Code")
if err_code not in ("404", "NoSuchKey"):
# Auth failure, throttling, or network — not "file doesn't exist".
# Don't silently paper over it.
raise
# 404/NoSuchKey: expected case — file doesn't exist, proceed to write.

if not tickers:
return {"status": "error", "error": "no tickers provided"}
Expand Down Expand Up @@ -206,7 +212,7 @@ def _fetch_yfinance_closes(
})
count += 1
except Exception as e:
logger.debug("yfinance close extract failed for %s: %s", ticker, e)
logger.warning("yfinance close extract failed for %s: %s", ticker, e)
except Exception as e:
logger.warning("yfinance batch failed: %s", e)

Expand Down
17 changes: 10 additions & 7 deletions collectors/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,18 @@ def _compute_market_breadth(price_data: dict[str, pd.DataFrame]) -> dict:


def load_from_s3(bucket: str, s3_prefix: str = "market_data/") -> dict | None:
"""Load the latest macro.json from S3. Returns None if not found."""
"""Load the latest macro.json from S3. Returns None if the pointer is missing; raises on unexpected errors."""
from botocore.exceptions import ClientError
s3 = boto3.client("s3")
try:
resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}latest_weekly.json")
pointer = json.loads(resp["Body"].read())
date = pointer.get("date")
if not date:
except ClientError as exc:
if exc.response.get("Error", {}).get("Code") in ("404", "NoSuchKey"):
return None
resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}weekly/{date}/macro.json")
return json.loads(resp["Body"].read())
except Exception:
raise
pointer = json.loads(resp["Body"].read())
date = pointer.get("date")
if not date:
return None
resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}weekly/{date}/macro.json")
return json.loads(resp["Body"].read())
75 changes: 51 additions & 24 deletions features/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,30 +122,45 @@ def _load_delta_from_daily_closes(

ticker_rows: dict[str, list[dict]] = {}

n_missing_dates = 0
for d in delta_dates:
key = f"predictor/daily_closes/{d}.parquet"
try:
obj = s3.get_object(Bucket=bucket, Key=key)
buf = io.BytesIO(obj["Body"].read())
day_df = pd.read_parquet(buf, engine="pyarrow")
# daily_closes: index=ticker (str), columns=[date, Open, High, Low, Close, Adj_Close, Volume]
for ticker, row in day_df.iterrows():
if ticker not in ticker_rows:
ticker_rows[ticker] = []
ticker_rows[ticker].append({
"date": pd.Timestamp(d),
"Open": float(row.get("Open", np.nan)),
"High": float(row.get("High", np.nan)),
"Low": float(row.get("Low", np.nan)),
"Close": float(row.get("Close", np.nan)),
"Volume": int(row.get("Volume", 0)),
})
except Exception:
log.debug("daily_closes/%s.parquet not found in S3 (non-trading day?)", d)
except s3.exceptions.NoSuchKey:
# Market holiday within the business-day range (e.g., Good Friday).
log.warning("daily_closes/%s.parquet missing (market holiday?)", d)
n_missing_dates += 1
continue
except Exception as exc:
raise RuntimeError(
f"Unexpected S3 error reading daily_closes/{d}.parquet: {exc}"
) from exc
buf = io.BytesIO(obj["Body"].read())
day_df = pd.read_parquet(buf, engine="pyarrow")
for ticker, row in day_df.iterrows():
if ticker not in ticker_rows:
ticker_rows[ticker] = []
ticker_rows[ticker].append({
"date": pd.Timestamp(d),
"Open": float(row.get("Open", np.nan)),
"High": float(row.get("High", np.nan)),
"Low": float(row.get("Low", np.nan)),
"Close": float(row.get("Close", np.nan)),
"Volume": int(row.get("Volume", 0)),
})

n_tickers = len(ticker_rows)
n_rows = sum(len(v) for v in ticker_rows.values())
log.info("Delta loaded: %d rows across %d tickers", n_rows, n_tickers)
log.info(
"Delta loaded: %d rows across %d tickers (%d/%d dates missing)",
n_rows, n_tickers, n_missing_dates, len(delta_dates),
)
if delta_dates and n_missing_dates == len(delta_dates):
raise RuntimeError(
f"Every date in delta range was missing ({len(delta_dates)} dates) — "
"daily_closes writer is likely broken upstream"
)
return ticker_rows


Expand Down Expand Up @@ -353,7 +368,7 @@ def _load_cached_alternative(s3, bucket: str) -> dict[str, dict]:
return alt_data

except Exception as exc:
log.debug("No cached alternative data available: %s", exc)
log.warning("No cached alternative data loaded — alternative features will use defaults: %s", exc)
return {}


Expand Down Expand Up @@ -460,18 +475,26 @@ def compute_and_write(
n_ok += 1

except Exception as exc:
log.debug("Feature computation failed for %s: %s", ticker, exc)
log.warning("Feature computation failed for %s: %s", ticker, exc)
n_err += 1

t_compute = time.time() - t0 - t_load
log.info(
"Feature computation complete in %.1fs: %d OK, %d skipped, %d errors",
t_compute, n_ok, n_skip, n_err,
"Feature computation complete in %.1fs: %d OK, %d skipped, %d errors "
"(of %d universe tickers)",
t_compute, n_ok, n_skip, n_err, len(universe_tickers),
)

if not store_rows:
log.error("No features computed — nothing to write")
return {"status": "error", "error": "no_features_computed"}
raise RuntimeError(
"Feature store compute produced zero features — nothing to write"
)

if universe_tickers and n_err / len(universe_tickers) > 0.05:
raise RuntimeError(
f"Feature computation error rate {n_err / len(universe_tickers):.1%} exceeds 5% threshold "
f"(n_ok={n_ok} n_err={n_err} n_skip={n_skip} of {len(universe_tickers)})"
)

# ── 3. Write to S3 ───────────────────────────────────────────────────────
features_df = pd.DataFrame(store_rows)
Expand Down Expand Up @@ -513,7 +536,11 @@ def compute_and_write(
ContentType="application/json",
)
except Exception as _ver_exc:
log.debug("Schema version write failed (non-fatal): %s", _ver_exc)
# Schema version is metadata consumed by downstream drift detection.
# A failure here doesn't corrupt the features themselves, so we
# don't halt the pipeline — but we surface it as WARNING so the
# drift-check can't silently run against stale metadata.
log.warning("Schema version write failed (non-fatal): %s", _ver_exc)

log.info(
"Feature snapshot + registry written to s3://%s/%s%s/ (schema=%s)",
Expand Down
12 changes: 6 additions & 6 deletions weekly_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ def _run_phase1(config: dict, args: argparse.Namespace) -> dict:
if existing:
tickers = existing.get("tickers", [])
logger.info("Loaded %d tickers from existing constituents.json", len(tickers))
except Exception:
pass
except Exception as exc:
logger.warning("S3 constituents load failed — will fall back to Wikipedia: %s", exc)

# ── 2. Price cache refresh ───────────────────────────────────────────────
if only in (None, "prices"):
Expand Down Expand Up @@ -360,14 +360,14 @@ def _run_daily(config: dict, args: argparse.Namespace) -> dict:
if existing:
tickers = existing.get("tickers", [])
logger.info("Loaded %d tickers from S3 constituents", len(tickers))
except Exception:
pass
except Exception as exc:
logger.warning("S3 constituents load failed — will try Wikipedia fallback: %s", exc)
if not tickers:
try:
tickers, _, _, _, _ = constituents._fetch_constituents()
logger.info("Loaded %d tickers from Wikipedia (S3 fallback)", len(tickers))
except Exception:
pass
except Exception as exc:
logger.error("Wikipedia constituents fallback failed: %s", exc)

if not tickers:
logger.error("No tickers available for daily closes")
Expand Down
Loading