Migrate P2P module from event loop to spawned-concurrency actor #188
pablodeymo wants to merge 1 commit into migrate-spawned-concurrency-v0.5
Conversation
Replace the hand-rolled event loop in start_p2p() with a proper GenServer
actor (P2PServer) using the spawned-concurrency framework, matching how
BlockChainServer already works.
Key changes:
- New ethlambda-network-api crate with shared message types (PublishBlock,
FetchBlock, NewBlock, etc.) to break the dependency cycle between
blockchain and P2P crates
- SwarmAdapter: thin I/O bridge that owns the libp2p Swarm and communicates
via channels (SwarmCommand/SwarmEvent), keeping select! isolated to infra
- P2P actor receives swarm events via spawn_listener, network-api messages
via manual Handler<M> impls, and retry scheduling via send_after
- BlockChain protocol simplified: network-api messages use manual Handler
impls instead of protocol-wrapped types, enabling Recipient<M> across
actor boundaries
- Actors wired at startup via InitP2P/InitBlockChain init messages carrying
Recipient handles, replacing mpsc channels
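The SwarmAdapter/SwarmHandle split described above can be illustrated with a std-only sketch. Names mirror the PR's concepts, but `std::sync::mpsc` stands in for the tokio channels and the command payloads are simplified placeholders, not the real libp2p types:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of the command enum the adapter consumes; payloads are simplified
// stand-ins for the real libp2p types.
enum SwarmCommand {
    Publish { topic: String, data: Vec<u8> },
    Dial { addr: String },
}

// Clone-able handle wrapping the command sender, mirroring SwarmHandle.
#[derive(Clone)]
struct SwarmHandle {
    tx: mpsc::Sender<SwarmCommand>,
}

impl SwarmHandle {
    fn publish(&self, topic: &str, data: Vec<u8>) {
        let _ = self.tx.send(SwarmCommand::Publish { topic: topic.into(), data });
    }
    fn dial(&self, addr: &str) {
        let _ = self.tx.send(SwarmCommand::Dial { addr: addr.into() });
    }
}

// Stand-in for swarm_loop: drains commands and "executes" them, returning a
// log of what it did so the behaviour is observable.
fn run_adapter(rx: mpsc::Receiver<SwarmCommand>) -> Vec<String> {
    rx.into_iter()
        .map(|cmd| match cmd {
            SwarmCommand::Publish { topic, data } => {
                format!("publish {} ({} bytes)", topic, data.len())
            }
            SwarmCommand::Dial { addr } => format!("dial {addr}"),
        })
        .collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = SwarmHandle { tx };
    let adapter = thread::spawn(move || run_adapter(rx));

    handle.publish("blocks", vec![1, 2, 3]);
    handle.dial("/ip4/127.0.0.1/tcp/9000");
    drop(handle); // dropping the last sender ends the adapter loop

    let log = adapter.join().unwrap();
    assert_eq!(log, ["publish blocks (3 bytes)", "dial /ip4/127.0.0.1/tcp/9000"]);
}
```

The point of the shape is that callers only ever touch the cheap, clone-able handle, while all swarm I/O stays behind one loop.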
🤖 Kimi Code Review

Summary: The PR refactors the P2P layer into a proper actor model using the spawned-concurrency framework.

Issues found:
1. Race condition on actor initialization — File:
2. Unbounded channel usage in
🤖 Codex Code Review

Could not run. Automated review by OpenAI Codex · custom prompt
🤖 Claude Code Review

PR #188 Review: Migrate P2P module to spawned-concurrency actor

Overall: The architectural direction is sound — breaking the circular crate dependency, isolating the select! loop.

Significant Issues

Gossipsub publish: success is always logged regardless of outcome
Before (correct — only logs on success):

```rust
let _ = server.swarm.behaviour_mut().gossipsub
    .publish(topic, compressed)
    .inspect(|_| info!(..., "Published attestation to gossipsub"))
    .inspect_err(|err| warn!(..., "Failed to publish attestation to gossipsub"));
```

After (always logs success):

```rust
server.swarm_handle.publish(server.attestation_topic.clone(), compressed);
info!(..., "Published attestation to gossipsub"); // fires even on failure
```

The error is now only captured deep in the swarm adapter, so the caller logs success even when the publish fails.

Bootnode redial regression on synchronous dial failure

In the old code, a synchronous dial error re-scheduled the redial:

```rust
let _ = server.swarm.dial(addr.clone()).inspect_err(|e| {
    warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
    schedule_peer_redial(server.retry_tx.clone(), peer_id); // re-schedules on error
});
```

In the new code, the dial error is never observed by the caller, so a failed redial is not retried:

```rust
self.swarm_handle.dial(addr.clone());
```
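A minimal sketch of the lost retry behaviour, using std types as stand-ins: `try_dial` is a hypothetical placeholder for `swarm.dial()`, and a `VecDeque` stands in for the retry schedule (the actor version would use `send_after(RetryPeerRedial)` instead of a queue):

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for swarm.dial(), which can fail synchronously
// (e.g. unreachable address).
fn try_dial(peer: &str) -> Result<(), String> {
    if peer.starts_with("unreachable") {
        Err(format!("cannot dial {peer}"))
    } else {
        Ok(())
    }
}

// Mirrors the old behaviour: on a synchronous dial error, log and
// re-enqueue the peer for a later retry.
fn dial_or_schedule_retry(peer: &str, retries: &mut VecDeque<String>) -> bool {
    match try_dial(peer) {
        Ok(()) => true,
        Err(e) => {
            eprintln!("Failed to redial bootnode ({e}), will retry");
            retries.push_back(peer.to_string());
            false
        }
    }
}

fn main() {
    let mut retries = VecDeque::new();
    assert!(dial_or_schedule_retry("peer-a", &mut retries));
    assert!(!dial_or_schedule_retry("unreachable-b", &mut retries));
    assert_eq!(retries.len(), 1); // the failed dial was re-scheduled
}
```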
Process liveness monitoring removed

Old:

```rust
tokio::select! {
    result = p2p_handle => { panic!("P2P node task has exited unexpectedly: {result:?}"); }
    _ = tokio::signal::ctrl_c() => {}
}
```

New:

```rust
tokio::signal::ctrl_c().await.ok();
```

If the P2P actor, swarm adapter task, or blockchain actor terminates unexpectedly (panic, channel closure), the process now continues silently, appearing healthy to operators while producing no blocks or attestations. At minimum, consider monitoring actor liveness or restoring a panic on unexpected shutdown.

Minor Issues
```diff
-pub(crate) struct P2PServer {
+pub struct P2PServer {
-struct BlockChainServer {
+pub struct BlockChainServer {
```

The actor state structs are implementation details. Making them pub needlessly widens the crates' public API.

Old:

```rust
.inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response"));
```

New (in swarm adapter):

```rust
.inspect_err(|err| warn!(?err, "Swarm adapter: send_response failed"));
```

The peer ID is dropped from the error log, making it harder to correlate with peer state in production.
```rust
impl Handler<FetchBlock> for P2PServer {
    async fn handle(&mut self, msg: FetchBlock, _ctx: &Context<Self>) {
        ...
        fetch_block_from_peer(self, root).await; // return value ignored
    }
}
```
What Looks Good
Automated review by Claude (Anthropic) · custom prompt
Greptile Summary

This PR migrates the P2P module from a hand-rolled event loop to the spawned-concurrency actor pattern.

Key changes:

Confidence Score: 3/5
| Filename | Overview |
|---|---|
| bin/ethlambda/src/main.rs | Replaces the manual task-handle select loop with actor wiring; loses P2P crash detection and silently discards init-message send errors. |
| crates/net/p2p/src/swarm_adapter.rs | New file — clean I/O bridge isolating the select! loop; swarm_loop exits silently on failure with no log. |
| crates/net/p2p/src/lib.rs | Rewrites P2P from event-loop to actor pattern; good separation of protocol and manual Handler impls; architecture is sound. |
| crates/net/p2p/src/gossipsub/handler.rs | Inbound forwarding correctly migrated to Recipient; success info logs for all three publish functions fire before the gossipsub operation is executed or confirmed. |
| crates/net/p2p/src/req_resp/handlers.rs | Correctly migrated to SwarmHandle and send_after; replaces unwrap on send_response with a channel send; logic preserved. |
| crates/net/api/src/lib.rs | New shared message-type crate; cleanly breaks the circular dependency; Init messages correctly derive Clone. |
| crates/blockchain/src/lib.rs | Replaces mpsc channel with Option<Recipient> fields; manual Handler impls are correct; P2P fields guarded consistently. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant Main
    participant BlockChain as BlockChain (actor)
    participant P2P as P2PServer (actor)
    participant SwarmAdapter as SwarmAdapter (task)
    participant Swarm as libp2p Swarm
    Main->>SwarmAdapter: start_swarm_adapter(swarm)
    SwarmAdapter-->>Main: (SwarmEventStream, SwarmHandle)
    Main->>P2P: P2P::spawn(built, store)
    Note over P2P: spawn_listener(swarm_stream)
    Main->>BlockChain: BlockChain::spawn(store, keys)
    Main->>BlockChain: send(InitP2P { publish_block, fetch_block, ... })
    Main->>P2P: send(InitBlockChain { new_block, new_attestation, ... })
    Note over SwarmAdapter,Swarm: swarm_loop: select! { swarm.next(), cmd_rx.recv() }
    Swarm-->>SwarmAdapter: SwarmEvent (gossip/req-resp)
    SwarmAdapter->>P2P: WrappedSwarmEvent (via spawn_listener)
    P2P->>BlockChain: Recipient<NewBlock>.send(NewBlock)
    P2P->>BlockChain: Recipient<NewAttestation>.send(NewAttestation)
    BlockChain->>P2P: Recipient<PublishBlock>.send(PublishBlock)
    P2P->>SwarmAdapter: SwarmHandle.publish(topic, data)
    SwarmAdapter->>Swarm: gossipsub.publish(topic, data)
    BlockChain->>P2P: Recipient<FetchBlock>.send(FetchBlock)
    P2P->>SwarmAdapter: SwarmHandle.send_request(peer, req, proto)
    SwarmAdapter->>Swarm: req_resp.send_request_with_protocol(...)
    SwarmAdapter-->>P2P: OutboundRequestId (via oneshot)
```
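The `Recipient<M>` handles in the diagram are type-erased message senders: the P2P side depends only on the message type, not on the blockchain actor. A rough std-only sketch of the idea — all names here are illustrative stand-ins, not the spawned-concurrency API:

```rust
use std::sync::mpsc;

// Illustrative message type, as it might live in a shared network-api crate.
struct NewBlock {
    slot: u64,
}

// Type-erased recipient: callers hold a sender for a message type M without
// knowing which actor (or channel) is behind it.
struct Recipient<M> {
    send_fn: Box<dyn Fn(M)>,
}

impl<M> Recipient<M> {
    fn send(&self, msg: M) {
        (self.send_fn)(msg);
    }
}

// Build a recipient backed by a plain channel standing in for an actor mailbox.
fn recipient_for(tx: mpsc::Sender<NewBlock>) -> Recipient<NewBlock> {
    Recipient {
        send_fn: Box::new(move |msg| {
            let _ = tx.send(msg);
        }),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let new_block = recipient_for(tx);

    // The sender only needs Recipient<NewBlock>, not the actor's type.
    new_block.send(NewBlock { slot: 42 });
    assert_eq!(rx.recv().unwrap().slot, 42);
}
```

Erasing the actor type behind the message type is what lets the shared network-api crate break the blockchain ↔ P2P dependency cycle.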
Last reviewed commit: ba074c0
Reviewed code (bin/ethlambda/src/main.rs):

```rust
// Ctrl-C received, shutting down
tokio::signal::ctrl_c().await.ok();
```
P2P crash detection silently removed
The old code monitored the P2P task handle and panicked loudly if the P2P node exited unexpectedly:
```rust
tokio::select! {
    result = p2p_handle => {
        panic!("P2P node task has exited unexpectedly: {result:?}");
    }
    _ = tokio::signal::ctrl_c() => {}
}
```

With the new architecture, if the swarm_loop task dies (e.g. event_tx.send() returns an error and breaks the loop) or the P2P actor panics, the main process will silently continue waiting for ctrl_c. The node will keep running but will no longer process any P2P events, publish any blocks/attestations, or fetch missing blocks — all without any crash, error log, or operator alert.
Consider monitoring both spawned tasks (swarm adapter and P2P actor lifecycle), or at minimum adding a tracing::error! before each break in swarm_loop and an explicit note that the process should be restarted.
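One way to restore loud failure, sketched with std threads as stand-ins for the spawned tasks. The real fix would `tokio::select!` over the JoinHandles and `ctrl_c`; `run_task` and `monitor` here are hypothetical names for illustration:

```rust
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for a spawned task (swarm adapter or P2P actor).
fn run_task(should_panic: bool) {
    if should_panic {
        panic!("task crashed");
    }
    // Normal (but still unexpected) exit.
}

// Join every task handle and describe how each one terminated, so the
// process can log loudly instead of idling while P2P is dead.
fn monitor(tasks: Vec<(&'static str, JoinHandle<()>)>) -> Vec<String> {
    tasks
        .into_iter()
        .map(|(name, handle)| match handle.join() {
            Ok(()) => format!("{name} exited unexpectedly"),
            Err(_) => format!("{name} panicked"),
        })
        .collect()
}

fn main() {
    let swarm = thread::spawn(|| run_task(false));
    let p2p = thread::spawn(|| run_task(true));
    for report in monitor(vec![("swarm adapter", swarm), ("p2p actor", p2p)]) {
        eprintln!("{report}");
    }
}
```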
Reviewed code (crates/net/p2p/src/gossipsub/handler.rs, lines 141–152):

```rust
server
    .swarm_handle
    .publish(server.attestation_topic.clone(), compressed);
info!(
    %slot,
    validator,
    target_slot = attestation.data.target.slot,
    target_root = %ShortRoot(&attestation.data.target.root.0),
    source_slot = attestation.data.source.slot,
    source_root = %ShortRoot(&attestation.data.source.root.0),
    "Published attestation to gossipsub"
);
```
Success log fires before publish is actually executed
swarm_handle.publish() only enqueues a SwarmCommand::Publish onto an unbounded channel — the actual gossipsub.publish() call happens asynchronously inside swarm_loop. The info! log "Published attestation to gossipsub" (and its equivalents for block/aggregated attestation) fires unconditionally right after the enqueue, before the operation is executed or its result is known.
If gossipsub rejects the message (e.g. PublishError::InsufficientPeers, PublishError::Duplicate), the error is only logged as a warn! inside swarm_adapter.rs::execute_command, while the caller's log still says "Published attestation to gossipsub" — misleading operators into thinking the publish succeeded.
The old code correctly placed the info! inside .inspect(), so it only fired on success. Consider either:

- Moving the info log to execute_command where the result is known, or
- Changing the log message to "Attempting to publish…" to accurately reflect the fire-and-forget nature of the call.
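A third option is to carry a reply channel in the command so the caller logs only the real outcome. A std-only sketch of that shape — std channels stand in for tokio's, and `PublishError` here is a simplified placeholder, not gossipsub's type:

```rust
use std::sync::mpsc;
use std::thread;

// Simplified placeholder for gossipsub's PublishError.
#[derive(Debug, PartialEq)]
enum PublishError {
    InsufficientPeers,
}

// Command carrying a reply channel, so the caller learns the real outcome
// instead of logging success at enqueue time.
struct Publish {
    data: Vec<u8>,
    reply: mpsc::Sender<Result<(), PublishError>>,
}

// Stand-in for the swarm loop: "executes" each publish and reports its result.
fn swarm_loop(rx: mpsc::Receiver<Publish>) {
    for cmd in rx {
        let result = if cmd.data.is_empty() {
            Err(PublishError::InsufficientPeers) // simulated failure
        } else {
            Ok(())
        };
        let _ = cmd.reply.send(result);
    }
}

// Caller-side helper: enqueue the command and wait for the outcome.
fn publish(tx: &mpsc::Sender<Publish>, data: Vec<u8>) -> Result<(), PublishError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Publish { data, reply: reply_tx }).expect("swarm loop alive");
    reply_rx.recv().expect("swarm loop replied")
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || swarm_loop(rx));

    // Log success only once the outcome is known.
    match publish(&tx, b"block".to_vec()) {
        Ok(()) => println!("Published to gossipsub"),
        Err(e) => println!("Failed to publish: {e:?}"),
    }
    drop(tx);
    worker.join().unwrap();
}
```

The trade-off is that the caller now awaits the adapter, so this fits request-style paths better than hot fire-and-forget ones.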
Reviewed code (crates/net/p2p/src/swarm_adapter.rs, lines 104–121):

```rust
async fn swarm_loop(
    mut swarm: libp2p::Swarm<Behaviour>,
    event_tx: mpsc::UnboundedSender<SwarmEvent<BehaviourEvent>>,
    mut cmd_rx: mpsc::UnboundedReceiver<SwarmCommand>,
) {
    loop {
        tokio::select! {
            event = swarm.next() => {
                let Some(event) = event else { break };
                if event_tx.send(event).is_err() { break }
            }
            cmd = cmd_rx.recv() => {
                let Some(cmd) = cmd else { break };
                execute_command(&mut swarm, cmd);
            }
        }
    }
}
```
Swarm loop exits silently with no log
When swarm_loop exits via any of the three break branches (swarm stream ends, event_tx send fails, or cmd_rx closes), the task just returns without emitting any warning or error. Since this task is spawned with tokio::spawn(swarm_loop(...)) and nobody monitors its JoinHandle, there is no visibility into the failure.
After the loop exits:

- The SwarmHandle::publish, dial, and send_response methods will silently log "Swarm adapter closed, cannot …" on each subsequent call
- SwarmHandle::send_request will return None, which callers treat as a transient failure

At minimum, add a log at the exit point so operators can detect the failure:

```rust
async fn swarm_loop(...) {
    loop {
        tokio::select! { ... }
    }
    tracing::error!("Swarm adapter loop exited — P2P networking is no longer functional");
}
```

Reviewed code (bin/ethlambda/src/main.rs, lines 153–166):

```rust
let _ = blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P {
    publish_block: p2p.actor_ref().recipient(),
    publish_attestation: p2p.actor_ref().recipient(),
    publish_aggregated: p2p.actor_ref().recipient(),
    fetch_block: p2p.actor_ref().recipient(),
});
let _ = p2p
    .actor_ref()
    .recipient::<InitBlockChain>()
    .send(InitBlockChain {
        new_block: blockchain.actor_ref().recipient(),
        new_attestation: blockchain.actor_ref().recipient(),
        new_aggregated: blockchain.actor_ref().recipient(),
    });
```
Init message failures silently discarded
Both init messages are sent with let _ = ..., silently discarding errors. If either send() call fails (e.g. the actor has already stopped, or the runtime is shutting down), the actors will operate in an un-wired state — all the if let Some(ref recipient) guards in gossip/req-resp handlers will simply skip forwarding, leading to completely silent data loss with no error logged at the call site.
While the actors are freshly spawned and failure is unlikely at startup, a log on error would make this much easier to diagnose:
```rust
if let Err(err) = blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P { ... }) {
    tracing::error!(%err, "Failed to send InitP2P — blockchain and P2P actors not wired");
}
```

The same applies to the InitBlockChain message on line 159.
Motivation
The P2P module (`ethlambda-p2p`) uses a hand-rolled `tokio::select!` event loop that manually multiplexes swarm events, incoming blockchain messages, and retry timers. This is inconsistent with the `BlockChainServer`, which already uses the spawned-concurrency GenServer pattern, and creates several problems:

- `ethlambda-p2p` depends on `ethlambda-blockchain` (for the `BlockChain` handle and `P2PMessage` enum), and `ethlambda-blockchain` depends on `ethlambda-p2p` types — tightly coupling the two crates.
- An `mpsc::unbounded_channel` connects blockchain→P2P, with a separate `RetryMessage` channel for self-scheduling. Adding a new message type requires updating the enum, the channel, and the `select!` arms.
- The `P2PServer` struct holds a raw `&mut Swarm`, the blockchain handle, and all channels — mixing I/O, business logic, and scheduling in one monolithic event loop.

The `start_p2p` function has a `TODO` comment acknowledging this: it should be an actor receiving messages, not a hand-rolled loop.

Description
Architecture (before → after)
New crate: `ethlambda-network-api`

Shared message types that decouple the two actors:

- `PublishBlock`, `PublishAttestation`, `PublishAggregatedAttestation`, `FetchBlock`
- `NewBlock`, `NewAttestation`, `NewAggregatedAttestation`
- `InitP2P` (carries P2P `Recipient` handles), `InitBlockChain` (carries blockchain `Recipient` handles)

SwarmAdapter (`swarm_adapter.rs`)

Thin I/O bridge that owns the libp2p `Swarm` and exposes it via channels:

- `SwarmEvent` stream (consumed by `spawn_listener` into the actor)
- `SwarmCommand` enum (`Publish`, `Dial`, `SendRequest`, `SendResponse`)
- `SwarmHandle`: Clone-able struct with convenience methods wrapping the command sender
- The only `select!` in the codebase is here, isolated to infrastructure

P2PServer actor
The actor uses a hybrid approach:

- Protocol messages (`P2PProtocol`): only internal retry messages (`RetryBlockFetch`, `RetryPeerRedial`) — used with `send_after` for timed self-messages
- Manual `Handler<M>` impls: network-api messages (`PublishBlock`, `FetchBlock`, `InitBlockChain`, etc.) and `WrappedSwarmEvent` — this is necessary because:
  - `SwarmEvent` contains `ResponseChannel`, which isn't `Clone` (the protocol macro auto-derives Clone on message structs)
  - It allows `Recipient<M>` to work across actor boundaries (protocol-wrapped types are different from the network-api types)

BlockChainServer changes
Same hybrid approach applied for consistency:

- Protocol messages: `Tick` (internal scheduling via `send_after`)
- Manual handlers: `InitP2P`, `NewBlock`, `NewAttestation`, `NewAggregatedAttestation` — enables `Recipient<NewBlock>` to work when P2P sends blocks to blockchain

Wiring in `main.rs`

Changes by file
- `Cargo.toml` (workspace): add `crates/net/api` member and `ethlambda-network-api` workspace dependency
- `crates/net/api/Cargo.toml`: new crate manifest
- `crates/net/api/src/lib.rs`: `send_messages!` macro + manual `InitP2P`/`InitBlockChain` with `#[derive(Clone)]`
- `crates/net/p2p/Cargo.toml`: add `spawned-concurrency`, `futures`, `tokio-stream`; remove `ethlambda-blockchain` dependency
- `crates/net/p2p/src/swarm_adapter.rs`: new — `SwarmCommand`, `SwarmHandle`, `start_swarm_adapter()`, `swarm_loop()`, `execute_command()`
- `crates/net/p2p/src/lib.rs`: `build_swarm()` + `P2P` handle + `P2PServer` actor with protocol + manual handlers; removed `start_p2p()`, `event_loop()`, `P2PMessage`, `RetryMessage`, retry channels
- `crates/net/p2p/src/gossipsub/handler.rs`: `server.blockchain.notify_*()` → `Recipient::send()`; `server.swarm.behaviour_mut().gossipsub.publish()` → `server.swarm_handle.publish()`
- `crates/net/p2p/src/req_resp/handlers.rs`: `server.swarm.behaviour_mut().req_resp.send_response()` → `server.swarm_handle.send_response()`; `send_request_with_protocol()` → `server.swarm_handle.send_request().await`; `tokio::spawn(sleep + retry_tx)` → `send_after(backoff, ctx, RetryBlockFetch)`
- `crates/blockchain/Cargo.toml`: add `ethlambda-network-api` dependency
- `crates/blockchain/src/lib.rs`: remove `P2PMessage` enum and `p2p_tx` channel; replace with `Option<Recipient<...>>` fields; protocol simplified to `Tick` only; manual handlers for `InitP2P`, `NewBlock`, `NewAttestation`, `NewAggregatedAttestation`
- `bin/ethlambda/Cargo.toml`: add `ethlambda-network-api` dependency
- `bin/ethlambda/src/main.rs`: replace `mpsc::unbounded_channel` + `tokio::spawn(start_p2p())` with `build_swarm()` + `P2P::spawn()` + init messages

What was removed
- `P2PMessage` enum (replaced by network-api types + `Recipient`)
- `RetryMessage` enum and retry channels (replaced by `send_after`)
- `event_loop()` function (replaced by actor message processing)
- `start_p2p()` function (replaced by `build_swarm()` + `P2P::spawn()`)
- `schedule_peer_redial()` function (replaced by `send_after`)
- `handle_p2p_message()` function (replaced by individual `Handler<M>` impls)
- `ethlambda-blockchain` dependency from `ethlambda-p2p` (cycle broken)

How to test
For production validation, deploy to a multi-client devnet and verify: