Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ enum StartupError {
routes::admin::put_registry,
routes::trades::get_trades_by_tx,
routes::trades::get_trades_by_address,
routes::trades::get_taker_trades,
routes::trades::post_trades_batch,
routes::registry::get_registry,
routes::registry::get_registry_history,
),
Expand Down
143 changes: 143 additions & 0 deletions src/routes/trades/get_taker_trades.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use crate::auth::AuthenticatedKey;
use crate::cache::AppCache;
use crate::error::{ApiError, ApiErrorResponse};
use crate::fairings::{GlobalRateLimit, TracingSpan};
use crate::types::common::ValidatedAddress;
use crate::types::trades::{
TakerTradesResponse, TradesByTxResponse, TradesPagination, TradesPaginationParams,
};
use alloy::primitives::{Address, B256};
use rocket::serde::json::Json;
use rocket::State;
use std::time::Duration;
use tracing::Instrument;

// Taker tx-hash lists are re-read on every pagination request, so cache them
// briefly: 15s absorbs page-through bursts while keeping results fresh.
const TAKER_TX_HASH_CACHE_TTL: Duration = Duration::from_secs(15);
const TAKER_TX_HASH_CACHE_CAPACITY: u64 = 1_000;

/// Cache of taker address -> list of `(tx hash, u64)` pairs.
/// NOTE(review): the meaning of the `u64` in the tuple is not visible here —
/// presumably a block number or timestamp used for ordering; confirm against
/// `DirectTradesFetcher::fetch_taker_tx_hashes`.
pub(crate) type TakerTradesTxHashCache = AppCache<Address, Vec<(B256, u64)>>;

/// Builds the process-wide taker tx-hash cache using the TTL/capacity above.
pub(crate) fn taker_trades_tx_hash_cache() -> TakerTradesTxHashCache {
    AppCache::new(TAKER_TX_HASH_CACHE_CAPACITY, TAKER_TX_HASH_CACHE_TTL)
}

/// Builds one page of market orders (taker transactions) for `sender`.
///
/// Pipeline:
///   1. Resolve the taker's tx hashes through the direct-trades fetcher,
///      memoized in `taker_tx_cache`. Without a fetcher we cannot enumerate
///      taker txs, so an empty page is returned (with a warning).
///   2. Paginate the hash list in memory.
///   3. Resolve each hash on the requested page via the shared trades-by-tx
///      cache; individual failures are logged and skipped rather than failing
///      the whole request (best-effort page).
pub(crate) async fn process_get_taker_trades(
    ds: &dyn super::TradesDataSource,
    direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>,
    trades_by_tx_cache: &super::TradesByTxCache,
    taker_tx_cache: &TakerTradesTxHashCache,
    sender: Address,
    params: TradesPaginationParams,
) -> Result<TakerTradesResponse, ApiError> {
    // Resolve pagination defaults once so both the happy path and the
    // fetcher-unavailable path echo the same values back to the client.
    // (Previously the unavailable path hard-coded `page: 1`, silently
    // ignoring the requested page.)
    let page = params.page.unwrap_or(1);
    let page_size = params.page_size.unwrap_or(20);

    // Step 1: Get tx hashes (cached)
    let tx_hashes = match direct_trades {
        Some(fetcher) => taker_tx_cache
            .get_or_try_insert(sender, || async {
                fetcher.fetch_taker_tx_hashes(&sender).await
            })
            .await
            .map_err(ApiError::from)?,
        None => {
            tracing::warn!("direct trades fetcher unavailable; returning empty taker trades");
            return Ok(TakerTradesResponse {
                market_orders: vec![],
                pagination: TradesPagination {
                    page,
                    page_size,
                    total_trades: 0,
                    total_pages: 0,
                    has_more: false,
                },
            });
        }
    };

    // Step 2: Paginate the hash list in memory. All math is done in u64 to
    // avoid overflow on page * page_size; a zero page_size yields zero pages
    // and an empty slice.
    let total = tx_hashes.len() as u64;
    let total_pages = if page_size == 0 {
        0
    } else {
        total.div_ceil(u64::from(page_size))
    };
    let offset = (u64::from(page.saturating_sub(1)) * u64::from(page_size)) as usize;
    let page_hashes: Vec<B256> = if offset >= tx_hashes.len() {
        vec![]
    } else {
        let end = std::cmp::min(offset + page_size as usize, tx_hashes.len());
        tx_hashes[offset..end].iter().map(|(h, _)| *h).collect()
    };

    // Step 3: Resolve each tx via the existing cached trade-by-tx lookup.
    let mut market_orders = Vec::with_capacity(page_hashes.len());
    for tx_hash in page_hashes {
        match super::get_cached_trades_by_tx(trades_by_tx_cache, ds, tx_hash, None).await {
            Ok(tx_trades) => market_orders.push(tx_trades),
            Err(e) => {
                // Best-effort: one unresolvable tx must not fail the page.
                tracing::warn!(tx_hash = %tx_hash, error = %e, "failed to resolve taker tx; skipping");
            }
        }
    }

    Ok(TakerTradesResponse {
        market_orders,
        pagination: TradesPagination {
            page,
            page_size,
            total_trades: total,
            total_pages,
            has_more: u64::from(page) < total_pages,
        },
    })
}

