Convert P2P to spawned-concurrency actor #187
pablodeymo wants to merge 1 commit into migrate-spawned-concurrency-v0.5
Conversation
Split the monolithic P2PServer into two components for better separation of
concerns and improved swarm liveness:
- SwarmDriver (plain tokio task): Owns the libp2p Swarm, polls it continuously,
decodes gossip/req-resp events, and forwards typed protocol messages to the actor.
Executes SwarmCommands (publish, dial, send_request, send_response) from the actor.
- P2PServer (spawned actor): All P2P business logic — peer tracking, request
correlation, retries via send_after, gossip publishing. Receives decoded events
from SwarmDriver and P2PMessages from BlockChain via a bridge task.
This ensures the swarm keeps being polled even when the actor is busy processing
messages, matching the actor pattern already used by BlockChain.
🤖 Kimi Code Review

Review Summary

This PR refactors the P2P layer from a monolithic event loop into an actor-based architecture using spawned-concurrency.

Critical Issues
Security Concerns
Performance Issues
Code Quality
Suggested Fixes
```rust
// In handle_retry_block_fetch, add atomic check
if !self.pending_requests.contains_key(&root) {
    trace!(%root, "Block fetch completed during backoff, skipping retry");
    return;
}
// Double-check after acquiring lock
let Some(pending) = self.pending_requests.get_mut(&root) else {
    return;
};
```

```rust
// In handle_gossip_message, before decompression
if message.data.len() > MAX_COMPRESSED_MESSAGE_SIZE {
    warn!("Dropping oversized gossip message");
    return;
}
```

```rust
// In main.rs, replace unbounded channel with bounded
let (p2p_tx, mut p2p_rx) = tokio::sync::mpsc::channel(1000);
```

```rust
// In handle_on_req_resp_failure, ensure cleanup
if let Some(root) = self.correlation_id_map.remove(&correlation_id) {
    self.pending_requests.remove(&root);
}
```

Overall, the actor-based refactoring is a good architectural improvement, but it needs attention to race conditions and resource management before merging.

Automated review by Kimi (Moonshot AI) · custom prompt
🤖 Codex Code Review

No direct changes were made to fork-choice, justification/finalization, state transition, XMSS verification, or SSZ format definitions; this PR is primarily a P2P architecture refactor. I could not run …

Automated review by OpenAI Codex · custom prompt
🤖 Claude Code Review

PR Review: Convert P2P to spawned-concurrency actor

The architecture split (SwarmDriver + P2PServer actor) is sound and directly addresses the liveness issue. The separation of concerns is clean, and the two-level correlation ID design (libp2p …).

Bugs

Stale correlation ID re-inserted on empty response — memory leak

```rust
let Some(requested_root) = self.correlation_id_map.remove(&correlation_id) else { ... };
if blocks.is_empty() {
    self.correlation_id_map.insert(correlation_id, requested_root); // ← bug
    self.handle_fetch_failure(requested_root, peer, ctx);
    return;
}
```

After the …

Infinite actor self-message loop if SwarmDriver exits
```rust
let _ = self.swarm_tx.send(SwarmCommand::Dial(addr.clone()))
    .inspect_err(|e| {
        warn!(%peer_id, %e, "Failed to send redial command, will retry");
        send_after(
            Duration::from_secs(PEER_REDIAL_INTERVAL_SECS),
            ctx.clone(),
            p2p_protocol::RetryPeerRedial { peer_id },
        );
    });
```

Design concerns
```rust
loop {
    tokio::select! {
        Some(command) = self.command_rx.recv() => { ... }
        Some(event) = self.swarm.next() => { ... }
    }
}
```

If both …

```rust
event = self.swarm.next() => {
    let Some(event) = event else { break; };
    ...
}
```

Actor lifecycle is not monitored in …
```rust
tokio::select! {
    result = driver_handle => {
        panic!("P2P SwarmDriver has exited unexpectedly: {result:?}");
    }
    ...
}
```

Only the SwarmDriver …

Bridge task JoinHandle silently discarded
```rust
tokio::spawn(async move {
    while let Some(msg) = p2p_rx.recv().await { ... }
});
```

The handle is dropped. A panic in the bridge task goes undetected, silently cutting the blockchain→P2P publish path. At minimum, capture the handle and add it to the …

Minor issues

Logging style inconsistency in status handlers (…):

```rust
info!("Received status request from peer {peer}");
info!("Received status response from peer {}", msg.peer);
```

Both use string formatting rather than structured tracing fields. Per the codebase conventions:

```rust
info!(%peer, finalized_slot, head_slot, "Received status request");
```

RC dependency in production

Pre-existing (not introduced by this PR): …

Summary

The primary correctness issues are the stale correlation ID re-insert (memory leak) and the infinite retry loop on channel closure. The lifecycle monitoring gaps are worth addressing before merging. Everything else is minor.

Automated review by Claude (Anthropic) · custom prompt
Greptile Summary

This PR converts the P2P layer from a monolithic …

Key changes:

Two bugs were found:

Confidence Score: 3/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant BC as BlockChain Actor
    participant BT as Bridge Task
    participant P2P as P2PServer Actor
    participant SD as SwarmDriver (tokio task)
    participant LIB as libp2p Swarm

    Note over BC,LIB: Publish flow (outbound gossip)
    BC->>BT: P2PMessage via mpsc channel
    BT->>P2P: publish_attestation / publish_block / publish_aggregated_attestation
    P2P->>SD: SwarmCommand::GossipPublish
    SD->>LIB: swarm.behaviour_mut().gossipsub.publish()

    Note over BC,LIB: Inbound gossip flow
    LIB->>SD: SwarmEvent::Behaviour(Gossipsub::Message)
    SD->>SD: decompress + SSZ decode
    SD->>P2P: on_gossip_block / on_gossip_attestation / on_gossip_aggregated_attestation
    P2P->>BC: blockchain.notify_new_block() / notify_new_attestation()

    Note over BC,LIB: Block fetch (outbound req/resp)
    BC->>BT: P2PMessage::FetchBlock(root)
    BT->>P2P: fetch_block(root)
    P2P->>SD: SwarmCommand::SendRequest (correlation_id)
    SD->>LIB: req_resp.send_request_with_protocol()
    SD->>SD: outbound_request_map[OutboundRequestId] = correlation_id
    LIB->>SD: Event::Message::Response (BlocksByRoot)
    SD->>P2P: on_blocks_by_root_response(blocks, peer, correlation_id)
    P2P->>BC: blockchain.notify_new_block(block)

    Note over BC,LIB: Retry scheduling (backoff)
    P2P->>P2P: send_after(backoff, RetryBlockFetch)
    P2P->>SD: SwarmCommand::SendRequest (new correlation_id)

    Note over BC,LIB: Connection events
    LIB->>SD: SwarmEvent::ConnectionEstablished
    SD->>P2P: on_peer_connected(peer_id, direction, first_connection)
    P2P->>SD: SwarmCommand::SendRequest (Status)
```

Last reviewed commit: 38b9674
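The two-level correlation mapping shown in the diagram (driver maps libp2p `OutboundRequestId → correlation_id`; actor maps `correlation_id → H256` block root) can be sketched with plain maps. The type and method names below are simplified stand-ins, not the PR's actual definitions:

```rust
use std::collections::HashMap;

// Stand-ins for libp2p's OutboundRequestId and the block-root hash type.
type OutboundRequestId = u64;
type CorrelationId = u64;
type H256 = [u8; 32];

// Driver side: knows about libp2p request IDs, never about block roots.
#[derive(Default)]
struct Driver {
    outbound_request_map: HashMap<OutboundRequestId, CorrelationId>,
}

// Actor side: knows about block roots, never about libp2p types.
#[derive(Default)]
struct Actor {
    correlation_id_map: HashMap<CorrelationId, H256>,
    next_id: CorrelationId,
}

impl Actor {
    // Allocate a fresh correlation id for a block fetch.
    fn start_fetch(&mut self, root: H256) -> CorrelationId {
        self.next_id += 1;
        self.correlation_id_map.insert(self.next_id, root);
        self.next_id
    }
    // On response, the id is removed exactly once (no re-insert, per the leak fix).
    fn complete(&mut self, id: CorrelationId) -> Option<H256> {
        self.correlation_id_map.remove(&id)
    }
}

impl Driver {
    fn send_request(&mut self, req_id: OutboundRequestId, corr: CorrelationId) {
        self.outbound_request_map.insert(req_id, corr);
    }
    fn on_response(&mut self, req_id: OutboundRequestId) -> Option<CorrelationId> {
        self.outbound_request_map.remove(&req_id)
    }
}

fn main() {
    let (mut actor, mut driver) = (Actor::default(), Driver::default());
    let root = [0xab; 32];
    let corr = actor.start_fetch(root);
    driver.send_request(7, corr); // 7 = libp2p-assigned request id
    // Response arrives: driver resolves req_id → correlation_id, actor → root.
    let corr_back = driver.on_response(7).unwrap();
    assert_eq!(actor.complete(corr_back), Some(root));
    // Both maps are empty again — no stale entries.
    assert!(driver.outbound_request_map.is_empty());
    assert!(actor.correlation_id_map.is_empty());
}
```

The point of the indirection is that neither side ever stores the other's key type, which is why the stale re-insert flagged below leaks: the old `correlation_id` can never be resolved again once the retry allocates a new one.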
```rust
if blocks.is_empty() {
    self.correlation_id_map
        .insert(correlation_id, requested_root);
    warn!(%peer, "Received empty BlocksByRoot response");
    self.handle_fetch_failure(requested_root, peer, ctx);
    return;
}
```
Stale correlation_id re-insertion causes a memory leak.
The entry for correlation_id was just removed at line 414, but it is re-inserted here before calling handle_fetch_failure. handle_fetch_failure does not use correlation_id_map — it only reads pending_requests. When the retry fires via send_after, fetch_block_from_peer allocates a new correlation ID (via next_correlation_id()) and inserts a fresh entry. The re-inserted old correlation_id is never cleaned up, creating an unbounded leak in correlation_id_map for every empty BlocksByRoot response received.
The re-insert serves no purpose and should be removed:
```diff
 if blocks.is_empty() {
-    self.correlation_id_map
-        .insert(correlation_id, requested_root);
     warn!(%peer, "Received empty BlocksByRoot response");
     self.handle_fetch_failure(requested_root, peer, ctx);
     return;
 }
```
Path: crates/net/p2p/src/lib.rs
Line: 419-425
```rust
pub(crate) async fn run(mut self) {
    loop {
        tokio::select! {
            Some(command) = self.command_rx.recv() => {
                self.handle_command(command);
            }
            Some(event) = self.swarm.next() => {
                self.handle_swarm_event(event);
            }
        }
    }
}
```
`SwarmDriver` never exits when the command channel is closed.

Both select arms use `Some(…)` pattern matching. When the P2PServer actor is shut down (e.g., on an unrecoverable error), it drops the `swarm_cmd_tx` sender, causing `self.command_rx.recv()` to return `None`. The `Some(command)` pattern never matches, so that arm becomes permanently disabled.

From that point on:
1. The driver keeps running but can no longer accept any commands from the actor (publishes, requests, redials, etc.), silently degrading liveness.
2. If the swarm `Stream` also terminates, **both** branches are permanently disabled and `tokio::select!` will panic.

The command channel closure should be treated as a shutdown signal:
```rust
pub(crate) async fn run(mut self) {
    loop {
        tokio::select! {
            command = self.command_rx.recv() => {
                match command {
                    Some(cmd) => self.handle_command(cmd),
                    None => {
                        tracing::info!("SwarmDriver command channel closed, shutting down");
                        break;
                    }
                }
            }
            Some(event) = self.swarm.next() => {
                self.handle_swarm_event(event);
            }
        }
    }
}
```
Path: crates/net/p2p/src/swarm_driver.rs
Line: 75-86
Motivation
The BlockChain actor already uses spawned-concurrency v0.5 (`#[protocol]`, `#[actor]`, `#[send_handler]`), but the P2P layer uses a manual `tokio::select!` event loop. This creates architectural inconsistency and a subtle liveness issue: when a handler takes a long time (e.g., `blockchain.notify_new_block()`), the libp2p swarm stops being polled.

This PR converts the P2P layer to use spawned-concurrency, matching the pattern established by the BlockChain actor.
Description
Split the monolithic `P2PServer` into two components:

SwarmDriver (plain tokio task)
- Owns the `libp2p::Swarm` and polls it continuously via `tokio::select!`
- Executes `SwarmCommand`s from the actor: publish, dial, send_request, send_response
- Maintains the `OutboundRequestId → correlation_id` mapping for request tracking

P2PServer (spawned actor)
- Implements the `#[protocol]` trait (`P2pProtocol`)
- Uses `send_after` for retry scheduling (block fetch backoff, bootnode redialing) instead of `tokio::spawn(sleep + channel)`
- Receives `P2PMessage`s from BlockChain via a bridge task

Architecture diagram
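The split can be sketched with plain std threads and channels standing in for tokio and libp2p. The variant and function names below are illustrative stand-ins, not the PR's actual `SwarmCommand`/event definitions:

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative stand-ins for the PR's command and decoded-event types.
enum SwarmCommand { Publish(String), Dial(String) }
enum DecodedEvent { GossipBlock(String), Connected(String) }

// Driver: owns the "swarm" (here simulated), executes commands from the actor,
// and forwards decoded events back. In the PR this loop is a tokio::select!.
fn run_driver(cmd_rx: mpsc::Receiver<SwarmCommand>, event_tx: mpsc::Sender<DecodedEvent>) {
    // Iteration ends when every command sender is dropped — the shutdown signal,
    // mirroring the review fix where recv() returning None breaks the loop.
    for cmd in cmd_rx {
        match cmd {
            SwarmCommand::Publish(topic) => {
                // Simulate the swarm surfacing a gossip message in response.
                let _ = event_tx.send(DecodedEvent::GossipBlock(format!("block on {topic}")));
            }
            SwarmCommand::Dial(addr) => {
                let _ = event_tx.send(DecodedEvent::Connected(addr));
            }
        }
    }
}

fn main() {
    let (cmd_tx, cmd_rx) = mpsc::channel();
    let (event_tx, event_rx) = mpsc::channel();
    let driver = thread::spawn(move || run_driver(cmd_rx, event_tx));

    // "Actor" side: issue a command, receive the decoded event.
    cmd_tx.send(SwarmCommand::Publish("blocks".into())).unwrap();
    match event_rx.recv().unwrap() {
        DecodedEvent::GossipBlock(msg) => assert_eq!(msg, "block on blocks"),
        _ => panic!("unexpected event"),
    }

    drop(cmd_tx);           // dropping the sender shuts the driver down
    driver.join().unwrap(); // driver exits cleanly instead of spinning
}
```

Because the driver owns the event source and runs independently, slow work on the actor side cannot stall polling, which is the liveness property the PR is after.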
Key design decisions
- `ResponseChannel` is not `Clone`, but the `#[protocol]` macro unconditionally derives `Clone` on generated message structs. Wrapped in `Arc<Mutex<Option<_>>>` with single-use `take()` semantics.
- The SwarmDriver maps `OutboundRequestId → correlation_id`; the actor maps `correlation_id → H256` (block root). Two-level indirection keeps the libp2p type out of the actor.
- A bridge task in `main.rs` converts `P2PMessage` channel messages into actor protocol calls, avoiding circular dependency changes in the blockchain crate.

Files changed
- `crates/net/p2p/Cargo.toml` — `spawned-concurrency` dependency
- `crates/net/p2p/src/swarm_driver.rs`
- `crates/net/p2p/src/lib.rs` — `start_p2p()`
- `crates/net/p2p/src/gossipsub/handler.rs`
- `crates/net/p2p/src/gossipsub/mod.rs`
- `crates/net/p2p/src/req_resp/handlers.rs` — `build_status()`
- `crates/net/p2p/src/req_resp/mod.rs`
- `bin/ethlambda/src/main.rs` — `start_p2p` returns `(P2P, JoinHandle)`, bridge task added

Behavioral changes
- Previously, a `biased` select prioritized blockchain messages. The new design uses FIFO ordering in the actor mailbox (acceptable — the bias was an optimization, not a correctness requirement).
- `publish()` returned `MessageId`. Now the SwarmDriver logs errors but doesn't propagate them back (acceptable — current code only logged errors anyway).

How to Test
- `make fmt && make lint` — code compiles and passes clippy
- `make test` — all unit tests + spec tests pass
- Ran a devnet (`make run-devnet`) — blocks produced, attested, finalized
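The `ResponseChannel` decision above (`Arc<Mutex<Option<_>>>` with single-use `take()` semantics) can be sketched as follows. `ResponseChannel` here is a stand-in struct, not the libp2p type, and `SharedResponseChannel` is a hypothetical wrapper name:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for libp2p's ResponseChannel: not Clone, must be consumed once.
struct ResponseChannel { peer: String }

// Cloneable wrapper so the #[protocol]-derived Clone on message structs is
// satisfied, while the underlying channel can still only be used once.
#[derive(Clone)]
struct SharedResponseChannel(Arc<Mutex<Option<ResponseChannel>>>);

impl SharedResponseChannel {
    fn new(ch: ResponseChannel) -> Self {
        Self(Arc::new(Mutex::new(Some(ch))))
    }
    // First caller gets the channel; later calls (on any clone) get None.
    fn take(&self) -> Option<ResponseChannel> {
        self.0.lock().unwrap().take()
    }
}

fn main() {
    let shared = SharedResponseChannel::new(ResponseChannel { peer: "peer-1".into() });
    let cloned = shared.clone(); // cheap Arc clone, as the derived Clone requires
    let ch = shared.take().expect("first take succeeds");
    assert_eq!(ch.peer, "peer-1");
    assert!(cloned.take().is_none()); // channel already consumed
}
```

All clones share one `Option` slot behind the mutex, so cloning a message struct never duplicates the non-`Clone` channel; it only duplicates the handle to it.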