-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpolygon_client.py
More file actions
347 lines (289 loc) · 12.6 KB
/
polygon_client.py
File metadata and controls
347 lines (289 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""
Polygon.io (Massive) market data client with rate limiting and dividend adjustment.
Replaces yfinance as primary price data source. Free tier: 5 API calls/min,
~2 years historical depth, EOD data only. Index tickers (VIX/TNX/IRX) are
not available on free tier — use FRED or yfinance for those.
Usage:
from polygon_client import PolygonClient, polygon_client
# Singleton (reads POLYGON_API_KEY from env):
client = polygon_client()
bars = client.get_daily_bars("AAPL", "2025-01-01", "2026-03-28")
# Dividend-adjusted (matches yfinance auto_adjust=True):
bars = client.get_daily_bars_dividend_adjusted("XOM", "2025-01-01", "2026-03-28")
# All US stocks for a single date:
prices = client.get_grouped_daily("2026-03-28")
# -> {"AAPL": {"open": 253.9, "high": 255.5, ...}, ...}
"""
from __future__ import annotations
import logging
import time
from collections import deque
from datetime import date, datetime, timedelta
import pandas as pd
import requests
from alpha_engine_lib.secrets import get_secret
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the polygon.io REST API; endpoint paths are appended to it.
_BASE_URL = "https://api.polygon.io"
_MAX_BARS_PER_REQUEST = 50_000  # polygon limit param max; sent as ``limit`` for aggregate bars
class PolygonRateLimitError(Exception):
    """Signal that polygon.io kept rejecting requests with HTTP 429.

    Raised once the client's retry budget is used up; the caller is expected
    to back off before issuing further requests.
    """
class PolygonClient:
    """Rate-limited polygon.io REST client with dividend adjustment.

    Every HTTP request goes through a sliding-window limiter that allows at
    most ``calls_per_min`` requests per 60 seconds, and through one shared
    ``requests.Session`` whose default query params carry the API key.
    """

    _RATE_WINDOW_SECONDS = 60.0  # length of the sliding rate-limit window
    _MAX_ATTEMPTS = 3  # total tries per request while polygon answers 429
    _DEFAULT_RETRY_AFTER = 15.0  # back-off (s) when Retry-After is missing/unusable
    _REQUEST_TIMEOUT = 30  # per-request timeout in seconds, passed to requests

    def __init__(self, api_key: str | None = None, calls_per_min: int = 5):
        """Create a client.

        Args:
            api_key: polygon.io API key. When falsy, ``POLYGON_API_KEY`` is
                read through ``get_secret``.
            calls_per_min: Maximum number of requests per rolling minute.

        Raises:
            ValueError: If no API key is available, or if ``calls_per_min``
                is below 1 (the limiter could never grant a slot).
        """
        self._api_key = api_key or get_secret("POLYGON_API_KEY", required=False, default="")
        if not self._api_key:
            raise ValueError("POLYGON_API_KEY not set")
        if calls_per_min < 1:
            raise ValueError(f"calls_per_min must be >= 1, got {calls_per_min}")
        self._calls_per_min = calls_per_min
        # Monotonic timestamps of requests sent within the current window.
        self._call_times: deque[float] = deque()
        self._session = requests.Session()
        # Default query params, merged by requests into every session request.
        self._session.params = {"apiKey": self._api_key}  # type: ignore[assignment]

    # ── Rate limiter ──────────────────────────────────────────────────────

    def _purge_expired(self, now: float) -> None:
        """Drop recorded call times that fell out of the rate-limit window."""
        while self._call_times and now - self._call_times[0] > self._RATE_WINDOW_SECONDS:
            self._call_times.popleft()

    def _wait_for_slot(self) -> None:
        """Block until a rate-limit slot is available, then record this call."""
        now = time.monotonic()
        self._purge_expired(now)
        if len(self._call_times) >= self._calls_per_min:
            # Sleep until the oldest call leaves the window; the extra 0.5s
            # guarantees it is purged below.
            wait = self._RATE_WINDOW_SECONDS - (now - self._call_times[0]) + 0.5
            logger.debug("Rate limit: waiting %.1fs", wait)
            time.sleep(wait)
            self._purge_expired(time.monotonic())
        self._call_times.append(time.monotonic())

    @staticmethod
    def _parse_retry_after(value: str | None, default: float = 15.0) -> float:
        """Return a Retry-After header value in seconds, or ``default``.

        Only the numeric (delta-seconds) form is understood. HTTP-date values,
        negative, NaN or infinite numbers, and garbage all fall back to
        ``default`` instead of raising.
        """
        if value is None:
            return default
        try:
            seconds = float(value)
        except (TypeError, ValueError):
            return default
        # Chained comparison is False for NaN and +inf as well as negatives.
        return seconds if 0 <= seconds < float("inf") else default

    def _request(
        self,
        url: str,
        params: dict | None = None,
        *,
        label: str | None = None,
    ) -> dict:
        """Rate-limited GET of ``url`` with 429 retry and 403 soft-failure.

        Every attempt, including retries after a 429, takes a limiter slot, so
        the limiter's bookkeeping matches the requests actually sent.

        Args:
            url: Fully-qualified URL.
            params: Extra query params, merged with the session's defaults.
            label: Identifies the request in log messages. Defaults to ``url``
                without its query string, so API keys are never logged.

        Returns:
            The decoded JSON body. On HTTP 403 it returns the placeholder
            ``{"results": [], "resultsCount": 0, "status": "FORBIDDEN"}``.

        Raises:
            PolygonRateLimitError: If every attempt was answered with 429.
            requests.HTTPError: For other error statuses (via raise_for_status).
        """
        if label is None:
            label = url.split("?", 1)[0]
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            self._wait_for_slot()
            resp = self._session.get(url, params=params or {}, timeout=self._REQUEST_TIMEOUT)
            if resp.status_code == 429:
                if attempt == self._MAX_ATTEMPTS:
                    break  # out of attempts: don't sleep just to raise afterwards
                retry_after = self._parse_retry_after(
                    resp.headers.get("Retry-After"), self._DEFAULT_RETRY_AFTER
                )
                logger.warning("Rate limited (429), waiting %.1fs", retry_after)
                time.sleep(retry_after)
                self._call_times.clear()  # Reset window after forced wait
                continue
            if resp.status_code == 403:
                msg = "Not authorized"
                try:
                    data = resp.json()
                except ValueError:  # error body was not JSON
                    data = None
                if isinstance(data, dict):
                    msg = data.get("message", msg)
                logger.warning("Polygon 403: %s (path=%s)", msg, label)
                return {"results": [], "resultsCount": 0, "status": "FORBIDDEN"}
            resp.raise_for_status()
            return resp.json()
        raise PolygonRateLimitError(f"Rate limited after {self._MAX_ATTEMPTS} retries")

    def _get(self, path: str, params: dict | None = None) -> dict:
        """Make a rate-limited GET to ``_BASE_URL + path``. Handles 429 with retry."""
        return self._request(f"{_BASE_URL}{path}", params=params, label=path)

    # ── Core endpoints ────────────────────────────────────────────────────

    def get_daily_bars(
        self,
        ticker: str,
        start: str,
        end: str,
        adjusted: bool = True,
    ) -> pd.DataFrame:
        """Fetch daily OHLCV bars for a single ticker.

        Returns a DataFrame with a DatetimeIndex named ``date`` (tz-naive,
        normalized to midnight) and columns [Open, High, Low, Close, Volume].
        The frame is empty, with the same columns and index type, when
        polygon returns no bars.

        Prices are split-adjusted (adjusted=True) but NOT dividend-adjusted.
        Use get_daily_bars_dividend_adjusted() for fully-adjusted prices.
        """
        columns = ["Open", "High", "Low", "Close", "Volume"]
        params = {
            "adjusted": str(adjusted).lower(),
            "sort": "asc",
            "limit": _MAX_BARS_PER_REQUEST,
        }
        data = self._get(
            f"/v2/aggs/ticker/{ticker}/range/1/day/{start}/{end}",
            params=params,
        )
        results = data.get("results", [])
        if not results:
            return pd.DataFrame(columns=columns, index=pd.DatetimeIndex([], name="date"))
        df = pd.DataFrame(results)
        # "t" is a millisecond epoch timestamp; keep only the calendar date.
        df["date"] = pd.to_datetime(df["t"], unit="ms", utc=True).dt.tz_localize(None).dt.normalize()
        df = df.rename(columns={"o": "Open", "h": "High", "l": "Low", "c": "Close", "v": "Volume"})
        return df.set_index("date")[columns].sort_index()

    def get_grouped_daily(self, date_str: str) -> dict[str, dict]:
        """Fetch OHLCV for ALL US stocks on a single date (split-adjusted).

        Returns {ticker: {"open": float, "high": float, "low": float,
                          "close": float, "volume": float}}.
        Records without a ticker symbol ("T") are dropped.
        """
        data = self._get(
            f"/v2/aggs/grouped/locale/us/market/stocks/{date_str}",
            params={"adjusted": "true"},
        )
        results = data.get("results", [])
        return {
            r["T"]: {
                "open": r["o"],
                "high": r["h"],
                "low": r["l"],
                "close": r["c"],
                "volume": r["v"],
            }
            for r in results
            if "T" in r
        }

    def get_dividends(
        self,
        ticker: str,
        start: str | None = None,
        limit: int = 1000,
    ) -> list[dict]:
        """Fetch dividend history for a ticker, following pagination.

        Args:
            ticker: Ticker symbol.
            start: If given, only dividends with ex_dividend_date >= start.
            limit: Page size requested from polygon.

        Returns:
            List of polygon dividend records (dicts with keys such as
            ex_dividend_date, cash_amount, frequency, declaration_date, pay_date).
        """
        params: dict = {"ticker": ticker, "limit": limit, "sort": "ex_dividend_date"}
        if start:
            params["ex_dividend_date.gte"] = start
        data = self._get("/v3/reference/dividends", params=params)
        all_dividends: list[dict] = list(data.get("results", []))
        next_url = data.get("next_url")
        while next_url:
            page = self._get_raw_url(next_url)
            all_dividends.extend(page.get("results", []))
            next_url = page.get("next_url")
        return all_dividends

    def _get_raw_url(self, url: str) -> dict:
        """GET a full URL (pagination ``next_url``) with the same retry logic.

        The session already appends ``apiKey`` to every request. If the URL
        carries its own key, the session's copy is suppressed (requests drops
        request params whose value is None), so the key is not sent twice.
        """
        params = {"apiKey": None} if "apiKey=" in url else None
        return self._request(url, params=params)

    # ── Dividend adjustment ───────────────────────────────────────────────

    def get_daily_bars_dividend_adjusted(
        self,
        ticker: str,
        start: str,
        end: str,
    ) -> pd.DataFrame:
        """Fetch daily bars with full adjustment (splits + dividends).

        Intended to produce prices equivalent to yfinance auto_adjust=True.
        Only dividends with ex-date >= ``start`` are fetched; earlier ones
        cannot affect bars inside the requested range.
        """
        bars = self.get_daily_bars(ticker, start, end, adjusted=True)
        if bars.empty:
            return bars
        divs = self.get_dividends(ticker, start=start)
        if not divs:
            return bars  # No dividends → split-adjusted is sufficient
        return _apply_dividend_adjustment(bars, divs)

    # ── Batch helpers ─────────────────────────────────────────────────────

    def fetch_batch(
        self,
        tickers: list[str],
        start: str,
        end: str,
        dividend_adjusted: bool = True,
    ) -> dict[str, pd.DataFrame]:
        """Fetch OHLCV for multiple tickers sequentially, with rate limiting.

        Per-ticker failures are logged and skipped (best effort), and tickers
        with no data are omitted. Returns dict[ticker, DataFrame].
        """
        results: dict[str, pd.DataFrame] = {}
        fetch_fn = (
            self.get_daily_bars_dividend_adjusted
            if dividend_adjusted
            else self.get_daily_bars
        )
        for i, ticker in enumerate(tickers):
            try:
                df = fetch_fn(ticker, start, end)
                if not df.empty:
                    results[ticker] = df
            except Exception as e:
                logger.warning("Failed to fetch %s: %s", ticker, e)
            if (i + 1) % 50 == 0:
                logger.info("Batch progress: %d/%d tickers", i + 1, len(tickers))
        return results

    def get_single_close(self, ticker: str, date_str: str) -> float | None:
        """Get the split-adjusted closing price for one ticker on one date.

        Requests per-ticker daily bars for the single-day range ``date_str``
        and returns the last Close, or None when there is no bar (e.g. a
        non-trading day).
        """
        bars = self.get_daily_bars(ticker, date_str, date_str, adjusted=True)
        if not bars.empty:
            return float(bars["Close"].iloc[-1])
        return None
# ── Dividend adjustment logic ─────────────────────────────────────────────
def _apply_dividend_adjustment(
bars: pd.DataFrame,
dividends: list[dict],
) -> pd.DataFrame:
"""Apply backward dividend adjustment to split-adjusted OHLCV bars.
For each bar date, computes:
factor = product(1 - div_amount / close_before_ex)
for all dividends with ex_date > bar_date
Then: adjusted_price = split_adjusted_price * factor
"""
df = bars.copy()
price_cols = ["Open", "High", "Low", "Close"]
# Parse and sort dividends by ex-date ascending
last_bar_date = df.index[-1]
div_records = []
for d in dividends:
ex_date = d.get("ex_dividend_date")
amount = d.get("cash_amount")
if ex_date and amount and float(amount) > 0:
ex_ts = pd.Timestamp(ex_date)
# Skip future dividends not yet ex within the data range
if ex_ts > last_bar_date:
continue
div_records.append({
"ex_date": ex_ts,
"amount": float(amount),
})
if not div_records:
return df
div_records.sort(key=lambda x: x["ex_date"])
# For each dividend, find the close price on the trading day before ex-date
# to compute the adjustment ratio
adjustment_factors = []
for div in div_records:
ex_date = div["ex_date"]
# Find closest trading day before ex-date
prior_bars = df[df.index < ex_date]
if prior_bars.empty:
# Dividend ex-date is before our data range — skip
continue
close_before = prior_bars["Close"].iloc[-1]
if close_before <= 0:
continue
ratio = 1.0 - div["amount"] / close_before
if ratio <= 0 or ratio > 1:
logger.warning(
"Skipping suspicious dividend ratio %.4f (amount=%.2f, close=%.2f)",
ratio, div["amount"], close_before,
)
continue
adjustment_factors.append({"ex_date": ex_date, "ratio": ratio})
if not adjustment_factors:
return df
# Apply cumulative backward adjustment:
# Bars before the earliest ex-date get ALL factors applied
# Bars between ex-dates get progressively fewer factors
# Bars on/after the latest ex-date get no adjustment
for col in price_cols:
adjusted = df[col].copy()
for af in adjustment_factors:
mask = df.index < af["ex_date"]
adjusted[mask] *= af["ratio"]
df[col] = adjusted
return df
# ── Singleton ─────────────────────────────────────────────────────────────
# Shared client instance; stays None until polygon_client() first builds it.
_singleton: PolygonClient | None = None


def polygon_client(api_key: str | None = None) -> PolygonClient:
    """Return the shared PolygonClient, constructing it on first use.

    ``api_key`` is only consulted when the client is first built; later calls
    return the existing instance and ignore the argument.
    """
    global _singleton
    if _singleton is not None:
        return _singleton
    _singleton = PolygonClient(api_key=api_key)
    return _singleton