#[utoipa::path(
get,
path = "/v1/trades/taker/{address}",
tag = "Trades",
security(("basicAuth" = [])),
params(
("address" = String, Path, description = "Taker address"),
TradesPaginationParams,
),
responses(
(status = 200, description = "Paginated list of market orders (taker transactions)", body = TakerTradesResponse),
(status = 400, description = "Bad request", body = ApiErrorResponse),
(status = 401, description = "Unauthorized", body = ApiErrorResponse),
(status = 429, description = "Rate limited", body = ApiErrorResponse),
(status = 500, description = "Internal server error", body = ApiErrorResponse),
)
)]
#[get("/taker/<address>?<params..>")]
pub async fn get_taker_trades(
_global: GlobalRateLimit,
_key: AuthenticatedKey,
shared_raindex: &State<crate::raindex::SharedRaindexProvider>,
trades_by_tx_cache: &State<super::TradesByTxCache>,
taker_tx_cache: &State<TakerTradesTxHashCache>,
direct_trades: &State<Option<crate::direct_trades::DirectTradesFetcher>>,
span: TracingSpan,
address: ValidatedAddress,
params: TradesPaginationParams,
) -> Result<Json<TakerTradesResponse>, ApiError> {
async move {
tracing::info!(address = ?address, params = ?params, "taker trades request received");
let raindex = shared_raindex.read().await;
let ds = super::RaindexTradesDataSource {
client: raindex.client(),
};
let response = process_get_taker_trades(
&ds,
direct_trades.inner().as_ref(),
trades_by_tx_cache,
taker_tx_cache,
address.0,
params,
)
.await?;
Ok(Json(response))
}
.instrument(span.0)
.await
}
188 changes: 188 additions & 0 deletions src/routes/trades/post_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use crate::auth::AuthenticatedKey;
use crate::cache::AppCache;
use crate::error::{ApiError, ApiErrorResponse};
use crate::fairings::{GlobalRateLimit, TracingSpan};
use crate::types::order::OrderTradeEntry;
use crate::types::trades::{TradesBatchEntry, TradesBatchRequest, TradesBatchResponse};
use alloy::primitives::B256;
use futures::future::join_all;
use rocket::serde::json::Json;
use rocket::State;
use std::time::{Duration, Instant};
use tracing::Instrument;

// Per-order trade lists are cached for a minute; capacity bounds memory use.
const TRADES_BY_ORDER_HASH_CACHE_TTL: Duration = Duration::from_secs(60);
const TRADES_BY_ORDER_HASH_CACHE_CAPACITY: u64 = 1_000;
// Upper bound on order hashes per batch request; enforced in `post_trades_batch`.
const TRADES_BATCH_MAX_HASHES: usize = 50;

/// Cache of order hash -> mapped trade entries for that order.
pub(crate) type TradesByOrderHashCache = AppCache<B256, Vec<OrderTradeEntry>>;

