
Migrate P2P module from event loop to spawned-concurrency actor#188

Open
pablodeymo wants to merge 1 commit into migrate-spawned-concurrency-v0.5 from p2p-actor-migration

Conversation

@pablodeymo
Collaborator

Motivation

The P2P module (ethlambda-p2p) uses a hand-rolled tokio::select! event loop that manually multiplexes swarm events, incoming blockchain messages, and retry timers. This is inconsistent with the BlockChainServer, which already uses the spawned-concurrency GenServer pattern, and creates several problems:

  1. Circular dependency: ethlambda-p2p depends on ethlambda-blockchain (for the BlockChain handle and P2PMessage enum), and ethlambda-blockchain depends on ethlambda-p2p types — tightly coupling the two crates.
  2. Manual channel plumbing: An mpsc::unbounded_channel connects blockchain→P2P, with a separate RetryMessage channel for self-scheduling. Adding a new message type requires updating the enum, the channel, and the select! arms.
  3. No actor isolation: The P2PServer struct holds a raw &mut Swarm, the blockchain handle, and all channels — mixing I/O, business logic, and scheduling in one monolithic event loop.

The start_p2p function has a TODO comment acknowledging this: it should be an actor receiving messages, not a hand-rolled loop.

Description

Architecture (before → after)

BEFORE:
┌─────────────┐  mpsc channel   ┌──────────────────────────────┐
│ BlockChain  │ ──────────────→ │ P2PServer (event loop)       │
│ (actor)     │ ←── notify_*()  │   owns: Swarm, blockchain,   │
│             │                 │   p2p_rx, retry_tx/rx         │
└─────────────┘                 │   select! { swarm, p2p, retry }│
                                └──────────────────────────────┘

AFTER:
┌─────────────┐  Recipient<M>   ┌──────────────────────────────┐
│ BlockChain  │ ←─────────────→ │ P2PServer (actor)            │
│ (actor)     │  InitP2P /      │   owns: SwarmHandle, store,  │
│             │  InitBlockChain │   Recipient handles          │
└─────────────┘                 └──────────┬───────────────────┘
                                           │ spawn_listener
                                ┌──────────┴───────────────────┐
                                │ SwarmAdapter (I/O bridge)    │
                                │   owns: Swarm                │
                                │   select! { swarm, commands } │
                                └──────────────────────────────┘

New crate: ethlambda-network-api

Shared message types that decouple the two actors:

| Direction | Messages |
| --- | --- |
| BlockChain → P2P | PublishBlock, PublishAttestation, PublishAggregatedAttestation, FetchBlock |
| P2P → BlockChain | NewBlock, NewAttestation, NewAggregatedAttestation |
| Startup wiring | InitP2P (carries P2P Recipient handles), InitBlockChain (carries blockchain Recipient handles) |

SwarmAdapter (swarm_adapter.rs)

Thin I/O bridge that owns the libp2p Swarm and exposes it via channels:

  • Inbound: SwarmEvent stream (consumed by spawn_listener into the actor)
  • Outbound: SwarmCommand enum (Publish, Dial, SendRequest, SendResponse)
  • SwarmHandle: Clone-able struct with convenience methods wrapping the command sender
  • Zero business logic — the only select! in the codebase is here, isolated to infrastructure

P2PServer actor

The actor uses a hybrid approach:

  • Protocol trait (P2PProtocol): Only internal retry messages (RetryBlockFetch, RetryPeerRedial) — used with send_after for timed self-messages
  • Manual Handler<M> impls: Network-api messages (PublishBlock, FetchBlock, InitBlockChain, etc.) and WrappedSwarmEvent — this is necessary because:
    • SwarmEvent contains ResponseChannel which isn't Clone (protocol macro auto-derives Clone on message structs)
    • Network-api messages must use the exact same type for Recipient<M> to work across actor boundaries (protocol-wrapped types are different from the network-api types)

BlockChainServer changes

Same hybrid approach applied for consistency:

  • Protocol: Only Tick (internal scheduling via send_after)
  • Manual handlers: InitP2P, NewBlock, NewAttestation, NewAggregatedAttestation — enables Recipient<NewBlock> to work when P2P sends blocks to blockchain

Wiring in main.rs

