Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions rag/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,20 @@ def execute_batch(sql: str, params_list: list[tuple]) -> None:


def is_available() -> bool:
    """Check if the RAG database is reachable. Never raises.

    NOTE (2026-04-14): currently has zero callers inside alpha-engine-data.
    The ingestion pipelines call ``get_connection()`` directly, which
    hard-fails on connect errors (correct behavior while the system is
    unstable). Kept in the module in case retrieval-side consumers want
    a non-raising probe; flag for deletion if still unused after the
    cross-repo audit completes.

    Returns:
        ``True`` when a connection can be opened and ``SELECT 1`` executes,
        ``False`` if any step raises.
    """
    try:
        with get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        return True
    except Exception as e:
        # Broad catch is deliberate: this probe's contract is "never raises".
        # Log once, at WARNING, so an unreachable database is visible in the
        # logs instead of being reported at two different levels.
        logger.warning("RAG database unavailable: %s", e)
        return False
6 changes: 5 additions & 1 deletion rag/pipelines/ingest_8k_filings.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ def _download_and_extract(url: str) -> str | None:
# 8-Ks are typically short; cap at 15K chars
return text[:15000] if len(text) > 15000 else text
except Exception as e:
logger.debug("8-K download failed from %s: %s", url, e)
# Per-URL download failure: visible at WARNING so the rate of
# failures can be monitored in SSM logs. Caller treats None as
# "skip this filing" and continues; aggregated across all 8-Ks
# the caller already reports counts, so there's no hidden drift.
logger.warning("8-K download failed from %s: %s", url, e)
return None


Expand Down
51 changes: 27 additions & 24 deletions rag/pipelines/run_weekly_ingestion.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@
# rag/pipelines/run_weekly_ingestion.sh — Weekly RAG ingestion pipeline.
#
# Runs all ingestion pipelines in sequence:
# 0. Preflight — env vars + S3 reachability (hard-fails on miss)
# 1. SEC filings (10-K/10-Q) — from signals universe, 2y lookback
# 2. 8-K material events — from signals universe, 1y lookback
# 3. Earnings transcripts (Finnhub) — from signals universe, latest 8
# 4. Thesis history — from research.db (incremental)
# 5. Filing change detection — analyze consecutive filings
#
# Intended to run on the Saturday spot instance alongside the backtester,
# or as a standalone cron job on the always-on EC2 instance.
# Intended to run on the Saturday Step Function via SSM on the always-on
# EC2 instance. `set -euo pipefail` plus no `|| echo "non-fatal"`
# swallowers means any ingestion failure aborts the script with a
# non-zero exit, surfaces in SSM logs, fails the Step Function, and
# fires flow-doctor.
#
# Usage:
# bash rag/pipelines/run_weekly_ingestion.sh # full run
# bash rag/pipelines/run_weekly_ingestion.sh --dry-run # preview only
#
# Prerequisites:
# - .env with RAG_DATABASE_URL, VOYAGE_API_KEY, FINNHUB_API_KEY
# Prerequisites (verified by step 0):
# - .env with RAG_DATABASE_URL, VOYAGE_API_KEY, FINNHUB_API_KEY, EDGAR_IDENTITY
# - AWS_REGION set in the environment (also required by step 0)
# - research.db available locally or fetchable from S3

set -euo pipefail
Expand Down Expand Up @@ -46,41 +50,38 @@ echo "========================================"
echo "RAG Weekly Ingestion — $(date -u '+%Y-%m-%d %H:%M UTC')"
echo "========================================"

# ── Step 0: Preflight — fail fast on env / connectivity drift ────────────────
echo ""
echo "==> Step 0/5: Preflight checks..."
python -m rag.preflight

# ── Step 1: SEC filings (10-K/10-Q) ─────────────────────────────────────────
echo ""
echo "==> Step 1/5: SEC filings (10-K/10-Q)..."
python -m rag.pipelines.ingest_sec_filings --from-signals --lookback-years 2 $DRY_RUN 2>&1 || \
echo " WARNING: SEC filing ingestion failed (non-fatal)"
python -m rag.pipelines.ingest_sec_filings --from-signals --lookback-years 2 $DRY_RUN

# ── Step 2: 8-K material events ─────────────────────────────────────────────
echo ""
echo "==> Step 2/5: 8-K material events..."
python -m rag.pipelines.ingest_8k_filings --from-signals --lookback-days 365 $DRY_RUN 2>&1 || \
echo " WARNING: 8-K ingestion failed (non-fatal)"
python -m rag.pipelines.ingest_8k_filings --from-signals --lookback-days 365 $DRY_RUN

# ── Step 3: Earnings transcripts (Finnhub) ──────────────────────────────────
# FINNHUB_API_KEY is verified by preflight; no runtime skip branch.
echo ""
echo "==> Step 3/5: Earnings transcripts (Finnhub)..."
if [ -n "${FINNHUB_API_KEY:-}" ]; then
python -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ticker 8 $DRY_RUN 2>&1 || \
echo " WARNING: Finnhub transcript ingestion failed (non-fatal)"
else
echo " SKIPPED: FINNHUB_API_KEY not set"
fi
python -m rag.pipelines.ingest_earnings_finnhub --from-signals --max-per-ticker 8 $DRY_RUN

