Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 210 additions & 60 deletions infrastructure/step_function_eod.json
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
{
"Comment": "Alpha Engine EOD Pipeline — post-market data capture, reconciliation, instance shutdown. Triggered by daemon shutdown on trading EC2.",
"Comment": "Alpha Engine EOD Pipeline — post-market data capture, snapshot capture, reconciliation, daily substrate health check, instance shutdown. Triggered by daemon shutdown on trading EC2.",
"StartAt": "PostMarketData",
"States": {

"PostMarketData": {
"Type": "Task",
"Comment": "Capture today's closing prices from polygon + append to ArcticDB (runs on micro EC2)",
"Comment": "Capture today's closing prices via yfinance + append to ArcticDB (runs on ae-trading). Hard-fails on non-zero exit via pipefail so tee does not mask python failures.",
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
"InstanceIds.$": "$.ec2_instance_id",
"InstanceIds.$": "$.trading_instance_id",
"Parameters": {
"commands": [
"set -o pipefail",
"cd /home/ec2-user/alpha-engine-data",
"set -a && source /home/ec2-user/.alpha-engine.env && set +a",
"source .venv/bin/activate",
"python weekly_collector.py --daily 2>&1 | tee /var/log/postmarket-data.log"
],
"executionTimeout": ["180"]
"executionTimeout": ["1200"]
},
"TimeoutSeconds": 180
"TimeoutSeconds": 1200
},
"TimeoutSeconds": 240,
"TimeoutSeconds": 1260,
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
Expand All @@ -33,21 +33,20 @@
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Comment": "PostMarketData failure is non-blocking — EOD can still use IB Gateway prices as fallback",
"Next": "EODReconcile",
"ResultPath": "$.postmarket_error"
"Comment": "Hard-fail: EOD reconcile requires fresh ArcticDB closes (price_cache raises RuntimeError on miss). Don't paper over a PostMarketData failure.",
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"ResultPath": "$.postmarket_result",
"Next": "WaitForPostMarketData"
},

"WaitForPostMarketData": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation",
"Parameters": {
"CommandId.$": "$.postmarket_result.Command.CommandId",
"InstanceId.$": "$.ec2_instance_id[0]"
"InstanceId.$": "$.trading_instance_id[0]"
},
"Retry": [
{
Expand All @@ -60,51 +59,147 @@
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "EODReconcile",
"ResultPath": "$.postmarket_error"
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"ResultPath": "$.postmarket_poll",
"Next": "CheckPostMarketStatus"
},

"CheckPostMarketStatus": {
"Type": "Choice",
"Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) is surfaced, not silently swallowed.",
"Choices": [
{
"Variable": "$.postmarket_poll.Status",
"StringEquals": "Success",
"Next": "EODReconcile"
{ "Variable": "$.postmarket_poll.Status", "StringEquals": "Success", "Next": "CaptureSnapshot" },
{ "Variable": "$.postmarket_poll.Status", "StringEquals": "InProgress", "Next": "PostMarketWait" },
{ "Variable": "$.postmarket_poll.Status", "StringEquals": "Pending", "Next": "PostMarketWait" },
{ "Variable": "$.postmarket_poll.Status", "StringEquals": "Delayed", "Next": "PostMarketWait" }
],
"Default": "PostMarketStatusError"
},
"PostMarketStatusError": {
"Type": "Pass",
"Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.",
"Parameters": {
"source": "post_market_status",
"status.$": "$.postmarket_poll.Status",
"status_details.$": "$.postmarket_poll.StatusDetails",
"response_code.$": "$.postmarket_poll.ResponseCode",
"command_id.$": "$.postmarket_poll.CommandId",
"instance_id.$": "$.postmarket_poll.InstanceId"
},
"ResultPath": "$.error",
"Next": "HandleFailure"
},
"PostMarketWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "WaitForPostMarketData"
},
"CaptureSnapshot": {
"Type": "Task",
"Comment": "Phase 2 of EOD-SF cutover: capture live IB state once at end-of-day and persist to s3://...trades/snapshots/{run_date}.json. EODReconcile then reads the snapshot instead of querying live IB. Decouples capture from reconciliation so the row keyed by run_date=X sources from observations made at time X.",
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
"InstanceIds.$": "$.trading_instance_id",
"Parameters": {
"commands": [
"set -o pipefail",
"cd /home/ec2-user/alpha-engine",
"set -a && source /home/ec2-user/.alpha-engine.env && set +a",
"source .venv/bin/activate",
"python executor/snapshot_capturer.py 2>&1 | tee /var/log/snapshot.log"
],
"executionTimeout": ["120"]
},
"TimeoutSeconds": 120
},
"TimeoutSeconds": 180,
"Retry": [
{
"Variable": "$.postmarket_poll.Status",
"StringEquals": "InProgress",
"Next": "PostMarketWait"
},
"ErrorEquals": ["States.TaskFailed"],
"MaxAttempts": 1,
"IntervalSeconds": 30,
"BackoffRate": 1.0
}
],
"Catch": [
{
"Variable": "$.postmarket_poll.Status",
"StringEquals": "Pending",
"Next": "PostMarketWait"
"ErrorEquals": ["States.ALL"],
"Comment": "Hard-fail: EODReconcile depends on this snapshot. No silent fallback to live IB — that's the whole point of Phase 2.",
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"Default": "EODReconcile"
"ResultPath": "$.snapshot_result",
"Next": "WaitForCaptureSnapshot"
},

"PostMarketWait": {
"WaitForCaptureSnapshot": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation",
"Parameters": {
"CommandId.$": "$.snapshot_result.Command.CommandId",
"InstanceId.$": "$.trading_instance_id[0]"
},
"Retry": [
{
"ErrorEquals": ["Ssm.InvocationDoesNotExistException"],
"MaxAttempts": 10,
"IntervalSeconds": 10,
"BackoffRate": 1.5
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"ResultPath": "$.snapshot_poll",
"Next": "CheckSnapshotStatus"
},
"CheckSnapshotStatus": {
"Type": "Choice",
"Comment": "Default routes to HandleFailure so any unexpected SSM status is surfaced, not silently swallowed.",
"Choices": [
{ "Variable": "$.snapshot_poll.Status", "StringEquals": "Success", "Next": "EODReconcile" },
{ "Variable": "$.snapshot_poll.Status", "StringEquals": "InProgress", "Next": "SnapshotWait" },
{ "Variable": "$.snapshot_poll.Status", "StringEquals": "Pending", "Next": "SnapshotWait" },
{ "Variable": "$.snapshot_poll.Status", "StringEquals": "Delayed", "Next": "SnapshotWait" }
],
"Default": "SnapshotStatusError"
},
"SnapshotStatusError": {
"Type": "Pass",
"Comment": "Choice Default path — synthesize $.error from poll result.",
"Parameters": {
"source": "snapshot_status",
"status.$": "$.snapshot_poll.Status",
"status_details.$": "$.snapshot_poll.StatusDetails",
"response_code.$": "$.snapshot_poll.ResponseCode",
"command_id.$": "$.snapshot_poll.CommandId",
"instance_id.$": "$.snapshot_poll.InstanceId"
},
"ResultPath": "$.error",
"Next": "HandleFailure"
},
"SnapshotWait": {
"Type": "Wait",
"Seconds": 15,
"Next": "WaitForPostMarketData"
"Seconds": 5,
"Next": "WaitForCaptureSnapshot"
},

"EODReconcile": {
"Type": "Task",
"Comment": "Run EOD reconciliation on trading EC2 — reads closes from S3, computes P&L, sends email",
"Comment": "Run EOD reconciliation on trading EC2 — reads closes from ArcticDB + state from S3 snapshot (no live IB), computes P&L, sends email. pipefail surfaces python crashes that tee would otherwise mask.",
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
"InstanceIds.$": "$.trading_instance_id",
"Parameters": {
"commands": [
"set -o pipefail",
"cd /home/ec2-user/alpha-engine",
"set -a && source /home/ec2-user/.alpha-engine.env && set +a",
"source .venv/bin/activate",
Expand All @@ -126,14 +221,13 @@
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "StopTradingInstance",
"ResultPath": "$.eod_error"
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"ResultPath": "$.eod_result",
"Next": "WaitForEOD"
},

"WaitForEOD": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation",
Expand All @@ -152,42 +246,101 @@
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "StopTradingInstance",
"ResultPath": "$.eod_error"
"Next": "HandleFailure",
"ResultPath": "$.error"
}
],
"ResultPath": "$.eod_poll",
"Next": "CheckEODStatus"
},

"CheckEODStatus": {
"Type": "Choice",
"Comment": "Default routes to HandleFailure so any unexpected SSM status (Failed, Cancelled, TimedOut, ...) is surfaced, not silently swallowed.",
"Choices": [
{
"Variable": "$.eod_poll.Status",
"StringEquals": "Success",
"Next": "StopTradingInstance"
},
{
"Variable": "$.eod_poll.Status",
"StringEquals": "InProgress",
"Next": "EODWait"
},
{
"Variable": "$.eod_poll.Status",
"StringEquals": "Pending",
"Next": "EODWait"
}
{ "Variable": "$.eod_poll.Status", "StringEquals": "Success", "Next": "DailySubstrateHealthCheck" },
{ "Variable": "$.eod_poll.Status", "StringEquals": "InProgress", "Next": "EODWait" },
{ "Variable": "$.eod_poll.Status", "StringEquals": "Pending", "Next": "EODWait" },
{ "Variable": "$.eod_poll.Status", "StringEquals": "Delayed", "Next": "EODWait" }
],
"Default": "StopTradingInstance"
"Default": "EODStatusError"
},
"EODStatusError": {
"Type": "Pass",
"Comment": "Choice Default path — synthesize $.error from poll result so HandleFailure's JsonToString($.error) has something to stringify.",
"Parameters": {
"source": "eod_status",
"status.$": "$.eod_poll.Status",
"status_details.$": "$.eod_poll.StatusDetails",
"response_code.$": "$.eod_poll.ResponseCode",
"command_id.$": "$.eod_poll.CommandId",
"instance_id.$": "$.eod_poll.InstanceId"
},
"ResultPath": "$.error",
"Next": "HandleFailure"
},

"EODWait": {
"Type": "Wait",
"Seconds": 10,
"Next": "WaitForEOD"
},

"DailySubstrateHealthCheck": {
"Type": "Task",
"Comment": "Phase 2 → 3 transparency-substrate health check, daily cadence. Mirrors the Sat SF WeeklySubstrateHealthCheck but runs only the daily-cadence rows of transparency_inventory.yaml (lineage, risk_events, residual_pct). Closes the gap where daily-emitting rows would otherwise only get checked once per week. Per-row CloudWatch metrics emit to AlphaEngine/Substrate; the same alarms PR #176 created cover both cadences (SubstrateRowOK metric is cadence-agnostic). Non-blocking (alerts on failure but does not halt pipeline) — same pattern as WeeklySubstrateHealthCheck. Runs on dashboard EC2 (the SF dispatcher) where alpha-engine-dashboard + lib pin are installed.",
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
"InstanceIds.$": "$.ec2_instance_id",
"Parameters": {
"commands": [
"set -eo pipefail",
"sudo -u ec2-user git -C /home/ec2-user/alpha-engine-dashboard pull --ff-only origin main",
"cd /home/ec2-user/alpha-engine-dashboard",
"source .venv/bin/activate",
"pip install --quiet --upgrade -r requirements.txt",
"python -m alpha_engine_lib.transparency --cadence daily --alert 2>&1 | tee /var/log/substrate-health-check-daily.log"
],
"executionTimeout": ["180"]
},
"TimeoutSeconds": 180
},
"TimeoutSeconds": 240,
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Comment": "Substrate check failure is non-blocking — continue to StopTradingInstance. Per-row CloudWatch alarms catch row-level failures; this Catch only fires on infra/SSM failure (e.g. dashboard EC2 unreachable). Cost-guard requirement: trading EC2 must always stop, regardless of substrate-check outcome.",
"Next": "StopTradingInstance",
"ResultPath": "$.substrate_check_error"
}
],
"ResultPath": "$.substrate_check_result",
"Next": "WaitForDailySubstrateHealthCheck"
},
"WaitForDailySubstrateHealthCheck": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation",
"Parameters": {
"CommandId.$": "$.substrate_check_result.Command.CommandId",
"InstanceId.$": "$.ec2_instance_id[0]"
},
"Retry": [
{
"ErrorEquals": ["Ssm.InvocationDoesNotExistException"],
"MaxAttempts": 5,
"IntervalSeconds": 5,
"BackoffRate": 1.5
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Comment": "Non-blocking — continue to StopTradingInstance even if polling fails. Row-level alarms still page on failure regardless of polling outcome.",
"Next": "StopTradingInstance",
"ResultPath": "$.substrate_check_error"
}
],
"ResultPath": "$.substrate_check_poll",
"Next": "StopTradingInstance"
},
"StopTradingInstance": {
"Type": "Task",
"Comment": "Stop trading EC2 instance after EOD completes",
Expand All @@ -213,7 +366,6 @@
"ResultPath": "$.stop_result",
"End": true
},

"HandleFailure": {
"Type": "Task",
"Comment": "Failure alert via SNS — instance still stops to avoid cost",
Expand All @@ -226,7 +378,6 @@
"ResultPath": "$.failure_notify",
"Next": "ForceStopInstance"
},

"ForceStopInstance": {
"Type": "Task",
"Comment": "Always stop trading instance even on failure — avoid cost overrun",
Expand All @@ -237,7 +388,6 @@
"ResultPath": "$.force_stop_result",
"Next": "FailExecution"
},

"FailExecution": {
"Type": "Fail",
"Error": "EODPipelineFailure",
Expand Down
Loading
Loading