Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions infrastructure/lambdas/freshness-monitor/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ FUNCTION_NAME="alpha-engine-freshness-monitor"
ROLE_NAME="alpha-engine-freshness-monitor-role"
POLICY_NAME="alpha-engine-freshness-monitor-policy"
RULE_NAME="alpha-engine-freshness-monitor-cron"
HISTORICAL_RULE_NAME="alpha-engine-freshness-monitor-historical-cron"
REGION="${AWS_REGION:-us-east-1}"
ACCOUNT_ID="${ACCOUNT_ID:-711398986525}"

Expand Down Expand Up @@ -200,6 +201,33 @@ if $BOOTSTRAP; then
--principal events.amazonaws.com \
--source-arn "${RULE_ARN}" \
--region "${REGION}" 2>/dev/null || true

# Historical-mode cron: daily at 04:00 UTC, off-peak. Fires the same
# Lambda with event={"mode": "historical"} so it probes the last N
# cycles of each artifact and writes _freshness_monitor/history.json
# (page 26 reads this for per-row history expanders + gap counts).
# Lookback defaults: 12 saturday + 30 weekday/eod cycles.
echo " Creating EventBridge historical cron: ${HISTORICAL_RULE_NAME}"
run aws events put-rule \
--name "${HISTORICAL_RULE_NAME}" \
--schedule-expression "cron(0 4 * * ? *)" \
--description "Daily 04:00 UTC historical-cycle probe (mode=historical)" \
--region "${REGION}" \
--query 'RuleArn' --output text

run aws events put-targets \
--rule "${HISTORICAL_RULE_NAME}" \
--targets "Id=1,Arn=${FN_ARN},Input={\"mode\":\"historical\"}" \
--region "${REGION}"

HIST_RULE_ARN="arn:aws:events:${REGION}:${ACCOUNT_ID}:rule/${HISTORICAL_RULE_NAME}"
run aws lambda add-permission \
--function-name "${FUNCTION_NAME}" \
--statement-id "eventbridge-${HISTORICAL_RULE_NAME}" \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn "${HIST_RULE_ARN}" \
--region "${REGION}" 2>/dev/null || true
fi

# ----- 3. Update function code (always after bootstrap, idempotent) ---------
Expand Down
261 changes: 259 additions & 2 deletions infrastructure/lambdas/freshness-monitor/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import logging
import os
import time
from datetime import date, datetime, timezone
from datetime import date, datetime, timedelta, timezone
from typing import Any

import boto3
Expand All @@ -73,6 +73,18 @@
)
HEARTBEAT_KEY = "_freshness_monitor/heartbeat.json"
CHECK_RESULTS_KEY = "_freshness_monitor/check_results.json"
HISTORY_KEY = "_freshness_monitor/history.json"

# Historical-mode lookback depth per cadence. Tunable via event payload
# (event["lookback"] = {"saturday_sf": 12, ...}). Defaults sized for ~3
# months of history at ~negligible cost: 51 artifacts × 50 cycles ≈
# 2,500 S3 HEAD requests per daily historical run ≈ $0.001/day.
_DEFAULT_LOOKBACK = {
"saturday_sf": 12,
"weekday_sf": 30,
"eod_sf": 30,
"continuous": 0, # current-state probe covers continuous artifacts
}

# OBSERVE-mode gate. Plan §3 invariant 10 + §4 Phase 6 default. Anything
# other than literal "true" (case-insensitive) suppresses alerts. Check
Expand Down Expand Up @@ -309,13 +321,258 @@ def _maybe_alert(spec: ArtifactSpec, result: CheckResult, now: datetime) -> bool
# ── Handler ─────────────────────────────────────────────────────────────────


# ── Historical-mode probe ───────────────────────────────────────────────────
#
# Closes the gap surfaced 2026-05-28: the current-state probe answers
# "is the artifact present *now*?" but operators also need "did it
# land last weekend? the weekend before? are there gaps in the
# producer's history?" Filed per the same feedback memory
# [[feedback_observe_mode_unconditional_gates_govern_cutover]] —
# absence-of-artifact is the failure mode, and a single-cycle absence
# could be a false-positive (instance failure) where a multi-cycle gap
# is a real producer regression.
#
# Fires on a separate EventBridge cron (daily ~04:00 UTC, off-peak)
# via event={"mode": "historical"}. Writes
# s3://alpha-engine-research/_freshness_monitor/history.json which
# page 26 reads to surface per-artifact gap counts + per-row history
# expanders.
#
# Date resolution is intentionally simple (calendar-naive):
# - saturday_sf: last N calendar Saturdays
# - weekday_sf / eod_sf: last N Mon-Fri days
# NYSE holidays show up as false-positive "absent" days. Operators
# interpret them in context (or filter via the page 26 surface). When
# the holiday-aware backfill becomes worth the dependency lift, we
# can route via alpha_engine_lib.dates.