# ── Step 4: Thesis history (v2 quant/qual from signals.json) ─────────────────
echo ""
echo "==> Step 4/5: Thesis history..."
SINCE=$(date -u -d '14 days ago' '+%Y-%m-%d' 2>/dev/null || date -u -v-14d '+%Y-%m-%d')
python -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN 2>&1 || \
echo " WARNING: Thesis ingestion failed (non-fatal)"
python -m rag.pipelines.ingest_theses --signals --since "$SINCE" $DRY_RUN

# ── Step 5: Filing change detection ──────────────────────────────────────────
echo ""
echo "==> Step 5/5: Filing change detection..."
if [ -z "$DRY_RUN" ]; then
python -m rag.pipelines.filing_change_detection --output-s3 2>&1 || \
echo " WARNING: Filing change detection failed (non-fatal)"
python -m rag.pipelines.filing_change_detection --output-s3
else
echo " SKIPPED in dry-run mode"
fi
Expand All @@ -90,17 +91,19 @@ echo "========================================"
echo "RAG Weekly Ingestion Complete — $(date -u '+%Y-%m-%d %H:%M UTC')"
echo "========================================"

# Emit CloudWatch heartbeat on successful completion
# Emit CloudWatch heartbeat on successful completion. No `|| echo` swallow
# — a broken heartbeat emission is a real signal worth escalating.
aws cloudwatch put-metric-data \
--namespace "AlphaEngine" \
--metric-name "Heartbeat" \
--dimensions "Process=rag-ingestion" \
--value 1 --unit "Count" \
--region "${AWS_REGION:-us-east-1}" 2>/dev/null \
&& echo "Heartbeat emitted: rag-ingestion" \
|| echo "WARNING: Failed to emit heartbeat (non-fatal)"
--region "${AWS_REGION:-us-east-1}"
echo "Heartbeat emitted: rag-ingestion"

# Send completion email
# Send completion email. With `set -euo pipefail` active, reaching this
# point means all 5 pipelines succeeded — the hardcoded 'ok' statuses
# are truthful rather than aspirational.
PYTHON_BIN="${PYTHON_BIN:-python}"
$PYTHON_BIN -c "
from emailer import send_step_email
Expand All @@ -121,4 +124,4 @@ results = {
},
}
send_step_email('RAG Ingestion', results, date_str)
" 2>&1 || echo "WARNING: RAG completion email failed (non-fatal)"
"
70 changes: 70 additions & 0 deletions rag/preflight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
RAG weekly ingestion preflight.

Called at the top of ``run_weekly_ingestion.sh`` before any of the five
ingestion pipelines run. Mirrors the DataPreflight pattern:
connectivity + env-var checks that fail fast and loud instead of
letting the Saturday pipeline run to "success" with empty RAG tables.

Run via ``python -m rag.preflight`` (or equivalent) — raises on any
missing requirement. Flow-doctor picks up the raised exception and
fires an email + GitHub issue.
"""

from __future__ import annotations

import logging
import os
import sys

from alpha_engine_lib.logging import setup_logging
from alpha_engine_lib.preflight import BasePreflight

log = logging.getLogger(__name__)


class RAGPreflight(BasePreflight):
    """Fail-fast checks run before the RAG weekly ingestion pipelines.

    Environment variables that must be set:
        - ``AWS_REGION``: S3 client region (matches other modules)
        - ``VOYAGE_API_KEY``: embedding provider for all 5 pipelines
        - ``FINNHUB_API_KEY``: earnings transcript ingestion (step 3)
        - ``EDGAR_IDENTITY``: SEC EDGAR User-Agent for filings (steps 1, 2)
        - ``RAG_DATABASE_URL``: postgres+pgvector connection string
          (all pipelines)

    S3 access that must work:
        - the bucket handed to the constructor must be reachable
          (``alpha-engine-research`` unless the caller overrides it)
    """

    # Checked in this order by run(); kept in one place so the list is easy
    # to compare against the docstring above.
    _RAG_REQUIRED_ENV_VARS = (
        "AWS_REGION",
        "VOYAGE_API_KEY",
        "FINNHUB_API_KEY",
        "EDGAR_IDENTITY",
        "RAG_DATABASE_URL",
    )

    def __init__(self, bucket: str):
        """Store *bucket* on the base preflight for the S3 reachability check."""
        super().__init__(bucket)

    def run(self) -> None:
        """Run the env-var check first, then the S3 bucket check."""
        self.check_env_vars(*self._RAG_REQUIRED_ENV_VARS)
        self.check_s3_bucket()


def main() -> int:
    """Run the RAG preflight from the command line.

    Invoked by ``run_weekly_ingestion.sh`` as ``python -m rag.preflight``.
    Configures logging with the ``flow-doctor.yaml`` that sits one directory
    above this package, resolves the bucket from ``ALPHA_ENGINE_BUCKET``
    (falling back to ``alpha-engine-research``), and runs ``RAGPreflight``.

    Returns:
        ``0`` once every check has passed. A failed check is expected to
        surface as an exception raised out of ``RAGPreflight.run``.
    """
    package_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(package_dir)
    setup_logging(
        "rag-preflight",
        flow_doctor_yaml=os.path.join(project_root, "flow-doctor.yaml"),
    )

    bucket_name = os.environ.get("ALPHA_ENGINE_BUCKET", "alpha-engine-research")
    preflight = RAGPreflight(bucket_name)
    preflight.run()

    log.info("RAG pre-flight OK")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status so the
    # calling shell script (running under `set -e`) sees a failure.
    exit_code = main()
    sys.exit(exit_code)
Loading