Migrate P2P module from event loop to spawned-concurrency actor #188
base: migrate-spawned-concurrency-v0.5
```diff
@@ -18,7 +18,8 @@ use std::{
 };
 
 use clap::Parser;
-use ethlambda_p2p::{Bootnode, parse_enrs, start_p2p};
+use ethlambda_network_api::{InitBlockChain, InitP2P};
+use ethlambda_p2p::{Bootnode, P2P, SwarmConfig, build_swarm, parse_enrs};
 use ethlambda_types::primitives::H256;
 use ethlambda_types::{
     genesis::GenesisConfig,
@@ -132,38 +133,45 @@ async fn main() -> eyre::Result<()> {
         .await
         .inspect_err(|err| error!(%err, "Failed to initialize state"))?;
 
-    let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel();
     // Use first validator ID for subnet subscription
     let first_validator_id = validator_keys.keys().min().copied();
-    let blockchain =
-        BlockChain::spawn(store.clone(), p2p_tx, validator_keys, options.is_aggregator);
+    let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator);
 
-    let p2p_handle = tokio::spawn(start_p2p(
-        node_p2p_key,
+    let built = build_swarm(SwarmConfig {
+        node_key: node_p2p_key,
         bootnodes,
-        p2p_socket,
-        blockchain,
-        p2p_rx,
-        store.clone(),
-        first_validator_id,
-        options.attestation_committee_count,
-        options.is_aggregator,
-    ));
+        listening_socket: p2p_socket,
+        validator_id: first_validator_id,
+        attestation_committee_count: options.attestation_committee_count,
+        is_aggregator: options.is_aggregator,
+    })
+    .expect("failed to build swarm");
+
+    let p2p = P2P::spawn(built, store.clone());
+
+    // Wire actors together via init messages
+    let _ = blockchain.actor_ref().recipient::<InitP2P>().send(InitP2P {
+        publish_block: p2p.actor_ref().recipient(),
+        publish_attestation: p2p.actor_ref().recipient(),
+        publish_aggregated: p2p.actor_ref().recipient(),
+        fetch_block: p2p.actor_ref().recipient(),
+    });
+    let _ = p2p
+        .actor_ref()
+        .recipient::<InitBlockChain>()
+        .send(InitBlockChain {
+            new_block: blockchain.actor_ref().recipient(),
+            new_attestation: blockchain.actor_ref().recipient(),
+            new_aggregated: blockchain.actor_ref().recipient(),
+        });
 
     ethlambda_rpc::start_rpc_server(metrics_socket, store)
         .await
         .unwrap();
 
     info!("Node initialized");
 
-    tokio::select! {
-        result = p2p_handle => {
-            panic!("P2P node task has exited unexpectedly: {result:?}");
-        }
-        _ = tokio::signal::ctrl_c() => {
-            // Ctrl-C received, shutting down
-        }
-    }
+    tokio::signal::ctrl_c().await.ok();
```
Contributor

**P2P crash detection silently removed**

Path: bin/ethlambda/src/main.rs, line 174

The old code monitored the P2P task handle and panicked loudly if the P2P node exited unexpectedly:

```rust
tokio::select! {
    result = p2p_handle => {
        panic!("P2P node task has exited unexpectedly: {result:?}");
    }
    _ = tokio::signal::ctrl_c() => {}
}
```

With the new architecture, if the `swarm_loop` task dies (e.g. `event_tx.send()` returns an error and `break`s the loop) or the P2P actor panics, the main process will silently continue waiting for `ctrl_c`. The node will keep running but will no longer process any P2P events, publish any blocks/attestations, or fetch missing blocks, all without any crash, error log, or operator alert.

Consider monitoring both spawned tasks (swarm adapter and P2P actor lifecycle), or at minimum adding a `tracing::error!` before each `break` in `swarm_loop` and an explicit note that the process should be restarted.
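One possible shape for the monitoring suggested in this comment is sketched below. It is a sketch only, not the PR's code: std threads and `std::sync::mpsc` stand in for the tokio tasks and the actor runtime, and `ExitGuard`, `spawn_monitored`, and `wait_for_first_exit` are hypothetical names introduced for illustration. The idea is that every long-running task reports its own exit, whether it returns normally or unwinds from a panic, so the main function can block on "first task to exit" instead of only on Ctrl-C.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical guard: sends the task's name when dropped, which happens
/// on a normal return and also while unwinding from a panic.
struct ExitGuard {
    name: &'static str,
    tx: Sender<&'static str>,
}

impl Drop for ExitGuard {
    fn drop(&mut self) {
        // The receiver may already be gone during shutdown; that is not an error here.
        let _ = self.tx.send(self.name);
    }
}

/// Runs `work` on its own thread and reports on `tx` when it ends for any reason.
fn spawn_monitored<F>(name: &'static str, tx: Sender<&'static str>, work: F) -> JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(move || {
        let _guard = ExitGuard { name, tx };
        work();
    })
}

/// Blocks until the first monitored task exits and returns its name.
/// Returns `None` once every sender has been dropped without a pending report.
fn wait_for_first_exit(rx: &Receiver<&'static str>) -> Option<&'static str> {
    rx.recv().ok()
}
```

In a setup like this, the main function would create one channel with `mpsc::channel()`, pass a clone of the sender to each monitored task, and treat any name returned by `wait_for_first_exit` as a reason to log an error and shut down. In the tokio version the same role could be played by the second arm of a `tokio::select!` next to `ctrl_c()`.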
```diff
     println!("Shutting down...");
 
     Ok(())
```
**Init message failures silently discarded**

Both init messages are sent with `let _ = ...`, silently discarding errors. If either `send()` call fails (e.g. the actor has already stopped, or the runtime is shutting down), the actors will operate in an un-wired state: all the `if let Some(ref recipient)` guards in the gossip/req-resp handlers will simply skip forwarding, leading to completely silent data loss with no error logged at the call site.

While the actors are freshly spawned and failure is unlikely at startup, a log on error would make this much easier to diagnose. The same applies to the `InitBlockChain` message on line 159.
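A minimal sketch of the "log on error" idea follows, under stated assumptions: `std::sync::mpsc::Sender` stands in for the actor recipient (whose real `send` signature is not shown in this diff), `eprintln!` stands in for `tracing::error!`, and `send_init` is a hypothetical helper name. The point is only that the `Result` of each init send is inspected rather than bound to `_`.

```rust
use std::sync::mpsc::{self, Sender};

/// Hypothetical helper: sends an init message and reports a failure
/// instead of discarding it. Returns `true` when the send succeeded.
fn send_init<T>(tx: &Sender<T>, msg: T, name: &str) -> bool {
    match tx.send(msg) {
        Ok(()) => true,
        Err(err) => {
            // In the real code this would be a `tracing::error!` call (assumption).
            eprintln!("failed to send {name} init message: {err}");
            false
        }
    }
}
```

Called once for `InitP2P` and once for `InitBlockChain`, a helper along these lines would leave a trace at the call site if either actor were already stopped when the wiring message was sent. Whether startup should abort in that case is a separate design choice.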