From f6124111a14c32b3e6bb875bf86df64b5da30fbb Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Tue, 24 Mar 2026 13:36:00 +0300 Subject: [PATCH 1/5] Persist issued swap calldata for API key attribution --- ...0324000000_create_issued_swap_calldata.sql | 24 ++ src/db/issued_swap_calldata.rs | 48 +++ src/db/mod.rs | 1 + src/routes/swap/calldata.rs | 297 ++++++++++++++++-- 4 files changed, 352 insertions(+), 18 deletions(-) create mode 100644 migrations/20260324000000_create_issued_swap_calldata.sql create mode 100644 src/db/issued_swap_calldata.rs diff --git a/migrations/20260324000000_create_issued_swap_calldata.sql b/migrations/20260324000000_create_issued_swap_calldata.sql new file mode 100644 index 0000000..0ec6c21 --- /dev/null +++ b/migrations/20260324000000_create_issued_swap_calldata.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS issued_swap_calldata ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + api_key_id INTEGER NOT NULL, + chain_id INTEGER NOT NULL, + taker TEXT NOT NULL, + to_address TEXT NOT NULL, + tx_value TEXT NOT NULL, + calldata TEXT NOT NULL, + calldata_hash TEXT NOT NULL, + input_token TEXT NOT NULL, + output_token TEXT NOT NULL, + output_amount TEXT NOT NULL, + maximum_io_ratio TEXT NOT NULL, + estimated_input TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE +); + +CREATE INDEX idx_issued_swap_calldata_api_key_id_created + ON issued_swap_calldata (api_key_id, created_at); +CREATE INDEX idx_issued_swap_calldata_calldata_hash_created + ON issued_swap_calldata (calldata_hash, created_at); +CREATE INDEX idx_issued_swap_calldata_taker_created + ON issued_swap_calldata (taker, created_at); diff --git a/src/db/issued_swap_calldata.rs b/src/db/issued_swap_calldata.rs new file mode 100644 index 0000000..3cc3b72 --- /dev/null +++ b/src/db/issued_swap_calldata.rs @@ -0,0 +1,48 @@ +use crate::db::DbPool; +use crate::types::swap::{SwapCalldataRequest, 
SwapCalldataResponse}; +use alloy::hex::encode_prefixed; +use alloy::primitives::keccak256; + +pub(crate) async fn insert( + pool: &DbPool, + api_key_id: i64, + chain_id: u32, + request: &SwapCalldataRequest, + response: &SwapCalldataResponse, +) -> Result<(), sqlx::Error> { + let calldata = encode_prefixed(response.data.as_ref()); + let calldata_hash = encode_prefixed(keccak256(response.data.as_ref())); + + sqlx::query( + "INSERT INTO issued_swap_calldata ( + api_key_id, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(api_key_id) + .bind(chain_id as i64) + .bind(format!("{:#x}", request.taker)) + .bind(format!("{:#x}", response.to)) + .bind(response.value.to_string()) + .bind(calldata) + .bind(calldata_hash) + .bind(format!("{:#x}", request.input_token)) + .bind(format!("{:#x}", request.output_token)) + .bind(&request.output_amount) + .bind(&request.maximum_io_ratio) + .bind(&response.estimated_input) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 5bbc962..f88e245 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod issued_swap_calldata; mod migrate; mod pool; pub(crate) mod settings; diff --git a/src/routes/swap/calldata.rs b/src/routes/swap/calldata.rs index cee7d06..0a40f52 100644 --- a/src/routes/swap/calldata.rs +++ b/src/routes/swap/calldata.rs @@ -1,5 +1,6 @@ use super::{RaindexSwapDataSource, SwapDataSource}; use crate::auth::AuthenticatedKey; +use crate::db::DbPool; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; use crate::types::swap::{SwapCalldataRequest, SwapCalldataResponse}; @@ -27,19 +28,21 @@ use tracing::Instrument; #[post("/calldata", data = "")] pub async fn post_swap_calldata( _global: GlobalRateLimit, - _key: AuthenticatedKey, + key: 
AuthenticatedKey, + pool: &State, shared_raindex: &State, span: TracingSpan, request: Json, ) -> Result, ApiError> { let req = request.into_inner(); + let pool = pool.inner().clone(); async move { - tracing::info!(body = ?req, "request received"); + tracing::info!(api_key_id = key.id, key_id = %key.key_id, body = ?req, "request received"); let raindex = shared_raindex.read().await; let ds = RaindexSwapDataSource { client: raindex.client(), }; - let response = process_swap_calldata(&ds, req).await?; + let response = handle_swap_calldata(&ds, &pool, key.id, req).await?; Ok(Json(response)) } .instrument(span.0) @@ -48,7 +51,7 @@ pub async fn post_swap_calldata( async fn process_swap_calldata( ds: &dyn SwapDataSource, - req: SwapCalldataRequest, + req: &SwapCalldataRequest, ) -> Result { let take_req = TakeOrdersRequest { taker: req.taker.to_string(), @@ -56,21 +59,71 @@ async fn process_swap_calldata( sell_token: req.input_token.to_string(), buy_token: req.output_token.to_string(), mode: TakeOrdersMode::BuyUpTo, - amount: req.output_amount, - price_cap: req.maximum_io_ratio, + amount: req.output_amount.clone(), + price_cap: req.maximum_io_ratio.clone(), }; ds.get_calldata(take_req).await } +async fn handle_swap_calldata( + ds: &dyn SwapDataSource, + pool: &DbPool, + api_key_id: i64, + req: SwapCalldataRequest, +) -> Result { + let response = process_swap_calldata(ds, &req).await?; + persist_issued_swap_calldata(pool, api_key_id, &req, &response).await?; + Ok(response) +} + +async fn persist_issued_swap_calldata( + pool: &DbPool, + api_key_id: i64, + request: &SwapCalldataRequest, + response: &SwapCalldataResponse, +) -> Result<(), ApiError> { + if !should_persist_issued_swap_calldata(response) { + return Ok(()); + } + + crate::db::issued_swap_calldata::insert(pool, api_key_id, crate::CHAIN_ID, request, response) + .await + .map_err(|error| { + tracing::error!( + error = %error, + api_key_id, + taker = %request.taker, + to = %response.to, + "failed to persist issued 
swap calldata" + ); + ApiError::Internal("failed to persist issued swap calldata".into()) + })?; + + tracing::info!( + api_key_id, + taker = %request.taker, + to = %response.to, + "persisted issued swap calldata" + ); + + Ok(()) +} + +fn should_persist_issued_swap_calldata(response: &SwapCalldataResponse) -> bool { + !response.data.is_empty() && response.approvals.is_empty() +} + #[cfg(test)] mod tests { use super::*; use crate::routes::swap::test_fixtures::MockSwapDataSource; - use crate::test_helpers::TestClientBuilder; + use crate::test_helpers::{basic_auth_header, seed_api_key, TestClientBuilder}; use crate::types::common::Approval; - use alloy::primitives::{address, Address, Bytes, U256}; - use rocket::http::{ContentType, Status}; + use alloy::hex::encode_prefixed; + use alloy::primitives::{address, keccak256, Address, Bytes, U256}; + use rocket::http::{ContentType, Header, Status}; + use sqlx::FromRow; const USDC: Address = address!("833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"); const WETH: Address = address!("4200000000000000000000000000000000000006"); @@ -120,9 +173,8 @@ mod tests { candidates: vec![], calldata_result: Ok(ready_response()), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")) - .await - .unwrap(); + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await.unwrap(); assert_eq!(result.to, ORDERBOOK); assert!(!result.data.is_empty()); @@ -138,9 +190,8 @@ mod tests { candidates: vec![], calldata_result: Ok(approval_response()), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")) - .await - .unwrap(); + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await.unwrap(); assert_eq!(result.to, ORDERBOOK); assert!(result.data.is_empty()); @@ -158,7 +209,8 @@ mod tests { "no liquidity found for this pair".into(), )), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")).await; + let 
request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::NotFound(msg)) if msg.contains("no liquidity"))); } @@ -169,7 +221,8 @@ mod tests { candidates: vec![], calldata_result: Err(ApiError::BadRequest("invalid parameters".into())), }; - let result = process_swap_calldata(&ds, calldata_request("not-a-number", "2.5")).await; + let request = calldata_request("not-a-number", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::BadRequest(_)))); } @@ -180,10 +233,184 @@ mod tests { candidates: vec![], calldata_result: Err(ApiError::Internal("failed to generate calldata".into())), }; - let result = process_swap_calldata(&ds, calldata_request("100", "2.5")).await; + let request = calldata_request("100", "2.5"); + let result = process_swap_calldata(&ds, &request).await; assert!(matches!(result, Err(ApiError::Internal(_)))); } + #[derive(Debug, FromRow)] + struct IssuedSwapCalldataRow { + api_key_id: i64, + chain_id: i64, + taker: String, + to_address: String, + tx_value: String, + calldata: String, + calldata_hash: String, + input_token: String, + output_token: String, + output_amount: String, + maximum_io_ratio: String, + estimated_input: String, + } + + async fn fetch_issued_rows(pool: &DbPool) -> Vec { + sqlx::query_as::<_, IssuedSwapCalldataRow>( + "SELECT + api_key_id, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input + FROM issued_swap_calldata + ORDER BY id", + ) + .fetch_all(pool) + .await + .expect("query issued swap calldata") + } + + async fn fetch_api_key_id(pool: &DbPool, key_id: &str) -> i64 { + let row: (i64,) = sqlx::query_as("SELECT id FROM api_keys WHERE key_id = ?") + .bind(key_id) + .fetch_one(pool) + .await + .expect("api key"); + row.0 + } + + async fn issued_row_count(pool: &DbPool) -> i64 { + let row: 
(i64,) = sqlx::query_as("SELECT COUNT(*) FROM issued_swap_calldata") + .fetch_one(pool) + .await + .expect("query issued swap calldata count"); + row.0 + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_persists_executable_payload() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let api_key_id = fetch_api_key_id(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + let request = calldata_request("100", "2.5"); + + let response = handle_swap_calldata(&ds, pool, api_key_id, request.clone()) + .await + .expect("successful swap calldata"); + + assert_eq!(response.to, ORDERBOOK); + assert_eq!(response.data, ready_response().data); + assert_eq!(response.value, U256::ZERO); + assert_eq!(response.estimated_input, "150"); + assert!(response.approvals.is_empty()); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 1); + let row = &rows[0]; + assert_eq!(row.api_key_id, api_key_id); + assert_eq!(row.chain_id, crate::CHAIN_ID as i64); + assert_eq!(row.taker, format!("{:#x}", request.taker)); + assert_eq!(row.to_address, format!("{:#x}", ORDERBOOK)); + assert_eq!(row.tx_value, U256::ZERO.to_string()); + assert_eq!(row.calldata, encode_prefixed(response.data.as_ref())); + assert_eq!( + row.calldata_hash, + encode_prefixed(keccak256(response.data.as_ref())) + ); + assert_eq!(row.input_token, format!("{:#x}", request.input_token)); + assert_eq!(row.output_token, format!("{:#x}", request.output_token)); + assert_eq!(row.output_amount, request.output_amount); + assert_eq!(row.maximum_io_ratio, request.maximum_io_ratio); + assert_eq!(row.estimated_input, response.estimated_input); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_skips_approval_only_payload() { + let client = TestClientBuilder::new().build().await; + let 
(key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let api_key_id = fetch_api_key_id(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(approval_response()), + }; + + handle_swap_calldata(&ds, pool, api_key_id, calldata_request("100", "2.5")) + .await + .expect("successful approval response"); + + let rows = fetch_issued_rows(pool).await; + assert!(rows.is_empty()); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_allows_duplicate_payloads() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let api_key_id = fetch_api_key_id(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + let request = calldata_request("100", "2.5"); + + handle_swap_calldata(&ds, pool, api_key_id, request.clone()) + .await + .expect("first swap calldata"); + handle_swap_calldata(&ds, pool, api_key_id, request) + .await + .expect("second swap calldata"); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 2); + assert_eq!(rows[0].calldata_hash, rows[1].calldata_hash); + } + + #[test] + fn test_should_persist_issued_swap_calldata() { + assert!(should_persist_issued_swap_calldata(&ready_response())); + assert!(!should_persist_issued_swap_calldata(&approval_response())); + } + + #[rocket::async_test] + async fn test_handle_swap_calldata_returns_internal_if_persistence_fails() { + let client = TestClientBuilder::new().build().await; + let pool = client.rocket().state::().expect("pool"); + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + + let result = + handle_swap_calldata(&ds, pool, i64::MAX, calldata_request("100", "2.5")).await; + assert!(matches!( + 
result, + Err(ApiError::Internal(msg)) if msg == "failed to persist issued swap calldata" + )); + assert_eq!(issued_row_count(pool).await, 0); + } + #[rocket::async_test] async fn test_swap_calldata_401_without_auth() { let client = TestClientBuilder::new().build().await; @@ -195,4 +422,38 @@ mod tests { .await; assert_eq!(response.status(), Status::Unauthorized); } + + #[rocket::async_test] + async fn test_unauthenticated_swap_calldata_creates_no_issued_row() { + let client = TestClientBuilder::new().build().await; + let response = client + .post("/v1/swap/calldata") + .header(ContentType::JSON) + .body(r#"{"taker":"0x1111111111111111111111111111111111111111","inputToken":"0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913","outputToken":"0x4200000000000000000000000000000000000006","outputAmount":"100","maximumIoRatio":"2.5"}"#) + .dispatch() + .await; + assert_eq!(response.status(), Status::Unauthorized); + + let pool = client.rocket().state::().expect("pool"); + assert_eq!(issued_row_count(pool).await, 0); + } + + #[rocket::async_test] + async fn test_failed_authenticated_swap_calldata_creates_no_issued_row() { + let client = TestClientBuilder::new().build().await; + let (key_id, secret) = seed_api_key(&client).await; + let header = Header::new("Authorization", basic_auth_header(&key_id, &secret)); + + let response = client + .post("/v1/swap/calldata") + .header(ContentType::JSON) + .header(header) + .body(r#"{"taker":"0x1111111111111111111111111111111111111111","inputToken":"0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913","outputToken":"0x4200000000000000000000000000000000000006","outputAmount":"not-a-number","maximumIoRatio":"2.5"}"#) + .dispatch() + .await; + assert_eq!(response.status(), Status::BadRequest); + + let pool = client.rocket().state::().expect("pool"); + assert_eq!(issued_row_count(pool).await, 0); + } } From f4cf44e01028a11d4f6ef023d8f7cd4ac0c28677 Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Tue, 24 Mar 2026 15:32:19 +0300 Subject: [PATCH 2/5] Add 
customer swap volume report --- ...0_create_customer_volume_report_tables.sql | 36 + src/cli.rs | 81 + src/main.rs | 114 +- src/reporting/customer_volume.rs | 1438 +++++++++++++++++ src/reporting/mod.rs | 1 + 5 files changed, 1623 insertions(+), 47 deletions(-) create mode 100644 migrations/20260324010000_create_customer_volume_report_tables.sql create mode 100644 src/reporting/customer_volume.rs create mode 100644 src/reporting/mod.rs diff --git a/migrations/20260324010000_create_customer_volume_report_tables.sql b/migrations/20260324010000_create_customer_volume_report_tables.sql new file mode 100644 index 0000000..d793b7e --- /dev/null +++ b/migrations/20260324010000_create_customer_volume_report_tables.sql @@ -0,0 +1,36 @@ +CREATE TABLE IF NOT EXISTS attributed_swap_txs ( + tx_hash TEXT PRIMARY KEY, + issued_swap_calldata_id INTEGER NOT NULL, + api_key_id INTEGER NOT NULL, + chain_id INTEGER NOT NULL, + sender TEXT NOT NULL, + to_address TEXT NOT NULL, + block_number INTEGER NOT NULL, + block_timestamp INTEGER NOT NULL, + input_token TEXT NOT NULL, + output_token TEXT NOT NULL, + total_input_amount TEXT NOT NULL, + total_output_amount TEXT NOT NULL, + matched_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY (issued_swap_calldata_id) REFERENCES issued_swap_calldata(id) ON DELETE CASCADE, + FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE +); + +CREATE INDEX idx_attributed_swap_txs_api_key_id_timestamp + ON attributed_swap_txs (api_key_id, block_timestamp); +CREATE INDEX idx_attributed_swap_txs_chain_id_timestamp + ON attributed_swap_txs (chain_id, block_timestamp); + +CREATE TABLE IF NOT EXISTS swap_report_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + report_name TEXT NOT NULL, + start_time INTEGER NOT NULL, + end_time INTEGER NOT NULL, + matched_count INTEGER NOT NULL, + unmatched_count INTEGER NOT NULL, + ambiguous_count INTEGER NOT NULL, + completed_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX 
idx_swap_report_runs_name_completed + ON swap_report_runs (report_name, completed_at); diff --git a/src/cli.rs b/src/cli.rs index ad327f4..ad17bb3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -27,6 +27,13 @@ pub enum Command { #[command(subcommand)] command: KeysCommand, }, + #[command(about = "Run internal reports")] + Report { + #[arg(long)] + config: PathBuf, + #[command(subcommand)] + command: ReportCommand, + }, } #[derive(Subcommand)] @@ -48,12 +55,26 @@ pub enum KeysCommand { Delete { key_id: String }, } +#[derive(Subcommand)] +pub enum ReportCommand { + #[command(about = "Report executed customer swap volume")] + CustomerVolume { + #[arg(long)] + start_time: u64, + #[arg(long)] + end_time: u64, + #[arg(long, default_value_t = false)] + json: bool, + }, +} + pub fn print_usage() { println!("Usage: st0x_rest_api "); println!(); println!("Commands:"); println!(" serve Start the API server"); println!(" keys Manage API keys"); + println!(" report Run internal reports"); println!(); println!("Run 'st0x_rest_api --help' for more information on a command."); } @@ -74,6 +95,32 @@ pub async fn handle_keys_command( } } +pub async fn handle_report_command( + command: ReportCommand, + pool: DbPool, + raindex: &crate::raindex::RaindexProvider, +) -> Result<(), Box> { + match command { + ReportCommand::CustomerVolume { + start_time, + end_time, + json, + } => { + crate::reporting::customer_volume::run( + &pool, + raindex, + crate::reporting::customer_volume::CustomerVolumeReportArgs { + start_time, + end_time, + json, + }, + ) + .await?; + Ok(()) + } + } +} + async fn create_key( pool: &DbPool, label: &str, @@ -237,6 +284,40 @@ mod tests { } } + #[test] + fn test_report_parses_config_flag() { + let cli = Cli::try_parse_from([ + "app", + "report", + "--config", + "/path/to/config.toml", + "customer-volume", + "--start-time", + "10", + "--end-time", + "20", + ]) + .expect("parse"); + + match cli.command { + Some(Command::Report { + config, + command: + 
ReportCommand::CustomerVolume { + start_time, + end_time, + json, + }, + }) => { + assert_eq!(config, PathBuf::from("/path/to/config.toml")); + assert_eq!(start_time, 10); + assert_eq!(end_time, 20); + assert!(!json); + } + _ => panic!("expected Report command"), + } + } + #[tokio::test] async fn test_create_key_inserts_row() { let pool = test_pool().await; diff --git a/src/main.rs b/src/main.rs index 440ab10..96980b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod db; mod error; mod fairings; mod raindex; +mod reporting; mod routes; mod telemetry; mod types; @@ -110,6 +111,47 @@ fn configure_cors() -> Result { .to_cors()?) } +async fn load_raindex_provider( + cfg: &config::Config, + pool: &db::DbPool, +) -> Result { + let db_url = db::settings::get_setting(pool, "registry_url") + .await + .map_err(|e| format!("failed to read registry_url from database: {e}"))? + .filter(|url| !url.is_empty()); + + let registry_url = match db_url { + Some(url) => { + tracing::info!(registry_url = %url, "loaded registry_url from database"); + url + } + None if !cfg.registry_url.is_empty() => { + if let Err(e) = db::settings::set_setting(pool, "registry_url", &cfg.registry_url).await + { + tracing::warn!(error = %e, "failed to seed registry_url into database"); + } + cfg.registry_url.clone() + } + None => { + return Err("registry_url not found in database and not set in config file".into()); + } + }; + + let local_db_path = std::path::PathBuf::from(&cfg.local_db_path); + if let Some(parent) = local_db_path.parent() { + std::fs::create_dir_all(parent).map_err(|e| { + format!( + "failed to create local db directory {}: {e}", + parent.display() + ) + })?; + } + + raindex::RaindexProvider::load(®istry_url, Some(local_db_path)) + .await + .map_err(|e| format!("failed to load raindex registry: {e}")) +} + pub(crate) fn rocket( pool: db::DbPool, rate_limiter: fairings::RateLimiter, @@ -159,7 +201,9 @@ async fn main() { }; let config_path = match &command { - 
cli::Command::Serve { config } | cli::Command::Keys { config, .. } => config.clone(), + cli::Command::Serve { config } + | cli::Command::Keys { config, .. } + | cli::Command::Report { config, .. } => config.clone(), }; let cfg = match config::Config::load(&config_path) { @@ -195,56 +239,13 @@ async fn main() { match command { cli::Command::Serve { .. } => { - let db_url = db::settings::get_setting(&pool, "registry_url") - .await - .ok() - .flatten(); - - let registry_url = match db_url { - Some(url) if !url.is_empty() => { - tracing::info!(registry_url = %url, "loaded registry_url from database"); - url - } - _ if !cfg.registry_url.is_empty() => { - if let Err(e) = - db::settings::set_setting(&pool, "registry_url", &cfg.registry_url).await - { - tracing::warn!(error = %e, "failed to seed registry_url into database"); - } - cfg.registry_url - } - _ => { - tracing::error!( - "registry_url not found in database and not set in config file" - ); - drop(log_guard); - std::process::exit(1); - } - }; - - let local_db_path = std::path::PathBuf::from(&cfg.local_db_path); - if let Some(parent) = local_db_path.parent() { - if !parent.exists() { - if let Err(e) = std::fs::create_dir_all(parent) { - tracing::error!(error = %e, path = %parent.display(), "failed to create local db directory"); - drop(log_guard); - std::process::exit(1); - } - } - } - - let raindex_config = match raindex::RaindexProvider::load( - ®istry_url, - Some(local_db_path), - ) - .await - { + let raindex_config = match load_raindex_provider(&cfg, &pool).await { Ok(config) => { - tracing::info!(registry_url = %registry_url, "raindex registry loaded"); + tracing::info!("raindex registry loaded"); config } Err(e) => { - tracing::error!(error = %e, registry_url = %registry_url, "failed to load raindex registry"); + tracing::error!(error = %e, "failed to load raindex registry"); drop(log_guard); std::process::exit(1); } @@ -283,6 +284,25 @@ async fn main() { std::process::exit(1); } } + cli::Command::Report { 
command, .. } => { + let raindex_config = match load_raindex_provider(&cfg, &pool).await { + Ok(config) => { + tracing::info!("raindex registry loaded for report"); + config + } + Err(e) => { + tracing::error!(error = %e, "failed to load raindex registry"); + drop(log_guard); + std::process::exit(1); + } + }; + + if let Err(e) = cli::handle_report_command(command, pool, &raindex_config).await { + tracing::error!(error = %e, "report command failed"); + drop(log_guard); + std::process::exit(1); + } + } } drop(log_guard); diff --git a/src/reporting/customer_volume.rs b/src/reporting/customer_volume.rs new file mode 100644 index 0000000..17ccf6b --- /dev/null +++ b/src/reporting/customer_volume.rs @@ -0,0 +1,1438 @@ +use crate::db::DbPool; +use crate::raindex::RaindexProvider; +use rain_math_float::Float; +use rain_orderbook_common::raindex_client::RaindexError; +use reqwest::Client as HttpClient; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::{FromRow, QueryBuilder}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Add; + +const MATCH_WINDOW_SECS: i64 = 5 * 60; + +#[derive(Debug, Clone, Copy)] +pub(crate) struct CustomerVolumeReportArgs { + pub start_time: u64, + pub end_time: u64, + pub json: bool, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum CustomerVolumeReportError { + #[error("start_time must be less than or equal to end_time")] + InvalidWindow, + #[error("raindex local db path is unavailable")] + MissingLocalDbPath, + #[error("raindex local db file not found: {path}")] + MissingLocalDb { path: String }, + #[error("no orderbooks configured for chain {chain_id}")] + MissingOrderbooks { chain_id: u32 }, + #[error("no RPC URLs configured for chain {chain_id}")] + MissingRpcUrls { chain_id: u32 }, + #[error("database error: {0}")] + Database(#[from] sqlx::Error), + #[error("http error: {0}")] + Http(#[from] reqwest::Error), + #[error("json error: {0}")] + 
Json(#[from] serde_json::Error), + #[error("float error: {0}")] + Float(#[from] rain_math_float::FloatError), + #[error("raindex client error: {0}")] + Raindex(#[from] RaindexError), + #[error("failed to fetch transaction input for {tx_hash}: {message}")] + RpcLookup { tx_hash: String, message: String }, + #[error("transaction {tx_hash} was not found via RPC")] + TransactionNotFound { tx_hash: String }, + #[error("inconsistent candidate transaction data for {tx_hash}")] + InconsistentCandidate { tx_hash: String }, +} + +#[derive(Debug, Clone, FromRow)] +struct IssuedSwapRow { + id: i64, + api_key_id: i64, + key_id: String, + label: String, + owner: String, + taker: String, + to_address: String, + calldata: String, + input_token: String, + output_token: String, + created_at_unix: i64, +} + +#[derive(Debug, Clone, FromRow)] +struct TakeOrderRow { + orderbook_address: String, + transaction_hash: String, + block_number: i64, + block_timestamp: i64, + sender: String, + taker_input: String, + taker_output: String, +} + +#[derive(Debug, Clone)] +struct CandidateExecution { + tx_hash: String, + orderbook_address: String, + sender: String, + block_number: u64, + timestamp: u64, + total_input_hex: String, + total_output_hex: String, +} + +#[derive(Debug, Clone)] +struct MatchedExecution { + issued_swap_calldata_id: i64, + api_key_id: i64, + key_id: String, + label: String, + owner: String, + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + input_token: String, + output_token: String, + total_input_hex: String, + total_output_hex: String, + duplicate_match_count: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct ReportAudit { + matched_executions: usize, + unmatched_executions: usize, + ambiguous_matches: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct PairVolume { + input_token: String, + output_token: String, + total_input_amount: String, + total_output_amount: String, +} + +#[derive(Debug, Clone, Serialize)] 
+struct CustomerVolumeSummary { + api_key_id: i64, + key_id: String, + label: String, + owner: String, + executed_txs: usize, + pairs: Vec, +} + +#[derive(Debug, Clone, Serialize)] +struct MatchedTransactionReport { + tx_hash: String, + api_key_id: i64, + key_id: String, + label: String, + owner: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + input_token: String, + output_token: String, + total_input_amount: String, + total_output_amount: String, + duplicate_match_count: usize, +} + +#[derive(Debug, Clone, Serialize)] +struct UnmatchedTransactionReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + total_input_amount: String, + total_output_amount: String, +} + +#[derive(Debug, Clone, Serialize)] +struct CustomerVolumeReport { + start_time: u64, + end_time: u64, + audit: ReportAudit, + customers: Vec, + matched_transactions: Vec, + unmatched_transactions: Vec, +} + +#[derive(Debug)] +struct BuiltCustomerVolumeReport { + report: CustomerVolumeReport, + matched_executions: Vec, +} + +type IssuedRowIndex<'a> = HashMap<(String, String, String), Vec<&'a IssuedSwapRow>>; + +#[derive(Debug, Deserialize)] +struct JsonRpcEnvelope { + result: Option, + error: Option, +} + +#[derive(Debug, Deserialize)] +struct JsonRpcError { + message: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct RpcTransactionResponse { + input: String, +} + +pub(crate) async fn run( + pool: &DbPool, + raindex: &RaindexProvider, + args: CustomerVolumeReportArgs, +) -> Result<(), CustomerVolumeReportError> { + if args.start_time > args.end_time { + return Err(CustomerVolumeReportError::InvalidWindow); + } + + tracing::info!( + start_time = args.start_time, + end_time = args.end_time, + "customer volume report requested" + ); + + let built_report = build_report(pool, raindex, args.start_time, args.end_time).await?; + persist_matches(pool, 
&built_report.matched_executions).await?; + persist_report_run(pool, &built_report.report).await?; + + if args.json { + println!("{}", serde_json::to_string_pretty(&built_report.report)?); + } else { + print_human_report(&built_report.report); + } + + tracing::info!( + matched = built_report.report.audit.matched_executions, + unmatched = built_report.report.audit.unmatched_executions, + ambiguous = built_report.report.audit.ambiguous_matches, + "customer volume report complete" + ); + + Ok(()) +} + +async fn build_report( + pool: &DbPool, + raindex: &RaindexProvider, + start_time: u64, + end_time: u64, +) -> Result { + let chain_orderbooks = orderbooks_for_chain(raindex)?; + let issued_rows = load_issued_rows(pool, start_time, end_time).await?; + let issued_row_index = build_issued_row_index(&issued_rows); + + tracing::info!( + orderbook_count = chain_orderbooks.len(), + issued_count = issued_rows.len(), + "loaded issued swap rows for customer volume report" + ); + + let raindex_db = open_raindex_db(raindex).await?; + let candidate_rows = + load_take_order_rows(&raindex_db, &chain_orderbooks, start_time, end_time).await?; + let candidates = aggregate_candidate_rows(candidate_rows)?; + let rpc_urls = rpc_urls_for_chain(raindex)?; + let rpc = HttpClient::new(); + + tracing::info!( + candidate_count = candidates.len(), + rpc_url_count = rpc_urls.len(), + "loaded candidate executions from raindex db" + ); + + let mut matched = Vec::new(); + let mut unmatched = Vec::new(); + let mut ambiguous_matches = 0usize; + + for candidate in candidates { + let tx_input = fetch_transaction_input(&rpc, &rpc_urls, &candidate.tx_hash).await?; + let selection = select_issued_row(&candidate, &tx_input, &issued_row_index); + + if selection.duplicate_match_count > 1 { + ambiguous_matches += 1; + } + + match selection.row { + Some(row) => matched.push(MatchedExecution { + issued_swap_calldata_id: row.id, + api_key_id: row.api_key_id, + key_id: row.key_id.clone(), + label: 
row.label.clone(), + owner: row.owner.clone(), + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + input_token: row.input_token.clone(), + output_token: row.output_token.clone(), + total_input_hex: candidate.total_input_hex.clone(), + total_output_hex: candidate.total_output_hex.clone(), + duplicate_match_count: selection.duplicate_match_count, + }), + None => unmatched.push(UnmatchedTransactionReport { + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + total_input_amount: format_hex(&candidate.total_input_hex)?, + total_output_amount: format_hex(&candidate.total_output_hex)?, + }), + } + } + + let customers = build_customer_summaries(&matched)?; + let matched_transactions = matched + .iter() + .map(|execution| { + Ok(MatchedTransactionReport { + tx_hash: execution.tx_hash.clone(), + api_key_id: execution.api_key_id, + key_id: execution.key_id.clone(), + label: execution.label.clone(), + owner: execution.owner.clone(), + sender: execution.sender.clone(), + to_address: execution.to_address.clone(), + block_number: execution.block_number, + timestamp: execution.timestamp, + input_token: execution.input_token.clone(), + output_token: execution.output_token.clone(), + total_input_amount: format_hex(&execution.total_input_hex)?, + total_output_amount: format_hex(&execution.total_output_hex)?, + duplicate_match_count: execution.duplicate_match_count, + }) + }) + .collect::, CustomerVolumeReportError>>()?; + + Ok(BuiltCustomerVolumeReport { + report: CustomerVolumeReport { + start_time, + end_time, + audit: ReportAudit { + matched_executions: matched_transactions.len(), + unmatched_executions: unmatched.len(), + ambiguous_matches, + }, + customers, + matched_transactions, + 
unmatched_transactions: unmatched, + }, + matched_executions: matched, + }) +} + +fn orderbooks_for_chain( + raindex: &RaindexProvider, +) -> Result, CustomerVolumeReportError> { + let mut addresses = raindex + .client() + .get_all_orderbooks()? + .into_values() + .filter(|orderbook| orderbook.network.chain_id == crate::CHAIN_ID) + .map(|orderbook| normalize_address(&format!("{:#x}", orderbook.address))) + .collect::>(); + + addresses.sort(); + addresses.dedup(); + + if addresses.is_empty() { + return Err(CustomerVolumeReportError::MissingOrderbooks { + chain_id: crate::CHAIN_ID, + }); + } + + Ok(addresses) +} + +fn rpc_urls_for_chain(raindex: &RaindexProvider) -> Result, CustomerVolumeReportError> { + let network = raindex.client().get_network_by_chain_id(crate::CHAIN_ID)?; + if network.rpcs.is_empty() { + return Err(CustomerVolumeReportError::MissingRpcUrls { + chain_id: crate::CHAIN_ID, + }); + } + Ok(network.rpcs) +} + +async fn open_raindex_db( + raindex: &RaindexProvider, +) -> Result, CustomerVolumeReportError> { + let path = raindex + .db_path() + .ok_or(CustomerVolumeReportError::MissingLocalDbPath)?; + + if !path.exists() { + return Err(CustomerVolumeReportError::MissingLocalDb { + path: path.display().to_string(), + }); + } + + let options = SqliteConnectOptions::new().filename(path).read_only(true); + + SqlitePoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .map_err(CustomerVolumeReportError::Database) +} + +async fn load_issued_rows( + pool: &DbPool, + start_time: u64, + end_time: u64, +) -> Result, CustomerVolumeReportError> { + let min_issue_time = start_time.saturating_sub(MATCH_WINDOW_SECS as u64) as i64; + let max_issue_time = end_time as i64; + + sqlx::query_as::<_, IssuedSwapRow>( + "SELECT + isc.id, + isc.api_key_id, + ak.key_id, + ak.label, + ak.owner, + isc.taker, + isc.to_address, + isc.calldata, + isc.input_token, + isc.output_token, + CAST(strftime('%s', isc.created_at) AS INTEGER) AS created_at_unix + FROM 
issued_swap_calldata isc + JOIN api_keys ak ON ak.id = isc.api_key_id + WHERE isc.chain_id = ? + AND CAST(strftime('%s', isc.created_at) AS INTEGER) >= ? + AND CAST(strftime('%s', isc.created_at) AS INTEGER) <= ? + ORDER BY created_at_unix DESC, isc.id DESC", + ) + .bind(crate::CHAIN_ID as i64) + .bind(min_issue_time) + .bind(max_issue_time) + .fetch_all(pool) + .await + .map_err(CustomerVolumeReportError::Database) +} + +async fn load_take_order_rows( + raindex_db: &sqlx::Pool, + orderbook_addresses: &[String], + start_time: u64, + end_time: u64, +) -> Result, CustomerVolumeReportError> { + let mut query_builder = QueryBuilder::::new( + "SELECT + orderbook_address, + transaction_hash, + block_number, + block_timestamp, + sender, + taker_input, + taker_output + FROM take_orders + WHERE chain_id = ", + ); + + query_builder + .push_bind(crate::CHAIN_ID as i64) + .push(" AND block_timestamp >= ") + .push_bind(start_time as i64) + .push(" AND block_timestamp <= ") + .push_bind(end_time as i64) + .push(" AND orderbook_address IN ("); + + let mut separated = query_builder.separated(", "); + for orderbook in orderbook_addresses { + separated.push_bind(orderbook); + } + separated.push_unseparated(")"); + query_builder.push(" ORDER BY block_number, log_index"); + + query_builder + .build_query_as::() + .fetch_all(raindex_db) + .await + .map_err(CustomerVolumeReportError::Database) +} + +fn aggregate_candidate_rows( + rows: Vec, +) -> Result, CustomerVolumeReportError> { + let mut aggregates: HashMap = HashMap::new(); + + for row in rows { + let tx_hash = normalize_hex(&row.transaction_hash); + let sender = normalize_address(&row.sender); + let orderbook_address = normalize_address(&row.orderbook_address); + let input_delta = normalize_hex(&row.taker_output); + let output_delta = normalize_hex(&row.taker_input); + + match aggregates.get_mut(&tx_hash) { + Some(existing) => { + if existing.sender != sender + || existing.orderbook_address != orderbook_address + || 
existing.block_number != row.block_number as u64 + || existing.timestamp != row.block_timestamp as u64 + { + return Err(CustomerVolumeReportError::InconsistentCandidate { tx_hash }); + } + + add_hex_amount(&mut existing.total_input_hex, &input_delta)?; + add_hex_amount(&mut existing.total_output_hex, &output_delta)?; + } + None => { + let mut total_input_hex = zero_hex()?; + add_hex_amount(&mut total_input_hex, &input_delta)?; + + let mut total_output_hex = zero_hex()?; + add_hex_amount(&mut total_output_hex, &output_delta)?; + + aggregates.insert( + tx_hash.clone(), + CandidateExecution { + tx_hash, + orderbook_address, + sender, + block_number: row.block_number as u64, + timestamp: row.block_timestamp as u64, + total_input_hex, + total_output_hex, + }, + ); + } + } + } + + let mut candidates = aggregates.into_values().collect::>(); + candidates.sort_by(|left, right| { + left.timestamp + .cmp(&right.timestamp) + .then_with(|| left.block_number.cmp(&right.block_number)) + .then_with(|| left.tx_hash.cmp(&right.tx_hash)) + }); + Ok(candidates) +} + +struct Selection<'a> { + row: Option<&'a IssuedSwapRow>, + duplicate_match_count: usize, +} + +fn build_issued_row_index(issued_rows: &[IssuedSwapRow]) -> IssuedRowIndex<'_> { + let mut index = HashMap::new(); + + for row in issued_rows { + index + .entry(issued_row_key(&row.taker, &row.to_address, &row.calldata)) + .or_insert_with(Vec::new) + .push(row); + } + + for rows in index.values_mut() { + rows.sort_by(|left, right| { + right + .created_at_unix + .cmp(&left.created_at_unix) + .then_with(|| right.id.cmp(&left.id)) + }); + } + + index +} + +fn issued_row_key(taker: &str, to_address: &str, calldata: &str) -> (String, String, String) { + ( + normalize_address(taker), + normalize_address(to_address), + normalize_hex(calldata), + ) +} + +fn select_issued_row<'a>( + candidate: &CandidateExecution, + tx_input: &str, + issued_row_index: &IssuedRowIndex<'a>, +) -> Selection<'a> { + let candidate_timestamp = 
candidate.timestamp as i64; + let issued_rows = match issued_row_index.get(&issued_row_key( + &candidate.sender, + &candidate.orderbook_address, + tx_input, + )) { + Some(rows) => rows, + None => { + return Selection { + row: None, + duplicate_match_count: 0, + }; + } + }; + + let mut matched_row = None; + let mut duplicate_match_count = 0; + + for row in issued_rows { + if row.created_at_unix > candidate_timestamp { + continue; + } + + if candidate_timestamp <= row.created_at_unix + MATCH_WINDOW_SECS { + duplicate_match_count += 1; + if matched_row.is_none() { + matched_row = Some(*row); + } + continue; + } + + break; + } + + Selection { + row: matched_row, + duplicate_match_count, + } +} + +fn build_customer_summaries( + executions: &[MatchedExecution], +) -> Result, CustomerVolumeReportError> { + #[derive(Debug)] + struct PairAccumulator { + total_input_hex: String, + total_output_hex: String, + } + + #[derive(Debug)] + struct CustomerAccumulator { + api_key_id: i64, + key_id: String, + label: String, + owner: String, + executed_txs: usize, + pairs: BTreeMap<(String, String), PairAccumulator>, + } + + let mut customers: BTreeMap = BTreeMap::new(); + + for execution in executions { + let customer = + customers + .entry(execution.api_key_id) + .or_insert_with(|| CustomerAccumulator { + api_key_id: execution.api_key_id, + key_id: execution.key_id.clone(), + label: execution.label.clone(), + owner: execution.owner.clone(), + executed_txs: 0, + pairs: BTreeMap::new(), + }); + + customer.executed_txs += 1; + + let pair_key = ( + execution.input_token.clone(), + execution.output_token.clone(), + ); + match customer.pairs.entry(pair_key) { + std::collections::btree_map::Entry::Occupied(mut entry) => { + add_hex_amount( + &mut entry.get_mut().total_input_hex, + &execution.total_input_hex, + )?; + add_hex_amount( + &mut entry.get_mut().total_output_hex, + &execution.total_output_hex, + )?; + } + std::collections::btree_map::Entry::Vacant(entry) => { + let mut 
total_input_hex = zero_hex()?; + add_hex_amount(&mut total_input_hex, &execution.total_input_hex)?; + + let mut total_output_hex = zero_hex()?; + add_hex_amount(&mut total_output_hex, &execution.total_output_hex)?; + + entry.insert(PairAccumulator { + total_input_hex, + total_output_hex, + }); + } + } + } + + customers + .into_values() + .map(|customer| { + let pairs = customer + .pairs + .into_iter() + .map(|((input_token, output_token), pair)| { + Ok(PairVolume { + input_token, + output_token, + total_input_amount: format_hex(&pair.total_input_hex)?, + total_output_amount: format_hex(&pair.total_output_hex)?, + }) + }) + .collect::, CustomerVolumeReportError>>()?; + + Ok(CustomerVolumeSummary { + api_key_id: customer.api_key_id, + key_id: customer.key_id, + label: customer.label, + owner: customer.owner, + executed_txs: customer.executed_txs, + pairs, + }) + }) + .collect() +} + +async fn fetch_transaction_input( + client: &HttpClient, + rpc_urls: &[Url], + tx_hash: &str, +) -> Result { + let payload = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getTransactionByHash", + "params": [tx_hash], + }); + + let mut last_error = None; + + for rpc_url in rpc_urls { + let response = match client.post(rpc_url.clone()).json(&payload).send().await { + Ok(response) => response, + Err(error) => { + last_error = Some(error.to_string()); + continue; + } + }; + + let envelope = match response + .json::>() + .await + { + Ok(envelope) => envelope, + Err(error) => { + last_error = Some(error.to_string()); + continue; + } + }; + + if let Some(result) = envelope.result { + return Ok(normalize_hex(&result.input)); + } + + if let Some(error) = envelope.error { + last_error = Some(error.message); + } + } + + if let Some(message) = last_error { + return Err(CustomerVolumeReportError::RpcLookup { + tx_hash: tx_hash.to_string(), + message, + }); + } + + Err(CustomerVolumeReportError::TransactionNotFound { + tx_hash: tx_hash.to_string(), + }) +} + +async fn 
persist_matches( + pool: &DbPool, + matches: &[MatchedExecution], +) -> Result<(), CustomerVolumeReportError> { + let mut transaction = pool.begin().await?; + + for entry in matches { + sqlx::query( + "INSERT INTO attributed_swap_txs ( + tx_hash, + issued_swap_calldata_id, + api_key_id, + chain_id, + sender, + to_address, + block_number, + block_timestamp, + input_token, + output_token, + total_input_amount, + total_output_amount + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(tx_hash) DO UPDATE SET + issued_swap_calldata_id = excluded.issued_swap_calldata_id, + api_key_id = excluded.api_key_id, + chain_id = excluded.chain_id, + sender = excluded.sender, + to_address = excluded.to_address, + block_number = excluded.block_number, + block_timestamp = excluded.block_timestamp, + input_token = excluded.input_token, + output_token = excluded.output_token, + total_input_amount = excluded.total_input_amount, + total_output_amount = excluded.total_output_amount", + ) + .bind(&entry.tx_hash) + .bind(entry.issued_swap_calldata_id) + .bind(entry.api_key_id) + .bind(crate::CHAIN_ID as i64) + .bind(&entry.sender) + .bind(&entry.to_address) + .bind(entry.block_number as i64) + .bind(entry.timestamp as i64) + .bind(&entry.input_token) + .bind(&entry.output_token) + .bind(format_hex(&entry.total_input_hex)?) + .bind(format_hex(&entry.total_output_hex)?) 
+ .execute(&mut *transaction) + .await?; + } + + transaction.commit().await?; + Ok(()) +} + +async fn persist_report_run( + pool: &DbPool, + report: &CustomerVolumeReport, +) -> Result<(), CustomerVolumeReportError> { + sqlx::query( + "INSERT INTO swap_report_runs ( + report_name, + start_time, + end_time, + matched_count, + unmatched_count, + ambiguous_count + ) VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind("customer-volume") + .bind(report.start_time as i64) + .bind(report.end_time as i64) + .bind(report.audit.matched_executions as i64) + .bind(report.audit.unmatched_executions as i64) + .bind(report.audit.ambiguous_matches as i64) + .execute(pool) + .await?; + + Ok(()) +} + +fn print_human_report(report: &CustomerVolumeReport) { + println!("Customer volume report"); + println!("Window: {} -> {}", report.start_time, report.end_time); + println!( + "Matched executed txs: {} | Unmatched executed txs: {} | Ambiguous matches: {}", + report.audit.matched_executions, + report.audit.unmatched_executions, + report.audit.ambiguous_matches + ); + println!(); + + if report.customers.is_empty() { + println!("No matched customer executions found."); + return; + } + + for customer in &report.customers { + println!( + "{} ({}) [{}] executions={}", + customer.label, customer.owner, customer.key_id, customer.executed_txs + ); + + for pair in &customer.pairs { + println!( + " {} -> {} | input={} | output={}", + pair.input_token, + pair.output_token, + pair.total_input_amount, + pair.total_output_amount + ); + } + + println!(); + } +} + +fn normalize_address(value: &str) -> String { + let trimmed = value.trim(); + if let Some(address) = trimmed + .strip_prefix("0x") + .or_else(|| trimmed.strip_prefix("0X")) + { + format!("0x{}", address.to_ascii_lowercase()) + } else { + format!("0x{}", trimmed.to_ascii_lowercase()) + } +} + +fn normalize_hex(value: &str) -> String { + let trimmed = value.trim(); + if let Some(hex) = trimmed + .strip_prefix("0x") + .or_else(|| trimmed.strip_prefix("0X")) 
+ { + format!("0x{}", hex.to_ascii_lowercase()) + } else { + format!("0x{}", trimmed.to_ascii_lowercase()) + } +} + +fn zero_hex() -> Result { + Ok(Float::zero()?.as_hex().to_ascii_lowercase()) +} + +fn add_hex_amount( + total_hex: &mut String, + value_hex: &str, +) -> Result<(), CustomerVolumeReportError> { + let total = Float::from_hex(total_hex)?; + let value = Float::from_hex(value_hex)?; + *total_hex = total.add(value)?.as_hex().to_ascii_lowercase(); + Ok(()) +} + +fn format_hex(value_hex: &str) -> Result { + Ok(Float::from_hex(value_hex)?.format()?) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::Row; + + async fn test_pool() -> DbPool { + let id = uuid::Uuid::new_v4(); + crate::db::init(&format!("sqlite:file:{id}?mode=memory&cache=shared")) + .await + .expect("test database init") + } + + async fn seed_match_context(pool: &DbPool, created_at_unix: i64) -> MatchedExecution { + let key_id = uuid::Uuid::new_v4().to_string(); + let secret_hash = crate::auth::hash_secret("test-secret").expect("hash secret"); + let api_key_insert = sqlx::query( + "INSERT INTO api_keys (key_id, secret_hash, label, owner) VALUES (?, ?, ?, ?)", + ) + .bind(&key_id) + .bind(secret_hash) + .bind("alpha") + .bind("owner-a") + .execute(pool) + .await + .expect("insert api key"); + let api_key_id = api_key_insert.last_insert_rowid(); + + let issued_insert = sqlx::query( + "INSERT INTO issued_swap_calldata ( + api_key_id, + chain_id, + taker, + to_address, + tx_value, + calldata, + calldata_hash, + input_token, + output_token, + output_amount, + maximum_io_ratio, + estimated_input, + created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime(?,'unixepoch'))", + ) + .bind(api_key_id) + .bind(crate::CHAIN_ID as i64) + .bind("0x1111111111111111111111111111111111111111") + .bind("0xd2938e7c9fe3597f78832ce780feb61945c377d7") + .bind("0") + .bind("0xabcdef") + .bind("0xdeadbeef") + .bind("0xinput") + .bind("0xoutput") + .bind("1") + .bind("205") + .bind("50") + 
.bind(created_at_unix) + .execute(pool) + .await + .expect("insert issued calldata"); + let issued_swap_calldata_id = issued_insert.last_insert_rowid(); + + MatchedExecution { + issued_swap_calldata_id, + api_key_id, + key_id, + label: "alpha".to_string(), + owner: "owner-a".to_string(), + tx_hash: "0xtx1".to_string(), + sender: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + block_number: 10, + timestamp: 100, + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + total_input_hex: Float::parse("50".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("0.25".to_string()).expect("float").as_hex(), + duplicate_match_count: 1, + } + } + + fn issued_row( + id: i64, + api_key_id: i64, + label: &str, + owner: &str, + created_at_unix: i64, + ) -> IssuedSwapRow { + IssuedSwapRow { + id, + api_key_id, + key_id: format!("key-{api_key_id}"), + label: label.to_string(), + owner: owner.to_string(), + taker: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + calldata: "0xabcdef".to_string(), + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + created_at_unix, + } + } + + fn candidate(tx_hash: &str, sender: &str, to: &str, timestamp: u64) -> CandidateExecution { + CandidateExecution { + tx_hash: tx_hash.to_string(), + orderbook_address: to.to_string(), + sender: sender.to_string(), + block_number: 1, + timestamp, + total_input_hex: Float::parse("10".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("2".to_string()).expect("float").as_hex(), + } + } + + #[test] + fn test_aggregate_candidate_rows_sums_same_transaction() { + let rows = vec![ + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + 
"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.25".to_string()).expect("float").as_hex(), + taker_output: Float::parse("50".to_string()).expect("float").as_hex(), + }, + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.20".to_string()).expect("float").as_hex(), + taker_output: Float::parse("40".to_string()).expect("float").as_hex(), + }, + ]; + + let aggregated = aggregate_candidate_rows(rows).expect("aggregate"); + assert_eq!(aggregated.len(), 1); + assert_eq!( + format_hex(&aggregated[0].total_input_hex).expect("format"), + "90" + ); + assert_eq!( + format_hex(&aggregated[0].total_output_hex).expect("format"), + "0.45" + ); + } + + #[test] + fn test_select_issued_row_requires_exact_match_and_window() { + let rows = vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 2, "beta", "owner-b", 10), + ]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 350, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + assert_eq!(selection.duplicate_match_count, 1); + } + + #[test] + fn test_select_issued_row_prefers_latest_matching_issuance() { + let rows = vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 2, "beta", "owner-b", 200), + ]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + 
"0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 240, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(2)); + assert_eq!(selection.duplicate_match_count, 2); + } + + #[test] + fn test_build_customer_summaries_groups_by_customer_and_pair() { + let executions = vec![ + MatchedExecution { + issued_swap_calldata_id: 1, + api_key_id: 1, + key_id: "key-1".to_string(), + label: "alpha".to_string(), + owner: "owner-a".to_string(), + tx_hash: "0xtx1".to_string(), + sender: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + block_number: 1, + timestamp: 10, + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + total_input_hex: Float::parse("50".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("0.25".to_string()).expect("float").as_hex(), + duplicate_match_count: 1, + }, + MatchedExecution { + issued_swap_calldata_id: 2, + api_key_id: 1, + key_id: "key-1".to_string(), + label: "alpha".to_string(), + owner: "owner-a".to_string(), + tx_hash: "0xtx2".to_string(), + sender: "0x1111111111111111111111111111111111111111".to_string(), + to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + block_number: 2, + timestamp: 11, + input_token: "0xinput".to_string(), + output_token: "0xoutput".to_string(), + total_input_hex: Float::parse("40".to_string()).expect("float").as_hex(), + total_output_hex: Float::parse("0.20".to_string()).expect("float").as_hex(), + duplicate_match_count: 1, + }, + ]; + + let customers = build_customer_summaries(&executions).expect("summary"); + assert_eq!(customers.len(), 1); + assert_eq!(customers[0].executed_txs, 2); + assert_eq!(customers[0].pairs.len(), 1); + assert_eq!(customers[0].pairs[0].total_input_amount, "90"); + assert_eq!(customers[0].pairs[0].total_output_amount, "0.45"); + } + + async fn 
mock_rpc_url_for_input(input: &str) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind mock rpc server"); + let addr = listener.local_addr().expect("mock rpc addr"); + let body = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "input": input, + } + }) + .to_string(); + + tokio::spawn(async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + + let body = body.clone(); + tokio::spawn(async move { + let mut buffer = [0u8; 4096]; + let _ = tokio::io::AsyncReadExt::read(&mut socket, &mut buffer).await; + let response = format!( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = + tokio::io::AsyncWriteExt::write_all(&mut socket, response.as_bytes()).await; + }); + } + }); + + format!("http://{addr}") + } + + async fn seed_raindex_take_orders( + db_path: &std::path::Path, + tx_hash: &str, + block_timestamp: i64, + ) { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect_with( + SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true), + ) + .await + .expect("open test raindex db"); + + sqlx::query( + "CREATE TABLE IF NOT EXISTS take_orders ( + chain_id INTEGER NOT NULL, + orderbook_address TEXT NOT NULL, + transaction_hash TEXT NOT NULL, + block_number INTEGER NOT NULL, + block_timestamp INTEGER NOT NULL, + sender TEXT NOT NULL, + taker_input TEXT NOT NULL, + taker_output TEXT NOT NULL, + log_index INTEGER NOT NULL + )", + ) + .execute(&pool) + .await + .expect("create take_orders table"); + + sqlx::query( + "INSERT INTO take_orders ( + chain_id, + orderbook_address, + transaction_hash, + block_number, + block_timestamp, + sender, + taker_input, + taker_output, + log_index + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(crate::CHAIN_ID as i64) + .bind("0xd2938e7c9fe3597f78832ce780feb61945c377d7") + .bind(tx_hash) + .bind(10_i64) + 
.bind(block_timestamp) + .bind("0x1111111111111111111111111111111111111111") + .bind(Float::parse("0.25".to_string()).expect("float").as_hex()) + .bind(Float::parse("50".to_string()).expect("float").as_hex()) + .bind(0_i64) + .execute(&pool) + .await + .expect("insert take order"); + } + + #[tokio::test] + async fn test_run_reconciles_and_persists_customer_volume_report() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let rpc_url = mock_rpc_url_for_input("0xabcdef").await; + let settings = format!( + "version: 4 +networks: + base: + rpcs: + - {rpc_url} + chain-id: 8453 + currency: ETH +subgraphs: + base: https://example.com/subgraph +orderbooks: + base: + address: 0xd2938e7c9fe3597f78832ce780feb61945c377d7 + network: base + subgraph: base + deployment-block: 0 +deployers: + base: + address: 0xC1A14cE2fd58A3A2f99deCb8eDd866204eE07f8D + network: base +tokens: + token1: + address: 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 + network: base +" + ); + let registry_url = + crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; + let raindex = crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) + .await + .expect("load test raindex provider"); + + run( + &pool, + &raindex, + CustomerVolumeReportArgs { + start_time: 90, + end_time: 110, + json: true, + }, + ) + .await + .expect("run report"); + + let attributed = sqlx::query( + "SELECT tx_hash, issued_swap_calldata_id, api_key_id, total_input_amount, total_output_amount + FROM attributed_swap_txs", + ) + .fetch_one(&pool) + .await + .expect("fetch attributed tx"); + + assert_eq!(attributed.get::("tx_hash"), tx_hash); + assert_eq!( + attributed.get::("issued_swap_calldata_id"), + 
expected.issued_swap_calldata_id + ); + assert_eq!(attributed.get::("api_key_id"), expected.api_key_id); + assert_eq!(attributed.get::("total_input_amount"), "50"); + assert_eq!(attributed.get::("total_output_amount"), "0.25"); + + let run_counts = sqlx::query( + "SELECT matched_count, unmatched_count, ambiguous_count FROM swap_report_runs", + ) + .fetch_one(&pool) + .await + .expect("fetch report run"); + assert_eq!(run_counts.get::("matched_count"), 1); + assert_eq!(run_counts.get::("unmatched_count"), 0); + assert_eq!(run_counts.get::("ambiguous_count"), 0); + + run( + &pool, + &raindex, + CustomerVolumeReportArgs { + start_time: 90, + end_time: 110, + json: true, + }, + ) + .await + .expect("rerun report"); + + let attributed_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM attributed_swap_txs") + .fetch_one(&pool) + .await + .expect("count attributed txs"); + let report_run_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM swap_report_runs") + .fetch_one(&pool) + .await + .expect("count report runs"); + + assert_eq!(attributed_count, 1); + assert_eq!(report_run_count, 2); + } + + #[tokio::test] + async fn test_persist_matches_is_idempotent() { + let pool = test_pool().await; + let mut execution = seed_match_context(&pool, 100).await; + + persist_matches(&pool, &[execution.clone()]) + .await + .expect("persist match"); + + execution.total_input_hex = Float::parse("60".to_string()).expect("float").as_hex(); + execution.total_output_hex = Float::parse("0.30".to_string()).expect("float").as_hex(); + + persist_matches(&pool, &[execution]) + .await + .expect("persist updated match"); + + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM attributed_swap_txs") + .fetch_one(&pool) + .await + .expect("count matches"); + assert_eq!(count, 1); + + let amounts: (String, String) = sqlx::query_as( + "SELECT total_input_amount, total_output_amount FROM attributed_swap_txs WHERE tx_hash = ?", + ) + .bind("0xtx1") + .fetch_one(&pool) + .await + .expect("fetch 
persisted amounts"); + assert_eq!(amounts.0, "60"); + assert_eq!(amounts.1, "0.3"); + } + + #[tokio::test] + async fn test_persist_report_run_records_audit_counts() { + let pool = test_pool().await; + let report = CustomerVolumeReport { + start_time: 10, + end_time: 20, + audit: ReportAudit { + matched_executions: 2, + unmatched_executions: 1, + ambiguous_matches: 1, + }, + customers: Vec::new(), + matched_transactions: Vec::new(), + unmatched_transactions: Vec::new(), + }; + + persist_report_run(&pool, &report) + .await + .expect("persist report run"); + + let stored: (String, i64, i64, i64, i64, i64) = sqlx::query_as( + "SELECT report_name, start_time, end_time, matched_count, unmatched_count, ambiguous_count + FROM swap_report_runs", + ) + .fetch_one(&pool) + .await + .expect("fetch report run"); + + assert_eq!(stored.0, "customer-volume"); + assert_eq!(stored.1, 10); + assert_eq!(stored.2, 20); + assert_eq!(stored.3, 2); + assert_eq!(stored.4, 1); + assert_eq!(stored.5, 1); + } +} diff --git a/src/reporting/mod.rs b/src/reporting/mod.rs new file mode 100644 index 0000000..37ad6b0 --- /dev/null +++ b/src/reporting/mod.rs @@ -0,0 +1 @@ +pub(crate) mod customer_volume; From 355de1cc358e59c67ef66411e952bf478d44bfce Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Tue, 24 Mar 2026 15:49:45 +0300 Subject: [PATCH 3/5] Simplify customer volume report --- ...0_create_customer_volume_report_tables.sql | 36 -- src/reporting/customer_volume.rs | 329 ++++-------------- 2 files changed, 65 insertions(+), 300 deletions(-) delete mode 100644 migrations/20260324010000_create_customer_volume_report_tables.sql diff --git a/migrations/20260324010000_create_customer_volume_report_tables.sql b/migrations/20260324010000_create_customer_volume_report_tables.sql deleted file mode 100644 index d793b7e..0000000 --- a/migrations/20260324010000_create_customer_volume_report_tables.sql +++ /dev/null @@ -1,36 +0,0 @@ -CREATE TABLE IF NOT EXISTS attributed_swap_txs ( - tx_hash TEXT PRIMARY 
KEY, - issued_swap_calldata_id INTEGER NOT NULL, - api_key_id INTEGER NOT NULL, - chain_id INTEGER NOT NULL, - sender TEXT NOT NULL, - to_address TEXT NOT NULL, - block_number INTEGER NOT NULL, - block_timestamp INTEGER NOT NULL, - input_token TEXT NOT NULL, - output_token TEXT NOT NULL, - total_input_amount TEXT NOT NULL, - total_output_amount TEXT NOT NULL, - matched_at TEXT NOT NULL DEFAULT (datetime('now')), - FOREIGN KEY (issued_swap_calldata_id) REFERENCES issued_swap_calldata(id) ON DELETE CASCADE, - FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE -); - -CREATE INDEX idx_attributed_swap_txs_api_key_id_timestamp - ON attributed_swap_txs (api_key_id, block_timestamp); -CREATE INDEX idx_attributed_swap_txs_chain_id_timestamp - ON attributed_swap_txs (chain_id, block_timestamp); - -CREATE TABLE IF NOT EXISTS swap_report_runs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - report_name TEXT NOT NULL, - start_time INTEGER NOT NULL, - end_time INTEGER NOT NULL, - matched_count INTEGER NOT NULL, - unmatched_count INTEGER NOT NULL, - ambiguous_count INTEGER NOT NULL, - completed_at TEXT NOT NULL DEFAULT (datetime('now')) -); - -CREATE INDEX idx_swap_report_runs_name_completed - ON swap_report_runs (report_name, completed_at); diff --git a/src/reporting/customer_volume.rs b/src/reporting/customer_volume.rs index 17ccf6b..5a6cc83 100644 --- a/src/reporting/customer_volume.rs +++ b/src/reporting/customer_volume.rs @@ -88,7 +88,6 @@ struct CandidateExecution { #[derive(Debug, Clone)] struct MatchedExecution { - issued_swap_calldata_id: i64, api_key_id: i64, key_id: String, label: String, @@ -169,12 +168,6 @@ struct CustomerVolumeReport { unmatched_transactions: Vec, } -#[derive(Debug)] -struct BuiltCustomerVolumeReport { - report: CustomerVolumeReport, - matched_executions: Vec, -} - type IssuedRowIndex<'a> = HashMap<(String, String, String), Vec<&'a IssuedSwapRow>>; #[derive(Debug, Deserialize)] @@ -209,20 +202,18 @@ pub(crate) async fn run( "customer 
volume report requested" ); - let built_report = build_report(pool, raindex, args.start_time, args.end_time).await?; - persist_matches(pool, &built_report.matched_executions).await?; - persist_report_run(pool, &built_report.report).await?; + let report = build_report(pool, raindex, args.start_time, args.end_time).await?; if args.json { - println!("{}", serde_json::to_string_pretty(&built_report.report)?); + println!("{}", serde_json::to_string_pretty(&report)?); } else { - print_human_report(&built_report.report); + print_human_report(&report); } tracing::info!( - matched = built_report.report.audit.matched_executions, - unmatched = built_report.report.audit.unmatched_executions, - ambiguous = built_report.report.audit.ambiguous_matches, + matched = report.audit.matched_executions, + unmatched = report.audit.unmatched_executions, + ambiguous = report.audit.ambiguous_matches, "customer volume report complete" ); @@ -234,7 +225,7 @@ async fn build_report( raindex: &RaindexProvider, start_time: u64, end_time: u64, -) -> Result { +) -> Result { let chain_orderbooks = orderbooks_for_chain(raindex)?; let issued_rows = load_issued_rows(pool, start_time, end_time).await?; let issued_row_index = build_issued_row_index(&issued_rows); @@ -272,7 +263,6 @@ async fn build_report( match selection.row { Some(row) => matched.push(MatchedExecution { - issued_swap_calldata_id: row.id, api_key_id: row.api_key_id, key_id: row.key_id.clone(), label: row.label.clone(), @@ -323,20 +313,17 @@ async fn build_report( }) .collect::, CustomerVolumeReportError>>()?; - Ok(BuiltCustomerVolumeReport { - report: CustomerVolumeReport { - start_time, - end_time, - audit: ReportAudit { - matched_executions: matched_transactions.len(), - unmatched_executions: unmatched.len(), - ambiguous_matches, - }, - customers, - matched_transactions, - unmatched_transactions: unmatched, + Ok(CustomerVolumeReport { + start_time, + end_time, + audit: ReportAudit { + matched_executions: matched_transactions.len(), + 
unmatched_executions: unmatched.len(), + ambiguous_matches, }, - matched_executions: matched, + customers, + matched_transactions, + unmatched_transactions: unmatched, }) } @@ -759,87 +746,6 @@ async fn fetch_transaction_input( }) } -async fn persist_matches( - pool: &DbPool, - matches: &[MatchedExecution], -) -> Result<(), CustomerVolumeReportError> { - let mut transaction = pool.begin().await?; - - for entry in matches { - sqlx::query( - "INSERT INTO attributed_swap_txs ( - tx_hash, - issued_swap_calldata_id, - api_key_id, - chain_id, - sender, - to_address, - block_number, - block_timestamp, - input_token, - output_token, - total_input_amount, - total_output_amount - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(tx_hash) DO UPDATE SET - issued_swap_calldata_id = excluded.issued_swap_calldata_id, - api_key_id = excluded.api_key_id, - chain_id = excluded.chain_id, - sender = excluded.sender, - to_address = excluded.to_address, - block_number = excluded.block_number, - block_timestamp = excluded.block_timestamp, - input_token = excluded.input_token, - output_token = excluded.output_token, - total_input_amount = excluded.total_input_amount, - total_output_amount = excluded.total_output_amount", - ) - .bind(&entry.tx_hash) - .bind(entry.issued_swap_calldata_id) - .bind(entry.api_key_id) - .bind(crate::CHAIN_ID as i64) - .bind(&entry.sender) - .bind(&entry.to_address) - .bind(entry.block_number as i64) - .bind(entry.timestamp as i64) - .bind(&entry.input_token) - .bind(&entry.output_token) - .bind(format_hex(&entry.total_input_hex)?) - .bind(format_hex(&entry.total_output_hex)?) 
- .execute(&mut *transaction) - .await?; - } - - transaction.commit().await?; - Ok(()) -} - -async fn persist_report_run( - pool: &DbPool, - report: &CustomerVolumeReport, -) -> Result<(), CustomerVolumeReportError> { - sqlx::query( - "INSERT INTO swap_report_runs ( - report_name, - start_time, - end_time, - matched_count, - unmatched_count, - ambiguous_count - ) VALUES (?, ?, ?, ?, ?, ?)", - ) - .bind("customer-volume") - .bind(report.start_time as i64) - .bind(report.end_time as i64) - .bind(report.audit.matched_executions as i64) - .bind(report.audit.unmatched_executions as i64) - .bind(report.audit.ambiguous_matches as i64) - .execute(pool) - .await?; - - Ok(()) -} - fn print_human_report(report: &CustomerVolumeReport) { println!("Customer volume report"); println!("Window: {} -> {}", report.start_time, report.end_time); @@ -921,7 +827,12 @@ fn format_hex(value_hex: &str) -> Result { #[cfg(test)] mod tests { use super::*; - use sqlx::Row; + + #[derive(Debug)] + struct SeededMatchContext { + api_key_id: i64, + key_id: String, + } async fn test_pool() -> DbPool { let id = uuid::Uuid::new_v4(); @@ -930,7 +841,7 @@ mod tests { .expect("test database init") } - async fn seed_match_context(pool: &DbPool, created_at_unix: i64) -> MatchedExecution { + async fn seed_match_context(pool: &DbPool, created_at_unix: i64) -> SeededMatchContext { let key_id = uuid::Uuid::new_v4().to_string(); let secret_hash = crate::auth::hash_secret("test-secret").expect("hash secret"); let api_key_insert = sqlx::query( @@ -945,7 +856,7 @@ mod tests { .expect("insert api key"); let api_key_id = api_key_insert.last_insert_rowid(); - let issued_insert = sqlx::query( + sqlx::query( "INSERT INTO issued_swap_calldata ( api_key_id, chain_id, @@ -978,25 +889,8 @@ mod tests { .execute(pool) .await .expect("insert issued calldata"); - let issued_swap_calldata_id = issued_insert.last_insert_rowid(); - MatchedExecution { - issued_swap_calldata_id, - api_key_id, - key_id, - label: "alpha".to_string(), - 
owner: "owner-a".to_string(), - tx_hash: "0xtx1".to_string(), - sender: "0x1111111111111111111111111111111111111111".to_string(), - to_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), - block_number: 10, - timestamp: 100, - input_token: "0xinput".to_string(), - output_token: "0xoutput".to_string(), - total_input_hex: Float::parse("50".to_string()).expect("float").as_hex(), - total_output_hex: Float::parse("0.25".to_string()).expect("float").as_hex(), - duplicate_match_count: 1, - } + SeededMatchContext { api_key_id, key_id } } fn issued_row( @@ -1120,7 +1014,6 @@ mod tests { fn test_build_customer_summaries_groups_by_customer_and_pair() { let executions = vec![ MatchedExecution { - issued_swap_calldata_id: 1, api_key_id: 1, key_id: "key-1".to_string(), label: "alpha".to_string(), @@ -1137,7 +1030,6 @@ mod tests { duplicate_match_count: 1, }, MatchedExecution { - issued_swap_calldata_id: 2, api_key_id: 1, key_id: "key-1".to_string(), label: "alpha".to_string(), @@ -1261,7 +1153,7 @@ mod tests { } #[tokio::test] - async fn test_run_reconciles_and_persists_customer_volume_report() { + async fn test_build_report_reconciles_customer_volume_report() { let pool = test_pool().await; let expected = seed_match_context(&pool, 100).await; let tx_hash = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; @@ -1303,136 +1195,45 @@ tokens: .await .expect("load test raindex provider"); - run( - &pool, - &raindex, - CustomerVolumeReportArgs { - start_time: 90, - end_time: 110, - json: true, - }, - ) - .await - .expect("run report"); - - let attributed = sqlx::query( - "SELECT tx_hash, issued_swap_calldata_id, api_key_id, total_input_amount, total_output_amount - FROM attributed_swap_txs", - ) - .fetch_one(&pool) - .await - .expect("fetch attributed tx"); - - assert_eq!(attributed.get::("tx_hash"), tx_hash); + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.start_time, 90); + 
assert_eq!(report.end_time, 110); + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 0); + assert_eq!(report.customers.len(), 1); + assert_eq!(report.customers[0].api_key_id, expected.api_key_id); + assert_eq!(report.customers[0].key_id, expected.key_id); + assert_eq!(report.customers[0].label, "alpha"); + assert_eq!(report.customers[0].owner, "owner-a"); + assert_eq!(report.customers[0].executed_txs, 1); + assert_eq!(report.customers[0].pairs.len(), 1); + assert_eq!(report.customers[0].pairs[0].input_token, "0xinput"); + assert_eq!(report.customers[0].pairs[0].output_token, "0xoutput"); + assert_eq!(report.customers[0].pairs[0].total_input_amount, "50"); + assert_eq!(report.customers[0].pairs[0].total_output_amount, "0.25"); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); assert_eq!( - attributed.get::("issued_swap_calldata_id"), - expected.issued_swap_calldata_id + report.matched_transactions[0].api_key_id, + expected.api_key_id ); - assert_eq!(attributed.get::("api_key_id"), expected.api_key_id); - assert_eq!(attributed.get::("total_input_amount"), "50"); - assert_eq!(attributed.get::("total_output_amount"), "0.25"); - - let run_counts = sqlx::query( - "SELECT matched_count, unmatched_count, ambiguous_count FROM swap_report_runs", - ) - .fetch_one(&pool) - .await - .expect("fetch report run"); - assert_eq!(run_counts.get::("matched_count"), 1); - assert_eq!(run_counts.get::("unmatched_count"), 0); - assert_eq!(run_counts.get::("ambiguous_count"), 0); - - run( - &pool, - &raindex, - CustomerVolumeReportArgs { - start_time: 90, - end_time: 110, - json: true, - }, - ) - .await - .expect("rerun report"); - - let attributed_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM attributed_swap_txs") - .fetch_one(&pool) - .await - .expect("count attributed txs"); - let report_run_count: i64 = 
sqlx::query_scalar("SELECT COUNT(*) FROM swap_report_runs") - .fetch_one(&pool) - .await - .expect("count report runs"); - - assert_eq!(attributed_count, 1); - assert_eq!(report_run_count, 2); - } - - #[tokio::test] - async fn test_persist_matches_is_idempotent() { - let pool = test_pool().await; - let mut execution = seed_match_context(&pool, 100).await; - - persist_matches(&pool, &[execution.clone()]) - .await - .expect("persist match"); - - execution.total_input_hex = Float::parse("60".to_string()).expect("float").as_hex(); - execution.total_output_hex = Float::parse("0.30".to_string()).expect("float").as_hex(); - - persist_matches(&pool, &[execution]) - .await - .expect("persist updated match"); - - let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM attributed_swap_txs") - .fetch_one(&pool) - .await - .expect("count matches"); - assert_eq!(count, 1); - - let amounts: (String, String) = sqlx::query_as( - "SELECT total_input_amount, total_output_amount FROM attributed_swap_txs WHERE tx_hash = ?", - ) - .bind("0xtx1") - .fetch_one(&pool) - .await - .expect("fetch persisted amounts"); - assert_eq!(amounts.0, "60"); - assert_eq!(amounts.1, "0.3"); - } - - #[tokio::test] - async fn test_persist_report_run_records_audit_counts() { - let pool = test_pool().await; - let report = CustomerVolumeReport { - start_time: 10, - end_time: 20, - audit: ReportAudit { - matched_executions: 2, - unmatched_executions: 1, - ambiguous_matches: 1, - }, - customers: Vec::new(), - matched_transactions: Vec::new(), - unmatched_transactions: Vec::new(), - }; - - persist_report_run(&pool, &report) - .await - .expect("persist report run"); - - let stored: (String, i64, i64, i64, i64, i64) = sqlx::query_as( - "SELECT report_name, start_time, end_time, matched_count, unmatched_count, ambiguous_count - FROM swap_report_runs", - ) - .fetch_one(&pool) - .await - .expect("fetch report run"); - - assert_eq!(stored.0, "customer-volume"); - assert_eq!(stored.1, 10); - assert_eq!(stored.2, 20); 
- assert_eq!(stored.3, 2); - assert_eq!(stored.4, 1); - assert_eq!(stored.5, 1); + assert_eq!(report.matched_transactions[0].key_id, expected.key_id); + assert_eq!( + report.matched_transactions[0].sender, + "0x1111111111111111111111111111111111111111" + ); + assert_eq!( + report.matched_transactions[0].to_address, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7" + ); + assert_eq!(report.matched_transactions[0].input_token, "0xinput"); + assert_eq!(report.matched_transactions[0].output_token, "0xoutput"); + assert_eq!(report.matched_transactions[0].total_input_amount, "50"); + assert_eq!(report.matched_transactions[0].total_output_amount, "0.25"); + assert_eq!(report.unmatched_transactions.len(), 0); } } From da852ec488af2339ef9384a0105f5ad4c9e6d6fb Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Tue, 24 Mar 2026 16:48:02 +0300 Subject: [PATCH 4/5] Add durable customer volume attribution snapshots --- ...0324000000_create_issued_swap_calldata.sql | 6 +- src/db/issued_swap_calldata.rs | 19 ++- src/reporting/customer_volume.rs | 94 +++++++++---- src/routes/swap/calldata.rs | 127 ++++++++++++++---- 4 files changed, 194 insertions(+), 52 deletions(-) diff --git a/migrations/20260324000000_create_issued_swap_calldata.sql b/migrations/20260324000000_create_issued_swap_calldata.sql index 0ec6c21..6790747 100644 --- a/migrations/20260324000000_create_issued_swap_calldata.sql +++ b/migrations/20260324000000_create_issued_swap_calldata.sql @@ -1,6 +1,9 @@ CREATE TABLE IF NOT EXISTS issued_swap_calldata ( id INTEGER PRIMARY KEY AUTOINCREMENT, api_key_id INTEGER NOT NULL, + key_id TEXT NOT NULL, + label TEXT NOT NULL, + owner TEXT NOT NULL, chain_id INTEGER NOT NULL, taker TEXT NOT NULL, to_address TEXT NOT NULL, @@ -12,8 +15,7 @@ CREATE TABLE IF NOT EXISTS issued_swap_calldata ( output_amount TEXT NOT NULL, maximum_io_ratio TEXT NOT NULL, estimated_input TEXT NOT NULL, - created_at TEXT NOT NULL DEFAULT (datetime('now')), - FOREIGN KEY (api_key_id) REFERENCES api_keys(id) ON 
DELETE CASCADE + created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX idx_issued_swap_calldata_api_key_id_created diff --git a/src/db/issued_swap_calldata.rs b/src/db/issued_swap_calldata.rs index 3cc3b72..6b0bd32 100644 --- a/src/db/issued_swap_calldata.rs +++ b/src/db/issued_swap_calldata.rs @@ -3,9 +3,16 @@ use crate::types::swap::{SwapCalldataRequest, SwapCalldataResponse}; use alloy::hex::encode_prefixed; use alloy::primitives::keccak256; +pub(crate) struct IssuedSwapCalldataKeySnapshot<'a> { + pub api_key_id: i64, + pub key_id: &'a str, + pub label: &'a str, + pub owner: &'a str, +} + pub(crate) async fn insert( pool: &DbPool, - api_key_id: i64, + key: &IssuedSwapCalldataKeySnapshot<'_>, chain_id: u32, request: &SwapCalldataRequest, response: &SwapCalldataResponse, @@ -16,6 +23,9 @@ pub(crate) async fn insert( sqlx::query( "INSERT INTO issued_swap_calldata ( api_key_id, + key_id, + label, + owner, chain_id, taker, to_address, @@ -27,9 +37,12 @@ pub(crate) async fn insert( output_amount, maximum_io_ratio, estimated_input - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) - .bind(api_key_id) + .bind(key.api_key_id) + .bind(key.key_id) + .bind(key.label) + .bind(key.owner) .bind(chain_id as i64) .bind(format!("{:#x}", request.taker)) .bind(format!("{:#x}", response.to)) diff --git a/src/reporting/customer_volume.rs b/src/reporting/customer_volume.rs index 5a6cc83..28d3e6b 100644 --- a/src/reporting/customer_volume.rs +++ b/src/reporting/customer_volume.rs @@ -10,7 +10,7 @@ use sqlx::{FromRow, QueryBuilder}; use std::collections::{BTreeMap, HashMap}; use std::ops::Add; -const MATCH_WINDOW_SECS: i64 = 5 * 60; +const MATCH_WINDOW_SECS: i64 = 2 * 60; #[derive(Debug, Clone, Copy)] pub(crate) struct CustomerVolumeReportArgs { @@ -394,9 +394,9 @@ async fn load_issued_rows( "SELECT isc.id, isc.api_key_id, - ak.key_id, - ak.label, - ak.owner, + isc.key_id, + isc.label, + isc.owner, isc.taker, 
isc.to_address, isc.calldata, @@ -404,7 +404,6 @@ async fn load_issued_rows( isc.output_token, CAST(strftime('%s', isc.created_at) AS INTEGER) AS created_at_unix FROM issued_swap_calldata isc - JOIN api_keys ak ON ak.id = isc.api_key_id WHERE isc.chain_id = ? AND CAST(strftime('%s', isc.created_at) AS INTEGER) >= ? AND CAST(strftime('%s', isc.created_at) AS INTEGER) <= ? @@ -859,6 +858,9 @@ mod tests { sqlx::query( "INSERT INTO issued_swap_calldata ( api_key_id, + key_id, + label, + owner, chain_id, taker, to_address, @@ -871,9 +873,12 @@ mod tests { maximum_io_ratio, estimated_input, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime(?,'unixepoch'))", + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime(?,'unixepoch'))", ) .bind(api_key_id) + .bind(&key_id) + .bind("alpha") + .bind("owner-a") .bind(crate::CHAIN_ID as i64) .bind("0x1111111111111111111111111111111111111111") .bind("0xd2938e7c9fe3597f78832ce780feb61945c377d7") @@ -977,7 +982,7 @@ mod tests { "0xtx", "0x1111111111111111111111111111111111111111", "0xd2938e7c9fe3597f78832ce780feb61945c377d7", - 350, + 180, ), "0xabcdef", &issued_row_index, @@ -1000,7 +1005,7 @@ mod tests { "0xtx", "0x1111111111111111111111111111111111111111", "0xd2938e7c9fe3597f78832ce780feb61945c377d7", - 240, + 210, ), "0xabcdef", &issued_row_index, @@ -1152,18 +1157,8 @@ mod tests { .expect("insert take order"); } - #[tokio::test] - async fn test_build_report_reconciles_customer_volume_report() { - let pool = test_pool().await; - let expected = seed_match_context(&pool, 100).await; - let tx_hash = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - - let temp_dir = tempfile::tempdir().expect("temp dir"); - let db_path = temp_dir.path().join("raindex.db"); - seed_raindex_take_orders(&db_path, tx_hash, 105).await; - - let rpc_url = mock_rpc_url_for_input("0xabcdef").await; - let settings = format!( + fn test_registry_settings(rpc_url: &str) -> String { + format!( "version: 4 networks: 
base: @@ -1188,12 +1183,34 @@ tokens: address: 0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 network: base " - ); + ) + } + + async fn load_test_raindex( + db_path: std::path::PathBuf, + tx_input: &str, + ) -> crate::raindex::RaindexProvider { + let rpc_url = mock_rpc_url_for_input(tx_input).await; + let settings = test_registry_settings(&rpc_url); let registry_url = crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; - let raindex = crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) + + crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) .await - .expect("load test raindex provider"); + .expect("load test raindex provider") + } + + #[tokio::test] + async fn test_build_report_reconciles_customer_volume_report() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; let report = build_report(&pool, &raindex, 90, 110) .await @@ -1236,4 +1253,35 @@ tokens: assert_eq!(report.matched_transactions[0].total_output_amount, "0.25"); assert_eq!(report.unmatched_transactions.len(), 0); } + + #[tokio::test] + async fn test_build_report_uses_issued_snapshot_after_api_key_deletion() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"; + + sqlx::query("DELETE FROM api_keys WHERE id = ?") + .bind(expected.api_key_id) + .execute(&pool) + .await + .expect("delete api key"); + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let raindex = 
load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.customers.len(), 1); + assert_eq!(report.customers[0].api_key_id, expected.api_key_id); + assert_eq!(report.customers[0].key_id, expected.key_id); + assert_eq!(report.customers[0].label, "alpha"); + assert_eq!(report.customers[0].owner, "owner-a"); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].key_id, report.customers[0].key_id); + } } diff --git a/src/routes/swap/calldata.rs b/src/routes/swap/calldata.rs index 0a40f52..3720877 100644 --- a/src/routes/swap/calldata.rs +++ b/src/routes/swap/calldata.rs @@ -42,7 +42,7 @@ pub async fn post_swap_calldata( let ds = RaindexSwapDataSource { client: raindex.client(), }; - let response = handle_swap_calldata(&ds, &pool, key.id, req).await?; + let response = handle_swap_calldata(&ds, &pool, &key, req).await?; Ok(Json(response)) } .instrument(span.0) @@ -69,17 +69,17 @@ async fn process_swap_calldata( async fn handle_swap_calldata( ds: &dyn SwapDataSource, pool: &DbPool, - api_key_id: i64, + key: &AuthenticatedKey, req: SwapCalldataRequest, ) -> Result { let response = process_swap_calldata(ds, &req).await?; - persist_issued_swap_calldata(pool, api_key_id, &req, &response).await?; + persist_issued_swap_calldata(pool, key, &req, &response).await?; Ok(response) } async fn persist_issued_swap_calldata( pool: &DbPool, - api_key_id: i64, + key: &AuthenticatedKey, request: &SwapCalldataRequest, response: &SwapCalldataResponse, ) -> Result<(), ApiError> { @@ -87,12 +87,26 @@ async fn persist_issued_swap_calldata( return Ok(()); } - crate::db::issued_swap_calldata::insert(pool, api_key_id, crate::CHAIN_ID, request, response) + let key_snapshot = crate::db::issued_swap_calldata::IssuedSwapCalldataKeySnapshot { + api_key_id: key.id, + key_id: &key.key_id, + label: &key.label, + owner: &key.owner, + }; + + 
crate::db::issued_swap_calldata::insert( + pool, + &key_snapshot, + crate::CHAIN_ID, + request, + response, + ) .await .map_err(|error| { tracing::error!( error = %error, - api_key_id, + api_key_id = key.id, + key_id = %key.key_id, taker = %request.taker, to = %response.to, "failed to persist issued swap calldata" @@ -101,7 +115,8 @@ async fn persist_issued_swap_calldata( })?; tracing::info!( - api_key_id, + api_key_id = key.id, + key_id = %key.key_id, taker = %request.taker, to = %response.to, "persisted issued swap calldata" @@ -241,6 +256,9 @@ mod tests { #[derive(Debug, FromRow)] struct IssuedSwapCalldataRow { api_key_id: i64, + key_id: String, + label: String, + owner: String, chain_id: i64, taker: String, to_address: String, @@ -254,10 +272,22 @@ mod tests { estimated_input: String, } + #[derive(Debug, FromRow)] + struct AuthenticatedKeyRow { + id: i64, + key_id: String, + label: String, + owner: String, + is_admin: bool, + } + async fn fetch_issued_rows(pool: &DbPool) -> Vec { sqlx::query_as::<_, IssuedSwapCalldataRow>( "SELECT api_key_id, + key_id, + label, + owner, chain_id, taker, to_address, @@ -277,13 +307,22 @@ mod tests { .expect("query issued swap calldata") } - async fn fetch_api_key_id(pool: &DbPool, key_id: &str) -> i64 { - let row: (i64,) = sqlx::query_as("SELECT id FROM api_keys WHERE key_id = ?") - .bind(key_id) - .fetch_one(pool) - .await - .expect("api key"); - row.0 + async fn fetch_authenticated_key(pool: &DbPool, key_id: &str) -> AuthenticatedKey { + let row = sqlx::query_as::<_, AuthenticatedKeyRow>( + "SELECT id, key_id, label, owner, is_admin FROM api_keys WHERE key_id = ?", + ) + .bind(key_id) + .fetch_one(pool) + .await + .expect("authenticated key"); + + AuthenticatedKey { + id: row.id, + key_id: row.key_id, + label: row.label, + owner: row.owner, + is_admin: row.is_admin, + } } async fn issued_row_count(pool: &DbPool) -> i64 { @@ -299,7 +338,7 @@ mod tests { let client = TestClientBuilder::new().build().await; let (key_id, _secret) 
= seed_api_key(&client).await; let pool = client.rocket().state::().expect("pool"); - let api_key_id = fetch_api_key_id(pool, &key_id).await; + let key = fetch_authenticated_key(pool, &key_id).await; let ds = MockSwapDataSource { orders: Ok(vec![]), @@ -308,7 +347,7 @@ mod tests { }; let request = calldata_request("100", "2.5"); - let response = handle_swap_calldata(&ds, pool, api_key_id, request.clone()) + let response = handle_swap_calldata(&ds, pool, &key, request.clone()) .await .expect("successful swap calldata"); @@ -321,7 +360,10 @@ mod tests { let rows = fetch_issued_rows(pool).await; assert_eq!(rows.len(), 1); let row = &rows[0]; - assert_eq!(row.api_key_id, api_key_id); + assert_eq!(row.api_key_id, key.id); + assert_eq!(row.key_id, key.key_id.as_str()); + assert_eq!(row.label, key.label.as_str()); + assert_eq!(row.owner, key.owner.as_str()); assert_eq!(row.chain_id, crate::CHAIN_ID as i64); assert_eq!(row.taker, format!("{:#x}", request.taker)); assert_eq!(row.to_address, format!("{:#x}", ORDERBOOK)); @@ -343,7 +385,7 @@ mod tests { let client = TestClientBuilder::new().build().await; let (key_id, _secret) = seed_api_key(&client).await; let pool = client.rocket().state::().expect("pool"); - let api_key_id = fetch_api_key_id(pool, &key_id).await; + let key = fetch_authenticated_key(pool, &key_id).await; let ds = MockSwapDataSource { orders: Ok(vec![]), @@ -351,7 +393,7 @@ mod tests { calldata_result: Ok(approval_response()), }; - handle_swap_calldata(&ds, pool, api_key_id, calldata_request("100", "2.5")) + handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) .await .expect("successful approval response"); @@ -364,7 +406,7 @@ mod tests { let client = TestClientBuilder::new().build().await; let (key_id, _secret) = seed_api_key(&client).await; let pool = client.rocket().state::().expect("pool"); - let api_key_id = fetch_api_key_id(pool, &key_id).await; + let key = fetch_authenticated_key(pool, &key_id).await; let ds = MockSwapDataSource { 
orders: Ok(vec![]), @@ -373,10 +415,10 @@ mod tests { }; let request = calldata_request("100", "2.5"); - handle_swap_calldata(&ds, pool, api_key_id, request.clone()) + handle_swap_calldata(&ds, pool, &key, request.clone()) .await .expect("first swap calldata"); - handle_swap_calldata(&ds, pool, api_key_id, request) + handle_swap_calldata(&ds, pool, &key, request) .await .expect("second swap calldata"); @@ -385,6 +427,36 @@ mod tests { assert_eq!(rows[0].calldata_hash, rows[1].calldata_hash); } + #[rocket::async_test] + async fn test_issued_swap_calldata_survives_api_key_deletion() { + let client = TestClientBuilder::new().build().await; + let (key_id, _secret) = seed_api_key(&client).await; + let pool = client.rocket().state::().expect("pool"); + let key = fetch_authenticated_key(pool, &key_id).await; + + let ds = MockSwapDataSource { + orders: Ok(vec![]), + candidates: vec![], + calldata_result: Ok(ready_response()), + }; + + handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) + .await + .expect("successful swap calldata"); + + sqlx::query("DELETE FROM api_keys WHERE key_id = ?") + .bind(&key_id) + .execute(pool) + .await + .expect("delete api key"); + + let rows = fetch_issued_rows(pool).await; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].key_id, key_id); + assert_eq!(rows[0].label, "test-key"); + assert_eq!(rows[0].owner, "test-owner"); + } + #[test] fn test_should_persist_issued_swap_calldata() { assert!(should_persist_issued_swap_calldata(&ready_response())); @@ -402,8 +474,15 @@ mod tests { calldata_result: Ok(ready_response()), }; - let result = - handle_swap_calldata(&ds, pool, i64::MAX, calldata_request("100", "2.5")).await; + let key = AuthenticatedKey { + id: i64::MAX, + key_id: "missing".to_string(), + label: "missing".to_string(), + owner: "missing".to_string(), + is_admin: false, + }; + + let result = handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")).await; assert!(matches!( result, 
Err(ApiError::Internal(msg)) if msg == "failed to persist issued swap calldata" From d31ec27b1e5b92e83708f3657c7fd327b891370e Mon Sep 17 00:00:00 2001 From: Arda Nakisci Date: Wed, 25 Mar 2026 10:16:08 +0300 Subject: [PATCH 5/5] Improve customer volume attribution reporting --- ...0324000000_create_issued_swap_calldata.sql | 2 + src/db/issued_swap_calldata.rs | 60 +- src/reporting/customer_volume.rs | 923 +++++++++++++++--- src/routes/swap/calldata.rs | 72 +- 4 files changed, 878 insertions(+), 179 deletions(-) diff --git a/migrations/20260324000000_create_issued_swap_calldata.sql b/migrations/20260324000000_create_issued_swap_calldata.sql index 6790747..d957cf9 100644 --- a/migrations/20260324000000_create_issued_swap_calldata.sql +++ b/migrations/20260324000000_create_issued_swap_calldata.sql @@ -20,6 +20,8 @@ CREATE TABLE IF NOT EXISTS issued_swap_calldata ( CREATE INDEX idx_issued_swap_calldata_api_key_id_created ON issued_swap_calldata (api_key_id, created_at); +CREATE INDEX idx_issued_swap_calldata_chain_id_created + ON issued_swap_calldata (chain_id, created_at); CREATE INDEX idx_issued_swap_calldata_calldata_hash_created ON issued_swap_calldata (calldata_hash, created_at); CREATE INDEX idx_issued_swap_calldata_taker_created diff --git a/src/db/issued_swap_calldata.rs b/src/db/issued_swap_calldata.rs index 6b0bd32..4c00c1c 100644 --- a/src/db/issued_swap_calldata.rs +++ b/src/db/issued_swap_calldata.rs @@ -1,25 +1,27 @@ use crate::db::DbPool; -use crate::types::swap::{SwapCalldataRequest, SwapCalldataResponse}; -use alloy::hex::encode_prefixed; -use alloy::primitives::keccak256; -pub(crate) struct IssuedSwapCalldataKeySnapshot<'a> { +pub(crate) struct NewIssuedSwapCalldata { pub api_key_id: i64, - pub key_id: &'a str, - pub label: &'a str, - pub owner: &'a str, + pub key_id: String, + pub label: String, + pub owner: String, + pub chain_id: i64, + pub taker: String, + pub to_address: String, + pub tx_value: String, + pub calldata: String, + pub calldata_hash: 
String, + pub input_token: String, + pub output_token: String, + pub output_amount: String, + pub maximum_io_ratio: String, + pub estimated_input: String, } pub(crate) async fn insert( pool: &DbPool, - key: &IssuedSwapCalldataKeySnapshot<'_>, - chain_id: u32, - request: &SwapCalldataRequest, - response: &SwapCalldataResponse, + record: &NewIssuedSwapCalldata, ) -> Result<(), sqlx::Error> { - let calldata = encode_prefixed(response.data.as_ref()); - let calldata_hash = encode_prefixed(keccak256(response.data.as_ref())); - sqlx::query( "INSERT INTO issued_swap_calldata ( api_key_id, @@ -39,21 +41,21 @@ pub(crate) async fn insert( estimated_input ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) - .bind(key.api_key_id) - .bind(key.key_id) - .bind(key.label) - .bind(key.owner) - .bind(chain_id as i64) - .bind(format!("{:#x}", request.taker)) - .bind(format!("{:#x}", response.to)) - .bind(response.value.to_string()) - .bind(calldata) - .bind(calldata_hash) - .bind(format!("{:#x}", request.input_token)) - .bind(format!("{:#x}", request.output_token)) - .bind(&request.output_amount) - .bind(&request.maximum_io_ratio) - .bind(&response.estimated_input) + .bind(record.api_key_id) + .bind(&record.key_id) + .bind(&record.label) + .bind(&record.owner) + .bind(record.chain_id) + .bind(&record.taker) + .bind(&record.to_address) + .bind(&record.tx_value) + .bind(&record.calldata) + .bind(&record.calldata_hash) + .bind(&record.input_token) + .bind(&record.output_token) + .bind(&record.output_amount) + .bind(&record.maximum_io_ratio) + .bind(&record.estimated_input) .execute(pool) .await?; diff --git a/src/reporting/customer_volume.rs b/src/reporting/customer_volume.rs index 28d3e6b..21e77cf 100644 --- a/src/reporting/customer_volume.rs +++ b/src/reporting/customer_volume.rs @@ -1,5 +1,6 @@ use crate::db::DbPool; use crate::raindex::RaindexProvider; +use futures::stream::{self, StreamExt}; use rain_math_float::Float; use rain_orderbook_common::raindex_client::RaindexError; 
use reqwest::Client as HttpClient; @@ -9,8 +10,13 @@ use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::{FromRow, QueryBuilder}; use std::collections::{BTreeMap, HashMap}; use std::ops::Add; +use std::time::Duration; const MATCH_WINDOW_SECS: i64 = 2 * 60; +const RPC_LOOKUP_CONCURRENCY: usize = 8; +const RPC_LOOKUP_TIMEOUT_SECS: u64 = 5; +const RPC_LOOKUP_MAX_ATTEMPTS: u32 = 3; +const RPC_LOOKUP_RETRY_BASE_DELAY_MS: u64 = 200; #[derive(Debug, Clone, Copy)] pub(crate) struct CustomerVolumeReportArgs { @@ -27,8 +33,6 @@ pub(crate) enum CustomerVolumeReportError { MissingLocalDbPath, #[error("raindex local db file not found: {path}")] MissingLocalDb { path: String }, - #[error("no orderbooks configured for chain {chain_id}")] - MissingOrderbooks { chain_id: u32 }, #[error("no RPC URLs configured for chain {chain_id}")] MissingRpcUrls { chain_id: u32 }, #[error("database error: {0}")] @@ -109,6 +113,7 @@ struct ReportAudit { matched_executions: usize, unmatched_executions: usize, ambiguous_matches: usize, + rpc_lookup_failures: usize, } #[derive(Debug, Clone, Serialize)] @@ -158,6 +163,57 @@ struct UnmatchedTransactionReport { total_output_amount: String, } +#[derive(Debug, Clone, Serialize)] +struct AmbiguousMatchCandidateReport { + api_key_id: i64, + key_id: String, + label: String, + owner: String, +} + +#[derive(Debug, Clone, Serialize)] +struct AmbiguousTransactionReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + total_input_amount: String, + total_output_amount: String, + candidates: Vec, +} + +#[derive(Debug, Clone, Serialize)] +struct RpcLookupFailureReport { + tx_hash: String, + sender: String, + to_address: String, + block_number: u64, + timestamp: u64, + error: String, +} + +impl RpcLookupFailureReport { + fn from_error(candidate: &CandidateExecution, error: CustomerVolumeReportError) -> Self { + let (tx_hash, error) = match error { + CustomerVolumeReportError::RpcLookup { tx_hash, 
message } => (tx_hash, message), + CustomerVolumeReportError::TransactionNotFound { tx_hash } => { + (tx_hash, "transaction not found via RPC".to_string()) + } + other => (candidate.tx_hash.clone(), other.to_string()), + }; + + Self { + tx_hash, + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + error, + } + } +} + #[derive(Debug, Clone, Serialize)] struct CustomerVolumeReport { start_time: u64, @@ -166,6 +222,8 @@ struct CustomerVolumeReport { customers: Vec, matched_transactions: Vec, unmatched_transactions: Vec, + ambiguous_transactions: Vec, + rpc_lookup_failures: Vec, } type IssuedRowIndex<'a> = HashMap<(String, String, String), Vec<&'a IssuedSwapRow>>; @@ -214,6 +272,7 @@ pub(crate) async fn run( matched = report.audit.matched_executions, unmatched = report.audit.unmatched_executions, ambiguous = report.audit.ambiguous_matches, + rpc_lookup_failures = report.audit.rpc_lookup_failures, "customer volume report complete" ); @@ -226,22 +285,22 @@ async fn build_report( start_time: u64, end_time: u64, ) -> Result { - let chain_orderbooks = orderbooks_for_chain(raindex)?; let issued_rows = load_issued_rows(pool, start_time, end_time).await?; let issued_row_index = build_issued_row_index(&issued_rows); tracing::info!( - orderbook_count = chain_orderbooks.len(), issued_count = issued_rows.len(), "loaded issued swap rows for customer volume report" ); let raindex_db = open_raindex_db(raindex).await?; - let candidate_rows = - load_take_order_rows(&raindex_db, &chain_orderbooks, start_time, end_time).await?; + let candidate_rows = load_take_order_rows(&raindex_db, start_time, end_time).await?; let candidates = aggregate_candidate_rows(candidate_rows)?; let rpc_urls = rpc_urls_for_chain(raindex)?; - let rpc = HttpClient::new(); + let rpc = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + 
.map_err(CustomerVolumeReportError::Http)?; tracing::info!( candidate_count = candidates.len(), @@ -252,13 +311,62 @@ async fn build_report( let mut matched = Vec::new(); let mut unmatched = Vec::new(); let mut ambiguous_matches = 0usize; + let mut ambiguous_transactions = Vec::new(); + let mut rpc_lookup_failures = Vec::new(); - for candidate in candidates { - let tx_input = fetch_transaction_input(&rpc, &rpc_urls, &candidate.tx_hash).await?; + let mut lookup_results = stream::iter(candidates.into_iter().map(|candidate| { + let rpc = rpc.clone(); + let rpc_urls = rpc_urls.clone(); + + async move { + let tx_input = fetch_transaction_input(&rpc, &rpc_urls, &candidate.tx_hash).await; + (candidate, tx_input) + } + })) + .buffered(RPC_LOOKUP_CONCURRENCY); + + while let Some((candidate, tx_input)) = lookup_results.next().await { + let tx_input = match tx_input { + Ok(tx_input) => tx_input, + Err(error @ CustomerVolumeReportError::RpcLookup { .. }) + | Err(error @ CustomerVolumeReportError::TransactionNotFound { .. 
}) => { + let failure = RpcLookupFailureReport::from_error(&candidate, error); + tracing::warn!( + tx_hash = %failure.tx_hash, + sender = %failure.sender, + to = %failure.to_address, + error = %failure.error, + "skipping candidate execution after RPC lookup failure" + ); + rpc_lookup_failures.push(failure); + continue; + } + Err(error) => return Err(error), + }; let selection = select_issued_row(&candidate, &tx_input, &issued_row_index); - if selection.duplicate_match_count > 1 { + if selection.distinct_match_count > 1 { ambiguous_matches += 1; + ambiguous_transactions.push(AmbiguousTransactionReport { + tx_hash: candidate.tx_hash.clone(), + sender: candidate.sender.clone(), + to_address: candidate.orderbook_address.clone(), + block_number: candidate.block_number, + timestamp: candidate.timestamp, + total_input_amount: format_hex(&candidate.total_input_hex)?, + total_output_amount: format_hex(&candidate.total_output_hex)?, + candidates: selection + .distinct_matches + .iter() + .map(|row| AmbiguousMatchCandidateReport { + api_key_id: row.api_key_id, + key_id: row.key_id.clone(), + label: row.label.clone(), + owner: row.owner.clone(), + }) + .collect(), + }); + continue; } match selection.row { @@ -276,7 +384,7 @@ async fn build_report( output_token: row.output_token.clone(), total_input_hex: candidate.total_input_hex.clone(), total_output_hex: candidate.total_output_hex.clone(), - duplicate_match_count: selection.duplicate_match_count, + duplicate_match_count: selection.matching_row_count, }), None => unmatched.push(UnmatchedTransactionReport { tx_hash: candidate.tx_hash.clone(), @@ -320,36 +428,16 @@ async fn build_report( matched_executions: matched_transactions.len(), unmatched_executions: unmatched.len(), ambiguous_matches, + rpc_lookup_failures: rpc_lookup_failures.len(), }, customers, matched_transactions, unmatched_transactions: unmatched, + ambiguous_transactions, + rpc_lookup_failures, }) } -fn orderbooks_for_chain( - raindex: &RaindexProvider, -) -> 
Result, CustomerVolumeReportError> { - let mut addresses = raindex - .client() - .get_all_orderbooks()? - .into_values() - .filter(|orderbook| orderbook.network.chain_id == crate::CHAIN_ID) - .map(|orderbook| normalize_address(&format!("{:#x}", orderbook.address))) - .collect::>(); - - addresses.sort(); - addresses.dedup(); - - if addresses.is_empty() { - return Err(CustomerVolumeReportError::MissingOrderbooks { - chain_id: crate::CHAIN_ID, - }); - } - - Ok(addresses) -} - fn rpc_urls_for_chain(raindex: &RaindexProvider) -> Result, CustomerVolumeReportError> { let network = raindex.client().get_network_by_chain_id(crate::CHAIN_ID)?; if network.rpcs.is_empty() { @@ -405,9 +493,9 @@ async fn load_issued_rows( CAST(strftime('%s', isc.created_at) AS INTEGER) AS created_at_unix FROM issued_swap_calldata isc WHERE isc.chain_id = ? - AND CAST(strftime('%s', isc.created_at) AS INTEGER) >= ? - AND CAST(strftime('%s', isc.created_at) AS INTEGER) <= ? - ORDER BY created_at_unix DESC, isc.id DESC", + AND isc.created_at >= datetime(?,'unixepoch') + AND isc.created_at <= datetime(?,'unixepoch') + ORDER BY isc.created_at DESC, isc.id DESC", ) .bind(crate::CHAIN_ID as i64) .bind(min_issue_time) @@ -419,7 +507,6 @@ async fn load_issued_rows( async fn load_take_order_rows( raindex_db: &sqlx::Pool, - orderbook_addresses: &[String], start_time: u64, end_time: u64, ) -> Result, CustomerVolumeReportError> { @@ -442,14 +529,7 @@ async fn load_take_order_rows( .push_bind(start_time as i64) .push(" AND block_timestamp <= ") .push_bind(end_time as i64) - .push(" AND orderbook_address IN ("); - - let mut separated = query_builder.separated(", "); - for orderbook in orderbook_addresses { - separated.push_bind(orderbook); - } - separated.push_unseparated(")"); - query_builder.push(" ORDER BY block_number, log_index"); + .push(" ORDER BY block_number, log_index"); query_builder .build_query_as::() @@ -461,7 +541,7 @@ async fn load_take_order_rows( fn aggregate_candidate_rows( rows: Vec, ) -> 
Result, CustomerVolumeReportError> { - let mut aggregates: HashMap = HashMap::new(); + let mut aggregates: HashMap<(String, String), CandidateExecution> = HashMap::new(); for row in rows { let tx_hash = normalize_hex(&row.transaction_hash); @@ -470,10 +550,11 @@ fn aggregate_candidate_rows( let input_delta = normalize_hex(&row.taker_output); let output_delta = normalize_hex(&row.taker_input); - match aggregates.get_mut(&tx_hash) { + let aggregate_key = (tx_hash.clone(), orderbook_address.clone()); + + match aggregates.get_mut(&aggregate_key) { Some(existing) => { if existing.sender != sender - || existing.orderbook_address != orderbook_address || existing.block_number != row.block_number as u64 || existing.timestamp != row.block_timestamp as u64 { @@ -491,7 +572,7 @@ fn aggregate_candidate_rows( add_hex_amount(&mut total_output_hex, &output_delta)?; aggregates.insert( - tx_hash.clone(), + aggregate_key, CandidateExecution { tx_hash, orderbook_address, @@ -512,13 +593,16 @@ fn aggregate_candidate_rows( .cmp(&right.timestamp) .then_with(|| left.block_number.cmp(&right.block_number)) .then_with(|| left.tx_hash.cmp(&right.tx_hash)) + .then_with(|| left.orderbook_address.cmp(&right.orderbook_address)) }); Ok(candidates) } struct Selection<'a> { row: Option<&'a IssuedSwapRow>, - duplicate_match_count: usize, + matching_row_count: usize, + distinct_match_count: usize, + distinct_matches: Vec<&'a IssuedSwapRow>, } fn build_issued_row_index(issued_rows: &[IssuedSwapRow]) -> IssuedRowIndex<'_> { @@ -566,13 +650,17 @@ fn select_issued_row<'a>( None => { return Selection { row: None, - duplicate_match_count: 0, + matching_row_count: 0, + distinct_match_count: 0, + distinct_matches: Vec::new(), }; } }; let mut matched_row = None; - let mut duplicate_match_count = 0; + let mut matching_row_count = 0; + let mut distinct_matches = Vec::new(); + let mut seen_api_keys = std::collections::HashSet::new(); for row in issued_rows { if row.created_at_unix > candidate_timestamp { @@ 
-580,19 +668,31 @@ fn select_issued_row<'a>( } if candidate_timestamp <= row.created_at_unix + MATCH_WINDOW_SECS { - duplicate_match_count += 1; + matching_row_count += 1; if matched_row.is_none() { matched_row = Some(*row); } + + if seen_api_keys.insert(row.api_key_id) { + distinct_matches.push(*row); + } continue; } break; } + let row = if distinct_matches.len() == 1 { + matched_row + } else { + None + }; + Selection { - row: matched_row, - duplicate_match_count, + row, + matching_row_count, + distinct_match_count: distinct_matches.len(), + distinct_matches, } } @@ -705,31 +805,43 @@ async fn fetch_transaction_input( let mut last_error = None; for rpc_url in rpc_urls { - let response = match client.post(rpc_url.clone()).json(&payload).send().await { - Ok(response) => response, - Err(error) => { - last_error = Some(error.to_string()); + for attempt in 1..=RPC_LOOKUP_MAX_ATTEMPTS { + let response = match client.post(rpc_url.clone()).json(&payload).send().await { + Ok(response) => response, + Err(error) => { + last_error = Some(error.to_string()); + sleep_before_retry(attempt).await; + continue; + } + }; + + if !response.status().is_success() { + last_error = Some(format!("rpc status {}", response.status())); + sleep_before_retry(attempt).await; continue; } - }; - let envelope = match response - .json::>() - .await - { - Ok(envelope) => envelope, - Err(error) => { - last_error = Some(error.to_string()); - continue; + let envelope = match response + .json::>() + .await + { + Ok(envelope) => envelope, + Err(error) => { + last_error = Some(error.to_string()); + sleep_before_retry(attempt).await; + continue; + } + }; + + if let Some(result) = envelope.result { + return Ok(normalize_hex(&result.input)); } - }; - if let Some(result) = envelope.result { - return Ok(normalize_hex(&result.input)); - } + if let Some(error) = envelope.error { + last_error = Some(error.message); + } - if let Some(error) = envelope.error { - last_error = Some(error.message); + break; } } @@ 
-745,55 +857,101 @@ async fn fetch_transaction_input( }) } +async fn sleep_before_retry(attempt: u32) { + if attempt >= RPC_LOOKUP_MAX_ATTEMPTS { + return; + } + + tokio::time::sleep(Duration::from_millis( + RPC_LOOKUP_RETRY_BASE_DELAY_MS * u64::from(attempt), + )) + .await; +} + fn print_human_report(report: &CustomerVolumeReport) { println!("Customer volume report"); println!("Window: {} -> {}", report.start_time, report.end_time); println!( - "Matched executed txs: {} | Unmatched executed txs: {} | Ambiguous matches: {}", + "Matched executed txs: {} | Unmatched executed txs: {} | Ambiguous matches: {} | RPC lookup failures: {}", report.audit.matched_executions, report.audit.unmatched_executions, - report.audit.ambiguous_matches + report.audit.ambiguous_matches, + report.audit.rpc_lookup_failures ); println!(); - if report.customers.is_empty() { - println!("No matched customer executions found."); - return; + if !report.rpc_lookup_failures.is_empty() { + println!("RPC lookup failures:"); + for failure in &report.rpc_lookup_failures { + println!( + " {} | sender={} | to={} | block={} | timestamp={} | error={}", + failure.tx_hash, + failure.sender, + failure.to_address, + failure.block_number, + failure.timestamp, + failure.error + ); + } + println!(); } - for customer in &report.customers { - println!( - "{} ({}) [{}] executions={}", - customer.label, customer.owner, customer.key_id, customer.executed_txs - ); - - for pair in &customer.pairs { + if !report.ambiguous_transactions.is_empty() { + println!("Ambiguous transactions:"); + for transaction in &report.ambiguous_transactions { println!( - " {} -> {} | input={} | output={}", - pair.input_token, - pair.output_token, - pair.total_input_amount, - pair.total_output_amount + " {} | sender={} | to={} | block={} | timestamp={} | input={} | output={}", + transaction.tx_hash, + transaction.sender, + transaction.to_address, + transaction.block_number, + transaction.timestamp, + transaction.total_input_amount, + 
transaction.total_output_amount ); + for candidate in &transaction.candidates { + println!( + " candidate api_key_id={} key_id={} label={} owner={}", + candidate.api_key_id, candidate.key_id, candidate.label, candidate.owner + ); + } } - println!(); } -} -fn normalize_address(value: &str) -> String { - let trimmed = value.trim(); - if let Some(address) = trimmed - .strip_prefix("0x") - .or_else(|| trimmed.strip_prefix("0X")) - { - format!("0x{}", address.to_ascii_lowercase()) + if report.customers.is_empty() { + println!("No matched customer executions found."); } else { - format!("0x{}", trimmed.to_ascii_lowercase()) + for customer in &report.customers { + println!( + "{} ({}) [{}] executions={}", + customer.label, customer.owner, customer.key_id, customer.executed_txs + ); + + for pair in &customer.pairs { + println!( + " {} -> {} | input={} | output={}", + pair.input_token, + pair.output_token, + pair.total_input_amount, + pair.total_output_amount + ); + } + + println!(); + } } } +fn normalize_address(value: &str) -> String { + normalize_prefixed_hex(value) +} + fn normalize_hex(value: &str) -> String { + normalize_prefixed_hex(value) +} + +fn normalize_prefixed_hex(value: &str) -> String { let trimmed = value.trim(); if let Some(hex) = trimmed .strip_prefix("0x") @@ -826,6 +984,10 @@ fn format_hex(value_hex: &str) -> Result { #[cfg(test)] mod tests { use super::*; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; #[derive(Debug)] struct SeededMatchContext { @@ -969,6 +1131,43 @@ mod tests { ); } + #[test] + fn test_aggregate_candidate_rows_keeps_same_tx_separate_per_orderbook() { + let rows = vec![ + TakeOrderRow { + orderbook_address: "0xd2938e7c9fe3597f78832ce780feb61945c377d7".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: 
Float::parse("0.25".to_string()).expect("float").as_hex(), + taker_output: Float::parse("50".to_string()).expect("float").as_hex(), + }, + TakeOrderRow { + orderbook_address: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + transaction_hash: + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + block_number: 10, + block_timestamp: 100, + sender: "0x1111111111111111111111111111111111111111".to_string(), + taker_input: Float::parse("0.20".to_string()).expect("float").as_hex(), + taker_output: Float::parse("40".to_string()).expect("float").as_hex(), + }, + ]; + + let aggregated = aggregate_candidate_rows(rows).expect("aggregate"); + assert_eq!(aggregated.len(), 2); + assert_eq!( + aggregated[0].orderbook_address, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ); + assert_eq!( + aggregated[1].orderbook_address, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7" + ); + } + #[test] fn test_select_issued_row_requires_exact_match_and_window() { let rows = vec![ @@ -989,11 +1188,96 @@ mod tests { ); assert_eq!(selection.row.map(|row| row.id), Some(1)); - assert_eq!(selection.duplicate_match_count, 1); + assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_select_issued_row_rejects_future_issuance() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 200)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 199, + ), + "0xabcdef", + &issued_row_index, + ); + + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 0); + assert_eq!(selection.distinct_match_count, 0); + } + + #[test] + fn test_select_issued_row_accepts_same_timestamp_issuance() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = 
select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 100, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); + } + + #[test] + fn test_select_issued_row_accepts_exact_match_window_boundary() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 220, + ), + "0xabcdef", + &issued_row_index, + ); + + assert_eq!(selection.row.map(|row| row.id), Some(1)); + assert_eq!(selection.matching_row_count, 1); + assert_eq!(selection.distinct_match_count, 1); } #[test] - fn test_select_issued_row_prefers_latest_matching_issuance() { + fn test_select_issued_row_rejects_after_match_window_boundary() { + let rows = vec![issued_row(1, 1, "alpha", "owner-a", 100)]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 221, + ), + "0xabcdef", + &issued_row_index, + ); + + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 0); + assert_eq!(selection.distinct_match_count, 0); + } + + #[test] + fn test_select_issued_row_returns_none_for_ambiguous_distinct_keys() { let rows = vec![ issued_row(1, 1, "alpha", "owner-a", 100), issued_row(2, 2, "beta", "owner-b", 200), @@ -1011,8 +1295,33 @@ mod tests { &issued_row_index, ); + assert!(selection.row.is_none()); + assert_eq!(selection.matching_row_count, 2); + assert_eq!(selection.distinct_match_count, 2); + } + + #[test] + fn test_select_issued_row_treats_same_key_duplicates_as_single_match() { + let rows 
= vec![ + issued_row(1, 1, "alpha", "owner-a", 100), + issued_row(2, 1, "alpha", "owner-a", 200), + ]; + let issued_row_index = build_issued_row_index(&rows); + + let selection = select_issued_row( + &candidate( + "0xtx", + "0x1111111111111111111111111111111111111111", + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + 210, + ), + "0xabcdef", + &issued_row_index, + ); + assert_eq!(selection.row.map(|row| row.id), Some(2)); - assert_eq!(selection.duplicate_match_count, 2); + assert_eq!(selection.matching_row_count, 2); + assert_eq!(selection.distinct_match_count, 1); } #[test] @@ -1060,19 +1369,11 @@ mod tests { assert_eq!(customers[0].pairs[0].total_output_amount, "0.45"); } - async fn mock_rpc_url_for_input(input: &str) -> String { + async fn mock_rpc_url_with_body(body: String) -> String { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("bind mock rpc server"); let addr = listener.local_addr().expect("mock rpc addr"); - let body = serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "result": { - "input": input, - } - }) - .to_string(); tokio::spawn(async move { loop { @@ -1098,10 +1399,107 @@ mod tests { format!("http://{addr}") } + async fn mock_rpc_url_with_bodies(bodies: Vec) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind mock rpc server"); + let addr = listener.local_addr().expect("mock rpc addr"); + let bodies = Arc::new(bodies); + let next_index = Arc::new(AtomicUsize::new(0)); + + tokio::spawn(async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + + let bodies = Arc::clone(&bodies); + let next_index = Arc::clone(&next_index); + + tokio::spawn(async move { + let mut buffer = [0u8; 4096]; + let _ = tokio::io::AsyncReadExt::read(&mut socket, &mut buffer).await; + + let index = next_index.fetch_add(1, Ordering::SeqCst); + let body = bodies + .get(index) + .or_else(|| bodies.last()) + .cloned() + .expect("at least one mock rpc body"); + + let 
response = format!( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}", + body.len(), + body + ); + let _ = + tokio::io::AsyncWriteExt::write_all(&mut socket, response.as_bytes()).await; + }); + } + }); + + format!("http://{addr}") + } + + async fn mock_rpc_url_for_input(input: &str) -> String { + mock_rpc_url_with_body( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "input": input, + } + }) + .to_string(), + ) + .await + } + + async fn mock_rpc_url_for_missing_transaction() -> String { + mock_rpc_url_with_body( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": serde_json::Value::Null, + }) + .to_string(), + ) + .await + } + + async fn mock_rpc_url_for_rpc_error(message: &str) -> String { + mock_rpc_url_with_body( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "error": { + "message": message, + } + }) + .to_string(), + ) + .await + } + async fn seed_raindex_take_orders( db_path: &std::path::Path, tx_hash: &str, block_timestamp: i64, + ) { + seed_raindex_take_orders_for_orderbook( + db_path, + tx_hash, + block_timestamp, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + } + + async fn seed_raindex_take_orders_for_orderbook( + db_path: &std::path::Path, + tx_hash: &str, + block_timestamp: i64, + orderbook_address: &str, ) { let pool = SqlitePoolOptions::new() .max_connections(1) @@ -1144,7 +1542,7 @@ mod tests { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(crate::CHAIN_ID as i64) - .bind("0xd2938e7c9fe3597f78832ce780feb61945c377d7") + .bind(orderbook_address) .bind(tx_hash) .bind(10_i64) .bind(block_timestamp) @@ -1158,6 +1556,10 @@ mod tests { } fn test_registry_settings(rpc_url: &str) -> String { + test_registry_settings_with_orderbook(rpc_url, "0xd2938e7c9fe3597f78832ce780feb61945c377d7") + } + + fn test_registry_settings_with_orderbook(rpc_url: &str, orderbook_address: &str) -> String { format!( "version: 4 networks: @@ -1170,7 +1572,7 @@ 
subgraphs: base: https://example.com/subgraph orderbooks: base: - address: 0xd2938e7c9fe3597f78832ce780feb61945c377d7 + address: {orderbook_address} network: base subgraph: base deployment-block: 0 @@ -1191,7 +1593,28 @@ tokens: tx_input: &str, ) -> crate::raindex::RaindexProvider { let rpc_url = mock_rpc_url_for_input(tx_input).await; - let settings = test_registry_settings(&rpc_url); + load_test_raindex_with_rpc_url(db_path, &rpc_url).await + } + + async fn load_test_raindex_with_rpc_url( + db_path: std::path::PathBuf, + rpc_url: &str, + ) -> crate::raindex::RaindexProvider { + let settings = test_registry_settings(rpc_url); + let registry_url = + crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; + + crate::raindex::RaindexProvider::load(®istry_url, Some(db_path)) + .await + .expect("load test raindex provider") + } + + async fn load_test_raindex_with_rpc_url_and_orderbook( + db_path: std::path::PathBuf, + rpc_url: &str, + orderbook_address: &str, + ) -> crate::raindex::RaindexProvider { + let settings = test_registry_settings_with_orderbook(rpc_url, orderbook_address); let registry_url = crate::test_helpers::mock_raindex_registry_url_with_settings(&settings).await; @@ -1221,6 +1644,7 @@ tokens: assert_eq!(report.audit.matched_executions, 1); assert_eq!(report.audit.unmatched_executions, 0); assert_eq!(report.audit.ambiguous_matches, 0); + assert_eq!(report.audit.rpc_lookup_failures, 0); assert_eq!(report.customers.len(), 1); assert_eq!(report.customers[0].api_key_id, expected.api_key_id); assert_eq!(report.customers[0].key_id, expected.key_id); @@ -1252,6 +1676,8 @@ tokens: assert_eq!(report.matched_transactions[0].total_input_amount, "50"); assert_eq!(report.matched_transactions[0].total_output_amount, "0.25"); assert_eq!(report.unmatched_transactions.len(), 0); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); } #[tokio::test] @@ -1282,6 +1708,267 @@ tokens: 
assert_eq!(report.customers[0].label, "alpha"); assert_eq!(report.customers[0].owner, "owner-a"); assert_eq!(report.matched_transactions.len(), 1); - assert_eq!(report.matched_transactions[0].key_id, report.customers[0].key_id); + assert_eq!( + report.matched_transactions[0].key_id, + report.customers[0].key_id + ); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_uses_chain_history_not_current_orderbook_registry() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + + let rpc_url = mock_rpc_url_for_input("0xabcdef").await; + let raindex = load_test_raindex_with_rpc_url_and_orderbook( + db_path, + &rpc_url, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + .await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.matched_transactions[0].api_key_id, + expected.api_key_id + ); + assert!(report.unmatched_transactions.is_empty()); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_does_not_match_when_execution_precedes_issuance() { + let pool = test_pool().await; + seed_match_context(&pool, 105).await; + let tx_hash = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = 
temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 90).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 80, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.matched_transactions.len(), 0); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_excludes_issued_rows_after_execution() { + let pool = test_pool().await; + seed_match_context(&pool, 111).await; + let tx_hash = "0xabababababababababababababababababababababababababababababababab"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 110).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.matched_transactions.len(), 0); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_keeps_same_tx_hash_separate_per_orderbook() { + let pool = test_pool().await; + let expected = seed_match_context(&pool, 100).await; + let tx_hash = "0xedededededededededededededededededededededededededededededededed"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + 
seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xd2938e7c9fe3597f78832ce780feb61945c377d7", + ) + .await; + seed_raindex_take_orders_for_orderbook( + &db_path, + tx_hash, + 105, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + .await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 1); + assert_eq!(report.audit.unmatched_executions, 1); + assert_eq!(report.matched_transactions.len(), 1); + assert_eq!(report.matched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.matched_transactions[0].api_key_id, + expected.api_key_id + ); + assert_eq!(report.unmatched_transactions.len(), 1); + assert_eq!(report.unmatched_transactions[0].tx_hash, tx_hash); + assert_eq!( + report.unmatched_transactions[0].to_address, + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + ); + assert!(report.ambiguous_transactions.is_empty()); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_records_ambiguous_transactions_without_attribution() { + let pool = test_pool().await; + let first = seed_match_context(&pool, 100).await; + let second = seed_match_context(&pool, 101).await; + let tx_hash = "0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let raindex = load_test_raindex(db_path, "0xabcdef").await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 1); + assert!(report.customers.is_empty()); + assert!(report.matched_transactions.is_empty()); + 
assert!(report.unmatched_transactions.is_empty()); + assert_eq!(report.ambiguous_transactions.len(), 1); + assert_eq!(report.ambiguous_transactions[0].tx_hash, tx_hash); + assert_eq!(report.ambiguous_transactions[0].candidates.len(), 2); + assert_eq!( + report.ambiguous_transactions[0].candidates[0].api_key_id, + second.api_key_id + ); + assert_eq!( + report.ambiguous_transactions[0].candidates[1].api_key_id, + first.api_key_id + ); + assert!(report.rpc_lookup_failures.is_empty()); + } + + #[tokio::test] + async fn test_build_report_records_rpc_lookup_failures_without_aborting() { + let pool = test_pool().await; + let tx_hash = "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"; + + let temp_dir = tempfile::tempdir().expect("temp dir"); + let db_path = temp_dir.path().join("raindex.db"); + seed_raindex_take_orders(&db_path, tx_hash, 105).await; + + let rpc_url = mock_rpc_url_for_missing_transaction().await; + let raindex = load_test_raindex_with_rpc_url(db_path, &rpc_url).await; + + let report = build_report(&pool, &raindex, 90, 110) + .await + .expect("build report"); + + assert_eq!(report.audit.matched_executions, 0); + assert_eq!(report.audit.unmatched_executions, 0); + assert_eq!(report.audit.ambiguous_matches, 0); + assert_eq!(report.audit.rpc_lookup_failures, 1); + assert!(report.customers.is_empty()); + assert!(report.matched_transactions.is_empty()); + assert!(report.unmatched_transactions.is_empty()); + assert!(report.ambiguous_transactions.is_empty()); + assert_eq!(report.rpc_lookup_failures.len(), 1); + assert_eq!(report.rpc_lookup_failures[0].tx_hash, tx_hash); + assert_eq!( + report.rpc_lookup_failures[0].error, + "transaction not found via RPC" + ); + } + + #[tokio::test] + async fn test_fetch_transaction_input_retries_same_rpc_url_after_invalid_json() { + let rpc_url = mock_rpc_url_with_bodies(vec![ + "not-json".to_string(), + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "input": "0xabcdef", + } + }) + 
.to_string(), + ]) + .await; + let client = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + .expect("http client"); + + let input = + fetch_transaction_input(&client, &[Url::parse(&rpc_url).expect("rpc url")], "0xtx") + .await + .expect("rpc retry should succeed"); + + assert_eq!(input, "0xabcdef"); + } + + #[tokio::test] + async fn test_fetch_transaction_input_falls_back_to_next_rpc_url() { + let failing_rpc_url = mock_rpc_url_for_rpc_error("upstream unavailable").await; + let succeeding_rpc_url = mock_rpc_url_for_input("0xabcdef").await; + let client = HttpClient::builder() + .timeout(Duration::from_secs(RPC_LOOKUP_TIMEOUT_SECS)) + .build() + .expect("http client"); + + let input = fetch_transaction_input( + &client, + &[ + Url::parse(&failing_rpc_url).expect("failing rpc url"), + Url::parse(&succeeding_rpc_url).expect("succeeding rpc url"), + ], + "0xtx", + ) + .await + .expect("rpc fallback should succeed"); + + assert_eq!(input, "0xabcdef"); } } diff --git a/src/routes/swap/calldata.rs b/src/routes/swap/calldata.rs index 3720877..2406cb4 100644 --- a/src/routes/swap/calldata.rs +++ b/src/routes/swap/calldata.rs @@ -4,6 +4,8 @@ use crate::db::DbPool; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; use crate::types::swap::{SwapCalldataRequest, SwapCalldataResponse}; +use alloy::hex::encode_prefixed; +use alloy::primitives::keccak256; use rain_orderbook_common::raindex_client::take_orders::TakeOrdersRequest; use rain_orderbook_common::take_orders::TakeOrdersMode; use rocket::serde::json::Json; @@ -73,7 +75,16 @@ async fn handle_swap_calldata( req: SwapCalldataRequest, ) -> Result<SwapCalldataResponse, ApiError> { let response = process_swap_calldata(ds, &req).await?; - persist_issued_swap_calldata(pool, key, &req, &response).await?; + if let Err(error) = persist_issued_swap_calldata(pool, key, &req, &response).await { + tracing::warn!( + error = %error, + api_key_id = key.id, + key_id = %key.key_id,
+ taker = %req.taker, + to = %response.to, + "continuing after issued swap calldata persistence failure" + ); + } Ok(response) } @@ -82,37 +93,30 @@ async fn persist_issued_swap_calldata( key: &AuthenticatedKey, request: &SwapCalldataRequest, response: &SwapCalldataResponse, -) -> Result<(), ApiError> { +) -> Result<(), sqlx::Error> { if !should_persist_issued_swap_calldata(response) { return Ok(()); } - let key_snapshot = crate::db::issued_swap_calldata::IssuedSwapCalldataKeySnapshot { + let record = crate::db::issued_swap_calldata::NewIssuedSwapCalldata { api_key_id: key.id, - key_id: &key.key_id, - label: &key.label, - owner: &key.owner, + key_id: key.key_id.clone(), + label: key.label.clone(), + owner: key.owner.clone(), + chain_id: crate::CHAIN_ID as i64, + taker: format!("{:#x}", request.taker), + to_address: format!("{:#x}", response.to), + tx_value: response.value.to_string(), + calldata: encode_prefixed(response.data.as_ref()), + calldata_hash: encode_prefixed(keccak256(response.data.as_ref())), + input_token: format!("{:#x}", request.input_token), + output_token: format!("{:#x}", request.output_token), + output_amount: request.output_amount.clone(), + maximum_io_ratio: request.maximum_io_ratio.clone(), + estimated_input: response.estimated_input.clone(), }; - crate::db::issued_swap_calldata::insert( - pool, - &key_snapshot, - crate::CHAIN_ID, - request, - response, - ) - .await - .map_err(|error| { - tracing::error!( - error = %error, - api_key_id = key.id, - key_id = %key.key_id, - taker = %request.taker, - to = %response.to, - "failed to persist issued swap calldata" - ); - ApiError::Internal("failed to persist issued swap calldata".into()) - })?; + crate::db::issued_swap_calldata::insert(pool, &record).await?; tracing::info!( api_key_id = key.id, @@ -464,10 +468,15 @@ mod tests { } #[rocket::async_test] - async fn test_handle_swap_calldata_returns_internal_if_persistence_fails() { + async fn 
test_handle_swap_calldata_returns_response_if_persistence_fails() { let client = TestClientBuilder::new().build().await; let pool = client.rocket().state::<DbPool>().expect("pool"); + sqlx::query("DROP TABLE issued_swap_calldata") + .execute(pool) + .await + .expect("drop issued swap calldata table"); + let ds = MockSwapDataSource { orders: Ok(vec![]), candidates: vec![], @@ -482,12 +491,11 @@ is_admin: false, }; - let result = handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")).await; - assert!(matches!( - result, - Err(ApiError::Internal(msg)) if msg == "failed to persist issued swap calldata" - )); - assert_eq!(issued_row_count(pool).await, 0); + let result = handle_swap_calldata(&ds, pool, &key, calldata_request("100", "2.5")) + .await + .expect("swap calldata should still succeed"); + assert_eq!(result.to, ORDERBOOK); + assert_eq!(result.data, ready_response().data); } #[rocket::async_test]