-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsignal_reader.py
More file actions
603 lines (513 loc) · 23.5 KB
/
signal_reader.py
File metadata and controls
603 lines (513 loc) · 23.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
"""
Read signals.json from S3 and return parsed, validated signal data.
"""
from __future__ import annotations
import json
import logging
from datetime import date, timedelta
import boto3
from botocore.exceptions import ClientError
from alpha_engine_lib.eval_artifacts import load_latest_eval_artifact
from alpha_engine_lib.universe import filter_to_universe
logger = logging.getLogger(__name__)
REGIME_SUBSTRATE_PREFIX = "regime"


def read_regime_substrate(s3_bucket: str) -> dict | None:
    """Fetch the most recent regime substrate artifact from S3.

    Thin wrapper: builds an S3 client and delegates to
    ``alpha_engine_lib.eval_artifacts.load_latest_eval_artifact`` with the
    ``regime`` prefix, i.e. the sidecar at
    ``s3://{bucket}/regime/latest.json``.

    The substrate is written weekly by alpha-engine-predictor's regime
    Lambda (Saturday SF ``RegimeSubstrate`` state; see
    ``regime-v3-260514.md`` §6 Stage A).

    Args:
        s3_bucket: Name of the bucket holding the ``regime/`` artifacts.

    Returns:
        The parsed payload dict (composite.intensity_z, hmm.posterior,
        bocpd.run_length_z, guardrails, run_id), or ``None`` when the
        artifact is unavailable. ``None`` is part of the contract: the
        executor's regime-aware sizing then falls back to a 1.0×
        multiplier (legacy behavior preserved).
    """
    client = boto3.client("s3")
    artifact = load_latest_eval_artifact(
        client,
        bucket=s3_bucket,
        prefix=REGIME_SUBSTRATE_PREFIX,
    )
    return artifact
def extract_intensity_z(substrate: dict | None) -> float | None:
    """Pull ``composite.intensity_z`` out of a regime substrate payload.

    The substrate writer (predictor regime/substrate.py) guarantees the
    shape on success; this helper defends against schema drift, None
    propagation, and numerically unusable values.

    Args:
        substrate: Parsed regime substrate payload, or ``None``.

    Returns:
        ``intensity_z`` as a finite ``float``, or ``None`` when
        ``substrate`` is not a dict, the ``composite`` block is missing or
        not a dict, or ``intensity_z`` is non-numeric, a bool, NaN,
        ±Infinity, or an integer too large to represent as a float.
    """
    import math  # function-scope import: only this helper needs it

    if not isinstance(substrate, dict):
        return None
    composite = substrate.get("composite")
    if not isinstance(composite, dict):
        return None
    val = composite.get("intensity_z")
    # bool is a subclass of int; True/False must not be read as 1.0/0.0.
    if isinstance(val, bool) or not isinstance(val, (int, float)):
        return None
    try:
        z = float(val)
    except OverflowError:
        # JSON integers are unbounded; one beyond float range is unusable.
        return None
    # json.loads accepts NaN/Infinity tokens by default. A non-finite z
    # would poison downstream sizing maths, so treat it as "unavailable".
    return z if math.isfinite(z) else None
REGIME_FAST_SIGNAL_PREFIX = "regime/fast_signal"


def read_fast_signal(s3_bucket: str) -> dict | None:
    """Fetch the most recent daily fast-signal artifact (Stage F2).

    Counterpart of ``read_regime_substrate``: builds an S3 client and
    delegates to ``load_latest_eval_artifact`` with the
    ``regime/fast_signal`` prefix, i.e.
    ``s3://{bucket}/regime/fast_signal/latest.json``. That object is the
    observe-only daily BOCPD circuit-breaker output of
    alpha-engine-predictor's ``regime_fast_signal`` inference stage
    (regime-fast-signal-260515.md).

    Args:
        s3_bucket: Name of the bucket holding the fast-signal artifacts.

    Returns:
        The parsed payload (``forced_bear``, ``intensity_z``,
        ``change_confidence``, ``warmup``, …), or ``None`` when it is
        unavailable. On ``None`` the executor treats forced_bear as False
        (legacy behavior preserved).
    """
    client = boto3.client("s3")
    artifact = load_latest_eval_artifact(
        client,
        bucket=s3_bucket,
        prefix=REGIME_FAST_SIGNAL_PREFIX,
    )
    return artifact
