diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3a284d..15cc2c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,16 @@ jobs: with: python-version: "3.12" + - name: Configure git auth for private alpha-engine-lib + env: + ALPHA_ENGINE_LIB_TOKEN: ${{ secrets.ALPHA_ENGINE_LIB_TOKEN }} + run: | + if [ -z "$ALPHA_ENGINE_LIB_TOKEN" ]; then + echo "::error::ALPHA_ENGINE_LIB_TOKEN secret not set — required to install private alpha-engine-lib" + exit 1 + fi + git config --global url."https://x-access-token:${ALPHA_ENGINE_LIB_TOKEN}@github.com/cipher813/alpha-engine-lib".insteadOf "https://github.com/cipher813/alpha-engine-lib" + - name: Install dependencies run: | pip install --upgrade pip diff --git a/log_config.py b/log_config.py deleted file mode 100644 index 393a80e..0000000 --- a/log_config.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -Structured logging + Flow Doctor integration. - -JSON mode activates when ALPHA_ENGINE_JSON_LOGS=1. -Text mode (default) preserves human-readable format for local dev. - -Flow Doctor integration owns the single shared FlowDoctor instance for the -alpha-engine-data process. Entrypoints call ``setup_logging("data-collector")`` -exactly once at startup; subsequent call sites retrieve the instance via -``get_flow_doctor()`` rather than re-initializing. - -Flow Doctor activates only when FLOW_DOCTOR_ENABLED=1 (set on EC2 via the -DailyData / DataPhase1 SSM commands or the ``.alpha-engine.env``). Local -dev runs do not fire flow-doctor notifications. Logs at ERROR level or -above (including exceptions raised out of the hardened collectors) are -captured by the FlowDoctorHandler and dispatched per ``flow-doctor.yaml``. -""" - -from __future__ import annotations - -import json -import logging -import os -from datetime import datetime, timezone -from typing import Optional - -_FLOW_DOCTOR_YAML_PATH = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "flow-doctor.yaml" -) - -# Singleton — populated once by setup_logging() and retrieved by call sites -# via get_flow_doctor(). None until setup_logging() runs with -# FLOW_DOCTOR_ENABLED=1, or if the init itself failed. -_fd_instance: Optional[object] = None - - -class JSONFormatter(logging.Formatter): - def format(self, record: logging.LogRecord) -> str: - entry = { - "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), - "level": record.levelname, - "module": record.module, - "func": record.funcName, - "msg": record.getMessage(), - } - if record.exc_info and record.exc_info[0] is not None: - entry["exc"] = self.formatException(record.exc_info) - if hasattr(record, "ctx"): - entry["ctx"] = record.ctx - return json.dumps(entry, default=str) - - -def get_flow_doctor(): - """Return the shared flow-doctor instance, or None if not initialized.""" - return _fd_instance - - -def _attach_flow_doctor(name: str) -> None: - """Initialize the shared flow-doctor instance and attach a log handler. - - Mirrors the executor's integration (alpha-engine/executor/log_config.py). - An ERROR-level log or an uncaught exception propagated through the - logging system fires the FlowDoctorHandler, which dispatches per the - yaml config (email + GitHub issue with dedup + rate limits). - """ - global _fd_instance - import flow_doctor - _fd_instance = flow_doctor.init(config_path=_FLOW_DOCTOR_YAML_PATH) - handler = flow_doctor.FlowDoctorHandler(_fd_instance, level=logging.ERROR) - logging.getLogger().addHandler(handler) - - -def setup_logging(name: str = "data-collector") -> None: - """ - Configure root logger for alpha-engine-data entrypoints. - - JSON mode: ALPHA_ENGINE_JSON_LOGS=1 (for EC2 / production) - Text mode: default (for local dev / dry-run) - Flow Doctor: FLOW_DOCTOR_ENABLED=1 (for EC2 / production) - """ - json_mode = os.environ.get("ALPHA_ENGINE_JSON_LOGS", "0") == "1" - handler = logging.StreamHandler() - if json_mode: - handler.setFormatter(JSONFormatter()) - else: - handler.setFormatter(logging.Formatter( - f"%(asctime)s %(levelname)s [{name}] %(message)s" - )) - root = logging.getLogger() - root.handlers.clear() - root.addHandler(handler) - root.setLevel(logging.INFO) - - if os.environ.get("FLOW_DOCTOR_ENABLED", "0") == "1": - _attach_flow_doctor(name) diff --git a/preflight.py b/preflight.py new file mode 100644 index 0000000..3fcd62d --- /dev/null +++ b/preflight.py @@ -0,0 +1,48 @@ +""" +Data-module preflight: connectivity + freshness checks run at the top of +``weekly_collector.main()`` before any real collection work starts. + +Primitives live in ``alpha_engine_lib.preflight.BasePreflight``; this +module only composes them into a mode-specific sequence. See the +alpha-engine-lib README for the rationale and the 2026-04-14 failure +mode that motivated the library. +""" + +from __future__ import annotations + +from alpha_engine_lib.preflight import BasePreflight + + +class DataPreflight(BasePreflight): + """Preflight checks for the alpha-engine-data entrypoint. + + Mode determines which external services must be reachable: + + - ``"daily"`` — weekday DailyData step. ArcticDB must be readable + and SPY must be ≤4 days stale (covers Fri→Tue long weekends + + 1 day of buffer). + - ``"phase1"`` — Saturday DataPhase1. External APIs (FRED, polygon) + needed; no ArcticDB freshness check (phase1 is what *populates* + ArcticDB). + - ``"phase2"`` — Saturday DataPhase2. FMP + SEC EDGAR needed. + """ + + def __init__(self, bucket: str, mode: str): + super().__init__(bucket) + if mode not in ("daily", "phase1", "phase2"): + raise ValueError(f"DataPreflight: unknown mode {mode!r}") + self.mode = mode + + def run(self) -> None: + self.check_env_vars("AWS_REGION") + if self.mode == "phase1": + self.check_env_vars("FRED_API_KEY", "POLYGON_API_KEY") + elif self.mode == "phase2": + self.check_env_vars("FMP_API_KEY", "EDGAR_IDENTITY") + + self.check_s3_bucket() + + if self.mode == "daily": + # 4-day threshold would have caught the 2026-04-14 bug + # (ArcticDB silently not writing) by 2026-04-17. + self.check_arcticdb_fresh("universe", "SPY", max_stale_days=4) diff --git a/requirements.txt b/requirements.txt index 6e324ff..b33774f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ psycopg2-binary>=2.9 voyageai>=0.3 jsonschema>=4.20 arcticdb>=6.11 -flow-doctor[diagnosis]>=0.3.0,<0.4.0 +# flow-doctor is pulled in transitively via alpha-engine-lib[flow_doctor]. +alpha-engine-lib[arcticdb,flow_doctor] @ git+https://github.com/cipher813/alpha-engine-lib@v0.1.0 diff --git a/weekly_collector.py b/weekly_collector.py index 2d713d3..4615569 100644 --- a/weekly_collector.py +++ b/weekly_collector.py @@ -647,11 +647,21 @@ def main() -> None: args = _parse_args() _load_dotenv() - from log_config import setup_logging - setup_logging("data-collector") + from alpha_engine_lib.logging import setup_logging + setup_logging( + "data-collector", + flow_doctor_yaml=str(Path(__file__).parent / "flow-doctor.yaml"), + ) logging.getLogger().setLevel(getattr(logging, args.log_level)) config = load_config(args.config) + + # Pre-flight: fail fast on env / connectivity drift before starting + # the real collection work. See alpha-engine-lib/README.md. + from preflight import DataPreflight + mode = "daily" if args.daily else f"phase{args.phase or 1}" + DataPreflight(config["bucket"], mode).run() + results = run_weekly(config, args) # Hard-fail on any non-ok status — strict form of the no-silent-fails