Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 131 additions & 11 deletions infrastructure/spot_data_weekly.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,39 @@ AMI_ID="ami-0c421724a94bba6d6" # Amazon Linux 2023 x86_64
# plus pip install + preflight. If the workload legitimately needs longer,
# bump this — don't silently rely on the orphan reaper.
MAX_RUNTIME_SECONDS="${MAX_RUNTIME_SECONDS:-5400}"
# ── Spot-interruption resilience (2026-05-30 incident) ──────────────────────
# The Saturday SF DataPhase1 failed when this run's nested spot
# (i-02e498e018441751f, c5.large/us-east-1a) was reclaimed by AWS *mid-
# workload* with spot-request status `instance-terminated-no-capacity`.
# The lib launcher (alpha_engine_lib.ec2_spot) already rotates
# instance_type × subnet on *acquisition* InsufficientInstanceCapacity,
# but nothing relaunched after a *mid-run* reclamation — the workload
# SSM command returned ResponseCode -1 (lost instance), the orchestrator
# exited 1, and the whole weekly pipeline failed.
#
# on_exit (the EXIT trap below) now classifies the failure: a *confirmed
# spot interruption* (no-capacity / price / capacity-oversubscribed
# reclamation, or all-combinations-exhausted launch) relaunches a fresh
# spot up to MAX_SPOT_ATTEMPTS. A *genuine workload error* (the inner
# script raised — instance still fulfilled) is NOT retried and fails
# loud per the fail-fast posture: blind retry would mask a real bug.
# SPOT_ATTEMPT is threaded across re-execs via the env.
#
# Default 2 (i.e. one relaunch). The binding constraint is the OUTER SSM
# executionTimeout the Saturday SF sets on the orchestrator invocation
# (DataPhase1 / MorningEnrich = 5400s, RAGIngestion = 3600s). A phase1
# relaunch worst-cases at ~65 min (boot ~7 + killed-mid-run, then boot ~7
# + full ~35), which fits 5400s; a SECOND relaunch (~107 min) would blow
# that outer timeout and the orchestrator command would be killed mid-
# attempt. So raising MAX_SPOT_ATTEMPTS REQUIRES raising the matching SF
# executionTimeout in alpha-engine-data/infrastructure/step_function.json
# in lockstep — otherwise the extra attempts are dead budget.
MAX_SPOT_ATTEMPTS="${MAX_SPOT_ATTEMPTS:-2}"
SPOT_ATTEMPT="${SPOT_ATTEMPT:-1}"
# Backoff before a relaunch so a transient regional capacity dip has a
# moment to clear (the launcher rotates AZ/type anyway, but a brief pause
# materially raises the odds the next attempt lands).
SPOT_RETRY_BACKOFF_SECONDS="${SPOT_RETRY_BACKOFF_SECONDS:-20}"
# Key-pair name kept ONLY for compatibility with
# alpha_engine_lib.ec2_spot's --key-name flag — the spot still launches
# with this key associated, but NOTHING in this script SSH's into the
Expand Down Expand Up @@ -148,6 +181,11 @@ RUN_MODE="full"
# vs RAG) runs; --preflight-only only swaps "preflight + work" for
# "preflight + exit 0".
PREFLIGHT_ONLY=0
# Preserve the verbatim CLI args so the spot-interruption retry can
# self-re-exec this script with an identical mode/flag set on a fresh
# spot (see the on_exit retry trap below). Captured BEFORE the parse
# loop's `shift`s consume them.
ORIG_ARGS=("$@")
while [[ $# -gt 0 ]]; do
case "$1" in
--smoke-only) RUN_MODE="smoke-only"; shift ;;
Expand Down Expand Up @@ -178,6 +216,7 @@ echo " AMI : $AMI_ID"
echo " Region : $AWS_REGION"
echo " Branch : $BRANCH"
echo " Run mode : $RUN_MODE"
echo " Spot attempt : $SPOT_ATTEMPT/$MAX_SPOT_ATTEMPTS (relaunch on confirmed spot interruption)"
echo " Preflight-only: $PREFLIGHT_ONLY (1 = boot + preflight + exit 0, NO fetch/write)"
echo " S3 bucket : $S3_BUCKET"
echo " Transport : SSM via lib chokepoint (python -m alpha_engine_lib.ssm_dispatcher)"
Expand Down Expand Up @@ -211,6 +250,94 @@ fi
# immediately. Replaces the 2026-05-22 broken-by-design hardcoded
# single-subnet + single-instance-type pattern that failed Evaluator's
# launch when us-east-1f ran out of c5.large.
# ── Cleanup + spot-interruption retry trap ──────────────────────────────────
# Installed BEFORE the launch so it uniformly covers BOTH an all-
# combinations-exhausted launch (ec2_spot rc 64) AND a mid-run
# reclamation. INSTANCE_ID / S3_STAGING are read at trap-FIRE time, so
# they pick up the values assigned after a successful launch below; both
# default empty so cleanup is a no-op if we never got that far.
INSTANCE_ID=""
S3_STAGING=""

# Best-effort teardown of whatever this run managed to create.
# Globals (read): INSTANCE_ID, S3_STAGING, AWS_REGION.
# Each step is skipped while its global is still empty, so calling this
# before a successful launch does nothing. Failures of the aws calls are
# deliberately ignored: teardown must never change the exit path.
# Returns: always 0.
cleanup() {
    # Terminate the spot instance, if one was launched.
    if [[ -n "$INSTANCE_ID" ]]; then
        echo ""
        echo "==> Terminating spot instance $INSTANCE_ID..."
        aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$AWS_REGION" --output text \
            > /dev/null 2>&1 || true
    fi

    # Drop the S3 staging prefix, if one was assigned.
    if [[ -n "$S3_STAGING" ]]; then
        aws s3 rm "$S3_STAGING" --recursive --quiet 2>/dev/null || true
    fi

    # The summary line is tied to the instance, not to the staging prefix.
    if [[ -n "$INSTANCE_ID" ]]; then
        echo " Instance terminated; S3 staging cleaned."
    fi

    return 0
}

# Echoes a non-empty reason string + returns 0 when the just-failed run
# was a CONFIRMED spot interruption — either the launcher exhausted every
# instance_type × subnet (ec2_spot rc 64) or AWS reclaimed the running
# spot (no-capacity / by-price / capacity-oversubscribed). A GENUINE
# inner-workload failure leaves the instance fulfilled/running and
# returns 1 → NOT retryable, fails loud per the fail-fast posture (blind
# retry would mask a real collector/prune bug).
# Decide whether the failure that just happened counts as a spot interruption.
# Globals (read): INSTANCE_ID, AWS_REGION.
# Arguments: $1 - exit status the script is terminating with.
# Outputs:   on a match, the reason string on stdout.
# Returns:   0 when a reason was printed, 1 otherwise.
_spot_failure_reason() {
    local rc="$1"
    local verdict=""
    local sir_code state_reason

    # Exit status 64 is treated as launch-time capacity exhaustion; no
    # instance exists to inspect in that case.
    if [ "$rc" -eq 64 ]; then
        echo "launch-capacity-exhausted"
        return 0
    fi

    # Without an instance id there is nothing to query.
    if [ -z "$INSTANCE_ID" ]; then
        return 1
    fi

    # First probe: status code of the spot request tied to this instance.
    # A failing aws call degrades to an empty string (no match).
    sir_code=$(aws ec2 describe-spot-instance-requests \
        --filters "Name=instance-id,Values=$INSTANCE_ID" \
        --query 'SpotInstanceRequests[0].Status.Code' \
        --output text --region "$AWS_REGION" 2>/dev/null || echo "")
    case "$sir_code" in
        instance-terminated-no-capacity | \
        instance-terminated-by-price | \
        instance-terminated-capacity-oversubscribed | \
        instance-stopped-no-capacity | \
        instance-stopped-by-price | \
        instance-stopped-capacity-oversubscribed | \
        marked-for-termination)
            verdict="$sir_code"
            ;;
    esac

    # Second probe, only when the first did not match: the instance's own
    # StateReason code.
    if [ -z "$verdict" ]; then
        state_reason=$(aws ec2 describe-instances --instance-ids "$INSTANCE_ID" \
            --query 'Reservations[].Instances[].StateReason.Code' \
            --output text --region "$AWS_REGION" 2>/dev/null || echo "")
        case "$state_reason" in
            Server.SpotInstanceTermination | Server.InsufficientInstanceCapacity)
                verdict="$state_reason"
                ;;
        esac
    fi

    [ -n "$verdict" ] || return 1
    echo "$verdict"
    return 0
}

