-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdaemon.py
More file actions
1520 lines (1374 loc) · 67.1 KB
/
daemon.py
File metadata and controls
1520 lines (1374 loc) · 67.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Alpha Engine Intraday Daemon — sole order executor during market hours.

Runs from ~6:45 AM to 4:00 PM ET on trading days. Uses 15-minute delayed
streaming data from IB Gateway (free, no subscription required).

Architecture:
  - Morning batch (main.py) writes the order book: approved entries, urgent
    exits/reduces, and active stop records. It places NO orders.
  - Daemon is the sole order executor:
      Phase 0: Execute urgent exits/reduces/covers immediately (no trigger delay)
      Phase 1: Monitor entries for technical triggers (pullback, VWAP, support, expiry)
      Phase 2: Monitor stops for exit rules (trailing stop, profit-take, collapse)
  - All trades are logged to trades.db with source="intraday_daemon"
  - A Telegram notification is sent for each trade

Usage:
    python -m executor.daemon            # run until market close
    python -m executor.daemon --dry-run  # log triggers without placing orders

The daemon uses clientId=2 by default (``intraday_client_id`` in the strategy
config) to avoid conflicts with the morning batch (clientId=1).
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import sys
import time as _time # aliased to avoid shadowing by local 'time' variables
from datetime import date, datetime
import pytz
import yaml
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from executor.decision_capture import (
DecisionCaptureWriteError,
capture_entry_trigger,
capture_exit_rule,
is_decision_capture_enabled,
)
from executor.entry_triggers import EntryTriggerEngine
from executor.ibkr import IBKRClient
from executor.intraday_exit_manager import IntradayExitManager
from executor.intraday_snapshot import (
IntradaySnapshotWriter,
compute_surveillance_universe,
)
from executor.daemon_state_logger import get_logger as _get_decision_logger
from executor.market_hours import is_market_hours
from executor.notifier import send_daemon_status, send_trade_alert
from executor.order_book import OrderBook
from executor.price_monitor import PriceMonitor
from executor.strategies.config import load_strategy_config
from executor.trade_logger import init_db, log_trade, get_unmatched_entry
from alpha_engine_lib.logging import setup_logging
# Logging / Flow Doctor bootstrap — runs at import time, before any other
# module-level code logs.
# See executor/main.py for the rationale on IB Error 10197 / 10349 suppression.
# Regex patterns handed to setup_logging() as ``exclude_patterns``.
_FLOW_DOCTOR_EXCLUDE_PATTERNS = [r"Error 10197", r"Error 10349"]
# flow-doctor.yaml is resolved relative to this file: it sits in the parent
# of the directory containing this module.
_FLOW_DOCTOR_YAML = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "flow-doctor.yaml")
setup_logging("daemon", flow_doctor_yaml=_FLOW_DOCTOR_YAML, exclude_patterns=_FLOW_DOCTOR_EXCLUDE_PATTERNS)
# Module logger used by every function in this file.
logger = logging.getLogger(__name__)
# Terminology:
# "status" — IB order execution status: "Filled", "Rejected", "Timeout", etc.
# "signal" — trading action type from Research/strategy: "ENTER", "EXIT", "REDUCE", "COVER"
from executor.config_loader import get_config_path
# Order retry policy — applied uniformly to all order types (urgent exits, intraday exits, entries)
# MAX_ORDER_RETRIES is the total number of placement attempts made by
# _place_order_with_retry (first try included).
MAX_ORDER_RETRIES = 3
# Indexed by the zero-based attempt number. The first attempt never sleeps,
# so index 0 is effectively unused; keep this list at least as long as
# MAX_ORDER_RETRIES.
ORDER_RETRY_DELAYS = [0, 2, 5] # seconds between attempts
# Market timing constants (US Eastern)
# Used by run_daemon() to decide when the "no order book at market open"
# notification fires (09:30 ET).
MARKET_OPEN_HOUR = 9
MARKET_OPEN_MINUTE = 30
# US Eastern timezone — hoisted to module scope so helper functions like
# ``_execute_entry`` (decision-capture wiring, L2308) can read it without
# threading the tz through every call site.
# NOTE(review): "US/Eastern" is a legacy alias of "America/New_York" in the
# tz database — confirm before relying on it with other tz libraries.
_ET = pytz.timezone("US/Eastern")
# Connection retry limits
# Upper bound (seconds) on any single connect/reconnect backoff wait.
MAX_RECONNECT_BACKOFF_SECS = 300
# Linear backoff step (seconds): the wait is base * attempt, capped above.
DEFAULT_CONNECT_BACKOFF_BASE = 30

# Exception types that indicate a dropped IB Gateway connection.
# NOTE(review): this is a *tuple*. Python 3 raises TypeError at match time
# when a tuple is nested inside another ``except (...)`` tuple, so handlers
# that combine it with other classes should unpack it
# (``except (ConnectionError, OSError, *asyncio_exceptions)``) — verify the
# call sites in run_daemon().
try:
    from asyncio import IncompleteReadError, TimeoutError as AsyncTimeoutError
except ImportError:
    # Fallback: nothing asyncio-specific to catch.
    asyncio_exceptions = ()
else:
    asyncio_exceptions = (IncompleteReadError, AsyncTimeoutError)

# Set by _handle_signal(); polled by every long-running loop in the daemon.
_shutdown_requested = False
# Latched once the noon trades.db backup has succeeded for this process.
_midday_backup_done = False
def _handle_signal(signum, frame):
    """Signal handler: record that a graceful shutdown was requested.

    Registered for SIGTERM and SIGINT by ``run_daemon``. It only logs the
    signal number and flips the module-level ``_shutdown_requested`` flag;
    the daemon loops poll that flag and unwind on their own.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused; required by ``signal.signal``).
    """
    global _shutdown_requested

    logger.info("Shutdown signal received (%s)", signum)
    # Flag only — no cleanup here, so the handler stays short and re-entrant.
    _shutdown_requested = True