```rust
// 1. Build swarm (config, dial, subscribe)
let built = build_swarm(SwarmConfig { ... })?;

// 2. Spawn actors
let blockchain = BlockChain::spawn(store.clone(), validator_keys, is_aggregator);
let p2p = P2P::spawn(built, store.clone());

// 3. Wire via init messages (Recipient handles)
blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P {
    publish_block: p2p.actor_ref().recipient(),
    publish_attestation: p2p.actor_ref().recipient(),
    ...
});
p2p.actor_ref().recipient::<InitBlockChain>().send(InitBlockChain {
    new_block: blockchain.actor_ref().recipient(),
    ...
});
```

Changes by file

| File | Changes |
| --- | --- |
| Cargo.toml (workspace) | Add crates/net/api member and ethlambda-network-api workspace dependency |
| crates/net/api/Cargo.toml | New crate manifest |
| crates/net/api/src/lib.rs | Shared message types (send_messages! macro + manual InitP2P/InitBlockChain with #[derive(Clone)]) |
| crates/net/p2p/Cargo.toml | Add spawned-concurrency, futures, tokio-stream; remove ethlambda-blockchain dependency |
| crates/net/p2p/src/swarm_adapter.rs | New: SwarmCommand, SwarmHandle, start_swarm_adapter(), swarm_loop(), execute_command() |
| crates/net/p2p/src/lib.rs | Rewrite: build_swarm() + P2P handle + P2PServer actor with protocol + manual handlers; removed start_p2p(), event_loop(), P2PMessage, RetryMessage, retry channels |
| crates/net/p2p/src/gossipsub/handler.rs | server.blockchain.notify_*() → Recipient::send(); server.swarm.behaviour_mut().gossipsub.publish() → server.swarm_handle.publish() |
| crates/net/p2p/src/req_resp/handlers.rs | server.swarm.behaviour_mut().req_resp.send_response() → server.swarm_handle.send_response(); send_request_with_protocol() → server.swarm_handle.send_request().await; tokio::spawn(sleep + retry_tx) → send_after(backoff, ctx, RetryBlockFetch) |
| crates/blockchain/Cargo.toml | Add ethlambda-network-api dependency |
| crates/blockchain/src/lib.rs | Remove P2PMessage enum and p2p_tx channel; replace with Option<Recipient<...>> fields; protocol simplified to Tick only; manual handlers for InitP2P, NewBlock, NewAttestation, NewAggregatedAttestation |
| bin/ethlambda/Cargo.toml | Add ethlambda-network-api dependency |
| bin/ethlambda/src/main.rs | Replace mpsc::unbounded_channel + tokio::spawn(start_p2p()) with build_swarm() + P2P::spawn() + init messages |

What was removed

  • P2PMessage enum (replaced by network-api types + Recipient)
  • RetryMessage enum and retry channels (replaced by send_after)
  • event_loop() function (replaced by actor message processing)
  • start_p2p() function (replaced by build_swarm() + P2P::spawn())
  • schedule_peer_redial() function (replaced by send_after)
  • handle_p2p_message() function (replaced by individual Handler<M> impls)
  • Direct ethlambda-blockchain dependency from ethlambda-p2p (cycle broken)

How to test

```shell
make fmt    # Formatting passes
make lint   # Clippy with -D warnings passes
make test   # All tests pass (including fork choice spec tests)
```

For production validation, deploy to a multi-client devnet and verify:

  • Block gossip works (blocks published and received)
  • Attestation gossip works (attestations published and received)
  • Peer connections establish and status requests/responses flow
  • Block fetch retries work with exponential backoff
  • Bootnode redial works after disconnection

…ncy actor

  Replace the hand-rolled event loop in start_p2p() with a proper GenServer
  actor (P2PServer) using the spawned-concurrency framework, matching how
  BlockChainServer already works.

  Key changes:
  - New ethlambda-network-api crate with shared message types (PublishBlock,
    FetchBlock, NewBlock, etc.) to break the dependency cycle between
    blockchain and P2P crates
  - SwarmAdapter: thin I/O bridge that owns the libp2p Swarm and communicates
    via channels (SwarmCommand/SwarmEvent), keeping select! isolated to infra
  - P2P actor receives swarm events via spawn_listener, network-api messages
    via manual Handler<M> impls, and retry scheduling via send_after
  - BlockChain protocol simplified: network-api messages use manual Handler
    impls instead of protocol-wrapped types, enabling Recipient<M> across
    actor boundaries
  - Actors wired at startup via InitP2P/InitBlockChain init messages carrying
    Recipient handles, replacing mpsc channels
@github-actions

github-actions bot commented Mar 6, 2026

🤖 Kimi Code Review

Summary

The PR refactors the P2P layer into a proper actor model using spawned-concurrency, introduces a new ethlambda-network-api crate for message definitions, and replaces the previous direct BlockChain handle with asynchronous Recipient<M> channels. This is a large structural change that touches initialization, message passing, retry logic, and error handling.

Issues found

1. Race condition on actor initialization

File: bin/ethlambda/src/main.rs (lines 150–167)
The InitP2P and InitBlockChain messages are sent with let _ = … – the results are ignored.

  • If the BlockChain actor is not yet ready when the InitP2P message arrives, the message is silently dropped.
  • Same risk in the reverse direction.
    Fix: spawn each actor first, then await the Init* messages to ensure they are processed before continuing.

2. Unbounded channel usage in swarm_adapter.rs

File: crates/net/p2p/src/swarm_adapter.rs (lines 66, 75, 86, 97, 108, 119, 131, 142, 153)
mpsc::unbounded_channel is used for both events and commands.

  • A malicious or mis-behaving peer could flood the node with gossip messages, causing unbounded memory growth.
    Fix: switch to bounded channels with a reasonable high-water mark (e.g., 1024) and log/drop when full.

3. Missing error handling on publish failures

File: crates/net/p2p/src/gossipsub/handler.rs (lines 141–166, 169–194, 197–222)
publish calls on SwarmHandle are fire-and-forget (they only warn if the adapter is closed).

  • If the underlying gossipsub buffer is full or the peer score is too low, the message is silently lost.
    Fix: surface the PublishError back to the caller (e.g., return a Result<(), PublishError> from the Publish* handlers) so the blockchain can decide to retry or log.

4. Potential deadlock on shutdown

File: bin/ethlambda/src/main.rs (lines 174–175)
The new code removes the tokio::select! that kept the P2P task alive.

  • If the P2P actor panics, tokio::signal::ctrl_c().await will still complete and the process exits silently.
    Fix: keep a JoinHandle from P2P::spawn and select! on it plus the ctrl-c future so panics are logged.

5. Incorrect error variant in handle_blocks_by_root_response

File: crates/net/p2p/src/req_resp/handlers.rs (line 147)
handle_fetch_failure is called with an empty response, but the log line says “Received empty BlocksByRoot response” yet still schedules a retry.

  • Empty responses should probably be treated as terminal failure, not retried.
    Fix: decide policy: either retry with exponential backoff or mark the root as unavailable.

6. Unused ValidatorSecretKey import

File: crates/blockchain/src/lib.rs (line 21)
ValidatorSecretKey is imported but not used in the diff.
Fix: remove the unused import.

7. Typo in unreachable message

File: crates/net/p2p/src/gossipsub/handler.rs (line 24)
“we already matched on event_loop” → should be “we already matched on Message variant in handle_swarm_event”.
Fix: update the string.

8. Inconsistent visibility of BlockChainServer

File: crates/blockchain/src/lib.rs (line 84)
BlockChainServer is now pub but was previously private.

  • This exposes internals that downstream crates should not rely on.
    Fix: revert to pub(crate) unless there is a compelling reason.

9. Missing Clone on InitP2P / InitBlockChain

The manual Message impls are correct, but the structs are Clone – consider deriving it to avoid future drift.

Positive observations

  • The new ethlambda-network-api crate cleanly separates concerns and avoids cyclic dependencies.
  • The SwarmHandle abstraction prevents direct &mut Swarm access from actor code, improving safety.
  • Retry logic moved from ad-hoc tokio::spawn into the actor scheduler (send_after) is more robust.

Minor style nits

  • Use Duration::from_secs_f64(0.1) instead of Duration::from_millis(100) for readability in swarm_adapter.rs.
  • Prefer expect("…") over unwrap() in execute_command for easier debugging.

Overall the refactor is sound, but the initialization race and unbounded channels should be addressed before merge.


Automated review by Kimi (Moonshot AI) · custom prompt

@github-actions

github-actions bot commented Mar 6, 2026

🤖 Codex Code Review

  1. High: startup race can silently drop consensus-critical messages before actor wiring completes.

  2. High: process no longer supervises P2P liveness; node can keep running while effectively disconnected.

    • Main now waits only for Ctrl-C in main.rs:174.
    • Previous behavior (panic/exit when P2P task died) was removed. If swarm/adapter exits, consensus actor may continue producing while not networking, which is a liveness and operational safety regression.
  3. High (security/perf): unbounded adapter queues allow memory growth under peer-driven event flood.

    • Unbounded channels introduced in swarm_adapter.rs:94 and swarm_adapter.rs:95.
    • swarm_loop forwards every network event into the unbounded queue in swarm_adapter.rs:111.
    • Under gossip/request spam, this is a realistic OOM/DoS vector. A bounded queue + backpressure/drop policy is safer for a consensus client.
  4. Medium: publish logs can report success before actual libp2p publish outcome.

Could not run cargo check due to a sandbox rustup permission error (/home/runner/.rustup/tmp not writable), so findings are from static diff/code inspection.


Automated review by OpenAI Codex · custom prompt

@github-actions

github-actions bot commented Mar 6, 2026

🤖 Claude Code Review



PR #188 Review: Migrate P2P module to spawned-concurrency actor

Overall: The architectural direction is sound — breaking the circular crate dependency, isolating the select! loop to a thin I/O adapter, and using the actor model consistently are all improvements. The code is well-structured and the PR description is thorough. A few correctness and observability issues are worth addressing before merge.


Significant Issues

Gossipsub publish: success is always logged regardless of outcome

crates/net/p2p/src/gossipsub/handler.rs — all three publish functions (attestation, block, aggregated attestation) now unconditionally log success:

```rust
// Before (correct — only logs on success):
let _ = server.swarm.behaviour_mut().gossipsub
    .publish(topic, compressed)
    .inspect(|_| info!(..., "Published attestation to gossipsub"))
    .inspect_err(|err| warn!(..., "Failed to publish attestation to gossipsub"));

// After (always logs success):
server.swarm_handle.publish(server.attestation_topic.clone(), compressed);
info!(..., "Published attestation to gossipsub");  // fires even on failure
```

The error is now only captured deep in swarm_adapter.rs via warn!(%err, "Swarm adapter: publish failed"), without slot, validator, topic, or any other context. "Insufficient peers" failures during startup or after disconnections will be invisible in operational logs while appearing as successful publishes. The old inspect/inspect_err pattern was correct here; the refactoring broke it.

Bootnode redial regression on synchronous dial() failure

In the old handle_peer_redial:

```rust
let _ = server.swarm.dial(addr.clone()).inspect_err(|e| {
    warn!(%peer_id, %e, "Failed to redial bootnode, will retry");
    schedule_peer_redial(server.retry_tx.clone(), peer_id);  // re-schedules on error
});
```

In the new handle_retry_peer_redial:

```rust
self.swarm_handle.dial(addr.clone());
```

swarm.dial() can fail synchronously (e.g., address limit reached, already dialing). The old code detected this and re-scheduled a retry. The new code fires-and-forgets; swarm_adapter.rs only logs the error without rescheduling. A bootnode that hits a dial limit will never be retried until the next disconnect event triggers the cycle again.

Process liveness monitoring removed

bin/ethlambda/src/main.rs:

Old:

```rust
tokio::select! {
    result = p2p_handle => { panic!("P2P node task has exited unexpectedly: {result:?}"); }
    _ = tokio::signal::ctrl_c() => {}
}
```

New:

```rust
tokio::signal::ctrl_c().await.ok();
```

If the P2P actor, swarm adapter task, or blockchain actor terminates unexpectedly (panic, channel closure), the process now continues silently, appearing healthy to operators while producing no blocks or attestations. At minimum, consider monitoring actor liveness or restoring a panic on unexpected shutdown.


Minor Issues

P2PServer and BlockChainServer unnecessarily widened to pub

crates/net/p2p/src/lib.rs and crates/blockchain/src/lib.rs:

```diff
-pub(crate) struct P2PServer {
+pub struct P2PServer {

-struct BlockChainServer {
+pub struct BlockChainServer {
```

The actor state structs are implementation details. Making them pub leaks internal fields (all of which are also pub(crate) or pub). If nothing outside the crate needs these types directly, pub(crate) is more appropriate.

send_response error loses peer context

crates/net/p2p/src/req_resp/handlers.rs:

Old:

```rust
.inspect_err(|err| warn!(%peer, ?err, "Failed to send BlocksByRoot response"));
```

New (in swarm adapter):

```rust
.inspect_err(|err| warn!(?err, "Swarm adapter: send_response failed"));
```

The peer ID is dropped from the error log, making it harder to correlate with peer state in production.

send_request awaits inside actor handler — minor head-of-line blocking

crates/net/p2p/src/swarm_adapter.rs: send_request sends a command and awaits a oneshot channel response from the swarm loop to retrieve the OutboundRequestId. Since spawned-concurrency actors are single-threaded per actor, this await pauses the P2P mailbox (blocking WrappedSwarmEvent processing, etc.) for the duration of the round-trip. In practice the latency is negligible, but it's worth noting as a constraint: any swarm backpressure here stalls the entire actor.

FetchBlock handler: ctx not passed, return value unchecked

crates/net/p2p/src/lib.rs:

```rust
impl Handler<FetchBlock> for P2PServer {
    async fn handle(&mut self, msg: FetchBlock, _ctx: &Context<Self>) {
        ...
        fetch_block_from_peer(self, root).await;  // return value ignored
    }
}
```

fetch_block_from_peer returns bool indicating whether the request was sent successfully. If false, the caller should log or handle the failure. handle_retry_block_fetch correctly checks and cleans up on failure; the initial FetchBlock handler should do the same (or at least log). Note _ctx is unused here but would be needed if the handler needs to schedule a retry on initial failure.


What Looks Good

  • The SwarmAdapter cleanly isolates the select! loop to infrastructure; the zero-business-logic rule is respected.
  • Dependency cycle between ethlambda-p2p and ethlambda-blockchain is properly broken via the new ethlambda-network-api crate.
  • spawn_listener for bridging the swarm event stream into the actor mailbox is the correct pattern.
  • send_after replacing tokio::spawn(sleep + channel_send) for retry scheduling is a clear improvement.
  • The send_messages! / manual InitP2P/InitBlockChain split is well-justified and the comment explains the reasoning clearly.
  • Deduplication of in-flight block requests is preserved correctly.

Automated review by Claude (Anthropic) · custom prompt

@greptile-apps
Contributor

greptile-apps bot commented Mar 6, 2026

Greptile Summary

This PR migrates the P2P module from a hand-rolled tokio::select! event loop to the spawned-concurrency actor pattern, consistent with BlockChainServer. It introduces a new ethlambda-network-api crate to share message types and break the circular dependency between ethlambda-p2p and ethlambda-blockchain, and adds a SwarmAdapter I/O bridge that isolates all direct Swarm access. The architectural direction is solid and the refactoring is thorough.

Key changes:

  • New ethlambda-network-api crate: Shared PublishBlock, FetchBlock, NewBlock, InitP2P, InitBlockChain, etc. — cleanly decouples the two actors.
  • SwarmAdapter: Thin bridge that owns the libp2p Swarm, exposing a SwarmHandle (clone-able command sender) and an event stream consumed by spawn_listener; the only remaining select! in the codebase.
  • P2PServer actor: Hybrid protocol (RetryBlockFetch, RetryPeerRedial via send_after) + manual Handler<M> impls for network-api and swarm-event messages.
  • BlockChainServer: Protocol reduced to Tick; p2p_tx: mpsc::Sender replaced by Option<Recipient<T>> fields set via InitP2P.
  • Issues found: (1) The old crash detection that panicked when the P2P task exited was removed — if swarm_loop dies the node now silently continues without P2P. (2) The three gossipsub publish functions log "Published X to gossipsub" immediately after enqueuing the command, before the actual gossipsub.publish() executes, so failures in the swarm adapter are not reflected in the caller's logs. (3) swarm_loop exits without any error log, making failures invisible. (4) The actor init messages in main.rs discard errors with let _ = and no fallback logging.

Confidence Score: 3/5

  • Safe to merge after addressing the crash-detection regression and misleading publish logs — functional behavior is otherwise preserved.
  • The architectural refactor is well-executed and correctly preserves the core P2P logic. However two logic-level issues reduce confidence: the removal of crash detection means a silent swarm adapter failure will leave the node running with no P2P but no alert, and the unconditional publish-success logs will make debugging gossip failures harder in production. The init-message and swarm_loop logging issues are lower severity but warrant attention before a production deploy.
  • bin/ethlambda/src/main.rs (crash detection, init error handling) and crates/net/p2p/src/gossipsub/handler.rs (premature success logs).

Important Files Changed

Filename Overview
bin/ethlambda/src/main.rs Replaces the manual task-handle select loop with actor wiring; loses P2P crash detection and silently discards init-message send errors.
crates/net/p2p/src/swarm_adapter.rs New file — clean I/O bridge isolating the select! loop; swarm_loop exits silently on failure with no log.
crates/net/p2p/src/lib.rs Rewrites P2P from event-loop to actor pattern; good separation of protocol and manual Handler impls; architecture is sound.
crates/net/p2p/src/gossipsub/handler.rs Inbound forwarding correctly migrated to Recipient; success info logs for all three publish functions fire before the gossipsub operation is executed or confirmed.
crates/net/p2p/src/req_resp/handlers.rs Correctly migrated to SwarmHandle and send_after; replaces unwrap on send_response with a channel send; logic preserved.
crates/net/api/src/lib.rs New shared message-type crate; cleanly breaks the circular dependency; Init messages correctly derive Clone.
crates/blockchain/src/lib.rs Replaces mpsc channel with Option<Recipient> fields; manual Handler impls are correct; P2P fields guarded consistently.

Sequence Diagram

sequenceDiagram
    participant Main
    participant BlockChain as BlockChain (actor)
    participant P2P as P2PServer (actor)
    participant SwarmAdapter as SwarmAdapter (task)
    participant Swarm as libp2p Swarm

    Main->>SwarmAdapter: start_swarm_adapter(swarm)
    SwarmAdapter-->>Main: (SwarmEventStream, SwarmHandle)
    Main->>P2P: P2P::spawn(built, store)
    Note over P2P: spawn_listener(swarm_stream)
    Main->>BlockChain: BlockChain::spawn(store, keys)
    Main->>BlockChain: send(InitP2P { publish_block, fetch_block, ... })
    Main->>P2P: send(InitBlockChain { new_block, new_attestation, ... })

    Note over SwarmAdapter,Swarm: swarm_loop: select! { swarm.next(), cmd_rx.recv() }

    Swarm-->>SwarmAdapter: SwarmEvent (gossip/req-resp)
    SwarmAdapter->>P2P: WrappedSwarmEvent (via spawn_listener)
    P2P->>BlockChain: Recipient<NewBlock>.send(NewBlock)
    P2P->>BlockChain: Recipient<NewAttestation>.send(NewAttestation)

    BlockChain->>P2P: Recipient<PublishBlock>.send(PublishBlock)
    P2P->>SwarmAdapter: SwarmHandle.publish(topic, data)
    SwarmAdapter->>Swarm: gossipsub.publish(topic, data)

    BlockChain->>P2P: Recipient<FetchBlock>.send(FetchBlock)
    P2P->>SwarmAdapter: SwarmHandle.send_request(peer, req, proto)
    SwarmAdapter->>Swarm: req_resp.send_request_with_protocol(...)
    SwarmAdapter-->>P2P: OutboundRequestId (via oneshot)

Last reviewed commit: ba074c0

Comment on bin/ethlambda/src/main.rs (line 174):

```rust
        // Ctrl-C received, shutting down
    }
}
tokio::signal::ctrl_c().await.ok();
```


P2P crash detection silently removed

The old code monitored the P2P task handle and panicked loudly if the P2P node exited unexpectedly:

```rust
tokio::select! {
    result = p2p_handle => {
        panic!("P2P node task has exited unexpectedly: {result:?}");
    }
    _ = tokio::signal::ctrl_c() => {}
}
```

With the new architecture, if the swarm_loop task dies (e.g. event_tx.send() returns an error and breaks the loop) or the P2P actor panics, the main process will silently continue waiting for ctrl_c. The node will keep running but will no longer process any P2P events, publish any blocks/attestations, or fetch missing blocks — all without any crash, error log, or operator alert.

Consider monitoring both spawned tasks (swarm adapter and P2P actor lifecycle), or at minimum adding a tracing::error! before each break in swarm_loop and an explicit note that the process should be restarted.


Comment on crates/net/p2p/src/gossipsub/handler.rs, lines +141 to +152:

```rust
server
    .swarm_handle
    .publish(server.attestation_topic.clone(), compressed);
info!(
    %slot,
    validator,
    target_slot = attestation.data.target.slot,
    target_root = %ShortRoot(&attestation.data.target.root.0),
    source_slot = attestation.data.source.slot,
    source_root = %ShortRoot(&attestation.data.source.root.0),
    "Published attestation to gossipsub"
);
```

Success log fires before publish is actually executed

swarm_handle.publish() only enqueues a SwarmCommand::Publish onto an unbounded channel — the actual gossipsub.publish() call happens asynchronously inside swarm_loop. The info! log "Published attestation to gossipsub" (and its equivalents for block/aggregated attestation) fires unconditionally right after the enqueue, before the operation is executed or its result is known.

If gossipsub rejects the message (e.g. PublishError::InsufficientPeers, PublishError::Duplicate), the error is only logged as a warn! inside swarm_adapter.rs::execute_command, while the caller's log still says "Published attestation to gossipsub" — misleading operators into thinking the publish succeeded.

The old code correctly placed the info! inside .inspect(), so it only fired on success. Consider either:

  • Moving the info log to execute_command where the result is known, or
  • Changing the log message to "Attempting to publish…" to accurately reflect the fire-and-forget nature of the call.

Comment on crates/net/p2p/src/swarm_adapter.rs, lines +104 to +121:

```rust
async fn swarm_loop(
    mut swarm: libp2p::Swarm<Behaviour>,
    event_tx: mpsc::UnboundedSender<SwarmEvent<BehaviourEvent>>,
    mut cmd_rx: mpsc::UnboundedReceiver<SwarmCommand>,
) {
    loop {
        tokio::select! {
            event = swarm.next() => {
                let Some(event) = event else { break };
                if event_tx.send(event).is_err() { break }
            }
            cmd = cmd_rx.recv() => {
                let Some(cmd) = cmd else { break };
                execute_command(&mut swarm, cmd);
            }
        }
    }
}
```

Swarm loop exits silently with no log

When swarm_loop exits via any of the three break branches (swarm stream ends, event_tx send fails, or cmd_rx closes), the task just returns without emitting any warning or error. Since this task is spawned with tokio::spawn(swarm_loop(...)) and nobody monitors its JoinHandle, there is no visibility into the failure.

After the loop exits:

  • The SwarmHandle::publish, dial, and send_response methods will silently log "Swarm adapter closed, cannot …" on each subsequent call
  • SwarmHandle::send_request will return None, which callers treat as a transient failure

At minimum, add a log at the exit point so operators can detect the failure:

```rust
async fn swarm_loop(...) {
    loop {
        tokio::select! { ... }
    }
    tracing::error!("Swarm adapter loop exited — P2P networking is no longer functional");
}
```

Comment on bin/ethlambda/src/main.rs, lines +153 to +166:

```rust
let _ = blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P {
    publish_block: p2p.actor_ref().recipient(),
    publish_attestation: p2p.actor_ref().recipient(),
    publish_aggregated: p2p.actor_ref().recipient(),
    fetch_block: p2p.actor_ref().recipient(),
});
let _ = p2p
    .actor_ref()
    .recipient::<InitBlockChain>()
    .send(InitBlockChain {
        new_block: blockchain.actor_ref().recipient(),
        new_attestation: blockchain.actor_ref().recipient(),
        new_aggregated: blockchain.actor_ref().recipient(),
    });
```

Init message failures silently discarded

Both init messages are sent with let _ = ..., silently discarding errors. If either send() call fails (e.g. the actor has already stopped, or the runtime is shutting down), the actors will operate in an un-wired state — all the if let Some(ref recipient) guards in gossip/req-resp handlers will simply skip forwarding, leading to completely silent data loss with no error logged at the call site.

While the actors are freshly spawned and failure is unlikely at startup, a log on error would make this much easier to diagnose:

```rust
if let Err(err) = blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P { ... }) {
    tracing::error!(%err, "Failed to send InitP2P — blockchain and P2P actors not wired");
}
```

The same applies to the InitBlockChain message on line 159.