# EXIT-trap handler: classify the failure, run cleanup(), then either
# re-exec this script for another spot attempt or exit with the status the
# script was already exiting with.
# Globals (read): SPOT_ATTEMPT, MAX_SPOT_ATTEMPTS, SPOT_RETRY_BACKOFF_SECONDS,
#                 AWS_REGION, ORIG_ARGS.
# Calls:          _spot_failure_reason, cleanup.
# Exit status:    the value of $? at trap entry, unless the exec below
#                 replaces this process.
on_exit() {
# Capture $? first — any later command would overwrite it.
local rc=$?
# Classify BEFORE cleanup() terminates the instance, while the spot
# request status is still queryable.
local reason=""
if [ "$rc" -ne 0 ]; then
# A non-zero return from the classifier leaves reason empty.
reason="$(_spot_failure_reason "$rc")" || reason=""
fi
# cleanup runs on every exit path, success included.
cleanup
# Retry only when all three hold: the run failed, the classifier returned a
# reason, and at least one attempt remains.
if [ "$rc" -ne 0 ] && [ -n "$reason" ] && [ "$SPOT_ATTEMPT" -lt "$MAX_SPOT_ATTEMPTS" ]; then
# Record the absorbed interruption on a named CloudWatch surface
# (fail-loud posture: the retry is observable, never silent).
# The metric write is best-effort: its stderr is dropped and a failure
# does not block the retry.
aws cloudwatch put-metric-data \
--namespace "AlphaEngine" \
--metric-name "SpotInterruptionRetry" \
--dimensions "Process=data-weekly" \
--value 1 --unit "Count" \
--region "$AWS_REGION" 2>/dev/null || true
echo "" >&2
echo "==> Spot interruption (reason=$reason) on attempt $SPOT_ATTEMPT/$MAX_SPOT_ATTEMPTS — relaunching a fresh spot in ${SPOT_RETRY_BACKOFF_SECONDS}s..." >&2
sleep "$SPOT_RETRY_BACKOFF_SECONDS"
# Clear the EXIT trap, then replace this process with a fresh run of the
# same script and the saved arguments. The incremented SPOT_ATTEMPT is
# passed through the new process's environment. The
# ${ORIG_ARGS[@]+...} form expands to nothing when the array is empty.
trap - EXIT
SPOT_ATTEMPT=$((SPOT_ATTEMPT + 1)) exec bash "$0" ${ORIG_ARGS[@]+"${ORIG_ARGS[@]}"}
fi
# Reached with a non-empty reason only when the retry branch was not taken,
# i.e. SPOT_ATTEMPT is no longer below MAX_SPOT_ATTEMPTS.
if [ "$rc" -ne 0 ] && [ -n "$reason" ]; then
echo "ERROR: spot interruption (reason=$reason) persisted across all $MAX_SPOT_ATTEMPTS attempt(s) — giving up. The weekly pipeline fails loud; redrive once spot capacity returns." >&2
fi
# Propagate the original status (0 on a clean run).
exit "$rc"
}
trap on_exit EXIT

