diff --git a/.gitignore b/.gitignore index ea8c4bf..14edbf0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ /target + +data/ + +ghostguard.toml \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c169363..b4d46b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "1.0.0" @@ -327,6 +342,21 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.60" @@ -357,7 +387,10 @@ version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ + "iana-time-zone", "num-traits", + "serde", + "windows-link", ] [[package]] @@ -468,6 +501,20 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "compact_str" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"3b79c4069c6cad78e2e0cdfcbd26275770669fb39fd308a752dc110e83b9af32" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "const-hex" version = "1.18.1" @@ -581,6 +628,31 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.11.0", + "crossterm_winapi", + "mio", + "parking_lot", + "rustix 0.38.44", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -618,6 +690,40 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0" +dependencies = [ + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.117", +] + +[[package]] +name = "darling_macro" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.117", +] + [[package]] name = "data-encoding" version = "2.10.0" @@ -1414,15 
+1520,20 @@ name = "ghostguard" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "clap", + "crossterm", "ethers", "futures-util", "hex", + "ratatui", "reqwest 0.12.28", "serde", "serde_json", + "tempfile", "tokio", "tokio-tungstenite 0.24.0", + "toml", "tracing", "tracing-subscriber", ] @@ -1481,6 +1592,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -1699,6 +1812,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -1787,6 +1924,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -1864,6 +2007,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "indoc" +version = "2.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" +dependencies = [ + "rustversion", +] + [[package]] name = 
"inout" version = "0.1.4" @@ -1873,6 +2025,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instability" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb2d60ef19920a3a9193c3e371f726ec1dafc045dac788d0fb3704272458971" +dependencies = [ + "darling", + "indoc", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "instant" version = "0.1.13" @@ -1913,6 +2078,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -1987,7 +2161,7 @@ dependencies = [ "ascii-canvas", "bit-set", "ena", - "itertools", + "itertools 0.11.0", "lalrpop-util", "petgraph", "regex", @@ -2035,6 +2209,12 @@ dependencies = [ "libc", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2062,6 +2242,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -2116,6 +2305,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -2312,6 +2502,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "paste" 
+version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "path-slash" version = "0.2.1" @@ -2559,7 +2755,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -2596,7 +2792,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", "windows-sys 0.52.0", ] @@ -2696,6 +2892,27 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "ratatui" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdef7f9be5c0122f890d58bdf4d964349ba6a6161f705907526d891efabba57d" +dependencies = [ + "bitflags 2.11.0", + "cassowary", + "compact_str", + "crossterm", + "instability", + "itertools 0.13.0", + "lru", + "paste", + "strum", + "strum_macros", + "unicode-segmentation", + "unicode-truncate", + "unicode-width", +] + [[package]] name = "rayon" version = "1.11.0" @@ -2935,6 +3152,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags 2.11.0", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -2944,7 +3174,7 @@ dependencies = [ "bitflags 2.11.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] @@ -3287,6 +3517,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -3369,7 +3620,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c425ce1c59f4b154717592f0bdf4715c3a1d55058883622d3157e1f0908a5b26" dependencies = [ - "itertools", + "itertools 0.11.0", "lalrpop", "lalrpop-util", "phf", @@ -3555,7 +3806,7 @@ dependencies = [ "fastrand", "getrandom 0.4.2", "once_cell", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -4039,6 +4290,29 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-segmentation" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" + +[[package]] +name = "unicode-truncate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" +dependencies = [ + "itertools 0.13.0", + "unicode-segmentation", + "unicode-width", +] + +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -4307,12 +4581,65 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] 
+name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 16f4d1c..f56f1e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,10 @@ futures-util = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } hex = "0.4" +toml = "0.8" +ratatui = "0.28" +crossterm = "0.28" +chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } + +[dev-dependencies] 
+tempfile = "3" diff --git a/ghostguard.example.toml b/ghostguard.example.toml new file mode 100644 index 0000000..633d47a --- /dev/null +++ b/ghostguard.example.toml @@ -0,0 +1,34 @@ +# GhostGuard example config. Copy to `ghostguard.toml` and pass +# `--config ghostguard.toml` to the binary. +# +# All sections and fields are optional. Missing values fall back to +# Config::default(). CLI flags override anything here. + +[rpc] +url = "https://polygon-rpc.com" + +[clob] +ws_url = "wss://ws-subscriptions-clob.polymarket.com/ws/market" +# Comma-separated asset / market IDs to subscribe to. Empty = user channel. +asset_ids = [] + +[detection] +verify_timeout_secs = 10 +poll_interval_ms = 500 +predictive_enabled = false +predictive_threshold = 0.7 +avg_window = 50 + +[alerts] +# webhook_url = "http://localhost:9090/ghost-alert" +verdict_log = "data/verdicts.jsonl" +predictive_log = "data/predictive_warnings.jsonl" + +# Phase 3 — parsed but not active yet. +[defense] +auto_cancel = false +auto_blacklist = true + +# Phase 4 — parsed but not active yet. +[api] +listen = "127.0.0.1:8080" diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..345181e --- /dev/null +++ b/src/api.rs @@ -0,0 +1,14 @@ +//! Phase 4: HTTP status endpoint. +//! +//! TODO(phase-4): +//! - Listen on `config.api.listen` (default 127.0.0.1:8080) with an axum/hyper server. +//! - Routes: +//! - `GET /status` — uptime, connected markets, total fills verified +//! - `GET /verdicts` — last 100 verdicts (tail JSONL log) +//! - `GET /blacklist` — current blacklisted addresses (reads defense state) +//! - `GET /stats` — ghost rate %, avg verification latency, predictive accuracy +//! +//! Config hook already parsed (see `config.rs::ApiSection`): +//! listen +//! +//! Intentionally empty until Phase 4 implementation lands. 
diff --git a/src/bin/cli.rs b/src/bin/cli.rs index ce26d57..95f1e5e 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,7 +1,10 @@ use anyhow::Result; use clap::Parser; use ethers::types::H256; +use ghostguard::config::FileConfig; use ghostguard::{Config, FillVerdict, GhostGuard}; +use std::fs::{create_dir_all, OpenOptions}; +use std::path::{Path, PathBuf}; use std::time::Duration; use tracing_subscriber::EnvFilter; @@ -10,56 +13,149 @@ use tracing_subscriber::EnvFilter; #[command(about = "Detect ghost fills on Polymarket CLOB")] #[command(version)] struct Args { - /// Polygon JSON-RPC URL - #[arg(long, default_value = "https://polygon-rpc.com")] - rpc: String, + /// Path to TOML config file. Values here are overridden by explicit CLI flags. + #[arg(long)] + config: Option, + + /// Polygon JSON-RPC URL. Alternatives if polygon-rpc.com fails: + /// https://polygon.drpc.org, https://polygon.llamarpc.com, + /// https://rpc.ankr.com/polygon, https://polygon-bor-rpc.publicnode.com + #[arg(long)] + rpc: Option, /// Polymarket CLOB WebSocket URL - #[arg( - long, - default_value = "wss://ws-subscriptions-clob.polymarket.com/ws/market" - )] - clob_ws: String, + #[arg(long)] + clob_ws: Option, /// Webhook URL to POST fill verdicts to #[arg(long)] webhook: Option, /// Verification timeout in seconds - #[arg(long, default_value = "10")] - timeout: u64, + #[arg(long)] + timeout: Option, /// Poll interval in milliseconds - #[arg(long, default_value = "500")] - poll_ms: u64, + #[arg(long)] + poll_ms: Option, /// Verify a single tx hash and exit (for testing) #[arg(long)] verify_tx: Option, - /// Comma-separated list of market IDs to monitor. If empty, subscribes to user channel. + /// Comma-separated list of market / asset IDs to monitor. 
#[arg(long, value_delimiter = ',')] markets: Vec, + + /// Enable predictive ghost fill scoring + #[arg(long)] + predictive: bool, + + /// Risk threshold for predictive warnings (0.0-1.0) + #[arg(long)] + predictive_threshold: Option, + + /// Rolling window size for average trade size per market + #[arg(long)] + avg_window: Option, + + /// Path to JSONL verdict log. Empty string disables. + #[arg(long)] + verdict_log: Option, + + /// Path to JSONL predictive warning log. Empty string disables. + #[arg(long)] + predictive_log: Option, + + /// Launch the ratatui dashboard instead of stdout output + #[arg(long)] + tui: bool, +} + +const TUI_LOG_PATH: &str = "data/ghostguard.log"; + +fn open_tui_log() -> Option { + let path = Path::new(TUI_LOG_PATH); + if let Some(parent) = path.parent() { + let _ = create_dir_all(parent); + } + OpenOptions::new().create(true).append(true).open(path).ok() } #[tokio::main] async fn main() -> Result<()> { - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), - ) - .init(); - let args = Args::parse(); - let config = Config { - rpc_url: args.rpc.clone(), - clob_ws_url: args.clob_ws.clone(), - verify_timeout: Duration::from_secs(args.timeout), - poll_interval: Duration::from_millis(args.poll_ms), - webhook_url: args.webhook.clone(), - markets: args.markets.clone(), - }; + // Tracing setup: + // - Non-TUI mode: write to stderr at info level. + // - TUI mode: route tracing to a log file (stderr would corrupt the alt + // screen). Default level = info, override via RUST_LOG. 
+ if args.tui { + if let Some(file) = open_tui_log() { + tracing_subscriber::fmt() + .with_writer(std::sync::Mutex::new(file)) + .with_ansi(false) + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .init(); + eprintln!("TUI logs: {TUI_LOG_PATH} (tail -f to debug)"); + } + } else { + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .init(); + } + + // 1. Start with Config::default() + let mut config = Config::default(); + + // 2. Apply TOML if provided + if let Some(ref path) = args.config { + let file = FileConfig::load(path).await?; + config = file.apply_to(config); + } + + // 3. Apply explicit CLI flags (they override TOML) + if let Some(v) = args.rpc.clone() { + config.rpc_url = v; + } + if let Some(v) = args.clob_ws.clone() { + config.clob_ws_url = v; + } + if let Some(v) = args.webhook.clone() { + config.webhook_url = Some(v); + } + if let Some(v) = args.timeout { + config.verify_timeout = Duration::from_secs(v); + } + if let Some(v) = args.poll_ms { + config.poll_interval = Duration::from_millis(v); + } + if !args.markets.is_empty() { + config.markets = args.markets.clone(); + } + if args.predictive { + config.predictive_enabled = true; + } + if let Some(v) = args.predictive_threshold { + config.predictive_threshold = v; + } + if let Some(v) = args.avg_window { + config.avg_window = v; + } + if let Some(v) = args.verdict_log.clone() { + config.verdict_log = v; + } + if let Some(v) = args.predictive_log.clone() { + config.predictive_log = v; + } + if args.tui { + config.tui_mode = true; + } // Single tx verification mode if let Some(tx_str) = args.verify_tx { @@ -92,32 +188,63 @@ async fn main() -> Result<()> { return Ok(()); } - // Sidecar mode: listen to CLOB websocket and verify fills - println!("Starting GhostGuard sidecar..."); - println!(" RPC: {}", args.rpc); - println!(" CLOB WS: {}", 
args.clob_ws); - if let Some(ref wh) = args.webhook { - println!(" Webhook: {wh}"); + // Sidecar mode + let tui_mode = config.tui_mode; + + if !tui_mode { + println!("Starting GhostGuard sidecar..."); + println!(" RPC: {}", config.rpc_url); + println!(" CLOB WS: {}", config.clob_ws_url); + if !config.markets.is_empty() { + println!(" Markets: {}", config.markets.join(", ")); + } + if let Some(ref wh) = config.webhook_url { + println!(" Webhook: {wh}"); + } + if config.predictive_enabled { + println!( + " Predictive: ENABLED (threshold={:.2}, window={})", + config.predictive_threshold, config.avg_window + ); + } + if !config.verdict_log.is_empty() { + println!(" Verdicts: {} (JSONL)", config.verdict_log); + } + if !config.predictive_log.is_empty() && config.predictive_enabled { + println!(" Warnings: {} (JSONL)", config.predictive_log); + } + println!(); } - println!(); let mut guard = GhostGuard::new(config); - guard.on_real_fill(|verdict| { - if let FillVerdict::Real { tx_hash, block } = verdict { - println!("[REAL] tx={tx_hash:?} block={block}"); - } - }); - - guard.on_ghost_fill(|event| { - println!( - "[GHOST] tx={:?} reason={} market={} side={} size={} price={}", - event.tx_hash, event.reason, event.market, event.side, event.size, event.price, - ); - if let Some(cp) = event.counterparty { - println!(" counterparty={cp:?}"); - } - }); + // In TUI mode the dashboard owns the terminal — skip println callbacks + // so they don't corrupt the alt screen. The TUI gets events via its own + // internal channel. 
+ if !tui_mode { + guard.on_real_fill(|verdict| { + if let FillVerdict::Real { tx_hash, block } = verdict { + println!("[REAL] tx={tx_hash:?} block={block}"); + } + }); + + guard.on_ghost_fill(|event| { + println!( + "[GHOST] tx={:?} reason={} market={} side={} size={} price={}", + event.tx_hash, event.reason, event.market, event.side, event.size, event.price, + ); + if let Some(cp) = event.counterparty { + println!(" counterparty={cp:?}"); + } + }); + + guard.on_predictive_warning(|w| { + println!( + "[WARN score={:.2}] tx={:?} market={} price_dev={:.3} size_anom={:.2}", + w.score, w.tx_hash, w.market, w.price_deviation, w.size_anomaly, + ); + }); + } guard.start().await?; diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..340580a --- /dev/null +++ b/src/config.rs @@ -0,0 +1,208 @@ +//! TOML configuration loader for GhostGuard. +//! +//! Precedence (highest first): +//! 1. Explicit CLI flags +//! 2. TOML file (if `--config` was passed) +//! 3. `Config::default()` +//! +//! The TOML shape is a hierarchical superset of the flat runtime `Config`. + +use anyhow::{Context, Result}; +use serde::Deserialize; +use std::path::Path; +use std::time::Duration; + +use crate::types::Config; + +/// TOML-facing configuration shape. All fields optional so partial files work. +#[derive(Debug, Default, Deserialize)] +pub struct FileConfig { + #[serde(default)] + pub rpc: Option, + #[serde(default)] + pub clob: Option, + #[serde(default)] + pub detection: Option, + #[serde(default)] + pub alerts: Option, + // Phase 3 — parsed but unused in this session. + #[serde(default)] + pub defense: Option, + // Phase 4 — parsed but unused in this session. 
+ #[serde(default)] + pub api: Option, +} + +#[derive(Debug, Default, Deserialize)] +pub struct RpcSection { + pub url: Option, +} + +#[derive(Debug, Default, Deserialize)] +pub struct ClobSection { + pub ws_url: Option, + #[serde(default)] + pub asset_ids: Vec, +} + +#[derive(Debug, Default, Deserialize)] +pub struct DetectionSection { + pub verify_timeout_secs: Option, + pub poll_interval_ms: Option, + pub predictive_enabled: Option, + pub predictive_threshold: Option, + pub avg_window: Option, +} + +#[derive(Debug, Default, Deserialize)] +pub struct AlertsSection { + pub webhook_url: Option, + pub verdict_log: Option, + pub predictive_log: Option, +} + +#[derive(Debug, Default, Deserialize)] +pub struct DefenseSection { + pub auto_cancel: Option, + pub auto_blacklist: Option, +} + +#[derive(Debug, Default, Deserialize)] +pub struct ApiSection { + pub listen: Option, +} + +impl FileConfig { + /// Load and parse a TOML config file. + pub async fn load(path: impl AsRef) -> Result { + let path = path.as_ref(); + let contents = tokio::fs::read_to_string(path) + .await + .with_context(|| format!("read {path:?}"))?; + let parsed: FileConfig = + toml::from_str(&contents).with_context(|| format!("parse TOML in {path:?}"))?; + Ok(parsed) + } + + /// Merge this file config onto a base `Config`. File values override the base. + /// The caller should then apply CLI overrides on top of the result. 
+ pub fn apply_to(self, mut base: Config) -> Config { + if let Some(rpc) = self.rpc { + if let Some(url) = rpc.url { + base.rpc_url = url; + } + } + if let Some(clob) = self.clob { + if let Some(ws) = clob.ws_url { + base.clob_ws_url = ws; + } + if !clob.asset_ids.is_empty() { + base.markets = clob.asset_ids; + } + } + if let Some(det) = self.detection { + if let Some(t) = det.verify_timeout_secs { + base.verify_timeout = Duration::from_secs(t); + } + if let Some(ms) = det.poll_interval_ms { + base.poll_interval = Duration::from_millis(ms); + } + if let Some(b) = det.predictive_enabled { + base.predictive_enabled = b; + } + if let Some(t) = det.predictive_threshold { + base.predictive_threshold = t; + } + if let Some(w) = det.avg_window { + base.avg_window = w; + } + } + if let Some(alerts) = self.alerts { + if let Some(w) = alerts.webhook_url { + base.webhook_url = Some(w); + } + if let Some(p) = alerts.verdict_log { + base.verdict_log = p; + } + if let Some(p) = alerts.predictive_log { + base.predictive_log = p; + } + } + // defense and api sections are parsed but not wired into runtime yet. 
+ let _ = self.defense; + let _ = self.api; + base + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_full_toml() { + let toml_src = r#" +[rpc] +url = "https://alchemy" + +[clob] +ws_url = "wss://custom" +asset_ids = ["aaa", "bbb"] + +[detection] +verify_timeout_secs = 15 +poll_interval_ms = 250 +predictive_enabled = true +predictive_threshold = 0.5 +avg_window = 100 + +[alerts] +webhook_url = "http://hook" +verdict_log = "/tmp/v.jsonl" +predictive_log = "/tmp/p.jsonl" + +[defense] +auto_cancel = true +auto_blacklist = false + +[api] +listen = "0.0.0.0:9000" +"#; + let parsed: FileConfig = toml::from_str(toml_src).unwrap(); + let merged = parsed.apply_to(Config::default()); + + assert_eq!(merged.rpc_url, "https://alchemy"); + assert_eq!(merged.clob_ws_url, "wss://custom"); + assert_eq!(merged.markets, vec!["aaa".to_string(), "bbb".to_string()]); + assert_eq!(merged.verify_timeout, Duration::from_secs(15)); + assert_eq!(merged.poll_interval, Duration::from_millis(250)); + assert!(merged.predictive_enabled); + assert_eq!(merged.predictive_threshold, 0.5); + assert_eq!(merged.avg_window, 100); + assert_eq!(merged.webhook_url.as_deref(), Some("http://hook")); + assert_eq!(merged.verdict_log, "/tmp/v.jsonl"); + assert_eq!(merged.predictive_log, "/tmp/p.jsonl"); + } + + #[test] + fn test_partial_toml_keeps_defaults() { + let toml_src = r#" +[detection] +predictive_threshold = 0.9 +"#; + let parsed: FileConfig = toml::from_str(toml_src).unwrap(); + let merged = parsed.apply_to(Config::default()); + + assert_eq!(merged.predictive_threshold, 0.9); + // Unspecified fields unchanged: + assert_eq!(merged.rpc_url, "https://polygon-rpc.com"); + assert!(!merged.predictive_enabled); + } + + #[test] + fn test_empty_toml() { + let parsed: FileConfig = toml::from_str("").unwrap(); + let merged = parsed.apply_to(Config::default()); + assert_eq!(merged.rpc_url, Config::default().rpc_url); + } +} diff --git a/src/defense.rs b/src/defense.rs new file mode 
100644 index 0000000..9a809ee --- /dev/null +++ b/src/defense.rs @@ -0,0 +1,11 @@ +//! Phase 3: auto-defense (auto-cancel + blacklist). +//! +//! TODO(phase-3): +//! - On ghost detection, cancel all open orders on the affected market via CLOB REST API. +//! - Maintain a persistent blacklist file (`data/blacklist.txt`) of counterparties. +//! - Subscribe to fill stream and emit `BlacklistWarning` when a listed taker appears. +//! +//! Config hooks already parsed (see `config.rs::DefenseSection`): +//! auto_cancel, auto_blacklist +//! +//! Intentionally empty until Phase 3 implementation lands. diff --git a/src/detection.rs b/src/detection.rs new file mode 100644 index 0000000..c6eab19 --- /dev/null +++ b/src/detection.rs @@ -0,0 +1,471 @@ +use anyhow::{anyhow, Context, Result}; +use ethers::providers::{Http, Middleware, Provider, RawCall}; +use ethers::types::{Address, H256}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc; +use tokio::time::{sleep, Instant}; +use tracing::{debug, error, info, warn}; + +use crate::callback; +use crate::logging::JsonlWriter; +use crate::tui::TuiEvent; +use crate::types::{Config, FillVerdict, GhostFillEvent, TRANSFER_FROM_FAILED_SELECTOR}; +use crate::ws::ClobFill; + +/// Max consecutive RPC connection errors before verify_fill bails out. +/// This avoids burning the whole verify_timeout when the RPC is unreachable. +const MAX_CONSECUTIVE_RPC_ERRORS: u32 = 3; + +/// Minimum seconds between TUI [SYS] "RPC unreachable" messages — otherwise +/// a broken RPC would flood the feed with one entry per fill. +const RPC_ERROR_NOTICE_INTERVAL_SECS: u64 = 30; + +/// Callbacks dispatched by `handle_fill` after on-chain verification. +pub type VerdictCallback = Arc; +pub type GhostCallback = Arc; + +/// Dependencies for the detection pipeline. 
+#[derive(Clone)] +pub struct DetectionContext { + pub config: Arc, + pub verdict_log: Option>, + pub on_real: Arc>, + pub on_ghost: Arc>, + /// When TUI mode is enabled, verdicts are forwarded here for rendering. + pub tui_tx: Option>, + /// Unix timestamp of the last RPC-failure notice sent to the TUI. + /// Used to rate-limit the [SYS] "RPC unreachable" messages. + pub last_rpc_notice: Arc, +} + +/// Verify a single fill transaction on-chain. +/// +/// Polls `eth_getTransactionReceipt` until: +/// - `receipt.status == 1` → `FillVerdict::Real` +/// - `receipt.status == 0` → `FillVerdict::Ghost` (parses revert reason) +/// - timeout expires with no receipt → `FillVerdict::Timeout` +/// +/// If the RPC is unreachable (`MAX_CONSECUTIVE_RPC_ERRORS` in a row), returns +/// `Err` early so the caller can distinguish "infrastructure broken" from +/// "genuine timeout". Callers should NOT count Err results as ghosts. +pub async fn verify_fill(rpc_url: &str, tx_hash: H256, config: &Config) -> Result { + let provider = Provider::::try_from(rpc_url).context("failed to create RPC provider")?; + + let deadline = Instant::now() + config.verify_timeout; + let mut consecutive_errors: u32 = 0; + + loop { + if Instant::now() >= deadline { + warn!( + ?tx_hash, + "verification timed out — no receipt before deadline" + ); + return Ok(FillVerdict::Timeout { tx_hash }); + } + + match provider.get_transaction_receipt(tx_hash).await { + Ok(Some(receipt)) => { + let status = receipt.status.map(|s| s.as_u64()).unwrap_or(0); + let block = receipt.block_number.map(|b| b.as_u64()).unwrap_or(0); + + if status == 1 { + info!(?tx_hash, block, "fill verified — REAL"); + return Ok(FillVerdict::Real { tx_hash, block }); + } + + // status == 0: transaction reverted — ghost fill + let reason = extract_revert_reason(&provider, tx_hash, &receipt).await; + let counterparty = extract_counterparty(&receipt); + + warn!(?tx_hash, %reason, "fill reverted — GHOST"); + return Ok(FillVerdict::Ghost { + tx_hash, 
+ reason, + counterparty, + }); + } + Ok(None) => { + // Real response — server reachable, tx just not mined yet. + consecutive_errors = 0; + debug!(?tx_hash, "no receipt yet, polling..."); + } + Err(e) => { + consecutive_errors += 1; + let err_str = e.to_string(); + warn!( + ?tx_hash, + error = %err_str, + consecutive_errors, + "RPC error during receipt poll" + ); + + if consecutive_errors >= MAX_CONSECUTIVE_RPC_ERRORS { + return Err(anyhow!( + "RPC unreachable after {consecutive_errors} consecutive errors: {err_str}" + )); + } + } + } + + sleep(config.poll_interval).await; + } +} + +/// Try to extract a human-readable revert reason from a reverted tx. +/// +/// Strategies: +/// 1. Call `eth_call` to replay the tx and get the revert data. +/// 2. Look for known signatures in the revert data. +/// 3. Fall back to "unknown revert". +async fn extract_revert_reason( + provider: &Provider, + tx_hash: H256, + receipt: ðers::types::TransactionReceipt, +) -> String { + // Try to get the original transaction to replay it + let tx = match provider.get_transaction(tx_hash).await { + Ok(Some(tx)) => tx, + _ => return "unknown revert (tx not found)".into(), + }; + + // Replay the transaction via eth_call at the block it was included in + let block = receipt.block_number; + let call_request = ethers::types::transaction::eip2718::TypedTransaction::Legacy( + ethers::types::TransactionRequest { + from: tx.from.into(), + to: tx.to.map(ethers::types::NameOrAddress::Address), + gas: tx.gas.into(), + gas_price: tx.gas_price, + value: tx.value.into(), + data: tx.input.clone().into(), + nonce: None, + chain_id: None, + }, + ); + + match provider + .call_raw(&call_request) + .block(block.unwrap().into()) + .await + { + Err(e) => { + let err_str = e.to_string(); + + // Check for TRANSFER_FROM_FAILED in the error message + if err_str.contains(TRANSFER_FROM_FAILED_SELECTOR) { + return TRANSFER_FROM_FAILED_SELECTOR.to_string(); + } + + // Try to decode standard Solidity Error(string) from 
revert data + if let Some(reason) = parse_solidity_revert(&err_str) { + return reason; + } + + // Check for common patterns in hex revert data + if let Some(hex_data) = extract_hex_revert_data(&err_str) { + if let Some(reason) = decode_revert_bytes(&hex_data) { + return reason; + } + } + + format!("reverted: {}", truncate(&err_str, 200)) + } + Ok(_) => { + // eth_call succeeded — this can happen if state changed between + // the original tx and our replay. Still a ghost fill though. + "reverted on-chain (eth_call replay succeeded — state-dependent revert)".into() + } + } +} + +/// Extract counterparty address from the transaction's `to` field or input data. +fn extract_counterparty(receipt: ðers::types::TransactionReceipt) -> Option
{ + // The `from` in the receipt is the sender (settlement bot). + // For CTF exchange calls, the counterparty is encoded in calldata, + // but as a first pass we return the `to` address (the exchange contract). + // In v2 we can decode the actual taker/maker from calldata. + receipt.to +} + +/// Try to parse Solidity's `Error(string)` from an error message. +fn parse_solidity_revert(err: &str) -> Option { + // The standard revert selector is 0x08c379a0 + if let Some(idx) = err.find("08c379a0") { + let hex_start = idx + 8; // skip selector + let remaining = &err[hex_start..]; + // Extract hex chars + let hex: String = remaining + .chars() + .take_while(|c| c.is_ascii_hexdigit()) + .collect(); + if hex.len() >= 128 { + // offset (32 bytes) + length (32 bytes) + string data + let len_hex = &hex[64..128]; + if let Ok(len) = usize::from_str_radix(len_hex, 16) { + let str_hex = &hex[128..128 + (len * 2).min(hex.len() - 128)]; + if let Ok(bytes) = hex::decode(str_hex) { + if let Ok(s) = String::from_utf8(bytes) { + return Some(s); + } + } + } + } + } + None +} + +/// Extract hex-encoded revert data from a provider error string. +fn extract_hex_revert_data(err: &str) -> Option { + // Many providers return revert data as 0x-prefixed hex + if let Some(idx) = err.find("0x") { + let hex: String = err[idx + 2..] + .chars() + .take_while(|c| c.is_ascii_hexdigit()) + .collect(); + if hex.len() >= 8 { + return Some(hex); + } + } + None +} + +/// Decode raw revert bytes looking for known signatures. 
+fn decode_revert_bytes(hex_data: &str) -> Option { + // Check for known custom error selectors + if hex_data.len() >= 8 { + let _selector = &hex_data[..8]; + // Common Polymarket CTF exchange errors can be matched here + // Add known selectors as discovered + } + + // Check if the entire revert data decodes to ASCII + if let Ok(bytes) = hex::decode(hex_data) { + let ascii: String = bytes + .iter() + .filter(|b| b.is_ascii_graphic() || b.is_ascii_whitespace()) + .map(|b| *b as char) + .collect(); + if ascii.len() > 4 { + return Some(ascii.trim().to_string()); + } + } + + None +} + +fn truncate(s: &str, max: usize) -> &str { + if s.len() <= max { + s + } else { + &s[..max] + } +} + +fn now_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +/// Send a rate-limited [SYS] message to the TUI when the RPC becomes +/// unreachable. Only fires once every `RPC_ERROR_NOTICE_INTERVAL_SECS`. +fn maybe_notify_rpc_failure(ctx: &DetectionContext, err: &str) { + let tx = match &ctx.tui_tx { + Some(tx) => tx, + None => return, + }; + + let now = now_secs(); + let last = ctx.last_rpc_notice.load(Ordering::Relaxed); + if now.saturating_sub(last) < RPC_ERROR_NOTICE_INTERVAL_SECS { + return; // still within rate-limit window + } + + // Try to claim the window; if someone else already did, skip. + if ctx + .last_rpc_notice + .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + return; + } + + let short = err.chars().take(160).collect::(); + let _ = tx.send(TuiEvent::Status(format!( + "RPC unreachable: {short} — try --rpc https://polygon.drpc.org" + ))); +} + +/// Run the full verify → log → dispatch pipeline for a single fill. +/// +/// This is the orchestrator that `lib.rs` spawns per `ClobEvent::Fill`. +/// Runs `verify_fill`, appends to the verdict JSONL log (if configured), +/// POSTs to the webhook (if configured), and fires registered callbacks. 
+pub async fn handle_fill(ctx: DetectionContext, fill: ClobFill) { + let config = &ctx.config; + + let started = Instant::now(); + let verdict = match verify_fill(&config.rpc_url, fill.tx_hash, config).await { + Ok(v) => v, + Err(e) => { + // RPC unreachable — this is an infrastructure problem, NOT a ghost + // fill. Don't emit a verdict. Instead, send a rate-limited status + // message to the TUI so the user sees what's happening. + error!(tx_hash = ?fill.tx_hash, error = %e, "verification failed"); + maybe_notify_rpc_failure(&ctx, &e.to_string()); + return; + } + }; + let latency_ms = started.elapsed().as_millis() as u64; + + // TUI event + if let Some(ref tx) = ctx.tui_tx { + let _ = tx.send(TuiEvent::Verdict { + verdict: verdict.clone(), + fill: fill.clone(), + latency_ms, + }); + } + + // Webhook dispatch (verdict level) + if let Some(ref url) = config.webhook_url { + if let Err(e) = callback::send_webhook(url, &verdict).await { + warn!(error = %e, "webhook dispatch failed"); + } + } + + // JSONL audit log + if let Some(ref log) = ctx.verdict_log { + if let Err(e) = log.append(&build_verdict_log_line(&verdict, &fill)).await { + warn!(error = %e, "failed to append verdict log"); + } + } + + // Dispatch to registered callbacks + match &verdict { + FillVerdict::Real { .. 
} => { + for cb in ctx.on_real.iter() { + cb(verdict.clone()); + } + } + FillVerdict::Ghost { + tx_hash, + reason, + counterparty, + } => { + let event = GhostFillEvent { + tx_hash: *tx_hash, + market: fill.market.clone(), + side: fill.side.clone(), + size: fill.size, + price: fill.price, + counterparty: *counterparty, + reason: reason.clone(), + timestamp: now_secs(), + }; + + if let Some(ref url) = config.webhook_url { + if let Err(e) = callback::send_ghost_event_webhook(url, &event).await { + warn!(error = %e, "ghost event webhook failed"); + } + } + + for cb in ctx.on_ghost.iter() { + cb(event.clone()); + } + } + FillVerdict::Timeout { tx_hash } => { + let event = GhostFillEvent { + tx_hash: *tx_hash, + market: fill.market.clone(), + side: fill.side.clone(), + size: fill.size, + price: fill.price, + counterparty: None, + reason: "timeout — no receipt".into(), + timestamp: now_secs(), + }; + + for cb in ctx.on_ghost.iter() { + cb(event.clone()); + } + } + } +} + +/// Build the JSONL audit log line for a verdict. 
+fn build_verdict_log_line(verdict: &FillVerdict, fill: &ClobFill) -> serde_json::Value { + let ts = now_secs(); + match verdict { + FillVerdict::Real { tx_hash, block } => serde_json::json!({ + "ts": ts, + "kind": "verdict", + "verdict": "Real", + "tx": format!("{:?}", tx_hash), + "market": fill.market, + "block": block, + "size": fill.size, + "price": fill.price, + }), + FillVerdict::Ghost { + tx_hash, + reason, + counterparty, + } => serde_json::json!({ + "ts": ts, + "kind": "verdict", + "verdict": "Ghost", + "tx": format!("{:?}", tx_hash), + "market": fill.market, + "reason": reason, + "counterparty": counterparty.map(|a| format!("{a:?}")), + "size": fill.size, + "price": fill.price, + }), + FillVerdict::Timeout { tx_hash } => serde_json::json!({ + "ts": ts, + "kind": "verdict", + "verdict": "Timeout", + "tx": format!("{:?}", tx_hash), + "market": fill.market, + "size": fill.size, + "price": fill.price, + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Config; + use std::time::Duration; + + /// Test with a known ghost fill tx on Polygon mainnet. + /// This tx has status=0 and reverted with TRANSFER_FROM_FAILED. 
+ /// + /// Run with: cargo test -- --ignored test_known_ghost_fill + #[tokio::test] + #[ignore = "requires Polygon RPC access"] + async fn test_known_ghost_fill() { + let config = Config { + rpc_url: "https://polygon-rpc.com".into(), + verify_timeout: Duration::from_secs(30), + poll_interval: Duration::from_millis(500), + ..Default::default() + }; + + let tx_hash: H256 = "0x9e3230abde0f569da87511a6f8823076f7b211bb00d10689db3b7c50d6652df0" + .parse() + .unwrap(); + + let verdict = verify_fill(&config.rpc_url, tx_hash, &config) + .await + .unwrap(); + println!("Verdict: {verdict:?}"); + + assert!(verdict.is_ghost(), "expected ghost fill, got: {verdict:?}"); + } +} diff --git a/src/lib.rs b/src/lib.rs index f49dacd..c99b201 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,53 +1,37 @@ +pub mod api; pub mod callback; +pub mod config; +pub mod defense; +pub mod detection; +pub mod logging; +pub mod predictive; +pub mod tui; pub mod types; -pub mod verifier; pub mod ws; -pub use types::{Config, FillVerdict, GhostFillEvent}; -pub use verifier::verify_fill; +pub use detection::verify_fill; +pub use predictive::Predictor; +pub use types::{Config, FillVerdict, GhostFillEvent, PredictiveWarning}; use anyhow::Result; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::sync::mpsc; use tracing::{error, info, warn}; -use crate::ws::ClobFill; +use crate::detection::{DetectionContext, GhostCallback, VerdictCallback}; +use crate::logging::JsonlWriter; +use crate::tui::TuiEvent; +use crate::ws::ClobEvent; -type VerdictCallback = Box; -type GhostCallback = Box; +type WarningCallback = Arc; /// Main GhostGuard SDK handle. 
-/// -/// # Example -/// -/// ```no_run -/// use ghostguard::{GhostGuard, Config}; -/// -/// # async fn run() -> anyhow::Result<()> { -/// let config = Config { -/// rpc_url: "https://polygon-rpc.com".into(), -/// clob_ws_url: "wss://ws-subscriptions-clob.polymarket.com/ws/market".into(), -/// ..Default::default() -/// }; -/// -/// let mut guard = GhostGuard::new(config); -/// -/// guard.on_real_fill(|verdict| { -/// println!("Real fill: {:?}", verdict.tx_hash()); -/// }); -/// -/// guard.on_ghost_fill(|event| { -/// eprintln!("GHOST: {} — {}", event.tx_hash, event.reason); -/// }); -/// -/// guard.start().await?; -/// # Ok(()) -/// # } -/// ``` pub struct GhostGuard { config: Config, - on_real: Vec>, - on_ghost: Vec>, + on_real: Vec, + on_ghost: Vec, + on_warning: Vec, } impl GhostGuard { @@ -56,125 +40,219 @@ impl GhostGuard { config, on_real: Vec::new(), on_ghost: Vec::new(), + on_warning: Vec::new(), } } - /// Register a callback for confirmed real fills. pub fn on_real_fill(&mut self, f: F) where F: Fn(FillVerdict) + Send + Sync + 'static, { - self.on_real.push(Arc::new(Box::new(f))); + self.on_real.push(Arc::new(f)); } - /// Register a callback for detected ghost fills. pub fn on_ghost_fill(&mut self, f: F) where F: Fn(GhostFillEvent) + Send + Sync + 'static, { - self.on_ghost.push(Arc::new(Box::new(f))); + self.on_ghost.push(Arc::new(f)); } - /// Start listening to the CLOB websocket, verifying each fill on-chain, - /// and dispatching to registered callbacks. + pub fn on_predictive_warning(&mut self, f: F) + where + F: Fn(PredictiveWarning) + Send + Sync + 'static, + { + self.on_warning.push(Arc::new(f)); + } + + /// Start the sidecar. Returns cleanly on SIGINT or (if `tui_mode`) when + /// the user presses `q` in the dashboard. 
pub async fn start(self) -> Result<()> { - let (fill_tx, mut fill_rx) = mpsc::channel::(256); + let (event_tx, event_rx) = mpsc::channel::(256); let ws_url = self.config.clob_ws_url.clone(); let markets = self.config.markets.clone(); let ws_handle = tokio::spawn(async move { - if let Err(e) = ws::listen_clob_fills(&ws_url, &markets, fill_tx).await { + if let Err(e) = ws::listen_clob_events(&ws_url, &markets, event_tx).await { error!(error = %e, "CLOB websocket listener failed"); } }); let config = Arc::new(self.config); - let on_real = Arc::new(self.on_real); - let on_ghost = Arc::new(self.on_ghost); - - info!("GhostGuard started — listening for fills"); - - while let Some(fill) = fill_rx.recv().await { - let config = Arc::clone(&config); - let on_real = Arc::clone(&on_real); - let on_ghost = Arc::clone(&on_ghost); - - tokio::spawn(async move { - match verify_fill(&config.rpc_url, fill.tx_hash, &config).await { - Ok(verdict) => { - // Dispatch webhook if configured - if let Some(ref url) = config.webhook_url { - if let Err(e) = callback::send_webhook(url, &verdict).await { - warn!(error = %e, "webhook dispatch failed"); - } - } - match &verdict { - FillVerdict::Real { .. 
} => { - for cb in on_real.iter() { - cb(verdict.clone()); - } - } - FillVerdict::Ghost { - tx_hash, - reason, - counterparty, - } => { - let event = GhostFillEvent { - tx_hash: *tx_hash, - market: fill.market.clone(), - side: fill.side.clone(), - size: fill.size, - price: fill.price, - counterparty: *counterparty, - reason: reason.clone(), - timestamp: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(), - }; - - if let Some(ref url) = config.webhook_url { - if let Err(e) = - callback::send_ghost_event_webhook(url, &event).await - { - warn!(error = %e, "ghost event webhook failed"); - } - } - - for cb in on_ghost.iter() { - cb(event.clone()); - } - } - FillVerdict::Timeout { tx_hash } => { - let event = GhostFillEvent { - tx_hash: *tx_hash, - market: fill.market.clone(), - side: fill.side.clone(), - size: fill.size, - price: fill.price, - counterparty: None, - reason: "timeout — no receipt".into(), - timestamp: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(), - }; - - for cb in on_ghost.iter() { - cb(event.clone()); - } - } - } - } - Err(e) => { - error!(tx_hash = ?fill.tx_hash, error = %e, "verification failed"); + // JSONL loggers (skipped if path is empty). + let verdict_log = JsonlWriter::maybe_open(&config.verdict_log).await?; + let predictive_log = JsonlWriter::maybe_open(&config.predictive_log).await?; + + if let Some(ref log) = verdict_log { + info!(path = ?log.path(), "verdict log opened"); + } + if let Some(ref log) = predictive_log { + info!(path = ?log.path(), "predictive log opened"); + } + + // TUI channel (only created when tui_mode is on). + let (tui_tx, tui_rx) = if config.tui_mode { + let (tx, rx) = mpsc::unbounded_channel::(); + // Seed the TUI with initial config metadata. 
+ let _ = tx.send(TuiEvent::Config { + markets: config.markets.clone(), + rpc_url: config.rpc_url.clone(), + }); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + // Detection dispatch context. + let det_ctx = DetectionContext { + config: Arc::clone(&config), + verdict_log, + on_real: Arc::new(self.on_real), + on_ghost: Arc::new(self.on_ghost), + tui_tx: tui_tx.clone(), + last_rpc_notice: Arc::new(AtomicU64::new(0)), + }; + + // Predictive scorer. + let predictor = if config.predictive_enabled { + let mut p = Predictor::new( + config.predictive_threshold, + config.avg_window, + predictive_log, + ); + if let Some(ref tx) = tui_tx { + p = p.with_tui(tx.clone()); + } + Some(Arc::new(p)) + } else { + None + }; + + let on_warning = Arc::new(self.on_warning); + + if config.tui_mode { + info!("GhostGuard started — TUI mode"); + } else if config.predictive_enabled { + info!( + threshold = config.predictive_threshold, + window = config.avg_window, + "GhostGuard started — predictive detection ENABLED" + ); + } else { + info!("GhostGuard started — listening for fills"); + } + + // Spawn TUI render task if in TUI mode. + let tui_handle = tui_rx.map(|rx| tokio::spawn(async move { tui::run_tui(rx).await })); + + // Main dispatch loop. + let event_loop_fut = run_event_loop(event_rx, det_ctx, predictor, on_warning, tui_tx); + + // Race event loop against: TUI exit (user pressed q), ctrl-c. + if let Some(handle) = tui_handle { + tokio::select! { + _ = event_loop_fut => { + info!("event loop exited (channel closed)"); + } + res = handle => { + match res { + Ok(Ok(())) => info!("TUI exited cleanly"), + Ok(Err(e)) => error!(error = %e, "TUI exited with error"), + Err(e) => error!(error = %e, "TUI task panicked"), } } - }); + _ = tokio::signal::ctrl_c() => { + info!("SIGINT received, shutting down..."); + } + } + } else { + tokio::select! 
{ + _ = event_loop_fut => { + info!("event loop exited (channel closed)"); + } + _ = tokio::signal::ctrl_c() => { + info!("SIGINT received, shutting down..."); + } + } } ws_handle.abort(); Ok(()) } } + +async fn run_event_loop( + mut event_rx: mpsc::Receiver, + det_ctx: DetectionContext, + predictor: Option>, + on_warning: Arc>, + tui_tx: Option>, +) { + while let Some(event) = event_rx.recv().await { + match event { + ClobEvent::Connected => { + if let Some(ref tx) = tui_tx { + let _ = tx.send(TuiEvent::WsConnected); + } + } + ClobEvent::Disconnected => { + if let Some(ref tx) = tui_tx { + let _ = tx.send(TuiEvent::WsDisconnected); + } + } + ClobEvent::Status(msg) => { + if let Some(ref tx) = tui_tx { + let _ = tx.send(TuiEvent::Status(msg)); + } + } + ClobEvent::PriceUpdate(p) => { + if let Some(ref predictor) = predictor { + predictor + .ingest_price(&p.market, p.best_bid, p.best_ask) + .await; + } + if let Some(ref tx) = tui_tx { + let mid = (p.best_bid + p.best_ask) / 2.0; + let _ = tx.send(TuiEvent::PriceUpdate { + market: p.market, + mid, + }); + } + } + ClobEvent::Fill(fill) => { + // Always surface the trade to the TUI immediately. + if let Some(ref tx) = tui_tx { + let _ = tx.send(TuiEvent::Trade(fill.clone())); + } + + // Phase 2: predictive scoring (fast, non-blocking). + if let Some(ref predictor) = predictor { + let predictor = Arc::clone(predictor); + let on_warning = Arc::clone(&on_warning); + let fill_p = fill.clone(); + tokio::spawn(async move { + if let Some(warning) = predictor.score_fill(&fill_p).await { + for cb in on_warning.iter() { + cb(warning.clone()); + } + } + }); + } + + // Phase 1: on-chain verification (slow — 500ms to 10s). + // Skip when tx_hash is zero (Polymarket's market channel does + // not deliver settlement tx hashes; that path comes in Phase 3 + // via trade-history polling). 
+ if !fill.tx_hash.is_zero() { + let det_ctx = det_ctx.clone(); + tokio::spawn(async move { + detection::handle_fill(det_ctx, fill).await; + }); + } + } + } + } + + warn!("event channel closed"); +} diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..6596e89 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,145 @@ +//! JSONL audit log writer. +//! +//! Append-only, one JSON object per line, flushed after every write. +//! Intended for durable forensic trails of verdicts and predictive warnings. + +use anyhow::{Context, Result}; +use serde::Serialize; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::fs::{create_dir_all, OpenOptions}; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::Mutex; + +/// Thread-safe JSONL appender. +/// +/// Writes are serialized through a `Mutex>` and flushed +/// after every line so that crashes do not lose recent entries. +pub struct JsonlWriter { + path: PathBuf, + inner: Mutex>, +} + +impl JsonlWriter { + /// Open a JSONL file for appending. Creates the parent directory if needed. + pub async fn open(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + create_dir_all(parent) + .await + .with_context(|| format!("create_dir_all {parent:?}"))?; + } + } + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await + .with_context(|| format!("open {path:?}"))?; + + Ok(Self { + path, + inner: Mutex::new(BufWriter::new(file)), + }) + } + + /// Optional constructor — returns None when `path` is empty. + /// Useful so callers can conditionally disable logging via config. + pub async fn maybe_open(path: &str) -> Result>> { + if path.is_empty() { + return Ok(None); + } + Ok(Some(Arc::new(Self::open(path).await?))) + } + + /// Serialize `entry` and append it as a single line. Flushes on every call. 
+ pub async fn append(&self, entry: &T) -> Result<()> { + let mut line = serde_json::to_vec(entry)?; + line.push(b'\n'); + + let mut guard = self.inner.lock().await; + guard.write_all(&line).await?; + guard.flush().await?; + Ok(()) + } + + pub fn path(&self) -> &Path { + &self.path + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Deserialize; + use tempfile::tempdir; + + #[derive(Serialize, Deserialize, PartialEq, Debug)] + struct Entry { + id: u64, + msg: String, + } + + #[tokio::test] + async fn test_jsonl_append_roundtrip() { + let dir = tempdir().unwrap(); + let path = dir.path().join("sub/out.jsonl"); + + let writer = JsonlWriter::open(&path).await.unwrap(); + writer + .append(&Entry { + id: 1, + msg: "one".into(), + }) + .await + .unwrap(); + writer + .append(&Entry { + id: 2, + msg: "two".into(), + }) + .await + .unwrap(); + drop(writer); + + let contents = tokio::fs::read_to_string(&path).await.unwrap(); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 2); + + let e0: Entry = serde_json::from_str(lines[0]).unwrap(); + let e1: Entry = serde_json::from_str(lines[1]).unwrap(); + assert_eq!( + e0, + Entry { + id: 1, + msg: "one".into() + } + ); + assert_eq!( + e1, + Entry { + id: 2, + msg: "two".into() + } + ); + } + + #[tokio::test] + async fn test_maybe_open_empty_path() { + let result = JsonlWriter::maybe_open("").await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_maybe_open_real_path() { + let dir = tempdir().unwrap(); + let path = dir.path().join("out.jsonl"); + let result = JsonlWriter::maybe_open(path.to_str().unwrap()) + .await + .unwrap(); + assert!(result.is_some()); + } +} diff --git a/src/predictive.rs b/src/predictive.rs new file mode 100644 index 0000000..55a2132 --- /dev/null +++ b/src/predictive.rs @@ -0,0 +1,354 @@ +//! Phase 2: predictive ghost fill detection. +//! +//! Scores fills at the instant of CLOB match, BEFORE on-chain confirmation. +//! 
Uses two anomaly factors: +//! +//! 1. Price deviation — `|trade_price - mid_price| / mid_price` +//! 2. Size anomaly — `trade_size / avg_trade_size` over rolling window +//! +//! Combined score: +//! +//! ```text +//! score = clamp(price_deviation * 5.0 + log2(size_anomaly) * 0.3, 0.0, 1.0) +//! ``` +//! +//! Cold-start handling: if mid price is unknown or fewer than 5 trades are +//! recorded for a market, no warning is emitted. + +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::{mpsc, RwLock}; +use tracing::debug; + +use crate::logging::JsonlWriter; +use crate::tui::TuiEvent; +use crate::types::PredictiveWarning; +use crate::ws::ClobFill; + +/// Minimum number of fills required before size_anomaly is meaningful. +const MIN_SAMPLES: usize = 5; + +/// Per-market rolling state. +#[derive(Debug, Clone)] +pub struct MarketStats { + pub mid_price: Option, + fill_sizes: VecDeque, + window_cap: usize, + running_sum: f64, +} + +impl MarketStats { + pub fn new(window_cap: usize) -> Self { + Self { + mid_price: None, + fill_sizes: VecDeque::with_capacity(window_cap.max(1)), + window_cap: window_cap.max(1), + running_sum: 0.0, + } + } + + /// Update mid price from best bid / best ask. Ignores non-positive values. + pub fn update_mid(&mut self, best_bid: f64, best_ask: f64) { + if best_bid > 0.0 && best_ask > 0.0 { + self.mid_price = Some((best_bid + best_ask) / 2.0); + } + } + + /// Record a fill size into the rolling window. + pub fn record_size(&mut self, size: f64) { + if size <= 0.0 { + return; + } + if self.fill_sizes.len() >= self.window_cap { + if let Some(old) = self.fill_sizes.pop_front() { + self.running_sum -= old; + } + } + self.fill_sizes.push_back(size); + self.running_sum += size; + } + + /// Average fill size over the rolling window. + /// Returns `None` if the window holds fewer than `MIN_SAMPLES` trades. 
+ pub fn avg_size(&self) -> Option { + if self.fill_sizes.len() < MIN_SAMPLES { + return None; + } + Some(self.running_sum / self.fill_sizes.len() as f64) + } + + pub fn sample_count(&self) -> usize { + self.fill_sizes.len() + } +} + +pub type SharedStats = Arc>>; + +/// Predictive scorer. Holds shared per-market state. +pub struct Predictor { + stats: SharedStats, + threshold: f64, + window_cap: usize, + warning_log: Option>, + tui_tx: Option>, +} + +impl Predictor { + pub fn new(threshold: f64, window_cap: usize, warning_log: Option>) -> Self { + Self { + stats: Arc::new(RwLock::new(HashMap::new())), + threshold, + window_cap: window_cap.max(1), + warning_log, + tui_tx: None, + } + } + + /// Attach a TUI event sender. Warnings will also be forwarded to it. + pub fn with_tui(mut self, tui_tx: mpsc::UnboundedSender) -> Self { + self.tui_tx = Some(tui_tx); + self + } + + pub fn stats(&self) -> SharedStats { + self.stats.clone() + } + + /// Update mid price for a market. + pub async fn ingest_price(&self, market: &str, best_bid: f64, best_ask: f64) { + let mut map = self.stats.write().await; + let entry = map + .entry(market.to_string()) + .or_insert_with(|| MarketStats::new(self.window_cap)); + entry.update_mid(best_bid, best_ask); + } + + /// Score a fill. Returns `Some(warning)` if `score >= threshold`. + /// + /// The fill's own size is recorded into stats AFTER scoring so it can't + /// influence its own anomaly score. Returns `None` (and does not record) + /// when the trade has invalid data. + pub async fn score_fill(&self, fill: &ClobFill) -> Option { + if fill.size <= 0.0 || fill.price <= 0.0 { + return None; + } + + // Read current stats under read lock, compute factors. 
+ let (mid, avg) = { + let map = self.stats.read().await; + let snap = map.get(&fill.market); + let mid = snap.and_then(|s| s.mid_price); + let avg = snap.and_then(|s| s.avg_size()); + (mid, avg) + }; + + // Record this fill's size for future scoring, regardless of whether + // we have enough data to score it yet. + { + let mut map = self.stats.write().await; + let entry = map + .entry(fill.market.clone()) + .or_insert_with(|| MarketStats::new(self.window_cap)); + entry.record_size(fill.size); + } + + // Cold start — no warning if we can't compute either factor. + let mid_price = mid?; + let avg_size = avg?; + if mid_price <= 0.0 || avg_size <= 0.0 { + return None; + } + + let price_deviation = (fill.price - mid_price).abs() / mid_price; + let size_anomaly = fill.size / avg_size; + + let raw = price_deviation * 5.0 + size_anomaly.log2() * 0.3; + let score = raw.clamp(0.0, 1.0); + + debug!( + tx = ?fill.tx_hash, + market = %fill.market, + score, + price_deviation, + size_anomaly, + "predictive score computed" + ); + + if score < self.threshold { + return None; + } + + let warning = PredictiveWarning { + tx_hash: fill.tx_hash, + market: fill.market.clone(), + score, + price_deviation, + size_anomaly, + trade_price: fill.price, + mid_price, + trade_size: fill.size, + avg_size, + ts: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + }; + + if let Some(ref log) = self.warning_log { + let line = serde_json::json!({ + "ts": warning.ts, + "kind": "predictive", + "tx": format!("{:?}", warning.tx_hash), + "market": warning.market, + "score": warning.score, + "price_dev": warning.price_deviation, + "size_anom": warning.size_anomaly, + "trade_price": warning.trade_price, + "mid_price": warning.mid_price, + "trade_size": warning.trade_size, + "avg_size": warning.avg_size, + }); + if let Err(e) = log.append(&line).await { + tracing::warn!(error = %e, "failed to append predictive warning log"); + } + } + + if let Some(ref tx) = self.tui_tx { + 
let _ = tx.send(TuiEvent::Warning(warning.clone())); + } + + Some(warning) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethers::types::H256; + + fn fill(market: &str, size: f64, price: f64) -> ClobFill { + ClobFill { + tx_hash: H256::zero(), + market: market.into(), + side: "BUY".into(), + size, + price, + } + } + + #[tokio::test] + async fn test_cold_start_no_warning() { + let p = Predictor::new(0.7, 50, None); + // No price, no samples — nothing. + let w = p.score_fill(&fill("m", 100.0, 0.5)).await; + assert!(w.is_none()); + } + + #[tokio::test] + async fn test_mid_price_zero_skipped() { + let p = Predictor::new(0.7, 50, None); + // update_mid ignores bid=0, so mid stays None → no warning. + p.ingest_price("m", 0.0, 0.52).await; + // seed enough samples for avg to exist + for _ in 0..10 { + p.score_fill(&fill("m", 10.0, 0.5)).await; + } + let w = p.score_fill(&fill("m", 1000.0, 0.95)).await; + assert!(w.is_none(), "no mid price → no warning"); + } + + #[tokio::test] + async fn test_normal_fill_no_warning() { + let p = Predictor::new(0.7, 50, None); + p.ingest_price("m", 0.48, 0.52).await; // mid = 0.50 + // Seed rolling window with normal-sized trades. + for _ in 0..10 { + p.score_fill(&fill("m", 15.0, 0.50)).await; + } + // Fill close to mid, size near average → score well below threshold. 
+ let w = p.score_fill(&fill("m", 15.0, 0.51)).await; + assert!(w.is_none(), "normal fill should not warn, got {w:?}"); + } + + #[tokio::test] + async fn test_suspicious_fill_triggers_warning() { + let p = Predictor::new(0.7, 50, None); + p.ingest_price("m", 0.48, 0.52).await; // mid = 0.50 + for _ in 0..10 { + p.score_fill(&fill("m", 15.0, 0.50)).await; + } + // price_deviation = |0.95 - 0.50|/0.50 = 0.90 → contribution = 4.5 + // size_anomaly = 1000/15 ≈ 66.7 → log2≈ 6.06 → contribution ≈ 1.82 + // raw = 4.5 + 1.82 = 6.32 → clamped to 1.0 + let w = p + .score_fill(&fill("m", 1000.0, 0.95)) + .await + .expect("expected warning"); + + assert!((w.score - 1.0).abs() < 1e-9); + assert!(w.price_deviation > 0.85); + assert!(w.size_anomaly > 50.0); + } + + #[tokio::test] + async fn test_below_threshold_no_warning() { + // High threshold → even suspicious fill doesn't warn. + let p = Predictor::new(0.99, 50, None); + p.ingest_price("m", 0.48, 0.52).await; + for _ in 0..10 { + p.score_fill(&fill("m", 15.0, 0.50)).await; + } + // Mildly anomalous: score ≈ 0.1 * 5 + log2(3)*0.3 ≈ 0.975. + // At threshold 0.99 this may or may not trigger — use clearly sub-threshold. + let w = p.score_fill(&fill("m", 16.0, 0.51)).await; + assert!(w.is_none()); + } + + #[tokio::test] + async fn test_small_trade_does_not_inflate_score() { + let p = Predictor::new(0.7, 50, None); + p.ingest_price("m", 0.48, 0.52).await; + for _ in 0..10 { + p.score_fill(&fill("m", 100.0, 0.50)).await; + } + // Small trade (below average): log2(0.1) ≈ -3.32 → contributes negatively. 
+ let w = p.score_fill(&fill("m", 10.0, 0.51)).await; + assert!(w.is_none(), "small trade should not warn"); + } + + #[tokio::test] + async fn test_zero_size_skipped() { + let p = Predictor::new(0.7, 50, None); + p.ingest_price("m", 0.48, 0.52).await; + for _ in 0..10 { + p.score_fill(&fill("m", 15.0, 0.50)).await; + } + let w = p.score_fill(&fill("m", 0.0, 0.95)).await; + assert!(w.is_none(), "zero-size trade should be skipped"); + } + + #[test] + fn test_market_stats_rolling_eviction() { + let mut s = MarketStats::new(3); + s.record_size(10.0); + s.record_size(20.0); + s.record_size(30.0); + s.record_size(40.0); + assert_eq!(s.sample_count(), 3); + // 10 was evicted, avg of 20,30,40 = 30. But avg_size requires >=5 samples. + assert!(s.avg_size().is_none()); + } + + #[test] + fn test_market_stats_avg_requires_min_samples() { + let mut s = MarketStats::new(50); + for v in [1.0, 2.0, 3.0, 4.0] { + s.record_size(v); + } + assert!(s.avg_size().is_none()); + s.record_size(5.0); + let avg = s.avg_size().expect("5 samples enough"); + assert!((avg - 3.0).abs() < 1e-9); + } +} diff --git a/src/tui.rs b/src/tui.rs new file mode 100644 index 0000000..81d8368 --- /dev/null +++ b/src/tui.rs @@ -0,0 +1,838 @@ +//! ratatui + crossterm dashboard for GhostGuard. +//! +//! Enabled via `Config::tui_mode = true` (or `--tui` on the CLI). When active, +//! `GhostGuard::start()` pipes live events into a `TuiEvent` channel that the +//! render loop consumes. No polling of external state — the dashboard is a +//! pure function of the event stream. 
+ +use std::collections::{HashMap, VecDeque}; +use std::io; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use crossterm::event::{DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind}; +use crossterm::terminal::{ + disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen, +}; +use crossterm::{event, execute}; +use ratatui::backend::CrosstermBackend; +use ratatui::layout::{Alignment, Constraint, Direction, Layout, Rect}; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, Table, Wrap}; +use ratatui::{Frame, Terminal}; +use tokio::sync::mpsc; + +use crate::types::{FillVerdict, GhostFillEvent, PredictiveWarning}; +use crate::ws::ClobFill; + +/// Events accepted by the TUI render loop. +#[derive(Debug, Clone)] +pub enum TuiEvent { + /// A trade arrived on the CLOB stream (before any verification). + /// Emitted for every fill so the feed reflects live activity. + Trade(ClobFill), + /// A verdict came back from on-chain verification. + Verdict { + verdict: FillVerdict, + fill: ClobFill, + /// Time from CLOB receive to verdict. + latency_ms: u64, + }, + /// A predictive warning was emitted. + Warning(PredictiveWarning), + /// Mid-price update for a market. + PriceUpdate { + market: String, + mid: f64, + }, + /// WebSocket connection state changed. + WsConnected, + WsDisconnected, + /// Free-form system status message (ws errors, reconnect notices). + Status(String), + /// Periodic metadata push from the main loop (e.g. markets list, rpc URL). 
+ Config { + markets: Vec, + rpc_url: String, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionStatus { + Connecting, + Connected, + Disconnected, +} + +impl ConnectionStatus { + fn label(&self) -> &'static str { + match self { + ConnectionStatus::Connecting => "connecting", + ConnectionStatus::Connected => "connected", + ConnectionStatus::Disconnected => "disconnected", + } + } + + fn color(&self) -> Color { + match self { + ConnectionStatus::Connecting => Color::Yellow, + ConnectionStatus::Connected => Color::Green, + ConnectionStatus::Disconnected => Color::Red, + } + } +} + +/// Kind of entry in the scrolling live feed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FeedKind { + Trade, + Real, + Ghost, + Warn, + Auto, + System, +} + +impl FeedKind { + fn tag(&self) -> &'static str { + match self { + FeedKind::Trade => "[TRADE]", + FeedKind::Real => "[REAL]", + FeedKind::Ghost => "[GHOST]", + FeedKind::Warn => "[WARN]", + FeedKind::Auto => "[AUTO]", + FeedKind::System => "[SYS]", + } + } + + fn style(&self) -> Style { + match self { + FeedKind::Trade => Style::default().fg(Color::Blue), + FeedKind::Real => Style::default().fg(Color::Green), + FeedKind::Ghost => Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), + FeedKind::Warn => Style::default().fg(Color::Yellow), + FeedKind::Auto => Style::default().fg(Color::Cyan), + FeedKind::System => Style::default().fg(Color::Magenta), + } + } +} + +#[derive(Debug, Clone)] +pub struct FeedEntry { + pub time: DateTime, + pub kind: FeedKind, + pub tx_short: String, + pub market: String, + pub detail: String, +} + +#[derive(Debug, Default, Clone)] +pub struct DashboardStats { + pub total_verified: u64, + pub total_ghost: u64, + pub total_warnings: u64, + /// Warnings that turned out to be ghosts (of warnings we could correlate). + pub warning_correct: u64, + pub warning_accuracy: f64, + /// Cumulative latency for rolling average. 
+ pub latency_sum_ms: u128, + pub latency_samples: u64, + pub avg_latency_ms: f64, + pub blacklisted_count: u64, +} + +impl DashboardStats { + pub fn ghost_pct(&self) -> f64 { + if self.total_verified == 0 { + 0.0 + } else { + (self.total_ghost as f64 / self.total_verified as f64) * 100.0 + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct MarketRow { + pub slug: String, + pub mid_price: Option, + pub total_size: f64, + pub size_samples: u64, + /// Sliding-window timestamps of fills for rate calculation. + pub fill_times: VecDeque, + pub ghost_count: u64, +} + +impl MarketRow { + pub fn new(slug: String) -> Self { + Self { + slug, + ..Default::default() + } + } + + pub fn avg_size(&self) -> f64 { + if self.size_samples == 0 { + 0.0 + } else { + self.total_size / self.size_samples as f64 + } + } + + /// Fills in the last minute. + pub fn fills_per_min(&self) -> u64 { + self.fill_times.len() as u64 + } + + pub fn ghost_rate(&self) -> f64 { + let n = self.size_samples; + if n == 0 { + 0.0 + } else { + (self.ghost_count as f64 / n as f64) * 100.0 + } + } + + pub fn record_fill(&mut self, size: f64, now: Instant) { + self.total_size += size; + self.size_samples += 1; + self.fill_times.push_back(now); + // Evict anything older than 60s + while let Some(&front) = self.fill_times.front() { + if now.duration_since(front) > Duration::from_secs(60) { + self.fill_times.pop_front(); + } else { + break; + } + } + } +} + +pub struct TuiState { + pub feed: VecDeque, + pub feed_cap: usize, + pub stats: DashboardStats, + pub markets: HashMap, + pub status: ConnectionStatus, + pub paused: bool, + pub start_time: Instant, + pub markets_list: Vec, + pub rpc_url: String, +} + +impl TuiState { + pub fn new() -> Self { + Self { + feed: VecDeque::with_capacity(200), + feed_cap: 200, + stats: DashboardStats::default(), + markets: HashMap::new(), + status: ConnectionStatus::Connecting, + paused: false, + start_time: Instant::now(), + markets_list: Vec::new(), + rpc_url: String::new(), 
+ } + } + + fn push_feed(&mut self, entry: FeedEntry) { + if self.feed.len() >= self.feed_cap { + self.feed.pop_front(); + } + self.feed.push_back(entry); + } + + fn market_mut(&mut self, slug: &str) -> &mut MarketRow { + self.markets + .entry(slug.to_string()) + .or_insert_with(|| MarketRow::new(slug.to_string())) + } + + /// Apply an incoming event to state. + pub fn ingest(&mut self, ev: TuiEvent) { + if self.paused { + return; + } + match ev { + TuiEvent::Trade(fill) => self.on_trade(fill), + TuiEvent::Verdict { + verdict, + fill, + latency_ms, + } => self.on_verdict(verdict, fill, latency_ms), + TuiEvent::Warning(w) => self.on_warning(w), + TuiEvent::PriceUpdate { market, mid } => { + self.market_mut(&market).mid_price = Some(mid); + } + TuiEvent::WsConnected => self.status = ConnectionStatus::Connected, + TuiEvent::WsDisconnected => self.status = ConnectionStatus::Disconnected, + TuiEvent::Status(msg) => { + self.push_feed(FeedEntry { + time: Utc::now(), + kind: FeedKind::System, + tx_short: String::new(), + market: String::new(), + detail: msg, + }); + } + TuiEvent::Config { markets, rpc_url } => { + self.markets_list = markets.clone(); + self.rpc_url = rpc_url; + for m in markets { + self.markets + .entry(m.clone()) + .or_insert_with(|| MarketRow::new(m)); + } + } + } + } + + /// Called for every incoming trade (before any verification). + /// Updates the market row's rolling stats and emits a [TRADE] feed entry. 
+ fn on_trade(&mut self, fill: ClobFill) { + let now = Instant::now(); + let row = self.market_mut(&fill.market); + row.record_fill(fill.size, now); + + let detail = format!( + "side={} size={:.2} price={:.4}", + fill.side, fill.size, fill.price + ); + self.push_feed(FeedEntry { + time: Utc::now(), + kind: FeedKind::Trade, + tx_short: if fill.tx_hash.is_zero() { + String::new() + } else { + short_hex(&format!("{:?}", fill.tx_hash)) + }, + market: fill.market, + detail, + }); + } + + fn on_verdict(&mut self, verdict: FillVerdict, fill: ClobFill, latency_ms: u64) { + self.stats.total_verified += 1; + self.stats.latency_sum_ms += latency_ms as u128; + self.stats.latency_samples += 1; + if self.stats.latency_samples > 0 { + self.stats.avg_latency_ms = + self.stats.latency_sum_ms as f64 / self.stats.latency_samples as f64; + } + + // Note: fill size / rolling window is already tracked by `on_trade`; + // don't double-count here. + + let (kind, detail) = match &verdict { + FillVerdict::Real { block, .. } => (FeedKind::Real, format!("block={block}")), + FillVerdict::Ghost { reason, .. } => { + self.stats.total_ghost += 1; + let row = self.market_mut(&fill.market); + row.ghost_count += 1; + (FeedKind::Ghost, format!("reason={reason}")) + } + FillVerdict::Timeout { .. } => { + self.stats.total_ghost += 1; + let row = self.market_mut(&fill.market); + row.ghost_count += 1; + (FeedKind::Ghost, "reason=timeout".into()) + } + }; + + // Refresh accuracy metric based on correlation. 
+ if self.stats.total_warnings > 0 { + self.stats.warning_accuracy = + (self.stats.warning_correct as f64 / self.stats.total_warnings as f64) * 100.0; + } + + self.push_feed(FeedEntry { + time: Utc::now(), + kind, + tx_short: short_hex(&format!("{:?}", verdict.tx_hash())), + market: fill.market, + detail, + }); + } + + fn on_warning(&mut self, w: PredictiveWarning) { + self.stats.total_warnings += 1; + self.push_feed(FeedEntry { + time: Utc::now(), + kind: FeedKind::Warn, + tx_short: short_hex(&format!("{:?}", w.tx_hash)), + market: w.market, + detail: format!( + "score={:.2} price_dev={:.3} size_anom={:.2}", + w.score, w.price_deviation, w.size_anomaly + ), + }); + } + + pub fn uptime_hhmmss(&self) -> String { + let secs = self.start_time.elapsed().as_secs(); + let h = secs / 3600; + let m = (secs % 3600) / 60; + let s = secs % 60; + format!("{h:02}:{m:02}:{s:02}") + } +} + +impl Default for TuiState { + fn default() -> Self { + Self::new() + } +} + +fn short_hex(h: &str) -> String { + // "0x9e3230ab..." -> "9e3230ab" + let stripped = h.strip_prefix("0x").unwrap_or(h); + stripped.chars().take(8).collect() +} + +// --------------------------------------------------------------------------- +// Run loop +// --------------------------------------------------------------------------- + +/// Run the TUI until the user quits or the event channel closes. +/// +/// The caller must feed `TuiEvent`s into `rx` from their real event sources +/// (detection/predictive/ws). This function owns the terminal while running. 
+pub async fn run_tui(mut rx: mpsc::UnboundedReceiver) -> Result<()> { + // Set up terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + let mut state = TuiState::new(); + let result = event_loop(&mut terminal, &mut state, &mut rx).await; + + // Restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture + )?; + terminal.show_cursor()?; + + result +} + +async fn event_loop( + terminal: &mut Terminal, + state: &mut TuiState, + rx: &mut mpsc::UnboundedReceiver, +) -> Result<()> { + let tick = Duration::from_millis(250); + let mut ticker = tokio::time::interval(tick); + + loop { + // Drain whatever events are immediately available, then render once. + // This keeps CPU low and rendering smooth under bursts. + loop { + match rx.try_recv() { + Ok(ev) => state.ingest(ev), + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => return Ok(()), + } + } + + terminal.draw(|f| render(f, state))?; + + tokio::select! { + _ = ticker.tick() => {} + ev = rx.recv() => { + match ev { + Some(e) => state.ingest(e), + None => return Ok(()), + } + } + key = poll_key() => { + match key { + Ok(Some(KeyCode::Char('q'))) | Ok(Some(KeyCode::Esc)) => return Ok(()), + Ok(Some(KeyCode::Char('p'))) => state.paused = !state.paused, + Ok(_) => {} + Err(_) => {} + } + } + } + } +} + +async fn poll_key() -> Result> { + // crossterm poll is sync; run on blocking pool with a short deadline so + // we don't starve other branches of the select. + tokio::task::spawn_blocking(|| { + if event::poll(Duration::from_millis(100))? { + if let Event::Key(key) = event::read()? 
{ + if key.kind == KeyEventKind::Press { + return Ok::<_, anyhow::Error>(Some(key.code)); + } + } + } + Ok(None) + }) + .await + .unwrap_or(Ok(None)) +} + +// --------------------------------------------------------------------------- +// Rendering +// --------------------------------------------------------------------------- + +fn render(f: &mut Frame, state: &TuiState) { + let size = f.area(); + + // Vertical split: header (3) | body (flex) | stats (5) | markets (flex) | footer (1) + let outer = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(3), + Constraint::Min(5), + Constraint::Length(5), + Constraint::Min(3), + Constraint::Length(1), + ]) + .split(size); + + render_header(f, outer[0], state); + render_feed(f, outer[1], state); + render_stats(f, outer[2], state); + render_markets(f, outer[3], state); + render_footer(f, outer[4], state); +} + +fn render_header(f: &mut Frame, area: Rect, state: &TuiState) { + let title_block = Block::default().borders(Borders::ALL).title(" GhostGuard "); + + let inner = title_block.inner(area); + f.render_widget(title_block, area); + + let rows = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(1), + Constraint::Length(1), + Constraint::Length(1), + ]) + .split(inner); + + // Line 1: version + uptime + let line1 = Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(50), Constraint::Percentage(50)]) + .split(rows[0]); + f.render_widget( + Paragraph::new(Line::from(vec![ + Span::styled("GhostGuard ", Style::default().add_modifier(Modifier::BOLD)), + Span::styled("v0.2.0", Style::default().fg(Color::Cyan)), + ])), + line1[0], + ); + f.render_widget( + Paragraph::new(format!("uptime {}", state.uptime_hhmmss())).alignment(Alignment::Right), + line1[1], + ); + + // Line 2: status | markets | ws + let status_text = if state.paused { "PAUSED" } else { "RUNNING" }; + let status_style = if state.paused { + 
Style::default().fg(Color::Yellow) + } else { + Style::default().fg(Color::Green) + }; + f.render_widget( + Paragraph::new(Line::from(vec![ + Span::raw("Status: "), + Span::styled(status_text, status_style), + Span::raw(format!(" | Markets: {}", state.markets_list.len())), + Span::raw(" | WS: "), + Span::styled( + state.status.label(), + Style::default().fg(state.status.color()), + ), + ])), + rows[1], + ); + + // Line 3: rpc | verified + let rpc_short = if state.rpc_url.len() > 40 { + format!("{}...", &state.rpc_url[..40]) + } else { + state.rpc_url.clone() + }; + f.render_widget( + Paragraph::new(format!( + "RPC: {} | Fills verified: {}", + rpc_short, state.stats.total_verified + )), + rows[2], + ); +} + +fn render_feed(f: &mut Frame, area: Rect, state: &TuiState) { + let block = Block::default().borders(Borders::ALL).title(" Live feed "); + let inner = block.inner(area); + f.render_widget(block, area); + + // Show as many as fit, newest at bottom + let visible = inner.height as usize; + let start = state.feed.len().saturating_sub(visible); + let lines: Vec = state + .feed + .iter() + .skip(start) + .map(|e| { + let time = e.time.format("%H:%M:%S").to_string(); + let tail = if matches!(e.kind, FeedKind::System) { + format!(" {}", e.detail) + } else { + format!(" tx=0x{}.. 
market={} {}", e.tx_short, e.market, e.detail) + }; + Line::from(vec![ + Span::raw(format!("{time} ")), + Span::styled(e.kind.tag(), e.kind.style()), + Span::raw(tail), + ]) + }) + .collect(); + + f.render_widget(Paragraph::new(lines).wrap(Wrap { trim: false }), inner); +} + +fn render_stats(f: &mut Frame, area: Rect, state: &TuiState) { + let block = Block::default().borders(Borders::ALL).title(" Stats "); + let inner = block.inner(area); + f.render_widget(block, area); + + let s = &state.stats; + let lines = vec![ + Line::from(format!( + "Verified: {} Ghost: {} ({:.1}%) Avg latency: {:.2}s", + s.total_verified, + s.total_ghost, + s.ghost_pct(), + s.avg_latency_ms / 1000.0, + )), + Line::from(format!( + "Predictive warnings: {} Accuracy: {:.1}%", + s.total_warnings, s.warning_accuracy, + )), + Line::from(format!("Blacklisted addresses: {}", s.blacklisted_count)), + ]; + f.render_widget(Paragraph::new(lines), inner); +} + +fn render_markets(f: &mut Frame, area: Rect, state: &TuiState) { + let block = Block::default().borders(Borders::ALL).title(" Markets "); + let inner = block.inner(area); + f.render_widget(block, area); + + let header = Row::new(vec![ + Cell::from("market"), + Cell::from("mid"), + Cell::from("avg size"), + Cell::from("fills/min"), + Cell::from("ghost %"), + ]) + .style(Style::default().add_modifier(Modifier::BOLD)); + + let mut rows: Vec<&MarketRow> = state.markets.values().collect(); + rows.sort_by(|a, b| a.slug.cmp(&b.slug)); + + let body: Vec = rows + .iter() + .map(|m| { + let ghost = m.ghost_rate(); + let slug = if m.slug.len() > 20 { + format!("{}..", &m.slug[..18]) + } else { + m.slug.clone() + }; + let mid = m + .mid_price + .map(|p| format!("{p:.4}")) + .unwrap_or_else(|| "-".into()); + let mut row = Row::new(vec![ + Cell::from(slug), + Cell::from(mid), + Cell::from(format!("{:.2}", m.avg_size())), + Cell::from(format!("{}", m.fills_per_min())), + Cell::from(format!("{ghost:.1}")), + ]); + if ghost > 5.0 { + row = 
row.style(Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)); + } + row + }) + .collect(); + + let widths = [ + Constraint::Percentage(40), + Constraint::Percentage(15), + Constraint::Percentage(15), + Constraint::Percentage(15), + Constraint::Percentage(15), + ]; + let table = Table::new(body, widths).header(header); + f.render_widget(table, inner); +} + +fn render_footer(f: &mut Frame, area: Rect, _state: &TuiState) { + let help = Line::from(vec![ + Span::styled("[q]", Style::default().fg(Color::Cyan)), + Span::raw("uit "), + Span::styled("[p]", Style::default().fg(Color::Cyan)), + Span::raw("ause "), + Span::styled("[b]", Style::default().fg(Color::DarkGray)), + Span::styled("lacklist (Phase 3) ", Style::default().fg(Color::DarkGray)), + Span::styled("[m]", Style::default().fg(Color::DarkGray)), + Span::styled("arkets (Phase 3)", Style::default().fg(Color::DarkGray)), + ]); + f.render_widget(Paragraph::new(help), area); +} + +// --------------------------------------------------------------------------- +// Helper: convert GhostFillEvent into a ClobFill for feed purposes +// --------------------------------------------------------------------------- + +/// Convenience for upstream dispatchers that only have a `GhostFillEvent`. 
/// Convenience for upstream dispatchers that only have a `GhostFillEvent`.
pub fn ghost_event_to_fill(ev: &GhostFillEvent) -> ClobFill {
    ClobFill {
        tx_hash: ev.tx_hash,
        market: ev.market.clone(),
        side: ev.side.clone(),
        size: ev.size,
        price: ev.price,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ethers::types::H256;

    /// Build a minimal BUY fill at price 0.5 for tests.
    fn fill(market: &str, size: f64) -> ClobFill {
        ClobFill {
            tx_hash: H256::zero(),
            market: market.into(),
            side: "BUY".into(),
            size,
            price: 0.5,
        }
    }

    #[test]
    fn test_feed_cap_bounded() {
        let mut s = TuiState::new();
        s.feed_cap = 5;
        for i in 0..20 {
            s.ingest(TuiEvent::Verdict {
                verdict: FillVerdict::Real {
                    tx_hash: H256::zero(),
                    block: i,
                },
                fill: fill("m", 10.0),
                latency_ms: 100,
            });
        }
        assert_eq!(s.feed.len(), 5);
    }

    #[test]
    fn test_stats_counters() {
        let mut s = TuiState::new();
        // One real, one ghost
        s.ingest(TuiEvent::Verdict {
            verdict: FillVerdict::Real {
                tx_hash: H256::zero(),
                block: 1,
            },
            fill: fill("m1", 10.0),
            latency_ms: 200,
        });
        s.ingest(TuiEvent::Verdict {
            verdict: FillVerdict::Ghost {
                tx_hash: H256::zero(),
                reason: "TRANSFER_FROM_FAILED".into(),
                counterparty: None,
            },
            fill: fill("m1", 20.0),
            latency_ms: 600,
        });
        assert_eq!(s.stats.total_verified, 2);
        assert_eq!(s.stats.total_ghost, 1);
        assert!((s.stats.ghost_pct() - 50.0).abs() < 1e-9);
        assert!((s.stats.avg_latency_ms - 400.0).abs() < 1e-9);
    }

    #[test]
    fn test_market_row_fills_per_min_window() {
        let mut m = MarketRow::new("x".into());
        let t0 = Instant::now();
        m.record_fill(10.0, t0);
        m.record_fill(20.0, t0);
        assert_eq!(m.fills_per_min(), 2);
        assert_eq!(m.size_samples, 2);
        assert!((m.avg_size() - 15.0).abs() < 1e-9);
    }

    #[test]
    fn test_pause_blocks_ingest() {
        let mut s = TuiState::new();
        s.paused = true;
        s.ingest(TuiEvent::Verdict {
            verdict: FillVerdict::Real {
                tx_hash: H256::zero(),
                block: 1,
            },
            fill: fill("m", 10.0),
            latency_ms: 100,
        });
        assert_eq!(s.stats.total_verified, 0);
        assert!(s.feed.is_empty());
    }

    #[test]
    fn test_ws_status_updates() {
        let mut s = TuiState::new();
        assert_eq!(s.status, ConnectionStatus::Connecting);
        s.ingest(TuiEvent::WsConnected);
        assert_eq!(s.status, ConnectionStatus::Connected);
        s.ingest(TuiEvent::WsDisconnected);
        assert_eq!(s.status, ConnectionStatus::Disconnected);
    }

    #[test]
    fn test_price_update_creates_market() {
        let mut s = TuiState::new();
        s.ingest(TuiEvent::PriceUpdate {
            market: "new-market".into(),
            mid: 0.75,
        });
        let m = s.markets.get("new-market").unwrap();
        assert_eq!(m.mid_price, Some(0.75));
    }

    #[test]
    fn test_short_hex() {
        assert_eq!(short_hex("0x9e3230abcdef"), "9e3230ab");
        assert_eq!(short_hex("9e3230abcdef"), "9e3230ab");
        assert_eq!(short_hex("0xabc"), "abc");
    }

    #[test]
    fn test_ghost_rate_flag_threshold() {
        // Sanity: >5% should be flagged by render (can only check the rate value)
        let mut m = MarketRow::new("m".into());
        let now = Instant::now();
        for _ in 0..100 {
            m.record_fill(1.0, now);
        }
        m.ghost_count = 10;
        assert!((m.ghost_rate() - 10.0).abs() < 1e-9);
    }
}
diff --git a/src/types.rs b/src/types.rs index 76045ba..baa24a2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,6 +17,24 @@ pub struct Config { pub webhook_url: Option, /// List of market IDs to monitor. If empty, subscribes to user channel. pub markets: Vec, + + // ---- Phase 2: predictive detection ---- + /// Enable predictive ghost fill scoring. + pub predictive_enabled: bool, + /// Risk threshold above which a warning is emitted. + pub predictive_threshold: f64, + /// Rolling window size for per-market average trade size. + pub avg_window: usize, + + // ---- Phase 1 refinements: logging ---- + /// Path to JSONL verdict log. Empty = disabled. + pub verdict_log: String, + /// Path to JSONL predictive warning log. Empty = disabled. + pub predictive_log: String, + + // ---- UI ---- + /// Launch the ratatui dashboard instead of plain stdout output.
+ pub tui_mode: bool, } impl Default for Config { @@ -28,6 +46,12 @@ impl Default for Config { poll_interval: Duration::from_millis(500), webhook_url: None, markets: vec![], + predictive_enabled: false, + predictive_threshold: 0.7, + avg_window: 50, + verdict_log: "data/verdicts.jsonl".into(), + predictive_log: "data/predictive_warnings.jsonl".into(), + tui_mode: false, } } } @@ -78,6 +102,24 @@ pub struct GhostFillEvent { pub timestamp: u64, } +/// Predictive warning emitted BEFORE chain confirmation. +/// +/// Produced by `predictive::Predictor::score_fill()` when the computed +/// risk score exceeds `Config::predictive_threshold`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictiveWarning { + pub tx_hash: H256, + pub market: String, + pub score: f64, + pub price_deviation: f64, + pub size_anomaly: f64, + pub trade_price: f64, + pub mid_price: f64, + pub trade_size: f64, + pub avg_size: f64, + pub ts: u64, +} + /// Known Polymarket contract addresses on Polygon. pub mod contracts { use ethers::types::Address; diff --git a/src/verifier.rs b/src/verifier.rs deleted file mode 100644 index 3189937..0000000 --- a/src/verifier.rs +++ /dev/null @@ -1,240 +0,0 @@ -use anyhow::{Context, Result}; -use ethers::providers::{Http, Middleware, Provider, RawCall}; -use ethers::types::{Address, H256}; -use tokio::time::{sleep, Instant}; -use tracing::{debug, info, warn}; - -use crate::types::{Config, FillVerdict, TRANSFER_FROM_FAILED_SELECTOR}; - -/// Verify a single fill transaction on-chain. 
-/// -/// Polls `eth_getTransactionReceipt` until: -/// - receipt.status == 1 → `FillVerdict::Real` -/// - receipt.status == 0 → `FillVerdict::Ghost` (parses revert reason) -/// - timeout expires → `FillVerdict::Timeout` -pub async fn verify_fill(rpc_url: &str, tx_hash: H256, config: &Config) -> Result { - let provider = Provider::::try_from(rpc_url).context("failed to create RPC provider")?; - - let deadline = Instant::now() + config.verify_timeout; - - loop { - if Instant::now() >= deadline { - warn!(?tx_hash, "verification timed out — treating as ghost"); - return Ok(FillVerdict::Timeout { tx_hash }); - } - - match provider.get_transaction_receipt(tx_hash).await { - Ok(Some(receipt)) => { - let status = receipt.status.map(|s| s.as_u64()).unwrap_or(0); - let block = receipt.block_number.map(|b| b.as_u64()).unwrap_or(0); - - if status == 1 { - info!(?tx_hash, block, "fill verified — REAL"); - return Ok(FillVerdict::Real { tx_hash, block }); - } - - // status == 0: transaction reverted — ghost fill - let reason = extract_revert_reason(&provider, tx_hash, &receipt).await; - let counterparty = extract_counterparty(&receipt); - - warn!(?tx_hash, %reason, "fill reverted — GHOST"); - return Ok(FillVerdict::Ghost { - tx_hash, - reason, - counterparty, - }); - } - Ok(None) => { - debug!(?tx_hash, "no receipt yet, polling..."); - } - Err(e) => { - warn!(?tx_hash, error = %e, "RPC error during receipt poll"); - } - } - - sleep(config.poll_interval).await; - } -} - -/// Try to extract a human-readable revert reason from a reverted tx. -/// -/// Strategies: -/// 1. Call `eth_call` to replay the tx and get the revert data. -/// 2. Look for known signatures in the revert data. -/// 3. Fall back to "unknown revert". 
-async fn extract_revert_reason( - provider: &Provider, - tx_hash: H256, - receipt: ðers::types::TransactionReceipt, -) -> String { - // Try to get the original transaction to replay it - let tx = match provider.get_transaction(tx_hash).await { - Ok(Some(tx)) => tx, - _ => return "unknown revert (tx not found)".into(), - }; - - // Replay the transaction via eth_call at the block it was included in - let block = receipt.block_number; - let call_request = ethers::types::transaction::eip2718::TypedTransaction::Legacy( - ethers::types::TransactionRequest { - from: tx.from.into(), - to: tx.to.map(ethers::types::NameOrAddress::Address), - gas: tx.gas.into(), - gas_price: tx.gas_price, - value: tx.value.into(), - data: tx.input.clone().into(), - nonce: None, - chain_id: None, - }, - ); - - match provider - .call_raw(&call_request) - .block(block.unwrap().into()) - .await - { - Err(e) => { - let err_str = e.to_string(); - - // Check for TRANSFER_FROM_FAILED in the error message - if err_str.contains(TRANSFER_FROM_FAILED_SELECTOR) { - return TRANSFER_FROM_FAILED_SELECTOR.to_string(); - } - - // Try to decode standard Solidity Error(string) from revert data - if let Some(reason) = parse_solidity_revert(&err_str) { - return reason; - } - - // Check for common patterns in hex revert data - if let Some(hex_data) = extract_hex_revert_data(&err_str) { - if let Some(reason) = decode_revert_bytes(&hex_data) { - return reason; - } - } - - format!("reverted: {}", truncate(&err_str, 200)) - } - Ok(_) => { - // eth_call succeeded — this can happen if state changed between - // the original tx and our replay. Still a ghost fill though. - "reverted on-chain (eth_call replay succeeded — state-dependent revert)".into() - } - } -} - -/// Extract counterparty address from the transaction's `to` field or input data. -fn extract_counterparty(receipt: ðers::types::TransactionReceipt) -> Option
{ - // The `from` in the receipt is the sender (settlement bot). - // For CTF exchange calls, the counterparty is encoded in calldata, - // but as a first pass we return the `to` address (the exchange contract). - // In v2 we can decode the actual taker/maker from calldata. - receipt.to -} - -/// Try to parse Solidity's `Error(string)` from an error message. -fn parse_solidity_revert(err: &str) -> Option { - // The standard revert selector is 0x08c379a0 - if let Some(idx) = err.find("08c379a0") { - let hex_start = idx + 8; // skip selector - let remaining = &err[hex_start..]; - // Extract hex chars - let hex: String = remaining - .chars() - .take_while(|c| c.is_ascii_hexdigit()) - .collect(); - if hex.len() >= 128 { - // offset (32 bytes) + length (32 bytes) + string data - let len_hex = &hex[64..128]; - if let Ok(len) = usize::from_str_radix(len_hex, 16) { - let str_hex = &hex[128..128 + (len * 2).min(hex.len() - 128)]; - if let Ok(bytes) = hex::decode(str_hex) { - if let Ok(s) = String::from_utf8(bytes) { - return Some(s); - } - } - } - } - } - None -} - -/// Extract hex-encoded revert data from a provider error string. -fn extract_hex_revert_data(err: &str) -> Option { - // Many providers return revert data as 0x-prefixed hex - if let Some(idx) = err.find("0x") { - let hex: String = err[idx + 2..] - .chars() - .take_while(|c| c.is_ascii_hexdigit()) - .collect(); - if hex.len() >= 8 { - return Some(hex); - } - } - None -} - -/// Decode raw revert bytes looking for known signatures. 
-fn decode_revert_bytes(hex_data: &str) -> Option { - // Check for known custom error selectors - if hex_data.len() >= 8 { - let _selector = &hex_data[..8]; - // Common Polymarket CTF exchange errors can be matched here - // Add known selectors as discovered - } - - // Check if the entire revert data decodes to ASCII - if let Ok(bytes) = hex::decode(hex_data) { - let ascii: String = bytes - .iter() - .filter(|b| b.is_ascii_graphic() || b.is_ascii_whitespace()) - .map(|b| *b as char) - .collect(); - if ascii.len() > 4 { - return Some(ascii.trim().to_string()); - } - } - - None -} - -fn truncate(s: &str, max: usize) -> &str { - if s.len() <= max { - s - } else { - &s[..max] - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::types::Config; - use std::time::Duration; - - /// Test with a known ghost fill tx on Polygon mainnet. - /// This tx has status=0 and reverted with TRANSFER_FROM_FAILED. - /// - /// Run with: cargo test -- --ignored test_known_ghost_fill - #[tokio::test] - #[ignore = "requires Polygon RPC access"] - async fn test_known_ghost_fill() { - let config = Config { - rpc_url: "https://polygon-rpc.com".into(), - verify_timeout: Duration::from_secs(30), - poll_interval: Duration::from_millis(500), - ..Default::default() - }; - - let tx_hash: H256 = "0x9e3230abde0f569da87511a6f8823076f7b211bb00d10689db3b7c50d6652df0" - .parse() - .unwrap(); - - let verdict = verify_fill(&config.rpc_url, tx_hash, &config) - .await - .unwrap(); - println!("Verdict: {verdict:?}"); - - assert!(verdict.is_ghost(), "expected ghost fill, got: {verdict:?}"); - } -} diff --git a/src/ws.rs b/src/ws.rs index 0b8cd2a..2e614ff 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -1,11 +1,27 @@ use anyhow::Result; use ethers::types::H256; use futures_util::{SinkExt, StreamExt}; -use serde::Deserialize; +use std::time::Duration; use tokio::sync::mpsc; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{debug, error, info, warn}; +/// Stream-level events from the 
CLOB websocket. +#[derive(Debug, Clone)] +pub enum ClobEvent { + /// A trade / fill with on-chain settlement tx. + Fill(ClobFill), + /// A mid-price or orderbook tick for a market. + PriceUpdate(PriceUpdate), + /// WebSocket successfully (re)connected. + Connected, + /// WebSocket disconnected (before reconnect attempt). + Disconnected, + /// Human-readable status message (errors, reconnect attempts). + /// Forwarded to the TUI feed so the user sees what's happening. + Status(String), +} + /// A fill event extracted from the CLOB websocket. #[derive(Debug, Clone)] pub struct ClobFill { @@ -16,76 +32,90 @@ pub struct ClobFill { pub price: f64, } -/// Raw JSON shape of a CLOB fill message. -#[derive(Debug, Deserialize)] -struct ClobMessage { - #[serde(rename = "type")] - msg_type: Option, - data: Option, -} - -#[derive(Debug, Deserialize)] -struct ClobFillData { - #[serde(rename = "transactionHash")] - transaction_hash: Option, - #[serde(rename = "transaction_hash")] - transaction_hash_alt: Option, - market: Option, - side: Option, - size: Option, - price: Option, -} - -impl ClobFillData { - fn tx_hash_str(&self) -> Option<&str> { - self.transaction_hash - .as_deref() - .or(self.transaction_hash_alt.as_deref()) - } +/// Best bid / best ask update for a market. +#[derive(Debug, Clone)] +pub struct PriceUpdate { + pub market: String, + pub best_bid: f64, + pub best_ask: f64, } -/// Connect to the Polymarket CLOB WebSocket and stream fill events. +/// Connect to the Polymarket CLOB WebSocket and stream events. /// -/// Sends each extracted fill into `tx`. Reconnects on disconnect. -pub async fn listen_clob_fills( +/// Reconnects with exponential backoff (1s → 2s → 4s → ... capped at 60s) +/// and resets on a successful connection. 
+pub async fn listen_clob_events( ws_url: &str, markets: &[String], - tx: mpsc::Sender, + tx: mpsc::Sender, ) -> Result<()> { + let mut backoff_secs: u64 = 1; + loop { info!(url = ws_url, "connecting to CLOB websocket..."); + let _ = tx + .send(ClobEvent::Status(format!("connecting to {ws_url}"))) + .await; match connect_async(ws_url).await { Ok((ws_stream, _)) => { info!("CLOB websocket connected"); + backoff_secs = 1; + let _ = tx.send(ClobEvent::Connected).await; + let _ = tx + .send(ClobEvent::Status("CLOB websocket connected".into())) + .await; + let (mut write, mut read) = ws_stream.split(); - // Subscribe to the market fill channel if markets are provided, otherwise user channel + // Polymarket CLOB expects `assets_ids` (plural of asset) with + // `type = "market"` directly; no "subscribe" envelope. let subscribe_msg = if markets.is_empty() { serde_json::json!({ - "type": "subscribe", - "channel": "user", - "markets": [], + "type": "market", + "assets_ids": [], }) } else { serde_json::json!({ - "type": "subscribe", - "channel": "market", - "markets": markets, + "type": "market", + "assets_ids": markets, }) }; - if let Err(e) = write.send(Message::Text(subscribe_msg.to_string())).await { + let sub_json = subscribe_msg.to_string(); + debug!(payload = %sub_json, "sending subscribe"); + if let Err(e) = write.send(Message::Text(sub_json.clone())).await { error!(error = %e, "failed to send subscribe message"); - continue; + let _ = tx + .send(ClobEvent::Status(format!("subscribe send failed: {e}"))) + .await; + } else { + let _ = tx + .send(ClobEvent::Status(format!( + "subscribed to {} asset(s)", + markets.len() + ))) + .await; } while let Some(msg_result) = read.next().await { match msg_result { Ok(Message::Text(text)) => { - if let Some(fill) = parse_fill_message(&text) { - debug!(?fill.tx_hash, "received fill from CLOB"); - if tx.send(fill).await.is_err() { - info!("fill channel closed, shutting down ws listener"); + // Log every raw message at debug. 
Useful for + // `RUST_LOG=ghostguard::ws=debug` diagnosis. + debug!(len = text.len(), raw = %truncate(&text, 400), "ws msg"); + + for event in parse_clob_message(&text) { + match &event { + ClobEvent::Fill(f) => { + debug!(market = %f.market, size = f.size, price = f.price, "parsed fill"); + } + ClobEvent::PriceUpdate(p) => { + debug!(market = %p.market, bid = p.best_bid, ask = p.best_ask, "parsed price update"); + } + _ => {} + } + if tx.send(event).await.is_err() { + info!("event channel closed, shutting down ws listener"); return Ok(()); } } @@ -107,74 +137,359 @@ pub async fn listen_clob_fills( } Err(e) => { error!(error = %e, "failed to connect to CLOB websocket"); + let _ = tx + .send(ClobEvent::Status(format!("connect failed: {e}"))) + .await; } } - warn!("reconnecting to CLOB websocket in 3s..."); - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + let _ = tx.send(ClobEvent::Disconnected).await; + warn!(backoff_secs, "reconnecting to CLOB websocket..."); + let _ = tx + .send(ClobEvent::Status(format!( + "reconnecting in {backoff_secs}s..." + ))) + .await; + tokio::time::sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + } +} + +/// Parse a raw CLOB websocket message. +/// +/// Polymarket's market channel emits events with fields at the TOP level +/// (no `data` wrapper) and may batch multiple events into a JSON array. +/// Known `event_type` values: +/// - `book` — full orderbook snapshot (bids + asks) +/// - `price_change` — per-level update (array of `changes`) +/// - `last_trade_price` — most recent trade (no tx_hash!) 
+/// - `tick_size_change` — rare, ignored +fn parse_clob_message(text: &str) -> Vec { + let value: serde_json::Value = match serde_json::from_str(text) { + Ok(v) => v, + Err(e) => { + warn!(error = %e, raw = %truncate(text, 200), "failed to parse ws msg"); + return vec![]; + } + }; + + if let Some(arr) = value.as_array() { + arr.iter().filter_map(parse_single_event).collect() + } else { + parse_single_event(&value).into_iter().collect() + } +} + +fn parse_single_event(v: &serde_json::Value) -> Option { + let event_type = v + .get("event_type") + .or_else(|| v.get("type")) + .and_then(|x| x.as_str()) + .unwrap_or(""); + + match event_type { + "book" => parse_book(v).map(ClobEvent::PriceUpdate), + "price_change" => parse_price_change(v).map(ClobEvent::PriceUpdate), + "last_trade_price" | "trade" | "fill" | "order_fill" => { + parse_last_trade(v).map(ClobEvent::Fill) + } + "tick_size_change" | "pong" | "heartbeat" | "" => None, + other => { + debug!(event_type = other, "unknown ws event type"); + None + } + } +} + +fn str_field(v: &serde_json::Value, key: &str) -> Option { + v.get(key).and_then(|x| x.as_str()).map(|s| s.to_string()) +} + +fn f64_field(v: &serde_json::Value, key: &str) -> Option { + v.get(key).and_then(|x| match x { + serde_json::Value::String(s) => s.parse().ok(), + serde_json::Value::Number(n) => n.as_f64(), + _ => None, + }) +} + +fn market_id(v: &serde_json::Value) -> String { + str_field(v, "asset_id") + .or_else(|| str_field(v, "market")) + .unwrap_or_default() +} + +/// Parse an orderbook snapshot into a mid-price update. +/// `bids` and `asks` are arrays of `{price, size}` objects. Sizes may be "0" +/// for removed levels; those are filtered out. 
+fn parse_book(v: &serde_json::Value) -> Option { + let market = market_id(v); + if market.is_empty() { + return None; + } + + let bids = v.get("bids").and_then(|b| b.as_array())?; + let asks = v.get("asks").and_then(|a| a.as_array())?; + + let best_bid = bids + .iter() + .filter(|lvl| f64_field(lvl, "size").unwrap_or(0.0) > 0.0) + .filter_map(|lvl| f64_field(lvl, "price")) + .fold(0.0_f64, f64::max); + + let best_ask = bids_min_ask(asks); + + if best_bid <= 0.0 || !best_ask.is_finite() || best_ask <= 0.0 { + return None; } + + Some(PriceUpdate { + market, + best_bid, + best_ask, + }) } -/// Parse a raw CLOB websocket message into a ClobFill if it contains a fill. -fn parse_fill_message(text: &str) -> Option { - let msg: ClobMessage = serde_json::from_str(text).ok()?; +fn bids_min_ask(asks: &[serde_json::Value]) -> f64 { + asks.iter() + .filter(|lvl| f64_field(lvl, "size").unwrap_or(0.0) > 0.0) + .filter_map(|lvl| f64_field(lvl, "price")) + .filter(|&p| p > 0.0) + .fold(f64::INFINITY, f64::min) +} - // Only process fill/trade messages - let msg_type = msg.msg_type.as_deref().unwrap_or(""); - if !matches!(msg_type, "fill" | "trade" | "order_fill") { +/// Parse an incremental price change. Multiple changes may share one message. +/// We re-emit the FIRST non-zero bid/ask we find; this is an approximation +/// (the real orderbook top might need delta reconstruction) but is good +/// enough to seed a rolling mid for predictive scoring. 
+fn parse_price_change(v: &serde_json::Value) -> Option { + let market = market_id(v); + if market.is_empty() { return None; } - let data = msg.data?; - let tx_hash_str = data.tx_hash_str()?; + let changes = v.get("changes").and_then(|c| c.as_array())?; + let mut best_bid: f64 = 0.0; + let mut best_ask: f64 = f64::INFINITY; + for ch in changes { + let price = f64_field(ch, "price").unwrap_or(0.0); + let size = f64_field(ch, "size").unwrap_or(0.0); + let side = str_field(ch, "side").unwrap_or_default(); + if price <= 0.0 || size <= 0.0 { + continue; + } + match side.as_str() { + "BUY" | "bid" => { + if price > best_bid { + best_bid = price; + } + } + "SELL" | "ask" => { + if price < best_ask { + best_ask = price; + } + } + _ => {} + } + } - let tx_hash: H256 = tx_hash_str - .parse() - .inspect_err(|e| { - warn!(tx_hash = tx_hash_str, error = %e, "invalid tx hash in fill"); + if best_bid > 0.0 && best_ask.is_finite() { + Some(PriceUpdate { + market, + best_bid, + best_ask, }) - .ok()?; + } else { + None + } +} + +/// Parse a `last_trade_price` event. Polymarket's market channel does NOT +/// include a settlement transaction hash — that's resolved later via the +/// trade-history API. We emit a `ClobFill` with a zero `tx_hash`; downstream +/// code (detection.rs) skips on-chain verification when the hash is zero. +fn parse_last_trade(v: &serde_json::Value) -> Option { + let market = market_id(v); + if market.is_empty() { + return None; + } + + let size = f64_field(v, "size").unwrap_or(0.0); + let price = f64_field(v, "price").unwrap_or(0.0); + if size <= 0.0 || price <= 0.0 { + return None; + } + + // Prefer explicit transaction_hash if the event happens to include one + // (e.g. user-channel fills). Otherwise leave as zero. 
+ let tx_hash = str_field(v, "transaction_hash") + .or_else(|| str_field(v, "transactionHash")) + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(H256::zero); Some(ClobFill { tx_hash, - market: data.market.unwrap_or_default(), - side: data.side.unwrap_or_default(), - size: data.size.and_then(|s| s.parse().ok()).unwrap_or(0.0), - price: data.price.and_then(|s| s.parse().ok()).unwrap_or(0.0), + market, + side: str_field(v, "side").unwrap_or_default(), + size, + price, }) } +fn truncate(s: &str, max: usize) -> &str { + if s.len() <= max { + s + } else { + &s[..max] + } +} + #[cfg(test)] mod tests { use super::*; + fn first(events: Vec) -> ClobEvent { + events.into_iter().next().expect("expected one event") + } + + #[test] + fn test_parse_last_trade_polymarket_format() { + // Real Polymarket market channel format: fields at top level, + // event_type instead of type, no data wrapper, no tx_hash. + let json = r#"{ + "event_type": "last_trade_price", + "asset_id": "12345", + "market": "0xabc", + "price": "0.42", + "side": "SELL", + "size": "25", + "timestamp": "1700000000" + }"#; + + match first(parse_clob_message(json)) { + ClobEvent::Fill(f) => { + assert_eq!(f.market, "12345"); + assert_eq!(f.side, "SELL"); + assert!((f.size - 25.0).abs() < f64::EPSILON); + assert!((f.price - 0.42).abs() < f64::EPSILON); + assert_eq!(f.tx_hash, H256::zero(), "market channel has no tx hash"); + } + _ => panic!("expected Fill"), + } + } + + #[test] + fn test_parse_book_snapshot() { + let json = r#"{ + "event_type": "book", + "asset_id": "12345", + "market": "0xabc", + "bids": [ + {"price": "0.48", "size": "100"}, + {"price": "0.47", "size": "200"} + ], + "asks": [ + {"price": "0.52", "size": "150"}, + {"price": "0.53", "size": "50"} + ] + }"#; + + match first(parse_clob_message(json)) { + ClobEvent::PriceUpdate(p) => { + assert_eq!(p.market, "12345"); + assert!((p.best_bid - 0.48).abs() < f64::EPSILON); + assert!((p.best_ask - 0.52).abs() < f64::EPSILON); + } + _ => panic!("expected 
PriceUpdate"), + } + } + #[test] - fn test_parse_fill_message() { + fn test_parse_book_skips_zero_sizes() { + // Size 0 means the level was removed. let json = r#"{ - "type": "fill", - "data": { - "transactionHash": "0x9e3230abde0f569da87511a6f8823076f7b211bb00d10689db3b7c50d6652df0", - "market": "0xabc123", - "side": "BUY", - "size": "100.0", - "price": "0.65" + "event_type": "book", + "asset_id": "m", + "bids": [ + {"price": "0.99", "size": "0"}, + {"price": "0.48", "size": "100"} + ], + "asks": [ + {"price": "0.01", "size": "0"}, + {"price": "0.52", "size": "50"} + ] + }"#; + match first(parse_clob_message(json)) { + ClobEvent::PriceUpdate(p) => { + assert!((p.best_bid - 0.48).abs() < f64::EPSILON); + assert!((p.best_ask - 0.52).abs() < f64::EPSILON); } + _ => panic!("expected PriceUpdate"), + } + } + + #[test] + fn test_parse_price_change() { + let json = r#"{ + "event_type": "price_change", + "asset_id": "m", + "changes": [ + {"price": "0.49", "side": "BUY", "size": "100"}, + {"price": "0.51", "side": "SELL", "size": "200"} + ] }"#; + match first(parse_clob_message(json)) { + ClobEvent::PriceUpdate(p) => { + assert!((p.best_bid - 0.49).abs() < f64::EPSILON); + assert!((p.best_ask - 0.51).abs() < f64::EPSILON); + } + _ => panic!("expected PriceUpdate"), + } + } - let fill = parse_fill_message(json).expect("should parse fill"); - assert_eq!( - format!("{:?}", fill.tx_hash), - "0x9e3230abde0f569da87511a6f8823076f7b211bb00d10689db3b7c50d6652df0" - ); - assert_eq!(fill.market, "0xabc123"); - assert_eq!(fill.side, "BUY"); - assert!((fill.size - 100.0).abs() < f64::EPSILON); - assert!((fill.price - 0.65).abs() < f64::EPSILON); + #[test] + fn test_parse_message_array() { + // Polymarket batches events into an array on initial snapshot. 
+ let json = r#"[ + {"event_type": "book", "asset_id": "m", + "bids": [{"price":"0.48","size":"100"}], + "asks": [{"price":"0.52","size":"100"}]}, + {"event_type": "last_trade_price", "asset_id": "m", + "price": "0.50", "side": "BUY", "size": "10"} + ]"#; + let events = parse_clob_message(json); + assert_eq!(events.len(), 2); + assert!(matches!(events[0], ClobEvent::PriceUpdate(_))); + assert!(matches!(events[1], ClobEvent::Fill(_))); } #[test] - fn test_parse_non_fill_message() { - let json = r#"{"type": "heartbeat"}"#; - assert!(parse_fill_message(json).is_none()); + fn test_parse_unknown_event() { + let json = r#"{"event_type": "tick_size_change", "asset_id": "m"}"#; + assert!(parse_clob_message(json).is_empty()); + } + + #[test] + fn test_parse_invalid_json() { + assert!(parse_clob_message("not json").is_empty()); + } + + #[test] + fn test_last_trade_with_tx_hash_preserved() { + // User-channel fills may include the settlement tx hash. + let json = r#"{ + "event_type": "trade", + "asset_id": "m", + "side": "BUY", + "size": "50", + "price": "0.6", + "transaction_hash": "0x9e3230abde0f569da87511a6f8823076f7b211bb00d10689db3b7c50d6652df0" + }"#; + match first(parse_clob_message(json)) { + ClobEvent::Fill(f) => { + assert_ne!(f.tx_hash, H256::zero()); + } + _ => panic!("expected Fill"), + } } }