fix: global v1 send concurrency across chunks#2376
fix: global v1 send concurrency across chunks#2376SwenSchaeferjohann wants to merge 2 commits intomainfrom
Conversation
|
Warning Rate limit exceeded
To keep reviews running without waiting, you can enable usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughIntroduces a global Changes
Sequence DiagramsequenceDiagram
participant TxQueue as Transaction Queue
participant ChunkProcessor as Chunk Processor
participant Semaphore as Global Semaphore
participant TxSender as Transaction Sender
participant Network as Network/Destination
rect rgba(100, 150, 200, 0.5)
Note over ChunkProcessor,Semaphore: New Concurrency Control Flow
ChunkProcessor->>TxQueue: Iterate transactions in chunk
TxQueue-->>ChunkProcessor: Next transaction
ChunkProcessor->>Semaphore: Acquire permit
alt Permit available
Semaphore-->>ChunkProcessor: Grant permit
ChunkProcessor->>TxSender: Execute send
TxSender->>Network: Send transaction
Network-->>TxSender: Response/ACK
TxSender-->>ChunkProcessor: Complete
ChunkProcessor->>Semaphore: Release permit
else Semaphore exhausted/closed
Semaphore-->>ChunkProcessor: Unavailable
ChunkProcessor->>TxSender: Mark Cancelled
TxSender-->>ChunkProcessor: Status: Cancelled
end
end
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 11 minutes and 23 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
forester/src/processor/v1/send_transaction.rs (1)
268-272:⚠️ Potential issue | 🟠 MajorPropagate chunk failures instead of treating them as success.
Only
ForesterError::NotEligibleis handled here. Every otherErr(...)from the chunk future is ignored, including RPC/blockhash setup failures from Lines 210-213, so this function can return a successful send count for a partially failed batch.Suggested fix
- while let Some(result) = chunk_results.next().await { - if let Err(ForesterError::NotEligible) = result { - return Err(ForesterError::NotEligible); - } - } + let mut first_err = None; + while let Some(result) = chunk_results.next().await { + if let Err(err) = result { + if err.is_forester_not_eligible() { + operation_cancel_signal.store(true, Ordering::SeqCst); + } + first_err.get_or_insert(err); + } + } + if let Some(err) = first_err { + return Err(err); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@forester/src/processor/v1/send_transaction.rs` around lines 268 - 272, The loop over chunk_results currently only returns on ForesterError::NotEligible and silently ignores all other Err variants; update the match in the while let Some(result) = chunk_results.next().await loop so any Err(e) (other than NotEligible if you want special handling) is propagated back (return Err(e)) instead of being treated as success; locate the loop using the chunk_results identifier in send_transaction (and references to ForesterError::NotEligible and the earlier RPC/blockhash setup errors) and change the match to return any non-Ok error to ensure chunk failures are propagated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@forester/src/processor/v1/send_transaction.rs`:
- Around line 433-439: The acquire_owned().await on global_send_semaphore_clone
can block past timeout_deadline; wrap the semaphore acquisition in a timeout
(e.g., tokio::time::timeout) or select against the task's cancellation/deadline
so if the deadline elapses you return TransactionSendResult::Cancelled instead
of proceeding; specifically replace the match on
global_send_semaphore_clone.acquire_owned().await with a timed await that checks
timeout_deadline (or uses tokio::time::timeout(duration,
global_send_semaphore_clone.acquire_owned())) and handle Err(elapsed) by
logging/returning TransactionSendResult::Cancelled, while preserving the
existing handling of a closed semaphore.
---
Outside diff comments:
In `@forester/src/processor/v1/send_transaction.rs`:
- Around line 268-272: The loop over chunk_results currently only returns on
ForesterError::NotEligible and silently ignores all other Err variants; update
the match in the while let Some(result) = chunk_results.next().await loop so any
Err(e) (other than NotEligible if you want special handling) is propagated back
(return Err(e)) instead of being treated as success; locate the loop using the
chunk_results identifier in send_transaction (and references to
ForesterError::NotEligible and the earlier RPC/blockhash setup errors) and
change the match to return any non-Ok error to ensure chunk failures are
propagated.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3ec7beb0-7cf7-4f10-a398-6ecdca5b6356
📒 Files selected for processing (1)
forester/src/processor/v1/send_transaction.rs
| let _send_permit = match global_send_semaphore_clone.acquire_owned().await { | ||
| Ok(permit) => permit, | ||
| Err(_) => { | ||
| warn!("Global send semaphore closed unexpectedly"); | ||
| return TransactionSendResult::Cancelled; | ||
| } | ||
| }; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, find and read the target file
wc -l forester/src/processor/v1/send_transaction.rsRepository: Lightprotocol/light-protocol
Length of output: 122
🏁 Script executed:
# Read the section around lines 433-439 with context
sed -n '400,480p' forester/src/processor/v1/send_transaction.rs | cat -nRepository: Lightprotocol/light-protocol
Length of output: 4513
🏁 Script executed:
# Look at the broader function to understand timeout_deadline and cancel_signal_clone
sed -n '380,500p' forester/src/processor/v1/send_transaction.rs | cat -nRepository: Lightprotocol/light-protocol
Length of output: 6574
🏁 Script executed:
# Search for timeout_deadline definition in the file
rg "timeout_deadline" forester/src/processor/v1/send_transaction.rs -B 3 -A 3Repository: Lightprotocol/light-protocol
Length of output: 3316
🏁 Script executed:
# Search for cancel_signal_clone usage patterns
rg "cancel_signal_clone" forester/src/processor/v1/send_transaction.rs -B 2 -A 2Repository: Lightprotocol/light-protocol
Length of output: 907
🏁 Script executed:
# Search for other acquire_owned calls in the codebase
rg "acquire_owned" forester/src/ -B 2 -A 2Repository: Lightprotocol/light-protocol
Length of output: 978
🏁 Script executed:
# Check how the semaphore is created and used
rg "Semaphore" forester/src/processor/v1/send_transaction.rs -B 2 -A 2Repository: Lightprotocol/light-protocol
Length of output: 623
Wrap semaphore acquisition with a timeout to prevent tasks from exceeding the batch deadline.
The initial check at line 50 guards entry, but a task can block indefinitely in acquire_owned().await waiting for a permit. While blocked, the deadline passes or cancellation is signaled. Once a permit is released, the task wakes and proceeds to spend fees despite already exceeding timeout_deadline.
Suggested fix
- let _send_permit = match global_send_semaphore_clone.acquire_owned().await {
- Ok(permit) => permit,
- Err(_) => {
- warn!("Global send semaphore closed unexpectedly");
- return TransactionSendResult::Cancelled;
- }
- };
+ let _send_permit = match tokio::time::timeout_at(
+ timeout_deadline,
+ global_send_semaphore_clone.acquire_owned(),
+ )
+ .await
+ {
+ Ok(Ok(permit)) => permit,
+ Ok(Err(_)) => {
+ warn!("Global send semaphore closed unexpectedly");
+ return TransactionSendResult::Cancelled;
+ }
+ Err(_) => return TransactionSendResult::Timeout,
+ };
+
+ if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline {
+ return TransactionSendResult::Cancelled;
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@forester/src/processor/v1/send_transaction.rs` around lines 433 - 439, The
acquire_owned().await on global_send_semaphore_clone can block past
timeout_deadline; wrap the semaphore acquisition in a timeout (e.g.,
tokio::time::timeout) or select against the task's cancellation/deadline so if
the deadline elapses you return TransactionSendResult::Cancelled instead of
proceeding; specifically replace the match on
global_send_semaphore_clone.acquire_owned().await with a timed await that checks
timeout_deadline (or uses tokio::time::timeout(duration,
global_send_semaphore_clone.acquire_owned())) and handle Err(elapsed) by
logging/returning TransactionSendResult::Cancelled, while preserving the
existing handling of a closed semaphore.
Summary by CodeRabbit