def _iter_historical_cycle_dates(cadence: str, now: datetime, count: int) -> list[date]:
"""Return the N most recent cycle dates for the given cadence,
newest-first. Calendar-naive (NYSE holidays are not skipped).

For ``saturday_sf``: previous ``count`` Saturdays strictly before
``now.date()``. Today's Saturday is excluded if ``now`` is before
the typical 09:00 UTC cron — the caller is expected to invoke
historical mode after the current-state probe has had time to
cover the current cycle.

For ``weekday_sf`` / ``eod_sf``: previous ``count`` Mon-Fri days
strictly before ``now.date()``.
"""
if count <= 0:
return []
today = now.date()
dates: list[date] = []
if cadence == "saturday_sf":
# Walk back day-by-day, collecting Saturdays.
d = today - timedelta(days=1)
while len(dates) < count:
if d.weekday() == 5: # Saturday
dates.append(d)
d -= timedelta(days=1)
elif cadence in {"weekday_sf", "eod_sf"}:
d = today - timedelta(days=1)
while len(dates) < count:
if d.weekday() < 5: # Mon-Fri
dates.append(d)
d -= timedelta(days=1)
# Other cadences: skip (continuous covered by current-state probe).
return dates


def _format_historical_key(template: str, target_date: date) -> str:
"""Substitute date placeholders. Supports the same placeholders the
substrate's _format_key handles: ``{date}``, ``{trading_day}``.
``{cycle_label}`` (fortnightly/quarterly buckets) is not historical-
probable from a single date, so artifacts using it are skipped.
"""
iso = target_date.isoformat()
return template.format(date=iso, trading_day=iso)


def _probe_historical(
s3_client: Any,
spec: ArtifactSpec,
cycle_dates: list[date],
) -> tuple[list[dict], bool]:
"""Probe the last N cycles' keys for one artifact. Returns
``(cycles, is_latest_pointer)``. Each ``cycles`` entry is a dict
with ``date``, ``present``, ``size``, ``last_modified``.

For artifacts whose ``s3_key_template`` is a latest-pointer (no
``{date}``/``{trading_day}`` placeholder), returns a single-entry
list with the pointer's current state — historical sequence isn't
observable from the pointer alone, so the page must render this
distinction.
"""
template = spec.s3_key_template
has_date_placeholder = "{date}" in template or "{trading_day}" in template
has_unsupported_placeholder = "{cycle_label}" in template

if has_unsupported_placeholder:
return [], False

bucket = spec.s3_bucket or REGISTRY_BUCKET

if not has_date_placeholder:
# Latest-pointer: HEAD once, report current state only.
try:
resp = s3_client.head_object(Bucket=bucket, Key=template)
return [{
"date": "(latest)",
"present": True,
"size": resp["ContentLength"],
"last_modified": resp["LastModified"].isoformat(),
}], True
except Exception as exc: # noqa: BLE001 — record per-spec failures inline
code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown"))
entry: dict = {"date": "(latest)", "present": False}
if code not in {"404", "403", "NoSuchKey"}:
entry["error_code"] = code
return [entry], True

# Date-templated: probe each historical date.
cycles = []
for d in cycle_dates:
try:
key = _format_historical_key(template, d)
except (KeyError, IndexError) as exc:
cycles.append({
"date": d.isoformat(),
"present": False,
"error_code": f"template_render_failed:{type(exc).__name__}",
})
continue
try:
resp = s3_client.head_object(Bucket=bucket, Key=key)
cycles.append({
"date": d.isoformat(),
"present": True,
"size": resp["ContentLength"],
"last_modified": resp["LastModified"].isoformat(),
})
except Exception as exc: # noqa: BLE001
code = str(getattr(exc, "response", {}).get("Error", {}).get("Code", "unknown"))
# 404 (object missing) AND 403 (object missing, no ListBucket) both
# mean "not present" in S3 semantics — when the Lambda lacks
# s3:ListBucket on the bucket, S3 returns 403 instead of 404 for
# missing keys. Surface both as cleanly-absent (no error_code) so
# the page 26 display doesn't show spurious "403 errors" on
# legitimately-absent historical cycles. Other codes (500, etc.)
# keep error_code for operator visibility.
if code in {"404", "403", "NoSuchKey"}:
cycles.append({"date": d.isoformat(), "present": False})
else:
cycles.append({
"date": d.isoformat(),
"present": False,
"error_code": code,
})
return cycles, False