/// Builds the process-wide order-hash trade cache using the TTL/capacity above.
pub(crate) fn trades_by_order_hash_cache() -> TradesByOrderHashCache {
    AppCache::new(
        TRADES_BY_ORDER_HASH_CACHE_CAPACITY,
        TRADES_BY_ORDER_HASH_CACHE_TTL,
    )
}

/// Resolves the trades for a single order hash via the library data source.
/// An unknown hash maps to an empty list rather than an error.
async fn fetch_trades_for_hash(
    ds: &dyn super::TradesDataSource,
    hash: B256,
) -> Result<Vec<OrderTradeEntry>, ApiError> {
    let Some(order) = ds.find_order_by_hash(hash).await? else {
        return Ok(vec![]);
    };
    let raw_trades = ds.get_order_trades(&order, None, None).await?;
    Ok(raw_trades
        .iter()
        .map(super::super::order::map_trade)
        .collect())
}

/// Resolves trades for a batch of order hashes.
///
/// Strategy:
///   * serve what we can from `cache`;
///   * for the remainder, prefer one batched query through the direct SQLite
///     fetcher, falling back to N parallel library lookups when the direct
///     path is unavailable or fails;
///   * per-order failures degrade to an empty trade list instead of failing
///     the whole batch.
///
/// The response preserves the request order, and duplicate hashes each
/// receive the full trade list.
pub(crate) async fn process_trades_batch(
    ds: &dyn super::TradesDataSource,
    cache: &TradesByOrderHashCache,
    direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>,
    hashes: Vec<B256>,
) -> Result<TradesBatchResponse, ApiError> {
    // Shared fallback for "no direct fetcher" and "direct fetch failed":
    // N parallel lookups through the library data source. (Previously this
    // loop was duplicated verbatim in both branches.)
    async fn fetch_uncached_via_library(
        ds: &dyn super::TradesDataSource,
        cache: &TradesByOrderHashCache,
        uncached: &[B256],
        cached_map: &mut std::collections::HashMap<B256, Vec<OrderTradeEntry>>,
    ) {
        let results =
            join_all(uncached.iter().map(|&hash| fetch_trades_for_hash(ds, hash))).await;
        for (&hash, result) in uncached.iter().zip(results) {
            match result {
                Ok(trades) => {
                    cache.insert(hash, trades.clone()).await;
                    cached_map.insert(hash, trades);
                }
                Err(e) => {
                    tracing::warn!(order_hash = %hash, error = %e, "failed to fetch trades for order in batch");
                    // Record an empty result so the response still covers the hash.
                    cached_map.insert(hash, vec![]);
                }
            }
        }
    }

    let total_start = Instant::now();

    let mut cached_map: std::collections::HashMap<B256, Vec<OrderTradeEntry>> =
        std::collections::HashMap::new();
    let mut uncached: Vec<B256> = Vec::new();

    for &hash in &hashes {
        if let Some(trades) = cache.get(&hash).await {
            cached_map.insert(hash, trades);
        } else if !uncached.contains(&hash) {
            // Dedupe so a hash repeated in the request is only fetched once
            // (linear scan is fine: batches are capped at TRADES_BATCH_MAX_HASHES).
            uncached.push(hash);
        }
    }

    tracing::info!(
        total_hashes = hashes.len(),
        cached = cached_map.len(),
        uncached = uncached.len(),
        "batch trades cache check"
    );

    if !uncached.is_empty() {
        match direct_trades {
            Some(fetcher) => {
                // Fast path: single batch query via direct SQLite connection
                match fetcher.batch_fetch(&uncached).await {
                    Ok(batch_result) => {
                        for &hash in &uncached {
                            let trades = batch_result.get(&hash).cloned().unwrap_or_default();
                            cache.insert(hash, trades.clone()).await;
                            cached_map.insert(hash, trades);
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "direct batch trades failed; falling back to library");
                        fetch_uncached_via_library(ds, cache, &uncached, &mut cached_map).await;
                    }
                }
            }
            None => fetch_uncached_via_library(ds, cache, &uncached, &mut cached_map).await,
        }
    }

    // Assemble the response in request order. Use a non-consuming lookup:
    // the previous `cached_map.remove(hash)` left the SECOND occurrence of a
    // duplicated hash with an empty trade list.
    let entries = hashes
        .iter()
        .map(|hash| TradesBatchEntry {
            order_hash: *hash,
            trades: cached_map.get(hash).cloned().unwrap_or_default(),
        })
        .collect();

    tracing::info!(
        total_duration_ms = total_start.elapsed().as_millis(),
        total_hashes = hashes.len(),
        "batch trades request processed"
    );

    Ok(TradesBatchResponse { orders: entries })
}