def extract_forced_bear(fast_signal: dict | None) -> bool:
    """Read the ``forced_bear`` latch from a fast-signal payload.

    Args:
        fast_signal: Parsed fast-signal payload, or ``None``.

    Returns:
        ``True`` only when the payload is a dict, ``warmup`` is not
        literally ``True``, and ``forced_bear`` is literally ``True``.
        Everything else (None, malformed payload, truthy-but-not-True
        values) yields ``False``. A warming detector must never assert a
        regime break; the producer already suppresses forced_bear during
        warmup, so the warmup check here is defence-in-depth against
        schema drift or a stale artifact.
    """
    if isinstance(fast_signal, dict):
        # Identity checks on purpose: only a genuine JSON ``true`` counts.
        still_warming = fast_signal.get("warmup") is True
        latched = fast_signal.get("forced_bear") is True
        return latched and not still_warming
    return False
REGIME_DRAWDOWN_PREFIX = "regime/drawdown"


def read_drawdown_substrate(s3_bucket: str) -> dict | None:
    """Fetch the most recent daily drawdown-leg artifact (ensemble leg 3).

    Counterpart of ``read_fast_signal``: builds an S3 client and delegates
    to ``load_latest_eval_artifact`` with the ``regime/drawdown`` prefix,
    i.e. ``s3://{bucket}/regime/drawdown/latest.json``. That object is the
    observe-only deterministic drawdown leg emitted by
    alpha-engine-predictor's daily ``regime_fast_signal`` stage
    (regime-drawdown-hysteresis-260518.md).

    Args:
        s3_bucket: Name of the bucket holding the drawdown artifacts.

    Returns:
        The parsed payload (``spy``, ``excess``, ``effective_regime``,
        ``observed``, …), or ``None`` when it is unavailable. On ``None``
        the executor applies no drawdown override (legacy behavior
        preserved).
    """
    client = boto3.client("s3")
    artifact = load_latest_eval_artifact(
        client,
        bucket=s3_bucket,
        prefix=REGIME_DRAWDOWN_PREFIX,
    )
    return artifact
def extract_drawdown_effective_regime(payload: dict | None) -> str | None:
    """Return the composed ``effective_regime`` label of a drawdown artifact.

    The daily stage stores ``effective_regime`` as the
    ``compose_effective_regime`` dict (``{"effective_regime": str,
    "drivers": {...}}``); a bare string at the top level is also accepted
    for schema resilience.

    Args:
        payload: Parsed drawdown artifact, or ``None``.

    Returns:
        The non-empty regime string, or ``None`` when the payload is not a
        dict or the value is absent, empty, or not a string (meaning: no
        override).
    """
    raw = payload.get("effective_regime") if isinstance(payload, dict) else None
    # Unwrap the nested form one level; a bare string passes straight through.
    label = raw.get("effective_regime") if isinstance(raw, dict) else raw
    if not isinstance(label, str) or label == "":
        return None
    return label
def read_predictions(s3_bucket: str) -> tuple[dict[str, dict], str | None]:
    """Read ``predictor/predictions/latest.json`` from S3.

    ``latest.json`` is a pointer that may resolve to a prior trading day's
    predictions during the Saturday/holiday window, so the payload's own
    top-level ``date`` is surfaced separately: readers (esp. trade logging
    for transparency lineage) need the actual filename date, not today's
    date. See ROADMAP "Phase 2 transparency-inventory" → trade execution
    decisions row.

    Args:
        s3_bucket: Bucket holding the predictor output.

    Returns:
        ``({ticker: prediction_dict}, predictions_date)``. Rows that are
        not dicts or carry no ``ticker`` key are skipped, and a missing,
        null, or non-list ``predictions`` field yields an empty mapping.
        ``({}, None)`` is returned when the object does not exist.

    Raises:
        ClientError: For any S3 error other than ``NoSuchKey``.
    """
    s3 = boto3.client("s3")
    # Only the S3 call can raise ClientError, so keep the try body to it.
    try:
        obj = s3.get_object(Bucket=s3_bucket, Key="predictor/predictions/latest.json")
    except ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            logger.warning("predictions/latest.json not found — running without GBM input")
            return {}, None
        raise

    data = json.loads(obj["Body"].read())

    # JSON ``null`` (or any non-array) would otherwise blow up in len() and
    # iteration; an empty set is the same degraded mode as a missing file.
    raw_preds = data.get("predictions")
    preds = raw_preds if isinstance(raw_preds, list) else []

    if data.get("timed_out"):
        logger.warning(
            "Predictions timed out — using partial set (%d tickers)",
            len(preds),
        )

    result = {
        p["ticker"]: p
        for p in preds
        if isinstance(p, dict) and "ticker" in p
    }
    predictions_date = data.get("date")
    logger.info(
        "Predictions loaded | n=%d | date=%s", len(result), predictions_date,
    )
    return result, predictions_date
