From e39c6660d1f046771ab94bc7274059284a4b3fe5 Mon Sep 17 00:00:00 2001 From: unknown Date: Sun, 8 Mar 2026 18:35:06 -0400 Subject: [PATCH] add cross-provider analysis showcase --- README.md | 7 + examples/09_daily_nba_markets.py | 100 +++++ examples/10_daily_nba_markets_polymarket.py | 152 +++++++ examples/12_cross_provider_analysis_bridge.py | 320 ++++++++++++++ .../13_deterministic_cross_provider_replay.py | 403 ++++++++++++++++++ neural/data_collection/kalshi.py | 256 ++++------- neural/data_collection/polymarket_us.py | 58 ++- neural/trading/polymarket_us_adapter.py | 182 ++++++-- .../test_kalshi_sports_parsing.py | 101 +++++ .../test_polymarket_us_source.py | 35 +- tests/exchanges/test_polymarket_adapter.py | 211 +++++++-- ...est_deterministic_cross_provider_replay.py | 38 ++ 12 files changed, 1598 insertions(+), 265 deletions(-) create mode 100644 examples/09_daily_nba_markets.py create mode 100644 examples/10_daily_nba_markets_polymarket.py create mode 100644 examples/12_cross_provider_analysis_bridge.py create mode 100644 examples/13_deterministic_cross_provider_replay.py create mode 100644 tests/data_collection/test_kalshi_sports_parsing.py create mode 100644 tests/streaming/test_deterministic_cross_provider_replay.py diff --git a/README.md b/README.md index be9f9239..24fdf481 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,13 @@ Current bridge commands: - `deployments logs` - `deployments stop` +## Highlighted Examples + +- [examples/09_daily_nba_markets.py](./examples/09_daily_nba_markets.py): public Kalshi NBA market snapshot with sports-aware parsing. +- [examples/10_daily_nba_markets_polymarket.py](./examples/10_daily_nba_markets_polymarket.py): public Polymarket US NBA market snapshot using the normalized market interface. +- [examples/12_cross_provider_analysis_bridge.py](./examples/12_cross_provider_analysis_bridge.py): live cross-provider normalization plus one shared analysis pass. 
+- [examples/13_deterministic_cross_provider_replay.py](./examples/13_deterministic_cross_provider_replay.py): reproducible replay proving the analysis stack generates `buy_yes`, `buy_no`, and `hold` decisions on standardized cross-provider data. + ## Credentials Create a `.env` file with your Kalshi credentials: diff --git a/examples/09_daily_nba_markets.py b/examples/09_daily_nba_markets.py new file mode 100644 index 00000000..6ad27417 --- /dev/null +++ b/examples/09_daily_nba_markets.py @@ -0,0 +1,100 @@ +""" +Daily NBA market snapshot example. + +Fetches current Kalshi NBA game markets with the Neural SDK and writes +the snapshot to a dated CSV file for downstream reporting or automation. + +Examples: + python examples/09_daily_nba_markets.py + python examples/09_daily_nba_markets.py --status open --limit 200 + python examples/09_daily_nba_markets.py --output-dir data/nba_snapshots +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +# Allow running the example directly from the repository root. 
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from neural.data_collection import get_nba_games + + +async def fetch_daily_nba_markets( + *, + status: str, + limit: int, + output_dir: Path, + use_authenticated: bool, +) -> Path: + markets = await get_nba_games( + status=status, + limit=limit, + use_authenticated=use_authenticated, + ) + + output_dir.mkdir(parents=True, exist_ok=True) + snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") + output_path = output_dir / f"nba_markets_{snapshot_date}.csv" + markets.to_csv(output_path, index=False) + + today_markets = 0 + if not markets.empty and "game_date" in markets.columns: + today = datetime.now().date() + today_markets = int(markets["game_date"].dt.date.eq(today).sum()) + + print(f"Wrote {len(markets)} NBA markets to {output_path.as_posix()}") + if "game_date" in markets.columns: + print(f"Markets dated for today: {today_markets}") + if not markets.empty: + preview_columns = [ + column + for column in ("ticker", "title", "home_team", "away_team", "game_date", "yes_ask", "volume") + if column in markets.columns + ] + if preview_columns: + print() + print(markets[preview_columns].head(10).to_string(index=False)) + + return output_path + + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Fetch and store a daily NBA market snapshot.") + parser.add_argument("--status", default="open", help="Kalshi market status filter, default: open") + parser.add_argument("--limit", type=int, default=200, help="Maximum markets to fetch, default: 200") + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data") / "nba_snapshots", + help="Directory for dated CSV snapshots, default: data/nba_snapshots", + ) + parser.add_argument( + "--authenticated", + action="store_true", + help="Use authenticated Kalshi API access instead of the public market endpoint", + ) + return parser.parse_args() + + + +def main() -> None: + args = parse_args() + 
asyncio.run( + fetch_daily_nba_markets( + status=args.status, + limit=args.limit, + output_dir=args.output_dir, + use_authenticated=args.authenticated, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/examples/10_daily_nba_markets_polymarket.py b/examples/10_daily_nba_markets_polymarket.py new file mode 100644 index 00000000..573bd9b0 --- /dev/null +++ b/examples/10_daily_nba_markets_polymarket.py @@ -0,0 +1,152 @@ +""" +Daily Polymarket US NBA market snapshot example. + +Fetches current Polymarket US NBA markets with the Neural SDK and writes +the snapshot to a dated CSV file for downstream reporting or automation. + +Public market reads do not require credentials. If Polymarket US API +credentials are configured locally, the same SDK adapter can also be used for +authenticated account endpoints elsewhere in the SDK. + +Examples: + uv run python examples/10_daily_nba_markets_polymarket.py + uv run python examples/10_daily_nba_markets_polymarket.py --sport nba --status open --limit 200 + uv run python examples/10_daily_nba_markets_polymarket.py --output-dir data/polymarket_nba_snapshots +""" + +from __future__ import annotations + +import argparse +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +# Allow running the example directly from the repository root. 
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from neural.data_collection import PolymarketUSMarketsSource + + +def _hydrate_quotes(source: PolymarketUSMarketsSource, markets): + if markets.empty or "market_id" not in markets.columns: + return markets + + markets = markets.copy() + last_prices = [] + volumes = [] + yes_prices = [] + no_prices = [] + for market_id in markets["market_id"]: + quote = source.adapter.get_quote(str(market_id)) + yes_prices.append(quote.yes_ask if quote.yes_ask is not None else quote.yes_bid) + no_prices.append(quote.no_ask if quote.no_ask is not None else quote.no_bid) + last_prices.append(quote.last_price) + volumes.append(quote.volume) + + markets["yes_price"] = yes_prices + markets["no_price"] = no_prices + markets["last_price"] = last_prices + markets["volume"] = volumes + return markets + + +def fetch_daily_polymarket_markets( + *, + sport: str, + status: str | None, + limit: int, + output_dir: Path, + sports_only: bool, +) -> Path: + source = PolymarketUSMarketsSource( + config={ + "sport": sport, + "limit": limit, + "sports_only": sports_only, + } + ) + try: + markets = source.get_markets_df() + markets = _hydrate_quotes(source, markets) + finally: + source.adapter.close() + + if status and not markets.empty and "status" in markets.columns: + status_mask = markets["status"].astype(str).str.lower() == status.lower() + markets = markets[status_mask].reset_index(drop=True) + + output_dir.mkdir(parents=True, exist_ok=True) + snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") + output_path = output_dir / f"polymarket_{sport.lower()}_markets_{snapshot_date}.csv" + markets.to_csv(output_path, index=False) + + print( + f"Wrote {len(markets)} Polymarket {sport.upper()} markets to {output_path.as_posix()}" + ) + if not markets.empty: + preview_columns = [ + column + for column in ( + "market_id", + "ticker", + "title", + "status", + "home_team", + "away_team", + "game_date", + "market_type", + 
"yes_price", + "no_price", + "last_price", + "volume", + "sport", + "category", + ) + if column in markets.columns + ] + if preview_columns: + print() + print(markets[preview_columns].head(10).to_string(index=False)) + + return output_path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Fetch and store a daily Polymarket US sports market snapshot." + ) + parser.add_argument("--sport", default="nba", help="Sport/league filter, default: nba") + parser.add_argument( + "--status", + default="open", + help="Optional status filter applied after fetch, default: open", + ) + parser.add_argument("--limit", type=int, default=200, help="Maximum markets to fetch, default: 200") + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data") / "polymarket_nba_snapshots", + help="Directory for dated CSV snapshots, default: data/polymarket_nba_snapshots", + ) + parser.add_argument( + "--all-markets", + action="store_true", + help="Use the generic markets endpoint instead of the sports-only endpoint", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + fetch_daily_polymarket_markets( + sport=args.sport, + status=args.status, + limit=args.limit, + output_dir=args.output_dir, + sports_only=not args.all_markets, + ) + + +if __name__ == "__main__": + main() diff --git a/examples/12_cross_provider_analysis_bridge.py b/examples/12_cross_provider_analysis_bridge.py new file mode 100644 index 00000000..13796b3c --- /dev/null +++ b/examples/12_cross_provider_analysis_bridge.py @@ -0,0 +1,320 @@ +""" +Cross-provider public market normalization and analysis demo. + +This example shows how the SDK can: +1. Pull public NBA markets from Kalshi and Polymarket US. +2. Standardize them into one shared tabular schema. +3. Poll both providers to create a simple provider-agnostic event stream. +4. Run the same analysis stack over both feeds. + +The goal is not to prove alpha. 
It demonstrates that external provider data can +flow through one normalization layer and then into one analysis layer. + +Examples: + uv run python examples/12_cross_provider_analysis_bridge.py + uv run python examples/12_cross_provider_analysis_bridge.py --limit 3 --rounds 3 --interval 1.0 + uv run python examples/12_cross_provider_analysis_bridge.py --output-dir data/cross_provider_analysis +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import pandas as pd + +# Allow running the example directly from the repository root. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from neural.analysis import edge_proportional, fixed_percentage, kelly_criterion +from neural.analysis.strategies import MeanReversionStrategy +from neural.data_collection import PolymarketUSMarketsSource, get_nba_games + +ANALYSIS_CAPITAL = 1000.0 + + +def _coerce_probability(value: Any) -> float | None: + if value is None or pd.isna(value): + return None + try: + numeric = float(value) + except (TypeError, ValueError): + return None + if numeric > 1: + numeric /= 100.0 + if 0 <= numeric <= 1: + return numeric + return None + + +def _snapshot_now() -> pd.Timestamp: + return pd.Timestamp(datetime.now(UTC)) + + +async def fetch_kalshi_nba(limit: int) -> pd.DataFrame: + df = await get_nba_games(status="open", limit=max(limit, 1), use_authenticated=False) + if df.empty: + return pd.DataFrame() + + df = df.copy().head(limit) + df["exchange"] = "kalshi" + df["market_id"] = df["ticker"].astype(str) + df["yes_price"] = df["yes_ask"].apply(_coerce_probability) + df["no_price"] = df["no_ask"].apply(_coerce_probability) + df["last_price"] = df["yes_price"] + df["market_type"] = "moneyline" + df["stream_timestamp"] = _snapshot_now() + return df[ + [ + "exchange", + "market_id", + "ticker", + "title", + "status", + "home_team", + "away_team", + 
"game_date", + "market_type", + "yes_price", + "no_price", + "last_price", + "volume", + "stream_timestamp", + ] + ].reset_index(drop=True) + + +def fetch_polymarket_nba(limit: int) -> pd.DataFrame: + source = PolymarketUSMarketsSource(config={"sport": "nba", "limit": limit, "sports_only": True}) + try: + df = source.get_markets_df() + if df.empty: + return df + + df = df.copy().head(limit) + yes_prices = [] + no_prices = [] + last_prices = [] + volumes = [] + for market_id in df["market_id"]: + quote = source.adapter.get_quote(str(market_id)) + yes_prices.append(quote.yes_ask if quote.yes_ask is not None else quote.yes_bid) + no_prices.append(quote.no_ask if quote.no_ask is not None else quote.no_bid) + last_prices.append(quote.last_price) + volumes.append(quote.volume) + + df["exchange"] = "polymarket_us" + df["yes_price"] = yes_prices + df["no_price"] = no_prices + df["last_price"] = last_prices + df["volume"] = volumes + df["stream_timestamp"] = _snapshot_now() + return df[ + [ + "exchange", + "market_id", + "ticker", + "title", + "status", + "home_team", + "away_team", + "game_date", + "market_type", + "yes_price", + "no_price", + "last_price", + "volume", + "stream_timestamp", + ] + ].reset_index(drop=True) + finally: + source.adapter.close() + + +async def fetch_standardized_snapshots(limit: int) -> pd.DataFrame: + kalshi_df, polymarket_df = await asyncio.gather(fetch_kalshi_nba(limit), asyncio.to_thread(fetch_polymarket_nba, limit)) + frames = [df for df in (kalshi_df, polymarket_df) if not df.empty] + if not frames: + return pd.DataFrame() + + records: list[dict[str, Any]] = [] + for frame in frames: + records.extend(frame.to_dict("records")) + + combined = pd.DataFrame.from_records(records) + combined["game_date"] = pd.to_datetime(combined["game_date"], errors="coerce", utc=True) + for column in ("yes_price", "no_price", "last_price", "volume"): + combined[column] = pd.to_numeric(combined[column], errors="coerce") + return combined + + +def 
_event_frame_from_snapshot(snapshot: pd.DataFrame) -> pd.DataFrame: + if snapshot.empty: + return snapshot + + events = snapshot.copy() + events["yes_ask"] = events["yes_price"].fillna(events["last_price"]) + events["no_ask"] = events["no_price"] + missing_no = events["no_ask"].isna() & events["yes_ask"].notna() + events.loc[missing_no, "no_ask"] = 1.0 - events.loc[missing_no, "yes_ask"] + events = events.dropna(subset=["yes_ask", "no_ask"]) + return events[ + [ + "exchange", + "market_id", + "ticker", + "title", + "stream_timestamp", + "yes_ask", + "no_ask", + "last_price", + "volume", + ] + ].reset_index(drop=True) + + +async def build_polling_stream(limit: int, rounds: int, interval: float) -> pd.DataFrame: + frames: list[pd.DataFrame] = [] + for round_index in range(rounds): + snapshot = await fetch_standardized_snapshots(limit) + if not snapshot.empty: + snapshot = snapshot.copy() + snapshot["poll_round"] = round_index + 1 + frames.append(_event_frame_from_snapshot(snapshot)) + if round_index < rounds - 1 and interval > 0: + await asyncio.sleep(interval) + + if not frames: + return pd.DataFrame() + stream = pd.concat(frames, ignore_index=True) + stream["stream_timestamp"] = pd.to_datetime(stream["stream_timestamp"], errors="coerce") + return stream + + +def analyze_standardized_stream(stream: pd.DataFrame) -> pd.DataFrame: + if stream.empty: + return pd.DataFrame() + + rows: list[dict[str, Any]] = [] + for (exchange, market_id), market_events in stream.groupby(["exchange", "market_id"], sort=False): + market_events = market_events.sort_values("stream_timestamp").reset_index(drop=True) + market_data = market_events[["ticker", "yes_ask", "no_ask", "volume"]].copy() + + strategy = MeanReversionStrategy( + name=f"{exchange}:{market_id}", + divergence_threshold=0.001, + lookback_periods=1, + use_sportsbook=False, + initial_capital=ANALYSIS_CAPITAL, + max_position_size=0.05, + ) + signal = strategy.hold(str(market_id)) + for end_idx in range(1, len(market_data) + 
1): + signal = strategy.analyze(market_data.iloc[:end_idx]) + + latest_yes = float(market_data.iloc[-1]["yes_ask"]) + rolling_mean = float(market_data["yes_ask"].mean()) + edge = abs(latest_yes - rolling_mean) + rows.append( + { + "exchange": exchange, + "market_id": market_id, + "ticker": market_events.iloc[-1]["ticker"], + "latest_yes_price": latest_yes, + "rolling_mean_yes_price": rolling_mean, + "edge_vs_mean": edge, + "signal": signal.signal_type.value, + "signal_confidence": signal.confidence, + "recommended_size": signal.recommended_size, + "kelly_fraction": kelly_criterion(edge=edge, odds=1.0), + "edge_contracts": edge_proportional(edge=edge, capital=ANALYSIS_CAPITAL), + "fixed_contracts": fixed_percentage(capital=ANALYSIS_CAPITAL, percentage=0.02), + } + ) + + result = pd.DataFrame(rows) + return result.sort_values(["exchange", "edge_vs_mean"], ascending=[True, False]).reset_index(drop=True) + + +async def main() -> None: + args = parse_args() + output_dir = args.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + + snapshot = await fetch_standardized_snapshots(args.limit) + stream = await build_polling_stream(args.limit, args.rounds, args.interval) + analysis = analyze_standardized_stream(stream) + + date_stamp = datetime.now(UTC).strftime("%Y-%m-%d") + snapshot_path = output_dir / f"cross_provider_snapshot_{date_stamp}.csv" + stream_path = output_dir / f"cross_provider_stream_{date_stamp}.csv" + analysis_path = output_dir / f"cross_provider_analysis_{date_stamp}.csv" + + snapshot.to_csv(snapshot_path, index=False) + stream.to_csv(stream_path, index=False) + analysis.to_csv(analysis_path, index=False) + + print(f"Wrote standardized snapshot to {snapshot_path.as_posix()}") + print(f"Wrote polling stream to {stream_path.as_posix()}") + print(f"Wrote analysis output to {analysis_path.as_posix()}") + + if not snapshot.empty: + print("\nStandardized Snapshot Preview:\n") + print( + snapshot[ + [ + column + for column in ( + "exchange", + "ticker", + 
"title", + "home_team", + "away_team", + "game_date", + "yes_price", + "no_price", + "last_price", + "volume", + ) + if column in snapshot.columns + ] + ] + .head(10) + .to_string(index=False) + ) + + if not stream.empty: + print("\nStandardized Stream Preview:\n") + print(stream.head(10).to_string(index=False)) + + if not analysis.empty: + print("\nAnalysis Preview:\n") + print(analysis.head(10).to_string(index=False)) + else: + print("\nAnalysis Preview:\n") + print("No analyzable stream rows were produced.") + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Pull public NBA data from multiple providers, normalize it, and run one analysis pass." + ) + parser.add_argument("--limit", type=int, default=3, help="Markets per provider to include, default: 3") + parser.add_argument("--rounds", type=int, default=2, help="Polling rounds for the standardized stream, default: 2") + parser.add_argument("--interval", type=float, default=1.0, help="Seconds between polling rounds, default: 1.0") + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data") / "cross_provider_analysis", + help="Directory for normalized snapshot, stream, and analysis CSVs", + ) + return parser.parse_args() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/13_deterministic_cross_provider_replay.py b/examples/13_deterministic_cross_provider_replay.py new file mode 100644 index 00000000..1a7d25dd --- /dev/null +++ b/examples/13_deterministic_cross_provider_replay.py @@ -0,0 +1,403 @@ +""" +Deterministic cross-provider normalization and analysis replay. + +This example is the complement to the live public bridge example. Instead of +depending on live market movement, it replays fixed provider-shaped records so +the output is reproducible and the analysis stack produces non-trivial signals. + +It demonstrates: +1. Provider-specific raw pulls with different field names. +2. 
One shared normalization schema across Kalshi and Polymarket US. +3. One provider-agnostic event stream built from repeated polling rounds. +4. One analysis pass using the same strategy and risk sizing methods. + +Examples: + uv run python examples/13_deterministic_cross_provider_replay.py + uv run python examples/13_deterministic_cross_provider_replay.py --output-dir data/deterministic_cross_provider +""" + +from __future__ import annotations + +import argparse +import os +import sys +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import pandas as pd + +# Allow running the example directly from the repository root. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from neural.analysis import edge_proportional, fixed_percentage, kelly_criterion +from neural.analysis.strategies import MeanReversionStrategy + +ANALYSIS_CAPITAL = 1000.0 + + +RAW_KALSHI_ROUNDS: list[list[dict[str, Any]]] = [ + [ + { + "ticker": "KXNBAGAME-26MAR10CHINYK-NYK", + "title": "Chicago at New York Winner?", + "status": "open", + "home_team": "New York Knicks", + "away_team": "Chicago Bulls", + "game_date": "2026-03-10T23:00:00Z", + "yes_ask": 55, + "no_ask": 45, + "volume": 100, + } + ], + [ + { + "ticker": "KXNBAGAME-26MAR10CHINYK-NYK", + "title": "Chicago at New York Winner?", + "status": "open", + "home_team": "New York Knicks", + "away_team": "Chicago Bulls", + "game_date": "2026-03-10T23:00:00Z", + "yes_ask": 54, + "no_ask": 46, + "volume": 110, + } + ], + [ + { + "ticker": "KXNBAGAME-26MAR10CHINYK-NYK", + "title": "Chicago at New York Winner?", + "status": "open", + "home_team": "New York Knicks", + "away_team": "Chicago Bulls", + "game_date": "2026-03-10T23:00:00Z", + "yes_ask": 40, + "no_ask": 60, + "volume": 180, + } + ], +] + + +RAW_POLYMARKET_ROUNDS: list[list[dict[str, Any]]] = [ + [ + { + "id": "pm-bos-lal-moneyline", + "slug": "lakers-celtics-moneyline", + "question": "Will the Boston Celtics beat the Los Angeles 
Lakers?", + "status": "open", + "homeTeam": "Boston Celtics", + "awayTeam": "Los Angeles Lakers", + "gameStartTime": "2026-03-11T00:30:00Z", + "marketType": "moneyline", + "yes_price": 0.42, + "no_price": 0.58, + "last_price": 0.42, + "volume": 90, + }, + { + "id": "pm-den-dal-moneyline", + "slug": "mavericks-nuggets-moneyline", + "question": "Will the Denver Nuggets beat the Dallas Mavericks?", + "status": "open", + "homeTeam": "Denver Nuggets", + "awayTeam": "Dallas Mavericks", + "gameStartTime": "2026-03-11T02:00:00Z", + "marketType": "moneyline", + "yes_price": 0.50, + "no_price": 0.50, + "last_price": 0.50, + "volume": 70, + }, + ], + [ + { + "id": "pm-bos-lal-moneyline", + "slug": "lakers-celtics-moneyline", + "question": "Will the Boston Celtics beat the Los Angeles Lakers?", + "status": "open", + "homeTeam": "Boston Celtics", + "awayTeam": "Los Angeles Lakers", + "gameStartTime": "2026-03-11T00:30:00Z", + "marketType": "moneyline", + "yes_price": 0.43, + "no_price": 0.57, + "last_price": 0.43, + "volume": 95, + }, + { + "id": "pm-den-dal-moneyline", + "slug": "mavericks-nuggets-moneyline", + "question": "Will the Denver Nuggets beat the Dallas Mavericks?", + "status": "open", + "homeTeam": "Denver Nuggets", + "awayTeam": "Dallas Mavericks", + "gameStartTime": "2026-03-11T02:00:00Z", + "marketType": "moneyline", + "yes_price": 0.51, + "no_price": 0.49, + "last_price": 0.51, + "volume": 72, + }, + ], + [ + { + "id": "pm-bos-lal-moneyline", + "slug": "lakers-celtics-moneyline", + "question": "Will the Boston Celtics beat the Los Angeles Lakers?", + "status": "open", + "homeTeam": "Boston Celtics", + "awayTeam": "Los Angeles Lakers", + "gameStartTime": "2026-03-11T00:30:00Z", + "marketType": "moneyline", + "yes_price": 0.64, + "no_price": 0.36, + "last_price": 0.64, + "volume": 200, + }, + { + "id": "pm-den-dal-moneyline", + "slug": "mavericks-nuggets-moneyline", + "question": "Will the Denver Nuggets beat the Dallas Mavericks?", + "status": "open", + 
"homeTeam": "Denver Nuggets", + "awayTeam": "Dallas Mavericks", + "gameStartTime": "2026-03-11T02:00:00Z", + "marketType": "moneyline", + "yes_price": 0.505, + "no_price": 0.495, + "last_price": 0.505, + "volume": 74, + }, + ], +] + + +ROUND_TIMESTAMPS = [ + pd.Timestamp("2026-03-08T12:00:00Z"), + pd.Timestamp("2026-03-08T12:01:00Z"), + pd.Timestamp("2026-03-08T12:02:00Z"), +] + + +def _normalize_kalshi_round(records: list[dict[str, Any]], stream_timestamp: pd.Timestamp) -> pd.DataFrame: + rows = [] + for record in records: + yes_price = float(record["yes_ask"]) / 100.0 + no_price = float(record["no_ask"]) / 100.0 + rows.append( + { + "exchange": "kalshi", + "market_id": record["ticker"], + "ticker": record["ticker"], + "title": record["title"], + "status": record["status"], + "home_team": record["home_team"], + "away_team": record["away_team"], + "game_date": pd.Timestamp(record["game_date"]), + "market_type": "moneyline", + "yes_price": yes_price, + "no_price": no_price, + "last_price": yes_price, + "volume": float(record["volume"]), + "stream_timestamp": stream_timestamp, + } + ) + return pd.DataFrame.from_records(rows) + + +def _normalize_polymarket_round( + records: list[dict[str, Any]], stream_timestamp: pd.Timestamp +) -> pd.DataFrame: + rows = [] + for record in records: + rows.append( + { + "exchange": "polymarket_us", + "market_id": record["id"], + "ticker": record["slug"], + "title": record["question"], + "status": record["status"], + "home_team": record["homeTeam"], + "away_team": record["awayTeam"], + "game_date": pd.Timestamp(record["gameStartTime"]), + "market_type": record["marketType"], + "yes_price": float(record["yes_price"]), + "no_price": float(record["no_price"]), + "last_price": float(record["last_price"]), + "volume": float(record["volume"]), + "stream_timestamp": stream_timestamp, + } + ) + return pd.DataFrame.from_records(rows) + + +def build_replay_stream() -> pd.DataFrame: + frames: list[pd.DataFrame] = [] + for round_index, 
stream_timestamp in enumerate(ROUND_TIMESTAMPS): + kalshi_frame = _normalize_kalshi_round(RAW_KALSHI_ROUNDS[round_index], stream_timestamp) + polymarket_frame = _normalize_polymarket_round( + RAW_POLYMARKET_ROUNDS[round_index], stream_timestamp + ) + round_frame = pd.concat([kalshi_frame, polymarket_frame], ignore_index=True) + round_frame["poll_round"] = round_index + 1 + frames.append(round_frame) + + stream = pd.concat(frames, ignore_index=True) + stream["stream_timestamp"] = pd.to_datetime(stream["stream_timestamp"], utc=True) + stream["game_date"] = pd.to_datetime(stream["game_date"], utc=True) + return stream + + +def latest_snapshot(stream: pd.DataFrame) -> pd.DataFrame: + if stream.empty: + return stream + latest_round = int(stream["poll_round"].max()) + snapshot = stream[stream["poll_round"] == latest_round].copy() + return snapshot.reset_index(drop=True) + + +def analyze_standardized_stream(stream: pd.DataFrame) -> pd.DataFrame: + if stream.empty: + return pd.DataFrame() + + rows: list[dict[str, Any]] = [] + for (exchange, market_id), market_events in stream.groupby(["exchange", "market_id"], sort=False): + market_events = market_events.sort_values("stream_timestamp").reset_index(drop=True) + market_data = market_events[["ticker", "yes_price", "no_price", "volume"]].rename( + columns={"yes_price": "yes_ask", "no_price": "no_ask"} + ) + + strategy = MeanReversionStrategy( + name=f"{exchange}:{market_id}", + divergence_threshold=0.03, + lookback_periods=2, + use_sportsbook=False, + initial_capital=ANALYSIS_CAPITAL, + max_position_size=0.05, + ) + + signal = strategy.hold(str(market_id)) + for end_idx in range(1, len(market_data) + 1): + signal = strategy.analyze(market_data.iloc[:end_idx]) + + latest_yes = float(market_data.iloc[-1]["yes_ask"]) + rolling_mean = float(market_data["yes_ask"].mean()) + edge = abs(latest_yes - rolling_mean) + + rows.append( + { + "exchange": exchange, + "market_id": market_id, + "ticker": market_events.iloc[-1]["ticker"], + 
"title": market_events.iloc[-1]["title"], + "signal": signal.signal_type.value, + "signal_confidence": signal.confidence, + "recommended_fraction": signal.recommended_size, + "latest_yes_price": latest_yes, + "rolling_mean_yes_price": rolling_mean, + "edge_vs_mean": edge, + "kelly_fraction": kelly_criterion(edge=edge, odds=1.0), + "edge_contracts": edge_proportional(edge=edge, capital=ANALYSIS_CAPITAL), + "fixed_contracts": fixed_percentage(capital=ANALYSIS_CAPITAL, percentage=0.02), + "entry_price": (signal.metadata or {}).get("entry_price"), + "target_price": (signal.metadata or {}).get("target_price"), + "fair_value": (signal.metadata or {}).get("fair_value"), + } + ) + + return pd.DataFrame.from_records(rows).sort_values( + ["exchange", "edge_vs_mean"], ascending=[True, False] + ).reset_index(drop=True) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Replay deterministic cross-provider market pulls through one normalization and analysis layer." 
+ ) + parser.add_argument( + "--output-dir", + type=Path, + default=Path("data") / "deterministic_cross_provider", + help="Directory for replay snapshot, stream, and analysis CSVs", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + output_dir = args.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + + stream = build_replay_stream() + snapshot = latest_snapshot(stream) + analysis = analyze_standardized_stream(stream) + + date_stamp = datetime.now(UTC).strftime("%Y-%m-%d") + snapshot_path = output_dir / f"deterministic_snapshot_{date_stamp}.csv" + stream_path = output_dir / f"deterministic_stream_{date_stamp}.csv" + analysis_path = output_dir / f"deterministic_analysis_{date_stamp}.csv" + + snapshot.to_csv(snapshot_path, index=False) + stream.to_csv(stream_path, index=False) + analysis.to_csv(analysis_path, index=False) + + print(f"Wrote deterministic snapshot to {snapshot_path.as_posix()}") + print(f"Wrote deterministic stream to {stream_path.as_posix()}") + print(f"Wrote deterministic analysis to {analysis_path.as_posix()}") + + print("\nLatest Standardized Snapshot:\n") + print( + snapshot[ + [ + "exchange", + "ticker", + "title", + "home_team", + "away_team", + "game_date", + "yes_price", + "no_price", + "last_price", + "volume", + ] + ].to_string(index=False) + ) + + print("\nDeterministic Replay Stream:\n") + print( + stream[ + [ + "poll_round", + "exchange", + "ticker", + "stream_timestamp", + "yes_price", + "no_price", + "volume", + ] + ].to_string(index=False) + ) + + print("\nAnalysis Output:\n") + print( + analysis[ + [ + "exchange", + "ticker", + "signal", + "signal_confidence", + "recommended_fraction", + "latest_yes_price", + "rolling_mean_yes_price", + "edge_vs_mean", + "kelly_fraction", + "edge_contracts", + "fixed_contracts", + ] + ].to_string(index=False) + ) + + +if __name__ == "__main__": + main() diff --git a/neural/data_collection/kalshi.py b/neural/data_collection/kalshi.py index 040590ef..0262a6d1 
100644 --- a/neural/data_collection/kalshi.py +++ b/neural/data_collection/kalshi.py @@ -39,6 +39,76 @@ def _resolve_series_list(series: Iterable[str] | None) -> list[str]: return [s for s in (_normalize_series(item) for item in series) if s] +_MONTH_MAP = { + "JAN": 1, + "FEB": 2, + "MAR": 3, + "APR": 4, + "MAY": 5, + "JUN": 6, + "JUL": 7, + "AUG": 8, + "SEP": 9, + "OCT": 10, + "NOV": 11, + "DEC": 12, +} + + +def _parse_market_date_from_ticker(ticker: Any) -> pd.Timestamp: + if not isinstance(ticker, str): + return pd.NaT + + match = re.search(r"-(\d{2}[A-Z]{3}\d{2})", ticker) + if not match: + return pd.NaT + + date_str = match.group(1) + try: + year = int(date_str[0:2]) + 2000 + month = _MONTH_MAP.get(date_str[2:5]) + day = int(date_str[5:7]) + if month is None: + return pd.NaT + return pd.to_datetime(f"{year}-{month:02d}-{day:02d}") + except Exception: + return pd.NaT + + +def _parse_matchup_teams(row: pd.Series) -> pd.Series: + title = str(row.get("title", "")) + subtitle = str(row.get("subtitle", "")) + + patterns = [ + r"Will the (?P<away>.+?) beat the (?P<home>.+?)\?", + r"(?P<away>.+?) at (?P<home>.+?) Winner\?", + r"(?P<away>.+?) vs (?P<home>.+?) Winner\?", + ] + for source in (title, subtitle): + for pattern in patterns: + match = re.search(pattern, source, re.IGNORECASE) + if match: + return pd.Series( + { + "home_team": match.group("home").strip(), + "away_team": match.group("away").strip(), + } + ) + + for separator in (" vs ", " at "): + if separator in subtitle: + teams = subtitle.split(separator) + if len(teams) == 2: + return pd.Series( + { + "home_team": teams[1].strip(), + "away_team": teams[0].strip(), + } + ) + + return pd.Series({"home_team": None, "away_team": None}) + + async def _fetch_markets( params: dict[str, Any], *, @@ -356,64 +426,9 @@ async def get_nfl_games( ) if not df.empty: - # Parse teams from title (common format: "Will the [Away] beat the [Home]?" or similar) - def parse_teams(row): - title = row["title"] - match = re.search( - r"Will the (\w+(?:\s\w+)?)
beat the (\w+(?:\s\w+)?)\?", title, re.IGNORECASE - ) - if match: - away, home = match.groups() - return pd.Series({"home_team": home, "away_team": away}) - # Fallback: extract from subtitle or ticker - subtitle = row.get("subtitle", "") - if " vs " in subtitle: - teams = subtitle.split(" vs ") - return pd.Series( - { - "home_team": teams[1].strip() if len(teams) > 1 else None, - "away_team": teams[0].strip(), - } - ) - return pd.Series({"home_team": None, "away_team": None}) - - team_df = df.apply(parse_teams, axis=1) + team_df = df.apply(_parse_matchup_teams, axis=1) df = pd.concat([df, team_df], axis=1) - - # Parse game date from ticker (format: KXNFLGAME-25SEP22DETBAL -> 25SEP22) - def parse_game_date(ticker): - match = re.search(r"-(\d{2}[A-Z]{3}\d{2})", ticker) - if match: - date_str = match.group(1) - try: - # Assume YYMMMDD, convert to full year (e.g., 22 -> 2022) - year = ( - int(date_str[-2:]) + 2000 - if int(date_str[-2:]) < 50 - else 1900 + int(date_str[-2:]) - ) - month_map = { - "JAN": 1, - "FEB": 2, - "MAR": 3, - "APR": 4, - "MAY": 5, - "JUN": 6, - "JUL": 7, - "AUG": 8, - "SEP": 9, - "OCT": 10, - "NOV": 11, - "DEC": 12, - } - month = month_map.get(date_str[2:5]) - day = int(date_str[0:2]) - return pd.to_datetime(f"{year}-{month:02d}-{day:02d}") - except Exception: - pass - return pd.NaT - - df["game_date"] = df["ticker"].apply(parse_game_date) + df["game_date"] = df["ticker"].apply(_parse_market_date_from_ticker) # Bug Fix #4, #12: Filter using ticker (which exists) instead of series_ticker (which doesn't) # The series_ticker field doesn't exist in Kalshi API responses, use ticker or event_ticker instead @@ -455,73 +470,9 @@ async def get_nba_games( ) if not df.empty: - # Parse teams from title (NBA format: "Will the [Away] beat the [Home]?" or similar) - def parse_teams(row): - title = row["title"] - match = re.search( - r"Will the (\w+(?:\s\w+)?) 
beat the (\w+(?:\s\w+)?)\?", title, re.IGNORECASE - ) - if match: - away, home = match.groups() - return pd.Series({"home_team": home, "away_team": away}) - # Fallback: extract from subtitle or ticker - subtitle = row.get("subtitle", "") - if " vs " in subtitle: - teams = subtitle.split(" vs ") - return pd.Series( - { - "home_team": teams[1].strip() if len(teams) > 1 else None, - "away_team": teams[0].strip(), - } - ) - # NBA-specific: Try "at" format (Away at Home) - if " at " in subtitle: - teams = subtitle.split(" at ") - return pd.Series( - { - "home_team": teams[1].strip() if len(teams) > 1 else None, - "away_team": teams[0].strip(), - } - ) - return pd.Series({"home_team": None, "away_team": None}) - - team_df = df.apply(parse_teams, axis=1) + team_df = df.apply(_parse_matchup_teams, axis=1) df = pd.concat([df, team_df], axis=1) - - # Parse game date from ticker (format: KXNBA-25OCT15LALGSW -> 25OCT15) - def parse_game_date(ticker): - match = re.search(r"-(\d{2}[A-Z]{3}\d{2})", ticker) - if match: - date_str = match.group(1) - try: - # Assume YYMMMDD, convert to full year (e.g., 25 -> 2025) - year = ( - int(date_str[-2:]) + 2000 - if int(date_str[-2:]) < 50 - else 1900 + int(date_str[-2:]) - ) - month_map = { - "JAN": 1, - "FEB": 2, - "MAR": 3, - "APR": 4, - "MAY": 5, - "JUN": 6, - "JUL": 7, - "AUG": 8, - "SEP": 9, - "OCT": 10, - "NOV": 11, - "DEC": 12, - } - month = month_map.get(date_str[2:5]) - day = int(date_str[0:2]) - return pd.to_datetime(f"{year}-{month:02d}-{day:02d}") - except Exception: - pass - return pd.NaT - - df["game_date"] = df["ticker"].apply(parse_game_date) + df["game_date"] = df["ticker"].apply(_parse_market_date_from_ticker) # Filter for NBA games only nba_mask = df["ticker"].str.contains("KXNBA", na=False) | df["title"].str.contains( @@ -553,7 +504,7 @@ async def get_cfb_games( DataFrame with CFB markets, including parsed teams and game date """ df = await get_markets_by_sport( - sport="NCAA Football", + sport="CFB", status=status, 
limit=limit, use_authenticated=use_authenticated, @@ -562,62 +513,9 @@ async def get_cfb_games( ) if not df.empty: - # Parse teams similar to NFL - def parse_teams(row): - title = row["title"] - match = re.search( - r"Will the (\w+(?:\s\w+)?) beat the (\w+(?:\s\w+)?)\?", title, re.IGNORECASE - ) - if match: - away, home = match.groups() - return pd.Series({"home_team": home, "away_team": away}) - subtitle = row.get("subtitle", "") - if " vs " in subtitle: - teams = subtitle.split(" vs ") - return pd.Series( - { - "home_team": teams[1].strip() if len(teams) > 1 else None, - "away_team": teams[0].strip(), - } - ) - return pd.Series({"home_team": None, "away_team": None}) - - team_df = df.apply(parse_teams, axis=1) + team_df = df.apply(_parse_matchup_teams, axis=1) df = pd.concat([df, team_df], axis=1) - - # Parse game date from ticker - def parse_game_date(ticker): - match = re.search(r"-(\d{2}[A-Z]{3}\d{2})", ticker) - if match: - date_str = match.group(1) - try: - year = ( - int(date_str[-2:]) + 2000 - if int(date_str[-2:]) < 50 - else 1900 + int(date_str[-2:]) - ) - month_map = { - "JAN": 1, - "FEB": 2, - "MAR": 3, - "APR": 4, - "MAY": 5, - "JUN": 6, - "JUL": 7, - "AUG": 8, - "SEP": 9, - "OCT": 10, - "NOV": 11, - "DEC": 12, - } - month = month_map.get(date_str[2:5]) - day = int(date_str[0:2]) - return pd.to_datetime(f"{year}-{month:02d}-{day:02d}") - except Exception: - pass - return pd.NaT - - df["game_date"] = df["ticker"].apply(parse_game_date) + df["game_date"] = df["ticker"].apply(_parse_market_date_from_ticker) # Bug Fix #4, #12: Filter using ticker (which exists) instead of series_ticker (which doesn't) # The series_ticker field doesn't exist in Kalshi API responses, use ticker or event_ticker instead diff --git a/neural/data_collection/polymarket_us.py b/neural/data_collection/polymarket_us.py index fdf1054b..cccbded6 100644 --- a/neural/data_collection/polymarket_us.py +++ b/neural/data_collection/polymarket_us.py @@ -12,6 +12,31 @@ from .base import 
DataSource +def _extract_game_context(raw: dict[str, Any]) -> dict[str, Any]: + market_sides = raw.get("marketSides") + home_team = None + away_team = None + if isinstance(market_sides, list): + for side in market_sides: + if not isinstance(side, dict): + continue + team = side.get("team") if isinstance(side.get("team"), dict) else {} + team_name = team.get("name") or side.get("description") + ordering = team.get("ordering") or side.get("ordering") + if ordering == "home": + home_team = team_name + elif ordering == "away": + away_team = team_name + + return { + "home_team": home_team, + "away_team": away_team, + "game_date": raw.get("gameStartTime") or raw.get("startDate") or raw.get("endDate"), + "market_type": raw.get("sportsMarketTypeV2") or raw.get("sportsMarketType") or raw.get("marketType"), + } + + + @dataclass(slots=True) class PolymarketUSConfig: sport: str | None = None @@ -71,20 +96,27 @@ def get_markets_df(self) -> pd.DataFrame: limit=self._source_cfg.limit, sports_only=self._source_cfg.sports_only, ) - rows = [ - { - "market_id": m.market_id, - "ticker": m.ticker, - "title": m.title, - "status": m.status, - "yes_price": m.yes_price, - "no_price": m.no_price, - "sport": m.sport, - "category": m.category, + rows = [] + for market in markets: + raw = market.metadata.get("raw", {}) if isinstance(market.metadata, dict) else {} + row = { + "market_id": market.market_id, + "ticker": market.ticker, + "title": market.title, + "status": market.status, + "yes_price": market.yes_price, + "no_price": market.no_price, + "sport": market.sport, + "category": market.category, } - for m in markets - ] - return pd.DataFrame(rows) + if isinstance(raw, dict): + row.update(_extract_game_context(raw)) + rows.append(row) + + df = pd.DataFrame(rows) + if not df.empty and "game_date" in df.columns: + df["game_date"] = pd.to_datetime(df["game_date"], errors="coerce", utc=True) + return df def get_market_history( self, diff --git a/neural/trading/polymarket_us_adapter.py 
b/neural/trading/polymarket_us_adapter.py index d424af70..aaea6428 100644 --- a/neural/trading/polymarket_us_adapter.py +++ b/neural/trading/polymarket_us_adapter.py @@ -28,6 +28,7 @@ _LOG = logging.getLogger(__name__) +DEFAULT_POLYMARKET_US_PUBLIC_BASE_URL = "https://gateway.polymarket.us" DEFAULT_POLYMARKET_US_MARKET_WS_URL = "wss://ws.polymarket.us/markets" DEFAULT_POLYMARKET_US_USER_WS_URL = "wss://ws.polymarket.us/user" @@ -40,27 +41,37 @@ class PolymarketUSAdapter(BaseExchangeAdapter): api_secret: bytes | None = None passphrase: str | None = None base_url: str | None = None + public_base_url: str | None = None timeout: int = 30 max_retries: int = 3 session: requests.Session | None = None name: str = "polymarket_us" _http: requests.Session = field(init=False) - _signer: PolymarketUSSigner = field(init=False) + _signer: PolymarketUSSigner | None = field(init=False, default=None) def __post_init__(self) -> None: BaseExchangeAdapter.__init__(self) creds: dict[str, Any] = {} if self.api_key is None or self.api_secret is None or self.passphrase is None: - creds = get_polymarket_us_credentials() + try: + creds = get_polymarket_us_credentials() + except (FileNotFoundError, ValueError): + creds = {} api_key = self.api_key or creds.get("api_key") api_secret = self.api_secret or creds.get("api_secret") passphrase = self.passphrase or creds.get("passphrase") + self.base_url = (self.base_url or get_polymarket_us_base_url()).rstrip("/") + self.public_base_url = (self.public_base_url or DEFAULT_POLYMARKET_US_PUBLIC_BASE_URL).rstrip("/") + self._http = self.session or requests.Session() + if api_key is None or api_secret is None or passphrase is None: - raise ValueError( - "Polymarket US credentials are required: api_key, api_secret, passphrase" - ) + self.api_key = None + self.api_secret = None + self.passphrase = None + self._signer = None + return self.api_key = str(api_key) if isinstance(api_secret, str): @@ -68,14 +79,11 @@ def __post_init__(self) -> None: else: 
self.api_secret = bytes(api_secret) self.passphrase = str(passphrase) - self.base_url = (self.base_url or get_polymarket_us_base_url()).rstrip("/") - self._signer = PolymarketUSSigner( api_key=self.api_key, api_secret=self.api_secret, passphrase=self.passphrase, ) - self._http = self.session or requests.Session() def capabilities(self) -> ExchangeCapabilities: return ExchangeCapabilities(read=True, paper=False, live=False, streaming=True) @@ -83,27 +91,46 @@ def capabilities(self) -> ExchangeCapabilities: def list_markets( self, *, sport: str | None = None, limit: int = 100, sports_only: bool = True ) -> list[NormalizedMarket]: - path = "/api/v1/sports/markets" if sports_only else "/api/v1/markets" - params: dict[str, Any] = {"limit": limit} - if sport: - params["sport"] = sport - - payload = self._request("GET", path, params=params) - rows = _extract_rows(payload) - out = [self._normalize_market(r) for r in rows] + target_sport = sport.lower() if sport else None + page_size = min(max(limit, 1), 200) + offset = 0 + out: list[NormalizedMarket] = [] - if sports_only: - out = [m for m in out if _is_sports_market(m)] - if sport: - out = [m for m in out if (m.sport or "").lower() == sport.lower()] - return out + while True: + params: dict[str, Any] = {"limit": page_size, "offset": offset, "active": "true"} + if sports_only: + params["categories"] = "sports" + + payload = self._request_public("GET", "/v1/markets", params=params) + rows = _extract_rows(payload) + if not rows: + break + + chunk = [self._normalize_market(r) for r in rows] + if sports_only: + chunk = [m for m in chunk if _is_sports_market(m)] + if target_sport: + chunk = [m for m in chunk if (m.sport or "").lower() == target_sport] + out.extend(chunk) + if len(out) >= limit: + return out[:limit] + if len(rows) < page_size: + break + offset += len(rows) + + return out[:limit] def get_quote(self, market_id: str) -> NormalizedQuote: - path = f"/api/v1/markets/{market_id}/book" - payload = self._request("GET", 
path) - row = payload.get("book") or payload.get("data") or payload - yes_bid = _to_prob(row.get("yes_bid") or row.get("best_bid_yes") or row.get("bid")) - yes_ask = _to_prob(row.get("yes_ask") or row.get("best_ask_yes") or row.get("ask")) + market_lookup_path = ( + f"/v1/market/id/{market_id}" if str(market_id).isdigit() else f"/v1/market/slug/{market_id}" + ) + market_payload = self._request_public("GET", market_lookup_path) + market_row = market_payload.get("market") or market_payload.get("data") or market_payload + market_slug = str(market_row.get("slug") or market_id) + payload = self._request_public("GET", f"/v1/markets/{market_slug}/bbo") + row = payload.get("marketData") or payload.get("data") or payload + yes_bid = _to_prob(row.get("bestBid") or row.get("bid") or row.get("yes_bid")) + yes_ask = _to_prob(row.get("bestAsk") or row.get("ask") or row.get("yes_ask")) no_bid = _to_prob(row.get("no_bid") or row.get("best_bid_no")) no_ask = _to_prob(row.get("no_ask") or row.get("best_ask_no")) @@ -118,10 +145,15 @@ def get_quote(self, market_id: str) -> NormalizedQuote: yes_ask=yes_ask, no_bid=no_bid, no_ask=no_ask, - last_price=_to_prob(row.get("last_price") or row.get("last")), - volume=_to_float(row.get("volume")), + last_price=_to_prob( + (row.get("lastTradePx") or {}).get("value") + or (row.get("currentPx") or {}).get("value") + or row.get("last_price") + or row.get("last") + ), + volume=_to_float(row.get("sharesTraded") or row.get("volume")), timestamp=_to_float(row.get("timestamp") or row.get("ts")), - metadata={"exchange": self.name, "raw": row}, + metadata={"exchange": self.name, "raw": {"market": market_row, "bbo": row}}, ) def place_order( @@ -141,7 +173,8 @@ def get_order_status(self, order_id: str) -> NormalizedOrderResult: ) def get_positions(self) -> list[NormalizedPosition]: - raw = self._request("GET", "/api/v1/portfolio/positions") + self._require_auth("portfolio positions") + raw = self._request("GET", "/api/v1/portfolio/positions", 
require_auth=True) rows = _extract_rows(raw) out: list[NormalizedPosition] = [] for row in rows: @@ -173,7 +206,7 @@ def get_candles( if end_ts_ms is not None: params["end_ts"] = end_ts_ms - payload = self._request("GET", f"/api/v1/markets/{market_id}/candles", params=params) + payload = self._request("GET", f"/api/v1/markets/{market_id}/candles", params=params, require_auth=True) rows = payload.get("candles") or payload.get("data") or [] if not isinstance(rows, list): return [] @@ -189,7 +222,7 @@ def get_trade_replay( params: dict[str, Any] = {"limit": limit} if cursor: params["cursor"] = cursor - payload = self._request("GET", f"/api/v1/markets/{market_id}/trades", params=params) + payload = self._request("GET", f"/api/v1/markets/{market_id}/trades", params=params, require_auth=True) rows = payload.get("trades") or payload.get("data") or [] clean_rows = rows if isinstance(rows, list) else [] return { @@ -208,7 +241,7 @@ def get_market_events( params: dict[str, Any] = {"limit": limit} if cursor: params["cursor"] = cursor - payload = self._request("GET", f"/api/v1/markets/{market_id}/events", params=params) + payload = self._request("GET", f"/api/v1/markets/{market_id}/events", params=params, require_auth=True) rows = payload.get("events") or payload.get("data") or [] clean_rows = rows if isinstance(rows, list) else [] return { @@ -221,13 +254,15 @@ def market_ws_client( self, url: str = DEFAULT_POLYMARKET_US_MARKET_WS_URL, ) -> PolymarketUSMarketWebSocketClient: - return PolymarketUSMarketWebSocketClient(url=url, signer=self._signer) + signer = self._require_auth("market websocket") + return PolymarketUSMarketWebSocketClient(url=url, signer=signer) def user_ws_client( self, url: str = DEFAULT_POLYMARKET_US_USER_WS_URL, ) -> PolymarketUSUserWebSocketClient: - return PolymarketUSUserWebSocketClient(url=url, signer=self._signer) + signer = self._require_auth("user websocket") + return PolymarketUSUserWebSocketClient(url=url, signer=signer) def close(self) -> None: 
self._http.close() @@ -239,12 +274,15 @@ def _request( *, params: dict[str, Any] | None = None, json_data: dict[str, Any] | None = None, + require_auth: bool = False, ) -> dict[str, Any]: body = json.dumps(json_data, separators=(",", ":")) if json_data else "" - headers = { - "Content-Type": "application/json", - **self._signer.headers(method, path, body=body), - } + headers = {"Content-Type": "application/json"} + if require_auth: + signer = self._require_auth(path) + headers.update(signer.headers(method, path, body=body)) + elif self._signer is not None: + headers.update(self._signer.headers(method, path, body=body)) url = f"{self.base_url}{path}" retry = 0 @@ -297,12 +335,42 @@ def _request( self._raise_http_error(response) + def _request_public( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + ) -> dict[str, Any]: + url = f"{self.public_base_url}{path}" + response = self._http.request(method=method, url=url, params=params, timeout=self.timeout) + if response.status_code < 400: + if response.text: + try: + data = response.json() + except ValueError as exc: + raise RuntimeError("Polymarket US response body was not valid JSON") from exc + else: + data = {} + if isinstance(data, dict): + return data + return {"data": data} + self._raise_http_error(response) + + def _require_auth(self, operation: str) -> PolymarketUSSigner: + if self._signer is None: + raise ValueError( + "Polymarket US credentials are required for " + f"{operation}: api_key, api_secret, passphrase" + ) + return self._signer + @staticmethod def _normalize_market(raw: dict[str, Any]) -> NormalizedMarket: market_id = str(raw.get("id") or raw.get("market_id") or raw.get("slug") or "") title = str(raw.get("title") or raw.get("question") or raw.get("name") or market_id) category = str(raw.get("category") or raw.get("topic") or "sports") - sport = str(raw.get("sport") or raw.get("league") or category) + sport = _extract_market_sport(raw) or str(raw.get("sport") or 
raw.get("league") or category) yes_price = _to_prob( raw.get("yes_price") or raw.get("price_yes") or raw.get("best_ask_yes") @@ -315,7 +383,7 @@ def _normalize_market(raw: dict[str, Any]) -> NormalizedMarket: market_id=market_id, ticker=str(raw.get("ticker") or raw.get("slug") or market_id), title=title, - status=str(raw.get("status") or "open"), + status=_extract_market_status(raw), yes_price=yes_price, no_price=no_price, category=category, @@ -352,6 +420,38 @@ def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]: return [] +def _extract_market_status(raw: dict[str, Any]) -> str: + status = raw.get("status") + if isinstance(status, str) and status: + return status + if raw.get("archived") is True: + return "archived" + if raw.get("active") is True: + return "open" + if raw.get("closed") is True: + return "closed" + return "inactive" + + +def _extract_market_sport(raw: dict[str, Any]) -> str | None: + league = raw.get("league") or raw.get("sport") + if isinstance(league, str) and league: + return league.lower() + + market_sides = raw.get("marketSides") + if isinstance(market_sides, list): + for side in market_sides: + if not isinstance(side, dict): + continue + team = side.get("team") + if not isinstance(team, dict): + continue + league = team.get("league") + if isinstance(league, str) and league: + return league.lower() + return None + + def _is_sports_market(market: NormalizedMarket) -> bool: haystack = f"{market.category or ''} {market.sport or ''} {market.title}".lower() return "sport" in haystack or any( diff --git a/tests/data_collection/test_kalshi_sports_parsing.py b/tests/data_collection/test_kalshi_sports_parsing.py new file mode 100644 index 00000000..c0a54d73 --- /dev/null +++ b/tests/data_collection/test_kalshi_sports_parsing.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import pandas as pd +import pytest + +from neural.data_collection import kalshi + + +def test_parse_market_date_from_ticker_uses_yy_mmm_dd() -> None: + 
parsed = kalshi._parse_market_date_from_ticker("KXNBAGAME-26MAR10CHIGSW-GSW") + + assert parsed == pd.Timestamp("2026-03-10") + + +def test_parse_matchup_teams_supports_title_at_winner_format() -> None: + row = pd.Series({"title": "Chicago at Golden State Winner?", "subtitle": ""}) + + teams = kalshi._parse_matchup_teams(row) + + assert teams.to_dict() == {"home_team": "Golden State", "away_team": "Chicago"} + + +@pytest.mark.asyncio +async def test_get_nba_games_enriches_rows(monkeypatch: pytest.MonkeyPatch) -> None: + sample = pd.DataFrame( + [ + { + "ticker": "KXNBAGAME-26MAR10CHIGSW-GSW", + "title": "Chicago at Golden State Winner?", + "subtitle": "", + "yes_ask": 70, + "volume": 0, + } + ] + ) + + async def fake_get_markets_by_sport(*args, **kwargs): + return sample + + monkeypatch.setattr(kalshi, "get_markets_by_sport", fake_get_markets_by_sport) + + result = await kalshi.get_nba_games(use_authenticated=False) + + assert len(result) == 1 + assert result.iloc[0]["home_team"] == "Golden State" + assert result.iloc[0]["away_team"] == "Chicago" + assert result.iloc[0]["game_date"] == pd.Timestamp("2026-03-10") + + +@pytest.mark.asyncio +async def test_get_nfl_games_enriches_rows(monkeypatch: pytest.MonkeyPatch) -> None: + sample = pd.DataFrame( + [ + { + "ticker": "KXNFLGAME-26SEP22DETBAL-BAL", + "title": "Will the Detroit beat the Baltimore?", + "subtitle": "", + } + ] + ) + + async def fake_get_markets_by_sport(*args, **kwargs): + return sample + + monkeypatch.setattr(kalshi, "get_markets_by_sport", fake_get_markets_by_sport) + + result = await kalshi.get_nfl_games(use_authenticated=False) + + assert len(result) == 1 + assert result.iloc[0]["home_team"] == "Baltimore" + assert result.iloc[0]["away_team"] == "Detroit" + assert result.iloc[0]["game_date"] == pd.Timestamp("2026-09-22") + + +@pytest.mark.asyncio +async def test_get_cfb_games_enriches_rows(monkeypatch: pytest.MonkeyPatch) -> None: + sample = pd.DataFrame( + [ + { + "ticker": 
"KXNCAAFGAME-26NOV28MICHOSU-OSU", + "title": "Michigan at Ohio State Winner?", + "subtitle": "", + } + ] + ) + + seen: dict[str, object] = {} + + async def fake_get_markets_by_sport(*args, **kwargs): + seen.update(kwargs) + return sample + + monkeypatch.setattr(kalshi, "get_markets_by_sport", fake_get_markets_by_sport) + + result = await kalshi.get_cfb_games(use_authenticated=False) + + assert seen["sport"] == "CFB" + assert len(result) == 1 + assert result.iloc[0]["home_team"] == "Ohio State" + assert result.iloc[0]["away_team"] == "Michigan" + assert result.iloc[0]["game_date"] == pd.Timestamp("2026-11-28") diff --git a/tests/data_collection/test_polymarket_us_source.py b/tests/data_collection/test_polymarket_us_source.py index ae014cd9..cb7155c0 100644 --- a/tests/data_collection/test_polymarket_us_source.py +++ b/tests/data_collection/test_polymarket_us_source.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from typing import Any +import pandas as pd import pytest import neural.data_collection.polymarket_us as source_module @@ -25,13 +26,29 @@ def list_markets( return [ NormalizedMarket( market_id="MKT-1", - ticker="MKT-1", - title="Example", + ticker="nba-game-1", + title="Chicago vs. 
New York", status="open", yes_price=0.6, no_price=0.4, category="sports", - sport=sport or "nfl", + sport=sport or "nba", + metadata={ + "raw": { + "gameStartTime": "2026-03-10T23:00:00Z", + "marketType": "moneyline", + "marketSides": [ + { + "description": "Chicago", + "team": {"name": "Chicago Bulls", "ordering": "away", "league": "nba"}, + }, + { + "description": "New York", + "team": {"name": "New York Knicks", "ordering": "home", "league": "nba"}, + }, + ], + } + }, ) ] @@ -112,6 +129,18 @@ def __init__(self) -> None: PolymarketUSMarketsSource() +def test_get_markets_df_enriches_sports_rows() -> None: + source = PolymarketUSMarketsSource(adapter=FakeAdapter()) + markets = source.get_markets_df() + + assert markets.iloc[0]["ticker"] == "nba-game-1" + assert markets.iloc[0]["home_team"] == "New York Knicks" + assert markets.iloc[0]["away_team"] == "Chicago Bulls" + assert markets.iloc[0]["market_type"] == "moneyline" + assert markets.iloc[0]["game_date"] == pd.Timestamp("2026-03-10T23:00:00Z") + + + def test_market_history_uses_adapter_public_candles_api() -> None: source = PolymarketUSMarketsSource(adapter=FakeAdapter()) history = source.get_market_history("MKT-1") diff --git a/tests/exchanges/test_polymarket_adapter.py b/tests/exchanges/test_polymarket_adapter.py index ac37a5fd..14c05628 100644 --- a/tests/exchanges/test_polymarket_adapter.py +++ b/tests/exchanges/test_polymarket_adapter.py @@ -42,31 +42,66 @@ def request( timeout: int | None = None, ) -> FakeResponse: self.calls.append((method, url, {"params": params, "data": data, "headers": headers})) - if method == "GET" and url.endswith("/api/v1/sports/markets"): + if method == "GET" and url.endswith("/v1/markets"): return FakeResponse( { - "data": [ + "markets": [ { - "id": "MKT-SPORT-1", - "title": "Will Team A win?", + "id": "1", + "slug": "nfl-team-a-team-b", + "question": "Team A vs. 
Team B", "category": "sports", - "sport": "nfl", - "yes_price": 0.62, + "active": True, + "closed": False, + "marketType": "moneyline", + "gameStartTime": "2026-03-10T23:00:00Z", + "marketSides": [ + { + "description": "Team A", + "team": {"name": "Team A", "league": "nfl", "ordering": "away"}, + }, + { + "description": "Team B", + "team": {"name": "Team B", "league": "nfl", "ordering": "home"}, + }, + ], }, { - "id": "MKT-POL-1", - "title": "Will candidate X win?", + "id": "2", + "slug": "candidate-x", + "question": "Will candidate X win?", "category": "politics", - "yes_price": 0.51, + "active": True, + "closed": False, }, ] } ) - if method == "GET" and url.endswith("/api/v1/markets/MKT-SPORT-1/book"): - return FakeResponse({"book": {"yes_bid": 0.61, "yes_ask": 0.63, "volume": 1234}}) + if method == "GET" and url.endswith("/v1/market/id/1"): + return FakeResponse( + { + "market": { + "id": "1", + "slug": "nfl-team-a-team-b", + "question": "Team A vs. Team B", + } + } + ) + + if method == "GET" and url.endswith("/v1/markets/nfl-team-a-team-b/bbo"): + return FakeResponse( + { + "marketData": { + "bestBid": 0.61, + "bestAsk": 0.63, + "lastTradePx": {"value": "0.62"}, + "sharesTraded": "1234", + } + } + ) - if method == "GET" and url.endswith("/api/v1/markets/MKT-SPORT-1/candles"): + if method == "GET" and url.endswith("/api/v1/markets/1/candles"): return FakeResponse( { "candles": [ @@ -81,7 +116,7 @@ def request( } ) - if method == "GET" and url.endswith("/api/v1/markets/MKT-SPORT-1/trades"): + if method == "GET" and url.endswith("/api/v1/markets/1/trades"): return FakeResponse( { "trades": [ @@ -97,7 +132,7 @@ def request( } ) - if method == "GET" and url.endswith("/api/v1/markets/MKT-SPORT-1/events"): + if method == "GET" and url.endswith("/api/v1/markets/1/events"): return FakeResponse( { "events": [ @@ -112,7 +147,7 @@ def request( { "positions": [ { - "market_id": "MKT-SPORT-1", + "market_id": "1", "side": "yes", "quantity": 10, "entry_price": 0.6, @@ -122,7 
+157,7 @@ def request( } ) - if method == "GET" and url.endswith("/api/v1/markets/MKT-SPORT-1/candles"): + if method == "GET" and url.endswith("/api/v1/markets/1/candles"): return FakeResponse({"candles": [{"timestamp": 1700000000000, "open": 0.5}]}) return FakeResponse({}, status_code=404) @@ -142,6 +177,19 @@ def _new_adapter(session: FakeSession, **kwargs: Any) -> PolymarketUSAdapter: ) +def _new_public_adapter( + session: FakeSession, + monkeypatch: pytest.MonkeyPatch, + **kwargs: Any, +) -> PolymarketUSAdapter: + monkeypatch.setattr(adapter_module, "get_polymarket_us_credentials", lambda: {}) + return PolymarketUSAdapter( + base_url="https://api.polymarket.us", + session=session, + **kwargs, + ) + + def test_list_markets_filters_non_sports_by_default() -> None: session = FakeSession() adapter = _new_adapter(session) @@ -149,16 +197,17 @@ def test_list_markets_filters_non_sports_by_default() -> None: markets = adapter.list_markets(limit=20, sports_only=True) assert len(markets) == 1 - assert markets[0].market_id == "MKT-SPORT-1" + assert markets[0].market_id == "1" + assert markets[0].sport == "nfl" def test_get_quote_maps_book_shape() -> None: session = FakeSession() adapter = _new_adapter(session) - quote = adapter.get_quote("MKT-SPORT-1") + quote = adapter.get_quote("1") - assert quote.market_id == "MKT-SPORT-1" + assert quote.market_id == "1" assert quote.yes_bid == pytest.approx(0.61) assert quote.yes_ask == pytest.approx(0.63) assert quote.no_bid == pytest.approx(0.37) @@ -179,7 +228,7 @@ def test_get_positions_returns_normalized_rows() -> None: positions = adapter.get_positions() assert len(positions) == 1 - assert positions[0].market_id == "MKT-SPORT-1" + assert positions[0].market_id == "1" assert positions[0].quantity == 10 @@ -187,7 +236,7 @@ def test_get_candles_returns_rows() -> None: session = FakeSession() adapter = _new_adapter(session) - rows = adapter.get_candles("MKT-SPORT-1", interval="1m", limit=5) + rows = adapter.get_candles("1", 
interval="1m", limit=5) assert len(rows) == 1 assert rows[0]["close"] == pytest.approx(0.55) @@ -197,8 +246,8 @@ def test_trade_replay_and_event_replay_return_cursor() -> None: session = FakeSession() adapter = _new_adapter(session) - trades = adapter.get_trade_replay("MKT-SPORT-1", limit=25) - events = adapter.get_market_events("MKT-SPORT-1", limit=25) + trades = adapter.get_trade_replay("1", limit=25) + events = adapter.get_market_events("1", limit=25) assert len(trades["items"]) == 1 assert trades["next_cursor"] == "next-1" @@ -223,7 +272,7 @@ def request( adapter = _new_adapter(_SoftFailSession()) with pytest.raises(RuntimeError, match="raise_for_status\\(\\) did not raise"): - adapter.get_quote("MKT-SPORT-1") + adapter.get_quote("1") def test_request_raises_for_invalid_json_response() -> None: @@ -262,18 +311,122 @@ def test_numeric_parsing_helpers_return_none_for_invalid_values() -> None: assert _to_float(object()) is None -def test_adapter_raises_clear_error_for_missing_credentials( + + +def test_list_markets_paginates_until_sport_filter_is_satisfied( monkeypatch: pytest.MonkeyPatch, ) -> None: - monkeypatch.setattr(adapter_module, "get_polymarket_us_credentials", lambda: {}) - with pytest.raises(ValueError, match="credentials are required"): - PolymarketUSAdapter(base_url="https://api.polymarket.us") + class _PagedSession(FakeSession): + def request( + self, + method: str, + url: str, + *, + params: dict[str, Any] | None = None, + data: str | None = None, + headers: dict[str, Any] | None = None, + timeout: int | None = None, + ) -> FakeResponse: + if method == "GET" and url.endswith("/v1/markets"): + offset = int((params or {}).get("offset", 0)) + if offset == 0: + return FakeResponse( + { + "markets": [ + { + "id": "10", + "slug": "nfl-a", + "question": "NFL A vs. 
B", + "category": "sports", + "active": True, + "closed": False, + "marketSides": [ + {"team": {"name": "A", "league": "nfl", "ordering": "away"}}, + {"team": {"name": "B", "league": "nfl", "ordering": "home"}}, + ], + } + ] + } + ) + if offset == 1: + return FakeResponse( + { + "markets": [ + { + "id": "20", + "slug": "nba-a", + "question": "NBA A vs. B", + "category": "sports", + "active": True, + "closed": False, + "marketSides": [ + {"team": {"name": "A", "league": "nba", "ordering": "away"}}, + {"team": {"name": "B", "league": "nba", "ordering": "home"}}, + ], + } + ] + } + ) + return FakeResponse({"markets": []}) + return super().request( + method, + url, + params=params, + data=data, + headers=headers, + timeout=timeout, + ) + + adapter = _new_public_adapter(_PagedSession(), monkeypatch) + markets = adapter.list_markets(sport="nba", limit=1) + + assert len(markets) == 1 + assert markets[0].sport == "nba" + assert markets[0].market_id == "20" + + +def test_public_market_reads_work_without_credentials( + monkeypatch: pytest.MonkeyPatch, +) -> None: + session = FakeSession() + adapter = _new_public_adapter(session, monkeypatch) + + markets = adapter.list_markets(limit=20, sports_only=True) + quote = adapter.get_quote("1") + + assert len(markets) == 1 + assert markets[0].status == "open" + assert quote.market_id == "1" + assert quote.last_price == pytest.approx(0.62) + first_headers = session.calls[0][2]["headers"] or {} + assert "PM-ACCESS-KEY" not in first_headers + assert "PM-ACCESS-SIGNATURE" not in first_headers + + + +def test_private_polymarket_methods_still_require_credentials( + monkeypatch: pytest.MonkeyPatch, +) -> None: + adapter = _new_public_adapter(FakeSession(), monkeypatch) + + with pytest.raises(ValueError, match="credentials are required for portfolio positions"): + adapter.get_positions() + with pytest.raises(ValueError, match="credentials are required for /api/v1/markets/1/candles"): + adapter.get_candles("1") + with 
pytest.raises(ValueError, match="credentials are required for /api/v1/markets/1/trades"): + adapter.get_trade_replay("1") + with pytest.raises(ValueError, match="credentials are required for /api/v1/markets/1/events"): + adapter.get_market_events("1") + with pytest.raises(ValueError, match="credentials are required for market websocket"): + adapter.market_ws_client() + with pytest.raises(ValueError, match="credentials are required for user websocket"): + adapter.user_ws_client() def test_get_candles_returns_normalized_rows() -> None: session = FakeSession() adapter = _new_adapter(session) - rows = adapter.get_candles("MKT-SPORT-1") + rows = adapter.get_candles("1") assert rows assert rows[0]["timestamp"] == 1700000000000 assert rows[0]["open"] == 0.5 diff --git a/tests/streaming/test_deterministic_cross_provider_replay.py b/tests/streaming/test_deterministic_cross_provider_replay.py new file mode 100644 index 00000000..07f4d6c0 --- /dev/null +++ b/tests/streaming/test_deterministic_cross_provider_replay.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import importlib.util +from pathlib import Path + + +def _load_example_module(): + root = Path(__file__).resolve().parents[2] + module_path = root / "examples" / "13_deterministic_cross_provider_replay.py" + spec = importlib.util.spec_from_file_location("deterministic_cross_provider_replay", module_path) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_deterministic_replay_produces_expected_signals(): + module = _load_example_module() + + stream = module.build_replay_stream() + snapshot = module.latest_snapshot(stream) + analysis = module.analyze_standardized_stream(stream) + + assert not stream.empty + assert not snapshot.empty + assert not analysis.empty + + signals = { + row["market_id"]: row["signal"] + for row in analysis[["market_id", "signal"]].to_dict("records") + } + + assert 
signals["KXNBAGAME-26MAR10CHINYK-NYK"] == "buy_yes" + assert signals["pm-bos-lal-moneyline"] == "buy_no" + assert signals["pm-den-dal-moneyline"] == "hold" + + assert set(snapshot["exchange"]) == {"kalshi", "polymarket_us"}