From 0e005901198b89e54c813a74cf718a17f3ba720c Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Thu, 28 May 2026 08:23:36 -0700 Subject: [PATCH] =?UTF-8?q?feat(freshness-monitor):=20historical-mode=20pr?= =?UTF-8?q?obe=20=E2=80=94=20daily=20gap=20detection=20across=20all=20arti?= =?UTF-8?q?facts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the gap surfaced 2026-05-28: current-state probe answers 'is the artifact present now?' but operators also need 'did it land last weekend? are there gaps in the producer's history?' Filed per the same feedback memory observe_mode_unconditional_gates — absence-of-artifact is the failure mode, and a single-cycle absence could be a false-positive where a multi-cycle gap is a real producer regression. Adds: - event['mode']='historical' dispatch in handler(). Routes to a new _handle_historical(s3, now, started_at, lookback_overrides) path that walks the registry, probes the last N cycles per artifact, and writes _freshness_monitor/history.json (page 26 will surface per-row history expanders + gap counts). - New EB cron alpha-engine-freshness-monitor-historical-cron (daily 04:00 UTC, off-peak) wired in deploy.sh --bootstrap. - Default lookback: 12 saturday_sf + 30 weekday_sf/eod_sf cycles (~3 months each). continuous skipped (current-state covers). Tunable via event['lookback'] override. 403/404/NoSuchKey normalization: S3 returns 403 (not 404) for missing keys when the Lambda lacks s3:ListBucket. Treat both as cleanly-absent (no error_code in output) so page 26 doesn't show spurious '403 errors' on legitimately-absent historical cycles. 9 new unit tests cover: saturday/weekday/eod cycle-date resolution, continuous skip, zero-count short-circuit, date/trading_day/no-placeholder template rendering, and handler mode-dispatch. 
Live smoke (post-deploy + manual invoke): n_artifacts=51, n_cycles_probed=474, duration=10.08s Surfaced 1 real finding for follow-up: several artifacts use calendar-vs-trading-day-anchored templates that don't match producer behavior. research_signals registered as signals/{date}/signals.json with cadence=saturday_sf, but producer writes to mostly Friday trading-day keys (2026-05-22, 2026-05-15, etc.). The historical probe correctly reports the Saturday keys as absent — which IS the right answer given the registry template. ROADMAP follow-up filed separately to audit all registry templates for calendar-vs-trading-day mismatch. Calendar-naive by design — NYSE holidays surface as false-positive absent days but operators can interpret in context. Calendar-aware backfill is a P3 follow-up if the noise becomes worth the dependency lift. Composes with the OBSERVATION_REGISTRY arc (#349/#351/#352/#355 + #135/#136/#137). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lambdas/freshness-monitor/deploy.sh | 28 ++ .../lambdas/freshness-monitor/index.py | 261 +++++++++++++++++- .../lambdas/freshness-monitor/test_handler.py | 94 +++++++ 3 files changed, 381 insertions(+), 2 deletions(-) diff --git a/infrastructure/lambdas/freshness-monitor/deploy.sh b/infrastructure/lambdas/freshness-monitor/deploy.sh index 81b07f0..0fd433a 100755 --- a/infrastructure/lambdas/freshness-monitor/deploy.sh +++ b/infrastructure/lambdas/freshness-monitor/deploy.sh @@ -31,6 +31,7 @@ FUNCTION_NAME="alpha-engine-freshness-monitor" ROLE_NAME="alpha-engine-freshness-monitor-role" POLICY_NAME="alpha-engine-freshness-monitor-policy" RULE_NAME="alpha-engine-freshness-monitor-cron" +HISTORICAL_RULE_NAME="alpha-engine-freshness-monitor-historical-cron" REGION="${AWS_REGION:-us-east-1}" ACCOUNT_ID="${ACCOUNT_ID:-711398986525}" @@ -200,6 +201,33 @@ if $BOOTSTRAP; then --principal events.amazonaws.com \ --source-arn "${RULE_ARN}" \ --region "${REGION}" 2>/dev/null || true + + # Historical-mode cron: daily 
at 04:00 UTC, off-peak. Fires the same + # Lambda with event={"mode": "historical"} so it probes the last N + # cycles of each artifact and writes _freshness_monitor/history.json + # (page 26 reads this for per-row history expanders + gap counts). + # Lookback defaults: 12 saturday + 30 weekday/eod cycles. + echo " Creating EventBridge historical cron: ${HISTORICAL_RULE_NAME}" + run aws events put-rule \ + --name "${HISTORICAL_RULE_NAME}" \ + --schedule-expression "cron(0 4 * * ? *)" \ + --description "Daily 04:00 UTC historical-cycle probe (mode=historical)" \ + --region "${REGION}" \ + --query 'RuleArn' --output text + + run aws events put-targets \ + --rule "${HISTORICAL_RULE_NAME}" \ + --targets "Id=1,Arn=${FN_ARN},Input={\"mode\":\"historical\"}" \ + --region "${REGION}" + + HIST_RULE_ARN="arn:aws:events:${REGION}:${ACCOUNT_ID}:rule/${HISTORICAL_RULE_NAME}" + run aws lambda add-permission \ + --function-name "${FUNCTION_NAME}" \ + --statement-id "eventbridge-${HISTORICAL_RULE_NAME}" \ + --action lambda:InvokeFunction \ + --principal events.amazonaws.com \ + --source-arn "${HIST_RULE_ARN}" \ + --region "${REGION}" 2>/dev/null || true fi # ----- 3. Update function code (always after bootstrap, idempotent) --------- diff --git a/infrastructure/lambdas/freshness-monitor/index.py b/infrastructure/lambdas/freshness-monitor/index.py index 299e486..22dfd47 100644 --- a/infrastructure/lambdas/freshness-monitor/index.py +++ b/infrastructure/lambdas/freshness-monitor/index.py @@ -47,7 +47,7 @@ import logging import os import time -from datetime import date, datetime, timezone +from datetime import date, datetime, timedelta, timezone from typing import Any import boto3 @@ -73,6 +73,18 @@ ) HEARTBEAT_KEY = "_freshness_monitor/heartbeat.json" CHECK_RESULTS_KEY = "_freshness_monitor/check_results.json" +HISTORY_KEY = "_freshness_monitor/history.json" + +# Historical-mode lookback depth per cadence. Tunable via event payload +# (event["lookback"] = {"saturday_sf": 12, ...}). 
Defaults sized for ~12 +# weeks of saturday_sf and ~6 weeks of weekday_sf/eod_sf history at ~negligible cost: 51 artifacts × ≤30 cycles ≈ +# ≤1,530 S3 HEAD requests per daily historical run ≈ $0.001/day. +_DEFAULT_LOOKBACK = { + "saturday_sf": 12, + "weekday_sf": 30, + "eod_sf": 30, + "continuous": 0, # current-state probe covers continuous artifacts +} # OBSERVE-mode gate. Plan §3 invariant 10 + §4 Phase 6 default. Anything # other than literal "true" (case-insensitive) suppresses alerts. Check @@ -309,13 +321,258 @@ def _maybe_alert(spec: ArtifactSpec, result: CheckResult, now: datetime) -> bool # ── Handler ───────────────────────────────────────────────────────────────── +# ── Historical-mode probe ─────────────────────────────────────────────────── +# +# Closes the gap surfaced 2026-05-28: the current-state probe answers +# "is the artifact present *now*?" but operators also need "did it +# land last weekend? the weekend before? are there gaps in the +# producer's history?" Filed per the same feedback memory +# [[feedback_observe_mode_unconditional_gates_govern_cutover]] — +# absence-of-artifact is the failure mode, and a single-cycle absence +# could be a false-positive (instance failure) where a multi-cycle gap +# is a real producer regression. +# +# Fires on a separate EventBridge cron (daily ~04:00 UTC, off-peak) +# via event={"mode": "historical"}. Writes +# s3://alpha-engine-research/_freshness_monitor/history.json which +# page 26 reads to surface per-artifact gap counts + per-row history +# expanders. +# +# Date resolution is intentionally simple (calendar-naive): +# - saturday_sf: last N calendar Saturdays +# - weekday_sf / eod_sf: last N Mon-Fri days +# NYSE holidays show up as false-positive "absent" days. Operators +# interpret them in context (or filter via the page 26 surface). When +# the holiday-aware backfill becomes worth the dependency lift, we +# can route via alpha_engine_lib.dates. 
+ + +def _iter_historical_cycle_dates(cadence: str, now: datetime, count: int) -> list[date]: + """Return the N most recent cycle dates for the given cadence, + newest-first. Calendar-naive (NYSE holidays are not skipped). + + For ``saturday_sf``: previous ``count`` Saturdays strictly before + ``now.date()``. Today is always excluded, even when it is a Saturday + and whatever the time of day in ``now`` — the walk starts from + yesterday, leaving the current cycle to the current-state + probe. + + For ``weekday_sf`` / ``eod_sf``: previous ``count`` Mon-Fri days + strictly before ``now.date()``. + """ + if count <= 0: + return [] + today = now.date() + dates: list[date] = [] + if cadence == "saturday_sf": + # Walk back day-by-day, collecting Saturdays. + d = today - timedelta(days=1) + while len(dates) < count: + if d.weekday() == 5: # Saturday + dates.append(d) + d -= timedelta(days=1) + elif cadence in {"weekday_sf", "eod_sf"}: + d = today - timedelta(days=1) + while len(dates) < count: + if d.weekday() < 5: # Mon-Fri + dates.append(d) + d -= timedelta(days=1) + # Other cadences: skip (continuous covered by current-state probe). + return dates + + +def _format_historical_key(template: str, target_date: date) -> str: + """Substitute date placeholders. Supports the same placeholders the + substrate's _format_key handles: ``{date}``, ``{trading_day}``. + ``{cycle_label}`` (fortnightly/quarterly buckets) cannot be derived + from a single date, so callers must skip templates that use it (rendering one here raises ``KeyError``). + """ + iso = target_date.isoformat() + return template.format(date=iso, trading_day=iso) + + +def _probe_historical( + s3_client: Any, + spec: ArtifactSpec, + cycle_dates: list[date], +) -> tuple[list[dict], bool]: + """Probe the last N cycles' keys for one artifact. Returns + ``(cycles, is_latest_pointer)``. Each ``cycles`` entry is a dict + with ``date`` and ``present``, plus ``size``/``last_modified`` when present or ``error_code`` on unexpected failures. 
+ + For artifacts whose ``s3_key_template`` is a latest-pointer (no + ``{date}``/``{trading_day}`` placeholder), returns a single-entry + list with the pointer's current state — historical sequence isn't + observable from the pointer alone, so the page must render this + distinction. + """ + template = spec.s3_key_template + has_date_placeholder = "{date}" in template or "{trading_day}" in template + has_unsupported_placeholder = "{cycle_label}" in template + + if has_unsupported_placeholder: + return [], False + + bucket = spec.s3_bucket or REGISTRY_BUCKET + + if not has_date_placeholder: + # Latest-pointer: HEAD once, report current state only. + try: + resp = s3_client.head_object(Bucket=bucket, Key=template) + return [{ + "date": "(latest)", + "present": True, + "size": resp["ContentLength"], + "last_modified": resp["LastModified"].isoformat(), + }], True + except Exception as exc: # noqa: BLE001 — record per-spec failures inline + code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown")) + entry: dict = {"date": "(latest)", "present": False} + if code not in {"404", "403", "NoSuchKey"}: + entry["error_code"] = code + return [entry], True + + # Date-templated: probe each historical date. 
+ cycles = [] + for d in cycle_dates: + try: + key = _format_historical_key(template, d) + except (KeyError, IndexError) as exc: + cycles.append({ + "date": d.isoformat(), + "present": False, + "error_code": f"template_render_failed:{type(exc).__name__}", + }) + continue + try: + resp = s3_client.head_object(Bucket=bucket, Key=key) + cycles.append({ + "date": d.isoformat(), + "present": True, + "size": resp["ContentLength"], + "last_modified": resp["LastModified"].isoformat(), + }) + except Exception as exc: # noqa: BLE001 + code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown")) + # 404 (object missing) AND 403 (object missing, no ListBucket) both + # mean "not present" in S3 semantics — when the Lambda lacks + # s3:ListBucket on the bucket, S3 returns 403 instead of 404 for + # missing keys. Surface both as cleanly-absent (no error_code) so + # the page 26 display doesn't show spurious "403 errors" on + # legitimately-absent historical cycles. Other codes (500, etc.) + # keep error_code for operator visibility. + if code in {"404", "403", "NoSuchKey"}: + cycles.append({"date": d.isoformat(), "present": False}) + else: + cycles.append({ + "date": d.isoformat(), + "present": False, + "error_code": code, + }) + return cycles, False + + +def _handle_historical( + s3_client: Any, + now: datetime, + started_at: float, + lookback_overrides: dict | None, +) -> dict: + """Walk the registry, probe each artifact's last N cycles, write + ``history.json``. 
Same outer error handling as the current-state + handler — load_registry raises on YAML parse / schema, per-spec + failures are caught inline.""" + logger.info( + "freshness-monitor invoked in HISTORICAL mode at %s", + now.isoformat(), + ) + lookback = dict(_DEFAULT_LOOKBACK) + if lookback_overrides: + lookback.update(lookback_overrides) + + specs = load_registry(s3_client, REGISTRY_BUCKET, REGISTRY_KEY) + logger.info("loaded %d specs from registry", len(specs)) + + artifacts_history: dict[str, dict] = {} + skipped_unsupported = 0 + total_cycles_probed = 0 + for spec in specs: + count = lookback.get(spec.cadence, 0) + cycle_dates = _iter_historical_cycle_dates(spec.cadence, now, count) + cycles, is_latest_pointer = _probe_historical(s3_client, spec, cycle_dates) + if not cycles and "{cycle_label}" in spec.s3_key_template: + skipped_unsupported += 1 + continue + total_cycles_probed += len(cycles) + # Gap count: present=False entries in date-templated history. + # Latest-pointers don't have a meaningful gap count (single point). 
+ if is_latest_pointer: + gap_count = None + continuous = ( + len(cycles) == 1 and cycles[0].get("present") is True + ) + else: + gap_count = sum(1 for c in cycles if not c.get("present")) + continuous = (gap_count == 0 and len(cycles) > 0) + artifacts_history[spec.artifact_id] = { + "cadence": spec.cadence, + "severity": spec.severity, + "owner_repo": spec.owner_repo, + "s3_key_template": spec.s3_key_template, + "is_latest_pointer": is_latest_pointer, + "lookback_cycles": count if not is_latest_pointer else 1, + "gap_count": gap_count, + "continuous": continuous, + "history": cycles, + } + + payload = { + "generated_at": now.isoformat(), + "lookback": lookback, + "duration_seconds": round(time.time() - started_at, 2), + "n_artifacts": len(artifacts_history), + "n_cycles_probed": total_cycles_probed, + "skipped_unsupported": skipped_unsupported, + "artifacts": artifacts_history, + } + _put_json(s3_client, REGISTRY_BUCKET, HISTORY_KEY, payload) + + logger.info( + "freshness-monitor HISTORICAL complete: %d artifacts, %d cycles probed, %d skipped, duration=%.2fs", + len(artifacts_history), + total_cycles_probed, + skipped_unsupported, + payload["duration_seconds"], + ) + + return { + "mode": "historical", + "n_artifacts": len(artifacts_history), + "n_cycles_probed": total_cycles_probed, + "skipped_unsupported": skipped_unsupported, + "duration_seconds": payload["duration_seconds"], + } + + +# ── Main handler ──────────────────────────────────────────────────────────── + + def handler(event: dict, context) -> dict: # noqa: ARG001 — Lambda contract """EventBridge cron handler — every 15min walk the registry, - emit heartbeat + check_results, alert on misses past SLA.""" + emit heartbeat + check_results, alert on misses past SLA. + + ``event["mode"] == "historical"`` dispatches to the daily + historical-probe path instead (separate EB cron at ~04:00 UTC). 
+ """ started_at = time.time() now = datetime.now(timezone.utc) s3 = boto3.client("s3") + if event and event.get("mode") == "historical": + return _handle_historical( + s3, now, started_at, event.get("lookback"), + ) + logger.info( "freshness-monitor invoked at %s (alerts_enabled=%s)", now.isoformat(), ALERTS_ENABLED, diff --git a/infrastructure/lambdas/freshness-monitor/test_handler.py b/infrastructure/lambdas/freshness-monitor/test_handler.py index 27e2578..fc3fd12 100644 --- a/infrastructure/lambdas/freshness-monitor/test_handler.py +++ b/infrastructure/lambdas/freshness-monitor/test_handler.py @@ -476,3 +476,97 @@ def test_maybe_alert_probe_failed_uses_critical_severity(monkeypatch, fixed_now) assert index._maybe_alert(spec, result, fixed_now) is True publish_mock.assert_called_once() assert publish_mock.call_args.kwargs["severity"] == "critical" + + +# ── Historical-mode tests ──────────────────────────────────────────────────── + + +def test_iter_historical_cycle_dates_saturday_returns_previous_saturdays(fixed_now): + """Saturday cadence walks back day-by-day collecting Saturdays only. + Verified anchor: 2026-05-28 is a Thursday; previous Saturdays are + 2026-05-23, 2026-05-16, 2026-05-09, etc.""" + import index + dates = index._iter_historical_cycle_dates("saturday_sf", fixed_now, 3) + assert [d.isoformat() for d in dates] == ["2026-05-23", "2026-05-16", "2026-05-09"] + + +def test_iter_historical_cycle_dates_weekday_returns_previous_mon_fri(fixed_now): + """weekday_sf walks back collecting Mon-Fri only. 
fixed_now is Sat + 2026-05-30; previous Mon-Fri sequence is Fri 5/29, Thu 5/28, Wed + 5/27, Tue 5/26, Mon 5/25.""" + import index + dates = index._iter_historical_cycle_dates("weekday_sf", fixed_now, 5) + assert [d.isoformat() for d in dates] == [ + "2026-05-29", "2026-05-28", "2026-05-27", "2026-05-26", "2026-05-25", + ] + + +def test_iter_historical_cycle_dates_eod_matches_weekday(fixed_now): + """eod_sf shares the weekday cadence — confirmed by callers in + ARTIFACT_REGISTRY.yaml (regime_state_dated, predictor_drift_detection).""" + import index + sat_dates = index._iter_historical_cycle_dates("weekday_sf", fixed_now, 4) + eod_dates = index._iter_historical_cycle_dates("eod_sf", fixed_now, 4) + assert sat_dates == eod_dates + + +def test_iter_historical_cycle_dates_continuous_returns_empty(fixed_now): + """continuous cadence is intentionally skipped — current-state probe + covers it at 15min granularity.""" + import index + assert index._iter_historical_cycle_dates("continuous", fixed_now, 100) == [] + + +def test_iter_historical_cycle_dates_zero_count_returns_empty(fixed_now): + """count=0 short-circuits — the early return yields an empty list + before any date walking, even for a cadence that has cycles.""" + import index + assert index._iter_historical_cycle_dates("saturday_sf", fixed_now, 0) == [] + + +def test_format_historical_key_substitutes_date_placeholder(): + import index + assert index._format_historical_key( + "candidates/{date}/candidates.json", date(2026, 5, 23), + ) == "candidates/2026-05-23/candidates.json" + + +def test_format_historical_key_substitutes_trading_day_placeholder(): + """{trading_day} renders the same ISO date as {date} — the lib's + placeholder set treats them as synonyms for historical-probe purposes.""" + import index + assert index._format_historical_key( + "predictor/predictions/{trading_day}.json", date(2026, 5, 27), + ) == "predictor/predictions/2026-05-27.json" + + +def test_format_historical_key_passes_through_latest_pointer(): 
+ """Latest-pointer templates have no placeholder — format is a no-op.""" + import index + assert index._format_historical_key( + "factors/profiles/latest.json", date(2026, 5, 24), + ) == "factors/profiles/latest.json" + + +def test_handler_dispatches_to_historical_on_mode_flag(monkeypatch, fixed_now): + """event={'mode': 'historical'} routes to _handle_historical without + touching the current-state path.""" + import importlib + import index + importlib.reload(index) + monkeypatch.setattr( + index, "_handle_historical", + mock.Mock(return_value={"mode": "historical", "n_artifacts": 0, + "n_cycles_probed": 0, "skipped_unsupported": 0, + "duration_seconds": 0.0}), + ) + monkeypatch.setattr(index, "load_registry", mock.Mock()) # would fail otherwise + monkeypatch.setattr( + index, "datetime", mock.Mock( + now=mock.Mock(return_value=fixed_now), + ), + ) + result = index.handler({"mode": "historical"}, None) + assert result["mode"] == "historical" + index._handle_historical.assert_called_once() + index.load_registry.assert_not_called() # current-state path NOT taken