echo "==> Requesting spot instance (lib CLI rotation: types=[$INSTANCE_TYPES], subnets=[$SUBNETS])..."

INSTANCE_ID=$("$LIB_PYTHON" -m alpha_engine_lib.ec2_spot launch \
Expand All @@ -234,19 +361,12 @@ echo " Instance ID: $INSTANCE_ID"

RUN_ID="$(date +%Y%m%dT%H%M%SZ)-${INSTANCE_ID}"
S3_STAGING_PREFIX="tmp/spot_data_weekly/${RUN_ID}"
# S3_STAGING is consumed by cleanup() (declared with the on_exit trap
# above); assigning it here arms staging-prefix removal now that the
# launch succeeded. (S3 lifecycle on tmp/ is the belt-and-suspenders if
# the trap never fires.)
S3_STAGING="s3://${S3_BUCKET}/${S3_STAGING_PREFIX}"

# Cleanup — always terminate the instance + remove the S3 staging prefix.
# (S3 lifecycle on tmp/ is the belt-and-suspenders if the trap never fires.)
# Teardown variant with no emptiness guards: it always issues the terminate
# and S3 removal calls with whatever INSTANCE_ID / S3_STAGING hold, ignores
# their failures, and prints the completion line unconditionally.
# NOTE(review): an earlier cleanup() in this view guards on empty values and
# is called from on_exit; this one looks like the pre-change side of the
# diff — confirm which definition the script actually keeps.
cleanup() {
echo ""
echo "==> Terminating spot instance $INSTANCE_ID..."
# Output and errors of the terminate call are discarded.
aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$AWS_REGION" --output text > /dev/null 2>&1 || true
# Only stderr of the S3 removal is discarded; --quiet silences the rest.
aws s3 rm "$S3_STAGING" --recursive --quiet 2>/dev/null || true
echo " Instance terminated; S3 staging cleaned."
}
trap cleanup EXIT

echo "==> Waiting for instance to enter running state..."
aws ec2 wait instance-running --instance-ids "$INSTANCE_ID" --region "$AWS_REGION"

Expand Down
167 changes: 167 additions & 0 deletions tests/test_spot_data_weekly_interruption_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Pins the mid-run spot-interruption retry in
infrastructure/spot_data_weekly.sh.

Origin: 2026-05-30 Saturday SF DataPhase1 failure. The nested data spot
(i-02e498e018441751f, c5.large/us-east-1a) was reclaimed by AWS *mid-
workload* with spot-request status `instance-terminated-no-capacity`.
The lib launcher (alpha_engine_lib.ec2_spot) rotates instance_type ×
subnet on *acquisition* capacity errors, but nothing relaunched after a
*mid-run* reclamation — the workload SSM command returned ResponseCode
-1, the orchestrator exited 1, and the entire weekly pipeline failed.

The fix adds an EXIT trap (`on_exit`) that classifies the failure: a
CONFIRMED spot interruption (no-capacity / price / capacity-
oversubscribed reclamation, or all-combinations-exhausted launch)
relaunches a fresh spot up to MAX_SPOT_ATTEMPTS; a GENUINE workload
error is NOT retried and fails loud (blind retry would mask a real bug).

These are static greps (the script only runs end-to-end on a real spot)
mirroring tests/test_spot_data_weekly_run_modes.py — they catch the
regression class where the retry is removed, un-gated from the
interruption-classification, or made to retry genuine workload errors.
"""

from __future__ import annotations

import re
from pathlib import Path

import pytest

_REPO_ROOT = Path(__file__).resolve().parent.parent
_SCRIPT = _REPO_ROOT / "infrastructure" / "spot_data_weekly.sh"


@pytest.fixture(scope="module")
def text() -> str:
    """Return the full contents of the script under test, read once per module.

    The script contains non-ASCII characters (box-drawing rules, ``×``,
    ``→``, ``—``), so the encoding is pinned to UTF-8 instead of relying on
    the platform's locale default, which can mis-decode or raise.
    """
    return _SCRIPT.read_text(encoding="utf-8")


class TestRetryConfig:
def test_max_attempts_env_overridable(self, text):
assert 'MAX_SPOT_ATTEMPTS="${MAX_SPOT_ATTEMPTS:-2}"' in text, (
"MAX_SPOT_ATTEMPTS must default to 2 (one relaunch) and be env-"
"overridable; raising it requires bumping the SF executionTimeout."
)

def test_attempt_counter_threaded_via_env(self, text):
assert 'SPOT_ATTEMPT="${SPOT_ATTEMPT:-1}"' in text, (
"SPOT_ATTEMPT must be env-threaded so a re-exec knows its attempt #."
)

def test_orig_args_captured_before_parse(self, text):
"""ORIG_ARGS must be captured BEFORE the arg-parse while-loop's
shifts consume the positional params, so the re-exec can replay
the identical mode/flags."""
cap = text.index('ORIG_ARGS=("$@")')
parse = text.index("while [[ $# -gt 0 ]]; do")
assert cap < parse, "ORIG_ARGS must be captured before the parse loop."


class TestTrapInstalledBeforeLaunch:
def test_on_exit_trap_installed(self, text):
assert "trap on_exit EXIT" in text, "on_exit must be the EXIT trap."

def test_trap_precedes_launch(self, text):
"""The trap must be armed BEFORE the ec2_spot launch so it also
covers an all-combinations-exhausted launch (rc 64), not only a
mid-run reclamation."""
trap_at = text.index("trap on_exit EXIT")
launch_at = text.index("alpha_engine_lib.ec2_spot launch")
assert trap_at < launch_at, (
"trap on_exit EXIT must be installed before the spot launch."
)


class TestInterruptionClassification:
    """Static checks that _spot_failure_reason names every retryable signal.

    The parametrized list mirrors the full spot-request ``case`` arm in the
    script, including both ``instance-stopped-by-price`` and
    ``instance-stopped-capacity-oversubscribed``, and the fallback check
    covers both StateReason codes the script matches.
    """

    @pytest.mark.parametrize(
        "code",
        [
            "instance-terminated-no-capacity",
            "instance-terminated-by-price",
            "instance-terminated-capacity-oversubscribed",
            "instance-stopped-no-capacity",
            "instance-stopped-by-price",
            "instance-stopped-capacity-oversubscribed",
            "marked-for-termination",
        ],
    )
    def test_spot_request_status_codes_classified(self, text, code):
        """Each retryable spot-request status code appears in the script."""
        assert code in text, (
            f"spot-request status {code!r} must be classified as a retryable "
            "interruption in _spot_failure_reason."
        )

    def test_launch_exhaustion_rc64_retryable(self, text):
        """The classifier compares the exit status against 64."""
        assert re.search(r'\[ "\$rc" -eq 64 \]', text), (
            "ec2_spot rc 64 (all instance_type × subnet exhausted) must be "
            "treated as a retryable capacity interruption."
        )

    def test_instance_statereason_fallback(self, text):
        """Both StateReason codes used by the fallback probe are present."""
        for code in (
            "Server.SpotInstanceTermination",
            "Server.InsufficientInstanceCapacity",
        ):
            assert code in text, (
                f"Instance StateReason fallback must recognize {code!r}."
            )

    def test_classifier_queries_spot_request_before_terminate(self, text):
        """Classification must read the spot-request status; the comment +
        call order ensure it happens before cleanup() terminates."""
        assert "describe-spot-instance-requests" in text
        # on_exit computes `reason` before calling cleanup.
        on_exit = text[text.index("on_exit() {"):]
        reason_at = on_exit.index("_spot_failure_reason")
        cleanup_at = on_exit.index("\n cleanup")
        assert reason_at < cleanup_at, (
            "Failure must be classified BEFORE cleanup() terminates the "
            "instance (the spot-request status is only queryable while it lives)."
        )


class TestFailLoudOnGenuineError:
def test_retry_gated_on_nonempty_reason(self, text):
"""The relaunch must be gated on a non-empty interruption reason —
a genuine workload failure (empty reason) must NOT retry."""
assert re.search(
r'\[ "\$rc" -ne 0 \] && \[ -n "\$reason" \] && '
r'\[ "\$SPOT_ATTEMPT" -lt "\$MAX_SPOT_ATTEMPTS" \]',
text,
), (
"Relaunch must require rc!=0 AND a confirmed interruption reason "
"AND attempts remaining — genuine workload errors fail loud."
)

def test_exhausted_attempts_fail_loud(self, text):
assert "persisted across all $MAX_SPOT_ATTEMPTS attempt(s)" in text, (
"When interruption persists across all attempts the script must "
"surface a loud ERROR and propagate the non-zero exit."
)

def test_original_exit_code_propagated(self, text):
on_exit = text[text.index("on_exit() {"):]
assert 'exit "$rc"' in on_exit, (
"on_exit must propagate the original failure code, not mask it."
)


class TestReexecAndObservability:
def test_reexec_preserves_args_and_increments_attempt(self, text):
assert (
'SPOT_ATTEMPT=$((SPOT_ATTEMPT + 1)) exec bash "$0" '
'${ORIG_ARGS[@]+"${ORIG_ARGS[@]}"}'
in text
), (
"Relaunch must self-re-exec via `exec bash \"$0\"` with the "
"preserved ORIG_ARGS and an incremented SPOT_ATTEMPT."
)

def test_trap_disarmed_before_reexec(self, text):
"""`trap - EXIT` must precede the exec so the replaced process does
not double-run cleanup for the already-terminated instance."""
on_exit = text[text.index("on_exit() {"):]
disarm_at = on_exit.index("trap - EXIT")
exec_at = on_exit.index("exec bash")
assert disarm_at < exec_at

def test_retry_emits_named_cloudwatch_metric(self, text):
assert 'metric-name "SpotInterruptionRetry"' in text, (
"Each absorbed interruption must emit the SpotInterruptionRetry "
"CloudWatch metric — the retry is observable, never silent."
)
Loading