def _cleanup_connections(
    monitor: "PriceMonitor | None",
    ibkr: "IBKRClient | None",
) -> None:
    """Tear down the price subscriptions and the IB connection, best-effort.

    Each step is attempted independently: a falsy handle is skipped, and any
    exception raised by a step is logged at DEBUG (with traceback) and
    swallowed so the remaining step still runs.

    Args:
        monitor: Price monitor whose subscriptions should be dropped, or None.
        ibkr: IB client to disconnect, or None.
    """
    # (handle, method to invoke, message logged if that method raises)
    teardown_steps = (
        (monitor, "unsubscribe_all", "monitor.unsubscribe_all failed during cleanup"),
        (ibkr, "disconnect", "ibkr.disconnect failed during cleanup"),
    )
    for handle, method_name, failure_message in teardown_steps:
        if not handle:
            continue
        try:
            getattr(handle, method_name)()
        except Exception:
            logger.debug(failure_message, exc_info=True)
def _reconnect(
    ibkr: IBKRClient,
    monitor: "PriceMonitor",
    order_book: "OrderBook",
    config: dict,
    client_id: int,
    max_reconnect_attempts: int = 10,
    backoff_base: int = 30,
) -> tuple[IBKRClient, PriceMonitor]:
    """Reconnect to IB Gateway after a connection drop.

    The stale ``ibkr`` / ``monitor`` pair is torn down first, then up to
    ``max_reconnect_attempts`` fresh connections are tried with a linear
    backoff (``backoff_base * attempt`` seconds, capped at
    ``MAX_RECONNECT_BACKOFF_SECS``). On success the new monitor is
    re-subscribed to ``order_book.all_tickers()``.

    Args:
        ibkr: The client whose connection dropped.
        monitor: The price monitor bound to the dropped connection.
        order_book: Source of the tickers to re-subscribe.
        config: Executor config; ``ibkr_host`` and ``ibkr_port`` are required,
            ``ibkr_reconnect_attempts`` is optional (default 3).
        client_id: IB clientId to reconnect with.
        max_reconnect_attempts: Number of connection attempts before giving up.
        backoff_base: Backoff step in seconds.

    Returns:
        ``(new_ibkr, new_monitor)``.

    Raises:
        KeyboardInterrupt: A shutdown was requested while reconnecting.
        ConnectionError: ``max_reconnect_attempts`` is not positive, so no
            attempt could be made.
        Exception: Whatever the final failed attempt raised.
    """
    _cleanup_connections(monitor, ibkr)
    send_daemon_status(
        "\u26a0\ufe0f *IB Gateway connection lost* — attempting reconnect..."
    )

    for attempt in range(1, max_reconnect_attempts + 1):
        if _shutdown_requested:
            raise KeyboardInterrupt("Shutdown during reconnect")

        wait = min(backoff_base * attempt, MAX_RECONNECT_BACKOFF_SECS)
        logger.info("Reconnect attempt %d/%d — waiting %ds...", attempt, max_reconnect_attempts, wait)

        # Sleep in short chunks so a SIGTERM received mid-backoff is honoured
        # within seconds instead of after the full (up to 300s) wait; a
        # handled signal does not cut time.sleep() short on its own.
        remaining = wait
        while remaining > 0:
            if _shutdown_requested:
                raise KeyboardInterrupt("Shutdown during reconnect")
            chunk = min(remaining, 5)
            _time.sleep(chunk)
            remaining -= chunk

        new_ibkr = None
        new_monitor = None
        try:
            new_ibkr = IBKRClient(
                host=config["ibkr_host"],
                port=config["ibkr_port"],
                client_id=client_id,
                reconnect_attempts=config.get("ibkr_reconnect_attempts", 3),
            )
            new_monitor = PriceMonitor(new_ibkr.ib)
            new_monitor.subscribe(order_book.all_tickers())
            logger.info("Reconnected to IB Gateway successfully")
            send_daemon_status("\u2705 *IB Gateway reconnected*")
            return new_ibkr, new_monitor
        except Exception as e:
            # Release whatever this attempt managed to open. Without this a
            # client that connected but failed to subscribe stays attached
            # under the same clientId and competes with the next attempt.
            _cleanup_connections(new_monitor, new_ibkr)
            logger.warning("Reconnect attempt %d/%d failed: %s", attempt, max_reconnect_attempts, e)
            if attempt == max_reconnect_attempts:
                send_daemon_status(
                    f"\u274c *IB Gateway reconnect failed after {max_reconnect_attempts} attempts* — daemon exiting"
                )
                raise

    # Only reachable when max_reconnect_attempts <= 0: fail loudly rather
    # than return None to a caller that unpacks a 2-tuple.
    raise ConnectionError(
        f"IB Gateway reconnect not attempted (max_reconnect_attempts={max_reconnect_attempts})"
    )
