diff --git a/rag/db.py b/rag/db.py index 5980d62..84896ce 100644 --- a/rag/db.py +++ b/rag/db.py @@ -70,12 +70,20 @@ def execute_batch(sql: str, params_list: list[tuple]) -> None: def is_available() -> bool: - """Check if the RAG database is reachable. Never raises.""" + """Check if the RAG database is reachable. Never raises. + + NOTE (2026-04-14): currently has zero callers inside alpha-engine-data. + The ingestion pipelines call ``get_connection()`` directly, which + hard-fails on connect errors (correct behavior while the system is + unstable). Kept in the module in case retrieval-side consumers want + a non-raising probe; flag for deletion if still unused after the + cross-repo audit completes. + """ try: with get_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT 1") return True except Exception as e: - logger.debug("RAG database unavailable: %s", e) + logger.warning("RAG database unavailable: %s", e) return False diff --git a/rag/pipelines/ingest_8k_filings.py b/rag/pipelines/ingest_8k_filings.py index 44ed5a2..4f7fea0 100644 --- a/rag/pipelines/ingest_8k_filings.py +++ b/rag/pipelines/ingest_8k_filings.py @@ -149,7 +149,11 @@ def _download_and_extract(url: str) -> str | None: # 8-Ks are typically short; cap at 15K chars return text[:15000] if len(text) > 15000 else text except Exception as e: - logger.debug("8-K download failed from %s: %s", url, e) + # Per-URL download failure: visible at WARNING so the rate of + # failures can be monitored in SSM logs. Caller treats None as + # "skip this filing" and continues; aggregated across all 8-Ks + # the caller already reports counts, so there's no hidden drift. + logger.warning("8-K download failed from %s: %s", url, e) return None diff --git a/rag/pipelines/run_weekly_ingestion.sh b/rag/pipelines/run_weekly_ingestion.sh index 619631a..c6cf388 100755 --- a/rag/pipelines/run_weekly_ingestion.sh +++ b/rag/pipelines/run_weekly_ingestion.sh @@ -2,21 +2,25 @@ # rag/pipelines/run_weekly_ingestion.sh — Weekly RAG ingestion pipeline. # # Runs all ingestion pipelines in sequence: +# 0. Preflight — env vars + S3 reachability (hard-fails on miss) # 1. SEC filings (10-K/10-Q) — from signals universe, 2y lookback # 2. 8-K material events — from signals universe, 1y lookback # 3. Earnings transcripts (Finnhub) — from signals universe, latest 8 # 4. Thesis history — from research.db (incremental) # 5. Filing change detection — analyze consecutive filings # -# Intended to run on the Saturday spot instance alongside the backtester, -# or as a standalone cron job on the always-on EC2 instance. +# Intended to run on the Saturday Step Function via SSM on the always-on +# EC2 instance. `set -euo pipefail` plus no `|| echo "non-fatal"` +# swallowers means any ingestion failure aborts the script with a +# non-zero exit, surfaces in SSM logs, fails the Step Function, and +# fires flow-doctor. # # Usage: # bash rag/pipelines/run_weekly_ingestion.sh # full run # bash rag/pipelines/run_weekly_ingestion.sh --dry-run # preview only # -# Prerequisites: -# - .env with RAG_DATABASE_URL, VOYAGE_API_KEY, FINNHUB_API_KEY +# Prerequisites (verified by step 0): +# - .env with RAG_DATABASE_URL, VOYAGE_API_KEY, FINNHUB_API_KEY, EDGAR_IDENTITY # - research.db available locally or fetchable from S3 set -euo pipefail @@ -46,41 +50,38 @@ echo "========================================" echo "RAG Weekly Ingestion — $(date -u '+%Y-%m-%d %H:%M UTC')" echo "========================================" +# ── Step 0: Preflight — fail fast on env / connectivity drift ──────────────── +echo "" +echo "==> Step 0/5: Preflight checks..." +python -m rag.preflight + # ── Step 1: SEC filings (10-K/10-Q) ───────────────────────────────────────── echo "" echo "==> Step 1/5: SEC filings (10-K/10-Q)..." -python -m rag.pipelines.ingest_sec_filings --from-signals --lookback-years 2 $DRY_RUN 2>&1 || \ - echo " WARNING: SEC filing ingestion failed (non-fatal)" +python -m rag.pipelines.ingest_sec_filings --from-signals --lookback-years 2 $DRY_RUN # ── Step 2: 8-K material events ───────────────────────────────────────────── echo "" echo "==> Step 2/5: 8-K material events..." -python -m rag.pipelines.ingest_8k_filings --from-signals --lookback-days 365 $DRY_RUN 2>&1 || \ - echo " WARNING: 8-K ingestion failed (non-fatal)" +python -m rag.pipelines.ingest_8k_filings --from-signals --lookback-days 365 $DRY_RUN # ── Step 3: Earnings transcripts (Finnhub) ────────────────────────────────── +# FINNHUB_API_KEY is verified by preflight; no runtime skip branch. echo "" echo "==> Step 3/5: Earnings transcripts (Finnhub)..." -if [ -n "${FINNHUB_API_KEY:-}" ]; then - python -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ticker 8 $DRY_RUN 2>&1 || \ - echo " WARNING: Finnhub transcript ingestion failed (non-fatal)" -else - echo " SKIPPED: FINNHUB_API_KEY not set" -fi +python -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ticker 8 $DRY_RUN # ── Step 4: Thesis history (v2 quant/qual from signals.json) ───────────────── echo "" echo "==> Step 4/5: Thesis history..." SINCE=$(date -u -d '14 days ago' '+%Y-%m-%d' 2>/dev/null || date -u -v-14d '+%Y-%m-%d') -python -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN 2>&1 || \ - echo " WARNING: Thesis ingestion failed (non-fatal)" +python -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN # ── Step 5: Filing change detection ────────────────────────────────────────── echo "" echo "==> Step 5/5: Filing change detection..." if [ -z "$DRY_RUN" ]; then - python -m rag.pipelines.filing_change_detection --output-s3 2>&1 || \ - echo " WARNING: Filing change detection failed (non-fatal)" + python -m rag.pipelines.filing_change_detection --output-s3 else echo " SKIPPED in dry-run mode" fi @@ -90,17 +91,19 @@ echo "========================================" echo "RAG Weekly Ingestion Complete — $(date -u '+%Y-%m-%d %H:%M UTC')" echo "========================================" -# Emit CloudWatch heartbeat on successful completion +# Emit CloudWatch heartbeat on successful completion. No `|| echo` swallow +# — a broken heartbeat emission is a real signal worth escalating. aws cloudwatch put-metric-data \ --namespace "AlphaEngine" \ --metric-name "Heartbeat" \ --dimensions "Process=rag-ingestion" \ --value 1 --unit "Count" \ - --region "${AWS_REGION:-us-east-1}" 2>/dev/null \ - && echo "Heartbeat emitted: rag-ingestion" \ - || echo "WARNING: Failed to emit heartbeat (non-fatal)" + --region "${AWS_REGION:-us-east-1}" +echo "Heartbeat emitted: rag-ingestion" -# Send completion email +# Send completion email. With `set -euo pipefail` active, reaching this +# point means all 5 pipelines succeeded — the hardcoded 'ok' statuses +# are truthful rather than aspirational. PYTHON_BIN="${PYTHON_BIN:-python}" $PYTHON_BIN -c " from emailer import send_step_email @@ -121,4 +124,4 @@ results = { }, } send_step_email('RAG Ingestion', results, date_str) -" 2>&1 || echo "WARNING: RAG completion email failed (non-fatal)" +" diff --git a/rag/preflight.py b/rag/preflight.py new file mode 100644 index 0000000..b386f0c --- /dev/null +++ b/rag/preflight.py @@ -0,0 +1,70 @@ +""" +RAG weekly ingestion preflight. + +Called at the top of ``run_weekly_ingestion.sh`` before any of the five +ingestion pipelines run. Mirrors the DataPreflight pattern: +connectivity + env-var checks that fail fast and loud instead of +letting the Saturday pipeline run to "success" with empty RAG tables. + +Run via ``python -m rag.preflight`` (or equivalent) — raises on any +missing requirement. Flow-doctor picks up the raised exception and +fires an email + GitHub issue. +""" + +from __future__ import annotations + +import logging +import os +import sys + +from alpha_engine_lib.logging import setup_logging +from alpha_engine_lib.preflight import BasePreflight + +log = logging.getLogger(__name__) + + +class RAGPreflight(BasePreflight): + """Preflight for the RAG weekly ingestion pipeline. + + Required env vars: + - ``AWS_REGION`` — S3 client region (matches other modules) + - ``VOYAGE_API_KEY`` — embedding provider for all 5 pipelines + - ``FINNHUB_API_KEY`` — earnings transcript ingestion (step 3) + - ``EDGAR_IDENTITY`` — SEC EDGAR User-Agent for filings (steps 1, 2) + - ``RAG_DATABASE_URL`` — postgres+pgvector connection string (all pipelines) + + Required S3 access: + - bucket reachable for `alpha-engine-research` + """ + + def __init__(self, bucket: str): + super().__init__(bucket) + + def run(self) -> None: + self.check_env_vars( + "AWS_REGION", + "VOYAGE_API_KEY", + "FINNHUB_API_KEY", + "EDGAR_IDENTITY", + "RAG_DATABASE_URL", + ) + self.check_s3_bucket() + + +def main() -> int: + """CLI entrypoint invoked by run_weekly_ingestion.sh.""" + setup_logging( + "rag-preflight", + flow_doctor_yaml=os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "flow-doctor.yaml", + ), + ) + bucket = os.environ.get("ALPHA_ENGINE_BUCKET", "alpha-engine-research") + RAGPreflight(bucket).run() + log.info("RAG pre-flight OK") + return 0 + + +if __name__ == "__main__": + sys.exit(main())