From b90bd868fd004397880d15d4dc48e8bab939f1d7 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 6 May 2026 19:02:23 -0700 Subject: [PATCH] feat(sf): add DailySubstrateHealthCheck state at end of weekday EOD SF MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the Saturday-SF WeeklySubstrateHealthCheck (PR #175) into the weekday EOD SF, running ``python -m alpha_engine_lib.transparency --cadence daily --alert`` on the dashboard EC2 between EODReconcile success and StopTradingInstance. Closes the Phase 2 → 3 gap where rows 4/5/6 of transparency_inventory (lineage, risk_events, residual_pct) only got checked once per week despite emitting daily — a bad emission Mon-Thu would otherwise sit undetected until Saturday's run. The same per-row CloudWatch alarms from PR #176 cover daily emissions (SubstrateRowOK metric is cadence-agnostic). No new alarms needed. Both new states are non-blocking (Catch routes to StopTradingInstance) so a substrate-check infra failure can never leave the trading EC2 running overnight (cost-guard requirement). Refactors update_eod_pipeline_sf.sh to read the SF definition from infrastructure/step_function_eod.json instead of an inline heredoc, matching the deploy_step_function.sh pattern for the Saturday SF. The JSON file is now the single source of truth; wiring tests pin its contents. Eliminates the two-staleness-vectors antipattern that had the heredoc and the JSON file diverging silently. 17 new wiring tests pin chain ordering, Catch semantics, command shape (--cadence daily, --alert, dashboard EC2, git pull before run), ResultPath isolation, and instance targeting (dashboard EC2 not trading EC2). 519 total passing. Requires: alpha-engine PR adding ec2_instance_id to the daemon's EOD SF input (DailySubstrateHealthCheck targets $.ec2_instance_id, which the daemon trigger now populates with the dashboard EC2 instance id). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- infrastructure/step_function_eod.json | 270 ++++++++++---- infrastructure/update_eod_pipeline_sf.sh | 367 +------------------- tests/test_sf_eod_substrate_check_wiring.py | 208 +++++++++++ 3 files changed, 435 insertions(+), 410 deletions(-) create mode 100644 tests/test_sf_eod_substrate_check_wiring.py diff --git a/infrastructure/step_function_eod.json b/infrastructure/step_function_eod.json index 4716a7e..7f93481 100644 --- a/infrastructure/step_function_eod.json +++ b/infrastructure/step_function_eod.json @@ -1,27 +1,27 @@ { - "Comment": "Alpha Engine EOD Pipeline — post-market data capture, reconciliation, instance shutdown. Triggered by daemon shutdown on trading EC2.", + "Comment": "Alpha Engine EOD Pipeline — post-market data capture, snapshot capture, reconciliation, daily substrate health check, instance shutdown. Triggered by daemon shutdown on trading EC2.", "StartAt": "PostMarketData", "States": { - "PostMarketData": { "Type": "Task", - "Comment": "Capture today's closing prices from polygon + append to ArcticDB (runs on micro EC2)", + "Comment": "Capture today's closing prices via yfinance + append to ArcticDB (runs on ae-trading). 
Hard-fails on non-zero exit via pipefail so tee does not mask python failures.", "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", "Parameters": { "DocumentName": "AWS-RunShellScript", - "InstanceIds.$": "$.ec2_instance_id", + "InstanceIds.$": "$.trading_instance_id", "Parameters": { "commands": [ + "set -o pipefail", "cd /home/ec2-user/alpha-engine-data", "set -a && source /home/ec2-user/.alpha-engine.env && set +a", "source .venv/bin/activate", "python weekly_collector.py --daily 2>&1 | tee /var/log/postmarket-data.log" ], - "executionTimeout": ["180"] + "executionTimeout": ["1200"] }, - "TimeoutSeconds": 180 + "TimeoutSeconds": 1200 }, - "TimeoutSeconds": 240, + "TimeoutSeconds": 1260, "Retry": [ { "ErrorEquals": ["States.TaskFailed"], @@ -33,21 +33,20 @@ "Catch": [ { "ErrorEquals": ["States.ALL"], - "Comment": "PostMarketData failure is non-blocking — EOD can still use IB Gateway prices as fallback", - "Next": "EODReconcile", - "ResultPath": "$.postmarket_error" + "Comment": "Hard-fail: EOD reconcile requires fresh ArcticDB closes (price_cache raises RuntimeError on miss). Don't paper over a PostMarketData failure.", + "Next": "HandleFailure", + "ResultPath": "$.error" } ], "ResultPath": "$.postmarket_result", "Next": "WaitForPostMarketData" }, - "WaitForPostMarketData": { "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", "Parameters": { "CommandId.$": "$.postmarket_result.Command.CommandId", - "InstanceId.$": "$.ec2_instance_id[0]" + "InstanceId.$": "$.trading_instance_id[0]" }, "Retry": [ { @@ -60,51 +59,147 @@ "Catch": [ { "ErrorEquals": ["States.ALL"], - "Next": "EODReconcile", - "ResultPath": "$.postmarket_error" + "Next": "HandleFailure", + "ResultPath": "$.error" } ], "ResultPath": "$.postmarket_poll", "Next": "CheckPostMarketStatus" }, - "CheckPostMarketStatus": { "Type": "Choice", + "Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) 
is surfaced, not silently swallowed.", "Choices": [ - { - "Variable": "$.postmarket_poll.Status", - "StringEquals": "Success", - "Next": "EODReconcile" + { "Variable": "$.postmarket_poll.Status", "StringEquals": "Success", "Next": "CaptureSnapshot" }, + { "Variable": "$.postmarket_poll.Status", "StringEquals": "InProgress", "Next": "PostMarketWait" }, + { "Variable": "$.postmarket_poll.Status", "StringEquals": "Pending", "Next": "PostMarketWait" }, + { "Variable": "$.postmarket_poll.Status", "StringEquals": "Delayed", "Next": "PostMarketWait" } + ], + "Default": "PostMarketStatusError" + }, + "PostMarketStatusError": { + "Type": "Pass", + "Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.", + "Parameters": { + "source": "post_market_status", + "status.$": "$.postmarket_poll.Status", + "status_details.$": "$.postmarket_poll.StatusDetails", + "response_code.$": "$.postmarket_poll.ResponseCode", + "command_id.$": "$.postmarket_poll.CommandId", + "instance_id.$": "$.postmarket_poll.InstanceId" + }, + "ResultPath": "$.error", + "Next": "HandleFailure" + }, + "PostMarketWait": { + "Type": "Wait", + "Seconds": 15, + "Next": "WaitForPostMarketData" + }, + "CaptureSnapshot": { + "Type": "Task", + "Comment": "Phase 2 of EOD-SF cutover: capture live IB state once at end-of-day and persist to s3://...trades/snapshots/{run_date}.json. EODReconcile then reads the snapshot instead of querying live IB. 
Decouples capture from reconciliation so the row keyed by run_date=X sources from observations made at time X.", + "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", + "Parameters": { + "DocumentName": "AWS-RunShellScript", + "InstanceIds.$": "$.trading_instance_id", + "Parameters": { + "commands": [ + "set -o pipefail", + "cd /home/ec2-user/alpha-engine", + "set -a && source /home/ec2-user/.alpha-engine.env && set +a", + "source .venv/bin/activate", + "python executor/snapshot_capturer.py 2>&1 | tee /var/log/snapshot.log" + ], + "executionTimeout": ["120"] }, + "TimeoutSeconds": 120 + }, + "TimeoutSeconds": 180, + "Retry": [ { - "Variable": "$.postmarket_poll.Status", - "StringEquals": "InProgress", - "Next": "PostMarketWait" - }, + "ErrorEquals": ["States.TaskFailed"], + "MaxAttempts": 1, + "IntervalSeconds": 30, + "BackoffRate": 1.0 + } + ], + "Catch": [ { - "Variable": "$.postmarket_poll.Status", - "StringEquals": "Pending", - "Next": "PostMarketWait" + "ErrorEquals": ["States.ALL"], + "Comment": "Hard-fail: EODReconcile depends on this snapshot. 
No silent fallback to live IB — that's the whole point of Phase 2.", + "Next": "HandleFailure", + "ResultPath": "$.error" } ], - "Default": "EODReconcile" + "ResultPath": "$.snapshot_result", + "Next": "WaitForCaptureSnapshot" }, - - "PostMarketWait": { + "WaitForCaptureSnapshot": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", + "Parameters": { + "CommandId.$": "$.snapshot_result.Command.CommandId", + "InstanceId.$": "$.trading_instance_id[0]" + }, + "Retry": [ + { + "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], + "MaxAttempts": 10, + "IntervalSeconds": 10, + "BackoffRate": 1.5 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Next": "HandleFailure", + "ResultPath": "$.error" + } + ], + "ResultPath": "$.snapshot_poll", + "Next": "CheckSnapshotStatus" + }, + "CheckSnapshotStatus": { + "Type": "Choice", + "Comment": "Default routes to HandleFailure so any unexpected SSM status is surfaced, not silently swallowed.", + "Choices": [ + { "Variable": "$.snapshot_poll.Status", "StringEquals": "Success", "Next": "EODReconcile" }, + { "Variable": "$.snapshot_poll.Status", "StringEquals": "InProgress", "Next": "SnapshotWait" }, + { "Variable": "$.snapshot_poll.Status", "StringEquals": "Pending", "Next": "SnapshotWait" }, + { "Variable": "$.snapshot_poll.Status", "StringEquals": "Delayed", "Next": "SnapshotWait" } + ], + "Default": "SnapshotStatusError" + }, + "SnapshotStatusError": { + "Type": "Pass", + "Comment": "Choice Default path — synthesize $.error from poll result.", + "Parameters": { + "source": "snapshot_status", + "status.$": "$.snapshot_poll.Status", + "status_details.$": "$.snapshot_poll.StatusDetails", + "response_code.$": "$.snapshot_poll.ResponseCode", + "command_id.$": "$.snapshot_poll.CommandId", + "instance_id.$": "$.snapshot_poll.InstanceId" + }, + "ResultPath": "$.error", + "Next": "HandleFailure" + }, + "SnapshotWait": { "Type": "Wait", - "Seconds": 15, - "Next": "WaitForPostMarketData" + 
"Seconds": 5, + "Next": "WaitForCaptureSnapshot" }, - "EODReconcile": { "Type": "Task", - "Comment": "Run EOD reconciliation on trading EC2 — reads closes from S3, computes P&L, sends email", + "Comment": "Run EOD reconciliation on trading EC2 — reads closes from ArcticDB + state from S3 snapshot (no live IB), computes P&L, sends email. pipefail surfaces python crashes that tee would otherwise mask.", "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", "Parameters": { "DocumentName": "AWS-RunShellScript", "InstanceIds.$": "$.trading_instance_id", "Parameters": { "commands": [ + "set -o pipefail", "cd /home/ec2-user/alpha-engine", "set -a && source /home/ec2-user/.alpha-engine.env && set +a", "source .venv/bin/activate", @@ -126,14 +221,13 @@ "Catch": [ { "ErrorEquals": ["States.ALL"], - "Next": "StopTradingInstance", - "ResultPath": "$.eod_error" + "Next": "HandleFailure", + "ResultPath": "$.error" } ], "ResultPath": "$.eod_result", "Next": "WaitForEOD" }, - "WaitForEOD": { "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", @@ -152,42 +246,101 @@ "Catch": [ { "ErrorEquals": ["States.ALL"], - "Next": "StopTradingInstance", - "ResultPath": "$.eod_error" + "Next": "HandleFailure", + "ResultPath": "$.error" } ], "ResultPath": "$.eod_poll", "Next": "CheckEODStatus" }, - "CheckEODStatus": { "Type": "Choice", + "Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) 
is surfaced, not silently swallowed.", "Choices": [ - { - "Variable": "$.eod_poll.Status", - "StringEquals": "Success", - "Next": "StopTradingInstance" - }, - { - "Variable": "$.eod_poll.Status", - "StringEquals": "InProgress", - "Next": "EODWait" - }, - { - "Variable": "$.eod_poll.Status", - "StringEquals": "Pending", - "Next": "EODWait" - } + { "Variable": "$.eod_poll.Status", "StringEquals": "Success", "Next": "DailySubstrateHealthCheck" }, + { "Variable": "$.eod_poll.Status", "StringEquals": "InProgress", "Next": "EODWait" }, + { "Variable": "$.eod_poll.Status", "StringEquals": "Pending", "Next": "EODWait" }, + { "Variable": "$.eod_poll.Status", "StringEquals": "Delayed", "Next": "EODWait" } ], - "Default": "StopTradingInstance" + "Default": "EODStatusError" + }, + "EODStatusError": { + "Type": "Pass", + "Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.", + "Parameters": { + "source": "eod_status", + "status.$": "$.eod_poll.Status", + "status_details.$": "$.eod_poll.StatusDetails", + "response_code.$": "$.eod_poll.ResponseCode", + "command_id.$": "$.eod_poll.CommandId", + "instance_id.$": "$.eod_poll.InstanceId" + }, + "ResultPath": "$.error", + "Next": "HandleFailure" }, - "EODWait": { "Type": "Wait", "Seconds": 10, "Next": "WaitForEOD" }, - + "DailySubstrateHealthCheck": { + "Type": "Task", + "Comment": "Phase 2 → 3 transparency-substrate health check, daily cadence. Mirrors the Sat SF WeeklySubstrateHealthCheck but runs only the daily-cadence rows of transparency_inventory.yaml (lineage, risk_events, residual_pct). Closes the gap where daily-emitting rows would otherwise only get checked once per week. Per-row CloudWatch metrics emit to AlphaEngine/Substrate; the same alarms PR #176 created cover both cadences (SubstrateRowOK metric is cadence-agnostic). Non-blocking (alerts on failure but does not halt pipeline) — same pattern as WeeklySubstrateHealthCheck. 
Runs on dashboard EC2 (the SF dispatcher) where alpha-engine-dashboard + lib pin are installed.", + "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", + "Parameters": { + "DocumentName": "AWS-RunShellScript", + "InstanceIds.$": "$.ec2_instance_id", + "Parameters": { + "commands": [ + "set -eo pipefail", + "sudo -u ec2-user git -C /home/ec2-user/alpha-engine-dashboard pull --ff-only origin main", + "cd /home/ec2-user/alpha-engine-dashboard", + "source .venv/bin/activate", + "pip install --quiet --upgrade -r requirements.txt", + "python -m alpha_engine_lib.transparency --cadence daily --alert 2>&1 | tee /var/log/substrate-health-check-daily.log" + ], + "executionTimeout": ["180"] + }, + "TimeoutSeconds": 180 + }, + "TimeoutSeconds": 240, + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Comment": "Substrate check failure is non-blocking — continue to StopTradingInstance. Per-row CloudWatch alarms catch row-level failures; this Catch only fires on infra/SSM failure (e.g. dashboard EC2 unreachable). Cost-guard requirement: trading EC2 must always stop, regardless of substrate-check outcome.", + "Next": "StopTradingInstance", + "ResultPath": "$.substrate_check_error" + } + ], + "ResultPath": "$.substrate_check_result", + "Next": "WaitForDailySubstrateHealthCheck" + }, + "WaitForDailySubstrateHealthCheck": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", + "Parameters": { + "CommandId.$": "$.substrate_check_result.Command.CommandId", + "InstanceId.$": "$.ec2_instance_id[0]" + }, + "Retry": [ + { + "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], + "MaxAttempts": 5, + "IntervalSeconds": 5, + "BackoffRate": 1.5 + } + ], + "Catch": [ + { + "ErrorEquals": ["States.ALL"], + "Comment": "Non-blocking — continue to StopTradingInstance even if polling fails. 
Row-level alarms still page on failure regardless of polling outcome.", + "Next": "StopTradingInstance", + "ResultPath": "$.substrate_check_error" + } + ], + "ResultPath": "$.substrate_check_poll", + "Next": "StopTradingInstance" + }, "StopTradingInstance": { "Type": "Task", "Comment": "Stop trading EC2 instance after EOD completes", @@ -213,7 +366,6 @@ "ResultPath": "$.stop_result", "End": true }, - "HandleFailure": { "Type": "Task", "Comment": "Failure alert via SNS — instance still stops to avoid cost", @@ -226,7 +378,6 @@ "ResultPath": "$.failure_notify", "Next": "ForceStopInstance" }, - "ForceStopInstance": { "Type": "Task", "Comment": "Always stop trading instance even on failure — avoid cost overrun", @@ -237,7 +388,6 @@ "ResultPath": "$.force_stop_result", "Next": "FailExecution" }, - "FailExecution": { "Type": "Fail", "Error": "EODPipelineFailure", diff --git a/infrastructure/update_eod_pipeline_sf.sh b/infrastructure/update_eod_pipeline_sf.sh index baf0a2a..e5ce120 100755 --- a/infrastructure/update_eod_pipeline_sf.sh +++ b/infrastructure/update_eod_pipeline_sf.sh @@ -1,10 +1,11 @@ #!/usr/bin/env bash -# update_eod_pipeline_sf.sh — Apply the Phase 2 EOD-SF update. +# update_eod_pipeline_sf.sh — Apply the canonical EOD pipeline SF definition. # -# Inserts the CaptureSnapshot step between PostMarketData and -# EODReconcile in alpha-engine-eod-pipeline. Companion code in -# alpha-engine repo: executor/snapshot_capturer.py + the eod_reconcile -# refactor that reads from S3 instead of live IB. +# Reads the state-machine definition from +# infrastructure/step_function_eod.json (single source of truth, same +# pattern as deploy_step_function.sh for the Saturday SF) and applies +# it to alpha-engine-eod-pipeline. The JSON file is the authoritative +# definition — wiring tests pin its contents. # # Idempotent: re-running with the same definition is a no-op (AWS only # bumps the revision when the definition actually changes). 
@@ -14,364 +15,30 @@ set -euo pipefail +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DEFN_FILE="$SCRIPT_DIR/step_function_eod.json" + REGION="${AWS_REGION:-us-east-1}" ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text --region "$REGION") SM_ARN="arn:aws:states:${REGION}:${ACCOUNT_ID}:stateMachine:alpha-engine-eod-pipeline" echo "=== Alpha Engine EOD Pipeline — SF Definition Update ===" -echo " Region: $REGION" +echo " Region: $REGION" echo " State machine: $SM_ARN" +echo " Definition: $DEFN_FILE" echo "" -DEFN=$(cat <<'JSON' -{ - "Comment": "Alpha Engine EOD Pipeline — post-market data capture, snapshot capture, reconciliation, instance shutdown. Triggered by daemon shutdown on trading EC2.", - "StartAt": "PostMarketData", - "States": { - "PostMarketData": { - "Type": "Task", - "Comment": "Capture today's closing prices via yfinance + append to ArcticDB (runs on ae-trading). Hard-fails on non-zero exit via pipefail so tee does not mask python failures.", - "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", - "Parameters": { - "DocumentName": "AWS-RunShellScript", - "InstanceIds.$": "$.trading_instance_id", - "Parameters": { - "commands": [ - "set -o pipefail", - "cd /home/ec2-user/alpha-engine-data", - "set -a && source /home/ec2-user/.alpha-engine.env && set +a", - "source .venv/bin/activate", - "python weekly_collector.py --daily 2>&1 | tee /var/log/postmarket-data.log" - ], - "executionTimeout": ["1200"] - }, - "TimeoutSeconds": 1200 - }, - "TimeoutSeconds": 1260, - "Retry": [ - { - "ErrorEquals": ["States.TaskFailed"], - "MaxAttempts": 1, - "IntervalSeconds": 30, - "BackoffRate": 1.0 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Comment": "Hard-fail: EOD reconcile requires fresh ArcticDB closes (price_cache raises RuntimeError on miss). 
Don't paper over a PostMarketData failure.", - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.postmarket_result", - "Next": "WaitForPostMarketData" - }, - "WaitForPostMarketData": { - "Type": "Task", - "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", - "Parameters": { - "CommandId.$": "$.postmarket_result.Command.CommandId", - "InstanceId.$": "$.trading_instance_id[0]" - }, - "Retry": [ - { - "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], - "MaxAttempts": 10, - "IntervalSeconds": 10, - "BackoffRate": 1.5 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.postmarket_poll", - "Next": "CheckPostMarketStatus" - }, - "CheckPostMarketStatus": { - "Type": "Choice", - "Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) is surfaced, not silently swallowed.", - "Choices": [ - { "Variable": "$.postmarket_poll.Status", "StringEquals": "Success", "Next": "CaptureSnapshot" }, - { "Variable": "$.postmarket_poll.Status", "StringEquals": "InProgress", "Next": "PostMarketWait" }, - { "Variable": "$.postmarket_poll.Status", "StringEquals": "Pending", "Next": "PostMarketWait" }, - { "Variable": "$.postmarket_poll.Status", "StringEquals": "Delayed", "Next": "PostMarketWait" } - ], - "Default": "PostMarketStatusError" - }, - "PostMarketStatusError": { - "Type": "Pass", - "Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.", - "Parameters": { - "source": "post_market_status", - "status.$": "$.postmarket_poll.Status", - "status_details.$": "$.postmarket_poll.StatusDetails", - "response_code.$": "$.postmarket_poll.ResponseCode", - "command_id.$": "$.postmarket_poll.CommandId", - "instance_id.$": "$.postmarket_poll.InstanceId" - }, - "ResultPath": "$.error", - "Next": "HandleFailure" - }, - 
"PostMarketWait": { - "Type": "Wait", - "Seconds": 15, - "Next": "WaitForPostMarketData" - }, - "CaptureSnapshot": { - "Type": "Task", - "Comment": "Phase 2 of EOD-SF cutover: capture live IB state once at end-of-day and persist to s3://...trades/snapshots/{run_date}.json. EODReconcile then reads the snapshot instead of querying live IB. Decouples capture from reconciliation so the row keyed by run_date=X sources from observations made at time X.", - "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", - "Parameters": { - "DocumentName": "AWS-RunShellScript", - "InstanceIds.$": "$.trading_instance_id", - "Parameters": { - "commands": [ - "set -o pipefail", - "cd /home/ec2-user/alpha-engine", - "set -a && source /home/ec2-user/.alpha-engine.env && set +a", - "source .venv/bin/activate", - "python executor/snapshot_capturer.py 2>&1 | tee /var/log/snapshot.log" - ], - "executionTimeout": ["120"] - }, - "TimeoutSeconds": 120 - }, - "TimeoutSeconds": 180, - "Retry": [ - { - "ErrorEquals": ["States.TaskFailed"], - "MaxAttempts": 1, - "IntervalSeconds": 30, - "BackoffRate": 1.0 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Comment": "Hard-fail: EODReconcile depends on this snapshot. 
No silent fallback to live IB — that's the whole point of Phase 2.", - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.snapshot_result", - "Next": "WaitForCaptureSnapshot" - }, - "WaitForCaptureSnapshot": { - "Type": "Task", - "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", - "Parameters": { - "CommandId.$": "$.snapshot_result.Command.CommandId", - "InstanceId.$": "$.trading_instance_id[0]" - }, - "Retry": [ - { - "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], - "MaxAttempts": 10, - "IntervalSeconds": 10, - "BackoffRate": 1.5 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.snapshot_poll", - "Next": "CheckSnapshotStatus" - }, - "CheckSnapshotStatus": { - "Type": "Choice", - "Comment": "Default routes to HandleFailure so any unexpected SSM status is surfaced, not silently swallowed.", - "Choices": [ - { "Variable": "$.snapshot_poll.Status", "StringEquals": "Success", "Next": "EODReconcile" }, - { "Variable": "$.snapshot_poll.Status", "StringEquals": "InProgress", "Next": "SnapshotWait" }, - { "Variable": "$.snapshot_poll.Status", "StringEquals": "Pending", "Next": "SnapshotWait" }, - { "Variable": "$.snapshot_poll.Status", "StringEquals": "Delayed", "Next": "SnapshotWait" } - ], - "Default": "SnapshotStatusError" - }, - "SnapshotStatusError": { - "Type": "Pass", - "Comment": "Choice Default path — synthesize $.error from poll result.", - "Parameters": { - "source": "snapshot_status", - "status.$": "$.snapshot_poll.Status", - "status_details.$": "$.snapshot_poll.StatusDetails", - "response_code.$": "$.snapshot_poll.ResponseCode", - "command_id.$": "$.snapshot_poll.CommandId", - "instance_id.$": "$.snapshot_poll.InstanceId" - }, - "ResultPath": "$.error", - "Next": "HandleFailure" - }, - "SnapshotWait": { - "Type": "Wait", - "Seconds": 5, - "Next": "WaitForCaptureSnapshot" - }, - "EODReconcile": { - "Type": "Task", - 
"Comment": "Run EOD reconciliation on trading EC2 — reads closes from ArcticDB + state from S3 snapshot (no live IB), computes P&L, sends email. pipefail surfaces python crashes that tee would otherwise mask.", - "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand", - "Parameters": { - "DocumentName": "AWS-RunShellScript", - "InstanceIds.$": "$.trading_instance_id", - "Parameters": { - "commands": [ - "set -o pipefail", - "cd /home/ec2-user/alpha-engine", - "set -a && source /home/ec2-user/.alpha-engine.env && set +a", - "source .venv/bin/activate", - "python executor/eod_reconcile.py 2>&1 | tee /var/log/eod.log" - ], - "executionTimeout": ["120"] - }, - "TimeoutSeconds": 120 - }, - "TimeoutSeconds": 180, - "Retry": [ - { - "ErrorEquals": ["States.TaskFailed"], - "MaxAttempts": 1, - "IntervalSeconds": 30, - "BackoffRate": 1.0 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.eod_result", - "Next": "WaitForEOD" - }, - "WaitForEOD": { - "Type": "Task", - "Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation", - "Parameters": { - "CommandId.$": "$.eod_result.Command.CommandId", - "InstanceId.$": "$.trading_instance_id[0]" - }, - "Retry": [ - { - "ErrorEquals": ["Ssm.InvocationDoesNotExistException"], - "MaxAttempts": 10, - "IntervalSeconds": 10, - "BackoffRate": 1.5 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.eod_poll", - "Next": "CheckEODStatus" - }, - "CheckEODStatus": { - "Type": "Choice", - "Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) 
is surfaced, not silently swallowed.", - "Choices": [ - { "Variable": "$.eod_poll.Status", "StringEquals": "Success", "Next": "StopTradingInstance" }, - { "Variable": "$.eod_poll.Status", "StringEquals": "InProgress", "Next": "EODWait" }, - { "Variable": "$.eod_poll.Status", "StringEquals": "Pending", "Next": "EODWait" }, - { "Variable": "$.eod_poll.Status", "StringEquals": "Delayed", "Next": "EODWait" } - ], - "Default": "EODStatusError" - }, - "EODStatusError": { - "Type": "Pass", - "Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.", - "Parameters": { - "source": "eod_status", - "status.$": "$.eod_poll.Status", - "status_details.$": "$.eod_poll.StatusDetails", - "response_code.$": "$.eod_poll.ResponseCode", - "command_id.$": "$.eod_poll.CommandId", - "instance_id.$": "$.eod_poll.InstanceId" - }, - "ResultPath": "$.error", - "Next": "HandleFailure" - }, - "EODWait": { - "Type": "Wait", - "Seconds": 10, - "Next": "WaitForEOD" - }, - "StopTradingInstance": { - "Type": "Task", - "Comment": "Stop trading EC2 instance after EOD completes", - "Resource": "arn:aws:states:::aws-sdk:ec2:stopInstances", - "Parameters": { - "InstanceIds.$": "$.trading_instance_id" - }, - "Retry": [ - { - "ErrorEquals": ["States.TaskFailed"], - "MaxAttempts": 2, - "IntervalSeconds": 30, - "BackoffRate": 1.0 - } - ], - "Catch": [ - { - "ErrorEquals": ["States.ALL"], - "Next": "HandleFailure", - "ResultPath": "$.error" - } - ], - "ResultPath": "$.stop_result", - "End": true - }, - "HandleFailure": { - "Type": "Task", - "Comment": "Failure alert via SNS — instance still stops to avoid cost", - "Resource": "arn:aws:states:::sns:publish", - "Parameters": { - "TopicArn.$": "$.sns_topic_arn", - "Subject": "Alpha Engine EOD Pipeline — FAILED", - "Message.$": "States.Format('EOD pipeline failed. 
Error: {}', States.JsonToString($.error))" - }, - "ResultPath": "$.failure_notify", - "Next": "ForceStopInstance" - }, - "ForceStopInstance": { - "Type": "Task", - "Comment": "Always stop trading instance even on failure — avoid cost overrun", - "Resource": "arn:aws:states:::aws-sdk:ec2:stopInstances", - "Parameters": { - "InstanceIds.$": "$.trading_instance_id" - }, - "ResultPath": "$.force_stop_result", - "Next": "FailExecution" - }, - "FailExecution": { - "Type": "Fail", - "Error": "EODPipelineFailure", - "Cause": "EOD pipeline failed — trading instance stopped, check logs." - } - } -} -JSON -) +if [ ! -f "$DEFN_FILE" ]; then + echo "ERROR: $DEFN_FILE not found" >&2 + exit 1 +fi # Validate JSON before sending it to AWS. -echo "$DEFN" | python3 -c "import json,sys; json.loads(sys.stdin.read()); print(' Definition: JSON valid')" +python3 -c "import json,sys; json.load(open(sys.argv[1])); print(' Definition: JSON valid')" "$DEFN_FILE" aws stepfunctions update-state-machine \ --state-machine-arn "$SM_ARN" \ - --definition "$DEFN" \ + --definition "file://$DEFN_FILE" \ --region "$REGION" > /dev/null echo " State machine: definition updated" diff --git a/tests/test_sf_eod_substrate_check_wiring.py b/tests/test_sf_eod_substrate_check_wiring.py new file mode 100644 index 0000000..d5f65c2 --- /dev/null +++ b/tests/test_sf_eod_substrate_check_wiring.py @@ -0,0 +1,208 @@ +"""Pins the Phase 2 → 3 substrate-health-check wiring in the EOD SF. + +Mirrors ``test_sf_substrate_check_wiring.py`` (the Saturday SF version). +The new states ``DailySubstrateHealthCheck`` and +``WaitForDailySubstrateHealthCheck`` chain off the success path of +``CheckEODStatus`` and run the row-driven +``alpha_engine_lib.transparency`` checker on the dashboard EC2 with +``--cadence daily``. + +The Saturday SF runs ``--cadence weekly`` which sweeps weekly + daily +rows. 
The weekday EOD SF runs ``--cadence daily`` so daily-emitting +rows (lineage, risk_events, residual_pct) get checked on the day they +land — without this, a bad emission Mon-Thu wouldn't surface until +Saturday's run. + +Catches regressions like: +- Someone reroutes ``CheckEODStatus`` Success back to ``StopTradingInstance`` + and silently drops the substrate check. +- Someone removes the substrate state thinking the weekly check is + enough (it isn't — that's the gap this state closes). +- Someone flips the substrate Catch into a hard-fail and starts halting + EOD shutdown on row-level failure (per-row alarms own paging — the + Catch is for SSM/infra failures only). Worse: a hard-fail Catch could + prevent ``StopTradingInstance`` from running, leaving the trading EC2 + up overnight (cost overrun). +- Someone targets the trading EC2 instead of the dashboard EC2 (the + trading EC2 doesn't have the dashboard repo or lib pin installed). +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SF_PATH = _REPO_ROOT / "infrastructure" / "step_function_eod.json" + + +@pytest.fixture(scope="module") +def sf() -> dict: + return json.loads(_SF_PATH.read_text()) + + +@pytest.fixture(scope="module") +def states(sf) -> dict: + return sf["States"] + + +class TestStatePresence: + """Both new states must exist and chain after the existing EOD reconcile.""" + + def test_daily_substrate_check_state_exists(self, states): + assert "DailySubstrateHealthCheck" in states + + def test_wait_for_daily_substrate_check_exists(self, states): + assert "WaitForDailySubstrateHealthCheck" in states + + +class TestChainOrdering: + """Wiring goes: CheckEODStatus → Substrate → WaitForSubstrate → StopTradingInstance.""" + + def test_check_eod_status_success_routes_to_substrate(self, states): + choices = states["CheckEODStatus"]["Choices"] + success_choice = next( + (c for c in choices if 
c.get("StringEquals") == "Success"), None + ) + assert success_choice is not None, "CheckEODStatus must have a Success branch" + assert success_choice["Next"] == "DailySubstrateHealthCheck", ( + "CheckEODStatus Success must hand off to the substrate check, " + "not skip directly to StopTradingInstance." + ) + + def test_substrate_check_routes_to_wait_state(self, states): + assert states["DailySubstrateHealthCheck"]["Next"] == ( + "WaitForDailySubstrateHealthCheck" + ) + + def test_wait_for_substrate_routes_to_stop_trading_instance(self, states): + assert states["WaitForDailySubstrateHealthCheck"]["Next"] == "StopTradingInstance" + + +class TestCatchSemantics: + """Substrate failures must NOT halt EOD shutdown. + + Cost-guard requirement: trading EC2 must always stop, regardless of + substrate-check outcome. Per-row CloudWatch alarms own paging on + row-level failures; the SF Catch only fires on infra-level failures + (SSM unreachable, EC2 down). Either way, the failure path must + terminate at StopTradingInstance, not HandleFailure. + """ + + def test_substrate_check_catch_continues_to_stop(self, states): + catches = states["DailySubstrateHealthCheck"]["Catch"] + assert len(catches) >= 1 + for c in catches: + assert c["Next"] == "StopTradingInstance", ( + f"Substrate Catch must continue to StopTradingInstance, not " + f"{c['Next']!r} — letting the trading EC2 run overnight on a " + f"substrate-check infra failure is a cost regression." + ) + + def test_substrate_wait_catch_continues_to_stop(self, states): + catches = states["WaitForDailySubstrateHealthCheck"]["Catch"] + assert len(catches) >= 1 + for c in catches: + assert c["Next"] == "StopTradingInstance" + + +class TestCommandShape: + """The SSM command must invoke the lib CLI with --cadence daily --alert. + + Drops here would silently neuter the check (e.g. 
dropping --alert + suppresses SNS without changing exit code; dropping --cadence flips + to argparse error; flipping to --cadence weekly would re-check the + same rows the Sat SF already covers and catch nothing new). + """ + + @pytest.fixture + def commands(self, states) -> list[str]: + return states["DailySubstrateHealthCheck"]["Parameters"]["Parameters"]["commands"] + + def test_invokes_transparency_module(self, commands): + assert any( + "python -m alpha_engine_lib.transparency" in cmd for cmd in commands + ) + + def test_passes_cadence_daily(self, commands): + joined = " ".join(commands) + assert "--cadence daily" in joined, ( + "Daily SF must run --cadence daily; --cadence weekly would " + "duplicate the Sat SF coverage and miss the daily-only rows." + ) + + def test_passes_alert_flag(self, commands): + joined = " ".join(commands) + assert "--alert" in joined, ( + "Without --alert, row-level failures emit metrics but no SNS. " + "Removing this flag silently degrades the gate." + ) + + def test_runs_on_dashboard_ec2(self, commands): + # The dispatcher EC2 has the lib installed; confirm we cd there. + joined = " ".join(commands) + assert "alpha-engine-dashboard" in joined + + def test_pulls_latest_dashboard_main_before_running(self, commands): + # Stale repo on the dispatcher would run an outdated lib pin. + joined = " ".join(commands) + assert "git" in joined and "pull" in joined + + +class TestInstanceTargeting: + """The substrate state targets the dashboard EC2 (ec2_instance_id), + not the trading EC2 (trading_instance_id). + + The trading EC2 doesn't have alpha-engine-dashboard or the lib pin + installed; targeting it would fail at the cd step. The dashboard + EC2 is the SF dispatcher with the lib installed (same place the + Sat SF runs WeeklySubstrateHealthCheck). 
+ """ + + def test_substrate_check_targets_dashboard_ec2(self, states): + params = states["DailySubstrateHealthCheck"]["Parameters"] + assert params["InstanceIds.$"] == "$.ec2_instance_id", ( + "Substrate check must target $.ec2_instance_id (dashboard EC2), " + "not $.trading_instance_id (trading EC2 lacks the dashboard repo)." + ) + + def test_wait_for_substrate_polls_dashboard_ec2(self, states): + params = states["WaitForDailySubstrateHealthCheck"]["Parameters"] + assert params["InstanceId.$"] == "$.ec2_instance_id[0]" + + +class TestResultPathIsolation: + """The substrate state must not stomp on the EOD reconcile result.""" + + def test_distinct_result_paths(self, states): + eod_path = states["EODReconcile"]["ResultPath"] + sub_path = states["DailySubstrateHealthCheck"]["ResultPath"] + assert eod_path != sub_path, ( + "Both states use ssm:sendCommand and need separate ResultPath " + "fields so the wait states can resolve the right CommandId." + ) + + def test_wait_state_reads_substrate_command_id(self, states): + params = states["WaitForDailySubstrateHealthCheck"]["Parameters"] + # SF Parameters use ``CommandId.$`` (the dot-dollar suffix marks + # the value as a JSONPath reference rather than a literal). + cmd_id = params["CommandId.$"] + assert "substrate_check_result" in cmd_id, ( + "WaitForDailySubstrateHealthCheck must poll the substrate " + "command, not the EOD reconcile command." + ) + + +class TestStopTradingInstanceUnchanged: + """StopTradingInstance must remain a terminal state — no rewiring + that defers it past the substrate check or makes it conditional.""" + + def test_stop_trading_instance_is_terminal(self, states): + # End=True means this state ends the execution (success path). + assert states["StopTradingInstance"].get("End") is True, ( + "StopTradingInstance must remain a terminal End=true state on " + "the success path — anything else is a cost-overrun risk." + )