# Module-level short-selling switch. run_daemon() overwrites it from the
# ``allow_shorts`` config key; _validate_sell_shares() and
# _enqueue_cover_for_unintended_shorts() read it.
_allow_shorts: bool = False # Set from config in run_daemon(); default: never short
def _validate_sell_shares(
    positions: dict,
    ticker: str,
    shares: int,
    action: str,
    context: str,
    pending_sell_shares: int = 0,
) -> int | None:
    """Validate and cap sell shares against held position minus in-flight sells.

    Returns the adjusted share count, or None if the sell should be skipped
    (no capacity to sell — selling would go short).

    ``pending_sell_shares`` is the total un-filled SELL quantity already working
    at the broker for this ticker. Passing it in lets us cap against
    held - in-flight so a retry/duplicate can't cumulatively blow past the
    position. (PFE incident 2026-04-22: three duplicate SELL 77s each passed
    this check individually against held=155, summing to 231 → short 76.)

    ``COVER`` orders are returned unchanged: a cover is a BUY that closes a
    short, so the long-inventory cap does not apply. The ticker's held
    quantity is negative in that case, and applying the cap would always
    report "no capacity" and silently drop the cover.

    When ``_allow_shorts`` is True (set via ``allow_shorts: true`` in
    risk.yaml), the guard is bypassed and sells are allowed to create
    short positions.

    Args:
        positions: Mapping of ticker -> position dict carrying a ``shares`` key.
        ticker: Symbol being traded.
        shares: Requested order quantity.
        action: Semantic action ("EXIT", "REDUCE", "COVER", ...).
        context: Label used in log lines (e.g. "URGENT").
        pending_sell_shares: Un-filled SELL quantity already at the broker.

    Returns:
        The quantity to send, or None to skip the order.
    """
    # Checked before the shorts switch: buy-to-close never needs sell capacity.
    if action == "COVER" or _allow_shorts:
        return shares

    # ``or 0`` tolerates an explicit None from the broker/position snapshot.
    held = int(positions.get(ticker, {}).get("shares", 0) or 0)
    in_flight = int(pending_sell_shares or 0)
    available = held - in_flight

    if available <= 0:
        logger.warning(
            "SKIP %s %s %s: hold %d shares, in-flight SELL %d — no capacity",
            context, action, ticker, held, in_flight,
        )
        return None

    if shares > available:
        logger.warning(
            "CAPPING %s %s %s: requested %d but available %d (held %d minus in-flight SELL %d)",
            context, action, ticker, shares, available, held, in_flight,
        )
        return available

    return shares
def _place_order_with_retry(
    ibkr: IBKRClient,
    ticker: str,
    side: str,
    shares: int,
    label: str,
    use_bracket: bool = False,
    bracket_kwargs: dict | None = None,
) -> dict:
    """Place a market (or bracket) order, retrying on Rejected/Timeout.

    Makes up to ``MAX_ORDER_RETRIES`` attempts and returns the result dict of
    the last one. Retries are logged; the caller decides what to do with a
    final Rejected/Timeout.

    Retry rules:
      * Rejected → real failure (Cancelled/Inactive/ApiCancelled). Retry.
      * Timeout → no answer from IB Gateway at all. Cancel prior orderId
        (best-effort) to prevent a stale duplicate, then retry.
      * Working → IB accepted the order and is holding it (PreSubmitted at
        pre-open, Submitted routing, etc.). Do NOT retry — it will
        fill when the hold releases. Retrying duplicates the order
        (PFE incident 2026-04-22).
      * Filled / PartialFill → done.

    L133 (2026-05-22): the returned dict carries two audit-trail fields the
    caller embeds into ``rationale_json``:
      * ``retry_count`` (int) — 0 if the first attempt succeeded.
      * ``attempts`` (list of dict) — per-attempt
        ``{attempt, status, ib_order_id, retry_reason}`` records.
    The retry chain is thereby persisted alongside the final fill state.

    Args:
        ibkr: Connected IB client.
        ticker: Symbol to trade.
        side: "BUY" or "SELL" (ignored on the bracket path).
        shares: Order quantity.
        label: Human-readable tag used in retry log lines.
        use_bracket: Place a bracket order instead of a plain market order.
        bracket_kwargs: Extra keyword arguments for the bracket order; the
            bracket path is only taken when this is non-empty.

    Returns:
        The order result dict of the final attempt, with ``retry_count`` and
        ``attempts`` added.
    """
    order_result = None
    attempts: list[dict] = []

    for attempt in range(MAX_ORDER_RETRIES):
        retry_reason: str | None = None
        if attempt > 0:
            prior_id = order_result.get("ib_order_id") if order_result else None
            prior_status = order_result.get("status") if order_result else None
            retry_reason = prior_status  # "Timeout" / "Rejected" → why we're retrying
            if prior_status == "Timeout" and prior_id is not None:
                # Best-effort: a timed-out order may still be live at IB.
                try:
                    ibkr.cancel_order(prior_id)
                except Exception as exc:
                    logger.warning("cancel_order(%s) raised: %s", prior_id, exc)
            # Clamp the index so raising MAX_ORDER_RETRIES without extending
            # ORDER_RETRY_DELAYS reuses the last delay instead of crashing
            # mid-retry with an IndexError.
            if ORDER_RETRY_DELAYS:
                delay = ORDER_RETRY_DELAYS[min(attempt, len(ORDER_RETRY_DELAYS) - 1)]
            else:
                delay = 0
            _time.sleep(delay)
            logger.info("Retry %d/%d: %s %s", attempt + 1, MAX_ORDER_RETRIES, label, ticker)

        if use_bracket and bracket_kwargs:
            # Imported lazily so the plain market-order path does not depend
            # on the bracket module.
            from executor.bracket_orders import place_bracket_with_stop
            order_result = place_bracket_with_stop(ibkr, ticker, shares, **bracket_kwargs)
        else:
            order_result = ibkr.place_market_order(ticker, side, shares)

        attempts.append({
            "attempt": attempt + 1,
            "status": order_result.get("status"),
            "ib_order_id": order_result.get("ib_order_id"),
            "retry_reason": retry_reason,
        })

        # Anything other than a hard failure (Filled, PartialFill, Working…)
        # ends the loop — retrying an accepted order would duplicate it.
        if order_result["status"] not in ("Rejected", "Timeout"):
            break

    # Embed the audit trail on the returned dict so log_trade callers
    # can include it verbatim in rationale_json. Routing decisions
    # never read these — purely audit.
    order_result["retry_count"] = len(attempts) - 1
    order_result["attempts"] = attempts
    return order_result