def read_signals(s3_bucket: str, run_date: str | None = None) -> dict:
    """Download and parse ``signals/{date}/signals.json`` from S3.

    Args:
        s3_bucket: Bucket holding the research signals.
        run_date: ISO date string selecting the dated folder; defaults to
            ``date.today()`` when omitted or empty.

    Returns:
        The parsed signals payload.

    Raises:
        ClientError: Propagated from S3, including when the key is absent.
    """
    s3 = boto3.client("s3")
    d = run_date if run_date else str(date.today())
    key = f"signals/{d}/signals.json"
    logger.info(f"Reading signals from s3://{s3_bucket}/{key}")

    body = s3.get_object(Bucket=s3_bucket, Key=key)["Body"]
    data = json.loads(body.read())

    logger.info(
        f"Signals loaded | market_regime={data.get('market_regime')} "
        f"| universe={len(data.get('universe', []))} "
        f"| candidates={len(data.get('buy_candidates', []))}"
    )
    return data
def read_signals_with_fallback(s3_bucket: str, run_date: str | None = None, max_lookback: int = 14) -> dict:
    """Load the most recent signals, preferring the ``latest.json`` pointer.

    Resolution order:

    1. ``signals/latest.json`` (written by Research alongside the dated
       file). When present it is returned after a staleness check.
    2. Otherwise, dated files are probed from ``run_date`` (or today)
       backwards, one calendar day at a time, for up to ``max_lookback``
       days.

    The default window of 14 days covers Research's weekly cadence
    (Saturday 00:00 UTC) plus one week of slack for a missed run. Shorter
    windows are fragile: by Friday the latest Saturday file is already 6
    days old, so a 5-day window would fail even in a normal week.

    Signals older than 7 calendar days are reported at WARNING level so a
    quietly-missed Saturday run shows up in the executor's log stream even
    though the load itself succeeds.

    Args:
        s3_bucket: Bucket holding the research signals.
        run_date: ISO date the lookup is anchored to; defaults to today.
        max_lookback: Number of calendar days to scan back in the fallback.

    Returns:
        The parsed signals payload.

    Raises:
        ClientError: Any S3 error other than ``NoSuchKey``.
        RuntimeError: No signals file exists inside the lookback window.
    """
    s3 = boto3.client("s3")

    # Preferred path: the pointer object.
    try:
        pointer = s3.get_object(Bucket=s3_bucket, Key="signals/latest.json")
        data = json.loads(pointer["Body"].read())
        logger.info(
            f"Signals loaded from signals/latest.json | date={data.get('date')} "
            f"| universe={len(data.get('universe', []))}"
        )
        _warn_if_stale(data.get("date"), run_date)
        return data
    except ClientError as e:
        if e.response["Error"]["Code"] != "NoSuchKey":
            raise
        logger.info("signals/latest.json not found — falling back to date scan")

    # Fallback path: walk the dated folders backwards from the anchor date.
    start = date.today() if not run_date else date.fromisoformat(run_date)
    tried: list[str] = []
    for days_back in range(max_lookback + 1):
        candidate = start - timedelta(days=days_back)
        try:
            signals = read_signals(s3_bucket, str(candidate))
        except ClientError as e:
            if e.response["Error"]["Code"] != "NoSuchKey":
                raise
            logger.info(f"No signals file for {candidate}, looking further back...")
            tried.append(str(candidate))
            continue

        # Found a file; report how far back we had to go (nothing to say
        # when the anchor date itself had signals).
        if days_back > 7:
            logger.warning(
                f"No signals for {start} — using {candidate} "
                f"({days_back} calendar day(s) old). Dates tried: {tried}. "
                f"Research pipeline may have missed a weekly run."
            )
        elif days_back > 0:
            logger.info(
                f"No signals for {start} — using {candidate} "
                f"({days_back} calendar day(s) old). Dates tried: {tried}"
            )
        return signals

    raise RuntimeError(
        f"No signals found within {max_lookback} calendar days of {start}. "
        f"Dates tried: {tried}. Check that the research pipeline ran recently."
    )
