
Convert P2P to spawned-concurrency actor#187

Closed
pablodeymo wants to merge 1 commit into migrate-spawned-concurrency-v0.5 from p2p-spawned

Conversation

@pablodeymo
Collaborator

Motivation

The BlockChain actor already uses spawned-concurrency v0.5 (#[protocol], #[actor], #[send_handler]), but the P2P layer uses a manual tokio::select! event loop. This creates architectural inconsistency and a subtle liveness issue: when a handler takes a long time (e.g., blockchain.notify_new_block()), the libp2p swarm stops being polled.

This PR converts the P2P layer to use spawned-concurrency, matching the pattern established by the BlockChain actor.

Description

Split the monolithic P2PServer into two components:

SwarmDriver (plain tokio task)

  • Owns the libp2p::Swarm and polls it continuously via tokio::select!
  • Decodes gossip messages (snappy decompress + SSZ decode) and forwards typed protocol messages to the actor
  • Dispatches req/resp events (Status, BlocksByRoot) to the actor
  • Forwards connection events (established, closed, errors) to the actor
  • Executes SwarmCommands from the actor: publish, dial, send_request, send_response
  • Maintains OutboundRequestId → correlation_id mapping for request tracking
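The command channel and the request-id mapping described above can be sketched as follows. This is a simplified illustration, not the PR's actual code: the enum variants are trimmed down, and plain u64 values stand in for libp2p's OutboundRequestId and for the actor's correlation IDs.

```rust
use std::collections::HashMap;

// Commands the actor sends to the driver (payload types simplified).
#[allow(dead_code)]
enum SwarmCommand {
    Publish { topic: String, data: Vec<u8> },
    Dial(String),
    SendRequest { correlation_id: u64, payload: Vec<u8> },
    SendResponse { correlation_id: u64, payload: Vec<u8> },
}

// Driver-side mapping from libp2p's OutboundRequestId (modeled here as
// a u64) to the actor's correlation_id, so the libp2p type never
// crosses into the actor.
struct OutboundRequestMap {
    inner: HashMap<u64, u64>,
}

impl OutboundRequestMap {
    fn new() -> Self {
        Self { inner: HashMap::new() }
    }

    // Record a mapping when the driver issues a request on the actor's behalf.
    fn track(&mut self, request_id: u64, correlation_id: u64) {
        self.inner.insert(request_id, correlation_id);
    }

    // Resolve and remove the mapping when a response (or failure)
    // arrives, so entries do not accumulate.
    fn resolve(&mut self, request_id: u64) -> Option<u64> {
        self.inner.remove(&request_id)
    }
}

fn main() {
    let mut map = OutboundRequestMap::new();
    map.track(1, 42);
    assert_eq!(map.resolve(1), Some(42));
    assert_eq!(map.resolve(1), None); // entry consumed, nothing leaks
    println!("ok");
}
```

Because `resolve` removes the entry, the map only holds in-flight requests; several of the reviews below turn on paths where this removal is skipped.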

P2PServer (spawned actor)

  • All P2P business logic: peer tracking, request correlation, retries, gossip publishing
  • 18 protocol messages defined via #[protocol] trait (P2pProtocol)
  • Uses send_after for retry scheduling (block fetch backoff, bootnode redialing) instead of tokio::spawn(sleep + channel)
  • Receives decoded events from SwarmDriver and P2PMessages from BlockChain via a bridge task

Architecture diagram

                    P2PMessage (via bridge task)
BlockChain Actor ──────────────────────────────────> P2PServer Actor
                 <── blockchain.notify_*()                │
                                                          │
                     Protocol messages                    │
SwarmDriver ──────────────────────────────────────────────>│
(tokio task) <── SwarmCommand (mpsc channel) ──────────────│
                                                          │
                     send_after (retry self-messages)     │
                 <────────────────────────────────────────┘

Key design decisions

  • ResponseChannelWrapper: libp2p's ResponseChannel is not Clone, but the #[protocol] macro unconditionally derives Clone on generated message structs. Wrapped in Arc<Mutex<Option<_>>> with single-use take() semantics.
  • Correlation IDs: SwarmDriver maps OutboundRequestId → correlation_id; actor maps correlation_id → H256 (block root). Two-level indirection keeps the libp2p type out of the actor.
  • Bridge task: A tokio task in main.rs converts P2PMessage channel messages into actor protocol calls, avoiding circular dependency changes in the blockchain crate.
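The ResponseChannelWrapper idea in the first bullet can be sketched with std types alone. The generic `T` stands in for libp2p's non-Clone ResponseChannel; the single-use `take()` makes the wrapper Clone-compatible while guaranteeing at most one response is ever sent. Handling a poisoned lock gracefully (rather than unwrapping) is a defensive choice in this sketch, not necessarily what the PR does.

```rust
use std::sync::{Arc, Mutex};

// Clone-able handle around a non-Clone value, with single-use take()
// semantics. T stands in for libp2p's ResponseChannel.
#[derive(Clone)]
struct ResponseChannelWrapper<T> {
    inner: Arc<Mutex<Option<T>>>,
}

impl<T> ResponseChannelWrapper<T> {
    fn new(channel: T) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Some(channel))),
        }
    }

    // First caller gets the channel; later calls (including on clones)
    // get None. A poisoned lock yields None instead of panicking.
    fn take(&self) -> Option<T> {
        self.inner.lock().ok()?.take()
    }
}

fn main() {
    let wrapper = ResponseChannelWrapper::new("channel");
    let cloned = wrapper.clone();
    assert_eq!(wrapper.take(), Some("channel")); // first take wins
    assert_eq!(cloned.take(), None); // clones observe it consumed
    println!("ok");
}
```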

Files changed

  • crates/net/p2p/Cargo.toml: added spawned-concurrency dependency
  • crates/net/p2p/src/swarm_driver.rs: new: SwarmCommand, SwarmDriver, event loop, gossip decoding
  • crates/net/p2p/src/lib.rs: major rewrite: P2pProtocol trait, P2PServer actor, P2P wrapper, new start_p2p()
  • crates/net/p2p/src/gossipsub/handler.rs: deleted; logic moved to SwarmDriver + actor handlers
  • crates/net/p2p/src/gossipsub/mod.rs: simplified exports
  • crates/net/p2p/src/req_resp/handlers.rs: simplified to just build_status()
  • crates/net/p2p/src/req_resp/mod.rs: updated exports
  • bin/ethlambda/src/main.rs: updated init; start_p2p returns (P2P, JoinHandle), bridge task added

Behavioral changes

  • Improved swarm liveness: SwarmDriver polls independently of actor message processing. The swarm is never blocked by slow handlers.
  • Message ordering: Previous biased select prioritized blockchain messages. New design uses FIFO ordering in the actor mailbox (acceptable — the bias was an optimization, not a correctness requirement).
  • Publish error feedback: Previously publish() returned MessageId. Now the SwarmDriver logs errors but doesn't propagate them back (acceptable — current code only logged errors anyway).

How to Test

  1. make fmt && make lint — code compiles and passes clippy
  2. make test — all unit tests + spec tests pass
  3. Local devnet (make run-devnet) — blocks produced, attested, finalized
  4. Multi-client devnet — interop works

…ncurrency actor

  Split the monolithic P2PServer into two components for better separation of
  concerns and improved swarm liveness:

  - SwarmDriver (plain tokio task): Owns the libp2p Swarm, polls it continuously,
    decodes gossip/req-resp events, and forwards typed protocol messages to the actor.
    Executes SwarmCommands (publish, dial, send_request, send_response) from the actor.

  - P2PServer (spawned actor): All P2P business logic — peer tracking, request
    correlation, retries via send_after, gossip publishing. Receives decoded events
    from SwarmDriver and P2PMessages from BlockChain via a bridge task.

  This ensures the swarm keeps being polled even when the actor is busy processing
  messages, matching the actor pattern already used by BlockChain.
@github-actions

github-actions bot commented Mar 6, 2026

🤖 Kimi Code Review

Review Summary

This PR refactors the P2P layer from a monolithic event loop into an actor-based architecture using spawned-concurrency. The change is well-motivated for separation of concerns and better concurrency control.

Critical Issues

  1. Race condition in block fetch retry logic (lines 302-308, 486-492 in lib.rs)

    • The retry mechanism uses send_after with exponential backoff, but there's no atomic check between the backoff and retry execution. A block could be received and processed during the backoff period, but the retry will still attempt to fetch it again.
  2. Missing validation in gossip handlers (lines 213-219, 235-241, 257-263 in swarm_driver.rs)

    • The gossip message handlers don't validate message sizes before decompression, creating a potential DoS vector via compressed bombs.
  3. Unbounded channel usage (line 135 in main.rs, line 805 in lib.rs)

    • The P2P bridge uses an unbounded channel (tokio::sync::mpsc::unbounded_channel) which could lead to memory exhaustion under high load.

Security Concerns

  1. ResponseChannelWrapper race condition (lines 52-67 in lib.rs)

    • The ResponseChannelWrapper uses Mutex<Option<_>> but the take() operation isn't atomic with message processing. A malicious peer could potentially trigger multiple responses.
  2. Insufficient peer validation (lines 328-340 in swarm_driver.rs)

    • When handling BlocksByRootResponse, the code only logs mismatched roots but continues processing. This could allow a peer to send invalid blocks.

Performance Issues

  1. Inefficient correlation ID management (lines 245-250 in lib.rs)

    • The correlation_id_map grows unbounded. Failed requests should be cleaned up to prevent memory leaks.
  2. Redundant decompression (lines 213-263 in swarm_driver.rs)

    • Each gossip message is decompressed twice (once for validation, once for processing). Consider validating and processing in one pass.

Code Quality

  1. Error handling inconsistency (multiple files)

    • Some error cases use inspect_err for logging while others use explicit warn!/error! macros. Standardize error handling patterns.
  2. Magic numbers (lines 45-47 in lib.rs)

    • Constants like MAX_FETCH_RETRIES, INITIAL_BACKOFF_MS, BACKOFF_MULTIPLIER should be configurable via CLI or config file.
  3. Missing documentation (lines 1-10 in swarm_driver.rs)

    • The new SwarmDriver struct and its methods lack documentation comments.

Suggested Fixes

  1. Fix race condition in retry logic:
// In handle_retry_block_fetch, add atomic check
if !self.pending_requests.contains_key(&root) {
    trace!(%root, "Block fetch completed during backoff, skipping retry");
    return;
}
// Double-check after acquiring lock
let Some(pending) = self.pending_requests.get_mut(&root) else {
    return;
};
  2. Add message size validation:
// In handle_gossip_message, before decompression
if message.data.len() > MAX_COMPRESSED_MESSAGE_SIZE {
    warn!("Dropping oversized gossip message");
    return;
}
  3. Use bounded channels:
// In main.rs, replace unbounded channel with bounded
let (p2p_tx, mut p2p_rx) = tokio::sync::mpsc::channel(1000);
  4. Clean up correlation ID map:
// In handle_on_req_resp_failure, ensure cleanup
if let Some(root) = self.correlation_id_map.remove(&correlation_id) {
    self.pending_requests.remove(&root);
}

Overall, the actor-based refactoring is a good architectural improvement, but needs attention to race conditions and resource management before merging.


Automated review by Kimi (Moonshot AI) · custom prompt

@github-actions

github-actions bot commented Mar 6, 2026

🤖 Codex Code Review

  1. High: block fetch can get stuck permanently on invalid/non-matching BlocksByRoot responses
    At crates/net/p2p/src/lib.rs:404, correlation_id_map is removed up front, then mismatched blocks are only logged in the loop (crates/net/p2p/src/lib.rs:427-440).
    If response is non-empty but contains no matching root, pending_requests is never cleared/retried, so future FetchBlock(root) calls are deduped forever.
    Suggested fix: track matched = false; after loop, if !matched, call handle_fetch_failure(requested_root, peer, ctx).

  2. High: Response::Error does not propagate failure to retry logic
    In crates/net/p2p/src/swarm_driver.rs:279-282, error responses are only logged.
    Because request_id is not removed from outbound_request_map and on_req_resp_failure is not called, corresponding pending block fetches may never retry/resolve.
    Suggested fix: on Response::Error, remove request_id from outbound_request_map and notify actor via on_req_resp_failure(...).

  3. Medium: stale correlation entries leak on empty response path
    At crates/net/p2p/src/lib.rs:419-421, the same correlation_id is reinserted after response completion.
    Retries create a new correlation id, so old mapping is stale and can accumulate unboundedly under adversarial peers returning empty responses.
    Suggested fix: do not reinsert old correlation_id; retry path should rely on a fresh request/correlation mapping.

  4. Low: avoid panic on poisoned mutex in networking path
    ResponseChannelWrapper::take uses lock().unwrap() at crates/net/p2p/src/lib.rs:86.
    A poisoned mutex would panic and can take down networking.
    Suggested fix: handle lock errors explicitly (if let Ok(mut guard) = ...) and log/drop safely.
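The matched-flag fix proposed in item 1 can be sketched in isolation. This is a hypothetical simplification: u64 stands in for H256 block roots, and logging/forwarding are reduced to comments; the real handler also decodes and forwards blocks.

```rust
// Process a BlocksByRoot response and report whether the requested
// root was actually present, so a non-empty-but-useless response is
// treated as a failure instead of leaving the fetch stuck.
fn process_blocks_by_root(blocks: &[u64], requested_root: u64) -> bool {
    let mut matched = false;
    for root in blocks {
        if *root == requested_root {
            matched = true; // forward the block to the blockchain actor here
        }
        // Mismatched roots are logged and skipped, as in the current code.
    }
    // The caller treats `false` as a failed fetch and invokes the retry
    // path (handle_fetch_failure) rather than deduping forever.
    matched
}

fn main() {
    assert!(process_blocks_by_root(&[1, 2, 3], 2));
    assert!(!process_blocks_by_root(&[1, 2, 3], 9)); // non-empty, no match
    assert!(!process_blocks_by_root(&[], 9)); // empty response
    println!("ok");
}
```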

No direct changes were made to fork-choice, justification/finalization, state transition, XMSS verification, or SSZ format definitions; this PR is primarily P2P architecture/refactor.

I could not run cargo check in this environment due to rustup temp-file permission errors.


Automated review by OpenAI Codex · custom prompt

@github-actions
Copy link

github-actions bot commented Mar 6, 2026

🤖 Claude Code Review



PR Review: Convert P2P to spawned-concurrency actor

The architecture split (SwarmDriver + P2PServer actor) is sound and directly addresses the liveness issue. The separation of concerns is clean, and the two-level correlation ID design (libp2p OutboundRequestId → correlation_id → H256) is a good way to keep libp2p types out of the actor boundary. The ResponseChannelWrapper workaround for the #[protocol] macro's forced Clone derivation is well-documented.

Bugs

Stale correlation ID re-inserted on empty response — memory leak

crates/net/p2p/src/lib.rs, handle_on_blocks_by_root_response:

let Some(requested_root) = self.correlation_id_map.remove(&correlation_id) else { ... };

if blocks.is_empty() {
    self.correlation_id_map.insert(correlation_id, requested_root);  // ← bug
    self.handle_fetch_failure(requested_root, peer, ctx);
    return;
}

After the remove, correlation_id is re-inserted before calling handle_fetch_failure. But handle_fetch_failure never reads correlation_id_map — it only operates on pending_requests. Meanwhile, the SwarmDriver has already consumed the OutboundRequestId that this correlation_id was mapped from; it will never be used again. The entry is an orphan. Remove the insert call — it achieves nothing and leaks one map entry per empty response received.


Infinite actor self-message loop if SwarmDriver exits

crates/net/p2p/src/lib.rs, handle_retry_peer_redial:

let _ = self.swarm_tx.send(SwarmCommand::Dial(addr.clone()))
    .inspect_err(|e| {
        warn!(%peer_id, %e, "Failed to send redial command, will retry");
        send_after(
            Duration::from_secs(PEER_REDIAL_INTERVAL_SECS),
            ctx.clone(),
            p2p_protocol::RetryPeerRedial { peer_id },
        );
    });

UnboundedSender::send returns Err only when the receiver is dropped (i.e., the SwarmDriver task has exited). In that case the scheduled send_after will fire, call handle_retry_peer_redial again, fail again, schedule again — infinite loop. The actor will spin consuming CPU with no useful work. Should log error! and not reschedule when the channel is closed.
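The failure mode is easy to reproduce with std's mpsc in place of tokio's unbounded channel: send() fails only once the receiver is gone, so rescheduling after that failure can never succeed. A sketch of the guarded version (the function name and string payload are illustrative, not from the PR):

```rust
use std::sync::mpsc;

// Attempt a redial command; returns false once the channel is closed,
// signaling the caller to stop rescheduling.
fn try_redial(tx: &mpsc::Sender<&'static str>) -> bool {
    match tx.send("dial") {
        Ok(()) => true,
        Err(_) => {
            // Receiver dropped: the SwarmDriver is gone. Log at error
            // level and stop, rather than scheduling another retry that
            // must also fail.
            false
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    assert!(try_redial(&tx)); // receiver alive: command delivered
    drop(rx);
    assert!(!try_redial(&tx)); // channel closed: do not reschedule
    println!("ok");
}
```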


Design concerns

SwarmDriver::run select! can panic if both channels close

loop {
    tokio::select! {
        Some(command) = self.command_rx.recv() => { ... }
        Some(event) = self.swarm.next() => { ... }
    }
}

If command_rx is closed and swarm.next() returns None, all select! arms are disabled and tokio panics. libp2p's Swarm stream likely never terminates in practice, but the pattern is fragile. Prefer:

event = self.swarm.next() => {
    let Some(event) = event else { break; };
    ...
}

Actor lifecycle is not monitored in main()

bin/ethlambda/src/main.rs:

tokio::select! {
    result = driver_handle => {
        panic!("P2P SwarmDriver has exited unexpectedly: {result:?}");
    }
    ...
}

Only the SwarmDriver JoinHandle is monitored. server.start() returns an ActorRef — if the actor task panics internally, main() never sees it. Messages sent to the actor would then silently be dropped. If the framework exposes the underlying task handle, it should be added to the select!.


Bridge task JoinHandle silently discarded

bin/ethlambda/src/main.rs:

tokio::spawn(async move {
    while let Some(msg) = p2p_rx.recv().await { ... }
});

The handle is dropped. A panic in the bridge task goes undetected, silently cutting the blockchain→P2P publish path. At minimum, capture and add it to the select! alongside driver_handle.


Minor issues

Logging style inconsistency in status handlers (lib.rs, handle_on_status_request / handle_on_status_response):

info!("Received status request from peer {peer}");
info!("Received status response from peer {}", msg.peer);

Both use string formatting rather than structured tracing fields. Per the codebase conventions:

info!(%peer, finalized_slot, head_slot, "Received status request");

RC dependency in production

Cargo.lock resolves spawned-concurrency to 0.4.5 from git+...?tag=v0.5.0-rc. Using an RC tag in a consensus client warrants a note — worth tracking when the stable release lands.


Pre-existing (not introduced by this PR): INITIAL_BACKOFF_MS = 5 and BACKOFF_MULTIPLIER = 2 produce a 5/10/20/40/80 ms sequence, inconsistent with the CLAUDE.md documentation (10/40/160/640/2560 ms, implying multiplier 4). The constants are unchanged by this PR but worth correcting in a follow-up.
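The arithmetic behind that discrepancy is easy to check. A minimal sketch, using the constant values quoted in the paragraph above:

```rust
// Compute the exponential backoff schedule: initial * multiplier^i for
// each retry i.
fn backoff_schedule(initial_ms: u64, multiplier: u64, retries: u32) -> Vec<u64> {
    (0..retries).map(|i| initial_ms * multiplier.pow(i)).collect()
}

fn main() {
    // The code's constants (initial 5 ms, multiplier 2):
    assert_eq!(backoff_schedule(5, 2, 5), vec![5, 10, 20, 40, 80]);
    // The sequence CLAUDE.md documents implies initial 10 ms, multiplier 4:
    assert_eq!(backoff_schedule(10, 4, 5), vec![10, 40, 160, 640, 2560]);
    println!("ok");
}
```

Either the constants or the documentation should change so the two agree.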


Summary

The primary correctness issues are the stale correlation ID re-insert (memory leak) and the infinite retry loop on channel closure. The lifecycle monitoring gaps are worth addressing before merging. Everything else is minor.


Automated review by Claude (Anthropic) · custom prompt

@greptile-apps
Contributor

greptile-apps bot commented Mar 6, 2026

Greptile Summary

This PR converts the P2P layer from a monolithic tokio::select! event loop into a two-component architecture matching the spawned-concurrency pattern used by the BlockChain actor: a plain SwarmDriver tokio task that continuously polls the libp2p swarm, and a P2PServer actor that handles all business logic. This eliminates the liveness issue where slow handlers (e.g. notify_new_block) could stall swarm polling.

Key changes:

  • New SwarmDriver: owns the libp2p Swarm, decodes gossip (snappy decompress + SSZ), dispatches to the actor, and executes SwarmCommands from the actor via an unbounded mpsc channel.
  • New P2PServer actor: 18 protocol messages via #[protocol]/#[actor]/#[send_handler]; uses send_after for retry scheduling (block fetch backoff, bootnode redialing).
  • Bridge task in main.rs: translates P2PMessage channel messages into actor protocol calls, avoiding a circular crate dependency.
  • ResponseChannelWrapper: wraps libp2p's non-Clone ResponseChannel in Arc<Mutex<Option<_>>> for compatibility with the macro-generated Clone requirement.

Two bugs were found:

  • In handle_on_blocks_by_root_response, after removing a correlation_id from the map, it is immediately re-inserted before calling handle_fetch_failure. Since handle_fetch_failure does not use correlation_id_map, and the subsequent retry allocates a fresh correlation ID, the re-inserted entry is never removed — leaking an entry per empty response received.
  • SwarmDriver::run() uses Some(…) patterns in both select arms. When the actor shuts down and drops the command sender, the command arm is permanently disabled. The driver continues running unable to execute any commands, and if the swarm stream also terminates both arms would be disabled simultaneously, causing tokio::select! to panic.

Confidence Score: 3/5

  • PR introduces two concrete bugs in core flow paths that should be fixed before merging.
  • The architectural refactor is sound and the actor model is applied consistently. However, there are two concrete bugs: (1) a correlation_id_map memory leak from a spurious re-insert on empty BlocksByRoot responses, and (2) SwarmDriver::run() does not handle command channel closure, which degrades liveness silently after actor shutdown and could panic if the swarm stream also terminates. Both issues require attention before merging.
  • crates/net/p2p/src/lib.rs (memory leak in handle_on_blocks_by_root_response) and crates/net/p2p/src/swarm_driver.rs (no graceful shutdown on command channel close)

Sequence Diagram

sequenceDiagram
    participant BC as BlockChain Actor
    participant BT as Bridge Task
    participant P2P as P2PServer Actor
    participant SD as SwarmDriver (tokio task)
    participant LIB as libp2p Swarm

    Note over BC,LIB: Publish flow (outbound gossip)
    BC->>BT: P2PMessage via mpsc channel
    BT->>P2P: publish_attestation / publish_block / publish_aggregated_attestation
    P2P->>SD: SwarmCommand::GossipPublish
    SD->>LIB: swarm.behaviour_mut().gossipsub.publish()

    Note over BC,LIB: Inbound gossip flow
    LIB->>SD: SwarmEvent::Behaviour(Gossipsub::Message)
    SD->>SD: decompress + SSZ decode
    SD->>P2P: on_gossip_block / on_gossip_attestation / on_gossip_aggregated_attestation
    P2P->>BC: blockchain.notify_new_block() / notify_new_attestation()

    Note over BC,LIB: Block fetch (outbound req/resp)
    BC->>BT: P2PMessage::FetchBlock(root)
    BT->>P2P: fetch_block(root)
    P2P->>SD: SwarmCommand::SendRequest (correlation_id)
    SD->>LIB: req_resp.send_request_with_protocol()
    SD->>SD: outbound_request_map[OutboundRequestId] = correlation_id
    LIB->>SD: Event::Message::Response (BlocksByRoot)
    SD->>P2P: on_blocks_by_root_response(blocks, peer, correlation_id)
    P2P->>BC: blockchain.notify_new_block(block)

    Note over BC,LIB: Retry scheduling (backoff)
    P2P->>P2P: send_after(backoff, RetryBlockFetch)
    P2P->>SD: SwarmCommand::SendRequest (new correlation_id)

    Note over BC,LIB: Connection events
    LIB->>SD: SwarmEvent::ConnectionEstablished
    SD->>P2P: on_peer_connected(peer_id, direction, first_connection)
    P2P->>SD: SwarmCommand::SendRequest (Status)

Last reviewed commit: 38b9674

Comment on lines +419 to +425
if blocks.is_empty() {
self.correlation_id_map
.insert(correlation_id, requested_root);
warn!(%peer, "Received empty BlocksByRoot response");
self.handle_fetch_failure(requested_root, peer, ctx);
return;
}

Stale correlation_id re-insertion causes a memory leak.

The entry for correlation_id was just removed at line 414, but it is re-inserted here before calling handle_fetch_failure. handle_fetch_failure does not use correlation_id_map — it only reads pending_requests. When the retry fires via send_after, fetch_block_from_peer allocates a new correlation ID (via next_correlation_id()) and inserts a fresh entry. The re-inserted old correlation_id is never cleaned up, creating an unbounded leak in correlation_id_map for every empty BlocksByRoot response received.

The re-insert serves no purpose and should be removed:

Suggested change
if blocks.is_empty() {
self.correlation_id_map
.insert(correlation_id, requested_root);
warn!(%peer, "Received empty BlocksByRoot response");
self.handle_fetch_failure(requested_root, peer, ctx);
return;
}
if blocks.is_empty() {
warn!(%peer, "Received empty BlocksByRoot response");
self.handle_fetch_failure(requested_root, peer, ctx);
return;
}

Comment on lines +75 to +86
pub(crate) async fn run(mut self) {
loop {
tokio::select! {
Some(command) = self.command_rx.recv() => {
self.handle_command(command);
}
Some(event) = self.swarm.next() => {
self.handle_swarm_event(event);
}
}
}
}

SwarmDriver never exits when the command channel is closed.

Both select arms use Some(…) pattern matching. When the P2PServer actor is shut down (e.g., an unrecoverable error), it drops the swarm_cmd_tx sender, causing self.command_rx.recv() to return None. The Some(command) pattern never matches, so that arm becomes permanently disabled.

From that point on:

  1. The driver keeps running but can no longer accept any commands from the actor (publishes, requests, redials, etc.), silently degrading liveness.
  2. If the swarm Stream also terminates, both branches are permanently disabled and tokio::select! will panic.

The command channel closure should be treated as a shutdown signal:

pub(crate) async fn run(mut self) {
    loop {
        tokio::select! {
            command = self.command_rx.recv() => {
                match command {
                    Some(cmd) => self.handle_command(cmd),
                    None => {
                        tracing::info!("SwarmDriver command channel closed, shutting down");
                        break;
                    }
                }
            }
            Some(event) = self.swarm.next() => {
                self.handle_swarm_event(event);
            }
        }
    }
}

@pablodeymo pablodeymo closed this Mar 6, 2026