def _enqueue_cover_for_unintended_shorts(
    positions: dict,
    order_book: "OrderBook",
    run_date: str,
) -> list[str]:
    """Queue an URGENT COVER for every short position found in ``positions``.

    Called at the top of Phase 0. While ``allow_shorts`` is off (the default)
    a negative share count is by definition unintended and has to be
    flattened at the open, so each one is pushed into the order book as a
    ``COVER`` urgent exit for the regular Phase 0 loop to execute. When
    shorts are allowed the function does nothing.

    Duplicate protection is delegated to ``OrderBook.add_urgent_exit``
    (per its contract, keyed on ticker + signal), so repeated calls within a
    session are expected to be harmless.

    Args:
        positions: Mapping of ticker -> position dict carrying a ``shares`` key.
        order_book: Order book that receives the COVER records.
        run_date: ISO date stamped onto each enqueued record.

    Returns:
        Tickers for which a COVER was enqueued, in iteration order.
    """
    if _allow_shorts:
        return []

    enqueued_tickers = []
    for symbol, position in positions.items():
        net_shares = int(position.get("shares", 0))
        if net_shares < 0:
            cover_qty = abs(net_shares)
            logger.error(
                "AUTO-COVER %s: detected short position %d — enqueuing URGENT COVER %d",
                symbol, net_shares, cover_qty,
            )
            cover_record = {
                "ticker": symbol,
                "signal": "COVER",
                "shares": cover_qty,
                "reason": "auto_cover_unintended_short",
                "detail": f"position={net_shares} at Phase 0 open; allow_shorts=False",
                "date": run_date,
            }
            order_book.add_urgent_exit(cover_record)
            enqueued_tickers.append(symbol)
    return enqueued_tickers
def load_config() -> dict:
    """Read the executor configuration file and return its parsed YAML content.

    The file location comes from ``get_config_path()``; parsing uses
    ``yaml.safe_load``.
    """
    config_path = get_config_path()
    with open(config_path) as config_file:
        parsed = yaml.safe_load(config_file)
    return parsed