#[utoipa::path(
post,
path = "/v1/trades/batch",
tag = "Trades",
security(("basicAuth" = [])),
request_body = TradesBatchRequest,
responses(
(status = 200, description = "Trades grouped by order hash", body = TradesBatchResponse),
(status = 400, description = "Bad request", body = ApiErrorResponse),
(status = 401, description = "Unauthorized", body = ApiErrorResponse),
(status = 429, description = "Rate limited", body = ApiErrorResponse),
(status = 500, description = "Internal server error", body = ApiErrorResponse),
)
)]
#[post("/batch", data = "<body>")]
pub async fn post_trades_batch(
_global: GlobalRateLimit,
_key: AuthenticatedKey,
shared_raindex: &State<crate::raindex::SharedRaindexProvider>,
trades_by_order_hash_cache: &State<TradesByOrderHashCache>,
direct_trades: &State<Option<crate::direct_trades::DirectTradesFetcher>>,
span: TracingSpan,
body: Json<TradesBatchRequest>,
) -> Result<Json<TradesBatchResponse>, ApiError> {
async move {
tracing::info!(
hash_count = body.order_hashes.len(),
"batch trades request received"
);

if body.order_hashes.is_empty() {
return Ok(Json(TradesBatchResponse { orders: vec![] }));
}

if body.order_hashes.len() > TRADES_BATCH_MAX_HASHES {
return Err(ApiError::BadRequest(format!(
"maximum {} order hashes per batch request",
TRADES_BATCH_MAX_HASHES
)));
}

let raindex = shared_raindex.read().await;
let ds = super::RaindexTradesDataSource {
client: raindex.client(),
};
let response = process_trades_batch(
&ds,
trades_by_order_hash_cache,
direct_trades.inner().as_ref(),
body.into_inner().order_hashes,
)
.await?;
Ok(Json(response))
}
.instrument(span.0)
.await
}
28 changes: 28 additions & 0 deletions src/types/trades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,31 @@ pub struct TradesByTxResponse {
pub trades: Vec<TradeByTxEntry>,
pub totals: TradesTotals,
}

/// Request body for `POST /v1/trades/batch`: the order hashes to resolve
/// trades for (count is capped server-side in the route handler).
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TradesBatchRequest {
    // 32-byte order hashes; exposed as hex strings in the OpenAPI schema.
    #[schema(value_type = Vec<String>)]
    pub order_hashes: Vec<alloy::primitives::B256>,
}

/// One order's slice of a batch response: the hash it was requested under
/// plus every trade resolved for it (empty when the order is unknown or its
/// lookup failed).
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TradesBatchEntry {
    // Exposed as a hex string in the OpenAPI schema.
    #[schema(value_type = String)]
    pub order_hash: alloy::primitives::B256,
    pub trades: Vec<crate::types::order::OrderTradeEntry>,
}

/// Response for `POST /v1/trades/batch`: one entry per requested order hash,
/// in request order.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TradesBatchResponse {
    pub orders: Vec<TradesBatchEntry>,
}

/// Response for `GET /v1/trades/taker/{address}`: one `TradesByTxResponse`
/// per taker transaction on the requested page, plus pagination metadata.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct TakerTradesResponse {
    pub market_orders: Vec<TradesByTxResponse>,
    pub pagination: TradesPagination,
}
Loading