diff --git a/infrastructure/spot_data_weekly.sh b/infrastructure/spot_data_weekly.sh index 4018ae9..b3e3f09 100755 --- a/infrastructure/spot_data_weekly.sh +++ b/infrastructure/spot_data_weekly.sh @@ -97,6 +97,39 @@ AMI_ID="ami-0c421724a94bba6d6" # Amazon Linux 2023 x86_64 # plus pip install + preflight. If the workload legitimately needs longer, # bump this — don't silently rely on the orphan reaper. MAX_RUNTIME_SECONDS="${MAX_RUNTIME_SECONDS:-5400}" +# ── Spot-interruption resilience (2026-05-30 incident) ────────────────────── +# The Saturday SF DataPhase1 failed when this run's nested spot +# (i-02e498e018441751f, c5.large/us-east-1a) was reclaimed by AWS *mid- +# workload* with spot-request status `instance-terminated-no-capacity`. +# The lib launcher (alpha_engine_lib.ec2_spot) already rotates +# instance_type × subnet on *acquisition* InsufficientInstanceCapacity, +# but nothing relaunched after a *mid-run* reclamation — the workload +# SSM command returned ResponseCode -1 (lost instance), the orchestrator +# exited 1, and the whole weekly pipeline failed. +# +# on_exit (the EXIT trap below) now classifies the failure: a *confirmed +# spot interruption* (no-capacity / price / capacity-oversubscribed +# reclamation, or all-combinations-exhausted launch) relaunches a fresh +# spot up to MAX_SPOT_ATTEMPTS. A *genuine workload error* (the inner +# script raised — instance still fulfilled) is NOT retried and fails +# loud per the fail-fast posture: blind retry would mask a real bug. +# SPOT_ATTEMPT is threaded across re-execs via the env. +# +# Default 2 (i.e. one relaunch). The binding constraint is the OUTER SSM +# executionTimeout the Saturday SF sets on the orchestrator invocation +# (DataPhase1 / MorningEnrich = 5400s, RAGIngestion = 3600s). A phase1 +# relaunch worst-cases at ~65 min (boot ~7 + killed-mid-run, then boot ~7 +# + full ~35), which fits 5400s; a SECOND relaunch (~107 min) would blow +# that outer timeout and the orchestrator command would be killed mid- +# attempt. So raising MAX_SPOT_ATTEMPTS REQUIRES raising the matching SF +# executionTimeout in alpha-engine-data/infrastructure/step_function.json +# in lockstep — otherwise the extra attempts are dead budget. +MAX_SPOT_ATTEMPTS="${MAX_SPOT_ATTEMPTS:-2}" +SPOT_ATTEMPT="${SPOT_ATTEMPT:-1}" +# Backoff before a relaunch so a transient regional capacity dip has a +# moment to clear (the launcher rotates AZ/type anyway, but a brief pause +# materially raises the odds the next attempt lands). +SPOT_RETRY_BACKOFF_SECONDS="${SPOT_RETRY_BACKOFF_SECONDS:-20}" # Key-pair name kept ONLY for compatibility with # alpha_engine_lib.ec2_spot's --key-name flag — the spot still launches # with this key associated, but NOTHING in this script SSH's into the @@ -148,6 +181,11 @@ RUN_MODE="full" # vs RAG) runs; --preflight-only only swaps "preflight + work" for # "preflight + exit 0". PREFLIGHT_ONLY=0 +# Preserve the verbatim CLI args so the spot-interruption retry can +# self-re-exec this script with an identical mode/flag set on a fresh +# spot (see the on_exit retry trap below). Captured BEFORE the parse +# loop's `shift`s consume them. +ORIG_ARGS=("$@") while [[ $# -gt 0 ]]; do case "$1" in --smoke-only) RUN_MODE="smoke-only"; shift ;; @@ -178,6 +216,7 @@ echo " AMI : $AMI_ID" echo " Region : $AWS_REGION" echo " Branch : $BRANCH" echo " Run mode : $RUN_MODE" +echo " Spot attempt : $SPOT_ATTEMPT/$MAX_SPOT_ATTEMPTS (relaunch on confirmed spot interruption)" echo " Preflight-only: $PREFLIGHT_ONLY (1 = boot + preflight + exit 0, NO fetch/write)" echo " S3 bucket : $S3_BUCKET" echo " Transport : SSM via lib chokepoint (python -m alpha_engine_lib.ssm_dispatcher)" @@ -211,6 +250,94 @@ fi # immediately. Replaces the 2026-05-22 broken-by-design hardcoded # single-subnet + single-instance-type pattern that failed Evaluator's # launch when us-east-1f ran out of c5.large. +# ── Cleanup + spot-interruption retry trap ────────────────────────────────── +# Installed BEFORE the launch so it uniformly covers BOTH an all- +# combinations-exhausted launch (ec2_spot rc 64) AND a mid-run +# reclamation. INSTANCE_ID / S3_STAGING are read at trap-FIRE time, so +# they pick up the values assigned after a successful launch below; both +# default empty so cleanup is a no-op if we never got that far. +INSTANCE_ID="" +S3_STAGING="" + +cleanup() { + if [ -n "$INSTANCE_ID" ]; then + echo "" + echo "==> Terminating spot instance $INSTANCE_ID..." + aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$AWS_REGION" --output text > /dev/null 2>&1 || true + fi + [ -n "$S3_STAGING" ] && aws s3 rm "$S3_STAGING" --recursive --quiet 2>/dev/null || true + [ -n "$INSTANCE_ID" ] && echo " Instance terminated; S3 staging cleaned." + return 0 +} + +# Echoes a non-empty reason string + returns 0 when the just-failed run +# was a CONFIRMED spot interruption — either the launcher exhausted every +# instance_type × subnet (ec2_spot rc 64) or AWS reclaimed the running +# spot (no-capacity / by-price / capacity-oversubscribed). A GENUINE +# inner-workload failure leaves the instance fulfilled/running and +# returns 1 → NOT retryable, fails loud per the fail-fast posture (blind +# retry would mask a real collector/prune bug). +_spot_failure_reason() { + local rc="$1" + # Launch-time exhaustion across all combinations (ec2_spot exit 64). + if [ "$rc" -eq 64 ]; then echo "launch-capacity-exhausted"; return 0; fi + # Nothing launched and not a clean exit → unclassifiable, treat hard. + [ -z "$INSTANCE_ID" ] && return 1 + # Authoritative signal: the spot REQUEST status code. + local sir_code + sir_code=$(aws ec2 describe-spot-instance-requests \ + --filters "Name=instance-id,Values=$INSTANCE_ID" \ + --query 'SpotInstanceRequests[0].Status.Code' \ + --output text --region "$AWS_REGION" 2>/dev/null || echo "") + case "$sir_code" in + instance-terminated-no-capacity|instance-terminated-by-price|instance-terminated-capacity-oversubscribed|instance-stopped-no-capacity|instance-stopped-by-price|instance-stopped-capacity-oversubscribed|marked-for-termination) + echo "$sir_code"; return 0 ;; + esac + # Fallback: the instance's own StateReason (spot reclamation surfaces + # as Server.SpotInstanceTermination; capacity events as + # Server.InsufficientInstanceCapacity). + local state_reason + state_reason=$(aws ec2 describe-instances --instance-ids "$INSTANCE_ID" \ + --query 'Reservations[].Instances[].StateReason.Code' \ + --output text --region "$AWS_REGION" 2>/dev/null || echo "") + case "$state_reason" in + Server.SpotInstanceTermination|Server.InsufficientInstanceCapacity) + echo "$state_reason"; return 0 ;; + esac + return 1 +} + +on_exit() { + local rc=$? + # Classify BEFORE cleanup() terminates the instance, while the spot + # request status is still queryable. + local reason="" + if [ "$rc" -ne 0 ]; then + reason="$(_spot_failure_reason "$rc")" || reason="" + fi + cleanup + if [ "$rc" -ne 0 ] && [ -n "$reason" ] && [ "$SPOT_ATTEMPT" -lt "$MAX_SPOT_ATTEMPTS" ]; then + # Record the absorbed interruption on a named CloudWatch surface + # (fail-loud posture: the retry is observable, never silent). + aws cloudwatch put-metric-data \ + --namespace "AlphaEngine" \ + --metric-name "SpotInterruptionRetry" \ + --dimensions "Process=data-weekly" \ + --value 1 --unit "Count" \ + --region "$AWS_REGION" 2>/dev/null || true + echo "" >&2 + echo "==> Spot interruption (reason=$reason) on attempt $SPOT_ATTEMPT/$MAX_SPOT_ATTEMPTS — relaunching a fresh spot in ${SPOT_RETRY_BACKOFF_SECONDS}s..." >&2 + sleep "$SPOT_RETRY_BACKOFF_SECONDS" + trap - EXIT + SPOT_ATTEMPT=$((SPOT_ATTEMPT + 1)) exec bash "$0" ${ORIG_ARGS[@]+"${ORIG_ARGS[@]}"} + fi + if [ "$rc" -ne 0 ] && [ -n "$reason" ]; then + echo "ERROR: spot interruption (reason=$reason) persisted across all $MAX_SPOT_ATTEMPTS attempt(s) — giving up. The weekly pipeline fails loud; redrive once spot capacity returns." >&2 + fi + exit "$rc" +} +trap on_exit EXIT + echo "==> Requesting spot instance (lib CLI rotation: types=[$INSTANCE_TYPES], subnets=[$SUBNETS])..." INSTANCE_ID=$("$LIB_PYTHON" -m alpha_engine_lib.ec2_spot launch \ @@ -234,19 +361,12 @@ echo " Instance ID: $INSTANCE_ID" RUN_ID="$(date +%Y%m%dT%H%M%SZ)-${INSTANCE_ID}" S3_STAGING_PREFIX="tmp/spot_data_weekly/${RUN_ID}" +# S3_STAGING is consumed by cleanup() (declared with the on_exit trap +# above); assigning it here arms staging-prefix removal now that the +# launch succeeded. (S3 lifecycle on tmp/ is the belt-and-suspenders if +# the trap never fires.) S3_STAGING="s3://${S3_BUCKET}/${S3_STAGING_PREFIX}" -# Cleanup — always terminate the instance + remove the S3 staging prefix. -# (S3 lifecycle on tmp/ is the belt-and-suspenders if the trap never fires.) -cleanup() { - echo "" - echo "==> Terminating spot instance $INSTANCE_ID..." - aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$AWS_REGION" --output text > /dev/null 2>&1 || true - aws s3 rm "$S3_STAGING" --recursive --quiet 2>/dev/null || true - echo " Instance terminated; S3 staging cleaned." -} -trap cleanup EXIT - echo "==> Waiting for instance to enter running state..." aws ec2 wait instance-running --instance-ids "$INSTANCE_ID" --region "$AWS_REGION" diff --git a/tests/test_spot_data_weekly_interruption_retry.py b/tests/test_spot_data_weekly_interruption_retry.py new file mode 100644 index 0000000..039074e --- /dev/null +++ b/tests/test_spot_data_weekly_interruption_retry.py @@ -0,0 +1,167 @@ +"""Pins the mid-run spot-interruption retry in +infrastructure/spot_data_weekly.sh. + +Origin: 2026-05-30 Saturday SF DataPhase1 failure. The nested data spot +(i-02e498e018441751f, c5.large/us-east-1a) was reclaimed by AWS *mid- +workload* with spot-request status `instance-terminated-no-capacity`. +The lib launcher (alpha_engine_lib.ec2_spot) rotates instance_type × +subnet on *acquisition* capacity errors, but nothing relaunched after a +*mid-run* reclamation — the workload SSM command returned ResponseCode +-1, the orchestrator exited 1, and the entire weekly pipeline failed. + +The fix adds an EXIT trap (`on_exit`) that classifies the failure: a +CONFIRMED spot interruption (no-capacity / price / capacity- +oversubscribed reclamation, or all-combinations-exhausted launch) +relaunches a fresh spot up to MAX_SPOT_ATTEMPTS; a GENUINE workload +error is NOT retried and fails loud (blind retry would mask a real bug). + +These are static greps (the script only runs end-to-end on a real spot) +mirroring tests/test_spot_data_weekly_run_modes.py — they catch the +regression class where the retry is removed, un-gated from the +interruption-classification, or made to retry genuine workload errors. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import pytest + +_REPO_ROOT = Path(__file__).resolve().parent.parent +_SCRIPT = _REPO_ROOT / "infrastructure" / "spot_data_weekly.sh" + + +@pytest.fixture(scope="module") +def text() -> str: + return _SCRIPT.read_text() + + +class TestRetryConfig: + def test_max_attempts_env_overridable(self, text): + assert 'MAX_SPOT_ATTEMPTS="${MAX_SPOT_ATTEMPTS:-2}"' in text, ( + "MAX_SPOT_ATTEMPTS must default to 2 (one relaunch) and be env-" + "overridable; raising it requires bumping the SF executionTimeout." + ) + + def test_attempt_counter_threaded_via_env(self, text): + assert 'SPOT_ATTEMPT="${SPOT_ATTEMPT:-1}"' in text, ( + "SPOT_ATTEMPT must be env-threaded so a re-exec knows its attempt #." + ) + + def test_orig_args_captured_before_parse(self, text): + """ORIG_ARGS must be captured BEFORE the arg-parse while-loop's + shifts consume the positional params, so the re-exec can replay + the identical mode/flags.""" + cap = text.index('ORIG_ARGS=("$@")') + parse = text.index("while [[ $# -gt 0 ]]; do") + assert cap < parse, "ORIG_ARGS must be captured before the parse loop." + + +class TestTrapInstalledBeforeLaunch: + def test_on_exit_trap_installed(self, text): + assert "trap on_exit EXIT" in text, "on_exit must be the EXIT trap." + + def test_trap_precedes_launch(self, text): + """The trap must be armed BEFORE the ec2_spot launch so it also + covers an all-combinations-exhausted launch (rc 64), not only a + mid-run reclamation.""" + trap_at = text.index("trap on_exit EXIT") + launch_at = text.index("alpha_engine_lib.ec2_spot launch") + assert trap_at < launch_at, ( + "trap on_exit EXIT must be installed before the spot launch." + ) + + +class TestInterruptionClassification: + @pytest.mark.parametrize( + "code", + [ + "instance-terminated-no-capacity", + "instance-terminated-by-price", + "instance-terminated-capacity-oversubscribed", + "instance-stopped-no-capacity", + "marked-for-termination", + ], + ) + def test_spot_request_status_codes_classified(self, text, code): + assert code in text, ( + f"spot-request status {code!r} must be classified as a retryable " + "interruption in _spot_failure_reason." + ) + + def test_launch_exhaustion_rc64_retryable(self, text): + assert re.search(r'\[ "\$rc" -eq 64 \]', text), ( + "ec2_spot rc 64 (all instance_type × subnet exhausted) must be " + "treated as a retryable capacity interruption." + ) + + def test_instance_statereason_fallback(self, text): + assert "Server.SpotInstanceTermination" in text, ( + "Instance StateReason fallback must recognize spot reclamation." + ) + + def test_classifier_queries_spot_request_before_terminate(self, text): + """Classification must read the spot-request status; the comment + + call order ensure it happens before cleanup() terminates.""" + assert "describe-spot-instance-requests" in text + # on_exit computes `reason` before calling cleanup. + on_exit = text[text.index("on_exit() {"):] + reason_at = on_exit.index("_spot_failure_reason") + cleanup_at = on_exit.index("\n cleanup") + assert reason_at < cleanup_at, ( + "Failure must be classified BEFORE cleanup() terminates the " + "instance (the spot-request status is only queryable while it lives)." + ) + + +class TestFailLoudOnGenuineError: + def test_retry_gated_on_nonempty_reason(self, text): + """The relaunch must be gated on a non-empty interruption reason — + a genuine workload failure (empty reason) must NOT retry.""" + assert re.search( + r'\[ "\$rc" -ne 0 \] && \[ -n "\$reason" \] && ' + r'\[ "\$SPOT_ATTEMPT" -lt "\$MAX_SPOT_ATTEMPTS" \]', + text, + ), ( + "Relaunch must require rc!=0 AND a confirmed interruption reason " + "AND attempts remaining — genuine workload errors fail loud." + ) + + def test_exhausted_attempts_fail_loud(self, text): + assert "persisted across all $MAX_SPOT_ATTEMPTS attempt(s)" in text, ( + "When interruption persists across all attempts the script must " + "surface a loud ERROR and propagate the non-zero exit." + ) + + def test_original_exit_code_propagated(self, text): + on_exit = text[text.index("on_exit() {"):] + assert 'exit "$rc"' in on_exit, ( + "on_exit must propagate the original failure code, not mask it." + ) + + +class TestReexecAndObservability: + def test_reexec_preserves_args_and_increments_attempt(self, text): + assert ( + 'SPOT_ATTEMPT=$((SPOT_ATTEMPT + 1)) exec bash "$0" ' + '${ORIG_ARGS[@]+"${ORIG_ARGS[@]}"}' + in text + ), ( + "Relaunch must self-re-exec via `exec bash \"$0\"` with the " + "preserved ORIG_ARGS and an incremented SPOT_ATTEMPT." + ) + + def test_trap_disarmed_before_reexec(self, text): + """`trap - EXIT` must precede the exec so the replaced process does + not double-run cleanup for the already-terminated instance.""" + on_exit = text[text.index("on_exit() {"):] + disarm_at = on_exit.index("trap - EXIT") + exec_at = on_exit.index("exec bash") + assert disarm_at < exec_at + + def test_retry_emits_named_cloudwatch_metric(self, text): + assert 'metric-name "SpotInterruptionRetry"' in text, ( + "Each absorbed interruption must emit the SpotInterruptionRetry " + "CloudWatch metric — the retry is observable, never silent." + )