From a3e92989c860fff9e9ed564a13e416d174b028fe Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Tue, 14 Apr 2026 08:03:28 -0700 Subject: [PATCH] Convert silent fails in daily production path to hard fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to PR #24 (daily_append). Audits the rest of the alpha-engine-data daily production path for the same "except Exception: log.debug / pass" pattern that masked ArcticDB staleness for two days. Scope limited to files on the DailyData + feature-store write path; RAG ingestion, emailer, and fundamentals scan deferred to a later sweep. features/compute.py - `_load_daily_closes_delta`: per-date NoSuchKey upgraded to WARNING; every other S3 exception raises; raise if ALL dates in the business-day range were missing (the fingerprint of an upstream daily_closes outage). - Per-ticker feature computation failure: log.debug → log.warning; new RuntimeError if `n_err / len(universe_tickers) > 5%`, matching daily_append. - Empty store_rows now raises instead of returning `status=error` (status return is legacy; raising is consistent with hard-fail). - `_load_cached_alternative` outer except: log.debug → log.warning so auth / network failures surface even when "no alt data" is expected. - Schema version write: log.debug → log.warning with a comment explaining why this one stays non-raising (drift-check metadata, not feature data). collectors/daily_closes.py - head_object idempotency check: bare `except Exception: pass` → catch only ClientError with 404/NoSuchKey. Auth / throttling now raises instead of silently proceeding. - Per-ticker yfinance extract: log.debug → log.warning so partial yfinance failures are visible in the daily log. collectors/macro.py - `load_from_s3`: pointer-missing still returns None (expected), but every other error now raises instead of masquerading as "no data." weekly_collector.py - S3 constituents load fallback: bare `except Exception: pass` → warning with context. The Wikipedia fallback remains (legitimate failover). - Wikipedia constituents failure: bare `except Exception: pass` → ERROR log. The downstream `if not tickers` guard already hard-fails. Out of scope (tracked for follow-up audit) - rag/* (SEC / 8-K / earnings / theses ingestion) - emailer.py `except Exception: pass` in finalize email - features/compute.py fundamentals/alternative per-key fallback chains - collectors/fundamentals.py per-ticker fetch log.debug Dead code flagged for later removal - features/reader.py — read_feature_snapshot / read_feature_range / latest_available_date / read_registry. No callers remain inside alpha-engine-data. Consumers in sibling repos (predictor / backtester / research) will migrate away as the ArcticDB cutover completes; safe to delete once the cross-repo migration is confirmed clean. Tests: 41/41 pass. No new tests added — the hard-fail paths are essentially `if cond: raise` and are better exercised by the existing integration test suite against a live S3 bucket (follow-up). Co-Authored-By: Claude Opus 4.6 (1M context) --- collectors/daily_closes.py | 12 ++++-- collectors/macro.py | 17 +++++---- features/compute.py | 75 ++++++++++++++++++++++++++------------ weekly_collector.py | 12 +++--- 4 files changed, 76 insertions(+), 40 deletions(-) diff --git a/collectors/daily_closes.py b/collectors/daily_closes.py index 08fc21d..88b5327 100644 --- a/collectors/daily_closes.py +++ b/collectors/daily_closes.py @@ -53,12 +53,18 @@ def collect( # Check if already written for this date key = f"{s3_prefix}{run_date}.parquet" if not dry_run: + from botocore.exceptions import ClientError try: s3.head_object(Bucket=bucket, Key=key) logger.info("Daily closes already exist for %s — skipping", run_date) return {"status": "ok", "tickers_captured": 0, "skipped": True} - except Exception: - pass # Not found — proceed + except ClientError as exc: + err_code = exc.response.get("Error", {}).get("Code") + if err_code not in ("404", "NoSuchKey"): + # Auth failure, throttling, or network — not "file doesn't exist". + # Don't silently paper over it. + raise + # 404/NoSuchKey: expected case — file doesn't exist, proceed to write. if not tickers: return {"status": "error", "error": "no tickers provided"} @@ -206,7 +212,7 @@ def _fetch_yfinance_closes( }) count += 1 except Exception as e: - logger.debug("yfinance close extract failed for %s: %s", ticker, e) + logger.warning("yfinance close extract failed for %s: %s", ticker, e) except Exception as e: logger.warning("yfinance batch failed: %s", e) diff --git a/collectors/macro.py b/collectors/macro.py index 5ba7145..31f7a53 100644 --- a/collectors/macro.py +++ b/collectors/macro.py @@ -298,15 +298,18 @@ def _compute_market_breadth(price_data: dict[str, pd.DataFrame]) -> dict: def load_from_s3(bucket: str, s3_prefix: str = "market_data/") -> dict | None: - """Load the latest macro.json from S3. Returns None if not found.""" + """Load the latest macro.json from S3. Returns None if the pointer is missing; raises on unexpected errors.""" + from botocore.exceptions import ClientError s3 = boto3.client("s3") try: resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}latest_weekly.json") - pointer = json.loads(resp["Body"].read()) - date = pointer.get("date") - if not date: + except ClientError as exc: + if exc.response.get("Error", {}).get("Code") in ("404", "NoSuchKey"): return None - resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}weekly/{date}/macro.json") - return json.loads(resp["Body"].read()) - except Exception: + raise + pointer = json.loads(resp["Body"].read()) + date = pointer.get("date") + if not date: return None + resp = s3.get_object(Bucket=bucket, Key=f"{s3_prefix}weekly/{date}/macro.json") + return json.loads(resp["Body"].read()) diff --git a/features/compute.py b/features/compute.py index d3e1d61..7d89445 100644 --- a/features/compute.py +++ b/features/compute.py @@ -122,30 +122,45 @@ def _load_delta_from_daily_closes( ticker_rows: dict[str, list[dict]] = {} + n_missing_dates = 0 for d in delta_dates: key = f"predictor/daily_closes/{d}.parquet" try: obj = s3.get_object(Bucket=bucket, Key=key) - buf = io.BytesIO(obj["Body"].read()) - day_df = pd.read_parquet(buf, engine="pyarrow") - # daily_closes: index=ticker (str), columns=[date, Open, High, Low, Close, Adj_Close, Volume] - for ticker, row in day_df.iterrows(): - if ticker not in ticker_rows: - ticker_rows[ticker] = [] - ticker_rows[ticker].append({ - "date": pd.Timestamp(d), - "Open": float(row.get("Open", np.nan)), - "High": float(row.get("High", np.nan)), - "Low": float(row.get("Low", np.nan)), - "Close": float(row.get("Close", np.nan)), - "Volume": int(row.get("Volume", 0)), - }) - except Exception: - log.debug("daily_closes/%s.parquet not found in S3 (non-trading day?)", d) + except s3.exceptions.NoSuchKey: + # Market holiday within the business-day range (e.g., Good Friday). + log.warning("daily_closes/%s.parquet missing (market holiday?)", d) + n_missing_dates += 1 + continue + except Exception as exc: + raise RuntimeError( + f"Unexpected S3 error reading daily_closes/{d}.parquet: {exc}" + ) from exc + buf = io.BytesIO(obj["Body"].read()) + day_df = pd.read_parquet(buf, engine="pyarrow") + for ticker, row in day_df.iterrows(): + if ticker not in ticker_rows: + ticker_rows[ticker] = [] + ticker_rows[ticker].append({ + "date": pd.Timestamp(d), + "Open": float(row.get("Open", np.nan)), + "High": float(row.get("High", np.nan)), + "Low": float(row.get("Low", np.nan)), + "Close": float(row.get("Close", np.nan)), + "Volume": int(row.get("Volume", 0)), + }) n_tickers = len(ticker_rows) n_rows = sum(len(v) for v in ticker_rows.values()) - log.info("Delta loaded: %d rows across %d tickers", n_rows, n_tickers) + log.info( + "Delta loaded: %d rows across %d tickers (%d/%d dates missing)", + n_rows, n_tickers, n_missing_dates, len(delta_dates), + ) + if delta_dates and n_missing_dates == len(delta_dates): + raise RuntimeError( + f"Every date in delta range was missing ({len(delta_dates)} dates) — " + "daily_closes writer is likely broken upstream" + ) return ticker_rows @@ -353,7 +368,7 @@ def _load_cached_alternative(s3, bucket: str) -> dict[str, dict]: return alt_data except Exception as exc: - log.debug("No cached alternative data available: %s", exc) + log.warning("No cached alternative data loaded — alternative features will use defaults: %s", exc) return {} @@ -460,18 +475,26 @@ def compute_and_write( n_ok += 1 except Exception as exc: - log.debug("Feature computation failed for %s: %s", ticker, exc) + log.warning("Feature computation failed for %s: %s", ticker, exc) n_err += 1 t_compute = time.time() - t0 - t_load log.info( - "Feature computation complete in %.1fs: %d OK, %d skipped, %d errors", - t_compute, n_ok, n_skip, n_err, + "Feature computation complete in %.1fs: %d OK, %d skipped, %d errors " + "(of %d universe tickers)", + t_compute, n_ok, n_skip, n_err, len(universe_tickers), ) if not store_rows: - log.error("No features computed — nothing to write") - return {"status": "error", "error": "no_features_computed"} + raise RuntimeError( + "Feature store compute produced zero features — nothing to write" + ) + + if universe_tickers and n_err / len(universe_tickers) > 0.05: + raise RuntimeError( + f"Feature computation error rate {n_err / len(universe_tickers):.1%} exceeds 5% threshold " + f"(n_ok={n_ok} n_err={n_err} n_skip={n_skip} of {len(universe_tickers)})" + ) # ── 3. Write to S3 ─────────────────────────────────────────────────────── features_df = pd.DataFrame(store_rows) @@ -513,7 +536,11 @@ def compute_and_write( ContentType="application/json", ) except Exception as _ver_exc: - log.debug("Schema version write failed (non-fatal): %s", _ver_exc) + # Schema version is metadata consumed by downstream drift detection. + # A failure here doesn't corrupt the features themselves, so we + # don't halt the pipeline — but we surface it as WARNING so the + # drift-check can't silently run against stale metadata. + log.warning("Schema version write failed (non-fatal): %s", _ver_exc) log.info( "Feature snapshot + registry written to s3://%s/%s%s/ (schema=%s)", diff --git a/weekly_collector.py b/weekly_collector.py index 5283090..2d713d3 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -127,8 +127,8 @@ def _run_phase1(config: dict, args: argparse.Namespace) -> dict: if existing: tickers = existing.get("tickers", []) logger.info("Loaded %d tickers from existing constituents.json", len(tickers)) - except Exception: - pass + except Exception as exc: + logger.warning("S3 constituents load failed — will fall back to Wikipedia: %s", exc) # ── 2. Price cache refresh ─────────────────────────────────────────────── if only in (None, "prices"): @@ -360,14 +360,14 @@ def _run_daily(config: dict, args: argparse.Namespace) -> dict: if existing: tickers = existing.get("tickers", []) logger.info("Loaded %d tickers from S3 constituents", len(tickers)) - except Exception: - pass + except Exception as exc: + logger.warning("S3 constituents load failed — will try Wikipedia fallback: %s", exc) if not tickers: try: tickers, _, _, _, _ = constituents._fetch_constituents() logger.info("Loaded %d tickers from Wikipedia (S3 fallback)", len(tickers)) - except Exception: - pass + except Exception as exc: + logger.error("Wikipedia constituents fallback failed: %s", exc) if not tickers: logger.error("No tickers available for daily closes")