-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsnapshot_capturer.py
More file actions
188 lines (158 loc) · 6.51 KB
/
snapshot_capturer.py
File metadata and controls
188 lines (158 loc) · 6.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
EOD snapshot capturer — reads live IB state once at end-of-day and
persists an immutable snapshot to S3 keyed by run_date.
This is Phase 2 of the EOD-SF cutover. Decouples capture from
reconciliation so `eod_reconcile.py` can read date-locked state from
S3 instead of reading current live IB state at write-time. The
architectural invariant: a row keyed by `run_date=X` must source its
inputs from observations made at time X. Live IB at write-time only
satisfied this by accident (because the timer happened to fire once
a day right after close); a snapshot makes it explicit.
Idempotent. Re-running on the same `run_date` overwrites the existing
snapshot. Hard-fails on IB connection failure or S3 write failure —
no silent fallback (the reconcile path depends on this snapshot).
SF orchestration: this script runs as the `CaptureSnapshot` step in
`alpha-engine-eod-pipeline`, between `PostMarketData` and
`EODReconcile`. Both depend on IB Gateway being up; the SF's
`StopTradingInstance` step (which kills IB) only fires after
EODReconcile completes.
S3 path: s3://alpha-engine-research/trades/snapshots/{run_date}.json
Schema (additive-only per CLAUDE.md S3 contract):
{
"run_date": "YYYY-MM-DD",
"captured_at": ISO8601,
"schema_version": 1,
"account": {net_liquidation, total_cash, settled_cash,
accrued_interest, gross_position_value,
buying_power, unrealized_pnl, realized_pnl},
"positions": {ticker: {shares, market_value, avg_cost,
unrealized_pnl, sector}},
"accrued_dividends": {ticker: float},
}
"""
from __future__ import annotations
import json
import logging
import os
import sys
from datetime import datetime, timezone
import boto3
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from executor.config_loader import load_config
from executor.ibkr import IBKRClient
from alpha_engine_lib.dates import now_dual
from alpha_engine_lib.logging import setup_logging
# Version stamp written into every snapshot payload under "schema_version".
SCHEMA_VERSION = 1

# Regex patterns handed to `setup_logging` as `exclude_patterns`.
# NOTE(review): presumably these keep benign IB error codes from raising
# flow-doctor alerts — confirm against alpha_engine_lib.logging.
_FLOW_DOCTOR_EXCLUDE_PATTERNS = [
    r"Error 10197",
    r"Error 10349",
]

# flow-doctor.yaml sits in the parent of the directory containing this file.
_FLOW_DOCTOR_YAML = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "flow-doctor.yaml",
)

# Logging is configured at import time, before the module logger is created.
setup_logging(
    "snapshot",
    exclude_patterns=_FLOW_DOCTOR_EXCLUDE_PATTERNS,
    flow_doctor_yaml=_FLOW_DOCTOR_YAML,
)
logger = logging.getLogger(__name__)
def _snapshot_key(run_date: str) -> str:
return f"trades/snapshots/{run_date}.json"
def run(run_date: str | None = None) -> None:
    """Snapshot current IB account state and store it in S3 under `run_date`.

    When `run_date` is omitted it is taken from `now_dual().trading_day`.
    An explicit `run_date` is accepted only when it equals that same value:
    the IB calls below return state as of now, so any other date would file
    today's state under the wrong key.

    Args:
        run_date: Date key for the snapshot, or None to use the current
            trading day.

    Raises:
        RuntimeError: An explicit `run_date` differs from the current
            trading day.

    Errors from config loading, the IB client, or the S3 write are not
    caught here and propagate to the caller.
    """
    current_trading_day = now_dual().trading_day

    # Guard first: refuse any explicit date that is not the live trading day.
    if run_date is not None and run_date != current_trading_day:
        raise RuntimeError(
            f"Snapshot capturer refusing run_date={run_date!r} "
            f"!= today's trading_day {current_trading_day!r}. "
            "Snapshots can only be captured live (`get_account_snapshot()` "
            "returns now-as-of state); a historical run_date would "
            "persist today's state under yesterday's key."
        )

    if run_date is None:
        run_date = current_trading_day
        logger.info(
            "Snapshot capture | run_date=%s (resolved from now_dual().trading_day)",
            run_date,
        )
    else:
        logger.info(
            "Snapshot capture | run_date=%s (explicit; matches today's trading_day)",
            run_date,
        )

    cfg = load_config()
    bucket_name = cfg["trades_bucket"]

    # ── Read live state from IB Gateway ──────────────────────────────────
    gateway = IBKRClient(
        host=cfg["ibkr_host"],
        port=cfg["ibkr_port"],
        client_id=cfg["ibkr_client_id"],
    )
    try:
        account_state = gateway.get_account_snapshot()
        open_positions = gateway.get_positions()
        dividends_by_symbol = gateway.get_accrued_dividends_by_symbol()
    finally:
        # Always release the gateway connection, even if a read failed.
        gateway.disconnect()

    captured_at = datetime.now(timezone.utc).isoformat()
    snapshot = {
        "run_date": run_date,
        "captured_at": captured_at,
        "schema_version": SCHEMA_VERSION,
        "account": account_state,
        "positions": open_positions,
        "accrued_dividends": dividends_by_symbol,
    }

    # ── Persist to S3 (same key on re-run, so the object is overwritten) ──
    s3_client = boto3.client(
        "s3",
        region_name=cfg.get("aws_region", "us-east-1"),
    )
    object_key = _snapshot_key(run_date)
    # default=str stringifies any value json cannot encode natively.
    encoded_body = json.dumps(snapshot, default=str).encode("utf-8")
    s3_client.put_object(
        Bucket=bucket_name,
        Key=object_key,
        Body=encoded_body,
        ContentType="application/json",
    )

    logger.info(
        "Snapshot written | s3://%s/%s NAV=%s positions=%d dividends=%d",
        bucket_name,
        object_key,
        account_state.get("net_liquidation"),
        len(open_positions),
        len(dividends_by_symbol),
    )
def load_snapshot(bucket: str, run_date: str, region: str = "us-east-1") -> dict | None:
    """Load the snapshot stored for `run_date`, or None when it is absent.

    Used by `eod_reconcile.py` to substitute for the three live IB calls
    (`get_account_snapshot`, `get_positions`, `get_accrued_dividends_by_symbol`).

    Args:
        bucket: S3 bucket holding the snapshots.
        run_date: Date key of the snapshot to load.
        region: AWS region used to build the S3 client.

    Returns:
        The decoded JSON snapshot, or None if S3 reports that the object
        does not exist.

    Raises:
        Exception: Any S3 failure other than "object not found" is
            re-raised unchanged, so a permissions, throttling, or network
            problem is never mistaken for a missing snapshot.
    """
    # Structured S3 error codes that mean "the object is not there".
    not_found_codes = ("NoSuchKey", "404")

    s3 = boto3.client("s3", region_name=region)
    try:
        obj = s3.get_object(Bucket=bucket, Key=_snapshot_key(run_date))
    except s3.exceptions.NoSuchKey:
        return None
    except Exception as exc:
        # Decide on the structured error code carried by the exception's
        # `response` payload rather than on substrings of the message: a
        # message that merely contains "404" (bucket name, URL, request
        # detail) must not be swallowed as "not found".
        response = getattr(exc, "response", None)
        error = response.get("Error") if isinstance(response, dict) else None
        code = error.get("Code") if isinstance(error, dict) else None
        if code in not_found_codes:
            return None
        raise
    return json.loads(obj["Body"].read())
if __name__ == "__main__":
    import argparse

    # Command-line entry point: one optional --date flag, forwarded to run().
    cli = argparse.ArgumentParser(
        description=(
            "Capture live IB state to S3 keyed by run_date. "
            "Defaults to today's trading_day via now_dual; "
            "--date must equal today (snapshots can only be captured live)."
        ),
    )
    cli.add_argument(
        "--date",
        help="YYYY-MM-DD; must equal today's trading_day or the run aborts.",
        default=None,
    )
    run(run_date=cli.parse_args().date)