-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreflight.py
More file actions
444 lines (383 loc) · 18.4 KB
/
preflight.py
File metadata and controls
444 lines (383 loc) · 18.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
"""
Preflight: fast fail-fast connectivity + freshness checks.

``BasePreflight`` provides the shared primitives; consumer modules
subclass it and override ``run()`` to compose a module-specific check
sequence. The base raises ``RuntimeError`` on any failure — consumers
catch nothing, so the raise propagates up through ``main()`` → non-zero
exit → the orchestration layer's failure handler.

Design context (2026-04-14): the alpha-engine-data DailyData step
silently ran against a stale ArcticDB universe library for two
weekdays because an ``ImportError`` on ``arcticdb`` was caught at debug
level. A freshness check on SPY would have flagged the outage in ~1s.
Preflight exists to catch that class of failure *before* spending 30
minutes on real work.

Scope is deliberately narrow: **external-world handshakes only** (env
vars, S3 reachability, ArcticDB symbol freshness, deploy-SHA drift
against GitHub, IB paper-account guard). Data-correctness hard-fails
still live in the hardened collectors themselves.
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
import warnings
from datetime import datetime, timezone
from pathlib import Path
log = logging.getLogger(__name__)
# Default location for the deploy-time GIT_SHA stamp inside a Lambda
# image. Stamped by deploy.sh via ``--build-arg GIT_SHA=…`` then COPYed
# to /var/task/GIT_SHA.txt; consumers running outside Lambda can pass an
# alternate path.
_DEFAULT_GIT_SHA_FILE = Path("/var/task/GIT_SHA.txt")
# Public-repo branch-HEAD API. No auth required; 60 req/hr unauth rate
# limit is fine for Lambda cold-starts and CI runs.
_GITHUB_BRANCH_URL = "https://api.github.com/repos/{repo}/branches/{branch}"
class BasePreflight:
    """Shared preflight primitives.

    Subclass and override :meth:`run` to compose a module-specific
    check sequence. Each primitive raises :class:`RuntimeError` on
    failure with an explanatory message that includes what was checked
    and what went wrong.
    """

    def __init__(self, bucket: str, region: str | None = None):
        """Bind the preflight to an S3 bucket and AWS region.

        Args:
            bucket: S3 bucket checked by the S3 primitives and hosting the
                ArcticDB store. Required.
            region: AWS region used to build the ArcticDB S3 endpoint.
                Falls back to the ``AWS_REGION`` env var, then
                ``"us-east-1"``.

        Raises:
            ValueError: If ``bucket`` is empty.
        """
        if not bucket:
            raise ValueError("BasePreflight: bucket is required")
        self.bucket = bucket
        self.region = region or os.environ.get("AWS_REGION", "us-east-1")

    # ── Composition entry point ──────────────────────────────────────────

    def run(self) -> None:
        """Execute the preflight check sequence.

        Subclasses override this to compose primitives. The default
        raises to prevent a misuse where a subclass forgets to override
        and silently passes.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must override run() to compose preflight checks"
        )

    # ── Primitives ───────────────────────────────────────────────────────

    def check_env_vars(self, *names: str) -> None:
        """Raise if any of the given env vars are unset or empty.

        Raises:
            RuntimeError: Listing every missing name, in argument order.
        """
        missing = [n for n in names if not os.environ.get(n)]
        if missing:
            raise RuntimeError(f"Pre-flight: required env vars missing: {missing}")

    def check_s3_bucket(self) -> None:
        """Raise if the configured bucket is not reachable (auth, network, or missing)."""
        import boto3

        try:
            boto3.client("s3").head_bucket(Bucket=self.bucket)
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: S3 bucket {self.bucket!r} unreachable: {exc}"
            ) from exc

    def check_s3_key(self, key: str, max_age_days: int | None = None) -> None:
        """Raise if ``s3://{bucket}/{key}`` is missing or older than ``max_age_days``.

        ``max_age_days=None`` disables the freshness check — existence only.
        Age is measured from the object's ``LastModified`` to now (UTC) in
        whole days, rounded down.

        Raises:
            RuntimeError: If the key does not exist, cannot be reached
                (any botocore client or transport error), or is stale.
        """
        import boto3
        from botocore.exceptions import BotoCoreError, ClientError

        location = f"s3://{self.bucket}/{key}"
        try:
            head = boto3.client("s3").head_object(Bucket=self.bucket, Key=key)
        except ClientError as exc:
            err_code = exc.response.get("Error", {}).get("Code")
            if err_code in ("404", "NoSuchKey"):
                raise RuntimeError(
                    f"Pre-flight: S3 key {location} does not exist"
                ) from exc
            raise RuntimeError(
                f"Pre-flight: S3 key {location} unreachable: {exc}"
            ) from exc
        except BotoCoreError as exc:
            # Missing credentials, unreachable endpoints and timeouts are not
            # ClientErrors; wrap them so callers still get a RuntimeError.
            raise RuntimeError(
                f"Pre-flight: S3 key {location} unreachable: {exc}"
            ) from exc

        if max_age_days is not None:
            last_modified = head["LastModified"]
            age_days = (datetime.now(timezone.utc) - last_modified).days
            if age_days > max_age_days:
                raise RuntimeError(
                    f"Pre-flight: S3 key {location} is "
                    f"{age_days} days stale (threshold {max_age_days})"
                )

    def check_arcticdb_fresh(
        self,
        library: str,
        symbol: str,
        max_stale_days: int,
    ) -> None:
        """Raise if ``arcticdb`` is unavailable, the library/symbol is
        unreadable, or the last date in ``symbol`` is older than
        ``max_stale_days`` calendar days from today (UTC).

        Requires the ``arcticdb`` optional extra
        (``alpha-engine-lib[arcticdb]``).
        """
        adb, pd = self._import_arcticdb()
        lib = self._open_arcticdb_library(adb, library)

        try:
            df = lib.read(symbol).data
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: ArcticDB {library}/{symbol} read failed: {exc}"
            ) from exc

        if df.empty:
            raise RuntimeError(
                f"Pre-flight: ArcticDB {library}/{symbol} is empty"
            )

        # Compare tz-naive UTC dates: the last index entry vs today's UTC date.
        last_ts = self._to_naive_utc(pd.Timestamp(df.index[-1]))
        today = pd.Timestamp(datetime.now(timezone.utc).date())
        age_days = (today - last_ts.normalize()).days
        if age_days > max_stale_days:
            raise RuntimeError(
                f"Pre-flight: ArcticDB {library}/{symbol} last date "
                f"{last_ts.date()} is {age_days} days stale "
                f"(threshold {max_stale_days})"
            )

    def check_arcticdb_universe_fresh(
        self,
        library: str,
        max_stale_days: int,
        *,
        max_workers: int = 20,
    ) -> None:
        """[DEPRECATED 2026-05-05] Per-symbol freshness scan over an
        ArcticDB library.

        Deprecated because data-freshness now lives upstream in
        ``alpha-engine-data``'s preflight, which runs before any
        consumer in every Step Function. Consumers (executor,
        backtester, predictor) dropped their calls in 2026-05-05's
        consolidation arc. Scheduled for removal after 6-month soak;
        current callers should migrate to trusting SF ordering.

        Original docstring follows.

        Scan every symbol in ``library`` and raise if any symbol's
        last_date is older than ``max_stale_days`` calendar days from
        today (UTC).

        Where :meth:`check_arcticdb_fresh` covers a single canonical
        liveness probe (e.g. macro/SPY), this primitive catches the
        partial-write class — individual tickers stop receiving writes
        while the canonical SPY symbol stays fresh, so the single-symbol
        check reports healthy but downstream consumers fail two hours
        deep on stale per-ticker reads.

        Motivation (2026-04-21 backtester incident): macro.SPY was fresh,
        ASGN + MOH had stalled at 2026-04-01 because daily_append silently
        skipped them, executor's load_atr_14_pct guard aborted the
        backtester ~2 hours into its predictor-backtest mode. This scan
        catches the same class at preflight in ~5-10 seconds (20 threads
        × ~900 tickers × tail(1) read each).

        Implementation notes:
          - Reads ``tail(1)`` rather than the full series — ~20ms/symbol.
          - Read errors on any symbol are themselves fatal: a silent read
            error here would mask exactly the kind of write-skip this
            primitive exists to catch.
          - Stale list is sorted by stalest-first so the operator sees
            the worst offenders without scrolling.

        Requires the ``arcticdb`` optional extra
        (``alpha-engine-lib[arcticdb]``).

        Args:
            library: ArcticDB library name to scan (e.g. ``"universe"``).
            max_stale_days: Symbols with ``last_date`` older than today
                (UTC) minus this many calendar days are flagged as stale.
            max_workers: Thread pool size for the per-symbol scan.
                Default 20 matches backtester precedent. Tune lower for
                rate-limited backends; higher for fan-out-bound cases.

        Raises:
            RuntimeError: If arcticdb is unimportable, the library is
                unreachable or its symbols cannot be listed, the library
                is empty, any symbol's ``tail(1)`` read raises, or ANY
                symbol is stale beyond the threshold.
        """
        warnings.warn(
            "BasePreflight.check_arcticdb_universe_fresh is deprecated; "
            "data-freshness now lives upstream in alpha-engine-data's "
            "preflight (runs before consumers in every Step Function). "
            "Scheduled for removal after 6-month soak.",
            DeprecationWarning,
            stacklevel=2,
        )
        from concurrent.futures import ThreadPoolExecutor
        from datetime import date, timedelta

        adb, pd = self._import_arcticdb()
        lib = self._open_arcticdb_library(adb, library)

        try:
            symbols = list(lib.list_symbols())
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: ArcticDB library {library!r} on bucket "
                f"{self.bucket!r} could not list symbols: {exc}"
            ) from exc
        if not symbols:
            raise RuntimeError(
                f"Pre-flight: ArcticDB library {library!r} on bucket "
                f"{self.bucket!r} has zero symbols — upstream pipeline "
                "has not written anything."
            )

        # Use the UTC calendar date (as documented, and as
        # check_arcticdb_fresh does); date.today() would use the host's
        # local timezone and shift the cutoff on non-UTC machines.
        today = datetime.now(timezone.utc).date()
        cutoff = today - timedelta(days=max_stale_days)

        def _last_date_for(sym: str) -> tuple[str, date | None, str | None]:
            # Returns (symbol, last_date, error); exactly one of the last two is set.
            try:
                df = lib.tail(sym, n=1).data
                if df.empty:
                    return sym, None, "empty frame"
                last_ts = self._to_naive_utc(pd.Timestamp(df.index[-1]))
                return sym, last_ts.date(), None
            except Exception as exc:  # pragma: no cover — covered via mock
                return sym, None, str(exc)

        stale: list[tuple[str, date]] = []
        errored: list[tuple[str, str]] = []
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            for sym, last_date, err in pool.map(_last_date_for, symbols):
                if err is not None:
                    errored.append((sym, err))
                elif last_date is None:
                    errored.append((sym, "no last_date"))
                elif last_date < cutoff:
                    stale.append((sym, last_date))

        if errored:
            sample = [f"{s}({e[:40]})" for s, e in errored[:5]]
            raise RuntimeError(
                f"Pre-flight: {len(errored)} symbol(s) in ArcticDB "
                f"library {library!r} could not be read for freshness check. "
                f"Sample: {sample}. Treated as fatal because a silent read "
                "error here would mask exactly the kind of per-symbol write "
                "skip this scan exists to catch."
            )

        if stale:
            stale.sort(key=lambda x: x[1])
            summary = [f"{sym} (last={d.isoformat()})" for sym, d in stale[:10]]
            more = f" (+{len(stale) - 10} more)" if len(stale) > 10 else ""
            raise RuntimeError(
                f"Pre-flight: {len(stale)}/{len(symbols)} symbol(s) in "
                f"ArcticDB library {library!r} have stale data (older "
                f"than {max_stale_days} calendar days, "
                f"cutoff={cutoff.isoformat()}). Top offenders: "
                f"{summary}{more}. Backfill upstream or investigate "
                "the per-symbol write path before re-running."
            )

    def check_ib_paper_account(self, account_id: str) -> None:
        """Raise if ``account_id`` doesn't start with 'D' (IBKR paper prefix).

        Defensive check for the executor — prevents live credentials
        leaking into a paper-trading run (or vice versa).
        """
        if not account_id:
            raise RuntimeError("Pre-flight: IB account_id is empty")
        if not account_id.startswith("D"):
            raise RuntimeError(
                f"Pre-flight: IB account_id {account_id!r} is not a paper "
                "account (paper accounts start with 'D')"
            )

    def check_deploy_drift(
        self,
        repo: str,
        branch: str = "main",
        *,
        sha_file: Path | None = None,
        timeout: float = 5.0,
    ) -> None:
        """Hard-fail if the deploy-baked SHA lags ``repo@branch`` HEAD.

        The deployed image is stamped with ``GIT_SHA`` at build time
        (via Docker ``--build-arg GIT_SHA=…``); this check compares
        that stamp against the current ``branch`` HEAD SHA on GitHub.
        A mismatch means a merge landed on main but the CI deploy
        workflow either failed, was skipped by a paths filter, or
        hasn't run yet — i.e. the deployed code is a prior commit,
        which is exactly the deploy-drift mode that motivated this
        check (2026-04-20 coverage-gap session).

        Degraded modes (warn, don't fail) — chosen so a GitHub outage
        or an unstamped legacy image doesn't block a trading-hours
        Lambda:
          - Stamp file missing or "unknown" → image predates drift
            checking; log warn and continue.
          - GitHub SHA unavailable → continue without comparing.

        Hard-fail mode — when both stamps are present and differ.

        Args:
            repo: ``"owner/name"`` — e.g. ``"cipher813/alpha-engine-predictor"``.
            branch: Branch HEAD to compare against. Default ``"main"``.
            sha_file: Path to the GIT_SHA stamp. Defaults to
                ``/var/task/GIT_SHA.txt`` (Lambda image convention).
            timeout: GitHub API timeout in seconds.

        Raises:
            RuntimeError: If the baked SHA and the upstream SHA differ.
        """
        sha_path = sha_file or _DEFAULT_GIT_SHA_FILE
        baked = _read_baked_git_sha(sha_path)
        if baked is None:
            log.warning(
                "Deploy-drift: no baked GIT_SHA in image at %s (legacy build "
                "or build-arg omitted). Rebuild via deploy.sh to enable this check.",
                sha_path,
            )
            return

        upstream = _fetch_origin_main_sha(repo, branch=branch, timeout=timeout)
        if upstream is None:
            # Degraded mode: GitHub gave us no SHA to compare against.
            return

        if baked != upstream:
            raise RuntimeError(
                f"Deploy drift: image was built from {baked[:12]} but "
                f"{repo}@{branch} is now at {upstream[:12]}. The CI deploy "
                f"workflow did not promote the latest commit. Re-run "
                f"`.github/workflows/deploy.yml` on main (or the local "
                f"deploy.sh) before resuming. Refusing to proceed — "
                f"running stale code on new signals is how 2026-04-20 happened."
            )
        log.info("Deploy-drift: image at %s matches %s@%s ✓", baked[:12], repo, branch)

    # ── Internal helpers ─────────────────────────────────────────────────

    @staticmethod
    def _import_arcticdb():
        """Import and return ``(arcticdb, pandas)``; raise RuntimeError if unavailable."""
        try:
            import arcticdb as adb
            import pandas as pd
        except ImportError as exc:
            raise RuntimeError(
                "Pre-flight: arcticdb not importable — install "
                "alpha-engine-lib[arcticdb] or add arcticdb to the deploy image: "
                f"{exc}"
            ) from exc
        return adb, pd

    def _open_arcticdb_library(self, adb, library: str):
        """Open ``library`` in this bucket's ArcticDB store; raise RuntimeError on failure."""
        uri = (
            f"s3s://s3.{self.region}.amazonaws.com:{self.bucket}"
            "?path_prefix=arcticdb&aws_auth=true"
        )
        try:
            return adb.Arctic(uri).get_library(library)
        except Exception as exc:
            raise RuntimeError(
                f"Pre-flight: ArcticDB library {library!r} unreachable "
                f"at {uri}: {exc}"
            ) from exc

    @staticmethod
    def _to_naive_utc(ts):
        """Convert a tz-aware pandas Timestamp to naive UTC; return naive ones unchanged."""
        if ts.tzinfo is not None:
            ts = ts.tz_convert("UTC").tz_localize(None)
        return ts
