From d7845f61efc8430708a02cfdadc48e17f2e97ac9 Mon Sep 17 00:00:00 2001 From: Brian McMahon Date: Wed, 20 May 2026 06:04:31 -0700 Subject: [PATCH] feat(notifications): add sf-telegram-notifier Lambda for all 3 alpha-engine SFs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribes to EventBridge `Step Functions Execution Status Change` events for the saturday / weekday / eod pipelines and forwards human-readable summaries to the alpha-engine Telegram bot via the canonical `alpha_engine_lib.telegram.send_message` primitive. Purely additive: existing SNS → email path on every SF (NotifyComplete + HandleFailure) is unchanged. New EventBridge rule subscribes to the same status changes in parallel, zero blast radius on the trade-decision pipeline. One rule covers all five status transitions in a single mechanism: RUNNING (silent — avoids 5:45 AM PT daily phone-buzz), SUCCEEDED, FAILED, TIMED_OUT, ABORTED (all push). FAILED enriches the message with the error+cause snippet via best-effort states:DescribeExecution. Telegram credentials reuse the SSM parameters already provisioned for the executor notifier.py arc (ROADMAP L1067, 2026-05-13) — no new secret material. Operator-deployed (--bootstrap + code-only update flows), not wired into CI, matching the spot-orphan-reaper / changelog-cloudwatch-mirror convention to keep the OIDC role's blast radius narrow. Tests: 12 new unit tests covering all 5 status transitions, FAILED cause fetch + fail-soft, duration formatting, unknown-SF fallback, send_message failure surfacing. Full repo suite still 1399 passed / 1 skipped. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lambdas/sf-telegram-notifier/README.md | 80 +++++++ .../lambdas/sf-telegram-notifier/deploy.sh | 214 ++++++++++++++++++ .../sf-telegram-notifier/iam-policy.json | 30 +++ .../lambdas/sf-telegram-notifier/index.py | 147 ++++++++++++ .../sf-telegram-notifier/requirements.txt | 4 + .../sf-telegram-notifier/test_handler.py | 187 +++++++++++++++ 6 files changed, 662 insertions(+) create mode 100644 infrastructure/lambdas/sf-telegram-notifier/README.md create mode 100755 infrastructure/lambdas/sf-telegram-notifier/deploy.sh create mode 100644 infrastructure/lambdas/sf-telegram-notifier/iam-policy.json create mode 100644 infrastructure/lambdas/sf-telegram-notifier/index.py create mode 100644 infrastructure/lambdas/sf-telegram-notifier/requirements.txt create mode 100644 infrastructure/lambdas/sf-telegram-notifier/test_handler.py diff --git a/infrastructure/lambdas/sf-telegram-notifier/README.md b/infrastructure/lambdas/sf-telegram-notifier/README.md new file mode 100644 index 0000000..85744cb --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/README.md @@ -0,0 +1,80 @@ +# sf-telegram-notifier + +Fans EventBridge `Step Functions Execution Status Change` events for the +three Alpha Engine Step Functions into Telegram via the canonical +`alpha_engine_lib.telegram.send_message` primitive. + +**Purely additive.** The existing SNS → email path on every SF +(`NotifyComplete` success + `HandleFailure` failure branches) is unchanged. +This Lambda subscribes to a separate EventBridge rule and never touches the +SF JSON definitions. + +## Coverage + +| SF | Source ARN suffix | Pretty label | +| --- | --- | --- | +| Saturday weekly pipeline | `alpha-engine-saturday-pipeline` | `Saturday SF` | +| Weekday daily pipeline | `alpha-engine-weekday-pipeline` | `Weekday SF` | +| EOD post-market pipeline | `alpha-engine-eod-pipeline` | `EOD SF` | + +| Status | Emoji | Push? | Extra detail | +| --- | --- | --- | --- | +| `RUNNING` | 🚀 | silent | execution name only | +| `SUCCEEDED` | ✅ | loud | duration | +| `FAILED` | 🔴 | loud | duration + `error: cause` via `DescribeExecution` (best-effort, truncated at 280 chars) | +| `TIMED_OUT` | ⏰ | loud | duration | +| `ABORTED` | ⛔ | loud | duration | + +`RUNNING` is delivered silently (in-channel awareness, no phone buzz) so the +weekday SF's daily 5:45 AM PT start does not page on every trading day. + +## Architecture + +``` +SF status transition + │ + ▼ +EventBridge default bus + (aws.states / Step Functions Execution Status Change, + filtered to the 3 alpha-engine SF ARNs) + │ + ▼ +alpha-engine-sf-telegram-notifier ──► alpha_engine_lib.telegram.send_message + │ + ▼ + Telegram bot API + (alpha-engine primary bot) +``` + +Telegram credentials are resolved at runtime by the lib from SSM under +`/alpha-engine/TELEGRAM_BOT_TOKEN` + `/alpha-engine/TELEGRAM_CHAT_ID`, +which were provisioned for the executor `notifier.py` arc +(ROADMAP L1067, 2026-05-13). No new secret material is required. + +## Deploy + +```bash +# First-time bootstrap — creates IAM role, Lambda, EventBridge rule, permission +bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --bootstrap + +# Code-only update (default) +bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh + +# Dry-run (validate + package, do not apply) +bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --dry-run + +# Smoke-test (invoke with a synthetic SUCCEEDED event) +bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --smoke +``` + +Auth: uses active AWS CLI creds. Personal IAM user has enough perms; +deliberately not wired into CI to keep the OIDC role's blast radius narrow, +matching the spot-orphan-reaper / changelog-cloudwatch-mirror convention. + +## IAM (inline policy) + +- `logs:CreateLogGroup/Stream + PutLogEvents` on the Lambda's own log group +- `ssm:GetParameter` on `/alpha-engine/TELEGRAM_BOT_TOKEN` + + `/alpha-engine/TELEGRAM_CHAT_ID` (no other parameters) +- `states:DescribeExecution` on `arn:aws:states:…:execution:alpha-engine-*:*` + — only used to enrich `FAILED` events with the error+cause snippet diff --git a/infrastructure/lambdas/sf-telegram-notifier/deploy.sh b/infrastructure/lambdas/sf-telegram-notifier/deploy.sh new file mode 100755 index 0000000..48406e9 --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/deploy.sh @@ -0,0 +1,214 @@ +#!/usr/bin/env bash +# deploy.sh — Create or update the alpha-engine-sf-telegram-notifier Lambda +# and wire its EventBridge SF status-change trigger. +# +# This Lambda subscribes to `aws.states` / "Step Functions Execution Status +# Change" events for the three Alpha Engine SFs (saturday / weekday / eod) +# and forwards human-readable summaries to Telegram via +# `alpha_engine_lib.telegram.send_message`. Existing SNS → email path is +# unaffected. +# +# Managed outside CloudFormation — same rationale as spot-orphan-reaper + +# changelog-cloudwatch-mirror (keeps the github-actions-lambda-deploy +# OIDC role's blast radius narrow; operator-deployed only). +# +# Usage: +# bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh # update code only +# bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --bootstrap # first-time create + wire EventBridge +# bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --dry-run # show actions, do not apply +# bash infrastructure/lambdas/sf-telegram-notifier/deploy.sh --smoke # invoke once with a synthetic SUCCEEDED event + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +FUNCTION_NAME="alpha-engine-sf-telegram-notifier" +ROLE_NAME="alpha-engine-sf-telegram-notifier-role" +POLICY_NAME="alpha-engine-sf-telegram-notifier-policy" +RULE_NAME="alpha-engine-sf-status-change" +REGION="${AWS_REGION:-us-east-1}" +ACCOUNT_ID="${ACCOUNT_ID:-711398986525}" + +DRY_RUN=false +BOOTSTRAP=false +SMOKE=false +for arg in "$@"; do + case "$arg" in + --dry-run) DRY_RUN=true ;; + --bootstrap) BOOTSTRAP=true ;; + --smoke) SMOKE=true ;; + -h|--help) sed -n '2,/^$/p' "$0"; exit 0 ;; + esac +done + +run() { + if $DRY_RUN; then + echo "DRY: $*" + else + "$@" + fi +} + +# ----- 0. Validate handler + run unit tests ---------------------------------- + +python3 -c " +import ast +src = open('${SCRIPT_DIR}/index.py').read() +ast.parse(src) +print('index.py syntax OK') +" + +if [[ -f "${SCRIPT_DIR}/test_handler.py" ]]; then + echo "Running handler unit tests..." + python3 -m pytest "${SCRIPT_DIR}/test_handler.py" -q +fi + +# ----- 1. Package: pip install deps + zip handler --------------------------- + +PKG=$(mktemp -d) +trap "rm -rf '$PKG'" EXIT + +echo "Installing deps into ${PKG} (pip install -t)..." +python3 -m pip install \ + --quiet \ + --target "${PKG}" \ + --upgrade \ + -r "${SCRIPT_DIR}/requirements.txt" + +cp "${SCRIPT_DIR}/index.py" "${PKG}/index.py" +ZIP="${PKG}/function.zip" +(cd "${PKG}" && zip -qr "function.zip" . -x "function.zip") +echo "Packaged ${ZIP} ($(wc -c < "${ZIP}") bytes)" + +# ----- 2. Bootstrap (first-time only) --------------------------------------- + +if $BOOTSTRAP; then + echo "Bootstrapping ${FUNCTION_NAME}..." + + TRUST_POLICY='{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}' + if ! aws iam get-role --role-name "${ROLE_NAME}" --query 'Role.RoleName' --output text >/dev/null 2>&1; then + echo " Creating IAM role: ${ROLE_NAME}" + run aws iam create-role \ + --role-name "${ROLE_NAME}" \ + --assume-role-policy-document "${TRUST_POLICY}" \ + --query 'Role.RoleName' --output text + else + echo " IAM role exists: ${ROLE_NAME}" + fi + + echo " Applying inline policy: ${POLICY_NAME}" + run aws iam put-role-policy \ + --role-name "${ROLE_NAME}" \ + --policy-name "${POLICY_NAME}" \ + --policy-document "file://${SCRIPT_DIR}/iam-policy.json" + + if ! $DRY_RUN; then + echo " Waiting 10s for IAM role propagation..." + sleep 10 + fi + + ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}" + if ! aws lambda get-function --function-name "${FUNCTION_NAME}" --query 'Configuration.FunctionName' --output text >/dev/null 2>&1; then + echo " Creating Lambda: ${FUNCTION_NAME}" + run aws lambda create-function \ + --function-name "${FUNCTION_NAME}" \ + --runtime python3.12 \ + --role "${ROLE_ARN}" \ + --handler index.handler \ + --zip-file "fileb://${ZIP}" \ + --timeout 30 \ + --memory-size 256 \ + --environment 'Variables={LOG_LEVEL=INFO}' \ + --region "${REGION}" \ + --query 'FunctionArn' --output text + else + echo " Lambda exists, code will be updated in step 3" + fi + + # EventBridge rule: Step Functions Execution Status Change for the 3 alpha-engine SFs + echo " Creating EventBridge rule: ${RULE_NAME}" + EVENT_PATTERN=$(cat </dev/null || true +fi + +# ----- 3. Update function code (always after bootstrap, idempotent) --------- + +echo "Updating Lambda function code: ${FUNCTION_NAME}" +run aws lambda update-function-code \ + --function-name "${FUNCTION_NAME}" \ + --zip-file "fileb://${ZIP}" \ + --region "${REGION}" \ + --query 'LastUpdateStatus' --output text + +if ! $DRY_RUN; then + aws lambda wait function-updated \ + --function-name "${FUNCTION_NAME}" \ + --region "${REGION}" +fi + +echo "✓ Code deployed." + +# ----- 4. Smoke (synthetic SUCCEEDED event) --------------------------------- + +if $SMOKE; then + echo "" + echo "Smoke-testing via direct invoke (synthetic SUCCEEDED event)..." + RESP=$(mktemp) + PAYLOAD=$(cat <<'EOF' +{ + "source": "aws.states", + "detail-type": "Step Functions Execution Status Change", + "detail": { + "status": "SUCCEEDED", + "stateMachineArn": "arn:aws:states:us-east-1:711398986525:stateMachine:alpha-engine-saturday-pipeline", + "executionArn": "arn:aws:states:us-east-1:711398986525:execution:alpha-engine-saturday-pipeline:smoke-test", + "name": "smoke-test", + "startDate": 0, + "stopDate": 60000 + } +} +EOF +) + aws lambda invoke \ + --function-name "${FUNCTION_NAME}" \ + --cli-binary-format raw-in-base64-out \ + --payload "${PAYLOAD}" \ + --region "${REGION}" \ + "${RESP}" >/dev/null + cat "${RESP}" + echo "" + rm -f "${RESP}" +fi diff --git a/infrastructure/lambdas/sf-telegram-notifier/iam-policy.json b/infrastructure/lambdas/sf-telegram-notifier/iam-policy.json new file mode 100644 index 0000000..06514df --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/iam-policy.json @@ -0,0 +1,30 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "Logs", + "Effect": "Allow", + "Action": [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Resource": "arn:aws:logs:us-east-1:711398986525:log-group:/aws/lambda/alpha-engine-sf-telegram-notifier:*" + }, + { + "Sid": "TelegramSecretsSSM", + "Effect": "Allow", + "Action": ["ssm:GetParameter"], + "Resource": [ + "arn:aws:ssm:us-east-1:711398986525:parameter/alpha-engine/TELEGRAM_BOT_TOKEN", + "arn:aws:ssm:us-east-1:711398986525:parameter/alpha-engine/TELEGRAM_CHAT_ID" + ] + }, + { + "Sid": "DescribeExecutionForFailureCause", + "Effect": "Allow", + "Action": ["states:DescribeExecution"], + "Resource": "arn:aws:states:us-east-1:711398986525:execution:alpha-engine-*:*" + } + ] +} diff --git a/infrastructure/lambdas/sf-telegram-notifier/index.py b/infrastructure/lambdas/sf-telegram-notifier/index.py new file mode 100644 index 0000000..e1b6edd --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/index.py @@ -0,0 +1,147 @@ +"""alpha-engine-sf-telegram-notifier — fan SF status changes into Telegram. + +Subscribes to EventBridge `Step Functions Execution Status Change` events for +the three Alpha Engine Step Functions (saturday / weekday / eod) and forwards +a human-readable summary to the alpha-engine Telegram bot via the canonical +``alpha_engine_lib.telegram.send_message`` primitive. + +The existing SNS → email path is unaffected: this Lambda subscribes to a new +EventBridge rule (``alpha-engine-sf-status-change``) and does not touch any +SF definition. Adding/removing Telegram coverage is a single-resource flip +with zero blast radius on the trade-decision pipeline. + +Event source: ``aws.states`` / ``Step Functions Execution Status Change`` +covers all five terminal-or-transition statuses in one rule: +RUNNING, SUCCEEDED, FAILED, TIMED_OUT, ABORTED. RUNNING fires once at +execution start; the four terminal statuses fire once each at end. RUNNING +notifications go out silent (in-channel awareness without a phone buzz) so +the weekday SF's daily 5:45 AM PT start does not buzz on every trading day; +all terminal statuses push. + +On FAILED, the handler best-effort calls ``states:DescribeExecution`` to +surface the failure cause string. The Telegram primitive never raises, so a +misconfigured bot or transient network error returns ``False`` from +``send_message`` and is logged at WARNING — the EventBridge invocation still +returns success and is not retried. +""" + +from __future__ import annotations + +import logging +import os + +import boto3 + +from alpha_engine_lib.telegram import send_message + +logger = logging.getLogger() +logger.setLevel(os.environ.get("LOG_LEVEL", "INFO")) + +REGION = os.environ.get("AWS_REGION", "us-east-1") + +_SF_LABELS: dict[str, str] = { + "alpha-engine-saturday-pipeline": "Saturday SF", + "alpha-engine-weekday-pipeline": "Weekday SF", + "alpha-engine-eod-pipeline": "EOD SF", +} + +_STATUS_EMOJI: dict[str, str] = { + "RUNNING": "\U0001f680", # 🚀 + "SUCCEEDED": "✅", # ✅ + "FAILED": "\U0001f534", # 🔴 + "TIMED_OUT": "⏰", # ⏰ + "ABORTED": "⛔", # ⛔ +} + +_CAUSE_MAX_CHARS = 280 + + +def _label_for_arn(sm_arn: str) -> str: + name = sm_arn.rsplit(":", 1)[-1] if sm_arn else "" + return _SF_LABELS.get(name, name or "Unknown SF") + + +def _format_duration(started_ms: int | None, stopped_ms: int | None) -> str: + if started_ms is None or stopped_ms is None: + return "" + secs = max(0, (int(stopped_ms) - int(started_ms)) // 1000) + h, rem = divmod(secs, 3600) + m, _ = divmod(rem, 60) + if h: + return f"{h}h {m}m" + return f"{m}m" + + +def _fetch_failure_cause(execution_arn: str) -> str: + """Best-effort fetch of error+cause via DescribeExecution. Never raises.""" + if not execution_arn: + return "" + try: + sf = boto3.client("stepfunctions", region_name=REGION) + resp = sf.describe_execution(executionArn=execution_arn) + except Exception as exc: # noqa: BLE001 — fire-and-forget enrichment + logger.warning("describe_execution failed for %s: %s", execution_arn, exc) + return "" + error = (resp.get("error") or "").strip() + cause = (resp.get("cause") or "").strip() + if error and cause: + snippet = f"{error}: {cause}" + else: + snippet = error or cause + if len(snippet) > _CAUSE_MAX_CHARS: + snippet = snippet[: _CAUSE_MAX_CHARS - 1] + "…" + return snippet + + +def _build_message(detail: dict) -> tuple[str, bool]: + """Return (text, disable_notification) for the given event detail.""" + status = detail.get("status", "UNKNOWN") + label = _label_for_arn(detail.get("stateMachineArn", "")) + emoji = _STATUS_EMOJI.get(status, "\U0001f4e8") # 📨 fallback + exec_name = detail.get("name", "") or "(unknown execution)" + + lines = [f"{emoji} *{label} — {status}*"] + + if status == "RUNNING": + lines.append(f"Execution: {exec_name}") + return "\n".join(lines), True + + duration = _format_duration(detail.get("startDate"), detail.get("stopDate")) + if duration: + lines.append(f"Duration: {duration}") + + if status == "FAILED": + cause = _fetch_failure_cause(detail.get("executionArn", "")) + if cause: + lines.append(f"Cause: {cause}") + + lines.append(f"Execution: {exec_name}") + return "\n".join(lines), False + + +def handler(event: dict, context) -> dict: # noqa: ARG001 — Lambda contract + """EventBridge handler for SF execution status changes. + + Expected event shape (CloudWatch Events / EventBridge from aws.states): + detail.status: RUNNING | SUCCEEDED | FAILED | TIMED_OUT | ABORTED + detail.stateMachineArn: SF arn + detail.executionArn: execution arn + detail.name: execution id + detail.startDate: epoch ms + detail.stopDate: epoch ms (null while RUNNING) + """ + detail = event.get("detail") or {} + status = detail.get("status", "UNKNOWN") + sm_name = (detail.get("stateMachineArn") or "").rsplit(":", 1)[-1] + logger.info("SF status change: sf=%s status=%s", sm_name, status) + + text, silent = _build_message(detail) + ok = send_message(text, disable_notification=silent) + + return { + "status": status, + "state_machine": sm_name, + "execution": detail.get("name", ""), + "telegram_sent": ok, + "silent": silent, + } diff --git a/infrastructure/lambdas/sf-telegram-notifier/requirements.txt b/infrastructure/lambdas/sf-telegram-notifier/requirements.txt new file mode 100644 index 0000000..de67064 --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/requirements.txt @@ -0,0 +1,4 @@ +# Pinned to match alpha-engine-data/requirements.txt — the main data Lambda +# already runs on this version, so reusing it keeps the lib substrate +# coherent across all alpha-engine-data Lambdas. +alpha-engine-lib @ git+https://github.com/cipher813/alpha-engine-lib@v0.20.0 diff --git a/infrastructure/lambdas/sf-telegram-notifier/test_handler.py b/infrastructure/lambdas/sf-telegram-notifier/test_handler.py new file mode 100644 index 0000000..45d6c8a --- /dev/null +++ b/infrastructure/lambdas/sf-telegram-notifier/test_handler.py @@ -0,0 +1,187 @@ +"""Unit tests for sf-telegram-notifier index.handler. + +Mocks alpha_engine_lib.telegram.send_message so tests do not hit the live +Telegram API. Each test asserts the exact (text, disable_notification) tuple +the handler hands to the primitive, plus the return value shape. +""" + +from __future__ import annotations + +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Stub `alpha_engine_lib.telegram` before importing the handler so test +# environments without the lib installed (CI runners pre-pip-install) still +# pass — the handler only depends on this one import path from the lib. +_lib_pkg = types.ModuleType("alpha_engine_lib") +_telegram_mod = types.ModuleType("alpha_engine_lib.telegram") +_telegram_mod.send_message = MagicMock(return_value=True) +_lib_pkg.telegram = _telegram_mod +sys.modules.setdefault("alpha_engine_lib", _lib_pkg) +sys.modules.setdefault("alpha_engine_lib.telegram", _telegram_mod) + +sys.path.insert(0, str(Path(__file__).parent)) +import index # noqa: E402 + + +SATURDAY_ARN = "arn:aws:states:us-east-1:711398986525:stateMachine:alpha-engine-saturday-pipeline" +WEEKDAY_ARN = "arn:aws:states:us-east-1:711398986525:stateMachine:alpha-engine-weekday-pipeline" +EOD_ARN = "arn:aws:states:us-east-1:711398986525:stateMachine:alpha-engine-eod-pipeline" + + +def _event(status: str, sm_arn: str = SATURDAY_ARN, **detail_overrides) -> dict: + detail = { + "status": status, + "stateMachineArn": sm_arn, + "executionArn": f"arn:aws:states:us-east-1:711398986525:execution:{sm_arn.rsplit(':', 1)[-1]}:exec-001", + "name": "exec-001", + "startDate": 1_700_000_000_000, + "stopDate": 1_700_000_060_000, # 60s after start + } + detail.update(detail_overrides) + return {"detail": detail} + + +@pytest.fixture(autouse=True) +def reset_send_message(): + _telegram_mod.send_message.reset_mock() + _telegram_mod.send_message.return_value = True + yield + + +def test_running_sends_silent_message_without_duration_or_cause(): + event = _event("RUNNING", stopDate=None) + result = index.handler(event, None) + + _telegram_mod.send_message.assert_called_once() + text, kwargs = _telegram_mod.send_message.call_args.args[0], _telegram_mod.send_message.call_args.kwargs + assert "Saturday SF — RUNNING" in text + assert "Execution: exec-001" in text + assert "Duration:" not in text + assert "Cause:" not in text + assert kwargs["disable_notification"] is True + assert result["status"] == "RUNNING" + assert result["silent"] is True + assert result["telegram_sent"] is True + + +def test_succeeded_sends_loud_message_with_duration(): + event = _event("SUCCEEDED", sm_arn=WEEKDAY_ARN) + result = index.handler(event, None) + + text = _telegram_mod.send_message.call_args.args[0] + kwargs = _telegram_mod.send_message.call_args.kwargs + assert "Weekday SF — SUCCEEDED" in text + assert "Duration: 1m" in text + assert kwargs["disable_notification"] is False + assert result["silent"] is False + + +def test_succeeded_long_duration_formats_hours_and_minutes(): + # 4h 12m → start 0, stop = (4*3600 + 12*60) * 1000 + event = _event("SUCCEEDED", startDate=0, stopDate=(4 * 3600 + 12 * 60) * 1000) + index.handler(event, None) + text = _telegram_mod.send_message.call_args.args[0] + assert "Duration: 4h 12m" in text + + +def test_failed_fetches_and_includes_cause(): + event = _event("FAILED", sm_arn=EOD_ARN) + fake_sf_client = MagicMock() + fake_sf_client.describe_execution.return_value = { + "error": "States.TaskFailed", + "cause": "EODReconcile state failed: NoCredentialsError", + } + with patch("index.boto3.client", return_value=fake_sf_client) as boto_client: + result = index.handler(event, None) + + boto_client.assert_called_once_with("stepfunctions", region_name=index.REGION) + fake_sf_client.describe_execution.assert_called_once_with( + executionArn=event["detail"]["executionArn"] + ) + text = _telegram_mod.send_message.call_args.args[0] + kwargs = _telegram_mod.send_message.call_args.kwargs + assert "EOD SF — FAILED" in text + assert "Cause: States.TaskFailed: EODReconcile state failed: NoCredentialsError" in text + assert kwargs["disable_notification"] is False + assert result["status"] == "FAILED" + + +def test_failed_with_describe_execution_error_still_sends(): + """DescribeExecution failures must not block the Telegram send.""" + event = _event("FAILED") + fake_sf_client = MagicMock() + fake_sf_client.describe_execution.side_effect = RuntimeError("API throttled") + with patch("index.boto3.client", return_value=fake_sf_client): + result = index.handler(event, None) + + text = _telegram_mod.send_message.call_args.args[0] + assert "Saturday SF — FAILED" in text + assert "Cause:" not in text # enrichment silently dropped + assert result["telegram_sent"] is True + + +def test_failed_truncates_long_cause(): + event = _event("FAILED") + fake_sf_client = MagicMock() + fake_sf_client.describe_execution.return_value = { + "error": "E", + "cause": "x" * 500, + } + with patch("index.boto3.client", return_value=fake_sf_client): + index.handler(event, None) + + text = _telegram_mod.send_message.call_args.args[0] + cause_line = [line for line in text.splitlines() if line.startswith("Cause:")][0] + # "Cause: " prefix + cap (_CAUSE_MAX_CHARS) + ellipsis (1) = bounded + assert len(cause_line) <= len("Cause: ") + index._CAUSE_MAX_CHARS + + +def test_timed_out_sends_loud_message(): + event = _event("TIMED_OUT") + index.handler(event, None) + text = _telegram_mod.send_message.call_args.args[0] + kwargs = _telegram_mod.send_message.call_args.kwargs + assert "Saturday SF — TIMED_OUT" in text + assert kwargs["disable_notification"] is False + + +def test_aborted_sends_loud_message(): + event = _event("ABORTED") + index.handler(event, None) + text = _telegram_mod.send_message.call_args.args[0] + kwargs = _telegram_mod.send_message.call_args.kwargs + assert "Saturday SF — ABORTED" in text + assert kwargs["disable_notification"] is False + + +def test_unknown_sf_arn_falls_back_to_arn_tail(): + unknown_arn = "arn:aws:states:us-east-1:711398986525:stateMachine:alpha-engine-future-pipeline" + event = _event("SUCCEEDED", sm_arn=unknown_arn) + index.handler(event, None) + text = _telegram_mod.send_message.call_args.args[0] + assert "alpha-engine-future-pipeline — SUCCEEDED" in text + + +def test_send_message_failure_returned_in_result(): + _telegram_mod.send_message.return_value = False + result = index.handler(_event("SUCCEEDED"), None) + assert result["telegram_sent"] is False + assert result["status"] == "SUCCEEDED" + + +def test_label_lookup_table_covers_all_three_sfs(): + assert index._SF_LABELS["alpha-engine-saturday-pipeline"] == "Saturday SF" + assert index._SF_LABELS["alpha-engine-weekday-pipeline"] == "Weekday SF" + assert index._SF_LABELS["alpha-engine-eod-pipeline"] == "EOD SF" + + +def test_format_duration_handles_missing_timestamps(): + assert index._format_duration(None, None) == "" + assert index._format_duration(1000, None) == "" + assert index._format_duration(None, 2000) == "" + assert index._format_duration(0, 1000) == "0m" # sub-minute rounds down