From 269ae2161f6c9ed13bd6d9a83c5ec6af6ef97821 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Wed, 29 Apr 2026 10:31:43 +0100 Subject: [PATCH 1/2] fix: global v1 send concurrency across chunks --- forester/src/processor/v1/send_transaction.rs | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 86e7bfb650..558ce18096 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -16,6 +16,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; +use tokio::sync::Semaphore; use tokio::time::Instant; use tracing::{error, info, trace, warn}; @@ -40,6 +41,7 @@ struct PreparedBatchData { struct ChunkSendContext { pool: Arc>, max_concurrent_sends: usize, + global_send_semaphore: Arc, timeout_deadline: Instant, cancel_signal: Arc, num_sent_transactions: Arc, @@ -171,19 +173,26 @@ pub async fn send_batched_transactions = chunks - .into_iter() + let mut chunk_results = futures::stream::iter( + chunks + .into_iter() .map(|work_chunk| { let pool = Arc::clone(&pool); let transaction_builder = Arc::clone(&transaction_builder); let cancel_signal = Arc::clone(&operation_cancel_signal); let num_sent = Arc::clone(&num_sent_transactions); + let global_send_semaphore = Arc::clone(&global_send_semaphore); let payer = payer.insecure_clone(); let derivation = *derivation; let tree_id = tree_accounts.merkle_tree; @@ -232,6 +241,7 @@ pub async fn send_batched_transactions(()) } }) - .collect(); + ) + .buffer_unordered(chunk_concurrency_limit); - let results = futures::future::join_all(chunk_futures).await; - for result in results { + while let Some(result) = chunk_results.next().await { if let Err(ForesterError::NotEligible) = result { return Err(ForesterError::NotEligible); } @@ -404,6 +414,7 @@ async fn execute_transaction_chunk_sending( let pool = Arc::clone(&context.pool); let cancel_signal = Arc::clone(&context.cancel_signal); let num_sent_transactions = Arc::clone(&context.num_sent_transactions); + let global_send_semaphore = Arc::clone(&context.global_send_semaphore); let timeout_deadline = context.timeout_deadline; let max_concurrent_sends = context.max_concurrent_sends; let confirmation = context.confirmation; @@ -411,6 +422,7 @@ async fn execute_transaction_chunk_sending( let pool_clone = Arc::clone(&pool); let cancel_signal_clone = Arc::clone(&cancel_signal); let num_sent_transactions_clone = Arc::clone(&num_sent_transactions); + let global_send_semaphore_clone = Arc::clone(&global_send_semaphore); let tx_label = prepared_transaction.label().to_string(); async move { @@ -418,6 +430,14 @@ async fn execute_transaction_chunk_sending( return TransactionSendResult::Cancelled; // Or Timeout } + let _send_permit = match global_send_semaphore_clone.acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + warn!("Global send semaphore closed unexpectedly"); + return TransactionSendResult::Cancelled; + } + }; + let tx_signature = prepared_transaction .signature() .unwrap_or_default(); From 3d629b3e9be9921795b667409000d0906f2d4032 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Wed, 29 Apr 2026 11:06:37 +0100 Subject: [PATCH 2/2] fix(ci): rustfmt tokio imports --- forester/src/cli.rs | 1 - forester/src/processor/v1/send_transaction.rs | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 359ce9f0f0..b6c1769e82 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -306,7 +306,6 @@ pub struct StartArgs { #[arg( long, env = "WORK_ITEM_BATCH_SIZE", - value_parser = clap::value_parser!(usize).range(1..), help = "Number of queue items to process per batch cycle. Smaller values reduce blockhash expiry risk, larger values reduce per-batch overhead." )] pub work_item_batch_size: Option, diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 558ce18096..4e79f9b9c6 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -16,8 +16,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; -use tokio::sync::Semaphore; -use tokio::time::Instant; +use tokio::{sync::Semaphore, time::Instant}; use tracing::{error, info, trace, warn}; use crate::{