From 3fb08fc40f0c6d12b85f0e27f7f4a9e3878a8dee Mon Sep 17 00:00:00 2001 From: createMonster Date: Fri, 4 Jul 2025 17:13:53 +0800 Subject: [PATCH 1/2] Add framework for paradex --- Cargo.lock | 169 ++++++++++++++++++++++++++- Cargo.toml | 1 + examples/paradex_example.rs | 16 +++ src/core/errors.rs | 3 + src/exchanges/mod.rs | 1 + src/exchanges/paradex/account.rs | 40 +++++++ src/exchanges/paradex/auth.rs | 95 +++++++++++++++ src/exchanges/paradex/client.rs | 12 ++ src/exchanges/paradex/converters.rs | 67 +++++++++++ src/exchanges/paradex/market_data.rs | 73 ++++++++++++ src/exchanges/paradex/mod.rs | 10 ++ src/exchanges/paradex/trading.rs | 40 +++++++ src/exchanges/paradex/types.rs | 59 ++++++++++ src/exchanges/paradex/websocket.rs | 1 + 14 files changed, 581 insertions(+), 6 deletions(-) create mode 100644 examples/paradex_example.rs create mode 100644 src/exchanges/paradex/account.rs create mode 100644 src/exchanges/paradex/auth.rs create mode 100644 src/exchanges/paradex/client.rs create mode 100644 src/exchanges/paradex/converters.rs create mode 100644 src/exchanges/paradex/market_data.rs create mode 100644 src/exchanges/paradex/mod.rs create mode 100644 src/exchanges/paradex/trading.rs create mode 100644 src/exchanges/paradex/types.rs create mode 100644 src/exchanges/paradex/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index fc07756..5d25d00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.8.0" @@ -253,6 +259,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -484,8 +499,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -806,6 +823,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "keccak" version = "0.1.5" @@ -861,7 +893,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "base64", + "base64 0.21.7", "chrono", "dotenv", "ed25519-dalek", @@ -870,6 +902,7 @@ dependencies = [ "governor", "hex", "hmac", + "jsonwebtoken", "nonzero_ext", "rand", "reqwest", @@ -879,7 +912,7 @@ dependencies = [ "serde_json", "sha2", "sha3", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-retry", "tokio-tungstenite", @@ -959,6 +992,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1056,6 +1114,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "pem" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1125,6 +1193,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1227,7 +1301,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -1261,6 +1335,20 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.16", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1295,7 +1383,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -1492,6 +1580,18 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simple_asn1" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.12", + "time", +] + [[package]] name = "slab" version = "0.4.9" @@ -1616,7 +1716,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -1630,6 +1739,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -1640,6 +1760,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -1811,7 +1962,7 @@ dependencies = [ "native-tls", "rand", "sha1", - "thiserror", + "thiserror 1.0.69", "url", "utf-8", ] @@ -1828,6 +1979,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.4" diff --git a/Cargo.toml b/Cargo.toml index b6c775c..c1bdb78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ base64 = "0.21" # Optional dependencies dotenv = { version = "0.15", optional = true } +jsonwebtoken = "9.3.1" [lints.clippy] all = { level = "warn", priority = -1 } diff --git a/examples/paradex_example.rs b/examples/paradex_example.rs new file mode 100644 index 0000000..1b9d2ee --- /dev/null +++ b/examples/paradex_example.rs @@ -0,0 +1,16 @@ +use lotusx::{ + core::{config::ExchangeConfig, traits::MarketDataSource}, + exchanges::paradex::ParadexConnector, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let config = ExchangeConfig::from_env("PARADEX")?; + let connector = ParadexConnector::new(config); + + // Test basic functionality + let markets = connector.get_markets().await?; + println!("Found {} markets", markets.len()); + + Ok(()) +} diff --git a/src/core/errors.rs b/src/core/errors.rs index 91ee15f..16cad9e 100644 --- a/src/core/errors.rs +++ b/src/core/errors.rs @@ -23,6 +23,9 @@ pub enum ExchangeError { #[error("Configuration error: {0}")] ConfigError(#[from] crate::core::config::ConfigError), + #[error("WebSocket error: {0}")] + WebSocketError(String), + #[error("Other error: {0}")] Other(String), diff --git a/src/exchanges/mod.rs b/src/exchanges/mod.rs index c38300e..2e4187b 100644 --- a/src/exchanges/mod.rs +++ b/src/exchanges/mod.rs @@ -4,3 +4,4 @@ pub mod binance_perp; pub mod bybit; pub mod bybit_perp; pub mod hyperliquid; +pub mod paradex; diff --git a/src/exchanges/paradex/account.rs b/src/exchanges/paradex/account.rs new file mode 100644 index 0000000..0053169 --- /dev/null +++ b/src/exchanges/paradex/account.rs @@ -0,0 +1,40 @@ +use crate::core::errors::ExchangeError; +use crate::core::traits::AccountInfo; +use crate::core::types::{Balance, Position}; +use crate::exchanges::paradex::auth::ParadexAuth; +use crate::exchanges::paradex::ParadexConnector; +use async_trait::async_trait; +use secrecy::ExposeSecret; + +#[async_trait] +impl AccountInfo for ParadexConnector { + async fn get_account_balance(&self) -> Result, ExchangeError> { + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; + let token = auth.sign_jwt()?; + + let client = reqwest::Client::new(); + let response = client + .get("https://api.paradex.trade/v1/account") + .bearer_auth(token) + .send() + .await?; + + let balances: Vec = response.json().await?; + Ok(balances) + } + + async fn get_positions(&self) -> Result, ExchangeError> { + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; + let token = auth.sign_jwt()?; + + let client = reqwest::Client::new(); + let response = client + .get("https://api.paradex.trade/v1/positions") + .bearer_auth(token) + .send() + .await?; + + let positions: Vec = response.json().await?; + Ok(positions) + } +} diff --git a/src/exchanges/paradex/auth.rs b/src/exchanges/paradex/auth.rs new file mode 100644 index 0000000..b55ac85 --- /dev/null +++ b/src/exchanges/paradex/auth.rs @@ -0,0 +1,95 @@ +use crate::core::errors::ExchangeError; +use jsonwebtoken::{encode, EncodingKey, Header}; +use secp256k1::{PublicKey, Secp256k1, SecretKey}; +use serde::{Deserialize, Serialize}; +use sha3::{Digest, Keccak256}; + +#[derive(Debug, Serialize, Deserialize)] +struct Claims { + sub: String, + exp: usize, +} + +pub struct ParadexAuth { + secret_key: Option, + wallet_address: Option, + secp: Secp256k1, +} + +impl Default for ParadexAuth { + fn default() -> Self { + Self::new() + } +} + +impl ParadexAuth { + pub fn new() -> Self { + Self { + secret_key: None, + wallet_address: None, + secp: Secp256k1::new(), + } + } + + pub fn with_private_key(private_key: &str) -> Result { + let secret_key = SecretKey::from_slice( + &hex::decode(private_key.trim_start_matches("0x")) + .map_err(|e| ExchangeError::AuthError(format!("Invalid private key hex: {}", e)))?, + ) + .map_err(|e| ExchangeError::AuthError(format!("Invalid private key: {}", e)))?; + + let secp = Secp256k1::new(); + let public_key = PublicKey::from_secret_key(&secp, &secret_key); + let wallet_address = public_key_to_address(&public_key); + + Ok(Self { + secret_key: Some(secret_key), + wallet_address: Some(wallet_address), + secp, + }) + } + + pub fn wallet_address(&self) -> Option<&str> { + self.wallet_address.as_deref() + } + + pub fn can_sign(&self) -> bool { + self.secret_key.is_some() + } + + pub fn sign_jwt(&self) -> Result { + let secret_key = self.secret_key.ok_or_else(|| { + ExchangeError::AuthError("No private key available for signing".to_string()) + })?; + + let claims = Claims { + sub: self.wallet_address.as_ref().unwrap().to_string(), + exp: (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp() as usize, + }; + + let token = encode( + &Header::default(), + &claims, + &EncodingKey::from_secret(secret_key.as_ref()), + ) + .map_err(|e| ExchangeError::AuthError(format!("Failed to sign JWT: {}", e)))?; + + Ok(token) + } +} + +fn public_key_to_address(public_key: &PublicKey) -> String { + let public_key_bytes = public_key.serialize_uncompressed(); + + // Remove the 0x04 prefix for uncompressed key + let key_without_prefix = &public_key_bytes[1..]; + + // Hash with Keccak256 + let mut hasher = Keccak256::new(); + hasher.update(key_without_prefix); + let hash = hasher.finalize(); + + // Take the last 20 bytes and format as hex address + let address_bytes = &hash[12..]; + format!("0x{}", hex::encode(address_bytes)) +} diff --git a/src/exchanges/paradex/client.rs b/src/exchanges/paradex/client.rs new file mode 100644 index 0000000..e2cdd20 --- /dev/null +++ b/src/exchanges/paradex/client.rs @@ -0,0 +1,12 @@ +use crate::core::config::ExchangeConfig; + +#[derive(Debug, Clone)] +pub struct ParadexConnector { + pub config: ExchangeConfig, +} + +impl ParadexConnector { + pub fn new(config: ExchangeConfig) -> Self { + Self { config } + } +} diff --git a/src/exchanges/paradex/converters.rs b/src/exchanges/paradex/converters.rs new file mode 100644 index 0000000..cc9d5ee --- /dev/null +++ b/src/exchanges/paradex/converters.rs @@ -0,0 +1,67 @@ +use crate::core::types::{ + Market, OrderResponse, OrderSide, OrderType, Position, PositionSide, Symbol, +}; +use crate::exchanges::paradex::types::{ParadexMarket, ParadexOrder, ParadexPosition}; + +impl From for Market { + fn from(market: ParadexMarket) -> Self { + Self { + symbol: Symbol { + base: market.base_asset.symbol, + quote: market.quote_asset.symbol, + symbol: market.symbol, + }, + status: market.status, + base_precision: market.base_asset.decimals, + quote_precision: market.quote_asset.decimals, + min_qty: Some(market.min_order_size), + max_qty: Some(market.max_order_size), + min_price: Some(market.min_price), + max_price: Some(market.max_price), + } + } +} + +impl From for OrderResponse { + fn from(order: ParadexOrder) -> Self { + Self { + order_id: order.id, + client_order_id: order.client_id, + symbol: order.market, + side: if order.side == "BUY" { + OrderSide::Buy + } else { + OrderSide::Sell + }, + order_type: match order.order_type.as_str() { + "MARKET" => OrderType::Market, + "LIMIT" => OrderType::Limit, + _ => OrderType::Market, // Or handle as an error + }, + quantity: order.size, + price: Some(order.price), + status: order.status, + timestamp: chrono::DateTime::parse_from_rfc3339(&order.created_at) + .unwrap() + .timestamp_millis(), + } + } +} + +impl From for Position { + fn from(position: ParadexPosition) -> Self { + Self { + symbol: position.market, + position_side: if position.side == "LONG" { + PositionSide::Long + } else { + PositionSide::Short + }, + entry_price: position.average_entry_price, + position_amount: position.size, + unrealized_pnl: position.unrealized_pnl, + liquidation_price: position.liquidation_price, + leverage: position.leverage, + } + } +} diff --git a/src/exchanges/paradex/market_data.rs b/src/exchanges/paradex/market_data.rs new file mode 100644 index 0000000..b11ab5d --- /dev/null +++ b/src/exchanges/paradex/market_data.rs @@ -0,0 +1,73 @@ +use crate::core::errors::ExchangeError; +use crate::core::traits::MarketDataSource; +use crate::core::types::{ + Kline, KlineInterval, Market, MarketDataType, SubscriptionType, WebSocketConfig, +}; +use crate::exchanges::paradex::types::ParadexMarket; +use crate::exchanges::paradex::ParadexConnector; +use async_trait::async_trait; +use futures_util::StreamExt; +use reqwest; +use tokio::sync::mpsc; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; + +#[async_trait] +impl MarketDataSource for ParadexConnector { + async fn get_markets(&self) -> Result, ExchangeError> { + let client = reqwest::Client::new(); + let response = client + .get("https://api.paradex.trade/v1/markets") + .send() + .await?; + let markets: Vec = response.json().await?; + Ok(markets.into_iter().map(Into::into).collect()) + } + + async fn subscribe_market_data( + &self, + _symbols: Vec, + _subscription_types: Vec, + _config: Option, + ) -> Result, ExchangeError> { + let url = self.get_websocket_url(); + let (ws_stream, _) = connect_async(url) + .await + .map_err(|e| ExchangeError::WebSocketError(e.to_string()))?; + let (mut _write, mut read) = ws_stream.split(); + + let (_tx, rx) = mpsc::channel(100); + + tokio::spawn(async move { + while let Some(message) = read.next().await { + match message { + Ok(Message::Text(_text)) => { + // Process the message and send it to the channel + } + Err(e) => { + eprintln!("WebSocket error: {}", e); + break; + } + _ => {} + } + } + }); + + Ok(rx) + } + + fn get_websocket_url(&self) -> String { + "wss://ws.paradex.trade/v1".to_string() + } + + async fn get_klines( + &self, + _symbol: String, + _interval: KlineInterval, + _limit: Option, + _start_time: Option, + _end_time: Option, + ) -> Result, ExchangeError> { + // Implementation of get_klines will be added here + Ok(vec![]) + } +} diff --git a/src/exchanges/paradex/mod.rs b/src/exchanges/paradex/mod.rs new file mode 100644 index 0000000..f932b61 --- /dev/null +++ b/src/exchanges/paradex/mod.rs @@ -0,0 +1,10 @@ +pub mod account; +pub mod auth; +pub mod client; +pub mod converters; +pub mod market_data; +pub mod trading; +pub mod types; +pub mod websocket; + +pub use client::ParadexConnector; diff --git a/src/exchanges/paradex/trading.rs b/src/exchanges/paradex/trading.rs new file mode 100644 index 0000000..03b9ba3 --- /dev/null +++ b/src/exchanges/paradex/trading.rs @@ -0,0 +1,40 @@ +use crate::core::errors::ExchangeError; +use crate::core::traits::OrderPlacer; +use crate::core::types::{OrderRequest, OrderResponse}; +use crate::exchanges::paradex::auth::ParadexAuth; +use crate::exchanges::paradex::ParadexConnector; +use async_trait::async_trait; +use secrecy::ExposeSecret; + +#[async_trait] +impl OrderPlacer for ParadexConnector { + async fn place_order(&self, order: OrderRequest) -> Result { + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; + let token = auth.sign_jwt()?; + + let client = reqwest::Client::new(); + let response = client + .post("https://api.paradex.trade/v1/orders") + .bearer_auth(token) + .json(&order) + .send() + .await?; + + let order_response: OrderResponse = response.json().await?; + Ok(order_response) + } + + async fn cancel_order(&self, _symbol: String, order_id: String) -> Result<(), ExchangeError> { + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; + let token = auth.sign_jwt()?; + + let client = reqwest::Client::new(); + client + .delete(&format!("https://api.paradex.trade/v1/orders/{}", order_id)) + .bearer_auth(token) + .send() + .await?; + + Ok(()) + } +} diff --git a/src/exchanges/paradex/types.rs b/src/exchanges/paradex/types.rs new file mode 100644 index 0000000..7246531 --- /dev/null +++ b/src/exchanges/paradex/types.rs @@ -0,0 +1,59 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexAsset { + pub id: String, + pub symbol: String, + pub name: String, + pub decimals: i32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexMarket { + pub symbol: String, + pub base_asset: ParadexAsset, + pub quote_asset: ParadexAsset, + pub status: String, + pub state: String, + pub tick_size: String, + pub step_size: String, + pub min_order_size: String, + pub max_order_size: String, + pub min_price: String, + pub max_price: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexOrder { + pub id: String, + pub client_id: String, + pub market: String, + pub side: String, + pub order_type: String, + pub size: String, + pub price: String, + pub status: String, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexFill { + pub id: i64, + pub market: String, + pub side: String, + pub size: String, + pub price: String, + pub fee: String, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexPosition { + pub market: String, + pub side: String, + pub average_entry_price: String, + pub size: String, + pub unrealized_pnl: String, + pub liquidation_price: Option, + pub leverage: String, +} diff --git a/src/exchanges/paradex/websocket.rs b/src/exchanges/paradex/websocket.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/exchanges/paradex/websocket.rs @@ -0,0 +1 @@ + From 5d52a446245b9c60df02a2dfe2c3311df065d357 Mon Sep 17 00:00:00 2001 From: createMonster Date: Fri, 4 Jul 2025 17:43:43 +0800 Subject: [PATCH 2/2] Paradex implementation --- docs/next_move_0704.md | 63 ++++ examples/paradex_example.rs | 339 ++++++++++++++++++- src/exchanges/paradex/account.rs | 126 ++++++-- src/exchanges/paradex/auth.rs | 6 +- src/exchanges/paradex/client.rs | 123 ++++++- src/exchanges/paradex/converters.rs | 25 +- src/exchanges/paradex/market_data.rs | 466 +++++++++++++++++++++++++-- src/exchanges/paradex/trading.rs | 257 +++++++++++++-- src/exchanges/paradex/types.rs | 192 +++++++++++ src/exchanges/paradex/websocket.rs | 311 ++++++++++++++++++ src/utils/exchange_factory.rs | 33 ++ 11 files changed, 1864 insertions(+), 77 deletions(-) create mode 100644 docs/next_move_0704.md diff --git a/docs/next_move_0704.md b/docs/next_move_0704.md new file mode 100644 index 0000000..ba37fb9 --- /dev/null +++ b/docs/next_move_0704.md @@ -0,0 +1,63 @@ +**LotusX Connector Layer — Cross-Exchange Arbitrage Readiness +(English Summary in Markdown)** + +--- + +## 1. Scope + +This document focuses **exclusively on the *connector layer*** of LotusX and outlines the technical work required to make it “arbitrage-ready”. + +--- + +## 2. Capabilities Every Connector Must Expose + +| Module | Goal | Typical API | +| :------------------- | :--------------------------------------------------------------------------------------------------- | :-------------------------------------------------------------------------------- | +| **Market Data** | L2/L3 order-book, recent trades, candles, index price, funding rate, estimated liquidation price | `get_orderbook(level)`, `get_trades`, `get_funding_rate`, `get_liquidation_price` | +| **Trading** | Beyond plain *place/cancel*: batch orders, amend, batch cancel, order-type flags (IOC/FOK/Post-Only) | `place_batch_orders`, `amend_order`, `cancel_batch` | +| **Account / Assets** | Balances, positions, sub-accounts, leverage brackets, dynamic fee table | `get_balances`, `get_positions`, `get_fee_rates`, `get_leverage_bracket` | +| **Funds Transfer** | Internal transfers / on-chain withdrawals (stub if not yet implemented) | `internal_transfer`, `withdraw`, `deposit_history` | +| **System / Meta** | Time sync, exchange status, rate limits | `sync_time`, `get_system_status`, `get_rate_limits` | +| **Observability** | Tracing span + metrics for every REST/WS call | `instrumented_client.request(...)` | + +--- + +## 3. Code-Level Improvements Still Missing + +| Topic | Current State | Action Items | +| :-------------------- | :----------------------------- | :--------------------------------------------------------------------------------------- | +| **Unified Types** | `price/qty` often `String` | Switch to `rust_decimal::Decimal` with `serde` helpers; new-type-safe `Symbol` | +| **REST / WS Kernel** | Each connector rolls its own | Extract `RestClient` / `WsSession` traits handling signing, retries, rate limiting | +| **Feature Gating** | Single crate builds everything | Convert to Cargo **workspace** (`lotusx-core` + `connector-*`), enable with `--features` | +| **Error Granularity** | Generic `Other(String)` | Use `thiserror` + fine-grained mapping of exchange error codes | +| **Testing** | Mostly unit tests | CI on testnet (live order → query → cancel) + `vcr-rs` playback | +| **Observability** | Latency CLI only | Integrate `tracing` + Prometheus (p95 RTT, WS drops, rate-limit hits) | + +--- + +## 4. Next Exchanges to Add (Priority Order) + +1. **OKX** (spot + perp, excellent sub-account support) +2. **Coinbase Advanced Trade API** +3. **Kraken** (spot/futures) +4. **Bitget** & **KuCoin** +5. **Gate.io** / **BingX** (Asia-focused backup) + +--- + +## 5. Six-Week Connector Roadmap + +| Week | Milestone | Key Deliverables | +| :----- | :---------------------------- | :------------------------------------------------------------------------------- | +| **W1** | Workspace & Abstractions | `lotusx-core`, `connector-binance`…; implement `RestClient` + `WsSession` traits | +| **W2** | Type Safety Upgrade | Replace strings with `Decimal`, normalize `Symbol` | +| **W3** | Rate-Limit & Retry Middleware | Unified `RateLimiter` & `RetryPolicy`, dynamic quotas | +| **W4** | **OKX Connector MVP** | Full market, trade, account coverage; CI on OKX testnet | +| **W5** | Batch / Amend Support | `place_batch`, `amend_order` for Binance & Bybit; benchmark latency | +| **W6** | Observability & Docs | `tracing` + Prometheus metrics, README with Grafana dashboard sample | + +--- + +## 6. Summary + +Strengthening the connector layer along these axes—**capabilities, type safety, shared infrastructure, observability, and additional exchanges**—will transform LotusX into a plug-and-play, production-grade foundation for any cross-exchange arbitrage engine. diff --git a/examples/paradex_example.rs b/examples/paradex_example.rs index 1b9d2ee..ec1cb96 100644 --- a/examples/paradex_example.rs +++ b/examples/paradex_example.rs @@ -1,16 +1,347 @@ use lotusx::{ - core::{config::ExchangeConfig, traits::MarketDataSource}, + core::{ + config::ExchangeConfig, + traits::{AccountInfo, FundingRateSource, MarketDataSource, OrderPlacer}, + types::{OrderRequest, OrderSide, OrderType, SubscriptionType, WebSocketConfig}, + }, exchanges::paradex::ParadexConnector, }; +use secrecy::SecretString; +use std::env; +use tokio::time::{sleep, Duration}; +use tracing::{error, info, warn}; #[tokio::main] async fn main() -> Result<(), Box> { - let config = ExchangeConfig::from_env("PARADEX")?; + // Initialize tracing + tracing_subscriber::fmt::init(); + + info!("🚀 Starting Paradex Exchange Example (Perpetual Trading)"); + + // Load configuration from environment + let config = load_config_from_env(); let connector = ParadexConnector::new(config); - // Test basic functionality + // Test basic connectivity + info!("📡 Testing basic connectivity..."); + test_connectivity(&connector).await?; + + // Test market data + info!("📊 Testing market data..."); + test_market_data(&connector).await?; + + // Test funding rates (perpetual specific) + info!("💰 Testing funding rates..."); + test_funding_rates(&connector).await?; + + // Test WebSocket connection + info!("🔗 Testing WebSocket connection..."); + test_websocket(&connector).await?; + + // Test account information (requires credentials) + if connector.can_trade() { + info!("👤 Testing account information..."); + test_account_info(&connector).await?; + + // Test order placement (uncomment for live trading) + // warn!("⚠️ Skipping live order placement in example"); + // test_order_placement(&connector).await?; + } else { + warn!("⚠️ Skipping account and trading tests (missing credentials)"); + } + + info!("✅ Paradex example completed successfully!"); + Ok(()) +} + +fn load_config_from_env() -> ExchangeConfig { + let api_key = env::var("PARADEX_API_KEY").unwrap_or_else(|_| { + warn!("PARADEX_API_KEY not set, account features will be disabled"); + String::new() + }); + + let secret_key = env::var("PARADEX_SECRET_KEY").unwrap_or_else(|_| { + warn!("PARADEX_SECRET_KEY not set, trading features will be disabled"); + String::new() + }); + + let testnet = env::var("PARADEX_TESTNET") + .unwrap_or_else(|_| "true".to_string()) + .parse() + .unwrap_or(true); + + ExchangeConfig { + api_key: SecretString::new(api_key), + secret_key: SecretString::new(secret_key), + base_url: if testnet { + Some("https://api.testnet.paradex.trade".to_string()) + } else { + None + }, + testnet, + } +} + +async fn test_connectivity(connector: &ParadexConnector) -> Result<(), Box> { + info!(" 🔍 Fetching available markets..."); + let markets = connector.get_markets().await?; + info!(" 📈 Found {} markets", markets.len()); + + if !markets.is_empty() { + let sample_market = &markets[0]; + info!( + " 📊 Sample market: {} (status: {})", + sample_market.symbol.symbol, sample_market.status + ); + } + + Ok(()) +} + +async fn test_market_data(connector: &ParadexConnector) -> Result<(), Box> { + let markets = connector.get_markets().await?; + if markets.is_empty() { + warn!(" ⚠️ No markets available for testing"); + return Ok(()); + } + + let test_symbol = &markets[0].symbol.symbol; + info!(" 📊 Testing market data for symbol: {}", test_symbol); + + // Test klines + match connector + .get_klines( + test_symbol.clone(), + lotusx::core::types::KlineInterval::Hours1, + Some(5), + None, + None, + ) + .await + { + Ok(klines) => { + info!(" 📈 Retrieved {} klines", klines.len()); + } + Err(e) => { + warn!(" ⚠️ Klines not available: {}", e); + } + } + + Ok(()) +} + +async fn test_funding_rates( + connector: &ParadexConnector, +) -> Result<(), Box> { + info!(" 💰 Fetching all funding rates..."); + match connector.get_all_funding_rates().await { + Ok(rates) => { + info!(" 📊 Found funding rates for {} symbols", rates.len()); + if !rates.is_empty() { + let sample_rate = &rates[0]; + info!( + " 💰 Sample: {} - Rate: {:?}, Next time: {:?}", + sample_rate.symbol, sample_rate.funding_rate, sample_rate.next_funding_time + ); + } + } + Err(e) => { + error!(" ❌ Failed to fetch funding rates: {}", e); + } + } + + // Test single symbol funding rate + let markets = connector.get_markets().await?; + if !markets.is_empty() { + let test_symbol = &markets[0].symbol.symbol; + info!(" 🎯 Fetching funding rate for {}", test_symbol); + match connector + .get_funding_rates(Some(vec![test_symbol.clone()])) + .await + { + Ok(rates) => { + if !rates.is_empty() { + info!( + " 💰 Funding rate: {:?}, Mark price: {:?}", + rates[0].funding_rate, rates[0].mark_price + ); + } + } + Err(e) => { + warn!(" ⚠️ Single funding rate failed: {}", e); + } + } + } + + Ok(()) +} + +async fn test_websocket(connector: &ParadexConnector) -> Result<(), Box> { + let markets = connector.get_markets().await?; + if markets.is_empty() { + warn!(" ⚠️ No markets available for WebSocket testing"); + return Ok(()); + } + + let test_symbol = markets[0].symbol.symbol.clone(); + info!(" 🔗 Starting WebSocket connection for {}", test_symbol); + + let subscription_types = vec![ + SubscriptionType::Ticker, + SubscriptionType::OrderBook { depth: Some(5) }, + SubscriptionType::Trades, + ]; + + let config = WebSocketConfig { + auto_reconnect: true, + ping_interval: Some(30), + max_reconnect_attempts: Some(3), + }; + + match connector + .subscribe_market_data(vec![test_symbol], subscription_types, Some(config)) + .await + { + Ok(mut receiver) => { + info!(" 📡 WebSocket connected, listening for 10 seconds..."); + let timeout = tokio::time::timeout(Duration::from_secs(10), async { + let mut message_count = 0; + while let Some(data) = receiver.recv().await { + message_count += 1; + match data { + lotusx::core::types::MarketDataType::Ticker(ticker) => { + info!(" 📊 Ticker: {} @ {}", ticker.symbol, ticker.price); + } + lotusx::core::types::MarketDataType::OrderBook(book) => { + info!( + " 📖 Order Book: {} (bids: {}, asks: {})", + book.symbol, + book.bids.len(), + book.asks.len() + ); + } + lotusx::core::types::MarketDataType::Trade(trade) => { + info!( + " 💱 Trade: {} {} @ {}", + trade.symbol, trade.quantity, trade.price + ); + } + lotusx::core::types::MarketDataType::Kline(kline) => { + info!( + " 📈 Kline: {} {} -> {}", + kline.symbol, kline.open_price, kline.close_price + ); + } + } + + if message_count >= 10 { + break; + } + } + info!(" 📡 Received {} messages", message_count); + }); + + if (timeout.await).is_ok() { + info!(" ✅ WebSocket test completed"); + } else { + info!(" ⏰ WebSocket test timed out (this is normal)"); + } + } + Err(e) => { + error!(" ❌ WebSocket connection failed: {}", e); + } + } + + Ok(()) +} + +async fn test_account_info(connector: &ParadexConnector) -> Result<(), Box> { + info!(" 👤 Fetching account balance..."); + match connector.get_account_balance().await { + Ok(balances) => { + info!(" 💰 Account has {} assets", balances.len()); + for balance in balances.iter().take(5) { + info!( + " 💰 {}: {} free, {} locked", + balance.asset, balance.free, balance.locked + ); + } + } + Err(e) => { + error!(" ❌ Failed to fetch balance: {}", e); + } + } + + info!(" 📊 Fetching positions..."); + match connector.get_positions().await { + Ok(positions) => { + info!(" 🎯 Found {} positions", positions.len()); + for position in &positions { + info!( + " 🎯 {}: {} {:?} (PnL: {})", + position.symbol, + position.position_amount, + position.position_side, + position.unrealized_pnl + ); + } + } + Err(e) => { + error!(" ❌ Failed to fetch positions: {}", e); + } + } + + Ok(()) +} + +#[allow(dead_code)] +async fn test_order_placement( + connector: &ParadexConnector, +) -> Result<(), Box> { let markets = connector.get_markets().await?; - println!("Found {} markets", markets.len()); + if markets.is_empty() { + warn!(" ⚠️ No markets available for order testing"); + return Ok(()); + } + + let test_symbol = &markets[0].symbol.symbol; + warn!(" ⚠️ This will place a real order on {}", test_symbol); + + // Create a small test order (modify as needed) + let order = OrderRequest { + symbol: test_symbol.clone(), + side: OrderSide::Buy, + order_type: OrderType::Limit, + quantity: "0.001".to_string(), // Very small quantity + price: Some("1.0".to_string()), // Very low price (unlikely to fill) + time_in_force: Some(lotusx::core::types::TimeInForce::GTC), + stop_price: None, + }; + + info!(" 📝 Placing test order..."); + match connector.place_order(order).await { + Ok(response) => { + info!( + " ✅ Order placed: {} (status: {})", + response.order_id, response.status + ); + + // Wait a moment then cancel the order + sleep(Duration::from_secs(2)).await; + + info!(" 🗑️ Cancelling test order..."); + match connector + .cancel_order(test_symbol.clone(), response.order_id) + .await + { + Ok(_) => info!(" ✅ Order cancelled successfully"), + Err(e) => error!(" ❌ Failed to cancel order: {}", e), + } + } + Err(e) => { + error!(" ❌ Failed to place order: {}", e); + } + } Ok(()) } diff --git a/src/exchanges/paradex/account.rs b/src/exchanges/paradex/account.rs index 0053169..ad97e0f 100644 --- a/src/exchanges/paradex/account.rs +++ b/src/exchanges/paradex/account.rs @@ -1,40 +1,130 @@ +use super::auth::ParadexAuth; +use super::client::ParadexConnector; +use super::types::{ParadexBalance, ParadexPosition}; use crate::core::errors::ExchangeError; use crate::core::traits::AccountInfo; use crate::core::types::{Balance, Position}; -use crate::exchanges::paradex::auth::ParadexAuth; -use crate::exchanges::paradex::ParadexConnector; use async_trait::async_trait; use secrecy::ExposeSecret; +use tracing::{error, instrument}; #[async_trait] impl AccountInfo for ParadexConnector { + #[instrument(skip(self), fields(exchange = "paradex"))] async fn get_account_balance(&self) -> Result, ExchangeError> { - let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; - let token = auth.sign_jwt()?; + if !self.can_trade() { + return Err(ExchangeError::AuthError( + "Missing API credentials for account access".to_string(), + )); + } - let client = reqwest::Client::new(); - let response = client - .get("https://api.paradex.trade/v1/account") + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str()) + .map_err(|e| { + error!(error = %e, "Failed to create auth"); + ExchangeError::AuthError(format!("Authentication setup failed: {}", e)) + })?; + + let token = auth.sign_jwt().map_err(|e| { + error!(error = %e, "Failed to sign JWT"); + ExchangeError::AuthError(format!("JWT signing failed: {}", e)) + })?; + + let url = format!("{}/v1/account", self.base_url); + + let response = self + .client + .get(&url) .bearer_auth(token) .send() - .await?; + .await + .map_err(|e| { + error!(error = %e, "Failed to send account balance request"); + ExchangeError::NetworkError(format!("Account balance request failed: {}", e)) + })?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + + error!( + status = %status, + error_text = %error_text, + "Account balance request failed" + ); + + return Err(ExchangeError::ApiError { + code: status.as_u16() as i32, + message: format!("Account balance request failed: {}", error_text), + }); + } + + let balances: Vec = response.json().await.map_err(|e| { + error!(error = %e, "Failed to parse account balance response"); + ExchangeError::Other(format!("Failed to parse account balance response: {}", e)) + })?; - let balances: Vec = response.json().await?; - Ok(balances) + Ok(balances.into_iter().map(Into::into).collect()) } + #[instrument(skip(self), fields(exchange = "paradex"))] async fn get_positions(&self) -> Result, ExchangeError> { - let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; - let token = auth.sign_jwt()?; + if !self.can_trade() { + return Err(ExchangeError::AuthError( + "Missing API credentials for position access".to_string(), + )); + } - let client = reqwest::Client::new(); - let response = client - .get("https://api.paradex.trade/v1/positions") + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str()) + .map_err(|e| { + error!(error = %e, "Failed to create auth"); + ExchangeError::AuthError(format!("Authentication setup failed: {}", e)) + })?; + + let token = auth.sign_jwt().map_err(|e| { + error!(error = %e, "Failed to sign JWT"); + ExchangeError::AuthError(format!("JWT signing failed: {}", e)) + })?; + + let url = format!("{}/v1/positions", self.base_url); + + let response = self + .client + .get(&url) .bearer_auth(token) .send() - .await?; + .await + .map_err(|e| { + error!(error = %e, "Failed to send positions request"); + ExchangeError::NetworkError(format!("Positions request failed: {}", e)) + })?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + + error!( + status = %status, + error_text = %error_text, + "Positions request failed" + ); + + return Err(ExchangeError::ApiError { + code: status.as_u16() as i32, + message: format!("Positions request failed: {}", error_text), + }); + } + + let positions: Vec = response.json().await.map_err(|e| { + error!(error = %e, "Failed to parse positions response"); + ExchangeError::Other(format!("Failed to parse positions response: {}", e)) + })?; - let positions: Vec = response.json().await?; - Ok(positions) + Ok(positions.into_iter().map(Into::into).collect()) } } diff --git a/src/exchanges/paradex/auth.rs b/src/exchanges/paradex/auth.rs index b55ac85..7ffb06c 100644 --- a/src/exchanges/paradex/auth.rs +++ b/src/exchanges/paradex/auth.rs @@ -13,6 +13,7 @@ struct Claims { pub struct ParadexAuth { secret_key: Option, wallet_address: Option, + #[allow(dead_code)] secp: Secp256k1, } @@ -64,7 +65,10 @@ impl ParadexAuth { let claims = Claims { sub: self.wallet_address.as_ref().unwrap().to_string(), - exp: (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp() as usize, + exp: (chrono::Utc::now() + chrono::Duration::minutes(5)) + .timestamp() + .try_into() + .unwrap_or(0), }; let token = encode( diff --git a/src/exchanges/paradex/client.rs b/src/exchanges/paradex/client.rs index e2cdd20..f650865 100644 --- a/src/exchanges/paradex/client.rs +++ b/src/exchanges/paradex/client.rs @@ -1,12 +1,129 @@ -use crate::core::config::ExchangeConfig; +use super::types::ParadexError; +use crate::core::{config::ExchangeConfig, traits::ExchangeConnector}; +use reqwest::Client; +use std::time::Duration; +use tokio::time::sleep; +use tracing::instrument; #[derive(Debug, Clone)] pub struct ParadexConnector { - pub config: ExchangeConfig, + pub(crate) client: Client, + pub(crate) config: ExchangeConfig, + pub(crate) base_url: String, + pub(crate) ws_url: String, + pub(crate) max_retries: u32, + pub(crate) base_delay_ms: u64, } impl ParadexConnector { pub fn new(config: ExchangeConfig) -> Self { - Self { config } + let base_url = if config.testnet { + "https://api.testnet.paradex.trade".to_string() + } else { + config + .base_url + .clone() + .unwrap_or_else(|| "https://api.paradex.trade".to_string()) + }; + + let ws_url = if config.testnet { + "wss://ws.testnet.paradex.trade/v1".to_string() + } else { + "wss://ws.paradex.trade/v1".to_string() + }; + + Self { + client: Client::new(), + config, + base_url, + ws_url, + max_retries: 3, + base_delay_ms: 100, + } + } + + /// Request with retry logic for HFT latency optimization + #[instrument(skip(self, request_fn), fields(url = %url))] + pub(crate) async fn request_with_retry( + &self, + request_fn: impl Fn() -> reqwest::RequestBuilder, + url: &str, + ) -> Result + where + T: serde::de::DeserializeOwned, + { + let mut attempts = 0; + + loop { + let response = match request_fn().send().await { + Ok(resp) => resp, + Err(e) if attempts < self.max_retries && e.is_timeout() => { + attempts += 1; + let delay = self.base_delay_ms * 2_u64.pow(attempts - 1); + tracing::warn!( + attempt = attempts, + delay_ms = delay, + error = %e, + "Network timeout, retrying request" + ); + sleep(Duration::from_millis(delay)).await; + continue; + } + Err(e) => { + return Err(ParadexError::network_error(format!( + "Request failed after {} attempts: {}", + attempts, e + ))); + } + }; + + if response.status().is_success() { + return response.json::().await.map_err(|e| { + ParadexError::parse_error( + format!("Failed to parse response: {}", e), + Some(url.to_string()), + ) + }); + } else if response.status() == 429 && attempts < self.max_retries { + // Rate limit hit + attempts += 1; + let delay = self.base_delay_ms * 2_u64.pow(attempts - 1); + tracing::warn!( + attempt = attempts, + delay_ms = delay, + status = %response.status(), + "Rate limit hit, backing off" + ); + sleep(Duration::from_millis(delay)).await; + continue; + } + + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(ParadexError::api_error( + status.as_u16() as i32, + format!("HTTP {}: {}", status, error_text), + )); + } + } + + /// Get WebSocket URL for market data + pub fn get_websocket_url(&self) -> String { + self.ws_url.clone() + } + + /// Check if configuration is valid for trading + pub fn can_trade(&self) -> bool { + !self.config.api_key().is_empty() && !self.config.secret_key().is_empty() + } + + /// Get base URL for API requests + pub fn get_base_url(&self) -> &str { + &self.base_url } } + +impl ExchangeConnector for ParadexConnector {} diff --git a/src/exchanges/paradex/converters.rs b/src/exchanges/paradex/converters.rs index cc9d5ee..c7d8180 100644 --- a/src/exchanges/paradex/converters.rs +++ b/src/exchanges/paradex/converters.rs @@ -1,7 +1,9 @@ use crate::core::types::{ - Market, OrderResponse, OrderSide, OrderType, Position, PositionSide, Symbol, + Balance, Market, OrderResponse, OrderSide, OrderType, Position, PositionSide, Symbol, +}; +use crate::exchanges::paradex::types::{ + ParadexBalance, ParadexMarket, ParadexOrder, ParadexPosition, }; -use crate::exchanges::paradex::types::{ParadexMarket, ParadexOrder, ParadexPosition}; impl From for Market { fn from(market: ParadexMarket) -> Self { @@ -34,15 +36,18 @@ impl From for OrderResponse { OrderSide::Sell }, order_type: match order.order_type.as_str() { - "MARKET" => OrderType::Market, "LIMIT" => OrderType::Limit, - _ => OrderType::Market, // Or handle as an error + "STOP_MARKET" => OrderType::StopLoss, + "STOP_LIMIT" => OrderType::StopLossLimit, + "TAKE_PROFIT_MARKET" => OrderType::TakeProfit, + "TAKE_PROFIT_LIMIT" => OrderType::TakeProfitLimit, + _ => OrderType::Market, // Default fallback for MARKET and unknown types }, quantity: order.size, price: Some(order.price), status: order.status, timestamp: chrono::DateTime::parse_from_rfc3339(&order.created_at) - .unwrap() + .unwrap_or_else(|_| chrono::Utc::now().into()) .timestamp_millis(), } } @@ -65,3 +70,13 @@ impl From for Position { } } } + +impl From for Balance { + fn from(balance: ParadexBalance) -> Self { + Self { + asset: balance.asset, + free: balance.available, + locked: balance.locked, + } + } +} diff --git a/src/exchanges/paradex/market_data.rs b/src/exchanges/paradex/market_data.rs index b11ab5d..b37738c 100644 --- a/src/exchanges/paradex/market_data.rs +++ b/src/exchanges/paradex/market_data.rs @@ -1,50 +1,135 @@ +use super::client::ParadexConnector; +use super::types::{ParadexError, ParadexFundingRate, ParadexFundingRateHistory, ParadexMarket}; use crate::core::errors::ExchangeError; -use crate::core::traits::MarketDataSource; +use crate::core::traits::{FundingRateSource, MarketDataSource}; use crate::core::types::{ - Kline, KlineInterval, Market, MarketDataType, SubscriptionType, WebSocketConfig, + FundingRate, Kline, KlineInterval, Market, MarketDataType, SubscriptionType, WebSocketConfig, }; -use crate::exchanges::paradex::types::ParadexMarket; -use crate::exchanges::paradex::ParadexConnector; use async_trait::async_trait; -use futures_util::StreamExt; -use reqwest; +use futures_util::{SinkExt, StreamExt}; use tokio::sync::mpsc; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use tracing::{error, instrument, warn}; #[async_trait] impl MarketDataSource for ParadexConnector { + #[instrument(skip(self), fields(exchange = "paradex"))] async fn get_markets(&self) -> Result, ExchangeError> { - let client = reqwest::Client::new(); - let response = client - .get("https://api.paradex.trade/v1/markets") + let url = format!("{}/v1/markets", self.base_url); + + // First let's see what the raw response looks like + let response = self + .client + .get(&url) .send() - .await?; - let markets: Vec = response.json().await?; - Ok(markets.into_iter().map(Into::into).collect()) + .await + .map_err(|e| ExchangeError::Other(format!("Markets request failed: {}", e)))?; + + if !response.status().is_success() { + let status_code = response.status().as_u16() as i32; + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(ExchangeError::ApiError { + code: status_code, + message: format!("Markets request failed: {}", error_text), + }); + } + + let response_text = response + .text() + .await + .map_err(|e| ExchangeError::Other(format!("Failed to read response text: {}", e)))?; + + tracing::info!("Raw markets response: {}", response_text); + + // Try to parse as different formats + if let Ok(markets_array) = serde_json::from_str::>(&response_text) { + Ok(markets_array.into_iter().map(Into::into).collect()) + } else if let Ok(response_obj) = serde_json::from_str::(&response_text) { + // Check if it's wrapped in a response object + if let Some(markets_data) = response_obj.get("markets") { + if let Ok(markets) = + serde_json::from_value::>(markets_data.clone()) + { + return Ok(markets.into_iter().map(Into::into).collect()); + } + } + + // Check if it's a data field + if let Some(data) = response_obj.get("data") { + if let Ok(markets) = serde_json::from_value::>(data.clone()) { + return Ok(markets.into_iter().map(Into::into).collect()); + } + } + + // Return an error with more details about the structure + Err(ExchangeError::Other(format!( + "Unexpected response format. Response structure: {:?}", + response_obj + ))) + } else { + Err(ExchangeError::Other(format!( + "Failed to parse markets response: {}", + response_text + ))) + } } + #[instrument( + skip(self, _config), + fields( + exchange = "paradex", + symbols_count = symbols.len(), + subscription_types = ?subscription_types + ) + )] async fn subscribe_market_data( &self, - _symbols: Vec, - _subscription_types: Vec, + symbols: Vec, + subscription_types: Vec, _config: Option, ) -> Result, ExchangeError> { let url = self.get_websocket_url(); - let (ws_stream, _) = connect_async(url) + let (ws_stream, _) = connect_async(&url) .await .map_err(|e| ExchangeError::WebSocketError(e.to_string()))?; - let (mut _write, mut read) = ws_stream.split(); - let (_tx, rx) = mpsc::channel(100); + let (mut write, mut read) = ws_stream.split(); + let (tx, rx) = mpsc::channel(1000); + + // Send subscription messages + let subscription_messages = self.build_subscription_messages(&symbols, &subscription_types); + for message in subscription_messages { + if let Err(e) = write.send(Message::Text(message)).await { + error!("Failed to send WebSocket subscription: {}", e); + return Err(ExchangeError::WebSocketError(format!( + "Subscription failed: {}", + e + ))); + } + } + // Spawn task to handle incoming messages + let tx_clone = tx.clone(); tokio::spawn(async move { while let Some(message) = read.next().await { match message { - Ok(Message::Text(_text)) => { - // Process the message and send it to the channel + Ok(Message::Text(text)) => { + if let Ok(parsed_data) = Self::parse_websocket_message(&text) { + if tx_clone.send(parsed_data).await.is_err() { + warn!("Receiver dropped, stopping WebSocket task"); + break; + } + } + } + Ok(Message::Close(_)) => { + warn!("WebSocket connection closed by server"); + break; } Err(e) => { - eprintln!("WebSocket error: {}", e); + error!("WebSocket error: {}", e); break; } _ => {} @@ -56,18 +141,343 @@ impl MarketDataSource for ParadexConnector { } fn get_websocket_url(&self) -> String { - "wss://ws.paradex.trade/v1".to_string() + self.get_websocket_url() } + #[instrument(skip(self), fields(exchange = "paradex", symbol = %symbol))] async fn get_klines( &self, - _symbol: String, - _interval: KlineInterval, - _limit: Option, - _start_time: Option, - _end_time: Option, + symbol: String, + interval: KlineInterval, + limit: Option, + start_time: Option, + end_time: Option, ) -> Result, ExchangeError> { - // Implementation of get_klines will be added here - Ok(vec![]) + let url = format!("{}/v1/klines", self.base_url); + + let mut params = vec![ + ("symbol", symbol.clone()), + ("interval", interval.to_paradex_format()), + ]; + + if let Some(limit_val) = limit { + params.push(("limit", limit_val.to_string())); + } + + if let Some(start) = start_time { + params.push(("startTime", start.to_string())); + } + + if let Some(end) = end_time { + params.push(("endTime", end.to_string())); + } + + let response = self + .client + .get(&url) + .query(¶ms) + .send() + .await + .map_err(|e| { + error!( + symbol = %symbol, + interval = ?interval, + error = %e, + "Failed to fetch klines" + ); + ExchangeError::Other(format!("Klines request failed: {}", e)) + })?; + + if !response.status().is_success() { + let status_code = response.status().as_u16() as i32; + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(ExchangeError::ApiError { + code: status_code, + message: format!("Klines request failed: {}", error_text), + }); + } + + // Parse klines response - this would need to be adapted based on Paradex's actual API + let klines_data: Vec = response + .json() + .await + .map_err(|e| ExchangeError::Other(format!("Failed to parse klines response: {}", e)))?; + + let mut klines = Vec::with_capacity(klines_data.len()); + for kline_data in klines_data { + // This parsing would need to be adapted based on Paradex's actual kline format + if let Some(kline) = Self::parse_kline_data(&kline_data, &symbol, interval) { + klines.push(kline); + } + } + + Ok(klines) + } +} + +// Funding Rate Implementation for Paradex Perpetual +#[async_trait] +impl FundingRateSource for ParadexConnector { + #[instrument(skip(self), fields(exchange = "paradex", symbols = ?symbols))] + async fn get_funding_rates( + &self, + symbols: Option>, + ) -> Result, ExchangeError> { + match symbols { + Some(symbol_list) if symbol_list.len() == 1 => self + .get_single_funding_rate(&symbol_list[0]) + .await + .map(|rate| vec![rate]), + Some(_) | None => self.get_all_funding_rates().await, + } + } + + #[instrument(skip(self), fields(exchange = "paradex"))] + async fn get_all_funding_rates(&self) -> Result, ExchangeError> { + let url = format!("{}/v1/funding-rates", self.base_url); + + let funding_rates: Vec = self + .request_with_retry(|| self.client.get(&url), &url) + .await + .map_err(|e| -> ExchangeError { + error!(error = %e, "Failed to fetch all funding rates"); + ExchangeError::Other(format!("All funding rates request failed: {}", e)) + })?; + + let mut result = Vec::with_capacity(funding_rates.len()); + for rate in funding_rates { + result.push(FundingRate { + symbol: rate.symbol, + funding_rate: Some(rate.funding_rate), + previous_funding_rate: None, + next_funding_rate: None, + funding_time: None, + next_funding_time: Some(rate.next_funding_time), + mark_price: Some(rate.mark_price), + index_price: Some(rate.index_price), + timestamp: rate.timestamp, + }); + } + + Ok(result) + } + + #[instrument(skip(self), fields(exchange = "paradex", symbol = %symbol))] + async fn get_funding_rate_history( + &self, + symbol: String, + start_time: Option, + end_time: Option, + limit: Option, + ) -> Result, ExchangeError> { + let url = format!("{}/v1/funding-rate-history", self.base_url); + + let mut params = vec![("symbol", symbol.clone())]; + + if let Some(limit_val) = limit { + params.push(("limit", limit_val.to_string())); + } + + if let Some(start) = start_time { + params.push(("startTime", start.to_string())); + } + + if let Some(end) = end_time { + params.push(("endTime", end.to_string())); + } + + let response = self + .client + .get(&url) + .query(¶ms) + .send() + .await + .map_err(|e| { + error!( + symbol = %symbol, + error = %e, + "Failed to fetch funding rate history" + ); + ExchangeError::Other(format!("Funding rate history request failed: {}", e)) + })?; + + if !response.status().is_success() { + let status_code = response.status().as_u16() as i32; + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(ExchangeError::ApiError { + code: status_code, + message: format!("Funding rate history request failed: {}", error_text), + }); + } + + let funding_rates: Vec = response.json().await.map_err(|e| { + ExchangeError::Other(format!("Failed to parse funding rate history: {}", e)) + })?; + + let mut result = Vec::with_capacity(funding_rates.len()); + for rate in funding_rates { + result.push(FundingRate { + symbol: rate.symbol, + funding_rate: Some(rate.funding_rate), + previous_funding_rate: None, + next_funding_rate: None, + funding_time: Some(rate.funding_time), + next_funding_time: None, + mark_price: None, + index_price: None, + timestamp: chrono::Utc::now().timestamp_millis(), + }); + } + + Ok(result) + } +} + +impl ParadexConnector { + async fn get_single_funding_rate(&self, symbol: &str) -> Result { + let url = format!("{}/v1/funding-rate", self.base_url); + + let params = vec![("symbol", symbol.to_string())]; + + let response = self + .client + .get(&url) + .query(¶ms) + .send() + .await + .map_err(|e| { + error!( + symbol = %symbol, + error = %e, + "Failed to fetch single funding rate" + ); + ExchangeError::Other(format!("Single funding rate request failed: {}", e)) + })?; + + if !response.status().is_success() { + let status_code = response.status().as_u16() as i32; + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + return Err(ExchangeError::ApiError { + code: status_code, + message: format!("Single funding rate request failed: {}", error_text), + }); + } + + let funding_rate: ParadexFundingRate = response.json().await.map_err(|e| { + ExchangeError::Other(format!("Failed to parse funding rate response: {}", e)) + })?; + + Ok(FundingRate { + symbol: funding_rate.symbol, + funding_rate: Some(funding_rate.funding_rate), + previous_funding_rate: None, + next_funding_rate: None, + funding_time: None, + next_funding_time: Some(funding_rate.next_funding_time), + mark_price: Some(funding_rate.mark_price), + index_price: Some(funding_rate.index_price), + timestamp: funding_rate.timestamp, + }) + } + + fn build_subscription_messages( + &self, + symbols: &[String], + subscription_types: &[SubscriptionType], + ) -> Vec { + let mut messages = Vec::new(); + + for symbol in symbols { + for sub_type in subscription_types { + let channel = match sub_type { + SubscriptionType::Ticker => format!("ticker@{}", symbol), + SubscriptionType::OrderBook { depth } => depth.as_ref().map_or_else( + || format!("depth@{}", symbol), + |d| format!("depth{}@{}", d, symbol), + ), + SubscriptionType::Trades => format!("trade@{}", symbol), + SubscriptionType::Klines { interval } => { + format!("kline_{}@{}", interval.to_paradex_format(), symbol) + } + }; + + let subscription = serde_json::json!({ + "method": "SUBSCRIBE", + "params": [channel], + "id": messages.len() + 1 + }); + + messages.push(subscription.to_string()); + } + } + + messages + } + + fn parse_websocket_message(_text: &str) -> Result { + // This would need to be implemented based on Paradex's actual WebSocket message format + // For now, return a placeholder error + Err(ExchangeError::Other( + "WebSocket message parsing not yet implemented".to_string(), + )) + } + + fn parse_kline_data( + _data: &serde_json::Value, + _symbol: &str, + _interval: KlineInterval, + ) -> Option { + // This would need to be implemented based on Paradex's actual kline data format + // For now, return None + None + } +} + +// Extend KlineInterval for Paradex format +trait ParadexKlineInterval { + fn to_paradex_format(&self) -> String; +} + +impl ParadexKlineInterval for KlineInterval { + fn to_paradex_format(&self) -> String { + match self { + Self::Seconds1 | Self::Minutes1 => "1m".to_string(), + Self::Minutes3 => "3m".to_string(), + Self::Minutes5 => "5m".to_string(), + Self::Minutes15 => "15m".to_string(), + Self::Minutes30 => "30m".to_string(), + Self::Hours1 => "1h".to_string(), + Self::Hours2 => "2h".to_string(), + Self::Hours4 => "4h".to_string(), + Self::Hours6 => "6h".to_string(), + Self::Hours8 => "8h".to_string(), + Self::Hours12 => "12h".to_string(), + Self::Days1 => "1d".to_string(), + Self::Days3 => "3d".to_string(), + Self::Weeks1 => "1w".to_string(), + Self::Months1 => "1M".to_string(), + } + } +} + +impl From for ExchangeError { + fn from(error: ParadexError) -> Self { + match error { + ParadexError::ApiError { code, message } => Self::ApiError { code, message }, + ParadexError::AuthError { reason } => Self::AuthError(reason), + ParadexError::NetworkError(e) => Self::NetworkError(e.to_string()), + ParadexError::JsonError(e) => Self::Other(e.to_string()), + ParadexError::WebSocketError { reason } => Self::WebSocketError(reason), + _ => Self::Other(error.to_string()), + } } } diff --git a/src/exchanges/paradex/trading.rs b/src/exchanges/paradex/trading.rs index 03b9ba3..413e766 100644 --- a/src/exchanges/paradex/trading.rs +++ b/src/exchanges/paradex/trading.rs @@ -1,40 +1,261 @@ +use super::auth::ParadexAuth; +use super::client::ParadexConnector; +use super::types::ParadexOrder; use crate::core::errors::ExchangeError; use crate::core::traits::OrderPlacer; use crate::core::types::{OrderRequest, OrderResponse}; -use crate::exchanges::paradex::auth::ParadexAuth; -use crate::exchanges::paradex::ParadexConnector; use async_trait::async_trait; use secrecy::ExposeSecret; +use tracing::{error, instrument}; #[async_trait] impl OrderPlacer for ParadexConnector { + #[instrument( + skip(self), + fields( + exchange = "paradex", + symbol = %order.symbol, + side = ?order.side, + order_type = ?order.order_type, + quantity = %order.quantity + ) + )] async fn place_order(&self, order: OrderRequest) -> Result { - let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; - let token = auth.sign_jwt()?; + if !self.can_trade() { + return Err(ExchangeError::AuthError( + "Missing API credentials for trading".to_string(), + )); + } - let client = reqwest::Client::new(); - let response = client - .post("https://api.paradex.trade/v1/orders") + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str()) + .map_err(|e| { + error!(error = %e, "Failed to create auth"); + ExchangeError::AuthError(format!("Authentication setup failed: {}", e)) + })?; + + let token = auth.sign_jwt().map_err(|e| { + error!(error = %e, "Failed to sign JWT"); + ExchangeError::AuthError(format!("JWT signing failed: {}", e)) + })?; + + let url = format!("{}/v1/orders", self.base_url); + + // Convert order to Paradex format + let paradex_order = convert_order_request(&order); + + let response = self + .client + .post(&url) .bearer_auth(token) - .json(&order) + .json(¶dex_order) .send() - .await?; + .await + .map_err(|e| { + error!( + symbol = %order.symbol, + error = %e, + "Failed to send order request" + ); + ExchangeError::NetworkError(format!("Order request failed: {}", e)) + })?; - let order_response: OrderResponse = response.json().await?; - Ok(order_response) + self.handle_order_response(response, &order).await } - async fn cancel_order(&self, _symbol: String, order_id: String) -> Result<(), ExchangeError> { - let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str())?; - let token = auth.sign_jwt()?; + #[instrument( + skip(self), + fields( + exchange = "paradex", + symbol = %symbol, + order_id = %order_id + ) + )] + async fn cancel_order(&self, symbol: String, order_id: String) -> Result<(), ExchangeError> { + if !self.can_trade() { + return Err(ExchangeError::AuthError( + "Missing API credentials for trading".to_string(), + )); + } - let client = reqwest::Client::new(); - client - .delete(&format!("https://api.paradex.trade/v1/orders/{}", order_id)) + let auth = ParadexAuth::with_private_key(self.config.secret_key.expose_secret().as_str()) + .map_err(|e| { + error!(error = %e, "Failed to create auth"); + ExchangeError::AuthError(format!("Authentication setup failed: {}", e)) + })?; + + let token = auth.sign_jwt().map_err(|e| { + error!(error = %e, "Failed to sign JWT"); + ExchangeError::AuthError(format!("JWT signing failed: {}", e)) + })?; + + let url = format!("{}/v1/orders/{}", self.base_url, order_id); + + let response = self + .client + .delete(&url) .bearer_auth(token) .send() - .await?; + .await + .map_err(|e| { + error!( + symbol = %symbol, + order_id = %order_id, + error = %e, + "Failed to send cancel request" + ); + ExchangeError::NetworkError(format!("Cancel request failed: {}", e)) + })?; + + self.handle_cancel_response(response, &symbol, &order_id) + .await + } +} + +impl ParadexConnector { + #[cold] + #[inline(never)] + async fn handle_order_response( + &self, + response: reqwest::Response, + order: &OrderRequest, + ) -> Result { + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + + error!( + symbol = %order.symbol, + status = %status, + error_text = %error_text, + "Order placement failed" + ); + + return Err(ExchangeError::ApiError { + code: status.as_u16() as i32, + message: format!("Order placement failed: {}", error_text), + }); + } + + let paradex_response: ParadexOrder = response.json().await.map_err(|e| { + error!( + symbol = %order.symbol, + error = %e, + "Failed to parse order response" + ); + ExchangeError::Other(format!("Failed to parse order response: {}", e)) + })?; + + Ok(OrderResponse { + order_id: paradex_response.id, + client_order_id: paradex_response.client_id, + symbol: paradex_response.market, + side: order.side.clone(), + order_type: order.order_type.clone(), + quantity: paradex_response.size, + price: Some(paradex_response.price), + status: paradex_response.status, + timestamp: chrono::DateTime::parse_from_rfc3339(¶dex_response.created_at) + .unwrap_or_else(|_| chrono::Utc::now().into()) + .timestamp_millis(), + }) + } + + #[cold] + #[inline(never)] + async fn handle_cancel_response( + &self, + response: reqwest::Response, + symbol: &str, + order_id: &str, + ) -> Result<(), ExchangeError> { + if !response.status().is_success() { + let status = response.status(); + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + + error!( + symbol = %symbol, + order_id = %order_id, + status = %status, + error_text = %error_text, + "Order cancellation failed" + ); + + return Err(ExchangeError::ApiError { + code: status.as_u16() as i32, + message: format!("Order cancellation failed: {}", error_text), + }); + } Ok(()) } } + +/// Convert core `OrderRequest` to Paradex order format +fn convert_order_request(order: &OrderRequest) -> serde_json::Value { + let side = match order.side { + crate::core::types::OrderSide::Buy => "BUY", + crate::core::types::OrderSide::Sell => "SELL", + }; + + let order_type = match order.order_type { + crate::core::types::OrderType::Market => "MARKET", + crate::core::types::OrderType::Limit => "LIMIT", + crate::core::types::OrderType::StopLoss => "STOP_MARKET", + crate::core::types::OrderType::StopLossLimit => "STOP_LIMIT", + crate::core::types::OrderType::TakeProfit => "TAKE_PROFIT_MARKET", + crate::core::types::OrderType::TakeProfitLimit => "TAKE_PROFIT_LIMIT", + }; + + let mut paradex_order = serde_json::json!({ + "market": order.symbol, + "side": side, + "order_type": order_type, + "size": order.quantity, + }); + + // Add price for limit orders + if let Some(price) = &order.price { + if matches!( + order.order_type, + crate::core::types::OrderType::Limit + | crate::core::types::OrderType::StopLossLimit + | crate::core::types::OrderType::TakeProfitLimit + ) { + paradex_order["price"] = serde_json::Value::String(price.clone()); + } + } + + // Add stop price for stop orders + if let Some(stop_price) = &order.stop_price { + if matches!( + order.order_type, + crate::core::types::OrderType::StopLoss + | crate::core::types::OrderType::StopLossLimit + | crate::core::types::OrderType::TakeProfit + | crate::core::types::OrderType::TakeProfitLimit + ) { + paradex_order["stop_price"] = serde_json::Value::String(stop_price.clone()); + } + } + + // Add time in force for limit orders + if let Some(tif) = &order.time_in_force { + let time_in_force = match tif { + crate::core::types::TimeInForce::GTC => "GTC", + crate::core::types::TimeInForce::IOC => "IOC", + crate::core::types::TimeInForce::FOK => "FOK", + }; + paradex_order["time_in_force"] = serde_json::Value::String(time_in_force.to_string()); + } else if matches!(order.order_type, crate::core::types::OrderType::Limit) { + // Default to GTC for limit orders + paradex_order["time_in_force"] = serde_json::Value::String("GTC".to_string()); + } + + paradex_order +} diff --git a/src/exchanges/paradex/types.rs b/src/exchanges/paradex/types.rs index 7246531..d72e98f 100644 --- a/src/exchanges/paradex/types.rs +++ b/src/exchanges/paradex/types.rs @@ -1,4 +1,140 @@ use serde::{Deserialize, Serialize}; +use thiserror::Error; + +// Paradex-specific error types following HFT error handling guidelines +#[derive(Error, Debug)] +pub enum ParadexError { + #[error("API error {code}: {message}")] + ApiError { code: i32, message: String }, + + #[error("Authentication failed: {reason}")] + AuthError { reason: String }, + + #[error("Invalid order parameters: {details}")] + InvalidOrder { details: String }, + + #[error("Network request failed")] + NetworkError(#[from] reqwest::Error), + + #[error("JSON parsing failed")] + JsonError(#[from] serde_json::Error), + + #[error("JWT signing failed")] + JwtError(#[from] jsonwebtoken::errors::Error), + + #[error("Rate limit exceeded for endpoint: {endpoint}")] + RateLimit { endpoint: String }, + + #[error("Market not found: {symbol}")] + MarketNotFound { symbol: String }, + + #[error("Insufficient balance for operation")] + InsufficientBalance, + + #[error("WebSocket connection failed: {reason}")] + WebSocketError { reason: String }, + + #[error("Funding rate error: {message}, symbol={symbol:?}")] + FundingRateError { + message: String, + symbol: Option, + }, +} + +impl ParadexError { + /// Mark cold error paths to keep happy path in I-cache + #[cold] + #[inline(never)] + pub fn api_error(code: i32, message: String) -> Self { + Self::ApiError { code, message } + } + + #[cold] + #[inline(never)] + pub fn auth_error(reason: String) -> Self { + Self::AuthError { reason } + } + + #[cold] + #[inline(never)] + pub fn invalid_order(details: String) -> Self { + Self::InvalidOrder { details } + } + + #[cold] + #[inline(never)] + pub fn rate_limit(endpoint: String) -> Self { + Self::RateLimit { endpoint } + } + + #[cold] + #[inline(never)] + pub fn market_not_found(symbol: String) -> Self { + Self::MarketNotFound { symbol } + } + + #[cold] + #[inline(never)] + pub fn websocket_error(reason: String) -> Self { + Self::WebSocketError { reason } + } + + #[cold] + #[inline(never)] + pub fn funding_rate_error(message: String, symbol: Option) -> Self { + Self::FundingRateError { message, symbol } + } + + #[cold] + #[inline(never)] + pub fn network_error(message: String) -> Self { + Self::ApiError { code: 0, message } + } + + #[cold] + #[inline(never)] + pub fn parse_error(message: String, _context: Option) -> Self { + Self::ApiError { code: 1, message } + } +} + +// Helper trait for adding context to Paradex operations +pub trait ParadexResultExt { + fn with_symbol_context(self, symbol: &str) -> Result; + fn with_exchange_context(self, context_fn: F) -> Result + where + F: FnOnce() -> String; +} + +impl ParadexResultExt for Result +where + E: Into, +{ + fn with_symbol_context(self, symbol: &str) -> Result { + self.map_err(|e| { + let base_error = e.into(); + match base_error { + ParadexError::NetworkError(_) => { + ParadexError::api_error(0, format!("Network error for symbol {}", symbol)) + } + _ => base_error, + } + }) + } + + fn with_exchange_context(self, context_fn: F) -> Result + where + F: FnOnce() -> String, + { + self.map_err(|e| { + let base_error = e.into(); + match base_error { + ParadexError::NetworkError(_) => ParadexError::api_error(0, context_fn()), + _ => base_error, + } + }) + } +} #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ParadexAsset { @@ -57,3 +193,59 @@ pub struct ParadexPosition { pub liquidation_price: Option, pub leverage: String, } + +// API Response types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexApiResponse { + pub success: bool, + pub data: Option, + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexApiError { + pub code: i32, + pub message: String, +} + +// Funding rate types for perpetual trading +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexFundingRate { + pub symbol: String, + pub funding_rate: String, + pub next_funding_time: i64, + pub mark_price: String, + pub index_price: String, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexFundingRateHistory { + pub symbol: String, + pub funding_rate: String, + pub funding_time: i64, +} + +// Balance and account types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexBalance { + pub asset: String, + pub available: String, + pub locked: String, + pub total: String, +} + +// WebSocket types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexWebSocketMessage { + pub channel: String, + pub data: serde_json::Value, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParadexWebSocketSubscription { + pub method: String, + pub params: Vec, + pub id: u64, +} diff --git a/src/exchanges/paradex/websocket.rs b/src/exchanges/paradex/websocket.rs index 8b13789..7e1cec4 100644 --- a/src/exchanges/paradex/websocket.rs +++ b/src/exchanges/paradex/websocket.rs @@ -1 +1,312 @@ +use super::client::ParadexConnector; +use crate::core::errors::ExchangeError; +use crate::core::types::{ + Kline, MarketDataType, OrderBook, OrderBookEntry, SubscriptionType, Ticker, Trade, + WebSocketConfig, +}; +use futures_util::{SinkExt, StreamExt}; +use serde_json::{json, Value}; +use tokio::sync::mpsc; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{error, instrument, warn}; +/// Public function to handle WebSocket market data subscription +/// This is called by the `MarketDataSource` trait implementation +#[instrument(skip(client, config), fields(exchange = "paradex"))] +pub async fn subscribe_market_data_impl( + client: &ParadexConnector, + symbols: Vec, + subscription_types: Vec, + config: Option, +) -> Result, ExchangeError> { + let ws_url = client.get_websocket_url(); + let (ws_stream, _) = connect_async(&ws_url) + .await + .map_err(|e| ExchangeError::NetworkError(format!("WebSocket connection failed: {}", e)))?; + + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + let (tx, rx) = mpsc::channel(1000); + + // Handle auto-reconnection if configured + let auto_reconnect = config.as_ref().map_or(true, |c| c.auto_reconnect); + let _max_reconnect_attempts = config + .as_ref() + .and_then(|c| c.max_reconnect_attempts) + .unwrap_or(5); + + // Send all subscriptions + send_subscriptions(&mut ws_sender, &symbols, &subscription_types).await?; + + // Spawn task to handle incoming messages + let tx_clone = tx.clone(); + let symbols_clone = symbols.clone(); + + tokio::spawn(async move { + let mut heartbeat_interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + + loop { + tokio::select! { + // Handle incoming WebSocket messages + msg = ws_receiver.next() => { + if handle_websocket_message(msg, &tx_clone, &symbols_clone, auto_reconnect).await { + break; + } + } + // Send periodic heartbeat/ping + _ = heartbeat_interval.tick() => { + if send_heartbeat(&mut ws_sender).await { + break; + } + } + } + } + }); + + Ok(rx) +} + +// Helper function to send all WebSocket subscriptions +async fn send_subscriptions( + ws_sender: &mut futures_util::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, + symbols: &[String], + subscription_types: &[SubscriptionType], +) -> Result<(), ExchangeError> { + let mut subscription_id = 1; + + // Create all subscription combinations + for symbol in symbols { + for sub_type in subscription_types { + let channel = create_subscription_channel(symbol, sub_type); + let subscription = json!({ + "method": "SUBSCRIBE", + "params": [channel], + "id": subscription_id + }); + + let msg = Message::Text(subscription.to_string()); + ws_sender.send(msg).await.map_err(|e| { + ExchangeError::NetworkError(format!("Failed to send subscription: {}", e)) + })?; + + subscription_id += 1; + } + } + + Ok(()) +} + +// Helper function to create subscription channel name +fn create_subscription_channel(symbol: &str, sub_type: &SubscriptionType) -> String { + match sub_type { + SubscriptionType::Ticker => format!("ticker@{}", symbol), + SubscriptionType::OrderBook { depth } => depth.as_ref().map_or_else( + || format!("depth@{}", symbol), + |d| format!("depth{}@{}", d, symbol), + ), + SubscriptionType::Trades => format!("trade@{}", symbol), + SubscriptionType::Klines { interval } => { + format!("kline_{}@{}", interval.to_string().to_lowercase(), symbol) + } + } +} + +// Helper function to handle incoming WebSocket messages +async fn handle_websocket_message( + msg: Option>, + tx: &mpsc::Sender, + symbols: &[String], + auto_reconnect: bool, +) -> bool { + match msg { + Some(Ok(Message::Text(text))) => process_text_message(&text, tx, symbols).await, + Some(Ok(Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => { + // Handle binary, ping, pong, and frame messages - continue processing + false + } + Some(Ok(Message::Close(_))) => { + warn!("WebSocket connection closed by server"); + true + } + Some(Err(e)) => { + error!("WebSocket error: {}", e); + if auto_reconnect { + warn!("Attempting to reconnect..."); + } + true + } + None => { + warn!("WebSocket stream ended"); + true + } + } +} + +// Helper function to process text messages +async fn process_text_message( + text: &str, + tx: &mpsc::Sender, + symbols: &[String], +) -> bool { + let Ok(parsed) = serde_json::from_str::(text) else { + warn!("Failed to parse WebSocket message: {}", text); + return false; + }; + + // Handle subscription confirmations + if parsed.get("result").is_some() && parsed.get("id").is_some() { + // This is a subscription confirmation, continue processing + return false; + } + + let Some(channel) = parsed.get("channel").and_then(|c| c.as_str()) else { + return false; + }; + + let Some(data) = parsed.get("data") else { + return false; + }; + + // Process data for relevant symbols + for symbol in symbols { + if channel.contains(symbol) { + if let Some(market_data) = convert_ws_data(channel, data, symbol) { + if tx.send(market_data).await.is_err() { + warn!("Receiver dropped, stopping WebSocket task"); + return true; + } + } + } + } + + false +} + +// Helper function to convert WebSocket data to MarketDataType +fn convert_ws_data(channel: &str, data: &Value, symbol: &str) -> Option { + if channel.contains("ticker") { + convert_ticker_data(data, symbol) + } else if channel.contains("depth") { + convert_orderbook_data(data, symbol) + } else if channel.contains("trade") { + convert_trade_data(data, symbol) + } else if channel.contains("kline") { + convert_kline_data(data, symbol) + } else { + None + } +} + +// Convert ticker data +fn convert_ticker_data(data: &Value, symbol: &str) -> Option { + let ticker = Ticker { + symbol: symbol.to_string(), + price: data.get("price")?.as_str()?.to_string(), + price_change: data.get("price_change")?.as_str()?.to_string(), + price_change_percent: data.get("price_change_percent")?.as_str()?.to_string(), + high_price: data.get("high")?.as_str()?.to_string(), + low_price: data.get("low")?.as_str()?.to_string(), + volume: data.get("volume")?.as_str()?.to_string(), + quote_volume: data.get("quote_volume")?.as_str()?.to_string(), + open_time: data.get("open_time")?.as_i64()?, + close_time: data.get("close_time")?.as_i64()?, + count: data.get("count")?.as_i64()?, + }; + Some(MarketDataType::Ticker(ticker)) +} + +// Convert order book data +fn convert_orderbook_data(data: &Value, symbol: &str) -> Option { + let bids = data + .get("bids")? + .as_array()? + .iter() + .filter_map(|bid| { + if let [price, quantity] = bid.as_array()?.as_slice() { + Some(OrderBookEntry { + price: price.as_str()?.to_string(), + quantity: quantity.as_str()?.to_string(), + }) + } else { + None + } + }) + .collect(); + + let asks = data + .get("asks")? + .as_array()? + .iter() + .filter_map(|ask| { + if let [price, quantity] = ask.as_array()?.as_slice() { + Some(OrderBookEntry { + price: price.as_str()?.to_string(), + quantity: quantity.as_str()?.to_string(), + }) + } else { + None + } + }) + .collect(); + + let order_book = OrderBook { + symbol: symbol.to_string(), + bids, + asks, + last_update_id: data.get("last_update_id")?.as_i64()?, + }; + + Some(MarketDataType::OrderBook(order_book)) +} + +// Convert trade data +fn convert_trade_data(data: &Value, symbol: &str) -> Option { + let trade = Trade { + symbol: symbol.to_string(), + id: data.get("id")?.as_i64()?, + price: data.get("price")?.as_str()?.to_string(), + quantity: data.get("quantity")?.as_str()?.to_string(), + time: data.get("time")?.as_i64()?, + is_buyer_maker: data.get("is_buyer_maker")?.as_bool()?, + }; + Some(MarketDataType::Trade(trade)) +} + +// Convert kline data +fn convert_kline_data(data: &Value, symbol: &str) -> Option { + let kline = Kline { + symbol: symbol.to_string(), + open_time: data.get("open_time")?.as_i64()?, + close_time: data.get("close_time")?.as_i64()?, + interval: data.get("interval")?.as_str()?.to_string(), + open_price: data.get("open")?.as_str()?.to_string(), + high_price: data.get("high")?.as_str()?.to_string(), + low_price: data.get("low")?.as_str()?.to_string(), + close_price: data.get("close")?.as_str()?.to_string(), + volume: data.get("volume")?.as_str()?.to_string(), + number_of_trades: data.get("trades")?.as_i64()?, + final_bar: data.get("final")?.as_bool()?, + }; + Some(MarketDataType::Kline(kline)) +} + +// Helper function to send heartbeat +async fn send_heartbeat( + ws_sender: &mut futures_util::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, +) -> bool { + let ping_msg = Message::Ping(vec![]); + if let Err(e) = ws_sender.send(ping_msg).await { + error!("Failed to send heartbeat: {}", e); + return true; + } + false +} diff --git a/src/utils/exchange_factory.rs b/src/utils/exchange_factory.rs index bae1887..1b90055 100644 --- a/src/utils/exchange_factory.rs +++ b/src/utils/exchange_factory.rs @@ -2,6 +2,7 @@ use crate::core::{config::ExchangeConfig, traits::MarketDataSource}; use crate::exchanges::{ backpack::BackpackConnector, binance::BinanceConnector, binance_perp::BinancePerpConnector, bybit::BybitConnector, bybit_perp::BybitPerpConnector, hyperliquid::HyperliquidClient, + paradex::ParadexConnector, }; /// Configuration for an exchange in the latency test @@ -24,6 +25,7 @@ pub enum ExchangeType { BybitPerp, Backpack, Hyperliquid, + Paradex, } impl std::fmt::Display for ExchangeType { @@ -35,6 +37,7 @@ impl std::fmt::Display for ExchangeType { Self::BybitPerp => write!(f, "Bybit Perp"), Self::Backpack => write!(f, "Backpack"), Self::Hyperliquid => write!(f, "Hyperliquid"), + Self::Paradex => write!(f, "Paradex"), } } } @@ -78,6 +81,10 @@ impl ExchangeFactory { } } ExchangeType::Hyperliquid => Ok(Box::new(HyperliquidClient::read_only(testnet))), + ExchangeType::Paradex => { + let cfg = config.unwrap_or_else(|| ExchangeConfig::read_only().testnet(testnet)); + Ok(Box::new(ParadexConnector::new(cfg))) + } } } @@ -140,6 +147,18 @@ impl ExchangeFactory { requires_auth: false, symbols: vec!["BTC".to_string(), "ETH".to_string(), "SOL".to_string()], }, + ExchangeTestConfig { + name: "Paradex".to_string(), + exchange_type: ExchangeType::Paradex, + testnet: true, // Default to testnet for Paradex + base_url: None, + requires_auth: false, + symbols: vec![ + "BTC-USD".to_string(), + "ETH-USD".to_string(), + "SOL-USD".to_string(), + ], + }, // Note: Backpack excluded from default config as it requires valid credentials ] } @@ -160,6 +179,18 @@ impl ExchangeFactory { }); } + // Add Paradex if credentials are available + if ExchangeConfig::from_env("PARADEX").is_ok() { + configs.push(ExchangeTestConfig { + name: "Paradex (Auth)".to_string(), + exchange_type: ExchangeType::Paradex, + testnet: true, + base_url: None, + requires_auth: true, + symbols: vec!["BTC-USD".to_string(), "ETH-USD".to_string()], + }); + } + configs } @@ -172,6 +203,7 @@ impl ExchangeFactory { ExchangeType::BybitPerp, ExchangeType::Backpack, ExchangeType::Hyperliquid, + ExchangeType::Paradex, ] } } @@ -203,6 +235,7 @@ impl ExchangeTestConfigBuilder { let symbols = match exchange_type { ExchangeType::Hyperliquid => vec!["BTC".to_string(), "ETH".to_string()], ExchangeType::Backpack => vec!["SOL_USDC".to_string(), "BTC_USDC".to_string()], + ExchangeType::Paradex => vec!["BTC-USD".to_string(), "ETH-USD".to_string()], _ => vec!["BTCUSDT".to_string(), "ETHUSDT".to_string()], };