From 4e92ccc44e18ca4a342e13c09a7dd5b79a5e77a2 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 6 May 2026 13:24:51 -0700 Subject: [PATCH] feat(sf): add WeeklySubstrateHealthCheck state at end of Saturday SF MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inserts ``WeeklySubstrateHealthCheck`` + ``WaitForWeeklySubstrateHealthCheck`` between the existing ``WaitForSaturdayHealthCheck`` and ``NotifyComplete``. The new states invoke ``python -m alpha_engine_lib.transparency --cadence weekly --alert`` on the dashboard EC2 (Sat SF dispatcher), running the row-driven substrate health checker shipped in alpha-engine-lib v0.5.0. The substrate check is the enforced half of the Phase 2 → 3 observation gate. Per-row CloudWatch metrics emit to AlphaEngine/Substrate so individual rows have their own alarms — a failed row pages immediately and decrements the 8-week observation-gate denominator for that row instead of letting the failure get noticed retrospectively. The existing artifact-freshness ``SaturdayHealthCheck`` and behavioral ``DriftDetection`` continue to run unchanged. The substrate check is a different abstraction (row-driven inventory validation, content assertions) and runs alongside it (chained sequentially after the freshness check, not replacing it); SaturdayHealthCheck retirement is planned after ~4-6 weeks of green substrate runs. Both new states are non-blocking (Catch routes to NotifyComplete) per the same pattern as SaturdayHealthCheck — the pipeline does not halt on a substrate-check failure: hard infra (SSM/EC2) failures are absorbed by the Catch, and row-level failures fire CloudWatch alarms instead. 15 new wiring tests pin the chain ordering, Catch semantics, command shape (``--cadence weekly``, ``--alert``, dashboard EC2, git pull before run), and ResultPath isolation between freshness and substrate states. 490 total passing. Requires: alpha-engine-dashboard PR bumping lib pin to v0.5.0. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- infrastructure/step_function.json | 64 +++++++++- tests/test_sf_substrate_check_wiring.py | 158 ++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 tests/test_sf_substrate_check_wiring.py diff --git a/infrastructure/step_function.json b/infrastructure/step_function.json index 19d80e3..36a51fe 100644 --- a/infrastructure/step_function.json +++ b/infrastructure/step_function.json @@ -829,12 +829,72 @@ "Catch": [ { "ErrorEquals": ["States.ALL"], - "Comment": "Non-blocking — continue even if polling fails", - "Next": "NotifyComplete", + "Comment": "Non-blocking — continue to substrate check even if freshness polling fails. SaturdayHealthCheck (artifact freshness) and WeeklySubstrateHealthCheck (row-driven inventory validation) are independent observability paths; both must run.", + "Next": "WeeklySubstrateHealthCheck", "ResultPath": "$.health_check_error" } ], "ResultPath": "$.health_check_poll", + "Next": "WeeklySubstrateHealthCheck" + }, + + "WeeklySubstrateHealthCheck": { + "Type": "Task", + "Comment": "Phase 2 → 3 transparency-substrate health check. Validates every row of the transparency inventory (alpha-engine-lib transparency_inventory.yaml) — agent decisions, predictor decisions, trade lineage, risk events, P&L attribution, config changes, data quality, agent quality, pipeline execution. Per-row CloudWatch metrics emit to AlphaEngine/Substrate so individual rows have their own alarms. Non-blocking (alerts on failure but does not halt pipeline) — same pattern as SaturdayHealthCheck. 
The Sat SF passes --cadence weekly which sweeps weekly + daily rows; the weekday EOD SF will run --cadence daily in a follow-up PR.", + "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", + "Parameters": { + "DocumentName": "AWS-RunShellScript", + "InstanceIds.$": "$.ec2_instance_id", + "Parameters": { + "commands": [ + "set -eo pipefail", + "sudo -u ec2-user git -C /home/ec2-user/alpha-engine-dashboard pull --ff-only origin main", + "cd /home/ec2-user/alpha-engine-dashboard", + "source .venv/bin/activate", + "pip install --quiet --upgrade -r requirements.txt", + "python -m alpha_engine_lib.transparency --cadence weekly --alert 2>&1 | tee /var/log/substrate-health-check.log" + ], + "executionTimeout": ["180"] + }, + "TimeoutSeconds": 180 + }, + "TimeoutSeconds": 240, + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Comment": "Substrate check failure is non-blocking — continue to notify. Per-row CloudWatch alarms catch row-level failures; this Catch only fires on infra/SSM failure.", + "Next": "NotifyComplete", + "ResultPath": "$.substrate_check_error" + } + ], + "ResultPath": "$.substrate_check_result", + "Next": "WaitForWeeklySubstrateHealthCheck" + }, + + "WaitForWeeklySubstrateHealthCheck": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", + "Parameters": { + "CommandId.$": "$.substrate_check_result.Command.CommandId", + "InstanceId.$": "$.ec2_instance_id[0]" + }, + "Retry": [ + { + "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], + "MaxAttempts": 5, + "IntervalSeconds": 5, + "BackoffRate": 1.5 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Comment": "Non-blocking — continue even if polling fails. 
Row-level alarms still page on failure regardless of polling outcome.", + "Next": "NotifyComplete", + "ResultPath": "$.substrate_check_error" + } + ], + "ResultPath": "$.substrate_check_poll", "Next": "NotifyComplete" }, diff --git a/tests/test_sf_substrate_check_wiring.py b/tests/test_sf_substrate_check_wiring.py new file mode 100644 index 0000000..1e660c8 --- /dev/null +++ b/tests/test_sf_substrate_check_wiring.py @@ -0,0 +1,158 @@ +"""Pins the Phase 2 → 3 substrate-health-check wiring in the Saturday SF. + +The new states ``WeeklySubstrateHealthCheck`` and +``WaitForWeeklySubstrateHealthCheck`` chain off the end of the existing +``WaitForSaturdayHealthCheck`` and run the row-driven +``alpha_engine_lib.transparency`` checker on the dashboard EC2. + +Catches regressions like: +- Someone reroutes ``WaitForSaturdayHealthCheck.Next`` back to + ``NotifyComplete`` and silently drops the substrate check. +- Someone removes the substrate state thinking it's redundant with the + artifact-freshness check (it isn't — different abstractions, see PR + body for the deprecation timeline). +- Someone flips the substrate Catch into a hard-fail and starts halting + the pipeline on row-level failure (per-row alarms own paging — the + Catch is for SSM/infra failures only). 
+""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SF_PATH = _REPO_ROOT / "infrastructure" / "step_function.json" + + +@pytest.fixture(scope="module") +def sf() -> dict: + return json.loads(_SF_PATH.read_text(encoding="utf-8")) + + +@pytest.fixture(scope="module") +def states(sf) -> dict: + return sf["States"] + + +class TestStatePresence: + """Both new states must exist and chain after the existing freshness check.""" + + def test_weekly_substrate_check_state_exists(self, states): + assert "WeeklySubstrateHealthCheck" in states + + def test_wait_for_weekly_substrate_check_exists(self, states): + assert "WaitForWeeklySubstrateHealthCheck" in states + + +class TestChainOrdering: + """Wiring goes: SaturdayHealthCheck → WaitForSat → Substrate → WaitForSubstrate → Notify.""" + + def test_wait_for_saturday_health_check_routes_to_substrate(self, states): + wait_state = states["WaitForSaturdayHealthCheck"] + assert wait_state["Next"] == "WeeklySubstrateHealthCheck", ( + "WaitForSaturdayHealthCheck must hand off to the substrate check, " + "not skip directly to NotifyComplete." + ) + + def test_wait_for_saturday_catch_routes_to_substrate(self, states): + catches = states["WaitForSaturdayHealthCheck"]["Catch"] + assert any(c["Next"] == "WeeklySubstrateHealthCheck" for c in catches), ( + "If freshness polling fails, substrate check must still run — " + "they're independent observability paths." + ) + + def test_substrate_check_routes_to_wait_state(self, states): + assert states["WeeklySubstrateHealthCheck"]["Next"] == ( + "WaitForWeeklySubstrateHealthCheck" + ) + + def test_wait_for_substrate_routes_to_notify_complete(self, states): + assert states["WaitForWeeklySubstrateHealthCheck"]["Next"] == "NotifyComplete" + + +class TestCatchSemantics: + """Substrate failures must NOT halt the pipeline. 
+ + Per-row CloudWatch alarms own paging; the SF Catch only fires on + infra-level failures (SSM unreachable, EC2 down). Either way, the + failure path must terminate at NotifyComplete, not HandleFailure. + """ + + def test_substrate_check_catch_is_non_blocking(self, states): + catches = states["WeeklySubstrateHealthCheck"]["Catch"] + assert len(catches) >= 1 + for c in catches: + assert c["Next"] == "NotifyComplete", ( + "Substrate Catch must continue to NotifyComplete, not " + f"{c['Next']!r} — the substrate check is observability, not gating." + ) + + def test_substrate_wait_catch_is_non_blocking(self, states): + catches = states["WaitForWeeklySubstrateHealthCheck"]["Catch"] + assert len(catches) >= 1 + assert all(c["Next"] == "NotifyComplete" for c in catches) + + +class TestCommandShape: + """The SSM command must invoke the lib CLI with --cadence weekly --alert. + + Drops here would silently neuter the check (e.g. dropping --alert + suppresses SNS without changing exit code; dropping --cadence flips + to argparse error). + """ + + @pytest.fixture + def commands(self, states) -> list[str]: + return states["WeeklySubstrateHealthCheck"]["Parameters"]["Parameters"]["commands"] + + def test_invokes_transparency_module(self, commands): + assert any( + "python -m alpha_engine_lib.transparency" in cmd for cmd in commands + ) + + def test_passes_cadence_weekly(self, commands): + joined = " ".join(commands) + assert "--cadence weekly" in joined + + def test_passes_alert_flag(self, commands): + joined = " ".join(commands) + assert "--alert" in joined, ( + "Without --alert, row-level failures emit metrics but no SNS. " + "Removing this flag silently degrades the gate." + ) + + def test_runs_on_dashboard_ec2(self, commands): + # The dispatcher EC2 has the lib installed; confirm we cd there. + cd_cmds = [cmd for cmd in commands if cmd.startswith("cd ")] + assert any("alpha-engine-dashboard" in cmd for cmd in cd_cmds) + + def test_pulls_latest_dashboard_main_before_running(self, commands): + # Stale repo on the dispatcher would run an outdated lib pin. 
+ joined = " ".join(commands) + assert "git" in joined and "pull" in joined + + +class TestResultPathIsolation: + """The substrate state must not stomp on the freshness state's result.""" + + def test_distinct_result_paths(self, states): + sat_path = states["SaturdayHealthCheck"]["ResultPath"] + sub_path = states["WeeklySubstrateHealthCheck"]["ResultPath"] + assert sat_path != sub_path, ( + "Both states use ssm:sendCommand and need separate ResultPath " + "fields so the wait states can resolve the right CommandId." + ) + + def test_wait_state_reads_substrate_command_id(self, states): + params = states["WaitForWeeklySubstrateHealthCheck"]["Parameters"] + # SF Parameters use ``CommandId.$`` (the dot-dollar suffix marks + # the value as a JSONPath reference rather than a literal). + cmd_id = params["CommandId.$"] + assert "substrate_check_result" in cmd_id, ( + "WaitForWeeklySubstrateHealthCheck must poll the substrate " + "command, not the freshness command." + )