def _warn_if_stale(signals_date: str | None, run_date: str | None) -> None:
    """Emit a WARNING when loaded signals are over 7 days older than run_date.

    An age above 7 calendar days suggests the Research pipeline missed its
    Saturday run; the signals remain usable, but upstream needs a look.

    Args:
        signals_date: ISO date stamped on the signals payload. A falsy or
            unparseable value makes the check a silent no-op.
        run_date: ISO reference date; today is used when falsy. Unlike
            ``signals_date`` it is parsed unguarded, so a malformed value
            raises ``ValueError``.
    """
    if not signals_date:
        return
    try:
        sig = date.fromisoformat(signals_date)
    except (ValueError, TypeError):
        # Unparseable stamp: nothing meaningful to compare against.
        return

    ref = date.today() if not run_date else date.fromisoformat(run_date)
    age = (ref - sig).days
    if age <= 7:
        return
    logger.warning(
        f"Loaded signals are {age} calendar days old (signals_date={sig}, "
        f"run_date={ref}). Research pipeline may have missed a weekly run."
    )
def filter_buy_candidates_to_universe(
    signals: dict,
    signals_bucket: str,
) -> dict:
    """Remove buy_candidates whose tickers are absent from the ArcticDB universe.

    Defense-in-depth layer. Research's ``population_selector.
    compute_exits_and_open_slots`` (alpha-engine-research#41) is the
    primary universe guardrail, dropping non-S&P incumbents at the
    exit-evaluator stage. This is the caller-side net for anything that
    slips through (manual signals.json edit, a Research bug, or a
    universe-drift window where DataPhase1 hasn't repopulated the ArcticDB
    library yet), so positions are never sized against data that doesn't
    exist.

    Scope: ``buy_candidates`` only. The ``universe`` list (EXIT/REDUCE/
    HOLD for existing holdings) is left alone: a position held outside the
    universe must still be exitable, and per-ticker ArcticDB reads for
    ATR/VWAP will surface clear named errors downstream if they fail.

    If the universe library cannot be opened or listed (library
    unreachable, IAM miss, import failure), the filter is skipped with a
    WARNING rather than blocking on a defense-in-depth layer.

    Origin: 2026-04-20 — TSM + ASML persisted as population incumbents
    despite being absent from constituents.json; manifested as
    ``NoSuchVersionException`` deep in executor-sim replay.

    Args:
        signals: Parsed signals payload. Never mutated.
        signals_bucket: Bucket passed to the universe-library opener.

    Returns:
        ``signals`` itself when there are no buy candidates or the filter
        was skipped; otherwise a shallow copy whose ``buy_candidates`` is
        the filtered list.
    """
    buy = signals.get("buy_candidates") or []
    if not buy:
        return signals

    try:
        # Local import — avoids top-level circular (price_cache imports
        # executor.market_hours which touches signal_reader indirectly).
        from executor.price_cache import _open_universe_library

        universe_symbols = frozenset(
            _open_universe_library(signals_bucket).list_symbols()
        )
    except Exception as exc:  # noqa: BLE001 — see docstring
        logger.warning(
            "Skipping buy-candidate universe filter — could not open ArcticDB "
            "universe library: %s. Executor's direct ArcticDB reads will surface "
            "any data issues as their own named errors downstream.",
            exc,
        )
        return signals

    # Membership predicate is delegated to ``alpha_engine_lib.universe`` so
    # this Layer 2 filter and research's Layer 1 ``population_selector``
    # filter share one canonical code path (no silent divergence on
    # universe drift — see lib v0.13.0 docstring).
    allowed, dropped_entries = filter_to_universe(buy, universe_symbols)

    # Collect printable ticker names for the log line only.
    dropped_tickers = []
    for entry in dropped_entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get("ticker")
        if isinstance(name, str):
            dropped_tickers.append(name)

    if dropped_tickers:
        logger.warning(
            "[signal_reader] dropped %d buy_candidate(s) not in ArcticDB "
            "universe: %s. Research's population_selector (alpha-engine-"
            "research#41) should have caught these — the fact that they "
            "reached here means one of: (a) Research bug, (b) manual edit "
            "to signals.json, (c) universe-library drift window. Not hard-"
            "failing because the remaining %d buy_candidate(s) are valid.",
            len(dropped_tickers),
            dropped_tickers,
            len(allowed),
        )

    # Fresh top-level dict so the caller's payload is left untouched.
    return {**signals, "buy_candidates": allowed}