def _handle_historical(
s3_client: Any,
now: datetime,
started_at: float,
lookback_overrides: dict | None,
) -> dict:
"""Walk the registry, probe each artifact's last N cycles, write
``history.json``. Same outer error handling as the current-state
handler — load_registry raises on YAML parse / schema, per-spec
failures are caught inline."""
logger.info(
"freshness-monitor invoked in HISTORICAL mode at %s",
now.isoformat(),
)
lookback = dict(_DEFAULT_LOOKBACK)
if lookback_overrides:
lookback.update(lookback_overrides)

specs = load_registry(s3_client, REGISTRY_BUCKET, REGISTRY_KEY)
logger.info("loaded %d specs from registry", len(specs))

artifacts_history: dict[str, dict] = {}
skipped_unsupported = 0
total_cycles_probed = 0
for spec in specs:
count = lookback.get(spec.cadence, 0)
cycle_dates = _iter_historical_cycle_dates(spec.cadence, now, count)
cycles, is_latest_pointer = _probe_historical(s3_client, spec, cycle_dates)
if not cycles and "{cycle_label}" in spec.s3_key_template:
skipped_unsupported += 1
continue
total_cycles_probed += len(cycles)
# Gap count: present=False entries in date-templated history.
# Latest-pointers don't have a meaningful gap count (single point).
if is_latest_pointer:
gap_count = None
continuous = (
len(cycles) == 1 and cycles[0].get("present") is True
)
else:
gap_count = sum(1 for c in cycles if not c.get("present"))
continuous = (gap_count == 0 and len(cycles) > 0)
artifacts_history[spec.artifact_id] = {
"cadence": spec.cadence,
"severity": spec.severity,
"owner_repo": spec.owner_repo,
"s3_key_template": spec.s3_key_template,
"is_latest_pointer": is_latest_pointer,
"lookback_cycles": count if not is_latest_pointer else 1,
"gap_count": gap_count,
"continuous": continuous,
"history": cycles,
}

payload = {
"generated_at": now.isoformat(),
"lookback": lookback,
"duration_seconds": round(time.time() - started_at, 2),
"n_artifacts": len(artifacts_history),
"n_cycles_probed": total_cycles_probed,
"skipped_unsupported": skipped_unsupported,
"artifacts": artifacts_history,
}
_put_json(s3_client, REGISTRY_BUCKET, HISTORY_KEY, payload)

logger.info(
"freshness-monitor HISTORICAL complete: %d artifacts, %d cycles probed, %d skipped, duration=%.2fs",
len(artifacts_history),
total_cycles_probed,
skipped_unsupported,
payload["duration_seconds"],
)

return {
"mode": "historical",
"n_artifacts": len(artifacts_history),
"n_cycles_probed": total_cycles_probed,
"skipped_unsupported": skipped_unsupported,
"duration_seconds": payload["duration_seconds"],
}


# ── Main handler ────────────────────────────────────────────────────────────


def handler(event: dict, context) -> dict: # noqa: ARG001 — Lambda contract
"""EventBridge cron handler — every 15min walk the registry,
emit heartbeat + check_results, alert on misses past SLA."""
emit heartbeat + check_results, alert on misses past SLA.

``event["mode"] == "historical"`` dispatches to the daily
historical-probe path instead (separate EB cron at ~04:00 UTC).
"""
started_at = time.time()
now = datetime.now(timezone.utc)
s3 = boto3.client("s3")

if event and event.get("mode") == "historical":
return _handle_historical(
s3, now, started_at, event.get("lookback"),
)

logger.info(
"freshness-monitor invoked at %s (alerts_enabled=%s)",
now.isoformat(), ALERTS_ENABLED,
Expand Down
Loading
Loading