def _read_baked_git_sha(sha_file: Path) -> str | None:
"""Return the SHA baked into the image by ``deploy.sh --build-arg GIT_SHA=…``.
Returns ``None`` if the stamp file is missing (legacy image) or holds
``"unknown"`` (build-arg omitted). Callers decide whether ``None`` is
warn-and-continue or hard-fail.
"""
try:
sha = sha_file.read_text().strip()
except FileNotFoundError:
return None
if not sha or sha == "unknown":
return None
return sha
def _fetch_origin_main_sha(repo: str, branch: str = "main", timeout: float = 5.0) -> str | None:
    """Fetch HEAD SHA of ``branch`` for ``repo`` via GitHub REST API.

    Returns ``None`` — after logging a warning — on any network, HTTP or
    parse error, and when the response carries no ``commit.sha`` string.
    The drift check treats a GitHub outage as "unknown, proceed with
    warning" rather than blocking the consumer.

    Args:
        repo: ``"owner/name"`` (e.g. ``"cipher813/alpha-engine-predictor"``).
        branch: Branch whose HEAD commit to fetch. Default ``"main"``.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        The HEAD commit SHA, or ``None`` if it could not be determined.
    """
    import http.client

    url = _GITHUB_BRANCH_URL.format(repo=repo, branch=branch)
    req = urllib.request.Request(url, headers={"Accept": "application/vnd.github+json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read())
    except (OSError, ValueError, http.client.HTTPException) as exc:
        # OSError covers urllib.error.URLError/HTTPError plus the bare
        # TimeoutError that urlopen raises on a read-phase timeout (the
        # 2026-05-07 weekday SF DeployDriftCheck failure: read timed out
        # inside http.client.getresponse, which is past urllib's
        # OSError → URLError wrap point in do_open).
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError on a
        # malformed body. HTTPException covers e.g. http.client.IncompleteRead
        # on a truncated body, which is not an OSError.
        log.warning("Deploy-drift: GitHub API unreachable (%s) — cannot compare", exc)
        return None

    # Guard every hop: an unexpected JSON shape must degrade, not crash.
    commit = payload.get("commit") if isinstance(payload, dict) else None
    sha = commit.get("sha") if isinstance(commit, dict) else None
    if not isinstance(sha, str) or not sha:
        log.warning(
            "Deploy-drift: GitHub API response for %s@%s has no commit.sha — cannot compare",
            repo,
            branch,
        )
        return None
    return sha