def filter_buy_candidates_by_coverage(
    signals: dict,
    coverage_map: dict[str, float],
    min_coverage: float,
) -> dict:
    """Refuse buy_candidates whose feature coverage is under ``min_coverage``.

    Admission gate — the hard lower bound of the graceful-degrade chain
    introduced 2026-04-21 evening + 2026-04-22 (Brian's
    aggressive-new-listings posture reconfirmation). Coverage comes from
    ``price_cache.load_feature_coverage``; a ticker missing from
    ``coverage_map`` is treated as 0.0 coverage.

    Scope: ``buy_candidates`` only. Held positions (``universe`` list —
    EXIT/REDUCE/HOLD) are NEVER filtered here: a held ticker with poor
    coverage still needs its exit/management path evaluated. Refusal
    applies to NEW ENTRY decisions, not to unwinding existing exposure.

    Refusals are logged with ticker, coverage and threshold, and counted
    through the ``admission_refused_count`` CloudWatch metric.

    Entries that are not dicts or have no ticker are skipped by the gate.
    They disappear from the result only when at least one refusal forces
    the list to be rebuilt; with no refusals the input is returned as-is.

    Args:
        signals: Parsed signals payload. Never mutated.
        coverage_map: Ticker → coverage fraction.
        min_coverage: Inclusive admission threshold.

    Returns:
        ``signals`` itself when nothing was refused; otherwise a shallow
        copy whose ``buy_candidates`` holds only the admitted entries.
    """
    buy = signals.get("buy_candidates") or []
    if not buy:
        return signals

    allowed: list[dict] = []
    refused: list[tuple[str, float]] = []
    for entry in buy:
        if not isinstance(entry, dict):
            continue
        ticker = entry.get("ticker")
        if not ticker:
            continue
        cov = coverage_map.get(ticker, 0.0)
        if cov < min_coverage:
            refused.append((ticker, cov))
        else:
            allowed.append(entry)

    if not refused:
        return signals

    logger.warning(
        "[signal_reader] admission gate refused %d buy_candidate(s) "
        "below min_coverage=%.2f: %s. ``REFUSED_INSUFFICIENT_COVERAGE`` "
        "— pure pre-history IPOs or extremely short-history tickers "
        "cannot be meaningfully scored on long-window features. The %d "
        "remaining candidate(s) will be sized normally (position sizer "
        "will derate any partial-coverage tickers via "
        "``coverage_sizing_enabled``).",
        len(refused), min_coverage,
        [(t, round(c, 3)) for t, c in refused],
        len(allowed),
    )
    _emit_admission_refused_metric(len(refused))

    # Fresh top-level dict so the caller's payload is left untouched.
    return {**signals, "buy_candidates": allowed}
def _emit_admission_refused_metric(count: int) -> None:
    """Publish the ``AlphaEngine/Executor/admission_refused_count`` metric.

    Best-effort: any failure (import, client creation, the put call) is
    logged at WARNING and swallowed. The admission decision is the
    load-bearing path and metrics are observability only. Parallel in
    shape to ``_emit_unscored_count_metric``.

    Args:
        count: Number of buy candidates refused by the admission gate.
    """
    try:
        import boto3

        cloudwatch = boto3.client("cloudwatch")
        datum = {
            "MetricName": "admission_refused_count",
            "Value": float(count),
            "Unit": "Count",
        }
        cloudwatch.put_metric_data(
            Namespace="AlphaEngine/Executor",
            MetricData=[datum],
        )
    except Exception as exc:
        logger.warning(
            "CloudWatch admission_refused_count metric failed: %s. "
            "Not blocking the planner — admission decision already made.",
            exc,
        )
