From 714b5f88ea7f3356b80762da58539bd59117741c Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 5 Mar 2026 19:11:35 -0300 Subject: [PATCH] Migrate spawned-concurrency from 0.4 to 0.5.0-rc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the GenServer pattern with the new protocol/actor macro API: - Define #[protocol] trait BlockChainProtocol with send-only methods - Replace impl GenServer + CastMessage enum with #[actor] impl + #[send_handler]s - GenServerHandle → ActorRef, send_after takes Context instead of handle - notify_* methods become synchronous (&self) since ActorRef::send() is non-blocking - Remove .await from all P2P call sites accordingly - Remove unused spawned-concurrency and spawned-rt deps from bin/ethlambda --- Cargo.lock | 108 +++++++++++++++++-- Cargo.toml | 4 +- bin/ethlambda/Cargo.toml | 2 - crates/blockchain/src/lib.rs | 132 ++++++++++++------------ crates/net/p2p/src/gossipsub/handler.rs | 10 +- crates/net/p2p/src/req_resp/handlers.rs | 2 +- 6 files changed, 171 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ec1c042..85d759f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bls12_381" version = "0.8.0" @@ -1422,6 +1431,17 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctrlc" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790" +dependencies = [ + "dispatch2", + "nix 0.30.1", + "windows-sys 0.61.2", +] + [[package]] name = "cuckoofilter" version = "0.5.0" @@ -1735,6 +1755,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "dispatch2" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" +dependencies = [ + "bitflags 2.10.0", + "block2", + "libc", + "objc2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -2000,8 +2032,6 @@ dependencies = [ "reqwest", "serde", "serde_yaml_ng", - "spawned-concurrency", - "spawned-rt", "thiserror 2.0.17", "tikv-jemallocator", "tokio", @@ -2027,7 +2057,7 @@ dependencies = [ "hex", "serde", "serde_json", - "spawned-concurrency", + "spawned-concurrency 0.4.5 (git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc)", "ssz_types", "thiserror 2.0.17", "tokio", @@ -2299,8 +2329,8 @@ dependencies = [ "serde_json", "sha2", "snap", - "spawned-concurrency", - "spawned-rt", + "spawned-concurrency 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "spawned-rt 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 2.0.17", "tokio", "tokio-stream", @@ -4790,6 +4820,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.10.0", + "cfg-if 1.0.4", + "cfg_aliases", + "libc", +] + [[package]] name = "nohash-hasher" version = "0.2.0" @@ -4966,6 +5008,21 @@ dependencies = [ "libc", ] +[[package]] +name = "objc2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "object" version = "0.37.3" @@ -6434,7 +6491,7 @@ dependencies = [ "netlink-packet-utils", "netlink-proto", "netlink-sys", - "nix", + "nix 0.26.4", "thiserror 1.0.69", "tokio", ] @@ -7007,11 +7064,34 @@ checksum = "4d3ec6b3c003075f7d1c4c6475308243e853c9a78149b84b1f8b64d5bed49d49" dependencies = [ "futures", "pin-project-lite", - "spawned-rt", + "spawned-rt 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "spawned-concurrency" +version = "0.4.5" +source = "git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc#8b4dbf73641c2179138ce85390afc490688b0580" +dependencies = [ + "futures", + "pin-project-lite", + "spawned-macros", + "spawned-rt 0.4.5 (git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc)", "thiserror 2.0.17", "tracing", ] +[[package]] +name = "spawned-macros" +version = "0.4.5" +source = "git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc#8b4dbf73641c2179138ce85390afc490688b0580" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "spawned-rt" version = "0.4.5" @@ -7026,6 +7106,20 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "spawned-rt" +version = "0.4.5" +source = "git+https://github.com/lambdaclass/spawned.git?tag=v0.5.0-rc#8b4dbf73641c2179138ce85390afc490688b0580" +dependencies = [ + "crossbeam 0.7.3", + "ctrlc", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "spin" version = "0.9.8" diff --git a/Cargo.toml b/Cargo.toml index c5f2fc35..46fdc657 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,8 +44,8 @@ serde_json = "1.0.117" serde_yaml_ng = "0.10" hex = "0.4" -spawned-concurrency = "0.4" -spawned-rt = "0.4" +spawned-concurrency = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.5.0-rc" } +spawned-rt = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.5.0-rc" } tokio = "1.0" prometheus = "0.14" diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index 2ecb780d..7b46286f 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -12,8 +12,6 @@ ethlambda-types.workspace = true ethlambda-rpc.workspace = true ethlambda-storage.workspace = true -spawned-concurrency.workspace = true -spawned-rt.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 50dd6f4e..1263afe1 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -11,9 +11,10 @@ use ethlambda_types::{ primitives::{H256, ssz::TreeHash}, signature::ValidatorSecretKey, }; -use spawned_concurrency::tasks::{ - CallResponse, CastResponse, GenServer, GenServerHandle, send_after, -}; +use spawned_concurrency::actor; +use spawned_concurrency::error::ActorError; +use spawned_concurrency::protocol; +use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; use tokio::sync::mpsc; use tracing::{error, info, trace, warn}; @@ -38,7 +39,7 @@ pub enum P2PMessage { } pub struct BlockChain { - handle: GenServerHandle, + handle: ActorRef, } /// Milliseconds per interval (800ms ticks). @@ -68,41 +69,35 @@ impl BlockChain { let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) .duration_since(SystemTime::now()) .unwrap_or_default(); - send_after(time_until_genesis, handle.clone(), CastMessage::Tick); + send_after( + time_until_genesis, + handle.context(), + block_chain_protocol::Tick, + ); BlockChain { handle } } /// Sends a block to the BlockChain for processing. - /// - /// Note that this is *NOT* `async`, since the internal [`GenServerHandle::cast`] is non-blocking. - pub async fn notify_new_block(&mut self, block: SignedBlockWithAttestation) { + pub fn notify_new_block(&self, block: SignedBlockWithAttestation) { let _ = self .handle - .cast(CastMessage::NewBlock(block)) - .await + .new_block(block) .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new block")); } /// Sends an attestation to the BlockChain for processing. - /// - /// Note that this is *NOT* `async`, since the internal [`GenServerHandle::cast`] is non-blocking. - pub async fn notify_new_attestation(&mut self, attestation: SignedAttestation) { + pub fn notify_new_attestation(&self, attestation: SignedAttestation) { let _ = self .handle - .cast(CastMessage::NewAttestation(attestation)) - .await + .new_attestation(attestation) .inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation")); } /// Sends an aggregated attestation to the BlockChain for processing. - pub async fn notify_new_aggregated_attestation( - &mut self, - attestation: SignedAggregatedAttestation, - ) { + pub fn notify_new_aggregated_attestation(&self, attestation: SignedAggregatedAttestation) { let _ = self .handle - .cast(CastMessage::NewAggregatedAttestation(attestation)) - .await + .new_aggregated_attestation(attestation) .inspect_err( |err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"), ); @@ -489,60 +484,61 @@ impl BlockChainServer { } } -#[derive(Clone, Debug)] -enum CastMessage { - NewBlock(SignedBlockWithAttestation), - NewAttestation(SignedAttestation), - NewAggregatedAttestation(SignedAggregatedAttestation), - Tick, +#[protocol] +#[allow(dead_code)] // tick() is invoked via send_after(Tick), not called directly +pub(crate) trait BlockChainProtocol: Send + Sync { + fn new_block(&self, block: SignedBlockWithAttestation) -> Result<(), ActorError>; + fn new_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>; + fn new_aggregated_attestation( + &self, + attestation: SignedAggregatedAttestation, + ) -> Result<(), ActorError>; + fn tick(&self) -> Result<(), ActorError>; } -impl GenServer for BlockChainServer { - type CallMsg = (); - - type CastMsg = CastMessage; - - type OutMsg = (); +#[actor(protocol = BlockChainProtocol)] +impl BlockChainServer { + #[send_handler] + async fn handle_tick(&mut self, _msg: block_chain_protocol::Tick, ctx: &Context) { + let timestamp = SystemTime::UNIX_EPOCH + .elapsed() + .expect("already past the unix epoch"); + self.on_tick(timestamp.as_millis() as u64); + // Schedule the next tick at the next 800ms interval boundary + let ms_since_epoch = timestamp.as_millis() as u64; + let ms_to_next_interval = + MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL); + send_after( + Duration::from_millis(ms_to_next_interval), + ctx.clone(), + block_chain_protocol::Tick, + ); + } - type Error = (); + #[send_handler] + async fn handle_new_block( + &mut self, + msg: block_chain_protocol::NewBlock, + _ctx: &Context, + ) { + self.on_block(msg.block); + } - async fn handle_call( + #[send_handler] + async fn handle_new_attestation( &mut self, - _message: Self::CallMsg, - _handle: &GenServerHandle, - ) -> CallResponse { - CallResponse::Unused + msg: block_chain_protocol::NewAttestation, + _ctx: &Context, + ) { + self.on_gossip_attestation(msg.attestation); } - async fn handle_cast( + #[send_handler] + async fn handle_new_aggregated_attestation( &mut self, - message: Self::CastMsg, - handle: &GenServerHandle, - ) -> CastResponse { - match message { - CastMessage::Tick => { - let timestamp = SystemTime::UNIX_EPOCH - .elapsed() - .expect("already past the unix epoch"); - self.on_tick(timestamp.as_millis() as u64); - // Schedule the next tick at the next 800ms interval boundary - let ms_since_epoch = timestamp.as_millis() as u64; - let ms_to_next_interval = - MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL); - send_after( - Duration::from_millis(ms_to_next_interval), - handle.clone(), - message, - ); - } - CastMessage::NewBlock(signed_block) => { - self.on_block(signed_block); - } - CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation), - CastMessage::NewAggregatedAttestation(attestation) => { - self.on_gossip_aggregated_attestation(attestation); - } - } - CastResponse::NoReply + msg: block_chain_protocol::NewAggregatedAttestation, + _ctx: &Context, + ) { + self.on_gossip_aggregated_attestation(msg.attestation); } } diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index eab0c6ed..8f156a22 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -49,7 +49,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { attestation_count, "Received block from gossip" ); - server.blockchain.notify_new_block(signed_block).await; + server.blockchain.notify_new_block(signed_block); } Some(AGGREGATION_TOPIC_KIND) => { let Ok(uncompressed_data) = decompress_message(&message.data) @@ -74,8 +74,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { ); server .blockchain - .notify_new_aggregated_attestation(aggregation) - .await; + .notify_new_aggregated_attestation(aggregation); } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { let Ok(uncompressed_data) = decompress_message(&message.data) @@ -101,10 +100,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { source_root = %ShortRoot(&signed_attestation.data.source.root.0), "Received attestation from gossip" ); - server - .blockchain - .notify_new_attestation(signed_attestation) - .await; + server.blockchain.notify_new_attestation(signed_attestation); } _ => { trace!("Received message on unknown topic: {}", message.topic); diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index 9e50ea66..e08f4357 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -169,7 +169,7 @@ async fn handle_blocks_by_root_response( // Clean up tracking for this root server.pending_requests.remove(&root); - server.blockchain.notify_new_block(block).await; + server.blockchain.notify_new_block(block); } }