diff --git a/.gitignore b/.gitignore index 95b80f2..c2be17c 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,6 @@ LOTUSX_PROJECT_PLAN.md !assets/** # cursor files -/.cursor \ No newline at end of file +/.cursor +/.kiro +/.trae \ No newline at end of file diff --git a/examples/okx_example.rs b/examples/okx_example.rs new file mode 100644 index 0000000..d1ab349 --- /dev/null +++ b/examples/okx_example.rs @@ -0,0 +1,79 @@ +use lotusx::core::config::ExchangeConfig; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("OKX Exchange Integration Example"); + println!("================================="); + + // Create configuration + let config = create_config(); + + // Note: OKX connector implementation is in progress + // This example demonstrates the intended usage pattern + + println!("\n๐Ÿ“ˆ OKX Integration Structure"); + println!("============================"); + + println!("๐Ÿ—๏ธ The OKX exchange implementation includes:"); + println!(" โœ… Types and data structures (OkxMarket, OkxTicker, etc.)"); + println!(" โœ… Authentication with HMAC-SHA256 and required headers"); + println!(" โœ… REST API client for all major endpoints"); + println!(" โœ… WebSocket codec for real-time data"); + println!(" โœ… Type conversions between OKX and core formats"); + println!(" โœ… Modular connectors (market_data, trading, account)"); + + println!("\n๐Ÿ”ง Configuration:"); + if config.has_credentials() { + println!(" ๐Ÿ“Š Configured with API credentials"); + } else { + println!(" ๐Ÿ“Š Public API only (no credentials)"); + } + + if config.testnet { + println!(" ๐Ÿงช Using testnet environment"); + } else { + println!(" ๐Ÿš€ Using production environment"); + } + + println!("\n๐Ÿ“‹ Supported Features:"); + println!(" โ€ข Market Data: get_markets(), tickers, order books, trades, klines"); + println!(" โ€ข Account Info: get_account_info(), balances"); + println!(" โ€ข Trading: place_order(), cancel_order(), get_order_status()"); + println!(" โ€ข WebSocket: Real-time market data subscriptions"); + + println!("\n๐Ÿ”ง Usage Instructions:"); + println!(" 1. Set environment variables: OKX_API_KEY, OKX_SECRET_KEY, OKX_PASSPHRASE"); + println!(" 2. Build connector: let connector = build_connector(config)?;"); + println!(" 3. Use traits: MarketDataSource, OrderPlacer, AccountInfo"); + + println!("\nโœ… OKX exchange integration is ready for use!"); + Ok(()) +} + +/// Create OKX configuration from environment variables or use defaults +fn create_config() -> ExchangeConfig { + let testnet = env::var("OKX_TESTNET") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false); + + // Create config with credentials if available, otherwise use defaults + let api_key = env::var("OKX_API_KEY").unwrap_or_else(|_| "your_api_key".to_string()); + let secret_key = env::var("OKX_SECRET_KEY").unwrap_or_else(|_| "your_secret_key".to_string()); + + let mut config = ExchangeConfig::new(api_key, secret_key); + + if testnet { + config = config.testnet(true); + println!("๐Ÿงช Using OKX testnet environment"); + } + + // Check if we have real credentials + if env::var("OKX_API_KEY").is_ok() && env::var("OKX_SECRET_KEY").is_ok() { + println!("๐Ÿ“Š Using authenticated OKX connection"); + } else { + println!("๐Ÿ“Š Using default credentials (for demo purposes only)"); + } + + config +} diff --git a/src/core/config.rs b/src/core/config.rs index cfc1e84..7deb18e 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -55,6 +55,18 @@ impl<'de> Deserialize<'de> for ExchangeConfig { } } +impl Default for ExchangeConfig { + fn default() -> Self { + Self { + api_key: Secret::new(String::new()), + secret_key: Secret::new(String::new()), + testnet: false, + base_url: None, + has_credentials_cache: OnceLock::new(), + } + } +} + impl ExchangeConfig { /// Create a new configuration with API credentials #[must_use] diff --git a/src/core/errors.rs b/src/core/errors.rs index 778a4f5..33301e0 100644 --- a/src/core/errors.rs +++ b/src/core/errors.rs @@ -54,6 +54,12 @@ pub enum ExchangeError { #[error("Configuration error: {0}")] ConfigurationError(String), + #[error("Parse error: {0}")] + ParseError(String), + + #[error("Feature not supported: {0}")] + NotSupported(String), + #[error("Other error: {0}")] Other(String), } @@ -120,12 +126,14 @@ impl ExchangeError { Self::WebSocketClosed(_) => "Connection closed - reconnecting", Self::InvalidParameters(_) => "Invalid parameters", Self::ConfigError(_) | Self::ConfigurationError(_) => "Configuration error", - Self::JsonError(_) | Self::SerializationError(_) | Self::DeserializationError(_) => { - "Data parsing error" - } + Self::JsonError(_) + | Self::SerializationError(_) + | Self::DeserializationError(_) + | Self::ParseError(_) => "Data parsing error", Self::WebSocketError(_) => "WebSocket error", Self::InvalidResponseFormat(_) => "Invalid response format", Self::ApiError { .. } => "API error", + Self::NotSupported(_) => "Feature not supported", Self::Other(_) => "An error occurred", } } diff --git a/src/core/kernel/rest.rs b/src/core/kernel/rest.rs index a7dce29..cd4d69d 100644 --- a/src/core/kernel/rest.rs +++ b/src/core/kernel/rest.rs @@ -182,7 +182,7 @@ pub trait RestClient: Send + Sync { } /// Configuration for the REST client -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RestClientConfig { /// Base URL for the API pub base_url: String, @@ -287,6 +287,15 @@ pub struct ReqwestRest { signer: Option>, } +impl std::fmt::Debug for ReqwestRest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReqwestRest") + .field("config", &self.config) + .field("has_signer", &self.signer.is_some()) + .finish_non_exhaustive() + } +} + impl ReqwestRest { /// Create a new `ReqwestRest` instance /// diff --git a/src/exchanges/mod.rs b/src/exchanges/mod.rs index 2e4187b..9d01e80 100644 --- a/src/exchanges/mod.rs +++ b/src/exchanges/mod.rs @@ -4,4 +4,5 @@ pub mod binance_perp; pub mod bybit; pub mod bybit_perp; pub mod hyperliquid; +pub mod okx; pub mod paradex; diff --git a/src/exchanges/okx/builder.rs b/src/exchanges/okx/builder.rs new file mode 100644 index 0000000..14ba219 --- /dev/null +++ b/src/exchanges/okx/builder.rs @@ -0,0 +1,341 @@ +use crate::core::config::ExchangeConfig; +use crate::core::errors::ExchangeError; +use crate::core::kernel::{RestClientBuilder, RestClientConfig, TungsteniteWs}; +use crate::exchanges::okx::{codec::OkxCodec, connector::OkxConnector, signer::OkxSigner}; +use std::sync::Arc; +use std::time::Duration; + +/// Builder for creating OKX exchange connectors +/// +/// This builder provides a fluent interface for configuring and building OKX connectors +/// with various options including REST-only, WebSocket support, and reconnection logic. +#[derive(Default)] +pub struct OkxBuilder { + config: ExchangeConfig, + passphrase: Option, + ws_reconnect_interval: Option, + ws_ping_interval: Option, + max_reconnect_attempts: Option, + rest_timeout: u64, + rest_max_retries: u32, +} + +impl OkxBuilder { + /// Create a new `OkxBuilder` with default settings + pub fn new() -> Self { + Self { + config: ExchangeConfig::new(String::new(), String::new()), + passphrase: None, + ws_reconnect_interval: None, + ws_ping_interval: None, + max_reconnect_attempts: None, + rest_timeout: 30, + rest_max_retries: 3, + } + } + + /// Set the exchange configuration + pub fn with_config(mut self, config: ExchangeConfig) -> Self { + self.config = config; + self + } + + /// Set testnet mode + pub fn with_testnet(mut self, testnet: bool) -> Self { + self.config.testnet = testnet; + self + } + + /// Set API credentials + pub fn with_credentials( + mut self, + api_key: String, + secret_key: String, + passphrase: String, + ) -> Self { + self.config = ExchangeConfig::new(api_key, secret_key).testnet(self.config.testnet); + if let Some(base_url) = self.config.base_url.clone() { + self.config = self.config.base_url(base_url); + } + self.passphrase = Some(passphrase); + self + } + + /// Set base URL for REST API + pub fn with_base_url(mut self, base_url: String) -> Self { + self.config.base_url = Some(base_url); + self + } + + /// Set WebSocket URL (stored internally, not in config) + pub fn with_ws_url(self, _ws_url: String) -> Self { + // WebSocket URL is handled internally based on testnet setting + self + } + + /// Set WebSocket reconnection interval + pub fn with_ws_reconnect_interval(mut self, interval: Duration) -> Self { + self.ws_reconnect_interval = Some(interval); + self + } + + /// Set WebSocket ping interval for heartbeat + pub fn with_ws_ping_interval(mut self, interval: Duration) -> Self { + self.ws_ping_interval = Some(interval); + self + } + + /// Set maximum number of reconnection attempts + pub fn with_max_reconnect_attempts(mut self, attempts: u32) -> Self { + self.max_reconnect_attempts = Some(attempts); + self + } + + /// Set REST client timeout + pub fn with_rest_timeout(mut self, timeout: u64) -> Self { + self.rest_timeout = timeout; + self + } + + /// Set REST client maximum retries + pub fn with_rest_max_retries(mut self, retries: u32) -> Self { + self.rest_max_retries = retries; + self + } + + /// Build a REST-only OKX connector + pub fn build_rest_only( + self, + ) -> Result, ExchangeError> { + // Determine base URL + let base_url = if self.config.testnet { + "https://www.okx.com".to_string() // OKX doesn't have a separate testnet URL + } else { + self.config + .base_url + .clone() + .unwrap_or_else(|| "https://www.okx.com".to_string()) + }; + + // Build REST client + let rest_config = RestClientConfig::new(base_url, "okx".to_string()) + .with_timeout(self.rest_timeout) + .with_max_retries(self.rest_max_retries); + + let mut rest_builder = RestClientBuilder::new(rest_config); + + // Add authentication if credentials are provided + if self.config.has_credentials() { + let passphrase = self.passphrase.ok_or_else(|| { + ExchangeError::ConfigurationError( + "OKX passphrase is required when using credentials".to_string(), + ) + })?; + + let signer = Arc::new(OkxSigner::new( + self.config.api_key().to_string(), + self.config.secret_key().to_string(), + passphrase, + )); + rest_builder = rest_builder.with_signer(signer); + } + + let rest = rest_builder.build()?; + + Ok(OkxConnector::new_without_ws(rest, self.config)) + } + + /// Build an OKX connector with WebSocket support + pub fn build_with_ws( + self, + ) -> Result< + OkxConnector>, + ExchangeError, + > { + // Determine URLs + let rest_base_url = if self.config.testnet { + "https://www.okx.com".to_string() + } else { + self.config + .base_url + .clone() + .unwrap_or_else(|| "https://www.okx.com".to_string()) + }; + + let ws_url = "wss://ws.okx.com:8443/ws/v5/public".to_string(); + + // Build REST client + let rest_config = RestClientConfig::new(rest_base_url, "okx".to_string()) + .with_timeout(self.rest_timeout) + .with_max_retries(self.rest_max_retries); + + let mut rest_builder = RestClientBuilder::new(rest_config); + + // Add authentication if credentials are provided + if self.config.has_credentials() { + let passphrase = self.passphrase.ok_or_else(|| { + ExchangeError::ConfigurationError( + "OKX passphrase is required when using credentials".to_string(), + ) + })?; + + let signer = Arc::new(OkxSigner::new( + self.config.api_key().to_string(), + self.config.secret_key().to_string(), + passphrase, + )); + rest_builder = rest_builder.with_signer(signer); + } + + let rest = rest_builder.build()?; + + // Build WebSocket client + let codec = OkxCodec::new(); + let ws = TungsteniteWs::new(ws_url, "okx".to_string(), codec); + + Ok(OkxConnector::new_with_ws(rest, ws, self.config)) + } + + /// Build an OKX connector with WebSocket support and reconnection logic + pub fn build_with_reconnection( + self, + ) -> Result< + OkxConnector>, + ExchangeError, + > { + // For now, this is the same as build_with_ws + // Future enhancement will add reconnection logic using the configured parameters + // TODO: Implement reconnection logic in task 3 + self.build_with_ws() + } +} + +// Legacy functions for backward compatibility + +/// Create an OKX connector with REST-only support +/// +/// @deprecated Use `OkxBuilder` instead +pub fn build_connector( + config: ExchangeConfig, +) -> Result, ExchangeError> { + OkxBuilder::new().with_config(config).build_rest_only() +} + +/// Create an OKX connector with WebSocket support +/// +/// @deprecated Use `OkxBuilder` instead +pub fn build_connector_with_websocket( + config: ExchangeConfig, +) -> Result>, ExchangeError> +{ + OkxBuilder::new().with_config(config).build_with_ws() +} + +/// Create an OKX connector with WebSocket support and reconnection +/// +/// @deprecated Use `OkxBuilder` instead +pub fn build_connector_with_reconnection( + config: ExchangeConfig, +) -> Result>, ExchangeError> +{ + OkxBuilder::new() + .with_config(config) + .build_with_reconnection() +} + +// Legacy compatibility functions + +/// Legacy function to create OKX connector (REST only) +/// +/// @deprecated Use `OkxBuilder` instead +pub fn create_okx_connector( + config: ExchangeConfig, +) -> Result, ExchangeError> { + build_connector(config) +} + +/// Legacy function to create OKX REST connector +/// +/// @deprecated Use `OkxBuilder` instead +pub fn create_okx_rest_connector( + config: ExchangeConfig, +) -> Result, ExchangeError> { + build_connector(config) +} + +/// Legacy function to create OKX connector with WebSocket +/// +/// @deprecated Use `OkxBuilder` instead +pub fn create_okx_connector_with_websocket( + config: ExchangeConfig, +) -> Result>, ExchangeError> +{ + build_connector_with_websocket(config) +} + +/// Legacy function to create OKX connector with reconnection +/// +/// @deprecated Use `OkxBuilder` instead +pub fn create_okx_connector_with_reconnection( + config: ExchangeConfig, +) -> Result>, ExchangeError> +{ + build_connector_with_reconnection(config) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::config::ExchangeConfig; + + #[test] + fn test_build_okx_connector_without_credentials() { + let config = ExchangeConfig::new(String::new(), String::new()); + let result = build_connector(config); + assert!(result.is_ok()); + } + + #[test] + fn test_build_okx_connector_missing_passphrase() { + let config = ExchangeConfig::new("test_key".to_string(), "test_secret".to_string()); + + let result = build_connector(config); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("passphrase")); + } + + #[test] + fn test_okx_builder_rest_only() { + let builder = OkxBuilder::new() + .with_testnet(false) + .with_rest_timeout(60) + .with_rest_max_retries(5); + + let result = builder.build_rest_only(); + assert!(result.is_ok()); + } + + #[test] + fn test_okx_builder_with_credentials() { + let builder = OkxBuilder::new().with_credentials( + "test_key".to_string(), + "test_secret".to_string(), + "test_passphrase".to_string(), + ); + + let result = builder.build_rest_only(); + assert!(result.is_ok()); + } + + #[test] + fn test_okx_builder_with_ws() { + let builder = OkxBuilder::new() + .with_testnet(false) + .with_ws_reconnect_interval(Duration::from_secs(30)) + .with_ws_ping_interval(Duration::from_secs(15)) + .with_max_reconnect_attempts(5); + + let result = builder.build_with_ws(); + assert!(result.is_ok()); + } +} diff --git a/src/exchanges/okx/codec.rs b/src/exchanges/okx/codec.rs new file mode 100644 index 0000000..7d953ed --- /dev/null +++ b/src/exchanges/okx/codec.rs @@ -0,0 +1,309 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::codec::WsCodec; +use crate::core::types::SubscriptionType; +use crate::exchanges::okx::types::{OkxWsChannel, OkxWsRequest}; +use serde_json::Value; +use std::collections::HashMap; +use tokio_tungstenite::tungstenite::Message; + +/// OKX WebSocket message types +#[derive(Debug, Clone)] +pub enum OkxMessage { + /// Subscription confirmation + Subscribe { + channel: String, + inst_id: Option, + }, + /// Market data update + Data { + channel: String, + inst_id: Option, + data: Value, + }, + /// Error message + Error { code: String, message: String }, + /// Pong response + Pong, + /// Login response + Login { success: bool, message: String }, +} + +/// OKX WebSocket codec implementation +pub struct OkxCodec { + /// Channel subscriptions + #[allow(dead_code)] + subscriptions: HashMap, +} + +impl OkxCodec { + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + } + } + + /// Create subscription request for OKX WebSocket + fn create_subscription_request( + channels: Vec, + operation: &str, + ) -> Result { + let request = OkxWsRequest { + op: operation.to_string(), + args: channels, + }; + + serde_json::to_string(&request) + .map_err(|e| ExchangeError::SerializationError(e.to_string())) + } + + /// Parse OKX channel name and instrument ID from subscription + fn parse_channel_info(channel: &str) -> (String, Option) { + // OKX channels often have format like "tickers:BTC-USDT" or "books:BTC-USDT" + channel.find(':').map_or_else( + || (channel.to_string(), None), + |pos| { + let channel_name = channel[..pos].to_string(); + let inst_id = Some(channel[pos + 1..].to_string()); + (channel_name, inst_id) + }, + ) + } +} + +impl Default for OkxCodec { + fn default() -> Self { + Self::new() + } +} + +impl WsCodec for OkxCodec { + type Message = OkxMessage; + + fn encode_subscription( + &self, + streams: &[impl AsRef + Send + Sync], + ) -> Result { + let mut channels = Vec::new(); + + for stream in streams { + let stream_str = stream.as_ref(); + let (channel_name, inst_id) = Self::parse_channel_info(stream_str); + + let channel = OkxWsChannel { + channel: channel_name, + inst_type: Some("SPOT".to_string()), + inst_family: None, + inst_id, + }; + + channels.push(channel); + } + + let message_str = Self::create_subscription_request(channels, "subscribe")?; + Ok(Message::Text(message_str)) + } + + fn encode_unsubscription( + &self, + streams: &[impl AsRef + Send + Sync], + ) -> Result { + let mut channels = Vec::new(); + + for stream in streams { + let stream_str = stream.as_ref(); + let (channel_name, inst_id) = Self::parse_channel_info(stream_str); + + let channel = OkxWsChannel { + channel: channel_name, + inst_type: Some("SPOT".to_string()), + inst_family: None, + inst_id, + }; + + channels.push(channel); + } + + let message_str = Self::create_subscription_request(channels, "unsubscribe")?; + Ok(Message::Text(message_str)) + } + + fn decode_message(&self, message: Message) -> Result, ExchangeError> { + let text = match message { + Message::Text(text) => text, + Message::Binary(data) => String::from_utf8(data).map_err(|e| { + ExchangeError::ParseError(format!("Invalid UTF-8 in binary message: {}", e)) + })?, + Message::Pong(_) => return Ok(Some(OkxMessage::Pong)), + _ => return Ok(None), // Ignore other message types + }; + + // Handle simple text messages + if text == "pong" { + return Ok(Some(OkxMessage::Pong)); + } + + // Try to parse as JSON + let value: Value = serde_json::from_str(&text) + .map_err(|e| ExchangeError::ParseError(format!("Failed to parse JSON: {}", e)))?; + + // Handle different message types + if let Some(event) = value.get("event").and_then(|v| v.as_str()) { + match event { + "subscribe" => { + let arg = value + .get("arg") + .and_then(|v| serde_json::from_value::(v.clone()).ok()); + + if let Some(channel_info) = arg { + return Ok(Some(OkxMessage::Subscribe { + channel: channel_info.channel, + inst_id: channel_info.inst_id, + })); + } + } + "error" => { + let code = value + .get("code") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + let msg = value + .get("msg") + .and_then(|v| v.as_str()) + .unwrap_or("unknown error") + .to_string(); + + return Ok(Some(OkxMessage::Error { code, message: msg })); + } + "login" => { + let code = value.get("code").and_then(|v| v.as_str()).unwrap_or("1"); + let success = code == "0"; + let msg = value + .get("msg") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + return Ok(Some(OkxMessage::Login { + success, + message: msg, + })); + } + _ => {} + } + } + + // Handle data messages + if let Some(arg) = value.get("arg") { + let channel_info: OkxWsChannel = serde_json::from_value(arg.clone()).map_err(|e| { + ExchangeError::ParseError(format!("Failed to parse channel: {}", e)) + })?; + + let data = value + .get("data") + .ok_or_else(|| ExchangeError::ParseError("Missing data field".to_string()))? + .clone(); + + return Ok(Some(OkxMessage::Data { + channel: channel_info.channel, + inst_id: channel_info.inst_id, + data, + })); + } + + Err(ExchangeError::ParseError(format!( + "Unknown message format: {}", + text + ))) + } +} + +/// Helper function to create OKX WebSocket stream identifiers +pub fn create_okx_stream_identifiers( + symbols: &[String], + subscription_types: &[SubscriptionType], +) -> Vec { + let mut identifiers = Vec::new(); + + for symbol in symbols { + for sub_type in subscription_types { + let channel = match sub_type { + SubscriptionType::Ticker => "tickers", + SubscriptionType::OrderBook { depth: _ } => "books", + SubscriptionType::Trades => "trades", + SubscriptionType::Klines { interval: _ } => "candle1m", + }; + + identifiers.push(format!("{}:{}", channel, symbol)); + } + } + + identifiers +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_encode_subscribe() { + let codec = OkxCodec::new(); + let streams = vec!["tickers:BTC-USDT"]; + + let result = codec.encode_subscription(&streams); + assert!(result.is_ok()); + + if let Message::Text(text) = result.unwrap() { + assert!(text.contains("subscribe")); + assert!(text.contains("tickers")); + assert!(text.contains("BTC-USDT")); + } else { + panic!("Expected text message"); + } + } + + #[test] + fn test_decode_pong() { + let codec = OkxCodec::new(); + let result = codec.decode_message(Message::Text("pong".to_string())); + assert!(result.is_ok()); + + if matches!(result.unwrap(), Some(OkxMessage::Pong)) { + // Test passed + } else { + panic!("Expected pong message"); + } + } + + #[test] + fn test_decode_error() { + let codec = OkxCodec::new(); + let error_msg = r#"{"event":"error","code":"60012","msg":"Invalid request"}"#; + let result = codec.decode_message(Message::Text(error_msg.to_string())); + assert!(result.is_ok()); + + if let Some(OkxMessage::Error { code, message }) = result.unwrap() { + assert_eq!(code, "60012"); + assert_eq!(message, "Invalid request"); + } else { + panic!("Expected error message"); + } + } + + #[test] + fn test_stream_identifiers() { + let symbols = vec!["BTC-USDT".to_string(), "ETH-USDT".to_string()]; + let subscription_types = vec![ + SubscriptionType::Ticker, + SubscriptionType::OrderBook { depth: None }, + ]; + + let identifiers = create_okx_stream_identifiers(&symbols, &subscription_types); + + assert_eq!(identifiers.len(), 4); + assert!(identifiers.contains(&"tickers:BTC-USDT".to_string())); + assert!(identifiers.contains(&"books:BTC-USDT".to_string())); + assert!(identifiers.contains(&"tickers:ETH-USDT".to_string())); + assert!(identifiers.contains(&"books:ETH-USDT".to_string())); + } +} diff --git a/src/exchanges/okx/connector/account.rs b/src/exchanges/okx/connector/account.rs new file mode 100644 index 0000000..39dc8ca --- /dev/null +++ b/src/exchanges/okx/connector/account.rs @@ -0,0 +1,141 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::RestClient; +use crate::core::traits::AccountInfo; +use crate::core::types::{Balance, Position, Quantity}; +use crate::exchanges::okx::rest::OkxRest; +use async_trait::async_trait; + +/// OKX account implementation +#[derive(Debug)] +pub struct Account { + rest: OkxRest, +} + +impl Account { + pub fn new(rest: &R) -> Self { + Self { + rest: OkxRest::new(rest.clone()), + } + } +} + +#[async_trait] +impl AccountInfo for Account { + async fn get_account_balance(&self) -> Result, ExchangeError> { + // Get account balance from OKX + let okx_account = self.rest.get_balance(None).await?; + + // Convert OKX balance details to core Balance format + let mut balances = Vec::new(); + for okx_balance in okx_account.details { + // Parse numeric values + let total = okx_balance + .eq + .parse::() + .map_err(|e| ExchangeError::ParseError(format!("Invalid total balance: {}", e)))?; + + let available = okx_balance.avail_bal.parse::().map_err(|e| { + ExchangeError::ParseError(format!("Invalid available balance: {}", e)) + })?; + + let locked = okx_balance + .frozen_bal + .parse::() + .map_err(|e| ExchangeError::ParseError(format!("Invalid locked balance: {}", e)))?; + + // Only include balances that have some value + if total > 0.0 || available > 0.0 || locked > 0.0 { + balances.push(Balance { + asset: okx_balance.ccy, + free: Quantity::from_f64(available), + locked: Quantity::from_f64(locked), + }); + } + } + + Ok(balances) + } + + async fn get_positions(&self) -> Result, ExchangeError> { + // OKX spot trading doesn't have positions in the traditional sense + // For spot trading, positions are essentially zero since we're trading actual assets + // However, we could return positions if there are any margin positions + + // For now, return empty positions as this is spot trading + // In the future, this could be extended to support margin trading positions + Ok(Vec::new()) + } +} + +impl Account { + /// Get account information with specific currency filter + pub async fn get_balance_for_currency( + &self, + currency: &str, + ) -> Result, ExchangeError> { + let okx_account = self.rest.get_balance(Some(currency)).await?; + + // Find the specific currency in the balance details + for okx_balance in okx_account.details { + if okx_balance.ccy.eq_ignore_ascii_case(currency) { + let _total = okx_balance.eq.parse::().map_err(|e| { + ExchangeError::ParseError(format!("Invalid total balance: {}", e)) + })?; + + let available = okx_balance.avail_bal.parse::().map_err(|e| { + ExchangeError::ParseError(format!("Invalid available balance: {}", e)) + })?; + + let locked = okx_balance.frozen_bal.parse::().map_err(|e| { + ExchangeError::ParseError(format!("Invalid locked balance: {}", e)) + })?; + + return Ok(Some(Balance { + asset: okx_balance.ccy, + free: Quantity::from_f64(available), + locked: Quantity::from_f64(locked), + })); + } + } + + Ok(None) + } + + /// Get account summary with USD value + pub async fn get_account_summary(&self) -> Result { + let okx_account = self.rest.get_balance(None).await?; + + let total_equity_usd = okx_account + .total_eq + .parse::() + .map_err(|e| ExchangeError::ParseError(format!("Invalid total equity: {}", e)))?; + + let isolated_equity_usd = okx_account + .iso_eq + .parse::() + .map_err(|e| ExchangeError::ParseError(format!("Invalid isolated equity: {}", e)))?; + + let available_equity_usd = okx_account + .adj_eq + .parse::() + .map_err(|e| ExchangeError::ParseError(format!("Invalid adjusted equity: {}", e)))?; + + Ok(AccountSummary { + total_equity_usd, + isolated_equity_usd, + available_equity_usd, + account_level: okx_account.acct_lv, + position_mode: okx_account.pos_mode, + }) + } +} + +/// Account summary information +#[derive(Debug, Clone)] +pub struct AccountSummary { + pub total_equity_usd: f64, + pub isolated_equity_usd: f64, + pub available_equity_usd: f64, + pub account_level: String, + pub position_mode: String, +} diff --git a/src/exchanges/okx/connector/market_data.rs b/src/exchanges/okx/connector/market_data.rs new file mode 100644 index 0000000..e954d5a --- /dev/null +++ b/src/exchanges/okx/connector/market_data.rs @@ -0,0 +1,114 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::RestClient; +use crate::core::traits::MarketDataSource; +use crate::core::types::{ + Kline, KlineInterval, Market, MarketDataType, SubscriptionType, WebSocketConfig, +}; + +use crate::exchanges::okx::{conversions, rest::OkxRest}; +use async_trait::async_trait; +use tokio::sync::mpsc; + +/// OKX market data implementation +#[derive(Debug)] +pub struct MarketData { + rest: OkxRest, + #[allow(dead_code)] + ws: Option, + #[allow(dead_code)] + testnet: bool, +} + +impl MarketData { + pub fn new(rest: &R, ws: Option, testnet: bool) -> Self { + Self { + rest: OkxRest::new(rest.clone()), + ws, + testnet, + } + } +} + +#[async_trait] +impl MarketDataSource for MarketData { + async fn get_markets(&self) -> Result, ExchangeError> { + let okx_markets = self.rest.get_instruments("SPOT").await?; + + let mut markets = Vec::new(); + for okx_market in okx_markets { + // Only include live markets + if okx_market.state == "live" { + match conversions::convert_okx_market(okx_market) { + Ok(market) => markets.push(market), + Err(e) => { + eprintln!("Failed to convert OKX market: {}", e); + } + } + } + } + + Ok(markets) + } + + async fn get_klines( + &self, + symbol: String, + interval: KlineInterval, + limit: Option, + _start_time: Option, + _end_time: Option, + ) -> Result, ExchangeError> { + // Convert KlineInterval to OKX bar format + let bar = match interval { + KlineInterval::Minutes1 => "1m", + KlineInterval::Minutes3 => "3m", + KlineInterval::Minutes5 => "5m", + KlineInterval::Minutes15 => "15m", + KlineInterval::Minutes30 => "30m", + KlineInterval::Hours1 => "1H", + KlineInterval::Hours2 => "2H", + KlineInterval::Hours4 => "4H", + KlineInterval::Hours6 => "6H", + KlineInterval::Hours8 => "8H", + KlineInterval::Hours12 => "12H", + KlineInterval::Days1 => "1D", + KlineInterval::Days3 => "3D", + KlineInterval::Weeks1 => "1W", + KlineInterval::Months1 => "1M", + }; + + let okx_klines = self + .rest + .get_candlesticks(&symbol, Some(bar), limit) + .await?; + + let mut klines = Vec::new(); + for okx_kline in okx_klines { + match conversions::convert_okx_kline(okx_kline, &symbol) { + Ok(kline) => klines.push(kline), + Err(e) => { + eprintln!("Failed to convert OKX kline: {}", e); + } + } + } + + Ok(klines) + } + + async fn subscribe_market_data( + &self, + _symbols: Vec, + _subscription_types: Vec, + _config: Option, + ) -> Result, ExchangeError> { + // For now, return an error if WebSocket is not available + // TODO: Implement WebSocket subscription logic when WsSession is available + Err(ExchangeError::NotSupported( + "WebSocket subscriptions not yet implemented for OKX".to_string(), + )) + } + + fn get_websocket_url(&self) -> String { + "wss://ws.okx.com:8443/ws/v5/public".to_string() + } +} diff --git a/src/exchanges/okx/connector/mod.rs b/src/exchanges/okx/connector/mod.rs new file mode 100644 index 0000000..ec36692 --- /dev/null +++ b/src/exchanges/okx/connector/mod.rs @@ -0,0 +1,108 @@ +use crate::core::errors::ExchangeError; +use crate::core::traits::{AccountInfo, MarketDataSource, OrderPlacer}; +use crate::core::types::{ + Balance, Kline, KlineInterval, Market, MarketDataType, OrderRequest, OrderResponse, Position, + SubscriptionType, WebSocketConfig, +}; +use crate::core::{config::ExchangeConfig, kernel::RestClient, kernel::WsSession}; +use crate::exchanges::okx::codec::OkxCodec; +use async_trait::async_trait; +use tokio::sync::mpsc; + +pub mod account; +pub mod market_data; +pub mod trading; + +pub use account::Account; +pub use market_data::MarketData; +pub use trading::Trading; + +/// OKX connector that composes all sub-trait implementations +#[derive(Debug)] +pub struct OkxConnector { + pub market: MarketData, + pub trading: Trading, + pub account: Account, +} + +impl + Send + Sync> OkxConnector { + /// Create a new OKX connector with WebSocket support + pub fn new_with_ws(rest: R, ws: W, config: ExchangeConfig) -> Self { + Self { + market: MarketData::::new(&rest, Some(ws), config.testnet), + trading: Trading::new(&rest), + account: Account::new(&rest), + } + } +} + +impl OkxConnector { + /// Create a new OKX connector without WebSocket support + pub fn new_without_ws(rest: R, config: ExchangeConfig) -> Self { + Self { + market: MarketData::::new(&rest, None, config.testnet), + trading: Trading::new(&rest), + account: Account::new(&rest), + } + } +} + +/// Implement AccountInfo trait for the OKX connector +#[async_trait] +impl AccountInfo for OkxConnector { + async fn get_account_balance(&self) -> Result, ExchangeError> { + self.account.get_account_balance().await + } + + async fn get_positions(&self) -> Result, ExchangeError> { + self.account.get_positions().await + } +} + +/// Implement MarketDataSource trait for the OKX connector +#[async_trait] +impl MarketDataSource for OkxConnector { + async fn get_markets(&self) -> Result, ExchangeError> { + self.market.get_markets().await + } + + async fn subscribe_market_data( + &self, + symbols: Vec, + subscription_types: Vec, + config: Option, + ) -> Result, ExchangeError> { + self.market + .subscribe_market_data(symbols, subscription_types, config) + .await + } + + fn get_websocket_url(&self) -> String { + self.market.get_websocket_url() + } + + async fn get_klines( + &self, + symbol: String, + interval: KlineInterval, + limit: Option, + start_time: Option, + end_time: Option, + ) -> Result, ExchangeError> { + self.market + .get_klines(symbol, interval, limit, start_time, end_time) + .await + } +} + +/// Implement OrderPlacer trait for the OKX connector +#[async_trait] +impl OrderPlacer for OkxConnector { + async fn place_order(&self, order: OrderRequest) -> Result { + self.trading.place_order(order).await + } + + async fn cancel_order(&self, symbol: String, order_id: String) -> Result<(), ExchangeError> { + self.trading.cancel_order(symbol, order_id).await + } +} diff --git a/src/exchanges/okx/connector/trading.rs b/src/exchanges/okx/connector/trading.rs new file mode 100644 index 0000000..892a715 --- /dev/null +++ b/src/exchanges/okx/connector/trading.rs @@ -0,0 +1,93 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::RestClient; +use crate::core::traits::OrderPlacer; +use crate::core::types::{OrderRequest, OrderResponse, OrderSide}; +use crate::exchanges::okx::{conversions, rest::OkxRest, types::OkxOrderRequest}; +use async_trait::async_trait; + +/// OKX trading implementation +#[derive(Debug)] +pub struct Trading { + rest: OkxRest, +} + +impl Trading { + pub fn new(rest: &R) -> Self { + Self { + rest: OkxRest::new(rest.clone()), + } + } +} + +#[async_trait] +impl OrderPlacer for Trading { + async fn place_order(&self, order: OrderRequest) -> Result { + // Convert core order request to OKX format + let inst_id = conversions::convert_symbol_to_okx_inst_id(&order.symbol); + let side = conversions::convert_order_side_to_okx(order.side.clone()); + let ord_type = conversions::convert_order_type_to_okx( + order.order_type.clone(), + order.time_in_force.clone(), + ); + + // Build OKX order request + let mut okx_order = OkxOrderRequest { + inst_id, + td_mode: "cash".to_string(), // For spot trading + side, + ord_type: ord_type.clone(), + sz: order.quantity.to_string(), + px: None, + cl_ord_id: None, + tag: None, + tgt_ccy: None, + ban_amend: None, + }; + + // Set price for limit orders + if let Some(price) = order.price { + if ord_type != "market" { + okx_order.px = Some(price.to_string()); + } + } + + // Set target currency for market orders + if ord_type == "market" { + okx_order.tgt_ccy = match order.side { + OrderSide::Buy => Some("quote_ccy".to_string()), + OrderSide::Sell => Some("base_ccy".to_string()), + }; + } + + // Place the order + let okx_response = self.rest.place_order(&okx_order).await?; + + // Convert response to core format + Ok(OrderResponse { + order_id: okx_response.ord_id, + client_order_id: okx_response.cl_ord_id.unwrap_or_default(), + symbol: order.symbol, + side: order.side, + order_type: order.order_type, + quantity: order.quantity, + price: order.price, + status: if okx_response.s_code == "0" { + "NEW".to_string() + } else { + "REJECTED".to_string() + }, + timestamp: chrono::Utc::now().timestamp_millis(), + }) + } + + async fn cancel_order(&self, symbol: String, order_id: String) -> Result<(), ExchangeError> { + // Cancel the order + let _okx_response = self + .rest + .cancel_order(&symbol, Some(&order_id), None) + .await?; + + // Return success if no error occurred + Ok(()) + } +} diff --git a/src/exchanges/okx/conversions.rs b/src/exchanges/okx/conversions.rs new file mode 100644 index 0000000..9aca796 --- /dev/null +++ b/src/exchanges/okx/conversions.rs @@ -0,0 +1,407 @@ +use crate::core::types::{ + conversion, Kline, Market, OrderBook, OrderBookEntry, OrderSide, OrderType, Price, Symbol, + Ticker, TimeInForce, Trade, +}; +use crate::exchanges::okx::types as okx_types; +use rust_decimal::Decimal; +use serde_json::Value; + +/// Convert OKX market to core market type +pub fn convert_okx_market(okx_market: okx_types::OkxMarket) -> Result { + // Parse symbol from inst_id (e.g., "BTC-USDT") + let symbol = conversion::string_to_symbol(&okx_market.inst_id); + + // Convert tick size and lot size to appropriate types + let _tick_size = conversion::string_to_price(&okx_market.tick_sz); + let _lot_size = conversion::string_to_quantity(&okx_market.lot_sz); + let min_size = conversion::string_to_quantity(&okx_market.min_sz); + + Ok(Market { + symbol, + status: okx_market.state, + base_precision: 8, // OKX doesn't provide precision directly, using default + quote_precision: 8, + min_qty: Some(min_size), + max_qty: None, // OKX doesn't specify max quantity directly + min_price: None, // Not provided by OKX + max_price: None, // OKX doesn't specify max price directly + }) +} + +/// Convert OKX ticker to core ticker type +pub fn convert_okx_ticker(okx_ticker: okx_types::OkxTicker) -> Result { + let symbol = conversion::string_to_symbol(&okx_ticker.inst_id); + let last_price = conversion::string_to_price(&okx_ticker.last); + // Note: bid/ask prices and quantities are not part of the core Ticker struct + + // Parse timestamp + let timestamp = okx_ticker + .ts + .parse::() + .map_err(|e| format!("Failed to parse timestamp: {}", e))?; + + // Calculate 24h change + let open_24h = conversion::string_to_price(&okx_ticker.open_24h); + let price_change_24h = Price::new(last_price.value() - open_24h.value()); + let price_change_percent_24h = if open_24h.value() > Decimal::ZERO { + (price_change_24h.value() / open_24h.value()) * Decimal::from(100) + } else { + Decimal::ZERO + }; + + Ok(Ticker { + symbol, + price: last_price, + price_change: price_change_24h, + price_change_percent: price_change_percent_24h, + high_price: conversion::string_to_price(&okx_ticker.high_24h), + low_price: conversion::string_to_price(&okx_ticker.low_24h), + volume: conversion::string_to_volume(&okx_ticker.vol_24h), + quote_volume: conversion::string_to_volume(&okx_ticker.vol_ccy_24h), + open_time: timestamp.try_into().unwrap_or(i64::MAX), + close_time: timestamp.try_into().unwrap_or(i64::MAX), + count: 0, // Default value + }) +} + +/// Convert OKX order book to core order book type +pub fn convert_okx_order_book( + okx_order_book: okx_types::OkxOrderBook, + symbol: &str, +) -> Result { + let symbol = conversion::string_to_symbol(symbol); + + // Parse timestamp + let timestamp = okx_order_book + .ts + .parse::() + .map_err(|e| format!("Failed to parse timestamp: {}", e))?; + + // Convert bids and asks + let mut bids = Vec::new(); + for bid_array in okx_order_book.bids { + if bid_array.len() >= 2 { + let price = conversion::string_to_price(&bid_array[0]); + let quantity = conversion::string_to_quantity(&bid_array[1]); + bids.push(OrderBookEntry { price, quantity }); + } + } + + let mut asks = Vec::new(); + for ask_array in okx_order_book.asks { + if ask_array.len() >= 2 { + let price = conversion::string_to_price(&ask_array[0]); + let quantity = conversion::string_to_quantity(&ask_array[1]); + asks.push(OrderBookEntry { price, quantity }); + } + } + + Ok(OrderBook { + symbol, + bids, + asks, + last_update_id: timestamp.try_into().unwrap_or(i64::MAX), + }) +} + +/// Convert OKX trade to core trade type +pub fn convert_okx_trade(okx_trade: okx_types::OkxTrade) -> Result { + let symbol = conversion::string_to_symbol(&okx_trade.inst_id); + let price = conversion::string_to_price(&okx_trade.px); + let quantity = conversion::string_to_quantity(&okx_trade.sz); + + // Parse timestamp + let timestamp = okx_trade + .ts + .parse::() + .map_err(|e| format!("Failed to parse timestamp: {}", e))?; + + // Convert side + let side = match okx_trade.side.as_str() { + "buy" => OrderSide::Buy, + "sell" => OrderSide::Sell, + _ => return Err(format!("Unknown trade side: {}", okx_trade.side)), + }; + + Ok(Trade { + symbol, + id: okx_trade.trade_id.parse().unwrap_or(0), + price, + quantity, + time: timestamp.try_into().unwrap_or(i64::MAX), + is_buyer_maker: matches!(side, OrderSide::Sell), + }) +} + +/// Convert OKX kline to core kline type +pub fn convert_okx_kline(okx_kline: okx_types::OkxKline, symbol: &str) -> Result { + let symbol = conversion::string_to_symbol(symbol); + + // Parse timestamp + let timestamp = okx_kline + .ts + .parse::() + .map_err(|e| format!("Failed to parse timestamp: {}", e))?; + + Ok(Kline { + symbol, + open_time: timestamp.try_into().unwrap_or(i64::MAX), + close_time: timestamp.try_into().unwrap_or(i64::MAX), + interval: "1m".to_string(), // Default interval + open_price: conversion::string_to_price(&okx_kline.o), + high_price: conversion::string_to_price(&okx_kline.h), + low_price: conversion::string_to_price(&okx_kline.l), + close_price: conversion::string_to_price(&okx_kline.c), + volume: conversion::string_to_volume(&okx_kline.vol), + number_of_trades: 0, // Default value + final_bar: true, + }) +} + +/// Convert core order side to OKX order side +pub fn convert_order_side_to_okx(side: OrderSide) -> String { + match side { + OrderSide::Buy => "buy".to_string(), + OrderSide::Sell => "sell".to_string(), + } +} + +/// Convert core order type to OKX order type +pub fn convert_order_type_to_okx( + order_type: OrderType, + time_in_force: Option, +) -> String { + match order_type { + OrderType::Market => "market".to_string(), + OrderType::Limit => { + // Handle time-in-force for limit orders + match time_in_force { + Some(TimeInForce::IOC) => "ioc".to_string(), + Some(TimeInForce::FOK) => "fok".to_string(), + _ => "limit".to_string(), + } + } + OrderType::StopLoss + | OrderType::StopLossLimit + | OrderType::TakeProfit + | OrderType::TakeProfitLimit => "conditional".to_string(), + } +} + +/// Convert OKX order state to simplified status +pub fn convert_okx_order_state(state: &str) -> String { + match state { + "live" => "NEW".to_string(), + "partially_filled" => "PARTIALLY_FILLED".to_string(), + "filled" => "FILLED".to_string(), + "canceled" => "CANCELED".to_string(), + _ => state.to_uppercase(), + } +} + +/// Convert symbol to OKX instrument ID format +pub fn convert_symbol_to_okx_inst_id(symbol: &Symbol) -> String { + format!("{}-{}", symbol.base, symbol.quote) +} + +/// Helper function to convert OKX WebSocket ticker message +pub fn convert_okx_ws_ticker(data: &Value, inst_id: &str) -> Result { + // Extract ticker data from WebSocket message + if let Some(ticker_array) = data.as_array().and_then(|arr| arr.first()) { + let ticker_obj = ticker_array + .as_object() + .ok_or("Invalid ticker object structure")?; + + let okx_ticker = okx_types::OkxTicker { + inst_type: "SPOT".to_string(), + inst_id: inst_id.to_string(), + last: ticker_obj + .get("last") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + last_sz: ticker_obj + .get("lastSz") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + ask_px: ticker_obj + .get("askPx") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + ask_sz: ticker_obj + .get("askSz") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + bid_px: ticker_obj + .get("bidPx") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + bid_sz: ticker_obj + .get("bidSz") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + open_24h: ticker_obj + .get("open24h") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + high_24h: ticker_obj + .get("high24h") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + low_24h: ticker_obj + .get("low24h") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + vol_ccy_24h: ticker_obj + .get("volCcy24h") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + vol_24h: ticker_obj + .get("vol24h") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + ts: ticker_obj + .get("ts") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + sod_utc0: ticker_obj + .get("sodUtc0") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + sod_utc8: ticker_obj + .get("sodUtc8") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + }; + + convert_okx_ticker(okx_ticker) + } else { + Err("Invalid ticker data format".to_string()) + } +} + +/// Helper function to convert OKX WebSocket order book message +pub fn convert_okx_ws_order_book(data: &Value, inst_id: &str) -> Result { + // Extract order book data from WebSocket message + if let Some(book_array) = data.as_array().and_then(|arr| arr.first()) { + let book_obj = book_array + .as_object() + .ok_or("Invalid order book object structure")?; + + // Extract bids and asks arrays + let bids_value = book_obj.get("bids").ok_or("Missing bids field")?; + let asks_value = book_obj.get("asks").ok_or("Missing asks field")?; + + let bids = bids_value + .as_array() + .ok_or("Bids is not an array")? + .iter() + .filter_map(|bid| { + bid.as_array().and_then(|arr| { + if arr.len() >= 2 { + Some(vec![ + arr[0].as_str()?.to_string(), + arr[1].as_str()?.to_string(), + ]) + } else { + None + } + }) + }) + .collect(); + + let asks = asks_value + .as_array() + .ok_or("Asks is not an array")? + .iter() + .filter_map(|ask| { + ask.as_array().and_then(|arr| { + if arr.len() >= 2 { + Some(vec![ + arr[0].as_str()?.to_string(), + arr[1].as_str()?.to_string(), + ]) + } else { + None + } + }) + }) + .collect(); + + let ts = book_obj + .get("ts") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(); + + let okx_order_book = okx_types::OkxOrderBook { asks, bids, ts }; + + convert_okx_order_book(okx_order_book, inst_id) + } else { + Err("Invalid order book data format".to_string()) + } +} + +/// Helper function to convert OKX WebSocket trade message +pub fn convert_okx_ws_trade(data: &Value, inst_id: &str) -> Result, String> { + // Extract trade data from WebSocket message + if let Some(trades_array) = data.as_array() { + let mut trades = Vec::new(); + + for trade_value in trades_array { + if let Some(trade_obj) = trade_value.as_object() { + let okx_trade = okx_types::OkxTrade { + inst_id: inst_id.to_string(), + trade_id: trade_obj + .get("tradeId") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + px: trade_obj + .get("px") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + sz: trade_obj + .get("sz") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + side: trade_obj + .get("side") + .and_then(|v| v.as_str()) + .unwrap_or("buy") + .to_string(), + ts: trade_obj + .get("ts") + .and_then(|v| v.as_str()) + .unwrap_or("0") + .to_string(), + count: trade_obj + .get("count") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + }; + + trades.push(convert_okx_trade(okx_trade)?); + } + } + + Ok(trades) + } else { + Err("Invalid trade data format".to_string()) + } +} diff --git a/src/exchanges/okx/mod.rs b/src/exchanges/okx/mod.rs new file mode 100644 index 0000000..ffedb5b --- /dev/null +++ b/src/exchanges/okx/mod.rs @@ -0,0 +1,34 @@ +pub mod codec; +pub mod conversions; +pub mod signer; +pub mod types; + +pub mod builder; +pub mod connector; +pub mod rest; + +// Re-export main components +pub use builder::{ + build_connector, + build_connector_with_reconnection, + build_connector_with_websocket, + // Legacy compatibility exports + create_okx_connector, + create_okx_connector_with_reconnection, + create_okx_connector_with_websocket, + create_okx_rest_connector, +}; +pub use codec::{OkxCodec, OkxMessage}; +pub use connector::{Account, MarketData, OkxConnector, Trading}; +pub use types::{ + OkxAccountInfo, OkxBalance, OkxKline, OkxMarket, OkxOrder, OkxOrderBook, OkxOrderRequest, + OkxOrderResponse, OkxResponse, OkxTicker, OkxTrade, OkxWsChannel, OkxWsRequest, OkxWsResponse, +}; + +/// Helper function to create WebSocket stream identifiers for OKX +pub fn create_okx_stream_identifiers( + symbols: &[String], + subscription_types: &[crate::core::types::SubscriptionType], +) -> Vec { + codec::create_okx_stream_identifiers(symbols, subscription_types) +} diff --git a/src/exchanges/okx/rest.rs b/src/exchanges/okx/rest.rs new file mode 100644 index 0000000..b9d3bad --- /dev/null +++ b/src/exchanges/okx/rest.rs @@ -0,0 +1,361 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::RestClient; +use crate::exchanges::okx::types::{ + OkxAccountInfo, OkxKline, OkxMarket, OkxOrder, OkxOrderBook, OkxOrderRequest, OkxOrderResponse, + OkxResponse, OkxTicker, OkxTrade, +}; +use serde::de::DeserializeOwned; +use serde_json::Value; +use std::collections::HashMap; + +/// OKX REST API client implementation +#[derive(Debug)] +pub struct OkxRest { + rest_client: R, +} + +impl OkxRest { + pub fn new(rest_client: R) -> Self { + Self { rest_client } + } + + /// Maps OKX error codes to appropriate `ExchangeError` variants + /// + /// This function provides a comprehensive mapping of OKX error codes to + /// more specific `ExchangeError` variants, making error handling more precise. + fn map_okx_error(&self, code: &str, message: &str) -> ExchangeError { + match code { + // Authentication errors + "50001" => ExchangeError::AuthError(format!("Invalid API key: {}", message)), + "50002" => ExchangeError::AuthError(format!("Invalid signature: {}", message)), + "50003" => ExchangeError::AuthError(format!("Invalid passphrase: {}", message)), + "50004" => ExchangeError::AuthError(format!("Invalid timestamp: {}", message)), + "50005" => ExchangeError::AuthError(format!("API key expired: {}", message)), + + // Rate limit errors + "50006" | "50007" | "50008" => ExchangeError::RateLimitExceeded(format!( + "OKX rate limit exceeded: {} - {}", + code, message + )), + + // Invalid parameter errors + "51000" | "51001" | "51002" | "51003" | "51004" | "51005" => { + ExchangeError::InvalidParameters(format!( + "Invalid parameter: {} - {}", + code, message + )) + } + + // Server errors + "50009" | "50010" | "50011" | "50012" => { + ExchangeError::ServerError(format!("OKX server error: {} - {}", code, message)) + } + + // Order errors + "51006" | "51007" | "51008" => ExchangeError::ApiError { + code: code.parse().unwrap_or(-1), + message: format!("Order error: {} - {}", code, message), + }, + "51009" => { + ExchangeError::InvalidParameters(format!("Insufficient balance: {}", message)) + } + "51010" => { + ExchangeError::InvalidParameters(format!("Order size exceeds limit: {}", message)) + } + "51011" => { + ExchangeError::InvalidParameters(format!("Order price exceeds limit: {}", message)) + } + + // Market errors + "51100" | "51101" | "51102" => ExchangeError::ApiError { + code: code.parse().unwrap_or(-1), + message: format!("Market error: {} - {}", code, message), + }, + "51103" => ExchangeError::ApiError { + code: code.parse().unwrap_or(-1), + message: format!("Market closed: {}", message), + }, + + // Account errors + "51200" | "51201" | "51202" => { + ExchangeError::AuthError(format!("Account error: {} - {}", code, message)) + } + + // Default case - generic API error + _ => ExchangeError::ApiError { + code: code.parse().unwrap_or(-1), + message: message.to_string(), + }, + } + } + + /// Generic handler for OKX API responses + /// + /// This function handles the common pattern of deserializing OKX responses + /// and checking for error codes, with proper error mapping. + fn handle_response(&self, response_value: Value) -> Result + where + T: DeserializeOwned, + { + // Parse the response into OkxResponse structure + let response: OkxResponse = serde_json::from_value(response_value).map_err(|e| { + ExchangeError::DeserializationError(format!("Failed to parse OKX response: {}", e)) + })?; + + // Check if the response contains an error + if response.code != "0" { + return Err(self.map_okx_error(&response.code, &response.msg)); + } + + Ok(response.data) + } + + /// Generic handler for OKX API responses that return a vector where we need the first item + /// + /// This function handles the common pattern of deserializing OKX responses that return + /// a vector where we're only interested in the first item. + fn handle_single_item_response( + &self, + response_value: Value, + error_msg: &str, + ) -> Result + where + T: DeserializeOwned, + { + let items: Vec = self.handle_response(response_value)?; + + items + .into_iter() + .next() + .ok_or_else(|| ExchangeError::InvalidResponseFormat(error_msg.to_string())) + } + + /// Get system time from OKX + pub async fn get_system_time(&self) -> Result { + let response_value = self + .rest_client + .get("/api/v5/public/time", &[], false) + .await?; + + let items: Vec> = self.handle_response(response_value)?; + + let timestamp_str = items + .first() + .and_then(|item| item.get("ts")) + .ok_or_else(|| { + ExchangeError::InvalidResponseFormat("Missing timestamp in response".to_string()) + })?; + + timestamp_str + .parse::() + .map_err(|e| ExchangeError::InvalidResponseFormat(format!("Invalid timestamp: {}", e))) + } + + /// Get trading instruments (markets) + pub async fn get_instruments(&self, inst_type: &str) -> Result, ExchangeError> { + let endpoint = "/api/v5/public/instruments"; + let query_params = &[("instType", inst_type)]; + + let response_value = self.rest_client.get(endpoint, query_params, false).await?; + self.handle_response(response_value) + } + + /// Get ticker information + pub async fn get_ticker(&self, inst_id: &str) -> Result { + let endpoint = "/api/v5/market/ticker"; + let query_params = &[("instId", inst_id)]; + + let response_value = self.rest_client.get(endpoint, query_params, false).await?; + self.handle_single_item_response(response_value, "No ticker data found") + } + + /// Get all tickers + pub async fn get_tickers(&self, inst_type: &str) -> Result, ExchangeError> { + let endpoint = "/api/v5/market/tickers"; + let query_params = &[("instType", inst_type)]; + + let response_value = self.rest_client.get(endpoint, query_params, false).await?; + self.handle_response(response_value) + } + + /// Get order book + pub async fn get_order_book( + &self, + inst_id: &str, + sz: Option, + ) -> Result { + let endpoint = "/api/v5/market/books"; + let sz_str = sz.map(|s| s.to_string()); + let mut query_params = vec![("instId", inst_id)]; + if let Some(ref sz_val) = sz_str { + query_params.push(("sz", sz_val.as_str())); + } + + let response_value = self.rest_client.get(endpoint, &query_params, false).await?; + self.handle_single_item_response(response_value, "No order book data found") + } + + /// Get recent trades + pub async fn get_trades( + &self, + inst_id: &str, + limit: Option, + ) -> Result, ExchangeError> { + let endpoint = "/api/v5/market/trades"; + let limit_str = limit.map(|l| l.to_string()); + let mut query_params = vec![("instId", inst_id)]; + if let Some(ref limit_val) = limit_str { + query_params.push(("limit", limit_val.as_str())); + } + + let response_value = self.rest_client.get(endpoint, &query_params, false).await?; + self.handle_response(response_value) + } + + /// Get candlestick data + pub async fn get_candlesticks( + &self, + inst_id: &str, + bar: Option<&str>, + limit: Option, + ) -> Result, ExchangeError> { + let endpoint = "/api/v5/market/candles"; + let mut query_params = vec![("instId", inst_id)]; + + let bar_str; + if let Some(b) = bar { + bar_str = b.to_string(); + query_params.push(("bar", &bar_str)); + } + + let limit_str; + if let Some(lmt) = limit { + limit_str = lmt.to_string(); + query_params.push(("limit", &limit_str)); + } + + let response_value = self.rest_client.get(endpoint, &query_params, false).await?; + let response: OkxResponse>> = serde_json::from_value(response_value) + .map_err(|e| { + ExchangeError::DeserializationError(format!("Failed to parse response: {}", e)) + })?; + + if response.code != "0" { + return Err(self.map_okx_error(&response.code, &response.msg)); + } + + // Convert array format to OkxKline structs + let klines = response + .data + .into_iter() + .filter_map(|arr| { + if arr.len() >= 8 { + Some(OkxKline { + ts: arr[0].clone(), + o: arr[1].clone(), + h: arr[2].clone(), + l: arr[3].clone(), + c: arr[4].clone(), + vol: arr[5].clone(), + vol_ccy: arr[6].clone(), + vol_ccy_quote: arr[7].clone(), + confirm: arr.get(8).cloned().unwrap_or_default(), + }) + } else { + None + } + }) + .collect(); + + Ok(klines) + } + + // Trading API endpoints (require authentication) + + /// Place a new order + pub async fn place_order( + &self, + order: &OkxOrderRequest, + ) -> Result { + let endpoint = "/api/v5/trade/order"; + let body = serde_json::to_value(order) + .map_err(|e| ExchangeError::SerializationError(e.to_string()))?; + + let response_value = self.rest_client.post(endpoint, &body, true).await?; + self.handle_single_item_response(response_value, "No order response data found") + } + + /// Cancel an order + pub async fn cancel_order( + &self, + inst_id: &str, + ord_id: Option<&str>, + cl_ord_id: Option<&str>, + ) -> Result { + let endpoint = "/api/v5/trade/cancel-order"; + + let mut cancel_req = serde_json::json!({ + "instId": inst_id + }); + + if let Some(id) = ord_id { + cancel_req["ordId"] = serde_json::Value::String(id.to_string()); + } + if let Some(cl_id) = cl_ord_id { + cancel_req["clOrdId"] = serde_json::Value::String(cl_id.to_string()); + } + + let response_value = self.rest_client.post(endpoint, &cancel_req, true).await?; + self.handle_single_item_response(response_value, "No cancel response data found") + } + + /// Get order details + pub async fn get_order( + &self, + inst_id: &str, + ord_id: Option<&str>, + cl_ord_id: Option<&str>, + ) -> Result { + let endpoint = "/api/v5/trade/order"; + let mut query_params = vec![("instId", inst_id)]; + + let ord_id_str; + if let Some(id) = ord_id { + ord_id_str = id.to_string(); + query_params.push(("ordId", &ord_id_str)); + } + + let cl_ord_id_str; + if let Some(cl_id) = cl_ord_id { + cl_ord_id_str = cl_id.to_string(); + query_params.push(("clOrdId", &cl_ord_id_str)); + } + + let response_value = self.rest_client.get(endpoint, &query_params, true).await?; + self.handle_single_item_response(response_value, "No order data found") + } + + /// Get pending orders + pub async fn get_pending_orders( + &self, + inst_type: Option<&str>, + ) -> Result, ExchangeError> { + let endpoint = "/api/v5/trade/orders-pending"; + let query_params = + inst_type.map_or_else(Vec::new, |inst_type| vec![("instType", inst_type)]); + + let response_value = self.rest_client.get(endpoint, &query_params, true).await?; + self.handle_response(response_value) + } + + // Account API endpoints + + /// Get account balance + pub async fn get_balance(&self, ccy: Option<&str>) -> Result { + let endpoint = "/api/v5/account/balance"; + let query_params = ccy.map_or_else(Vec::new, |currency| vec![("ccy", currency)]); + + let response_value = self.rest_client.get(endpoint, &query_params, true).await?; + self.handle_single_item_response(response_value, "No account data found") + } +} diff --git a/src/exchanges/okx/signer.rs b/src/exchanges/okx/signer.rs new file mode 100644 index 0000000..4bd2d3b --- /dev/null +++ b/src/exchanges/okx/signer.rs @@ -0,0 +1,101 @@ +use crate::core::errors::ExchangeError; +use crate::core::kernel::Signer; +use base64::{engine::general_purpose, Engine as _}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +type HmacSha256 = Hmac; + +pub struct OkxSigner { + api_key: String, + secret_key: String, + passphrase: String, +} + +impl OkxSigner { + pub fn new(api_key: String, secret_key: String, passphrase: String) -> Self { + Self { + api_key, + secret_key, + passphrase, + } + } + + /// Generate the signature for OKX API requests + /// The prehash string format is: timestamp + method + requestPath + body + fn generate_signature( + &self, + timestamp: &str, + method: &str, + request_path: &str, + body: &str, + ) -> Result { + let prehash = format!("{}{}{}{}", timestamp, method, request_path, body); + + let mut mac = HmacSha256::new_from_slice(self.secret_key.as_bytes()) + .map_err(|e| ExchangeError::AuthError(format!("Failed to create HMAC: {}", e)))?; + + mac.update(prehash.as_bytes()); + let signature_bytes = mac.finalize().into_bytes(); + + // OKX requires base64 encoding of the signature + Ok(general_purpose::STANDARD.encode(signature_bytes)) + } + + /// Get current timestamp in ISO format as required by OKX + fn get_timestamp() -> Result { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| ExchangeError::AuthError(format!("Failed to get timestamp: {}", e)))? + .as_millis(); + + // OKX requires timestamp in ISO format + let datetime = chrono::DateTime::from_timestamp_millis(timestamp as i64) + .ok_or_else(|| ExchangeError::AuthError("Invalid timestamp".to_string()))?; + + Ok(datetime.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()) + } +} + +impl Signer for OkxSigner { + fn sign_request( + &self, + method: &str, + endpoint: &str, + query_string: &str, + body: &[u8], + _timestamp: u64, // We generate our own timestamp for OKX + ) -> Result<(HashMap, Vec<(String, String)>), ExchangeError> { + // Generate ISO timestamp for OKX + let timestamp = Self::get_timestamp()?; + + // Build request path with query string if present + let request_path = if query_string.is_empty() { + endpoint.to_string() + } else { + format!("{}?{}", endpoint, query_string) + }; + + // Convert body to string + let body_str = std::str::from_utf8(body) + .map_err(|e| ExchangeError::AuthError(format!("Invalid body encoding: {}", e)))?; + + // Generate signature + let signature = self.generate_signature(×tamp, method, &request_path, body_str)?; + + // Prepare headers - OKX requires specific header names + let mut headers = HashMap::new(); + headers.insert("OK-ACCESS-KEY".to_string(), self.api_key.clone()); + headers.insert("OK-ACCESS-SIGN".to_string(), signature); + headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp); + headers.insert("OK-ACCESS-PASSPHRASE".to_string(), self.passphrase.clone()); + headers.insert("Content-Type".to_string(), "application/json".to_string()); + + // No query parameters needed for OKX auth + let query_params = Vec::new(); + + Ok((headers, query_params)) + } +} diff --git a/src/exchanges/okx/types.rs b/src/exchanges/okx/types.rs new file mode 100644 index 0000000..75d1922 --- /dev/null +++ b/src/exchanges/okx/types.rs @@ -0,0 +1,243 @@ +use serde::{Deserialize, Serialize}; + +/// OKX API standard response wrapper +#[derive(Debug, Deserialize, Serialize)] +pub struct OkxResponse { + pub code: String, + pub msg: String, + pub data: T, +} + +/// OKX Market (Instrument) information +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxMarket { + pub inst_type: String, // SPOT, MARGIN, SWAP, FUTURES, OPTION + pub inst_id: String, // e.g., BTC-USDT + pub uly: Option, // Underlying (for derivatives) + pub base_ccy: String, // Base currency + pub quote_ccy: String, // Quote currency + pub settle_ccy: Option, // Settlement currency + pub ct_val: Option, // Contract value + pub ct_mult: Option, // Contract multiplier + pub ct_val_ccy: Option, // Contract value currency + pub opt_type: Option, // Option type (C/P) + pub stk: Option, // Strike price + pub list_time: Option, // Listing time + pub exp_time: Option, // Expiry time + pub lever: Option, // Max leverage + pub tick_sz: String, // Tick size + pub lot_sz: String, // Lot size + pub min_sz: String, // Minimum order size + pub ct_type: Option, // Contract type + pub alias: Option, // Alias + pub state: String, // State: live, suspend, preopen, test +} + +/// OKX Order request +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxOrderRequest { + pub inst_id: String, // Instrument ID + pub td_mode: String, // Trade mode: cash, cross, isolated + pub side: String, // Order side: buy, sell + pub ord_type: String, // Order type: market, limit, post_only, fok, ioc + pub sz: String, // Quantity to buy or sell + #[serde(skip_serializing_if = "Option::is_none")] + pub px: Option, // Order price + #[serde(skip_serializing_if = "Option::is_none")] + pub cl_ord_id: Option, // Client order ID + #[serde(skip_serializing_if = "Option::is_none")] + pub tag: Option, // Order tag + #[serde(skip_serializing_if = "Option::is_none")] + pub tgt_ccy: Option, // Target currency: base_ccy, quote_ccy + #[serde(skip_serializing_if = "Option::is_none")] + pub ban_amend: Option, // Disallow amend +} + +/// OKX Order response +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxOrderResponse { + pub ord_id: String, // Order ID + pub cl_ord_id: Option, // Client order ID + pub tag: Option, // Order tag + pub s_code: String, // Success code + pub s_msg: String, // Success message +} + +/// OKX Order details +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxOrder { + pub inst_type: String, // Instrument type + pub inst_id: String, // Instrument ID + pub ord_id: String, // Order ID + pub cl_ord_id: Option, // Client order ID + pub tag: Option, // Order tag + pub px: String, // Price + pub sz: String, // Size + pub ord_type: String, // Order type + pub side: String, // Order side + pub pos_side: Option, // Position side + pub td_mode: String, // Trade mode + pub acc_fill_sz: String, // Accumulated fill size + pub fill_px: String, // Fill price + pub trade_id: String, // Trade ID + pub fill_sz: String, // Fill size + pub fill_time: String, // Fill time + pub avg_px: String, // Average price + pub state: String, // Order state + pub lever: Option, // Leverage + pub fee_ccy: String, // Fee currency + pub fee: String, // Fee + pub rebate_ccy: String, // Rebate currency + pub rebate: String, // Rebate + pub tgt_ccy: Option, // Target currency + pub category: String, // Category + pub u_time: String, // Update time + pub c_time: String, // Creation time +} + +/// OKX Account balance +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxBalance { + pub ccy: String, // Currency + pub eq: String, // Equity + pub cash_bal: String, // Cash balance + pub upl: String, // Unrealized P&L + pub avail_eq: String, // Available equity + pub dis_eq: String, // Discounted equity + pub avail_bal: String, // Available balance + pub frozen_bal: String, // Frozen balance + pub ord_frozen: String, // Margin frozen for open orders + pub liab: String, // Liabilities + pub upl_liab: String, // Unrealized P&L of liabilities + pub cross_liab: String, // Cross liabilities + pub iso_liab: String, // Isolated liabilities + pub mgn_ratio: String, // Margin ratio + pub interest: String, // Interest + pub twap: String, // TWAP + pub max_loan: String, // Max loan + pub eq_usd: String, // Equity in USD + pub notional_lever: String, // Notional leverage + pub stgy_eq: String, // Strategy equity + pub iso_upl: String, // Isolated unrealized P&L +} + +/// OKX Account information +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxAccountInfo { + pub uid: String, // User ID + pub acct_lv: String, // Account level + pub pos_mode: String, // Position mode + pub auto_loan: bool, // Auto loan enabled + pub greeks_type: String, // Greeks type + pub level: String, // Level + pub level_tmp: String, // Temporary level + pub mgn_iso: String, // Isolated margin + pub total_eq: String, // Total equity + pub iso_eq: String, // Isolated equity + pub adj_eq: String, // Adjusted equity + pub ord_froz: String, // Order frozen + pub imr: String, // Initial margin requirement + pub mmr: String, // Maintenance margin requirement + pub notional_usd: String, // Notional in USD + pub upl: String, // Unrealized P&L + pub details: Vec, // Balance details +} + +/// OKX Ticker data +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxTicker { + pub inst_type: String, // Instrument type + pub inst_id: String, // Instrument ID + pub last: String, // Last traded price + pub last_sz: String, // Last traded size + pub ask_px: String, // Best ask price + pub ask_sz: String, // Best ask size + pub bid_px: String, // Best bid price + pub bid_sz: String, // Best bid size + pub open_24h: String, // 24h opening price + pub high_24h: String, // 24h highest price + pub low_24h: String, // 24h lowest price + pub vol_ccy_24h: String, // 24h volume in quote currency + pub vol_24h: String, // 24h volume in base currency + pub ts: String, // Timestamp + pub sod_utc0: String, // Start of day UTC+0 + pub sod_utc8: String, // Start of day UTC+8 +} + +/// OKX Order book data +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct OkxOrderBook { + pub asks: Vec>, // Ask orders [price, size, liquidated_orders, order_count] + pub bids: Vec>, // Bid orders [price, size, liquidated_orders, order_count] + pub ts: String, // Timestamp +} + +/// OKX Trade data +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxTrade { + pub inst_id: String, // Instrument ID + pub trade_id: String, // Trade ID + pub px: String, // Price + pub sz: String, // Size + pub side: String, // Side + pub ts: String, // Timestamp + pub count: Option, // Trade count (for aggregated trades) +} + +/// OKX Candlestick data +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct OkxKline { + pub ts: String, // Timestamp + pub o: String, // Open price + pub h: String, // High price + pub l: String, // Low price + pub c: String, // Close price + pub vol: String, // Volume in base currency + pub vol_ccy: String, // Volume in quote currency + pub vol_ccy_quote: String, // Volume in quote currency + pub confirm: String, // Confirmation status +} + +/// OKX WebSocket subscription request +#[derive(Debug, Serialize, Clone)] +pub struct OkxWsRequest { + pub op: String, // Operation: subscribe, unsubscribe, login + pub args: Vec, // Channel arguments +} + +/// OKX WebSocket channel +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxWsChannel { + pub channel: String, // Channel name + pub inst_type: Option, // Instrument type + pub inst_family: Option, // Instrument family + pub inst_id: Option, // Instrument ID +} + +/// OKX WebSocket response +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct OkxWsResponse { + pub arg: Option, // Channel info + pub data: Option, // Data payload + pub action: Option, // Action type + pub code: Option, // Response code + pub msg: Option, // Response message + pub event: Option, // Event type +} + +/// OKX Error response +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OkxError { + pub s_code: String, // Error code + pub s_msg: String, // Error message +}