class UnscoredBuyCandidatesError(RuntimeError):
    """Signals contain buy_candidates that predictions.json does not cover.

    For such tickers the GBM veto gate is structurally unreachable, so
    sizing positions would route around a risk control.

    Self-healing is expected upstream (the weekday Step Function's
    coverage-gap Choice state re-invokes predictor with --tickers). This
    error is the read-time defense-in-depth backstop: if the gap reaches
    the executor, trading is refused rather than bypassing the veto.

    Attributes are set in ``__init__``:
        missing: Tickers with no prediction row.
        n_buy: Number of distinct buy-candidate tickers.
        n_preds: Number of tickers present in predictions.json.
    """

    def __init__(self, missing: list[str], n_buy: int, n_preds: int):
        message = (
            f"Coverage gap: {len(missing)} of {n_buy} buy_candidate(s) "
            f"not present in predictions.json (which has {n_preds} tickers). "
            f"Missing: {', '.join(missing)}. "
            "Refusing to size positions — GBM veto gate is unreachable for "
            "these tickers. Re-run predictor with --tickers to close the gap."
        )
        # Keep the raw figures available to handlers, not just the text.
        self.missing = missing
        self.n_buy = n_buy
        self.n_preds = n_preds
        super().__init__(message)
def _emit_unscored_count_metric(count: int) -> None:
    """Publish ``AlphaEngine/Predictor/unscored_buy_candidates_count``.

    Best-effort; never raises. The hard-fail
    (``UnscoredBuyCandidatesError``) is the functional guard. This metric,
    together with its CloudWatch alarm, is the long-term guard against the
    self-healing mechanism silently regressing.

    Args:
        count: Number of buy candidates lacking a prediction row.
    """
    try:
        cloudwatch = boto3.client("cloudwatch")
        datum = {
            "MetricName": "unscored_buy_candidates_count",
            "Value": float(count),
            "Unit": "Count",
        }
        cloudwatch.put_metric_data(
            Namespace="AlphaEngine/Predictor",
            MetricData=[datum],
        )
    except Exception as exc:  # noqa: BLE001 — observability, must never block trading path
        logger.warning("CloudWatch metric emission failed: %s", exc)
def assert_predictions_cover_buy_candidates(
    signals: dict,
    predictions_by_ticker: dict,
) -> None:
    """Ensure every buy_candidate ticker has a prediction row.

    Tickers are compared case-insensitively (both sides upper-cased).
    The ``unscored_buy_candidates_count`` CloudWatch metric is emitted on
    every call, including with value 0, so alarm baselines stay continuous.

    Args:
        signals: Parsed signals payload; only ``buy_candidates`` is read.
            Non-dict entries and entries without a ticker are ignored.
        predictions_by_ticker: Mapping keyed by ticker; ``None`` is treated
            as empty.

    Raises:
        UnscoredBuyCandidatesError: At least one buy-candidate ticker has
            no prediction.
    """
    buy_tickers = set()
    for e in signals.get("buy_candidates") or []:
        if not isinstance(e, dict):
            continue
        symbol = e.get("ticker")
        if symbol:
            buy_tickers.add(symbol.upper())

    pred_tickers = set()
    for t in (predictions_by_ticker or {}).keys():
        pred_tickers.add((t or "").upper())

    missing = sorted(buy_tickers.difference(pred_tickers))

    # Emit before deciding, so the zero case is recorded as well.
    _emit_unscored_count_metric(len(missing))

    if not missing:
        return
    raise UnscoredBuyCandidatesError(
        missing=missing,
        n_buy=len(buy_tickers),
        n_preds=len(pred_tickers),
    )
