-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathorder_book.py
More file actions
284 lines (245 loc) · 11.5 KB
/
order_book.py
File metadata and controls
284 lines (245 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""
JSON-based intraday order book.
The morning batch writes approved entries and active stop state here.
The intraday daemon reads and updates it throughout the trading day.
Persisted to disk so the daemon can restart mid-day without losing state.
"""
from __future__ import annotations
import json
import logging
from datetime import date, datetime
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory that holds the book: a "data" folder that sits next to the
# directory containing this file (resolved path, two parents up, then "data").
_ORDER_BOOK_DIR = Path(__file__).resolve().parent.parent / "data"
# Default on-disk location of the book; used as the default ``path`` argument
# of OrderBook.__init__ and OrderBook.load.
_ORDER_BOOK_PATH = _ORDER_BOOK_DIR / "order_book.json"
def _default_book(run_date: str | None = None) -> dict:
    """Build the skeleton of an empty order book.

    Args:
        run_date: ISO date string to stamp on the book. When omitted (or
            otherwise falsy) today's local date is used instead.

    Returns:
        A new dict holding the ``date`` stamp plus four freshly created,
        empty lists: ``approved_entries``, ``urgent_exits``,
        ``active_stops`` and ``executed_today``.
    """
    book: dict = {"date": run_date if run_date else date.today().isoformat()}
    # Key order is kept stable because it is the order written to the JSON file.
    for section in ("approved_entries", "urgent_exits", "active_stops", "executed_today"):
        book[section] = []
    return book
class OrderBook:
"""Manages the intraday order book (JSON on disk)."""
def __init__(self, data: dict, path: Path = _ORDER_BOOK_PATH):
    """Wrap an already-built book dict.

    Args:
        data: Book contents. The dict is stored by reference, not copied.
        path: File this book is bound to; ``save`` writes to it.
    """
    self._path = path
    self._data = data
@classmethod
def load(cls, path: Path = _ORDER_BOOK_PATH) -> "OrderBook":
    """Load the order book from disk while holding a shared file lock.

    A fresh, empty book stamped with today's date is returned when the file
    is missing, cannot be decoded, does not contain a JSON object, or
    carries a ``date`` other than today.

    Args:
        path: Location of the JSON book. The lock file is the same path
            with its suffix replaced by ``.lock``.

    Returns:
        An ``OrderBook`` bound to ``path``.
    """
    import fcntl

    if not path.exists():
        return cls(_default_book(), path)

    try:
        lock_path = path.with_suffix(".lock")
        with open(lock_path, "w") as lock_f:
            fcntl.flock(lock_f, fcntl.LOCK_SH)
            try:
                data = json.loads(path.read_text())
            finally:
                fcntl.flock(lock_f, fcntl.LOCK_UN)
    except (json.JSONDecodeError, UnicodeDecodeError, KeyError) as e:
        # UnicodeDecodeError covers a file whose bytes are not valid text at all.
        logger.warning("Corrupt order book — starting fresh: %s", e)
        return cls(_default_book(), path)

    # json.loads also returns lists, strings, numbers or None for valid JSON;
    # only an object can be a book, so anything else is treated as corruption
    # instead of failing later on data.get(...).
    if not isinstance(data, dict):
        logger.warning(
            "Corrupt order book — starting fresh: expected a JSON object, got %s",
            type(data).__name__,
        )
        return cls(_default_book(), path)

    # Discard stale book from a previous day
    if data.get("date") != date.today().isoformat():
        logger.info("Order book is from %s — starting fresh", data.get("date"))
        data = _default_book()
    return cls(data, path)
def save(self) -> None:
    """Persist the book to ``self._path`` as indented JSON.

    The parent directory is created if needed. While an exclusive lock on
    the sibling ``.lock`` file is held, the JSON is written to a sibling
    ``.tmp`` file and that file is then renamed onto the real path, so the
    target is swapped in one step rather than rewritten in place. Values
    the JSON encoder cannot handle are stringified (``default=str``).
    """
    import fcntl

    target = self._path
    target.parent.mkdir(parents=True, exist_ok=True)
    staging = target.with_suffix(".tmp")
    with open(target.with_suffix(".lock"), "w") as lock_handle:
        fcntl.flock(lock_handle, fcntl.LOCK_EX)
        try:
            payload = json.dumps(self._data, indent=2, default=str)
            staging.write_text(payload)
            staging.rename(target)
        finally:
            # Released explicitly even though closing the handle follows.
            fcntl.flock(lock_handle, fcntl.LOCK_UN)
def backup_to_s3(self, bucket: str, run_date: str) -> None:
    """Upload a JSON copy of the whole book to S3 (best effort).

    The object is written to ``trades/order_book/<run_date>.json`` in
    ``bucket``. Any failure, including ``boto3`` being unavailable, is
    logged as a warning and swallowed, so a failed backup never raises.

    Args:
        bucket: Destination S3 bucket name.
        run_date: Date string used to build the object key.
    """
    try:
        import boto3

        client = boto3.client("s3")
        object_key = f"trades/order_book/{run_date}.json"
        payload = json.dumps(self._data, indent=2, default=str)
        client.put_object(
            Bucket=bucket,
            Key=object_key,
            Body=payload,
            ContentType="application/json",
        )
        logger.info("Order book backed up to s3://%s/%s", bucket, object_key)
    except Exception as e:
        logger.warning("Order book S3 backup failed (non-fatal): %s", e)
# ── Queries ──────────────────────────────────────────────────────────────
@property
def data(self) -> dict:
    """The underlying book dict itself (not a copy); mutating it mutates the book."""
    return self._data
def all_tickers(self) -> list[str]:
    """Return the sorted, de-duplicated tickers found in the live sections.

    Looks at ``approved_entries``, ``urgent_exits`` and ``active_stops``;
    ``executed_today`` is not consulted. Every record in those sections
    must carry a ``ticker`` key.
    """
    sections = ("approved_entries", "urgent_exits", "active_stops")
    unique = {
        record["ticker"]
        for section in sections
        for record in self._data.get(section, [])
    }
    return sorted(unique)
def pending_entries(self) -> list[dict]:
    """Collect the approved entries whose ``status`` is ``'pending'``.

    Returns a new list; the records inside it are the book's own dicts.
    """
    waiting = []
    for record in self._data.get("approved_entries", []):
        if record.get("status") == "pending":
            waiting.append(record)
    return waiting
def active_stops(self) -> list[dict]:
    """Return the book's ``active_stops`` list.

    The stored list itself is handed back (not a copy). When the book has
    no ``active_stops`` key, a new empty list is returned and the book is
    left unchanged.
    """
    if "active_stops" in self._data:
        return self._data["active_stops"]
    return []
def pending_urgent_exits(self) -> list[dict]:
    """Collect the urgent exits whose ``status`` is ``'pending'``.

    Returns a new list; the records inside it are the book's own dicts.
    """
    def _is_pending(record: dict) -> bool:
        return record.get("status") == "pending"

    return list(filter(_is_pending, self._data.get("urgent_exits", [])))
def has_content(self) -> bool:
    """Report whether any live section of the book is non-empty.

    Only ``approved_entries``, ``urgent_exits`` and ``active_stops`` count;
    ``executed_today`` is ignored.
    """
    live_sections = ("approved_entries", "urgent_exits", "active_stops")
    return any(self._data.get(section) for section in live_sections)
# ── Mutations ────────────────────────────────────────────────────────────
def add_entry(self, entry: dict) -> None:
    """Queue an approved entry, unless its ticker is already pending.

    If the book already holds an entry with the same ticker and status
    ``'pending'``, a warning is logged and ``entry`` is dropped. Otherwise
    ``entry`` gets ``status='pending'`` when it has no status yet (the
    caller's dict is modified in place) and is appended to
    ``approved_entries``.
    """
    symbol = entry.get("ticker")
    already_pending = any(
        queued.get("ticker") == symbol and queued.get("status") == "pending"
        for queued in self._data.get("approved_entries", [])
    )
    if already_pending:
        logger.warning(
            "Skipping duplicate entry for %s — already pending", symbol,
        )
        return

    entry.setdefault("status", "pending")
    self._data.setdefault("approved_entries", []).append(entry)
def add_urgent_exit(self, record: dict) -> None:
    """Queue an urgent exit/reduce/cover, unless an identical one is pending.

    Two records are duplicates when they share both ``ticker`` and
    ``signal`` and the one already in the book has status ``'pending'``;
    in that case a warning is logged and ``record`` is dropped. Otherwise
    ``record`` gets ``status='pending'`` when it has no status yet (the
    caller's dict is modified in place) and is appended to ``urgent_exits``.
    """
    symbol = record.get("ticker")
    kind = record.get("signal")

    def _clashes(queued: dict) -> bool:
        return (
            queued.get("ticker") == symbol
            and queued.get("signal") == kind
            and queued.get("status") == "pending"
        )

    if any(_clashes(queued) for queued in self._data.get("urgent_exits", [])):
        logger.warning(
            "Skipping duplicate urgent %s for %s — already pending",
            kind, symbol,
        )
        return

    record.setdefault("status", "pending")
    self._data.setdefault("urgent_exits", []).append(record)
def add_stop(self, stop: dict) -> None:
    """Append a stop record to ``active_stops``.

    The list is created if the book does not have one yet. No duplicate
    check is made, and ``stop`` is stored by reference.
    """
    stops = self._data.setdefault("active_stops", [])
    stops.append(stop)
def mark_entry_executed(self, ticker: str, trigger_reason: str) -> None:
    """Flag a pending entry as executed and move it to ``executed_today``.

    Only the first pending entry for ``ticker`` is touched: it receives
    ``status='executed'``, the given ``trigger_reason`` and an
    ``executed_at`` timestamp (``datetime.now()`` in ISO format), and is
    appended to ``executed_today``. ``approved_entries`` is then rebuilt
    without any record for ``ticker`` whose status is ``'executed'``.
    When nothing matches, the list is rebuilt with the same contents.
    """
    book_entries = self._data.get("approved_entries", [])
    hit = next(
        (
            candidate
            for candidate in book_entries
            if candidate["ticker"] == ticker and candidate.get("status") == "pending"
        ),
        None,
    )
    if hit is not None:
        hit.update(
            status="executed",
            trigger_reason=trigger_reason,
            executed_at=datetime.now().isoformat(),
        )
        self._data.setdefault("executed_today", []).append(hit)

    survivors = []
    for candidate in book_entries:
        if candidate["ticker"] == ticker and candidate.get("status") == "executed":
            continue
        survivors.append(candidate)
    self._data["approved_entries"] = survivors
def mark_urgent_executed(self, ticker: str, action: str) -> None:
    """Flag a pending urgent exit as executed and move it to ``executed_today``.

    Only the first pending record whose ``ticker`` matches and whose
    ``signal`` equals ``action`` is touched: it receives
    ``status='executed'`` and an ``executed_at`` timestamp
    (``datetime.now()`` in ISO format), and is appended to
    ``executed_today``. ``urgent_exits`` is then rebuilt without any record
    for that ticker/signal pair whose status is ``'executed'``.
    """
    queued_exits = self._data.get("urgent_exits", [])

    def _matches(candidate: dict, wanted_status: str) -> bool:
        return (
            candidate["ticker"] == ticker
            and candidate.get("signal") == action
            and candidate.get("status") == wanted_status
        )

    hit = next((c for c in queued_exits if _matches(c, "pending")), None)
    if hit is not None:
        hit.update(status="executed", executed_at=datetime.now().isoformat())
        self._data.setdefault("executed_today", []).append(hit)

    self._data["urgent_exits"] = [
        c for c in queued_exits if not _matches(c, "executed")
    ]
def remove_stop(self, ticker: str) -> None:
    """Drop every stop record for ``ticker``.

    ``active_stops`` is replaced by a new list holding the remaining
    records, so the key exists afterwards even if it did not before.
    """
    kept = []
    for record in self._data.get("active_stops", []):
        if record["ticker"] == ticker:
            continue
        kept.append(record)
    self._data["active_stops"] = kept
def update_stop_high_water(self, ticker: str, new_high: float, new_stop: float) -> None:
    """Store a new high-water mark and stop level for ``ticker``.

    Writes ``high_water`` and ``current_stop`` on the first stop record
    for ``ticker``; later records for the same ticker are left alone.
    Does nothing when the ticker has no stop record.
    """
    target = next(
        (rec for rec in self._data.get("active_stops", []) if rec["ticker"] == ticker),
        None,
    )
    if target is None:
        return
    target["high_water"] = new_high
    target["current_stop"] = new_stop
def update_stop_shares(self, ticker: str, new_shares: int) -> None:
    """Overwrite ``shares`` on the first stop record for ``ticker``.

    Later records for the same ticker are left alone. Does nothing when
    the ticker has no stop record.
    """
    matching = (
        rec for rec in self._data.get("active_stops", []) if rec["ticker"] == ticker
    )
    target = next(matching, None)
    if target is not None:
        target["shares"] = new_shares
def mark_profit_take_executed(self, ticker: str) -> None:
    """Set ``profit_take_executed=True`` on the first stop record for ``ticker``.

    Later records for the same ticker are left alone. Does nothing when
    the ticker has no stop record.
    """
    stops = self._data.get("active_stops", [])
    target = next((rec for rec in stops if rec["ticker"] == ticker), None)
    if target is None:
        return
    target["profit_take_executed"] = True
def set_date(self, run_date: str) -> None:
    """Overwrite the book's ``date`` stamp with ``run_date``.

    The value is stored as given; it is not parsed or validated.
    """
    self._data.update({"date": run_date})
def reset_pending(self) -> None:
    """Empty the three live sections, leaving ``executed_today`` and ``date`` alone.

    ``approved_entries``, ``urgent_exits`` and ``active_stops`` are each
    replaced by a new empty list, whatever the status of the records they
    held. When anything was removed, the total count is logged.

    Called by main.py before rebuilding the order book. Makes main.py
    idempotent — running it twice on the same day produces the same
    order book rather than appending duplicates.
    """
    sections = ("approved_entries", "urgent_exits", "active_stops")
    # Count everything first so nothing is cleared if a section is malformed.
    cleared = sum(len(self._data.get(name, [])) for name in sections)
    for name in sections:
        self._data[name] = []
    if cleared:
        logger.info("Order book reset: cleared %d pending items (executed_today preserved)", cleared)
def merge_executed(self, executed_tickers: set[str]) -> None:
    """Drop entries and urgent exits whose ticker was already traded today.

    Every record in ``approved_entries`` and ``urgent_exits`` whose
    ``ticker`` is in ``executed_tickers`` is removed, regardless of its
    status; ``active_stops`` is not touched. An empty ``executed_tickers``
    leaves the book completely unchanged. When something was removed, the
    two counts are logged.

    Called by the daemon after reloading the order book from disk,
    in case main.py re-ran and wrote fresh 'pending' entries for
    tickers the daemon already traded.
    """
    if not executed_tickers:
        return

    sections = ("approved_entries", "urgent_exits")
    # Sizes are captured for both sections before either one is rewritten.
    sizes_before = [len(self._data.get(name, [])) for name in sections]
    for name in sections:
        self._data[name] = [
            record
            for record in self._data.get(name, [])
            if record["ticker"] not in executed_tickers
        ]
    dropped_entries, dropped_urgents = (
        before - len(self._data[name])
        for before, name in zip(sizes_before, sections)
    )
    if dropped_entries or dropped_urgents:
        logger.info(
            "Merged executed state: removed %d entries, %d urgent exits for already-traded tickers",
            dropped_entries, dropped_urgents,
        )