fix: global v1 send concurrency across chunks #2376

Open
SwenSchaeferjohann wants to merge 2 commits into main from fix/v1-global-send-concurrency-cap

Conversation

SwenSchaeferjohann (Contributor) commented Apr 29, 2026

Summary by CodeRabbit

  • Refactor
    • Transaction sending now uses global concurrency management for improved resource handling across all transaction batches.
    • Concurrent transaction processing has been enhanced with buffered stream patterns instead of immediate batch processing.
    • Transaction send requests are now individually managed with permit-based allocation.
    • Improved handling for failed or closed transaction sends.

coderabbitai Bot (Contributor) commented Apr 29, 2026

Warning

Rate limit exceeded

@SwenSchaeferjohann has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 11 minutes and 23 seconds before requesting another review.

To keep reviews running without waiting, you can enable the usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ab1ef206-1ebb-40ee-81b1-269b12f03232

📥 Commits

Reviewing files that changed from the base of the PR and between 269ae21 and 3d629b3.

📒 Files selected for processing (2)
  • forester/src/cli.rs
  • forester/src/processor/v1/send_transaction.rs
📝 Walkthrough

Walkthrough

Introduces a global Semaphore to enforce an upper bound on concurrently in-flight transaction sends across all chunks. The semaphore is passed through ChunkSendContext, and permits are acquired before each send. The chunk processing switches from join_all to buffered stream consumption, maintaining active chunk tasks up to the configured concurrency limit. Semaphore exhaustion triggers Cancelled status.

Changes

Cohort / File(s): Concurrency Control Enhancement (forester/src/processor/v1/send_transaction.rs)
Summary: Adds a global Arc<Semaphore> to cap in-flight transaction sends; threads the semaphore through ChunkSendContext; replaces join_all with a buffered unordered stream for chunk-level concurrency; acquires a permit per transaction send; handles semaphore exhaustion by marking sends as Cancelled; updates logging for the chunk concurrency limit and total chunk count.
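The global-cap pattern described above can be sketched with stdlib primitives only. This is an illustrative stand-in for the PR's tokio::sync::Semaphore usage, not its actual code; Semaphore and run_capped are names invented for the sketch.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (stdlib stand-in for tokio::sync::Semaphore).
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Run `sends` simulated transaction sends, all gated by one global semaphore.
/// Returns the peak number of concurrently "in-flight" sends observed.
fn run_capped(sends: usize, sem: Arc<Semaphore>) -> usize {
    let in_flight = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let mut handles = Vec::new();
    for _ in 0..sends {
        let sem = Arc::clone(&sem);
        let in_flight = Arc::clone(&in_flight);
        handles.push(thread::spawn(move || {
            sem.acquire(); // the global permit gates every send, whatever its chunk
            {
                let mut g = in_flight.lock().unwrap();
                g.0 += 1;
                g.1 = g.1.max(g.0);
            }
            thread::sleep(Duration::from_millis(10)); // the "send"
            in_flight.lock().unwrap().0 -= 1;
            sem.release();
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    let peak = in_flight.lock().unwrap().1;
    peak
}
```

Every simulated send, regardless of which chunk would have spawned it, must hold one of the N global permits, so the observed peak concurrency never exceeds the cap.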

Sequence Diagram

sequenceDiagram
    participant TxQueue as Transaction Queue
    participant ChunkProcessor as Chunk Processor
    participant Semaphore as Global Semaphore
    participant TxSender as Transaction Sender
    participant Network as Network/Destination

    rect rgba(100, 150, 200, 0.5)
    Note over ChunkProcessor,Semaphore: New Concurrency Control Flow
    
    ChunkProcessor->>TxQueue: Iterate transactions in chunk
    TxQueue-->>ChunkProcessor: Next transaction
    ChunkProcessor->>Semaphore: Acquire permit
    
    alt Permit available
        Semaphore-->>ChunkProcessor: Grant permit
        ChunkProcessor->>TxSender: Execute send
        TxSender->>Network: Send transaction
        Network-->>TxSender: Response/ACK
        TxSender-->>ChunkProcessor: Complete
        ChunkProcessor->>Semaphore: Release permit
    else Semaphore exhausted/closed
        Semaphore-->>ChunkProcessor: Unavailable
        ChunkProcessor->>TxSender: Mark Cancelled
        TxSender-->>ChunkProcessor: Status: Cancelled
    end
    end

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

📨✨ Semaphores dance, permits flow free,
Capping transactions in harmony!
No flood of sends through channels wide,
Just ordered streams with measured stride.
Concurrent grace, controlled at last! 🎚️

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main change: introducing global send concurrency control across chunks in the v1 processor, which aligns directly with the implementation of a global Semaphore for capping in-flight transaction sends.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 70.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Review rate limit: 0/1 reviews remaining, refill in 11 minutes and 23 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai Bot (Contributor) left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
forester/src/processor/v1/send_transaction.rs (1)

268-272: ⚠️ Potential issue | 🟠 Major

Propagate chunk failures instead of treating them as success.

Only ForesterError::NotEligible is handled here. Every other Err(...) from the chunk future is ignored, including RPC/blockhash setup failures from Lines 210-213, so this function can return a successful send count for a partially failed batch.

Suggested fix
-    while let Some(result) = chunk_results.next().await {
-        if let Err(ForesterError::NotEligible) = result {
-            return Err(ForesterError::NotEligible);
-        }
-    }
+    let mut first_err = None;
+    while let Some(result) = chunk_results.next().await {
+        if let Err(err) = result {
+            if err.is_forester_not_eligible() {
+                operation_cancel_signal.store(true, Ordering::SeqCst);
+            }
+            first_err.get_or_insert(err);
+        }
+    }
+    if let Some(err) = first_err {
+        return Err(err);
+    }
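The drain-all-then-propagate pattern in this suggestion can be sketched in isolation; ChunkError and drain_chunk_results below are hypothetical stand-ins for ForesterError and the real result loop, not the project's types.

```rust
/// Hypothetical error type standing in for ForesterError.
#[derive(Debug, Clone, PartialEq)]
enum ChunkError {
    NotEligible,
    Rpc(String),
}

/// Drain every chunk result, but surface the first failure instead of
/// silently counting a partially failed batch as success.
fn drain_chunk_results(results: Vec<Result<usize, ChunkError>>) -> Result<usize, ChunkError> {
    let mut sent = 0;
    let mut first_err = None;
    for r in results {
        match r {
            Ok(n) => sent += n,
            Err(e) => {
                // Remember only the first error; keep draining the rest.
                first_err.get_or_insert(e);
            }
        }
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(sent),
    }
}
```

A fully successful batch still returns the total send count, while any failed chunk turns the whole call into an error, matching the intent of the suggested fix.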
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v1/send_transaction.rs` around lines 268 - 272, The
loop over chunk_results currently only returns on ForesterError::NotEligible and
silently ignores all other Err variants; update the match in the while let
Some(result) = chunk_results.next().await loop so any Err(e) (other than
NotEligible if you want special handling) is propagated back (return Err(e))
instead of being treated as success; locate the loop using the chunk_results
identifier in send_transaction (and references to ForesterError::NotEligible and
the earlier RPC/blockhash setup errors) and change the match to return any
non-Ok error to ensure chunk failures are propagated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@forester/src/processor/v1/send_transaction.rs`:
- Around line 433-439: The acquire_owned().await on global_send_semaphore_clone
can block past timeout_deadline; wrap the semaphore acquisition in a timeout
(e.g., tokio::time::timeout) or select against the task's cancellation/deadline
so if the deadline elapses you return TransactionSendResult::Cancelled instead
of proceeding; specifically replace the match on
global_send_semaphore_clone.acquire_owned().await with a timed await that checks
timeout_deadline (or uses tokio::time::timeout(duration,
global_send_semaphore_clone.acquire_owned())) and handle Err(elapsed) by
logging/returning TransactionSendResult::Cancelled, while preserving the
existing handling of a closed semaphore.

---

Outside diff comments:
In `@forester/src/processor/v1/send_transaction.rs`:
- Around line 268-272: The loop over chunk_results currently only returns on
ForesterError::NotEligible and silently ignores all other Err variants; update
the match in the while let Some(result) = chunk_results.next().await loop so any
Err(e) (other than NotEligible if you want special handling) is propagated back
(return Err(e)) instead of being treated as success; locate the loop using the
chunk_results identifier in send_transaction (and references to
ForesterError::NotEligible and the earlier RPC/blockhash setup errors) and
change the match to return any non-Ok error to ensure chunk failures are
propagated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3ec7beb0-7cf7-4f10-a398-6ecdca5b6356

📥 Commits

Reviewing files that changed from the base of the PR and between 0687860 and 269ae21.

📒 Files selected for processing (1)
  • forester/src/processor/v1/send_transaction.rs

Comment on lines +433 to +439
let _send_permit = match global_send_semaphore_clone.acquire_owned().await {
    Ok(permit) => permit,
    Err(_) => {
        warn!("Global send semaphore closed unexpectedly");
        return TransactionSendResult::Cancelled;
    }
};

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find and read the target file
wc -l forester/src/processor/v1/send_transaction.rs

Repository: Lightprotocol/light-protocol

Length of output: 122


🏁 Script executed:

# Read the section around lines 433-439 with context
sed -n '400,480p' forester/src/processor/v1/send_transaction.rs | cat -n

Repository: Lightprotocol/light-protocol

Length of output: 4513


🏁 Script executed:

# Look at the broader function to understand timeout_deadline and cancel_signal_clone
sed -n '380,500p' forester/src/processor/v1/send_transaction.rs | cat -n

Repository: Lightprotocol/light-protocol

Length of output: 6574


🏁 Script executed:

# Search for timeout_deadline definition in the file
rg "timeout_deadline" forester/src/processor/v1/send_transaction.rs -B 3 -A 3

Repository: Lightprotocol/light-protocol

Length of output: 3316


🏁 Script executed:

# Search for cancel_signal_clone usage patterns
rg "cancel_signal_clone" forester/src/processor/v1/send_transaction.rs -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 907


🏁 Script executed:

# Search for other acquire_owned calls in the codebase
rg "acquire_owned" forester/src/ -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 978


🏁 Script executed:

# Check how the semaphore is created and used
rg "Semaphore" forester/src/processor/v1/send_transaction.rs -B 2 -A 2

Repository: Lightprotocol/light-protocol

Length of output: 623


Wrap semaphore acquisition with a timeout to prevent tasks from exceeding the batch deadline.

The initial check at line 50 guards entry, but a task can block indefinitely in acquire_owned().await waiting for a permit. While blocked, the deadline passes or cancellation is signaled. Once a permit is released, the task wakes and proceeds to spend fees despite already exceeding timeout_deadline.

Suggested fix
-            let _send_permit = match global_send_semaphore_clone.acquire_owned().await {
-                Ok(permit) => permit,
-                Err(_) => {
-                    warn!("Global send semaphore closed unexpectedly");
-                    return TransactionSendResult::Cancelled;
-                }
-            };
+            let _send_permit = match tokio::time::timeout_at(
+                timeout_deadline,
+                global_send_semaphore_clone.acquire_owned(),
+            )
+            .await
+            {
+                Ok(Ok(permit)) => permit,
+                Ok(Err(_)) => {
+                    warn!("Global send semaphore closed unexpectedly");
+                    return TransactionSendResult::Cancelled;
+                }
+                Err(_) => return TransactionSendResult::Timeout,
+            };
+
+            if cancel_signal_clone.load(Ordering::SeqCst) || Instant::now() >= timeout_deadline {
+                return TransactionSendResult::Cancelled;
+            }
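The deadline-bounded acquisition suggested here (tokio::time::timeout_at around acquire_owned) can be mimicked with stdlib blocking primitives for illustration; TimedSemaphore, SendResult, and acquire_until are names invented for this sketch, not the project's code.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum SendResult {
    Acquired,
    Cancelled,
}

/// Counting semaphore whose acquisition respects a hard deadline.
struct TimedSemaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl TimedSemaphore {
    fn new(n: usize) -> Self {
        TimedSemaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }

    /// Try to take a permit before `deadline`; report Cancelled when the
    /// deadline elapses first, instead of waking late and proceeding.
    fn acquire_until(&self, deadline: Instant) -> SendResult {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            let now = Instant::now();
            if now >= deadline {
                return SendResult::Cancelled;
            }
            let (guard, timeout) = self.cv.wait_timeout(p, deadline - now).unwrap();
            p = guard;
            if timeout.timed_out() && *p == 0 {
                return SendResult::Cancelled;
            }
        }
        *p -= 1;
        SendResult::Acquired
    }

    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}
```

A waiter that cannot get a permit before its deadline reports Cancelled rather than waking after the deadline and sending anyway, which is the behavior the review asks for.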
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v1/send_transaction.rs` around lines 433 - 439, The
acquire_owned().await on global_send_semaphore_clone can block past
timeout_deadline; wrap the semaphore acquisition in a timeout (e.g.,
tokio::time::timeout) or select against the task's cancellation/deadline so if
the deadline elapses you return TransactionSendResult::Cancelled instead of
proceeding; specifically replace the match on
global_send_semaphore_clone.acquire_owned().await with a timed await that checks
timeout_deadline (or uses tokio::time::timeout(duration,
global_send_semaphore_clone.acquire_owned())) and handle Err(elapsed) by
logging/returning TransactionSendResult::Cancelled, while preserving the
existing handling of a closed semaphore.

