diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 359ce9f0f0..b6c1769e82 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -306,7 +306,6 @@ pub struct StartArgs { #[arg( long, env = "WORK_ITEM_BATCH_SIZE", - value_parser = clap::value_parser!(usize).range(1..), help = "Number of queue items to process per batch cycle. Smaller values reduce blockhash expiry risk, larger values reduce per-batch overhead." )] pub work_item_batch_size: Option, diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 86e7bfb650..4e79f9b9c6 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -16,7 +16,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; -use tokio::time::Instant; +use tokio::{sync::Semaphore, time::Instant}; use tracing::{error, info, trace, warn}; use crate::{ @@ -40,6 +40,7 @@ struct PreparedBatchData { struct ChunkSendContext { pool: Arc>, max_concurrent_sends: usize, + global_send_semaphore: Arc, timeout_deadline: Instant, cancel_signal: Arc, num_sent_transactions: Arc, @@ -171,19 +172,26 @@ pub async fn send_batched_transactions = chunks - .into_iter() + let mut chunk_results = futures::stream::iter( + chunks + .into_iter() .map(|work_chunk| { let pool = Arc::clone(&pool); let transaction_builder = Arc::clone(&transaction_builder); let cancel_signal = Arc::clone(&operation_cancel_signal); let num_sent = Arc::clone(&num_sent_transactions); + let global_send_semaphore = Arc::clone(&global_send_semaphore); let payer = payer.insecure_clone(); let derivation = *derivation; let tree_id = tree_accounts.merkle_tree; @@ -232,6 +240,7 @@ pub async fn send_batched_transactions(()) } }) - .collect(); + ) + .buffer_unordered(chunk_concurrency_limit); - let results = futures::future::join_all(chunk_futures).await; - for result in results { + while let Some(result) = chunk_results.next().await { if let Err(ForesterError::NotEligible) = result { return Err(ForesterError::NotEligible); } @@ -404,6 +413,7 @@ async fn execute_transaction_chunk_sending( let pool = Arc::clone(&context.pool); let cancel_signal = Arc::clone(&context.cancel_signal); let num_sent_transactions = Arc::clone(&context.num_sent_transactions); + let global_send_semaphore = Arc::clone(&context.global_send_semaphore); let timeout_deadline = context.timeout_deadline; let max_concurrent_sends = context.max_concurrent_sends; let confirmation = context.confirmation; @@ -411,6 +421,7 @@ async fn execute_transaction_chunk_sending( let pool_clone = Arc::clone(&pool); let cancel_signal_clone = Arc::clone(&cancel_signal); let num_sent_transactions_clone = Arc::clone(&num_sent_transactions); + let global_send_semaphore_clone = Arc::clone(&global_send_semaphore); let tx_label = prepared_transaction.label().to_string(); async move { @@ -418,6 +429,14 @@ async fn execute_transaction_chunk_sending( return TransactionSendResult::Cancelled; // Or Timeout } + let _send_permit = match global_send_semaphore_clone.acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + warn!("Global send semaphore closed unexpectedly"); + return TransactionSendResult::Cancelled; + } + }; + let tx_signature = prepared_transaction .signature() .unwrap_or_default();