Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion forester/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ pub struct StartArgs {
#[arg(
long,
env = "WORK_ITEM_BATCH_SIZE",
value_parser = clap::value_parser!(usize).range(1..),
help = "Number of queue items to process per batch cycle. Smaller values reduce blockhash expiry risk, larger values reduce per-batch overhead."
)]
pub work_item_batch_size: Option<usize>,
Expand Down
35 changes: 27 additions & 8 deletions forester/src/processor/v1/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
};
use tokio::time::Instant;
use tokio::{sync::Semaphore, time::Instant};
use tracing::{error, info, trace, warn};

use crate::{
Expand All @@ -40,6 +40,7 @@ struct PreparedBatchData {
struct ChunkSendContext<R: Rpc> {
pool: Arc<SolanaRpcPool<R>>,
max_concurrent_sends: usize,
global_send_semaphore: Arc<Semaphore>,
timeout_deadline: Instant,
cancel_signal: Arc<AtomicBool>,
num_sent_transactions: Arc<AtomicUsize>,
Expand Down Expand Up @@ -171,19 +172,26 @@ pub async fn send_batched_transactions<T: TransactionBuilder + Send + Sync + 'st
.collect();

let num_chunks = chunks.len();
let chunk_concurrency_limit = effective_max_concurrent_sends.min(num_chunks).max(1);
let global_send_semaphore = Arc::new(Semaphore::new(effective_max_concurrent_sends));

info!(
tree = %tree_accounts.merkle_tree,
"Processing {} concurrent chunks of up to {} items each",
num_chunks, work_item_batch_size
"Processing up to {} concurrent chunks ({} total) of up to {} items each",
chunk_concurrency_limit,
num_chunks,
work_item_batch_size
);

let chunk_futures: Vec<_> = chunks
.into_iter()
let mut chunk_results = futures::stream::iter(
chunks
.into_iter()
.map(|work_chunk| {
let pool = Arc::clone(&pool);
let transaction_builder = Arc::clone(&transaction_builder);
let cancel_signal = Arc::clone(&operation_cancel_signal);
let num_sent = Arc::clone(&num_sent_transactions);
let global_send_semaphore = Arc::clone(&global_send_semaphore);
let payer = payer.insecure_clone();
let derivation = *derivation;
let tree_id = tree_accounts.merkle_tree;
Expand Down Expand Up @@ -232,6 +240,7 @@ pub async fn send_batched_transactions<T: TransactionBuilder + Send + Sync + 'st
let send_context = ChunkSendContext {
pool: Arc::clone(&pool),
max_concurrent_sends: effective_max_concurrent_sends,
global_send_semaphore: Arc::clone(&global_send_semaphore),
timeout_deadline,
cancel_signal: Arc::clone(&cancel_signal),
num_sent_transactions: Arc::clone(&num_sent),
Expand All @@ -252,10 +261,10 @@ pub async fn send_batched_transactions<T: TransactionBuilder + Send + Sync + 'st
Ok::<(), ForesterError>(())
}
})
.collect();
)
.buffer_unordered(chunk_concurrency_limit);

let results = futures::future::join_all(chunk_futures).await;
for result in results {
while let Some(result) = chunk_results.next().await {
if let Err(ForesterError::NotEligible) = result {
return Err(ForesterError::NotEligible);
}
Expand Down Expand Up @@ -404,20 +413,30 @@ async fn execute_transaction_chunk_sending<R: Rpc>(
let pool = Arc::clone(&context.pool);
let cancel_signal = Arc::clone(&context.cancel_signal);
let num_sent_transactions = Arc::clone(&context.num_sent_transactions);
let global_send_semaphore = Arc::clone(&context.global_send_semaphore);
let timeout_deadline = context.timeout_deadline;
let max_concurrent_sends = context.max_concurrent_sends;
let confirmation = context.confirmation;
let transaction_send_futures = transactions.into_iter().map(|prepared_transaction| {
let pool_clone = Arc::clone(&pool);
let cancel_signal_clone = Arc::clone(&cancel_signal);
let num_sent_transactions_clone = Arc::clone(&num_sent_transactions);
let global_send_semaphore_clone = Arc::clone(&global_send_semaphore);
let tx_label = prepared_transaction.label().to_string();

async move {
if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline {
return TransactionSendResult::Cancelled; // Or Timeout
}

let _send_permit = match global_send_semaphore_clone.acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
warn!("Global send semaphore closed unexpectedly");
return TransactionSendResult::Cancelled;
}
};
Comment on lines +432 to +438
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find and read the target file
wc -l forester/src/processor/v1/send_transaction.rs

Repository: Lightprotocol/light-protocol

Length of output: 122


🏁 Script executed:

# Read the section around lines 433-439 with context
sed -n '400,480p' forester/src/processor/v1/send_transaction.rs | cat -n

Repository: Lightprotocol/light-protocol

Length of output: 4513


🏁 Script executed:

# Look at the broader function to understand timeout_deadline and cancel_signal_clone
sed -n '380,500p' forester/src/processor/v1/send_transaction.rs | cat -n

Repository: Lightprotocol/light-protocol

Length of output: 6574


🏁 Script executed:

# Search for timeout_deadline definition in the file
rg "timeout_deadline" forester/src/processor/v1/send_transaction.rs -B 3 -A 3

Repository: Lightprotocol/light-protocol

Length of output: 3316


🏁 Script executed:

# Search for cancel_signal_clone usage patterns
rg "cancel_signal_clone" forester/src/processor/v1/send_transaction.rs -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 907


🏁 Script executed:

# Search for other acquire_owned calls in the codebase
rg "acquire_owned" forester/src/ -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 978


🏁 Script executed:

# Check how the semaphore is created and used
rg "Semaphore" forester/src/processor/v1/send_transaction.rs -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 623


Wrap semaphore acquisition with a timeout to prevent tasks from exceeding the batch deadline.

The initial check at line 50 guards entry, but a task can then block indefinitely in acquire_owned().await while waiting for a permit. While it is blocked, the deadline may pass or cancellation may be signaled. Once a permit is finally released, the task wakes and proceeds to spend fees despite having already exceeded timeout_deadline.

Suggested fix
-            let _send_permit = match global_send_semaphore_clone.acquire_owned().await {
-                Ok(permit) => permit,
-                Err(_) => {
-                    warn!("Global send semaphore closed unexpectedly");
-                    return TransactionSendResult::Cancelled;
-                }
-            };
+            let _send_permit = match tokio::time::timeout_at(
+                timeout_deadline,
+                global_send_semaphore_clone.acquire_owned(),
+            )
+            .await
+            {
+                Ok(Ok(permit)) => permit,
+                Ok(Err(_)) => {
+                    warn!("Global send semaphore closed unexpectedly");
+                    return TransactionSendResult::Cancelled;
+                }
+                Err(_) => return TransactionSendResult::Timeout,
+            };
+
+            if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline {
+                return TransactionSendResult::Cancelled;
+            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v1/send_transaction.rs` around lines 433–439, the
acquire_owned().await on global_send_semaphore_clone can block past
timeout_deadline; wrap the semaphore acquisition in a timeout (e.g.,
tokio::time::timeout) or select against the task's cancellation/deadline so if
the deadline elapses you return TransactionSendResult::Cancelled instead of
proceeding; specifically replace the match on
global_send_semaphore_clone.acquire_owned().await with a timed await that checks
timeout_deadline (or uses tokio::time::timeout(duration,
global_send_semaphore_clone.acquire_owned())) and handle Err(elapsed) by
logging/returning TransactionSendResult::Cancelled, while preserving the
existing handling of a closed semaphore.


let tx_signature = prepared_transaction
.signature()
.unwrap_or_default();
Expand Down
Loading