def run_daemon(dry_run: bool = False) -> None:
"""Main daemon loop — runs until market close or shutdown signal."""
global _shutdown_requested
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
config = load_config()
strategy_config = load_strategy_config(config)
# Preflight: AWS_REGION + S3 bucket reachable. The check_ib_paper_account
# primitive on the returned preflight instance is reused after IBKRClient
# connects below (replaces the inline live-account SAFETY HALT).
from executor.preflight import ExecutorPreflight
preflight = ExecutorPreflight(bucket=config["signals_bucket"], mode="daemon")
preflight.run()
# Flow Doctor: retrieve the shared instance set up at module import
from alpha_engine_lib.logging import get_flow_doctor
fd = get_flow_doctor()
global _allow_shorts
_allow_shorts = config.get("allow_shorts", False)
if _allow_shorts:
logger.warning("allow_shorts=true — short-sell prevention is DISABLED")
if not strategy_config.get("intraday_enabled", False):
logger.info("Intraday daemon is disabled in config — exiting")
return
client_id = strategy_config.get("intraday_client_id", 2)
poll_interval = strategy_config.get("intraday_poll_interval_sec", 60)
run_date = date.today().isoformat()
logger.info(
"Intraday daemon starting | date=%s | dry_run=%s | clientId=%d | poll=%ds",
run_date, dry_run, client_id, poll_interval,
)
# Wait for order book — polls every 2 minutes until one appears or market closes.
# This allows the daemon to recover from a late predictor inference or morning batch.
# _ET timezone is module-scoped now (decision-capture wiring needs it from
# _execute_entry); rebinding here would shadow the module constant.
order_book_poll_secs = strategy_config.get("order_book_poll_interval_sec", 120)
order_book = OrderBook.load()
notified_no_order_book = False
while not order_book.has_content() and not _shutdown_requested:
now_et = datetime.now(_ET)
# Notify once at market open that we have no order book
if not notified_no_order_book and (now_et.hour > MARKET_OPEN_HOUR or (now_et.hour == MARKET_OPEN_HOUR and now_et.minute >= MARKET_OPEN_MINUTE)):
send_daemon_status(
"\u26a0\ufe0f *No order book at market open*\n"
f"Date: {run_date}\n"
"Waiting for morning batch to write order book..."
)
notified_no_order_book = True
# Give up once market session ends (4:15 PM ET, accounts for 15-min data delay)
if not is_market_hours(now_et) and (now_et.hour > MARKET_OPEN_HOUR or (now_et.hour == MARKET_OPEN_HOUR and now_et.minute >= MARKET_OPEN_MINUTE)):
send_daemon_status(
"\u274c *No order book received today*\n"
f"Date: {run_date}\n"
"Market closed — daemon exiting."
)
logger.info("Order book never arrived and market has closed — exiting.")
return
logger.info(
"Order book is empty — waiting %ds for morning batch to write it...",
order_book_poll_secs,
)
_time.sleep(order_book_poll_secs)
order_book = OrderBook.load()
if _shutdown_requested:
return
# Notify that order book has arrived (especially useful on late-start days)
if notified_no_order_book:
send_daemon_status(
"\u2705 *Order book received (late start)*\n"
f"Date: {run_date}\n"
f"Entries: {len(order_book.pending_entries())}\n"
f"Urgent exits: {len(order_book.pending_urgent_exits())}\n"
f"Stops: {len(order_book.active_stops())}"
)
logger.info("Order book arrived after market open — proceeding with late start")
# Connect to IB Gateway (with retry)
db_path = config["db_path"]
conn = init_db(db_path)
max_connect_attempts = config.get("ibkr_daemon_max_connect_attempts", 10)
connect_backoff_base = DEFAULT_CONNECT_BACKOFF_BASE
def _connect_ibkr() -> IBKRClient:
"""Connect to IB Gateway with exponential backoff. Raises after max attempts."""
for attempt in range(1, max_connect_attempts + 1):
try:
client = IBKRClient(
host=config["ibkr_host"],
port=config["ibkr_port"],
client_id=client_id,
reconnect_attempts=config.get("ibkr_reconnect_attempts", 3),
)
# Paper account safety check — delegate the "starts with D"
# rule to the shared preflight primitive so every module uses
# the same definition of "paper account."
try:
accounts = client.ib.managedAccounts()
acct = accounts[0] if accounts else ""
try:
preflight.check_ib_paper_account(acct)
except RuntimeError as exc:
logger.critical("SAFETY HALT: %s — daemon refusing to trade.", exc)
client.disconnect()
raise SystemExit(1) from exc
logger.info("Paper account verified: %s", acct)
except SystemExit:
raise
except Exception as e:
logger.error(
"Paper account verification failed on attempt %d: %s — will retry",
attempt, e,
)
client.disconnect()
raise # let outer retry loop reconnect and re-verify
return client
except SystemExit:
raise
except Exception as e:
wait = min(connect_backoff_base * attempt, MAX_RECONNECT_BACKOFF_SECS)
logger.warning(
"IB Gateway connection attempt %d/%d failed: %s — retrying in %ds",
attempt, max_connect_attempts, e, wait,
)
if attempt == max_connect_attempts:
raise
_time.sleep(wait)
if _shutdown_requested:
raise KeyboardInterrupt("Shutdown during reconnect")
ibkr = _connect_ibkr()
monitor = PriceMonitor(ibkr.ib)
exit_mgr = IntradayExitManager(strategy_config)
entry_engine = EntryTriggerEngine(strategy_config)
# Surveillance universe — signals.signals ∪ buy_candidates ∪ order_book ∪
# current_positions (+ SPY). Same universe is derived independently by
# the surveillance Lambda from the same canonical artifacts, so producer
# and consumer agree by construction (ROADMAP L1067). Best-effort:
# signals.json read failure degrades to order_book + positions only, so
# daemon still trades the order book even if signals are unreadable.
try:
from executor.signal_reader import read_signals_with_fallback
signals_for_surveillance = read_signals_with_fallback(
config["signals_bucket"], run_date,
)
except Exception as _sig_err: # noqa: BLE001
logger.warning(
"surveillance: read_signals_with_fallback failed (%s) — "
"universe degrades to order_book + positions only",
_sig_err,
)
signals_for_surveillance = None
try:
positions_for_surveillance = (
list(ibkr.get_positions().keys()) if not dry_run else []
)
except Exception as _pos_err: # noqa: BLE001
logger.warning("surveillance: get_positions failed (%s) — universe degrades", _pos_err)
positions_for_surveillance = []
tickers = compute_surveillance_universe(
signals_for_surveillance,
order_book_tickers=order_book.all_tickers(),
current_positions=positions_for_surveillance,
)
monitor.subscribe(tickers)
# S3 snapshot writer — publishes latest_prices + heartbeat at every poll
# tick. Surveillance Lambda treats heartbeat staleness as daemon-down.
snapshot_writer = IntradaySnapshotWriter(
bucket=config["signals_bucket"],
daemon_pid=os.getpid(),
)
n_urgent = len(order_book.pending_urgent_exits())
send_daemon_status(
f"\u2705 *Daemon started*\n"
f"Date: {run_date}\n"
f"Monitoring: {len(tickers)} tickers\n"
f"Urgent exits: {n_urgent}\n"
f"Entries: {len(order_book.pending_entries())}\n"
f"Stops: {len(order_book.active_stops())}"
)
trades_executed = 0
executed_tickers: set = set() # tracks tickers already traded today
# Track whether we reached the live trading window so the finally block
# can decide whether it's safe to fire the EOD pipeline. Pre-market exits
# (shutdown signal, early crash, etc.) must NOT trigger EOD — market-close
# side-effects like stopping the trading EC2 instance would then run
# before the market ever opened.
market_opened = False
try:
# ── Wait for market open (daemon may start before 9:30 AM ET) ────
if not is_market_hours():
logger.info("Market not yet open — waiting for 9:30 AM ET...")
while not _shutdown_requested and not is_market_hours():
ibkr.ib.sleep(15)
if _shutdown_requested:
return
logger.info("Market is open — proceeding")
# Guard: Phase 0 urgent exits + live IB order placement must only run
# once the market is actually open. Reaching this line means the
# wait-for-open loop above exited on is_market_hours() == True.
market_opened = True
# ── Phase 0: Execute urgent exits/covers immediately (no trigger delay) ──
# Fetch current positions once for short-sell prevention checks
_phase0_positions = ibkr.get_positions() if not dry_run else {}
# Auto-cover: any short position at Phase 0 open is unintended
# (allow_shorts=False is the configured invariant). Enqueue URGENT
# COVERs into the order book before the normal loop so the existing
# BUY path flattens them. Covers the residue from bugs like the PFE
# retry-duplicate incident 2026-04-22.
if not dry_run:
auto_covered = _enqueue_cover_for_unintended_shorts(
_phase0_positions, order_book, run_date,
)
if auto_covered:
send_daemon_status(
f"⚠️ *AUTO-COVER enqueued for unintended shorts*\n"
f"Tickers: {', '.join(auto_covered)}"
)
order_book.save()
for urgent in order_book.pending_urgent_exits():
ticker = urgent["ticker"]
action = urgent["signal"] # "EXIT", "REDUCE", or "COVER"
shares = urgent["shares"]
reason = urgent.get("reason", "research_signal")
# COVER = buy to close a short position
if action == "COVER":
side = "BUY"
else:
side = "SELL"
# Short-sell prevention: cap sell shares at held minus in-flight.
# In-flight check defends against the PFE incident 2026-04-22
# where a retry loop issued three duplicate SELL 77s that each
# individually passed held=155, summing to 231 → short 76.
pending = 0
if not dry_run:
try:
pending = ibkr.get_open_sell_shares(ticker)
except Exception as exc:
logger.warning("get_open_sell_shares(%s) failed: %s — treating as 0", ticker, exc)
validated = _validate_sell_shares(
_phase0_positions, ticker, shares, action, "URGENT",
pending_sell_shares=pending,
)
if validated is None:
order_book.mark_urgent_executed(ticker, action)
continue
shares = validated
logger.info(
"%sURGENT %s %s: %s %d shares | reason: %s",
"[DRY RUN] " if dry_run else "",
action, ticker, side, shares, reason,
)
if not dry_run:
order_result = _place_order_with_retry(ibkr, ticker, side, shares, f"URGENT {action}")
if order_result["status"] in ("Rejected", "Timeout"):
logger.error("URGENT %s %s FAILED after %d attempts: %s", action, ticker, MAX_ORDER_RETRIES, order_result["status"])
send_daemon_status(
f"\u26a0\ufe0f *URGENT {action} {ticker} FAILED*\n"
f"Status: {order_result['status']} after {MAX_ORDER_RETRIES} retries\n"
f"Shares: {shares} | Reason: {reason}"
)
continue
# Use actual filled quantity (handles PartialFill)
actual_shares = order_result.get("filled_shares") or shares
if actual_shares != shares:
logger.warning(
"URGENT %s %s partial fill: requested %d, filled %d",
action, ticker, shares, actual_shares,
)
fill_price = order_result.get("fill_price") or ibkr.get_current_price(ticker) or 0
# ── Roundtrip linkage for urgent exits ──
_entry = get_unmatched_entry(conn, ticker)
_entry_id = _entry["trade_id"] if _entry else None
_entry_fill = _entry["fill_price"] if _entry else None
_entry_spy = _entry.get("spy_price_at_order") if _entry else None
_entry_date = _entry["date"] if _entry else None
_spy_now = None
_spy_state = monitor.get_price("SPY")
if _spy_state:
_spy_now = _spy_state.get("last")
_rpnl = ((fill_price - _entry_fill) * actual_shares) if _entry_fill else None
_rpct = ((fill_price / _entry_fill) - 1) * 100 if _entry_fill else None
_spy_ret = ((_spy_now / _entry_spy) - 1) * 100 if (_spy_now and _entry_spy) else None
_ralpha = (_rpct - _spy_ret) if (_rpct is not None and _spy_ret is not None) else None
_dheld = (date.fromisoformat(run_date) - date.fromisoformat(_entry_date)).days if _entry_date else None
log_trade(conn, {
"date": run_date,
"ticker": ticker,
"action": action,
"shares": actual_shares,
"price_at_order": fill_price,
"portfolio_nav_at_order": None,
"position_pct": None,
"ib_order_id": order_result.get("ib_order_id"),
"fill_price": fill_price,
"fill_time": order_result.get("fill_time"),
"filled_shares": order_result.get("filled_shares"),
"status": order_result.get("status"),
"research_score": urgent.get("research_score"),
"research_conviction": urgent.get("research_conviction"),
"research_rating": urgent.get("research_rating"),
"sector_rating": urgent.get("sector_rating"),
"market_regime": urgent.get("market_regime"),
"predicted_direction": urgent.get("predicted_direction"),
"prediction_confidence": urgent.get("prediction_confidence"),
"exit_reason": reason,
"rationale_json": json.dumps({
"action": action,
"exit_reason": reason,
"exit_detail": urgent.get("detail", ""),
"source": "intraday_daemon",
"phase": "urgent",
# L133 — universal context on every exit, not
# just edge cases. Closes home endnote bullet 3.
"signal_context": {
"research_score": urgent.get("research_score"),
"research_rating": urgent.get("research_rating"),
"research_conviction": urgent.get("research_conviction"),
"sector": urgent.get("sector"),
"sector_rating": urgent.get("sector_rating"),
"market_regime": urgent.get("market_regime"),
"predicted_direction": urgent.get("predicted_direction"),
"prediction_confidence": urgent.get("prediction_confidence"),
},
# L133 — retry chain audit trail from
# _place_order_with_retry. retry_count=0 +
# single-entry attempts list = first-attempt
# success.
"retry_count": order_result.get("retry_count", 0),
"attempts": order_result.get("attempts", []),
}),
"entry_trade_id": _entry_id,
"trigger_price": fill_price,
"trigger_type": reason,
"spy_price_at_order": _spy_now,
"realized_pnl": _rpnl,
"realized_return_pct": _rpct,
"spy_return_during_hold": _spy_ret,
"realized_alpha_pct": _ralpha,
"days_held": _dheld,
# Phase 2 lineage — signal_date / prediction_date are
# the artifact filename dates the urgent_exits_with_meta
# record sourced from. Both default to None for COVER
# orders generated outside the deciders path.
"signal_date": urgent.get("signal_date"),
"prediction_date": urgent.get("prediction_date"),
})
order_book.mark_urgent_executed(ticker, action)
if action == "EXIT":
order_book.remove_stop(ticker)
# Update cached positions after execution
if action == "COVER":
_phase0_positions.pop(ticker, None)
elif ticker in _phase0_positions:
held_after = int(_phase0_positions[ticker].get("shares", 0)) - shares
_phase0_positions[ticker]["shares"] = held_after
# L165 (2026-05-22): pass the SEMANTIC action ("EXIT" /
# "REDUCE" / "COVER"), not the IB side ("SELL" / "BUY").
# The intraday `_execute_exit` path at L1094 already does
# this correctly; the prior asymmetric spelling caused
# "Telegram shows SELL but page 16 shows REDUCE" for
# morning urgent REDUCEs. The action label IS the
# alert's semantic payload — losing it silently fails
# the alert's purpose ([[feedback_no_silent_fails]]).
send_trade_alert(
action=action,
ticker=ticker,
shares=shares,
price=fill_price,
trigger=f"urgent_{reason}",
source="daemon",
)
# L139(a) — daemon-stage intraday replay capture. The
# urgent_exits loop is the morning-planner-to-daemon
# handoff; recording it here gives the backtester an
# entry point into the intraday decision stream for
# eventual gate-enforcement (L139b).
_get_decision_logger().record(
decision_type=(
"phase0_auto_cover" if action == "COVER"
else "urgent_exit"
),
ticker=ticker,
action=action,
trading_day=run_date,
shares=shares,
trigger_reason=reason,
fill_price=fill_price,
ib_order_id=order_result.get("ib_order_id"),
status=order_result.get("status"),
retry_count=order_result.get("retry_count", 0),
attempts=order_result.get("attempts", []),
context={
"exit_detail": urgent.get("detail", ""),
"research_score": urgent.get("research_score"),
"research_conviction": urgent.get("research_conviction"),
"research_rating": urgent.get("research_rating"),
"sector": urgent.get("sector"),
"sector_rating": urgent.get("sector_rating"),
"market_regime": urgent.get("market_regime"),
"predicted_direction": urgent.get("predicted_direction"),
"prediction_confidence": urgent.get("prediction_confidence"),
},
)
trades_executed += 1
# COVER trades shouldn't prevent new ENTER for the same ticker
if action != "COVER":
executed_tickers.add(ticker)
if n_urgent > 0:
order_book.save()
logger.info("Phase 0 complete: %d urgent exits processed", n_urgent)
# ── Phase 1+2: Monitor entries and exits ──────────────────────────
_last_heartbeat = _time.time()
_HEARTBEAT_INTERVAL = strategy_config.get("heartbeat_interval_sec", 3600)
while not _shutdown_requested:
# Check market hours
if not is_market_hours():
logger.info("Market closed — daemon shutting down")
break
try:
# Let ib_insync process events in short bursts so SIGTERM
# is checked promptly (avoids 60s blocking sleep that caused
# SIGKILL and dirty IB disconnects / competing sessions).
_poll_remaining = poll_interval
while _poll_remaining > 0 and not _shutdown_requested:
_chunk = min(_poll_remaining, 5)
ibkr.ib.sleep(_chunk)
_poll_remaining -= _chunk
if _shutdown_requested:
break
except (ConnectionError, OSError, asyncio_exceptions) as e:
logger.warning("IB Gateway connection lost during poll: %s — reconnecting", e)
ibkr, monitor = _reconnect(ibkr, monitor, order_book, config, client_id)
continue
# Reload order book in case morning batch updated it
order_book = OrderBook.load()
order_book.merge_executed(executed_tickers)
# Per-tick structured log line consumed by uptime_tracker.
# Format is stable — parsers match on the DAEMON_TICK prefix.
logger.info("DAEMON_TICK ib_connected=%s", str(ibkr.ib.isConnected()).lower())
# ── Intraday S3 snapshot (surveillance Lambda producer) ───
# Fire-and-forget; failures log a warning. Surveillance Lambda
# treats heartbeat staleness as daemon-down (ROADMAP L1067).
try:
snapshot_writer.write(
monitor.prices,
ib_connected=ibkr.ib.isConnected(),
subscribed_tickers=tickers,
)
except Exception as _snap_err: # noqa: BLE001
# Defensive — IntradaySnapshotWriter.write should already
# swallow S3 errors. This catch ensures no unexpected
# exception from the writer leaks into the trade loop.
logger.warning("intraday snapshot writer raised: %s", _snap_err)
# ── Heartbeat ─────────────────────────────────────────────
_elapsed = _time.time() - _last_heartbeat
if _elapsed >= _HEARTBEAT_INTERVAL:
n_pending = len(order_book.pending_entries())
n_stops = len(order_book.active_stops())
n_positions = len(ibkr.get_positions())
msg = (
f"\U0001f49a *Daemon heartbeat*\n"
f"Positions: {n_positions} | Stops: {n_stops} | "
f"Pending entries: {n_pending}\n"
f"Trades today: {trades_executed}"
)
ok = send_daemon_status(msg)
logger.info("Heartbeat sent (ok=%s) after %.0fs | pos=%d stops=%d pending=%d trades=%d", ok, _elapsed, n_positions, n_stops, n_pending, trades_executed)
_last_heartbeat = _time.time()
# ── Mid-day backup (noon ET) ─────────────────────────────
global _midday_backup_done
_now_bk = datetime.now(_ET)
if not _midday_backup_done and _now_bk.hour == 12 and _now_bk.minute < 5:
try:
from executor.trade_logger import backup_to_s3 as _midday_bk
_midday_bk(db_path, _now_bk.strftime("%Y-%m-%d"), config["signals_bucket"])
logger.info("Mid-day trades.db backup completed")
_midday_backup_done = True
except Exception as _bk_err:
logger.warning("Mid-day backup failed: %s", _bk_err)
# ── Check exits ──────────────────────────────────────────
for stop in order_book.active_stops():
ticker = stop["ticker"]
price_state = monitor.get_price(ticker)
if not price_state:
continue
# Check for trail update first
trail_update = exit_mgr.should_update_trail(stop, price_state["last"])
if trail_update:
new_high, new_stop = trail_update
order_book.update_stop_high_water(ticker, new_high, new_stop)
order_book.save()
logger.debug(
"Trail updated %s: high=$%.2f stop=$%.2f",
ticker, new_high, new_stop,
)
# Check exit rules
exit_signal = exit_mgr.evaluate(stop, price_state)
if exit_signal:
try:
_execute_exit(
ibkr, conn, order_book, exit_signal, price_state,
run_date, dry_run, monitor=monitor,
)
if not dry_run:
trades_executed += 1
executed_tickers.add(exit_signal.get("ticker"))
# Emit executor:exit_rules DecisionArtifact
# (L2308 PR 4 — daemon-side intraday exits).
# Lands AFTER the fill succeeded (mirrors PR 1
# entry-trigger capture pattern). Best-effort:
# capture failure must never kill subsequent
# exit executions. Skipped naturally on
# dry_run via the not-dry_run guard above.
if is_decision_capture_enabled():
try:
capture_exit_rule(
run_date=run_date,
stop=stop,
price_state=price_state,
exit_signal=exit_signal,
strategy_config=strategy_config,
)
except DecisionCaptureWriteError as _cap_exc:
logger.warning(
"decision_capture S3 write failed "
"for EXIT %s — continuing daemon "
"(capture is observability, not "
"load-bearing): %s",
exit_signal.get("ticker"), _cap_exc,
)
except Exception: # noqa: BLE001
logger.exception(
"decision_capture raised unexpected "
"exception for EXIT %s — continuing "
"daemon",
exit_signal.get("ticker"),
)
except (ConnectionError, OSError, asyncio_exceptions) as e:
logger.warning("Connection lost during exit %s: %s — reconnecting", exit_signal.get("ticker"), e)
ibkr, monitor = _reconnect(ibkr, monitor, order_book, config, client_id)
break
# ── Check entries ────────────────────────────────────────
if strategy_config.get("intraday_entry_triggers_enabled", True):
for entry in order_book.pending_entries():
ticker = entry["ticker"]
price_state = monitor.get_price(ticker)
if not price_state:
continue
should_enter, reason = entry_engine.should_enter(entry, price_state)
if should_enter:
try:
_execute_entry(
ibkr, conn, order_book, entry, price_state, reason,
run_date, strategy_config, dry_run, monitor=monitor,
)
if not dry_run:
trades_executed += 1
executed_tickers.add(ticker)
except (ConnectionError, OSError, asyncio_exceptions) as e:
logger.warning("Connection lost during entry %s: %s — reconnecting", ticker, e)
ibkr, monitor = _reconnect(ibkr, monitor, order_book, config, client_id)
break
except Exception as _exc:
logger.exception("Daemon error")
if fd:
fd.report(_exc, severity="critical", context={
"site": "daemon_main", "dry_run": dry_run, "run_date": run_date})
send_daemon_status("\u274c *Daemon crashed* — check logs")
raise
finally:
# ── L139(a) — flush intraday decision capture for replay parity ──
# Best-effort; flush failures are only logged at DEBUG level (the primary
# trade path already completed via log_trade / send_trade_alert).
# Append semantics handle fix-and-rerun cycles within a single
# trading_day.
try:
_get_decision_logger().flush_to_s3(
bucket=config.get("signals_bucket", "alpha-engine-research"),
trading_day=run_date,
)
except Exception:
logger.debug("daemon_state flush failed", exc_info=True)
# ── Data manifest ──────────────────────────────────────────────────
try:
from executor.health_status import write_data_manifest
write_data_manifest(
bucket=config.get("signals_bucket", "alpha-engine-research"),
module_name="daemon",
run_date=run_date,
manifest={
"trades_executed": trades_executed,
"tickers_monitored": len(order_book.all_tickers()) if order_book else 0,
},
)
except Exception:
logger.debug("Data manifest write failed", exc_info=True)
_cleanup_connections(monitor, ibkr)
if conn:
conn.close()
if fd:
fd.log_summary(logger)
send_daemon_status(
f"\u23f9 *Daemon stopped*\n"
f"Trades executed: {trades_executed}"
)
logger.info("Daemon shutdown complete | trades=%d", trades_executed)
# Trigger EOD pipeline Step Function only when two conditions hold
# at exit time:
# 1. market_opened: the daemon actually entered the live trading
# window. Pre-market exits (crash, signal) must not fire EOD.
# 2. not is_market_hours(): the market is closed RIGHT NOW. This
# makes SIGTERM-driven mid-session restarts (systemctl restart,