diff --git a/migrations/20260324000000_create_issued_swap_calldata.sql b/migrations/20260324000000_create_issued_swap_calldata.sql new file mode 100644 index 0000000..d957cf9 --- /dev/null +++ b/migrations/20260324000000_create_issued_swap_calldata.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS issued_swap_calldata ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + api_key_id INTEGER NOT NULL, + key_id TEXT NOT NULL, + label TEXT NOT NULL, + owner TEXT NOT NULL, + chain_id INTEGER NOT NULL, + taker TEXT NOT NULL, + to_address TEXT NOT NULL, + tx_value TEXT NOT NULL, + calldata TEXT NOT NULL, + calldata_hash TEXT NOT NULL, + input_token TEXT NOT NULL, + output_token TEXT NOT NULL, + output_amount TEXT NOT NULL, + maximum_io_ratio TEXT NOT NULL, + estimated_input TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX idx_issued_swap_calldata_api_key_id_created + ON issued_swap_calldata (api_key_id, created_at); +CREATE INDEX idx_issued_swap_calldata_chain_id_created + ON issued_swap_calldata (chain_id, created_at); +CREATE INDEX idx_issued_swap_calldata_calldata_hash_created + ON issued_swap_calldata (calldata_hash, created_at); +CREATE INDEX idx_issued_swap_calldata_taker_created + ON issued_swap_calldata (taker, created_at); diff --git a/src/cli.rs b/src/cli.rs index ad327f4..ad17bb3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -27,6 +27,13 @@ pub enum Command { #[command(subcommand)] command: KeysCommand, }, + #[command(about = "Run internal reports")] + Report { + #[arg(long)] + config: PathBuf, + #[command(subcommand)] + command: ReportCommand, + }, } #[derive(Subcommand)] @@ -48,12 +55,26 @@ pub enum KeysCommand { Delete { key_id: String }, } +#[derive(Subcommand)] +pub enum ReportCommand { + #[command(about = "Report executed customer swap volume")] + CustomerVolume { + #[arg(long)] + start_time: u64, + #[arg(long)] + end_time: u64, + #[arg(long, default_value_t = false)] + json: bool, + }, +} + pub fn print_usage() { 
println!("Usage: st0x_rest_api <command>"); println!(); println!("Commands:"); println!(" serve Start the API server"); println!(" keys Manage API keys"); + println!(" report Run internal reports"); println!(); println!("Run 'st0x_rest_api --help' for more information on a command."); } @@ -74,6 +95,32 @@ pub async fn handle_keys_command( } } +pub async fn handle_report_command( + command: ReportCommand, + pool: DbPool, + raindex: &crate::raindex::RaindexProvider, +) -> Result<(), Box<dyn std::error::Error>> { + match command { + ReportCommand::CustomerVolume { + start_time, + end_time, + json, + } => { + crate::reporting::customer_volume::run( + &pool, + raindex, + crate::reporting::customer_volume::CustomerVolumeReportArgs { + start_time, + end_time, + json, + }, + ) + .await?; + Ok(()) + } + } +} + async fn create_key( pool: &DbPool, label: &str, @@ -237,6 +284,40 @@ mod tests { } } + #[test] + fn test_report_parses_config_flag() { + let cli = Cli::try_parse_from([ + "app", + "report", + "--config", + "/path/to/config.toml", + "customer-volume", + "--start-time", + "10", + "--end-time", + "20", + ]) + .expect("parse"); + + match cli.command { + Some(Command::Report { + config, + command: + ReportCommand::CustomerVolume { + start_time, + end_time, + json, + }, + }) => { + assert_eq!(config, PathBuf::from("/path/to/config.toml")); + assert_eq!(start_time, 10); + assert_eq!(end_time, 20); + assert!(!json); + } + _ => panic!("expected Report command"), + } + } + + #[tokio::test] + async fn test_create_key_inserts_row() { + let pool = test_pool().await; diff --git a/src/db/issued_swap_calldata.rs b/src/db/issued_swap_calldata.rs new file mode 100644 index 0000000..4c00c1c --- /dev/null +++ b/src/db/issued_swap_calldata.rs @@ -0,0 +1,63 @@ +use crate::db::DbPool; + +pub(crate) struct NewIssuedSwapCalldata { + pub api_key_id: i64, + pub key_id: String, + pub label: String, + pub owner: String, + pub chain_id: i64, + pub taker: String, + pub to_address: String, + pub tx_value: String, + pub calldata:
String, + pub calldata_hash: String, + pub input_token: String, + pub output_token: String, + pub output_amount: String, + pub maximum_io_ratio: String, + pub estimated_input: String, +} + +pub(crate) async fn insert( + pool: &DbPool, + record: &NewIssuedSwapCalldata, +) -> Result<(), sqlx::Error> { + sqlx::query( + "INSERT INTO issued_swap_calldata ( + api_key_id, + key_id, + label, + owner, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(record.api_key_id) + .bind(&record.key_id) + .bind(&record.label) + .bind(&record.owner) + .bind(record.chain_id) + .bind(&record.taker) + .bind(&record.to_address) + .bind(&record.tx_value) + .bind(&record.calldata) + .bind(&record.calldata_hash) + .bind(&record.input_token) + .bind(&record.output_token) + .bind(&record.output_amount) + .bind(&record.maximum_io_ratio) + .bind(&record.estimated_input) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 5bbc962..f88e245 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod issued_swap_calldata; mod migrate; mod pool; pub(crate) mod settings; diff --git a/src/main.rs b/src/main.rs index 440ab10..96980b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod db; mod error; mod fairings; mod raindex; +mod reporting; mod routes; mod telemetry; mod types; @@ -110,6 +111,47 @@ fn configure_cors() -> Result { .to_cors()?) } +async fn load_raindex_provider( + cfg: &config::Config, + pool: &db::DbPool, +) -> Result<raindex::RaindexProvider, String> { + let db_url = db::settings::get_setting(pool, "registry_url") + .await + .map_err(|e| format!("failed to read registry_url from database: {e}"))?
+ .filter(|url| !url.is_empty()); + + let registry_url = match db_url { + Some(url) => { + tracing::info!(registry_url = %url, "loaded registry_url from database"); + url + } + None if !cfg.registry_url.is_empty() => { + if let Err(e) = db::settings::set_setting(pool, "registry_url", &cfg.registry_url).await + { + tracing::warn!(error = %e, "failed to seed registry_url into database"); + } + cfg.registry_url.clone() + } + None => { + return Err("registry_url not found in database and not set in config file".into()); + } + }; + + let local_db_path = std::path::PathBuf::from(&cfg.local_db_path); + if let Some(parent) = local_db_path.parent() { + std::fs::create_dir_all(parent).map_err(|e| { + format!( + "failed to create local db directory {}: {e}", + parent.display() + ) + })?; + } + + raindex::RaindexProvider::load(®istry_url, Some(local_db_path)) + .await + .map_err(|e| format!("failed to load raindex registry: {e}")) +} + pub(crate) fn rocket( pool: db::DbPool, rate_limiter: fairings::RateLimiter, @@ -159,7 +201,9 @@ async fn main() { }; let config_path = match &command { - cli::Command::Serve { config } | cli::Command::Keys { config, .. } => config.clone(), + cli::Command::Serve { config } + | cli::Command::Keys { config, .. } + | cli::Command::Report { config, .. } => config.clone(), }; let cfg = match config::Config::load(&config_path) { @@ -195,56 +239,13 @@ async fn main() { match command { cli::Command::Serve { .. 
} => { - let db_url = db::settings::get_setting(&pool, "registry_url") - .await - .ok() - .flatten(); - - let registry_url = match db_url { - Some(url) if !url.is_empty() => { - tracing::info!(registry_url = %url, "loaded registry_url from database"); - url - } - _ if !cfg.registry_url.is_empty() => { - if let Err(e) = - db::settings::set_setting(&pool, "registry_url", &cfg.registry_url).await - { - tracing::warn!(error = %e, "failed to seed registry_url into database"); - } - cfg.registry_url - } - _ => { - tracing::error!( - "registry_url not found in database and not set in config file" - ); - drop(log_guard); - std::process::exit(1); - } - }; - - let local_db_path = std::path::PathBuf::from(&cfg.local_db_path); - if let Some(parent) = local_db_path.parent() { - if !parent.exists() { - if let Err(e) = std::fs::create_dir_all(parent) { - tracing::error!(error = %e, path = %parent.display(), "failed to create local db directory"); - drop(log_guard); - std::process::exit(1); - } - } - } - - let raindex_config = match raindex::RaindexProvider::load( - ®istry_url, - Some(local_db_path), - ) - .await - { + let raindex_config = match load_raindex_provider(&cfg, &pool).await { Ok(config) => { - tracing::info!(registry_url = %registry_url, "raindex registry loaded"); + tracing::info!("raindex registry loaded"); config } Err(e) => { - tracing::error!(error = %e, registry_url = %registry_url, "failed to load raindex registry"); + tracing::error!(error = %e, "failed to load raindex registry"); drop(log_guard); std::process::exit(1); } @@ -283,6 +284,25 @@ async fn main() { std::process::exit(1); } } + cli::Command::Report { command, .. 
} => { + let raindex_config = match load_raindex_provider(&cfg, &pool).await { + Ok(config) => { + tracing::info!("raindex registry loaded for report"); + config + } + Err(e) => { + tracing::error!(error = %e, "failed to load raindex registry"); + drop(log_guard); + std::process::exit(1); + } + }; + + if let Err(e) = cli::handle_report_command(command, pool, &raindex_config).await { + tracing::error!(error = %e, "report command failed"); + drop(log_guard); + std::process::exit(1); + } + } } drop(log_guard); diff --git a/src/reporting/customer_volume.rs b/src/reporting/customer_volume.rs new file mode 100644 index 0000000..21e77cf --- /dev/null +++ b/src/reporting/customer_volume.rs @@ -0,0 +1,1974 @@ +use crate::db::DbPool; +use crate::raindex::RaindexProvider; +use futures::stream::{self, StreamExt}; +use rain_math_float::Float; +use rain_orderbook_common::raindex_client::RaindexError; +use reqwest::Client as HttpClient; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::{FromRow, QueryBuilder}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Add; +use std::time::Duration; + +const MATCH_WINDOW_SECS: i64 = 2 * 60; +const RPC_LOOKUP_CONCURRENCY: usize = 8; +const RPC_LOOKUP_TIMEOUT_SECS: u64 = 5; +const RPC_LOOKUP_MAX_ATTEMPTS: u32 = 3; +const RPC_LOOKUP_RETRY_BASE_DELAY_MS: u64 = 200; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct CustomerVolumeReportArgs { + pub start_time: u64, + pub end_time: u64, + pub json: bool, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum CustomerVolumeReportError { + #[error("start_time must be less than or equal to end_time")] + InvalidWindow, + #[error("raindex local db path is unavailable")] + MissingLocalDbPath, + #[error("raindex local db file not found: {path}")] + MissingLocalDb { path: String }, + #[error("no RPC URLs configured for chain {chain_id}")] + MissingRpcUrls { chain_id: u32 }, + #[error("database error: {0}")] + 
Database(#[from] sqlx::Error), + #[error("http error: {0}")] + Http(#[from] reqwest::Error), + #[error("json error: {0}")] + Json(#[from] serde_json::Error), + #[error("float error: {0}")] + Float(#[from] rain_math_float::FloatError), + #[error("raindex client error: {0}")] + Raindex(#[from] RaindexError), + #[error("failed to fetch transaction input for {tx_hash}: {message}")] + RpcLookup { tx_hash: String, message: String }, + #[error("transaction {tx_hash} was not found via RPC")] + TransactionNotFound { tx_hash: String }, + #[error("inconsistent candidate transaction data for {tx_hash}")] + InconsistentCandidate { tx_hash: String }, +} + +#[derive(Debug, Clone, FromRow)] +struct IssuedSwapRow { + id: i64, + api_key_id: i64, + key_id: String, + label: String, + owner: String, + taker: String, + to_address: String, + calldata: String, + input_token: String, + output_token: String, + created_at_unix: i64, +} + +#[derive(Debug, Clone, FromRow)] +struct TakeOrderRow { + orderbook_address: String, + transaction_hash: String, + block_number: i64, + block_timestamp: i64, + sender: String, + taker_input: String, + taker_output: String, +} + +#[derive(Debug, Clone)] +struct CandidateExecution { + tx_hash: String, + orderbook_address: String, + sender: String, + block_number: u64, + timestamp: u64, + total_input_hex: String, + total_output_hex: String, +} + +#[derive(Debug, Clone)] +struct MatchedExecution { + api_key_id: i64, + key_id: String, + label: String, + owner: String, + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + input_token: String, + output_token: String, + total_input_hex: String, + total_output_hex: String, + duplicate_match_count: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct ReportAudit { + matched_executions: usize, + unmatched_executions: usize, + ambiguous_matches: usize, + rpc_lookup_failures: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct PairVolume { + input_token: String, + 
output_token: String, + total_input_amount: String, + total_output_amount: String, +} + +#[derive(Debug, Clone, Serialize)] +struct CustomerVolumeSummary { + api_key_id: i64, + key_id: String, + label: String, + owner: String, + executed_txs: usize, + pairs: Vec<PairVolume>, +} + +#[derive(Debug, Clone, Serialize)] +struct MatchedTransactionReport { + tx_hash: String, + api_key_id: i64, + key_id: String, + label: String, + owner: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + input_token: String, + output_token: String, + total_input_amount: String, + total_output_amount: String, + duplicate_match_count: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct UnmatchedTransactionReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + total_input_amount: String, + total_output_amount: String, +} + +#[derive(Debug, Clone, Serialize)] +struct AmbiguousMatchCandidateReport { + api_key_id: i64, + key_id: String, + label: String, + owner: String, +} + +#[derive(Debug, Clone, Serialize)] +struct AmbiguousTransactionReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + total_input_amount: String, + total_output_amount: String, + candidates: Vec<AmbiguousMatchCandidateReport>, +} + +#[derive(Debug, Clone, Serialize)] +struct RpcLookupFailureReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + error: String, +} + +impl RpcLookupFailureReport { + fn from_error(candidate: &CandidateExecution, error: CustomerVolumeReportError) -> Self { + let (tx_hash, error) = match error { + CustomerVolumeReportError::RpcLookup { tx_hash, message } => (tx_hash, message), + CustomerVolumeReportError::TransactionNotFound { tx_hash } => { + (tx_hash, "transaction not found via RPC".to_string()) + } + other => (candidate.tx_hash.clone(), other.to_string()), + }; + + Self { + tx_hash, + sender: candidate.sender.clone(), + to_address:
candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + error, + } + } +} + +#[derive(Debug, Clone, Serialize)] +struct CustomerVolumeReport { + start_time: u64, + end_time: u64, + audit: ReportAudit, + customers: Vec<CustomerVolumeSummary>, + matched_transactions: Vec<MatchedTransactionReport>, + unmatched_transactions: Vec<UnmatchedTransactionReport>, + ambiguous_transactions: Vec<AmbiguousTransactionReport>, + rpc_lookup_failures: Vec<RpcLookupFailureReport>, +} + +type IssuedRowIndex<'a> = HashMap<(String, String, String), Vec<&'a IssuedSwapRow>>; + +#[derive(Debug, Deserialize)] +struct JsonRpcEnvelope<T> { + result: Option<T>, + error: Option<JsonRpcError>, +} + +#[derive(Debug, Deserialize)] +struct JsonRpcError { + message: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct RpcTransactionResponse { + input: String, +} + +pub(crate) async fn run( + pool: &DbPool, + raindex: &RaindexProvider, + args: CustomerVolumeReportArgs, +) -> Result<(), CustomerVolumeReportError> { + if args.start_time > args.end_time { + return Err(CustomerVolumeReportError::InvalidWindow); + } + + tracing::info!( + start_time = args.start_time, + end_time = args.end_time, + "customer volume report requested" + ); + + let report = build_report(pool, raindex, args.start_time, args.end_time).await?; + + if args.json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + print_human_report(&report); + } + + tracing::info!( + matched = report.audit.matched_executions, + unmatched = report.audit.unmatched_executions, + ambiguous = report.audit.ambiguous_matches, + rpc_lookup_failures = report.audit.rpc_lookup_failures, + "customer volume report complete" + ); + + Ok(()) +} + +async fn build_report( + pool: &DbPool, + raindex: &RaindexProvider, + start_time: u64, + end_time: u64, +) -> Result<CustomerVolumeReport, CustomerVolumeReportError> { + let issued_rows = load_issued_rows(pool, start_time, end_time).await?; + let issued_row_index = build_issued_row_index(&issued_rows); + + tracing::info!( + issued_count = issued_rows.len(), + "loaded issued swap rows for customer
volume report" + ); + + let raindex_db = open_raindex_db(raindex).await?; + let candidate_rows = load_take_order_rows(&raindex_db, start_time, end_time).await?; + let candidates = aggregate_candidate_rows(candidate_rows)?; + let rpc_urls = rpc_urls_for_chain(raindex)?; + let rpc = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + .map_err(CustomerVolumeReportError::Http)?; + + tracing::info!( + candidate_count = candidates.len(), + rpc_url_count = rpc_urls.len(), + "loaded candidate executions from raindex db" + ); + + let mut matched = Vec::new(); + let mut unmatched = Vec::new(); + let mut ambiguous_matches = 0usize; + let mut ambiguous_transactions = Vec::new(); + let mut rpc_lookup_failures = Vec::new(); + + let mut lookup_results = stream::iter(candidates.into_iter().map(|candidate| { + let rpc = rpc.clone(); + let rpc_urls = rpc_urls.clone(); + + async move { + let tx_input = fetch_transaction_input(&rpc, &rpc_urls, &candidate.tx_hash).await; + (candidate, tx_input) + } + })) + .buffered(RPC_LOOKUP_CONCURRENCY); + + while let Some((candidate, tx_input)) = lookup_results.next().await { + let tx_input = match tx_input { + Ok(tx_input) => tx_input, + Err(error @ CustomerVolumeReportError::RpcLookup { .. }) + | Err(error @ CustomerVolumeReportError::TransactionNotFound { .. 
}) => { + let failure = RpcLookupFailureReport::from_error(&candidate, error); + tracing::warn!( + tx_hash = %failure.tx_hash, + sender = %failure.sender, + to = %failure.to_address, + error = %failure.error, + "skipping candidate execution after RPC lookup failure" + ); + rpc_lookup_failures.push(failure); + continue; + } + Err(error) => return Err(error), + }; + let selection = select_issued_row(&candidate, &tx_input, &issued_row_index); + + if selection.distinct_match_count > 1 { + ambiguous_matches += 1; + ambiguous_transactions.push(AmbiguousTransactionReport { + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + total_input_amount: format_hex(&candidate.total_input_hex)?, + total_output_amount: format_hex(&candidate.total_output_hex)?, + candidates: selection + .distinct_matches + .iter() + .map(|row| AmbiguousMatchCandidateReport { + api_key_id: row.api_key_id, + key_id: row.key_id.clone(), + label: row.label.clone(), + owner: row.owner.clone(), + }) + .collect(), + }); + continue; + } + + match selection.row { + Some(row) => matched.push(MatchedExecution { + api_key_id: row.api_key_id, + key_id: row.key_id.clone(), + label: row.label.clone(), + owner: row.owner.clone(), + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + input_token: row.input_token.clone(), + output_token: row.output_token.clone(), + total_input_hex: candidate.total_input_hex.clone(), + total_output_hex: candidate.total_output_hex.clone(), + duplicate_match_count: selection.matching_row_count, + }), + None => unmatched.push(UnmatchedTransactionReport { + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: 
candidate.block_number, + timestamp: candidate.timestamp, + total_input_amount: format_hex(&candidate.total_input_hex)?, + total_output_amount: format_hex(&candidate.total_output_hex)?, + }), + } + } + + let customers = build_customer_summaries(&matched)?; + let matched_transactions = matched + .iter() + .map(|execution| { + Ok(MatchedTransactionReport { + tx_hash: execution.tx_hash.clone(), + api_key_id: execution.api_key_id, + key_id: execution.key_id.clone(), + label: execution.label.clone(), + owner: execution.owner.clone(), + sender: execution.sender.clone(), + to_address: execution.to_address.clone(), + block_number: execution.block_number, + timestamp: execution.timestamp, + input_token: execution.input_token.clone(), + output_token: execution.output_token.clone(), + total_input_amount: format_hex(&execution.total_input_hex)?, + total_output_amount: format_hex(&execution.total_output_hex)?, + duplicate_match_count: execution.duplicate_match_count, + }) + }) + .collect::<Result<Vec<_>, CustomerVolumeReportError>>()?; + + Ok(CustomerVolumeReport { + start_time, + end_time, + audit: ReportAudit { + matched_executions: matched_transactions.len(), + unmatched_executions: unmatched.len(), + ambiguous_matches, + rpc_lookup_failures: rpc_lookup_failures.len(), + }, + customers, + matched_transactions, + unmatched_transactions: unmatched, + ambiguous_transactions, + rpc_lookup_failures, + }) +} + +fn rpc_urls_for_chain(raindex: &RaindexProvider) -> Result<Vec<Url>, CustomerVolumeReportError> { + let network = raindex.client().get_network_by_chain_id(crate::CHAIN_ID)?; + if network.rpcs.is_empty() { + return Err(CustomerVolumeReportError::MissingRpcUrls { + chain_id: crate::CHAIN_ID, + }); + } + Ok(network.rpcs) +} + +async fn open_raindex_db( + raindex: &RaindexProvider, +) -> Result<sqlx::Pool<sqlx::Sqlite>, CustomerVolumeReportError> { + let path = raindex + .db_path() + .ok_or(CustomerVolumeReportError::MissingLocalDbPath)?; + + if !path.exists() { + return Err(CustomerVolumeReportError::MissingLocalDb {
path: path.display().to_string(), + }); + } + + let options = SqliteConnectOptions::new().filename(path).read_only(true); + + SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .map_err(CustomerVolumeReportError::Database) +} + +async fn load_issued_rows( + pool: &DbPool, + start_time: u64, + end_time: u64, +) -> Result<Vec<IssuedSwapRow>, CustomerVolumeReportError> { + let min_issue_time = start_time.saturating_sub(MATCH_WINDOW_SECS as u64) as i64; + let max_issue_time = end_time as i64; + + sqlx::query_as::<_, IssuedSwapRow>( + "SELECT + isc.id, + isc.api_key_id, + isc.key_id, + isc.label, + isc.owner, + isc.taker, + isc.to_address, + isc.calldata, + isc.input_token, + isc.output_token, + CAST(strftime('%s', isc.created_at) AS INTEGER) AS created_at_unix + FROM issued_swap_calldata isc + WHERE isc.chain_id = ? + AND isc.created_at >= datetime(?,'unixepoch') + AND isc.created_at <= datetime(?,'unixepoch') + ORDER BY isc.created_at DESC, isc.id DESC", + ) + .bind(crate::CHAIN_ID as i64) + .bind(min_issue_time) + .bind(max_issue_time) + .fetch_all(pool) + .await + .map_err(CustomerVolumeReportError::Database) +} + +async fn load_take_order_rows( + raindex_db: &sqlx::Pool<sqlx::Sqlite>, + start_time: u64, + end_time: u64, +) -> Result<Vec<TakeOrderRow>, CustomerVolumeReportError> { + let mut query_builder = QueryBuilder::<sqlx::Sqlite>::new( + "SELECT + orderbook_address, + transaction_hash, + block_number, + block_timestamp, + sender, + taker_input, + taker_output + FROM take_orders + WHERE chain_id = ", + ); + + query_builder + .push_bind(crate::CHAIN_ID as i64) + .push(" AND block_timestamp >= ") + .push_bind(start_time as i64) + .push(" AND block_timestamp <= ") + .push_bind(end_time as i64) + .push(" ORDER BY block_number, log_index"); + + query_builder + .build_query_as::<TakeOrderRow>() + .fetch_all(raindex_db) + .await + .map_err(CustomerVolumeReportError::Database) +} + +fn aggregate_candidate_rows( + rows: Vec<TakeOrderRow>, +) -> Result<Vec<CandidateExecution>, CustomerVolumeReportError> { + let mut aggregates: HashMap<(String, String),
CandidateExecution> = HashMap::new(); + + for row in rows { + let tx_hash = normalize_hex(&row.transaction_hash); + let sender = normalize_address(&row.sender); + let orderbook_address = normalize_address(&row.orderbook_address); + let input_delta = normalize_hex(&row.taker_output); + let output_delta = normalize_hex(&row.taker_input); + + let aggregate_key = (tx_hash.clone(), orderbook_address.clone()); + + match aggregates.get_mut(&aggregate_key) { + Some(existing) => { + if existing.sender != sender + || existing.block_number != row.block_number as u64 + || existing.timestamp != row.block_timestamp as u64 + { + return Err(CustomerVolumeReportError::InconsistentCandidate { tx_hash }); + } + + add_hex_amount(&mut existing.total_input_hex, &input_delta)?; + add_hex_amount(&mut existing.total_output_hex, &output_delta)?; + } + None => { + let mut total_input_hex = zero_hex()?; + add_hex_amount(&mut total_input_hex, &input_delta)?; + + let mut total_output_hex = zero_hex()?; + add_hex_amount(&mut total_output_hex, &output_delta)?; + + aggregates.insert( + aggregate_key, + CandidateExecution { + tx_hash, + orderbook_address, + sender, + block_number: row.block_number as u64, + timestamp: row.block_timestamp as u64, + total_input_hex, + total_output_hex, + }, + ); + } + } + } + + let mut candidates = aggregates.into_values().collect::<Vec<_>>(); + candidates.sort_by(|left, right| { + left.timestamp + .cmp(&right.timestamp) + .then_with(|| left.block_number.cmp(&right.block_number)) + .then_with(|| left.tx_hash.cmp(&right.tx_hash)) + .then_with(|| left.orderbook_address.cmp(&right.orderbook_address)) + }); + Ok(candidates) +} + +struct Selection<'a> { + row: Option<&'a IssuedSwapRow>, + matching_row_count: usize, + distinct_match_count: usize, + distinct_matches: Vec<&'a IssuedSwapRow>, +} + +fn build_issued_row_index(issued_rows: &[IssuedSwapRow]) -> IssuedRowIndex<'_> { + let mut index = HashMap::new(); + + for row in issued_rows { + index + .entry(issued_row_key(&row.taker,
&row.to_address, &row.calldata)) + .or_insert_with(Vec::new) + .push(row); + } + + for rows in index.values_mut() { + rows.sort_by(|left, right| { + right + .created_at_unix + .cmp(&left.created_at_unix) + .then_with(|| right.id.cmp(&left.id)) + }); + } + + index +} + +fn issued_row_key(taker: &str, to_address: &str, calldata: &str) -> (String, String, String) { + ( + normalize_address(taker), + normalize_address(to_address), + normalize_hex(calldata), + ) +} + +fn select_issued_row<'a>( + candidate: &CandidateExecution, + tx_input: &str, + issued_row_index: &IssuedRowIndex<'a>, +) -> Selection<'a> { + let candidate_timestamp = candidate.timestamp as i64; + let issued_rows = match issued_row_index.get(&issued_row_key( + &candidate.sender, + &candidate.orderbook_address, + tx_input, + )) { + Some(rows) => rows, + None => { + return Selection { + row: None, + matching_row_count: 0, + distinct_match_count: 0, + distinct_matches: Vec::new(), + }; + } + }; + + let mut matched_row = None; + let mut matching_row_count = 0; + let mut distinct_matches = Vec::new(); + let mut seen_api_keys = std::collections::HashSet::new(); + + for row in issued_rows { + if row.created_at_unix > candidate_timestamp { + continue; + } + + if candidate_timestamp <= row.created_at_unix + MATCH_WINDOW_SECS { + matching_row_count += 1; + if matched_row.is_none() { + matched_row = Some(*row); + } + + if seen_api_keys.insert(row.api_key_id) { + distinct_matches.push(*row); + } + continue; + } + + break; + } + + let row = if distinct_matches.len() == 1 { + matched_row + } else { + None + }; + + Selection { + row, + matching_row_count, + distinct_match_count: distinct_matches.len(), + distinct_matches, + } +} + +fn build_customer_summaries( + executions: &[MatchedExecution], +) -> Result<Vec<CustomerVolumeSummary>, CustomerVolumeReportError> { + #[derive(Debug)] + struct PairAccumulator { + total_input_hex: String, + total_output_hex: String, + } + + #[derive(Debug)] + struct CustomerAccumulator { + api_key_id: i64, + key_id:
String, + label: String, + owner: String, + executed_txs: usize, + pairs: BTreeMap<(String, String), PairAccumulator>, + } + + let mut customers: BTreeMap<i64, CustomerAccumulator> = BTreeMap::new(); + + for execution in executions { + let customer = + customers + .entry(execution.api_key_id) + .or_insert_with(|| CustomerAccumulator { + api_key_id: execution.api_key_id, + key_id: execution.key_id.clone(), + label: execution.label.clone(), + owner: execution.owner.clone(), + executed_txs: 0, + pairs: BTreeMap::new(), + }); + + customer.executed_txs += 1; + + let pair_key = ( + execution.input_token.clone(), + execution.output_token.clone(), + ); + match customer.pairs.entry(pair_key) { + std::collections::btree_map::Entry::Occupied(mut entry) => { + add_hex_amount( + &mut entry.get_mut().total_input_hex, + &execution.total_input_hex, + )?; + add_hex_amount( + &mut entry.get_mut().total_output_hex, + &execution.total_output_hex, + )?; + } + std::collections::btree_map::Entry::Vacant(entry) => { + let mut total_input_hex = zero_hex()?; + add_hex_amount(&mut total_input_hex, &execution.total_input_hex)?; + + let mut total_output_hex = zero_hex()?; + add_hex_amount(&mut total_output_hex, &execution.total_output_hex)?; + + entry.insert(PairAccumulator { + total_input_hex, + total_output_hex, + }); + } + } + } + + customers + .into_values() + .map(|customer| { + let pairs = customer + .pairs + .into_iter() + .map(|((input_token, output_token), pair)| { + Ok(PairVolume { + input_token, + output_token, + total_input_amount: format_hex(&pair.total_input_hex)?, + total_output_amount: format_hex(&pair.total_output_hex)?, + }) + }) + .collect::<Result<Vec<_>, CustomerVolumeReportError>>()?; + + Ok(CustomerVolumeSummary { + api_key_id: customer.api_key_id, + key_id: customer.key_id, + label: customer.label, + owner: customer.owner, + executed_txs: customer.executed_txs, + pairs, + }) + }) + .collect() +} + +async fn fetch_transaction_input( + client: &HttpClient, + rpc_urls: &[Url], + tx_hash: &str, +) -> Result<String, CustomerVolumeReportError> {
let payload = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getTransactionByHash", + "params": [tx_hash], + }); + + let mut last_error = None; + + for rpc_url in rpc_urls { + for attempt in 1..=RPC_LOOKUP_MAX_ATTEMPTS { + let response = match client.post(rpc_url.clone()).json(&payload).send().await { + Ok(response) => response, + Err(error) => { + last_error = Some(error.to_string()); + sleep_before_retry(attempt).await; + continue; + } + }; + + if !response.status().is_success() { + last_error = Some(format!("rpc status {}", response.status())); + sleep_before_retry(attempt).await; + continue; + } + + let envelope = match response + .json::>() + .await + { + Ok(envelope) => envelope, + Err(error) => { + last_error = Some(error.to_string()); + sleep_before_retry(attempt).await; + continue; + } + }; + + if let Some(result) = envelope.result { + return Ok(normalize_hex(&result.input)); + } + + if let Some(error) = envelope.error { + last_error = Some(error.message); + } + + break; + } + } + + if let Some(message) = last_error { + return Err(CustomerVolumeReportError::RpcLookup { + tx_hash: tx_hash.to_string(), + message, + }); + } + + Err(CustomerVolumeReportError::TransactionNotFound { + tx_hash: tx_hash.to_string(), + }) +} + +async fn sleep_before_retry(attempt: u32) { + if attempt >= RPC_LOOKUP_MAX_ATTEMPTS { + return; + } + + tokio::time::sleep(Duration::from_millis( + RPC_LOOKUP_RETRY_BASE_DELAY_MS * u64::from(attempt), + )) + .await; +} + +fn print_human_report(report: &CustomerVolumeReport) { + println!("Customer volume report"); + println!("Window: {} -> {}", report.start_time, report.end_time); + println!( + "Matched executed txs: {} | Unmatched executed txs: {} | Ambiguous matches: {} | RPC lookup failures: {}", + report.audit.matched_executions, + report.audit.unmatched_executions, + report.audit.ambiguous_matches, + report.audit.rpc_lookup_failures + ); + println!(); + + if !report.rpc_lookup_failures.is_empty() { + println!("RPC 
lookup failures:"); + for failure in &report.rpc_lookup_failures { + println!( + " {} | sender={} | to={} | block={} | timestamp={} | error={}", + failure.tx_hash, + failure.sender, + failure.to_address, + failure.block_number, + failure.timestamp, + failure.error + ); + } + println!(); + } + + if !report.ambiguous_transactions.is_empty() { + println!("Ambiguous transactions:"); + for transaction in &report.ambiguous_transactions { + println!( + " {} | sender={} | to={} | block={} | timestamp={} | input={} | output={}", + transaction.tx_hash, + transaction.sender, + transaction.to_address, + transaction.block_number, + transaction.timestamp, + transaction.total_input_amount, + transaction.total_output_amount + ); + for candidate in &transaction.candidates { + println!( + " candidate api_key_id={} key_id={} label={} owner={}", + candidate.api_key_id, candidate.key_id, candidate.label, candidate.owner + ); + } + } + println!(); + } + + if report.customers.is_empty() { + println!("No matched customer executions found."); + } else { + for customer in &report.customers { + println!( + "{} ({}) [{}] executions={}", + customer.label, customer.owner, customer.key_id, customer.executed_txs + ); + + for pair in &customer.pairs { + println!( + " {} -> {} | input={} | output={}", + pair.input_token, + pair.output_token, + pair.total_input_amount, + pair.total_output_amount + ); + } + + println!(); + } + } +} + +fn normalize_address(value: &str) -> String { + normalize_prefixed_hex(value) +} + +fn normalize_hex(value: &str) -> String { + normalize_prefixed_hex(value) +} + +fn normalize_prefixed_hex(value: &str) -> String { + let trimmed = value.trim(); + if let Some(hex) = trimmed + .strip_prefix("0x") + .or_else(|| trimmed.strip_prefix("0X")) + { + format!("0x{}", hex.to_ascii_lowercase()) + } else { + format!("0x{}", trimmed.to_ascii_lowercase()) + } +} + +fn zero_hex() -> Result { + Ok(Float::zero()?.as_hex().to_ascii_lowercase()) +} + +fn add_hex_amount( + total_hex: &mut 
String, + value_hex: &str, +) -> Result<(), CustomerVolumeReportError> { + let total = Float::from_hex(total_hex)?; + let value = Float::from_hex(value_hex)?; + *total_hex = total.add(value)?.as_hex().to_ascii_lowercase(); + Ok(()) +} + +fn format_hex(value_hex: &str) -> Result { + Ok(Float::from_hex(value_hex)?.format()?) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + #[derive(Debug)] + struct SeededMatchContext { + api_key_id: i64, + key_id: String, + } + + async fn test_pool() -> DbPool { + let id = uuid::Uuid::new_v4(); + crate::db::init(&format!("sqlite:file:{id}?mode=memory&cache=shared")) + .await + .expect("test database init") + } + + async fn seed_match_context(pool: &DbPool, created_at_unix: i64) -> SeededMatchContext { + let key_id = uuid::Uuid::new_v4().to_string(); + let secret_hash = crate::auth::hash_secret("test-secret").expect("hash secret"); + let api_key_insert = sqlx::query( + "INSERT INTO api_keys (key_id, secret_hash, label, owner) VALUES (?, ?, ?, ?)", + ) + .bind(&key_id) + .bind(secret_hash) + .bind("alpha") + .bind("owner-a") + .execute(pool) + .await + .expect("insert api key"); + let api_key_id = api_key_insert.last_insert_rowid(); + + sqlx::query( + "INSERT INTO issued_swap_calldata ( + api_key_id, + key_id, + label, + owner, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input, + created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime(?,'unixepoch'))", + ) + .bind(api_key_id) + .bind(&key_id) + .bind("alpha") + .bind("owner-a") + .bind(crate::CHAIN_ID as i64) + .bind("0x1111111111111111111111111111111111111111") + .bind("0xd2938e7c9fe3597f78832ce780feb61945c377d7") + .bind("0") + .bind("0xabcdef") + .bind("0xdeadbeef") + .bind("0xinput") + .bind("0xoutput") + .bind("1") + .bind("205") + .bind("50") + .bind(created_at_unix) + 
.execute(pool) + .await + .expect("insert issued calldata"); + + SeededMatchContext { api_key_id, key_id } + } + + fn issued_row( + id: i64, + api_key_id: i64, + label: &str, + owner: &str, + created_at_unix: i64, + ) -> IssuedSwapRow { + IssuedSwapRow { + id, + api_key_id, + key_id: format!("key-{api_key_id}"), + label: label.to_string(), + owner: owner.to_string(), + taker: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + calldata: "0xabcdef".to_string(), + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + created_at_unix, + } + } + + fn candidate(tx_hash: &str, sender: &str, to: &str, timestamp: u64) -> CandidateExecution { + CandidateExecution { + tx_hash: tx_hash.to_string(), + orderbook_address: to.to_string(), + sender: sender.to_string(), + block_number: 1, + timestamp, + total_input_hex: Float::parse("10".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("2".to_string()).expect("float").as_hex(), + } + } + + #[test] + fn test_aggregate_candidate_rows_sums_same_transaction() { + let rows = vec![ + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.25".to_string()).expect("float").as_hex(), + taker_output: Float::parse("50".to_string()).expect("float").as_hex(), + }, + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: 
Float::parse("0.20".to_string()).expect("float").as_hex(), + taker_output: Float::parse("40".to_string()).expect("float").as_hex(), + }, + ]; + + let aggregated = aggregate_candidate_rows(rows).expect("aggregate"); + assert_eq!(aggregated.len(), 1); + assert_eq!( + format_hex(&aggregated[0].total_input_hex).expect("format"), + "90" + ); + assert_eq!( + format_hex(&aggregated[0].total_output_hex).expect("format"), + "0.45" + ); + } + + #[test] + fn test_aggregate_candidate_rows_keeps_same_tx_separate_per_orderbook() { + let rows = vec![ + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.25".to_string()).expect("float").as_hex(), + taker_output: Float::parse("50".to_string()).expect("float").as_hex(), + }, + TakeOrderRow { + orderbook_address: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.20".to_string()).expect("float").as_hex(), + taker_output: Float::parse("40".to_string()).expect("float").as_hex(), + }, + ]; + + let aggregated = aggregate_candidate_rows(rows).expect("aggregate"); + assert_eq!(aggregated.len(), 2); + assert_eq!( + aggregated[0].orderbook_address, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ); + assert_eq!( + aggregated[1].orderbook_address, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7" + ); + } + + #[test] + fn test_select_issued_row_requires_exact_match_and_window() { + let rows = vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 2, "beta", "owner-b", 10), + ]; + let issued_row_index 
= build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 180, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_select_issued_row_rejects_future_issuance() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 200)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 199, + ), + "0xabcdef", + &issued_row_index, + ); + + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 0); + assert_eq!(selection.distinct_match_count, 0); + } + + #[test] + fn test_select_issued_row_accepts_same_timestamp_issuance() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 100, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_select_issued_row_accepts_exact_match_window_boundary() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 220, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + 
assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_select_issued_row_rejects_after_match_window_boundary() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 221, + ), + "0xabcdef", + &issued_row_index, + ); + + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 0); + assert_eq!(selection.distinct_match_count, 0); + } + + #[test] + fn test_select_issued_row_returns_none_for_ambiguous_distinct_keys() { + let rows = vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 2, "beta", "owner-b", 200), + ]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 210, + ), + "0xabcdef", + &issued_row_index, + ); + + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 2); + assert_eq!(selection.distinct_match_count, 2); + } + + #[test] + fn test_select_issued_row_treats_same_key_duplicates_as_single_match() { + let rows = vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 1, "alpha", "owner-a", 200), + ]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 210, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(2)); + assert_eq!(selection.matching_row_count, 2); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_build_customer_summaries_groups_by_customer_and_pair() { + let executions = vec![ + MatchedExecution { 
+ api_key_id: 1, + key_id: "key-1".to_string(), + label: "alpha".to_string(), + owner: "owner-a".to_string(), + tx_hash: "0xtx1".to_string(), + sender: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + block_number: 1, + timestamp: 10, + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + total_input_hex: Float::parse("50".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("0.25".to_string()).expect("float").as_hex(), + duplicate_match_count: 1, + }, + MatchedExecution { + api_key_id: 1, + key_id: "key-1".to_string(), + label: "alpha".to_string(), + owner: "owner-a".to_string(), + tx_hash: "0xtx2".to_string(), + sender: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + block_number: 2, + timestamp: 11, + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + total_input_hex: Float::parse("40".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("0.20".to_string()).expect("float").as_hex(), + duplicate_match_count: 1, + }, + ]; + + let customers = build_customer_summaries(&executions).expect("summary"); + assert_eq!(customers.len(), 1); + assert_eq!(customers[0].executed_txs, 2); + assert_eq!(customers[0].pairs.len(), 1); + assert_eq!(customers[0].pairs[0].total_input_amount, "90"); + assert_eq!(customers[0].pairs[0].total_output_amount, "0.45"); + } + + async fn mock_rpc_url_with_body(body: String) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind mock rpc server"); + let addr = listener.local_addr().expect("mock rpc addr"); + + tokio::spawn(async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + + let body = body.clone(); + tokio::spawn(async move { + let mut buffer = [0u8; 4096]; + let _ = tokio::io::AsyncReadExt::read(&mut 
socket, &mut buffer).await; + let response = format!( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = + tokio::io::AsyncWriteExt::write_all(&mut socket, response.as_bytes()).await; + }); + } + }); + + format!("http://{addr}") + } + + async fn mock_rpc_url_with_bodies(bodies: Vec) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind mock rpc server"); + let addr = listener.local_addr().expect("mock rpc addr"); + let bodies = Arc::new(bodies); + let next_index = Arc::new(AtomicUsize::new(0)); + + tokio::spawn(async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + + let bodies = Arc::clone(&bodies); + let next_index = Arc::clone(&next_index); + + tokio::spawn(async move { + let mut buffer = [0u8; 4096]; + let _ = tokio::io::AsyncReadExt::read(&mut socket, &mut buffer).await; + + let index = next_index.fetch_add(1, Ordering::SeqCst); + let body = bodies + .get(index) + .or_else(|| bodies.last()) + .cloned() + .expect("at least one mock rpc body"); + + let response = format!( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = + tokio::io::AsyncWriteExt::write_all(&mut socket, response.as_bytes()).await; + }); + } + }); + + format!("http://{addr}") + } + + async fn mock_rpc_url_for_input(input: &str) -> String { + mock_rpc_url_with_body( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "input": input, + } + }) + .to_string(), + ) + .await + } + + async fn mock_rpc_url_for_missing_transaction() -> String { + mock_rpc_url_with_body( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": serde_json::Value::Null, + }) + .to_string(), + ) + .await + } + + async fn mock_rpc_url_for_rpc_error(message: &str) -> String { + mock_rpc_url_with_body( + serde_json::json!({ + 
"jsonrpc": "2.0", + "id": 1, + "error": { + "message": message, + } + }) + .to_string(), + ) + .await + } + + async fn seed_raindex_take_orders( + db_path: &std::path::Path, + tx_hash: &str, + block_timestamp: i64, + ) { + seed_raindex_take_orders_for_orderbook( + db_path, + tx_hash, + block_timestamp, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + } + + async fn seed_raindex_take_orders_for_orderbook( + db_path: &std::path::Path, + tx_hash: &str, + block_timestamp: i64, + orderbook_address: &str, + ) { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with( + SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true), + ) + .await + .expect("open test raindex db"); + + sqlx::query( + "CREATE TABLE IF NOT EXISTS take_orders ( + chain_id INTEGER NOT NULL, + orderbook_address TEXT NOT NULL, + transaction_hash TEXT NOT NULL, + block_number INTEGER NOT NULL, + block_timestamp INTEGER NOT NULL, + sender TEXT NOT NULL, + taker_input TEXT NOT NULL, + taker_output TEXT NOT NULL, + log_index INTEGER NOT NULL + )", + ) + .execute(&pool) + .await + .expect("create take_orders table"); + + sqlx::query( + "INSERT INTO take_orders ( + chain_id, + orderbook_address, + transaction_hash, + block_number, + block_timestamp, + sender, + taker_input, + taker_output, + log_index + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(crate::CHAIN_ID as i64) + .bind(orderbook_address) + .bind(tx_hash) + .bind(10_i64) + .bind(block_timestamp) + .bind("0x1111111111111111111111111111111111111111") + .bind(Float::parse("0.25".to_string()).expect("float").as_hex()) + .bind(Float::parse("50".to_string()).expect("float").as_hex()) + .bind(0_i64) + .execute(&pool) + .await + .expect("insert take order"); + } + + fn test_registry_settings(rpc_url: &str) -> String { + test_registry_settings_with_orderbook(rpc_url, "0xd2938e7c9fe3597f78832ce780feb61945c377d7") + } + + fn test_registry_settings_with_orderbook(rpc_url: &str, orderbook_address: &str) 
-> String { + format!( + "version: 4 +networks: + base: + rpcs: + - {rpc_url} + chain-id: 8453 + currency: ETH +subgraphs: + base: https://example.com/subgraph +orderbooks: + base: + address: {orderbook_address} + network: base + subgraph: base + deployment-block: 0 +deployers: + base: + address: 0xC1A14cE2fd58A3A2f99deCb8eDd866204eE07f8D + network: base +tokens: + token1: + address: 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 + network: base +" + ) + } + + async fn load_test_raindex( + db_path: std::path::PathBuf, + tx_input: &str, + ) -> crate::raindex::RaindexProvider { + let rpc_url = mock_rpc_url_for_input(tx_input).await; + load_test_raindex_with_rpc_url(db_path, &rpc_url).await + } + + async fn load_test_raindex_with_rpc_url( + db_path: std::path::PathBuf, + rpc_url: &str, + ) -> crate::raindex::RaindexProvider { + let settings = test_registry_settings(rpc_url); + let registry_url = + crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; + + crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) + .await + .expect("load test raindex provider") + } + + async fn load_test_raindex_with_rpc_url_and_orderbook( + db_path: std::path::PathBuf, + rpc_url: &str, + orderbook_address: &str, + ) -> crate::raindex::RaindexProvider { + let settings = test_registry_settings_with_orderbook(rpc_url, orderbook_address); + let registry_url = + crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; + + crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) + .await + .expect("load test raindex provider") + } + + #[tokio::test] + async fn test_build_report_reconciles_customer_volume_report() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, 
tx_hash, 105).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.start_time, 90); + assert_eq!(report.end_time, 110); + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 0); + assert_eq!(report.audit.rpc_lookup_failures, 0); + assert_eq!(report.customers.len(), 1); + assert_eq!(report.customers[0].api_key_id, expected.api_key_id); + assert_eq!(report.customers[0].key_id, expected.key_id); + assert_eq!(report.customers[0].label, "alpha"); + assert_eq!(report.customers[0].owner, "owner-a"); + assert_eq!(report.customers[0].executed_txs, 1); + assert_eq!(report.customers[0].pairs.len(), 1); + assert_eq!(report.customers[0].pairs[0].input_token, "0xinput"); + assert_eq!(report.customers[0].pairs[0].output_token, "0xoutput"); + assert_eq!(report.customers[0].pairs[0].total_input_amount, "50"); + assert_eq!(report.customers[0].pairs[0].total_output_amount, "0.25"); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.matched_transactions[0].api_key_id, + expected.api_key_id + ); + assert_eq!(report.matched_transactions[0].key_id, expected.key_id); + assert_eq!( + report.matched_transactions[0].sender, + "0x1111111111111111111111111111111111111111" + ); + assert_eq!( + report.matched_transactions[0].to_address, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7" + ); + assert_eq!(report.matched_transactions[0].input_token, "0xinput"); + assert_eq!(report.matched_transactions[0].output_token, "0xoutput"); + assert_eq!(report.matched_transactions[0].total_input_amount, "50"); + assert_eq!(report.matched_transactions[0].total_output_amount, "0.25"); + assert_eq!(report.unmatched_transactions.len(), 0); + assert!(report.ambiguous_transactions.is_empty()); + 
assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_uses_issued_snapshot_after_api_key_deletion() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + sqlx::query("DELETE FROM api_keys WHERE id = ?") + .bind(expected.api_key_id) + .execute(&pool) + .await + .expect("delete api key"); + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.customers.len(), 1); + assert_eq!(report.customers[0].api_key_id, expected.api_key_id); + assert_eq!(report.customers[0].key_id, expected.key_id); + assert_eq!(report.customers[0].label, "alpha"); + assert_eq!(report.customers[0].owner, "owner-a"); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!( + report.matched_transactions[0].key_id, + report.customers[0].key_id + ); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_uses_chain_history_not_current_orderbook_registry() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + + let rpc_url = mock_rpc_url_for_input("0xabcdef").await; + let raindex = load_test_raindex_with_rpc_url_and_orderbook( + db_path, + &rpc_url, + 
"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + .await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.matched_transactions[0].api_key_id, + expected.api_key_id + ); + assert!(report.unmatched_transactions.is_empty()); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_does_not_match_when_execution_precedes_issuance() { + let pool = test_pool().await; + seed_match_context(&pool, 105).await; + let tx_hash = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 90).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 80, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.matched_transactions.len(), 0); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_excludes_issued_rows_after_execution() { + let pool = test_pool().await; + seed_match_context(&pool, 111).await; + let tx_hash = "0xabababababababababababababababababababababababababababababababab"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 110).await; + + let raindex = load_test_raindex(db_path, 
"0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.matched_transactions.len(), 0); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_keeps_same_tx_hash_separate_per_orderbook() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xedededededededededededededededededededededededededededededededed"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + .await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.matched_transactions[0].api_key_id, + expected.api_key_id + ); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.unmatched_transactions[0].to_address, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn 
test_build_report_records_ambiguous_transactions_without_attribution() { + let pool = test_pool().await; + let first = seed_match_context(&pool, 100).await; + let second = seed_match_context(&pool, 101).await; + let tx_hash = "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 1); + assert!(report.customers.is_empty()); + assert!(report.matched_transactions.is_empty()); + assert!(report.unmatched_transactions.is_empty()); + assert_eq!(report.ambiguous_transactions.len(), 1); + assert_eq!(report.ambiguous_transactions[0].tx_hash, tx_hash); + assert_eq!(report.ambiguous_transactions[0].candidates.len(), 2); + assert_eq!( + report.ambiguous_transactions[0].candidates[0].api_key_id, + second.api_key_id + ); + assert_eq!( + report.ambiguous_transactions[0].candidates[1].api_key_id, + first.api_key_id + ); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_records_rpc_lookup_failures_without_aborting() { + let pool = test_pool().await; + let tx_hash = "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let rpc_url = mock_rpc_url_for_missing_transaction().await; + let raindex = load_test_raindex_with_rpc_url(db_path, &rpc_url).await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + 
assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 0); + assert_eq!(report.audit.rpc_lookup_failures, 1); + assert!(report.customers.is_empty()); + assert!(report.matched_transactions.is_empty()); + assert!(report.unmatched_transactions.is_empty()); + assert!(report.ambiguous_transactions.is_empty()); + assert_eq!(report.rpc_lookup_failures.len(), 1); + assert_eq!(report.rpc_lookup_failures[0].tx_hash, tx_hash); + assert_eq!( + report.rpc_lookup_failures[0].error, + "transaction not found via RPC" + ); + } + + #[tokio::test] + async fn test_fetch_transaction_input_retries_same_rpc_url_after_invalid_json() { + let rpc_url = mock_rpc_url_with_bodies(vec![ + "not-json".to_string(), + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "input": "0xabcdef", + } + }) + .to_string(), + ]) + .await; + let client = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + .expect("http client"); + + let input = + fetch_transaction_input(&client, &[Url::parse(&rpc_url).expect("rpc url")], "0xtx") + .await + .expect("rpc retry should succeed"); + + assert_eq!(input, "0xabcdef"); + } + + #[tokio::test] + async fn test_fetch_transaction_input_falls_back_to_next_rpc_url() { + let failing_rpc_url = mock_rpc_url_for_rpc_error("upstream unavailable").await; + let succeeding_rpc_url = mock_rpc_url_for_input("0xabcdef").await; + let client = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + .expect("http client"); + + let input = fetch_transaction_input( + &client, + &[ + Url::parse(&failing_rpc_url).expect("failing rpc url"), + Url::parse(&succeeding_rpc_url).expect("succeeding rpc url"), + ], + "0xtx", + ) + .await + .expect("rpc fallback should succeed"); + + assert_eq!(input, "0xabcdef"); + } +} diff --git a/src/reporting/mod.rs b/src/reporting/mod.rs new file mode 100644 index 0000000..37ad6b0 --- 
/dev/null +++ b/src/reporting/mod.rs @@ -0,0 +1 @@ +pub(crate) mod customer_volume; diff --git a/src/routes/swap/calldata.rs b/src/routes/swap/calldata.rs index cee7d06..2406cb4 100644 --- a/src/routes/swap/calldata.rs +++ b/src/routes/swap/calldata.rs @@ -1,8 +1,11 @@ use super::{RaindexSwapDataSource, SwapDataSource}; use crate::auth::AuthenticatedKey; +use crate::db::DbPool; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; use crate::types::swap::{SwapCalldataRequest, SwapCalldataResponse}; +use alloy::hex::encode_prefixed; +use alloy::primitives::keccak256; use rain_orderbook_common::raindex_client::take_orders::TakeOrdersRequest; use rain_orderbook_common::take_orders::TakeOrdersMode; use rocket::serde::json::Json; @@ -27,19 +30,21 @@ use tracing::Instrument; #[post("/calldata", data = "")] pub async fn post_swap_calldata( _global: GlobalRateLimit, - _key: AuthenticatedKey, + key: AuthenticatedKey, + pool: &State, shared_raindex: &State, span: TracingSpan, request: Json, ) -> Result, ApiError> { let req = request.into_inner(); + let pool = pool.inner().clone(); async move { - tracing::info!(body = ?req, "request received"); + tracing::info!(api_key_id = key.id, key_id = %key.key_id, body = ?req, "request received"); let raindex = shared_raindex.read().await; let ds = RaindexSwapDataSource { client: raindex.client(), }; - let response = process_swap_calldata(&ds, req).await?; + let response = handle_swap_calldata(&ds, &pool, &key, req).await?; Ok(Json(response)) } .instrument(span.0) @@ -48,7 +53,7 @@ pub async fn post_swap_calldata( async fn process_swap_calldata( ds: &dyn SwapDataSource, - req: SwapCalldataRequest, + req: &SwapCalldataRequest, ) -> Result { let take_req = TakeOrdersRequest { taker: req.taker.to_string(), @@ -56,21 +61,88 @@ async fn process_swap_calldata( sell_token: req.input_token.to_string(), buy_token: req.output_token.to_string(), mode: TakeOrdersMode::BuyUpTo, - amount: 
req.output_amount, - price_cap: req.maximum_io_ratio, + amount: req.output_amount.clone(), + price_cap: req.maximum_io_ratio.clone(), }; ds.get_calldata(take_req).await } +async fn handle_swap_calldata( + ds: &dyn SwapDataSource, + pool: &DbPool, + key: &AuthenticatedKey, + req: SwapCalldataRequest, +) -> Result { + let response = process_swap_calldata(ds, &req).await?; + if let Err(error) = persist_issued_swap_calldata(pool, key, &req, &response).await { + tracing::warn!( + error = %error, + api_key_id = key.id, + key_id = %key.key_id, + taker = %req.taker, + to = %response.to, + "continuing after issued swap calldata persistence failure" + ); + } + Ok(response) +} + +async fn persist_issued_swap_calldata( + pool: &DbPool, + key: &AuthenticatedKey, + request: &SwapCalldataRequest, + response: &SwapCalldataResponse, +) -> Result<(), sqlx::Error> { + if !should_persist_issued_swap_calldata(response) { + return Ok(()); + } + + let record = crate::db::issued_swap_calldata::NewIssuedSwapCalldata { + api_key_id: key.id, + key_id: key.key_id.clone(), + label: key.label.clone(), + owner: key.owner.clone(), + chain_id: crate::CHAIN_ID as i64, + taker: format!("{:#x}", request.taker), + to_address: format!("{:#x}", response.to), + tx_value: response.value.to_string(), + calldata: encode_prefixed(response.data.as_ref()), + calldata_hash: encode_prefixed(keccak256(response.data.as_ref())), + input_token: format!("{:#x}", request.input_token), + output_token: format!("{:#x}", request.output_token), + output_amount: request.output_amount.clone(), + maximum_io_ratio: request.maximum_io_ratio.clone(), + estimated_input: response.estimated_input.clone(), + }; + + crate::db::issued_swap_calldata::insert(pool, &record).await?; + + tracing::info!( + api_key_id = key.id, + key_id = %key.key_id, + taker = %request.taker, + to = %response.to, + "persisted issued swap calldata" + ); + + Ok(()) +} + +fn should_persist_issued_swap_calldata(response: &SwapCalldataResponse) -> bool { + 
!response.data.is_empty() && response.approvals.is_empty() +} + #[cfg(test)] mod tests { use super::*; use crate::routes::swap::test_fixtures::MockSwapDataSource; - use crate::test_helpers::TestClientBuilder; + use crate::test_helpers::{basic_auth_header, seed_api_key, TestClientBuilder}; use crate::types::common::Approval; - use alloy::primitives::{address, Address, Bytes, U256}; - use rocket::http::{ContentType, Status}; + use alloy::hex::encode_prefixed; + use alloy::primitives::{address, keccak256, Address, Bytes, U256}; + use rocket::http::{ContentType, Header, Status}; + use sqlx::FromRow; const USDC: Address = address!("833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"); const WETH: Address = address!("4200000000000000000000000000000000000006"); @@ -120,9 +192,8 @@ mod tests { candidates: vec![], calldata_result: Ok(ready_response()), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")) - .await - .unwrap(); + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await.unwrap(); assert_eq!(result.to, ORDERBOOK); assert!(!result.data.is_empty()); @@ -138,9 +209,8 @@ mod tests { candidates: vec![], calldata_result: Ok(approval_response()), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")) - .await - .unwrap(); + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await.unwrap(); assert_eq!(result.to, ORDERBOOK); assert!(result.data.is_empty()); @@ -158,7 +228,8 @@ mod tests { "no liquidity found for this pair".into(), )), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")).await; + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::NotFound(msg)) if msg.contains("no liquidity"))); } @@ -169,7 +240,8 @@ mod tests { candidates: vec![], calldata_result: Err(ApiError::BadRequest("invalid parameters".into())), }; - let 
result = process_swap_calldata(&ds, calldata_request("not-a-number", "2.5")).await; + let request = calldata_request("not-a-number", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::BadRequest(_)))); } @@ -180,10 +252,252 @@ mod tests { candidates: vec![], calldata_result: Err(ApiError::Internal("failed to generate calldata".into())), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")).await; + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::Internal(_)))); } + #[derive(Debug, FromRow)] + struct IssuedSwapCalldataRow { + api_key_id: i64, + key_id: String, + label: String, + owner: String, + chain_id: i64, + taker: String, + to_address: String, + tx_value: String, + calldata: String, + calldata_hash: String, + input_token: String, + output_token: String, + output_amount: String, + maximum_io_ratio: String, + estimated_input: String, + } + + #[derive(Debug, FromRow)] + struct AuthenticatedKeyRow { + id: i64, + key_id: String, + label: String, + owner: String, + is_admin: bool, + } + + async fn fetch_issued_rows(pool: &DbPool) -> Vec { + sqlx::query_as::<_, IssuedSwapCalldataRow>( + "SELECT + api_key_id, + key_id, + label, + owner, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input + FROM issued_swap_calldata + ORDER BY id", + ) + .fetch_all(pool) + .await + .expect("query issued swap calldata") + } + + async fn fetch_authenticated_key(pool: &DbPool, key_id: &str) -> AuthenticatedKey { + let row = sqlx::query_as::<_, AuthenticatedKeyRow>( + "SELECT id, key_id, label, owner, is_admin FROM api_keys WHERE key_id = ?", + ) + .bind(key_id) + .fetch_one(pool) + .await + .expect("authenticated key"); + + AuthenticatedKey { + id: row.id, + key_id: row.key_id, + label: row.label, + owner: 
row.owner, + is_admin: row.is_admin, + } + } + + async fn issued_row_count(pool: &DbPool) -> i64 { + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM issued_swap_calldata") + .fetch_one(pool) + .await + .expect("query issued swap calldata count"); + row.0 + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_persists_executable_payload() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let key = fetch_authenticated_key(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + let request = calldata_request("100", "2.5"); + + let response = handle_swap_calldata(&ds, pool, &key, request.clone()) + .await + .expect("successful swap calldata"); + + assert_eq!(response.to, ORDERBOOK); + assert_eq!(response.data, ready_response().data); + assert_eq!(response.value, U256::ZERO); + assert_eq!(response.estimated_input, "150"); + assert!(response.approvals.is_empty()); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 1); + let row = &rows[0]; + assert_eq!(row.api_key_id, key.id); + assert_eq!(row.key_id, key.key_id.as_str()); + assert_eq!(row.label, key.label.as_str()); + assert_eq!(row.owner, key.owner.as_str()); + assert_eq!(row.chain_id, crate::CHAIN_ID as i64); + assert_eq!(row.taker, format!("{:#x}", request.taker)); + assert_eq!(row.to_address, format!("{:#x}", ORDERBOOK)); + assert_eq!(row.tx_value, U256::ZERO.to_string()); + assert_eq!(row.calldata, encode_prefixed(response.data.as_ref())); + assert_eq!( + row.calldata_hash, + encode_prefixed(keccak256(response.data.as_ref())) + ); + assert_eq!(row.input_token, format!("{:#x}", request.input_token)); + assert_eq!(row.output_token, format!("{:#x}", request.output_token)); + assert_eq!(row.output_amount, request.output_amount); + assert_eq!(row.maximum_io_ratio, 
request.maximum_io_ratio); + assert_eq!(row.estimated_input, response.estimated_input); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_skips_approval_only_payload() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let key = fetch_authenticated_key(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(approval_response()), + }; + + handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) + .await + .expect("successful approval response"); + + let rows = fetch_issued_rows(pool).await; + assert!(rows.is_empty()); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_allows_duplicate_payloads() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let key = fetch_authenticated_key(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + let request = calldata_request("100", "2.5"); + + handle_swap_calldata(&ds, pool, &key, request.clone()) + .await + .expect("first swap calldata"); + handle_swap_calldata(&ds, pool, &key, request) + .await + .expect("second swap calldata"); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].calldata_hash, rows[1].calldata_hash); + } + + #[rocket::async_test] + async fn test_issued_swap_calldata_survives_api_key_deletion() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let key = fetch_authenticated_key(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + 
}; + + handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) + .await + .expect("successful swap calldata"); + + sqlx::query("DELETE FROM api_keys WHERE key_id = ?") + .bind(&key_id) + .execute(pool) + .await + .expect("delete api key"); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].key_id, key_id); + assert_eq!(rows[0].label, "test-key"); + assert_eq!(rows[0].owner, "test-owner"); + } + + #[test] + fn test_should_persist_issued_swap_calldata() { + assert!(should_persist_issued_swap_calldata(&ready_response())); + assert!(!should_persist_issued_swap_calldata(&approval_response())); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_returns_response_if_persistence_fails() { + let client = TestClientBuilder::new().build().await; + let pool = client.rocket().state::().expect("pool"); + + sqlx::query("DROP TABLE issued_swap_calldata") + .execute(pool) + .await + .expect("drop issued swap calldata table"); + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + + let key = AuthenticatedKey { + id: i64::MAX, + key_id: "missing".to_string(), + label: "missing".to_string(), + owner: "missing".to_string(), + is_admin: false, + }; + + let result = handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) + .await + .expect("swap calldata should still succeed"); + assert_eq!(result.to, ORDERBOOK); + assert_eq!(result.data, ready_response().data); + } + #[rocket::async_test] async fn test_swap_calldata_401_without_auth() { let client = TestClientBuilder::new().build().await; @@ -195,4 +509,38 @@ mod tests { .await; assert_eq!(response.status(), Status::Unauthorized); } + + #[rocket::async_test] + async fn test_unauthenticated_swap_calldata_creates_no_issued_row() { + let client = TestClientBuilder::new().build().await; + let response = client + .post("/v1/swap/calldata") + .header(ContentType::JSON) + 
.body(r#"{"taker":"0x1111111111111111111111111111111111111111","inputToken":"0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913","outputToken":"0x4200000000000000000000000000000000000006","outputAmount":"100","maximumIoRatio":"2.5"}"#) + .dispatch() + .await; + assert_eq!(response.status(), Status::Unauthorized); + + let pool = client.rocket().state::().expect("pool"); + assert_eq!(issued_row_count(pool).await, 0); + } + + #[rocket::async_test] + async fn test_failed_authenticated_swap_calldata_creates_no_issued_row() { + let client = TestClientBuilder::new().build().await; + let (key_id, secret) = seed_api_key(&client).await; + let header = Header::new("Authorization", basic_auth_header(&key_id, &secret)); + + let response = client + .post("/v1/swap/calldata") + .header(ContentType::JSON) + .header(header) + .body(r#"{"taker":"0x1111111111111111111111111111111111111111","inputToken":"0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913","outputToken":"0x4200000000000000000000000000000000000006","outputAmount":"not-a-number","maximumIoRatio":"2.5"}"#) + .dispatch() + .await; + assert_eq!(response.status(), Status::BadRequest); + + let pool = client.rocket().state::().expect("pool"); + assert_eq!(issued_row_count(pool).await, 0); + } }