-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreflight.py
More file actions
428 lines (383 loc) · 19.6 KB
/
preflight.py
File metadata and controls
428 lines (383 loc) · 19.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
"""
Data-module preflight: connectivity + freshness checks run at the top of
``weekly_collector.main()`` before any real collection work starts.
Primitives live in ``alpha_engine_lib.preflight.BasePreflight``; this
module composes them with module-specific HTTP probes (polygon, FRED,
FMP /stable) + an ArcticDB-libraries-present gate.
Consolidated 2026-04-30 — the legacy ``validators/preflight.py`` has been
retired. Both files were running back-to-back from ``weekly_collector``
in the phase1 path with overlapping scope; the lib-based path is now
the single source of truth. See alpha-engine-lib README for the
2026-04-14 failure mode that motivated the library.
"""
from __future__ import annotations
import logging
import uuid
from typing import Any
from alpha_engine_lib.preflight import BasePreflight
from alpha_engine_lib.secrets import get_secret
# Module-level logger used by the checks in this module.
log = logging.getLogger(__name__)
# Per-request timeout (seconds) passed as ``timeout=`` to every HTTP
# reachability probe in this module (polygon, FRED, FMP /stable).
_HTTP_TIMEOUT_SECS = 10.0
# FMP /stable probe: cheapest auth-gated call that distinguishes
# (a) valid key on /stable from (b) a key that still works on the
# sunsetted v3 endpoints but would silently 402/403 across our real
# collector calls. AAPL is guaranteed to exist and returns a small
# payload. Added 2026-04-20 after the v3→/stable migration; the
# collectors had been silently zeroing fundamentals for two weeks
# before detection.
_FMP_STABLE_PROBE_URL = "https://financialmodelingprep.com/stable/key-metrics-ttm"
_FMP_STABLE_PROBE_SYMBOL = "AAPL"
# Polygon.io reference-data probe — cheapest auth-gated call that
# validates both network reachability AND API-key validity.
_POLYGON_PROBE_URL = "https://api.polygon.io/v3/reference/tickers/AAPL"
# FRED observation probe — DFF (Federal Funds Rate) is a well-known
# series guaranteed to exist; matches collectors/macro.py usage.
# The series id is sent as the ``series_id`` query parameter.
_FRED_PROBE_URL = "https://api.stlouisfed.org/fred/series/observations"
_FRED_PROBE_SERIES = "DFF"
class DataPreflight(BasePreflight):
    """Preflight checks for the alpha-engine-data entrypoint.

    Mode determines which external services must be reachable:

    - ``"daily"`` — weekday DailyData step. The ``universe`` and ``macro``
      ArcticDB libraries must exist and ``macro.SPY`` must be at most
      1 trading day stale (tolerates polygon's T+1 publish latency;
      weekends + holidays are handled by trading-day arithmetic).
    - ``"morning_enrich"`` — Saturday MorningEnrich state (split out of
      DataPhase1 by the preflight-task-split 2026-05-16, plan
      alpha-engine-docs/private/preflight-task-split-260516.md). Its
      checks are the UNION of what ``_run_morning_enrich`` actually
      needs: polygon + FRED secrets + reachability, S3 read/write,
      ArcticDB libraries present. NO ArcticDB-freshness check —
      morning-enrich is part of what *makes* ArcticDB fresh, so
      asserting freshness at its own entry would be circular. This is a
      strict subset of phase1's dependency set (phase1 additionally
      builds the universe), so per-mode preflights are correct and
      non-overlapping in responsibility.
    - ``"phase1"`` — Saturday DataPhase1. External APIs (FRED, polygon)
      needed; no ArcticDB freshness check (phase1 is what *populates*
      ArcticDB).
    - ``"phase2"`` — Saturday DataPhase2. FMP /stable + Finnhub + SEC
      EDGAR secrets must be present; only FMP /stable is probed over
      HTTP.
    """

    # Closed set of modes accepted by ``__init__``.
    _MODES = ("daily", "morning_enrich", "phase1", "phase2")

    def __init__(self, bucket: str, mode: str) -> None:
        """Bind the preflight to ``bucket`` and validate ``mode``.

        Raises:
            ValueError: if ``mode`` is not one of ``_MODES``.
        """
        super().__init__(bucket)
        if mode not in self._MODES:
            raise ValueError(f"DataPreflight: unknown mode {mode!r}")
        self.mode = mode

    def run(self) -> None:
        """Run every check required by ``self.mode``, cheapest first.

        Raises:
            RuntimeError: from the first failing check defined in this
                class. Checks inherited from ``BasePreflight`` raise
                whatever that class defines.
        """
        # Order: cheapest first so a trivially-broken run fails in <1s.
        #   1. env vars (local lookup)
        #   2. S3 bucket (~ms, IAM)
        #   3. mode-specific HTTP probes (~200ms each)
        #   4. ArcticDB checks (~100ms list_libraries; ~seconds for read)
        # AWS_REGION is a plain env var (boto3 region) — not a secret —
        # so it stays an os.environ check. The API keys below moved to
        # SSM via get_secret() in the .env-deprecation arc (#241/#242):
        # every collector + the reachability probes in this file resolve
        # them via get_secret(), so an os.environ assertion here is stale
        # and fails spuriously on the SSM-backed spot (no .env present).
        # Origin: 2026-05-16 Saturday SF DataPhase1 — phase1 preflight
        # aborted "required env vars missing: ['FRED_API_KEY',
        # 'POLYGON_API_KEY']" even though MorningEnrich (same collectors)
        # had just fetched polygon + FRED fine via get_secret().
        self.check_env_vars("AWS_REGION")
        if self.mode in ("morning_enrich", "phase1"):
            # morning-enrich + phase1 both hit polygon + FRED. Today
            # morning-enrich's polygon overwrite is the load-bearing call;
            # FRED is included because the same macro collectors are in
            # the morning-enrich code path and a drifted FRED key must
            # fail in <1s here, not 28min into the spot run.
            self._check_secrets("FRED_API_KEY", "POLYGON_API_KEY")
        elif self.mode == "phase2":
            self._check_secrets("FMP_API_KEY", "FINNHUB_API_KEY", "EDGAR_IDENTITY")

        self.check_s3_bucket()

        if self.mode in ("morning_enrich", "phase1"):
            # Catch credential drift / upstream outages BEFORE the
            # expensive collector work (phase1 ~30min, morning-enrich
            # ~28min). Net ~400ms across both probes.
            self._check_polygon_reachable()
            self._check_fred_reachable()
            # Bucket policies + IAM denies that HEAD doesn't catch:
            # PUT a sentinel + DELETE it. ~50ms. Caught the 2026-04-12
            # IAM-deny class on the spot's executor-role inline policy.
            self._check_s3_writeable_sentinel()
            # phase1 BUILDS ArcticDB universe + macro on first run;
            # morning-enrich APPENDS the polygon-corrected Friday row to
            # the universe library. Both need the libraries already
            # present, so enforce existence here so a typo in path_prefix
            # fails in 100ms not 28-50min into the run (catches the
            # 2026-04-14 silent-skip class). Deliberately NOT a freshness
            # check for morning_enrich — it is part of what makes
            # ArcticDB fresh, so a freshness gate at its own entry would
            # be circular.
            self._check_arcticdb_libraries_present(("universe", "macro"))
        elif self.mode == "phase2":
            self._check_fmp_stable_reachable()

        if self.mode == "daily":
            # Both libraries must be present — same gate as phase1 for
            # operator-clarity on partial-deploy scenarios. Runs BEFORE
            # the freshness read: it is the cheaper call, and a missing
            # `macro` library then surfaces as the explicit "missing
            # expected libraries" error rather than an opaque failure
            # inside the SPY read.
            self._check_arcticdb_libraries_present(("universe", "macro"))
            # SPY lives in the `macro` library (market-wide series). The
            # `universe` library holds per-stock OHLCV for S&P 500/400
            # constituents. daily_append writes to both libraries, so
            # macro/SPY freshness is a sufficient signal for the write
            # path being healthy end-to-end.
            #
            # Trading-day-aware via alpha_engine_lib.dates: max_stale=1
            # tolerates polygon's T+1 publish latency (yesterday's close
            # may not have been written yet at this preflight's invocation
            # time). The earlier 4-calendar-day threshold was a workaround
            # for weekend artifacts under calendar arithmetic; trading-day
            # arithmetic handles weekends + holidays natively.
            self._check_macro_spy_fresh_trading_days(max_stale=1)

    # ── Shared helpers ───────────────────────────────────────────────────

    def _arcticdb_uri(self) -> str:
        """Return the ArcticDB S3 URI for this preflight's bucket + region."""
        return (
            f"s3s://s3.{self.region}.amazonaws.com:{self.bucket}"
            "?path_prefix=arcticdb&aws_auth=true"
        )

    @staticmethod
    def _redact(text: str, secret: str) -> str:
        """Return ``text`` with ``secret`` (raw and URL-encoded) masked.

        An empty ``secret`` leaves ``text`` unchanged.
        """
        if not secret:
            return text
        from urllib.parse import quote_plus

        # Encoded form first: the raw secret can be a prefix of it.
        for form in (quote_plus(secret), secret):
            text = text.replace(form, "***")
        return text

    def _probe_get(
        self, service: str, url: str, params: dict[str, Any], secret: str
    ) -> Any:
        """GET ``url`` with ``params`` and return the ``requests`` response.

        Args:
            service: Human-readable service name used in the error text.
            url: Probe endpoint.
            params: Query parameters (these carry the API key).
            secret: The API key, so it can be masked in error text.

        Raises:
            RuntimeError: on any ``requests.RequestException``.
        """
        import requests

        try:
            return requests.get(url, params=params, timeout=_HTTP_TIMEOUT_SECS)
        except requests.RequestException as exc:
            # Connection-error text can embed the request URL, and the
            # API key travels in the query string. Mask it, and raise
            # ``from None`` so the unredacted original is not printed as
            # the chained cause in tracebacks.
            detail = self._redact(str(exc), secret)
            raise RuntimeError(
                f"Pre-flight: {service} unreachable: {detail} — network outage or egress blocked."
            ) from None

    # ── ArcticDB freshness ───────────────────────────────────────────────

    def _check_macro_spy_fresh_trading_days(self, *, max_stale: int) -> None:
        """Trading-day-aware SPY freshness for the `daily` preflight mode.

        Bypasses the lib's calendar-day ``check_arcticdb_fresh`` (which is
        load-bearing for other helpers and will be retired separately) and
        calls the ``alpha_engine_lib.dates.is_fresh_in_trading_days``
        chokepoint directly. Mirrors the postflight pattern so producer-
        and consumer-side checks agree on what "fresh" means.

        Reads from the same bucket + region as the libraries-present gate
        (``self.bucket`` / ``self.region``).

        Args:
            max_stale: Staleness threshold in trading days, forwarded to
                ``is_fresh_in_trading_days``.

        Raises:
            RuntimeError: if arcticdb is not importable, the connection or
                read fails, ``macro.SPY`` has no rows, or its last row is
                not fresh per ``is_fresh_in_trading_days``.
        """
        from datetime import datetime, timezone

        import pandas as pd
        from alpha_engine_lib.dates import (
            expected_last_close,
            is_fresh_in_trading_days,
            trading_days_stale,
        )

        try:
            import arcticdb as adb
        except ImportError as exc:
            raise RuntimeError(
                f"Pre-flight: arcticdb not importable — install "
                f"alpha-engine-lib[arcticdb] or add arcticdb to the deploy image: {exc}"
            ) from exc

        uri = self._arcticdb_uri()
        try:
            macro_lib = adb.Arctic(uri).get_library("macro")
            df = macro_lib.read("SPY", columns=["Close"]).data
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: ArcticDB macro.SPY read failed at {uri}: {exc}. "
                f"Check s3 prefix + credentials + that SPY has been written."
            ) from exc

        if df is None or df.empty:
            raise RuntimeError("ArcticDB macro.SPY has zero rows")

        # Normalise the last index entry to a naive UTC calendar date.
        last_ts = pd.Timestamp(df.index[-1])
        if last_ts.tzinfo is not None:
            last_ts = last_ts.tz_convert("UTC").tz_localize(None)
        last_date = last_ts.normalize().date()
        today = datetime.now(timezone.utc).date().isoformat()

        if not is_fresh_in_trading_days(last_date, today, max_stale=max_stale):
            stale = trading_days_stale(last_date, today)
            expected = expected_last_close(today)
            raise RuntimeError(
                f"ArcticDB macro.SPY last_date={last_date} is {stale} "
                f"trading-day(s) behind the expected last close {expected} "
                f"as of {today} (>{max_stale} trading-day threshold)"
            )
        log.info(
            "preflight: ArcticDB macro.SPY last_date=%s ≤ %d trading-day(s) stale",
            last_date,
            max_stale,
        )

    # ── Secret presence ──────────────────────────────────────────────────

    def _check_secrets(self, *names: str) -> None:
        """SSM-aware equivalent of ``check_env_vars`` for API-key secrets.

        Post the .env-deprecation arc (#241/#242) the API keys live in
        SSM, fetched lazily via ``get_secret()`` deeper in the run. This
        keeps the fail-fast intent of the old ``check_env_vars`` gate —
        abort in <1s, not 30min in — but resolves from SSM (with env
        fallback, which ``get_secret`` handles) instead of asserting
        ``os.environ`` directly.

        Args:
            *names: Secret names to resolve via ``get_secret``.

        Raises:
            RuntimeError: listing every name that resolved to an empty or
                whitespace-only value.
        """
        missing = [
            n
            for n in names
            if not (get_secret(n, required=False, default="") or "").strip()
        ]
        if missing:
            raise RuntimeError(
                f"Pre-flight: required secrets missing: {missing}"
            )

    # ── Mode-specific primitives ─────────────────────────────────────────

    def _check_fmp_stable_reachable(self) -> None:
        """Validate FMP /stable auth + endpoint availability.

        Guards against the exact failure mode from the 2026-04 incident:
        the v3 endpoints silently 403'd (or paid-tier endpoints 402'd),
        the per-ticker exceptions logged at debug level, the collector
        returned all-NEUTRAL, and two weeks of fundamentals were zeroed
        before anyone noticed. A /stable probe at startup fails the
        Step Function in ~1s instead.

        Raises:
            RuntimeError: on network failure, any non-200 status, or a
                200 whose body is not a non-empty JSON list.
        """
        api_key = (get_secret("FMP_API_KEY", required=False, default="") or "").strip()
        resp = self._probe_get(
            "FMP /stable",
            _FMP_STABLE_PROBE_URL,
            {"symbol": _FMP_STABLE_PROBE_SYMBOL, "apikey": api_key},
            api_key,
        )
        if resp.status_code in (401, 403):
            raise RuntimeError(
                f"Pre-flight: FMP /stable auth failed (HTTP {resp.status_code}): "
                f"FMP_API_KEY invalid, revoked, or still pointing at the sunsetted v3 plan."
            )
        if resp.status_code == 402:
            raise RuntimeError(
                f"Pre-flight: FMP /stable returned HTTP 402 Payment Required on "
                f"key-metrics-ttm — the free tier no longer covers this endpoint. "
                f"Subscribe or move the collector to a different provider."
            )
        if resp.status_code >= 500:
            raise RuntimeError(
                f"Pre-flight: FMP /stable returned HTTP {resp.status_code} — upstream outage."
            )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Pre-flight: FMP /stable returned unexpected HTTP {resp.status_code} "
                f"on {_FMP_STABLE_PROBE_URL}: {resp.text[:200]}"
            )
        try:
            payload = resp.json()
        except ValueError as exc:
            # A 200 with a non-JSON body (e.g. an HTML error page) must
            # fail as a preflight error, not as a raw decode exception.
            raise RuntimeError(
                f"Pre-flight: FMP /stable returned 200 but body was empty/malformed "
                f"for {_FMP_STABLE_PROBE_SYMBOL}: {resp.text[:200]}"
            ) from exc
        if not isinstance(payload, list) or not payload:
            raise RuntimeError(
                f"Pre-flight: FMP /stable returned 200 but body was empty/malformed "
                f"for {_FMP_STABLE_PROBE_SYMBOL}: {str(payload)[:200]}"
            )
        log.info("preflight: FMP /stable reachable + auth valid (HTTP 200)")

    def _check_polygon_reachable(self) -> None:
        """Validate polygon.io network + auth via reference-data call.

        Catches expired API key, polygon outage, blocked egress. Does NOT
        catch rate-limit ceiling (next collector call will retry/fail
        loudly by design).

        Raises:
            RuntimeError: on network failure or any non-200 status.
        """
        api_key = (get_secret("POLYGON_API_KEY", required=False, default="") or "").strip()
        resp = self._probe_get(
            "polygon.io", _POLYGON_PROBE_URL, {"apiKey": api_key}, api_key
        )
        if resp.status_code in (401, 403):
            raise RuntimeError(
                f"Pre-flight: polygon.io auth failed (HTTP {resp.status_code}): "
                f"POLYGON_API_KEY is invalid or revoked."
            )
        if resp.status_code >= 500:
            raise RuntimeError(
                f"Pre-flight: polygon.io returned HTTP {resp.status_code} on a reference-data call "
                f"— upstream outage. Check status.polygon.io."
            )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Pre-flight: polygon.io returned unexpected HTTP {resp.status_code} "
                f"on {_POLYGON_PROBE_URL}: {resp.text[:200]}"
            )
        log.info("preflight: polygon.io reachable + auth valid (HTTP 200)")

    def _check_fred_reachable(self) -> None:
        """Validate FRED network + auth via single-observation DFF call.

        Raises:
            RuntimeError: on network failure or any non-200 status. A 400
                whose body mentions ``api_key`` or ``invalid`` is reported
                as an auth failure.
        """
        api_key = (get_secret("FRED_API_KEY", required=False, default="") or "").strip()
        resp = self._probe_get(
            "FRED",
            _FRED_PROBE_URL,
            {
                "series_id": _FRED_PROBE_SERIES,
                "api_key": api_key,
                "file_type": "json",
                "sort_order": "desc",
                "limit": 1,
            },
            api_key,
        )
        if resp.status_code == 400:
            # FRED returns 400 with body containing "api_key" on bad key
            body = resp.text[:200].lower()
            if "api_key" in body or "invalid" in body:
                raise RuntimeError(
                    f"Pre-flight: FRED auth failed (HTTP 400): FRED_API_KEY is invalid. "
                    f"Response: {resp.text[:200]}"
                )
        if resp.status_code >= 500:
            raise RuntimeError(
                f"Pre-flight: FRED returned HTTP {resp.status_code} on DFF call "
                f"— upstream outage."
            )
        if resp.status_code != 200:
            raise RuntimeError(
                f"Pre-flight: FRED returned unexpected HTTP {resp.status_code}: {resp.text[:200]}"
            )
        log.info("preflight: FRED reachable + auth valid (HTTP 200)")

    def _check_s3_writeable_sentinel(self) -> None:
        """Validate that the S3 bucket accepts writes via a sentinel object.

        ``BasePreflight.check_s3_bucket()`` only HEADs the bucket; that
        passes when IAM grants ListBucket but denies PutObject (bucket
        policy or scoped role). Surfacing the deny here saves ~40 min of
        spot time burning on collectors that all silently write 0 rows.

        The sentinel is deleted afterwards on a best-effort basis: a
        failed DELETE only logs a warning.

        Raises:
            RuntimeError: if the sentinel PUT fails.
        """
        import boto3

        s3 = boto3.client("s3", region_name=self.region)
        # Unique key per run so concurrent preflights never collide.
        sentinel_key = f"preflight/sentinel-{uuid.uuid4().hex}.txt"
        try:
            s3.put_object(
                Bucket=self.bucket,
                Key=sentinel_key,
                Body=b"preflight-sentinel",
                ContentType="text/plain",
            )
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: S3 PUT s3://{self.bucket}/{sentinel_key} failed: {exc} — "
                f"IAM lacks s3:PutObject or bucket policy blocks writes."
            ) from exc
        try:
            s3.delete_object(Bucket=self.bucket, Key=sentinel_key)
        except Exception as exc:
            # Non-fatal: PUT (the load-bearing op for collectors) succeeded;
            # missing DELETE means sentinels accumulate but writes work.
            log.warning(
                "preflight: sentinel DELETE failed (%s) — preflight-sentinel objects "
                "may accumulate in s3://%s/preflight/. Check s3:DeleteObject IAM grant.",
                exc, self.bucket,
            )
        log.info("preflight: S3 bucket s3://%s read + write OK", self.bucket)

    def _check_arcticdb_libraries_present(self, expected: tuple[str, ...]) -> None:
        """Validate ArcticDB connection + that ``expected`` libraries exist.

        ``BasePreflight.check_arcticdb_fresh()`` covers freshness on a
        specific library/symbol pair, but the libraries-existence gate
        is a separate concern — useful at the cold-start / partial-deploy
        boundary where a typo in path_prefix or a half-applied infra
        change leaves the bucket reachable but the libraries absent.

        Args:
            expected: Library names that must all be listed.

        Raises:
            RuntimeError: if arcticdb is not importable, the connection
                fails, or any expected library is missing.
        """
        try:
            import arcticdb as adb
        except ImportError as exc:
            raise RuntimeError(
                f"Pre-flight: arcticdb not importable — install "
                f"alpha-engine-lib[arcticdb] or add arcticdb to the deploy image: {exc}"
            ) from exc

        uri = self._arcticdb_uri()
        try:
            arctic = adb.Arctic(uri)
            libs = set(arctic.list_libraries())
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: ArcticDB connection failed at {uri}: {exc}. "
                f"Check s3 prefix + credentials + arcticdb version."
            ) from exc

        missing = set(expected) - libs
        if missing:
            raise RuntimeError(
                f"Pre-flight: ArcticDB missing expected libraries: {sorted(missing)} "
                f"(found: {sorted(libs)}). Run backfill or verify path_prefix."
            )
        log.info(
            "preflight: ArcticDB connectable, libraries present: %s",
            sorted(expected),
        )