def patch_unknown_sectors_with_constituents(signals_raw: dict, s3_bucket: str) -> int:
    """Backfill missing/"Unknown" sectors from the constituents sector map.

    Defense-in-depth against an escape from research's signals.json
    preflight (alpha-engine-research#126). The 2026-05-04 EOG/NVT incident
    wrote "Unknown" into trades.db because the planner consumed research's
    first-pass file before sector_map had loaded. The research preflight
    is the primary gate; this is the executor-side catch, using
    constituents.json's sector_map as the authoritative GICS source.

    Two phases:

    1. Trigger scan — looks for at least one ENTER entry in
       ``buy_candidates`` or ``universe`` whose sector is missing, empty
       or "Unknown". Without one the function returns 0 and pays no S3
       round-trip.
    2. Patch pass — once triggered, every dict entry in both lists with a
       missing/"Unknown" sector and a ticker found in the map is patched,
       whatever its signal.

    NOTE(review): the patch pass is wider than the ENTER-only trigger, so
    non-ENTER entries get patched (and counted) only when some ENTER entry
    also needed it — confirm that asymmetry is intended.

    Args:
        signals_raw: Parsed signals payload. Entries are mutated IN PLACE.
        s3_bucket: Bucket passed to the constituents loader.

    Returns:
        Number of entries whose ``sector`` was overwritten (0 when nothing
        needed patching or the map came back empty).
    """
    list_keys = ("buy_candidates", "universe")

    def _sector_missing(entry: dict) -> bool:
        sector = entry.get("sector")
        return not sector or sector == "Unknown"

    # Phase 1: cheap scan; any() stops at the first offending ENTER entry.
    needs_patch = any(
        isinstance(s, dict) and s.get("signal") == "ENTER" and _sector_missing(s)
        for key in list_keys
        for s in signals_raw.get(key) or []
    )
    if not needs_patch:
        return 0

    # Imported lazily so the clean path never touches the loader module.
    from executor.eod_reconcile import _load_constituents_sector_map

    constituents_map = _load_constituents_sector_map(s3_bucket)
    if not constituents_map:
        return 0

    # Phase 2: patch every entry that lacks a usable sector.
    patched = 0
    for key in list_keys:
        for s in signals_raw.get(key) or []:
            if not isinstance(s, dict) or not _sector_missing(s):
                continue
            ticker = s.get("ticker")
            mapped = constituents_map.get(ticker) if ticker else None
            if mapped:
                s["sector"] = mapped
                patched += 1
    return patched
def get_actionable_signals(signals: dict) -> dict:
    """Bucket signal entries by action type, de-duplicated by ticker.

    ``buy_candidates`` are scanned before ``universe``, so when a ticker
    appears in both lists the candidate entry wins. Entries that are not
    dicts, have no ticker, or carry a signal other than
    ENTER/EXIT/REDUCE/HOLD are left out of every bucket. A missing or
    null ``universe``/``buy_candidates`` is treated as an empty list,
    matching how the other helpers in this module read these keys.

    Args:
        signals: Parsed signals payload.

    Returns:
        {
            "enter": [signal, ...],
            "exit": [signal, ...],
            "reduce": [signal, ...],
            "hold": [signal, ...],
            "market_regime": str,  # "neutral" when the key is absent
            "sector_ratings": {sector: {"rating": str, "modifier": float, "rationale": str}},
        }
    """
    # ``or []`` rather than a .get default: a JSON ``null`` would otherwise
    # reach the concatenation below and raise TypeError.
    universe = signals.get("universe") or []
    candidates = signals.get("buy_candidates") or []

    buckets: dict[str, list[dict]] = {"ENTER": [], "EXIT": [], "REDUCE": [], "HOLD": []}
    seen: set[str] = set()
    # Candidates take precedence — dedupe by ticker.
    for s in (*candidates, *universe):
        if not isinstance(s, dict):
            continue
        ticker = s.get("ticker")
        if not ticker or ticker in seen:
            continue
        seen.add(ticker)
        signal = s.get("signal")
        # isinstance guard keeps an unhashable signal value from raising.
        if isinstance(signal, str) and signal in buckets:
            buckets[signal].append(s)

    return {
        "enter": buckets["ENTER"],
        "exit": buckets["EXIT"],
        "reduce": buckets["REDUCE"],
        "hold": buckets["HOLD"],
        "market_regime": signals.get("market_regime", "neutral"),
        "sector_ratings": signals.get("sector_ratings", {}),
    }