diff --git a/infrastructure/lambdas/freshness-monitor/deploy.sh b/infrastructure/lambdas/freshness-monitor/deploy.sh index 81b07f0..0fd433a 100755 --- a/infrastructure/lambdas/freshness-monitor/deploy.sh +++ b/infrastructure/lambdas/freshness-monitor/deploy.sh @@ -31,6 +31,7 @@ FUNCTION_NAME="alpha-engine-freshness-monitor" ROLE_NAME="alpha-engine-freshness-monitor-role" POLICY_NAME="alpha-engine-freshness-monitor-policy" RULE_NAME="alpha-engine-freshness-monitor-cron" +HISTORICAL_RULE_NAME="alpha-engine-freshness-monitor-historical-cron" REGION="${AWS_REGION:-us-east-1}" ACCOUNT_ID="${ACCOUNT_ID:-711398986525}" @@ -200,6 +201,33 @@ if $BOOTSTRAP; then --principal events.amazonaws.com \ --source-arn "${RULE_ARN}" \ --region "${REGION}" 2>/dev/null || true + + # Historical-mode cron: daily at 04:00 UTC, off-peak. Fires the same + # Lambda with event={"mode": "historical"} so it probes the last N + # cycles of each artifact and writes _freshness_monitor/history.json + # (page 26 reads this for per-row history expanders + gap counts). + # Lookback defaults: 12 saturday + 30 weekday/eod cycles. + echo " Creating EventBridge historical cron: ${HISTORICAL_RULE_NAME}" + run aws events put-rule \ + --name "${HISTORICAL_RULE_NAME}" \ + --schedule-expression "cron(0 4 * * ? *)" \ + --description "Daily 04:00 UTC historical-cycle probe (mode=historical)" \ + --region "${REGION}" \ + --query 'RuleArn' --output text + + run aws events put-targets \ + --rule "${HISTORICAL_RULE_NAME}" \ + --targets "Id=1,Arn=${FN_ARN},Input={\"mode\":\"historical\"}" \ + --region "${REGION}" + + HIST_RULE_ARN="arn:aws:events:${REGION}:${ACCOUNT_ID}:rule/${HISTORICAL_RULE_NAME}" + run aws lambda add-permission \ + --function-name "${FUNCTION_NAME}" \ + --statement-id "eventbridge-${HISTORICAL_RULE_NAME}" \ + --action lambda:InvokeFunction \ + --principal events.amazonaws.com \ + --source-arn "${HIST_RULE_ARN}" \ + --region "${REGION}" 2>/dev/null || true fi # ----- 3. Update function code (always after bootstrap, idempotent) --------- diff --git a/infrastructure/lambdas/freshness-monitor/index.py b/infrastructure/lambdas/freshness-monitor/index.py index 299e486..22dfd47 100644 --- a/infrastructure/lambdas/freshness-monitor/index.py +++ b/infrastructure/lambdas/freshness-monitor/index.py @@ -47,7 +47,7 @@ import logging import os import time -from datetime import date, datetime, timezone +from datetime import date, datetime, timedelta, timezone from typing import Any import boto3 @@ -73,6 +73,18 @@ ) HEARTBEAT_KEY = "_freshness_monitor/heartbeat.json" CHECK_RESULTS_KEY = "_freshness_monitor/check_results.json" +HISTORY_KEY = "_freshness_monitor/history.json" + +# Historical-mode lookback depth per cadence. Tunable via event payload +# (event["lookback"] = {"saturday_sf": 12, ...}). Defaults sized for ~3 +# months of history at ~negligible cost: 51 artifacts × 50 cycles ≈ +# 2,500 S3 HEAD requests per daily historical run ≈ $0.001/day. +_DEFAULT_LOOKBACK = { + "saturday_sf": 12, + "weekday_sf": 30, + "eod_sf": 30, + "continuous": 0, # current-state probe covers continuous artifacts +} # OBSERVE-mode gate. Plan §3 invariant 10 + §4 Phase 6 default. Anything # other than literal "true" (case-insensitive) suppresses alerts. Check @@ -309,13 +321,258 @@ def _maybe_alert(spec: ArtifactSpec, result: CheckResult, now: datetime) -> bool # ── Handler ───────────────────────────────────────────────────────────────── +# ── Historical-mode probe ─────────────────────────────────────────────────── +# +# Closes the gap surfaced 2026-05-28: the current-state probe answers +# "is the artifact present *now*?" but operators also need "did it +# land last weekend? the weekend before? are there gaps in the +# producer's history?" Filed per the same feedback memory +# [[feedback_observe_mode_unconditional_gates_govern_cutover]] — +# absence-of-artifact is the failure mode, and a single-cycle absence +# could be a false-positive (instance failure) where a multi-cycle gap +# is a real producer regression. +# +# Fires on a separate EventBridge cron (daily ~04:00 UTC, off-peak) +# via event={"mode": "historical"}. Writes +# s3://alpha-engine-research/_freshness_monitor/history.json which +# page 26 reads to surface per-artifact gap counts + per-row history +# expanders. +# +# Date resolution is intentionally simple (calendar-naive): +# - saturday_sf: last N calendar Saturdays +# - weekday_sf / eod_sf: last N Mon-Fri days +# NYSE holidays show up as false-positive "absent" days. Operators +# interpret them in context (or filter via the page 26 surface). When +# the holiday-aware backfill becomes worth the dependency lift, we +# can route via alpha_engine_lib.dates. + + +def _iter_historical_cycle_dates(cadence: str, now: datetime, count: int) -> list[date]: + """Return the N most recent cycle dates for the given cadence, + newest-first. Calendar-naive (NYSE holidays are not skipped). + + For ``saturday_sf``: previous ``count`` Saturdays strictly before + ``now.date()``. Today's Saturday is excluded if ``now`` is before + the typical 09:00 UTC cron — the caller is expected to invoke + historical mode after the current-state probe has had time to + cover the current cycle. + + For ``weekday_sf`` / ``eod_sf``: previous ``count`` Mon-Fri days + strictly before ``now.date()``. + """ + if count <= 0: + return [] + today = now.date() + dates: list[date] = [] + if cadence == "saturday_sf": + # Walk back day-by-day, collecting Saturdays. + d = today - timedelta(days=1) + while len(dates) < count: + if d.weekday() == 5: # Saturday + dates.append(d) + d -= timedelta(days=1) + elif cadence in {"weekday_sf", "eod_sf"}: + d = today - timedelta(days=1) + while len(dates) < count: + if d.weekday() < 5: # Mon-Fri + dates.append(d) + d -= timedelta(days=1) + # Other cadences: skip (continuous covered by current-state probe). + return dates + + +def _format_historical_key(template: str, target_date: date) -> str: + """Substitute date placeholders. Supports the same placeholders the + substrate's _format_key handles: ``{date}``, ``{trading_day}``. + ``{cycle_label}`` (fortnightly/quarterly buckets) is not historical- + probable from a single date, so artifacts using it are skipped. + """ + iso = target_date.isoformat() + return template.format(date=iso, trading_day=iso) + + +def _probe_historical( + s3_client: Any, + spec: ArtifactSpec, + cycle_dates: list[date], +) -> tuple[list[dict], bool]: + """Probe the last N cycles' keys for one artifact. Returns + ``(cycles, is_latest_pointer)``. Each ``cycles`` entry is a dict + with ``date``, ``present``, ``size``, ``last_modified``. + + For artifacts whose ``s3_key_template`` is a latest-pointer (no + ``{date}``/``{trading_day}`` placeholder), returns a single-entry + list with the pointer's current state — historical sequence isn't + observable from the pointer alone, so the page must render this + distinction. + """ + template = spec.s3_key_template + has_date_placeholder = "{date}" in template or "{trading_day}" in template + has_unsupported_placeholder = "{cycle_label}" in template + + if has_unsupported_placeholder: + return [], False + + bucket = spec.s3_bucket or REGISTRY_BUCKET + + if not has_date_placeholder: + # Latest-pointer: HEAD once, report current state only. + try: + resp = s3_client.head_object(Bucket=bucket, Key=template) + return [{ + "date": "(latest)", + "present": True, + "size": resp["ContentLength"], + "last_modified": resp["LastModified"].isoformat(), + }], True + except Exception as exc: # noqa: BLE001 — record per-spec failures inline + code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown")) + entry: dict = {"date": "(latest)", "present": False} + if code not in {"404", "403", "NoSuchKey"}: + entry["error_code"] = code + return [entry], True + + # Date-templated: probe each historical date. + cycles = [] + for d in cycle_dates: + try: + key = _format_historical_key(template, d) + except (KeyError, IndexError) as exc: + cycles.append({ + "date": d.isoformat(), + "present": False, + "error_code": f"template_render_failed:{type(exc).__name__}", + }) + continue + try: + resp = s3_client.head_object(Bucket=bucket, Key=key) + cycles.append({ + "date": d.isoformat(), + "present": True, + "size": resp["ContentLength"], + "last_modified": resp["LastModified"].isoformat(), + }) + except Exception as exc: # noqa: BLE001 + code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown")) + # 404 (object missing) AND 403 (object missing, no ListBucket) both + # mean "not present" in S3 semantics — when the Lambda lacks + # s3:ListBucket on the bucket, S3 returns 403 instead of 404 for + # missing keys. Surface both as cleanly-absent (no error_code) so + # the page 26 display doesn't show spurious "403 errors" on + # legitimately-absent historical cycles. Other codes (500, etc.) + # keep error_code for operator visibility. + if code in {"404", "403", "NoSuchKey"}: + cycles.append({"date": d.isoformat(), "present": False}) + else: + cycles.append({ + "date": d.isoformat(), + "present": False, + "error_code": code, + }) + return cycles, False + + +def _handle_historical( + s3_client: Any, + now: datetime, + started_at: float, + lookback_overrides: dict | None, +) -> dict: + """Walk the registry, probe each artifact's last N cycles, write + ``history.json``. Same outer error handling as the current-state + handler — load_registry raises on YAML parse / schema, per-spec + failures are caught inline.""" + logger.info( + "freshness-monitor invoked in HISTORICAL mode at %s", + now.isoformat(), + ) + lookback = dict(_DEFAULT_LOOKBACK) + if lookback_overrides: + lookback.update(lookback_overrides) + + specs = load_registry(s3_client, REGISTRY_BUCKET, REGISTRY_KEY) + logger.info("loaded %d specs from registry", len(specs)) + + artifacts_history: dict[str, dict] = {} + skipped_unsupported = 0 + total_cycles_probed = 0 + for spec in specs: + count = lookback.get(spec.cadence, 0) + cycle_dates = _iter_historical_cycle_dates(spec.cadence, now, count) + cycles, is_latest_pointer = _probe_historical(s3_client, spec, cycle_dates) + if not cycles and "{cycle_label}" in spec.s3_key_template: + skipped_unsupported += 1 + continue + total_cycles_probed += len(cycles) + # Gap count: present=False entries in date-templated history. + # Latest-pointers don't have a meaningful gap count (single point). + if is_latest_pointer: + gap_count = None + continuous = ( + len(cycles) == 1 and cycles[0].get("present") is True + ) + else: + gap_count = sum(1 for c in cycles if not c.get("present")) + continuous = (gap_count == 0 and len(cycles) > 0) + artifacts_history[spec.artifact_id] = { + "cadence": spec.cadence, + "severity": spec.severity, + "owner_repo": spec.owner_repo, + "s3_key_template": spec.s3_key_template, + "is_latest_pointer": is_latest_pointer, + "lookback_cycles": count if not is_latest_pointer else 1, + "gap_count": gap_count, + "continuous": continuous, + "history": cycles, + } + + payload = { + "generated_at": now.isoformat(), + "lookback": lookback, + "duration_seconds": round(time.time() - started_at, 2), + "n_artifacts": len(artifacts_history), + "n_cycles_probed": total_cycles_probed, + "skipped_unsupported": skipped_unsupported, + "artifacts": artifacts_history, + } + _put_json(s3_client, REGISTRY_BUCKET, HISTORY_KEY, payload) + + logger.info( + "freshness-monitor HISTORICAL complete: %d artifacts, %d cycles probed, %d skipped, duration=%.2fs", + len(artifacts_history), + total_cycles_probed, + skipped_unsupported, + payload["duration_seconds"], + ) + + return { + "mode": "historical", + "n_artifacts": len(artifacts_history), + "n_cycles_probed": total_cycles_probed, + "skipped_unsupported": skipped_unsupported, + "duration_seconds": payload["duration_seconds"], + } + + +# ── Main handler ──────────────────────────────────────────────────────────── + + def handler(event: dict, context) -> dict: # noqa: ARG001 — Lambda contract """EventBridge cron handler — every 15min walk the registry, - emit heartbeat + check_results, alert on misses past SLA.""" + emit heartbeat + check_results, alert on misses past SLA. + + ``event["mode"] == "historical"`` dispatches to the daily + historical-probe path instead (separate EB cron at ~04:00 UTC). + """ started_at = time.time() now = datetime.now(timezone.utc) s3 = boto3.client("s3") + if event and event.get("mode") == "historical": + return _handle_historical( + s3, now, started_at, event.get("lookback"), + ) + logger.info( "freshness-monitor invoked at %s (alerts_enabled=%s)", now.isoformat(), ALERTS_ENABLED, diff --git a/infrastructure/lambdas/freshness-monitor/test_handler.py b/infrastructure/lambdas/freshness-monitor/test_handler.py index 27e2578..fc3fd12 100644 --- a/infrastructure/lambdas/freshness-monitor/test_handler.py +++ b/infrastructure/lambdas/freshness-monitor/test_handler.py @@ -476,3 +476,97 @@ def test_maybe_alert_probe_failed_uses_critical_severity(monkeypatch, fixed_now) assert index._maybe_alert(spec, result, fixed_now) is True publish_mock.assert_called_once() assert publish_mock.call_args.kwargs["severity"] == "critical" + + +# ── Historical-mode tests ──────────────────────────────────────────────────── + + +def test_iter_historical_cycle_dates_saturday_returns_previous_saturdays(fixed_now): + """Saturday cadence walks back day-by-day collecting Saturdays only. + Verified anchor: 2026-05-28 is a Thursday; previous Saturdays are + 2026-05-23, 2026-05-16, 2026-05-09, etc.""" + import index + dates = index._iter_historical_cycle_dates("saturday_sf", fixed_now, 3) + assert [d.isoformat() for d in dates] == ["2026-05-23", "2026-05-16", "2026-05-09"] + + +def test_iter_historical_cycle_dates_weekday_returns_previous_mon_fri(fixed_now): + """weekday_sf walks back collecting Mon-Fri only. fixed_now is Sat + 2026-05-30; previous Mon-Fri sequence is Fri 5/29, Thu 5/28, Wed + 5/27, Tue 5/26, Mon 5/25.""" + import index + dates = index._iter_historical_cycle_dates("weekday_sf", fixed_now, 5) + assert [d.isoformat() for d in dates] == [ + "2026-05-29", "2026-05-28", "2026-05-27", "2026-05-26", "2026-05-25", + ] + + +def test_iter_historical_cycle_dates_eod_matches_weekday(fixed_now): + """eod_sf shares the weekday cadence — confirmed by callers in + ARTIFACT_REGISTRY.yaml (regime_state_dated, predictor_drift_detection).""" + import index + sat_dates = index._iter_historical_cycle_dates("weekday_sf", fixed_now, 4) + eod_dates = index._iter_historical_cycle_dates("eod_sf", fixed_now, 4) + assert sat_dates == eod_dates + + +def test_iter_historical_cycle_dates_continuous_returns_empty(fixed_now): + """continuous cadence is intentionally skipped — current-state probe + covers it at 15min granularity.""" + import index + assert index._iter_historical_cycle_dates("continuous", fixed_now, 100) == [] + + +def test_iter_historical_cycle_dates_zero_count_returns_empty(fixed_now): + """count=0 short-circuits — early return prevents infinite loop on a + cadence string whose weekday filter never matches.""" + import index + assert index._iter_historical_cycle_dates("saturday_sf", fixed_now, 0) == [] + + +def test_format_historical_key_substitutes_date_placeholder(): + import index + assert index._format_historical_key( + "candidates/{date}/candidates.json", date(2026, 5, 23), + ) == "candidates/2026-05-23/candidates.json" + + +def test_format_historical_key_substitutes_trading_day_placeholder(): + """{trading_day} renders the same ISO date as {date} — the lib's + placeholder set treats them as synonyms for historical-probe purposes.""" + import index + assert index._format_historical_key( + "predictor/predictions/{trading_day}.json", date(2026, 5, 27), + ) == "predictor/predictions/2026-05-27.json" + + +def test_format_historical_key_passes_through_latest_pointer(): + """Latest-pointer templates have no placeholder — format is a no-op.""" + import index + assert index._format_historical_key( + "factors/profiles/latest.json", date(2026, 5, 24), + ) == "factors/profiles/latest.json" + + +def test_handler_dispatches_to_historical_on_mode_flag(monkeypatch, fixed_now): + """event={'mode': 'historical'} routes to _handle_historical without + touching the current-state path.""" + import importlib + import index + importlib.reload(index) + monkeypatch.setattr( + index, "_handle_historical", + mock.Mock(return_value={"mode": "historical", "n_artifacts": 0, + "n_cycles_probed": 0, "skipped_unsupported": 0, + "duration_seconds": 0.0}), + ) + monkeypatch.setattr(index, "load_registry", mock.Mock()) # would fail otherwise + monkeypatch.setattr( + index, "datetime", mock.Mock( + now=mock.Mock(return_value=fixed_now), + ), + ) + result = index.handler({"mode": "historical"}, None) + assert result["mode"] == "historical" + index._handle_historical.assert_called_once() + index.load_registry.assert_not_called() # current-state path NOT taken