Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ serde_json = "1.0.117"
serde_yaml_ng = "0.10"
hex = "0.4"

spawned-concurrency = "0.4"
spawned-rt = "0.4"
spawned-concurrency = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.5.0-rc" }
spawned-rt = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.5.0-rc" }
tokio = "1.0"

prometheus = "0.14"
Expand Down
2 changes: 0 additions & 2 deletions bin/ethlambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ ethlambda-types.workspace = true
ethlambda-rpc.workspace = true
ethlambda-storage.workspace = true

spawned-concurrency.workspace = true
spawned-rt.workspace = true
tokio.workspace = true

tracing.workspace = true
Expand Down
132 changes: 64 additions & 68 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use ethlambda_types::{
primitives::{H256, ssz::TreeHash},
signature::ValidatorSecretKey,
};
use spawned_concurrency::tasks::{
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
};
use spawned_concurrency::actor;
use spawned_concurrency::error::ActorError;
use spawned_concurrency::protocol;
use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after};
use tokio::sync::mpsc;
use tracing::{error, info, trace, warn};

Expand All @@ -38,7 +39,7 @@ pub enum P2PMessage {
}

pub struct BlockChain {
handle: GenServerHandle<BlockChainServer>,
handle: ActorRef<BlockChainServer>,
}

/// Milliseconds per interval (800ms ticks).
Expand Down Expand Up @@ -68,41 +69,35 @@ impl BlockChain {
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
.duration_since(SystemTime::now())
.unwrap_or_default();
send_after(time_until_genesis, handle.clone(), CastMessage::Tick);
send_after(
time_until_genesis,
handle.context(),
block_chain_protocol::Tick,
);
BlockChain { handle }
}

/// Sends a block to the BlockChain for processing.
///
/// Note that this is *NOT* `async`, since the internal [`GenServerHandle::cast`] is non-blocking.
pub async fn notify_new_block(&mut self, block: SignedBlockWithAttestation) {
pub fn notify_new_block(&self, block: SignedBlockWithAttestation) {
let _ = self
.handle
.cast(CastMessage::NewBlock(block))
.await
.new_block(block)
.inspect_err(|err| error!(%err, "Failed to notify BlockChain of new block"));
}

/// Sends an attestation to the BlockChain for processing.
///
/// Note that this is *NOT* `async`, since the internal [`GenServerHandle::cast`] is non-blocking.
pub async fn notify_new_attestation(&mut self, attestation: SignedAttestation) {
pub fn notify_new_attestation(&self, attestation: SignedAttestation) {
let _ = self
.handle
.cast(CastMessage::NewAttestation(attestation))
.await
.new_attestation(attestation)
.inspect_err(|err| error!(%err, "Failed to notify BlockChain of new attestation"));
}

/// Sends an aggregated attestation to the BlockChain for processing.
pub async fn notify_new_aggregated_attestation(
&mut self,
attestation: SignedAggregatedAttestation,
) {
pub fn notify_new_aggregated_attestation(&self, attestation: SignedAggregatedAttestation) {
let _ = self
.handle
.cast(CastMessage::NewAggregatedAttestation(attestation))
.await
.new_aggregated_attestation(attestation)
.inspect_err(
|err| error!(%err, "Failed to notify BlockChain of new aggregated attestation"),
);
Expand Down Expand Up @@ -489,60 +484,61 @@ impl BlockChainServer {
}
}

#[derive(Clone, Debug)]
enum CastMessage {
NewBlock(SignedBlockWithAttestation),
NewAttestation(SignedAttestation),
NewAggregatedAttestation(SignedAggregatedAttestation),
Tick,
#[protocol]
#[allow(dead_code)] // tick() is invoked via send_after(Tick), not called directly
pub(crate) trait BlockChainProtocol: Send + Sync {
fn new_block(&self, block: SignedBlockWithAttestation) -> Result<(), ActorError>;
fn new_attestation(&self, attestation: SignedAttestation) -> Result<(), ActorError>;
fn new_aggregated_attestation(
&self,
attestation: SignedAggregatedAttestation,
) -> Result<(), ActorError>;
fn tick(&self) -> Result<(), ActorError>;
}

impl GenServer for BlockChainServer {
type CallMsg = ();

type CastMsg = CastMessage;

type OutMsg = ();
#[actor(protocol = BlockChainProtocol)]
impl BlockChainServer {
#[send_handler]
async fn handle_tick(&mut self, _msg: block_chain_protocol::Tick, ctx: &Context<Self>) {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_millis() as u64);
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL);
send_after(
Duration::from_millis(ms_to_next_interval),
ctx.clone(),
block_chain_protocol::Tick,
);
}

type Error = ();
#[send_handler]
async fn handle_new_block(
&mut self,
msg: block_chain_protocol::NewBlock,
_ctx: &Context<Self>,
) {
self.on_block(msg.block);
}

async fn handle_call(
#[send_handler]
async fn handle_new_attestation(
&mut self,
_message: Self::CallMsg,
_handle: &GenServerHandle<Self>,
) -> CallResponse<Self> {
CallResponse::Unused
msg: block_chain_protocol::NewAttestation,
_ctx: &Context<Self>,
) {
self.on_gossip_attestation(msg.attestation);
}

async fn handle_cast(
#[send_handler]
async fn handle_new_aggregated_attestation(
&mut self,
message: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
match message {
CastMessage::Tick => {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_millis() as u64);
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
MILLISECONDS_PER_INTERVAL - (ms_since_epoch % MILLISECONDS_PER_INTERVAL);
send_after(
Duration::from_millis(ms_to_next_interval),
handle.clone(),
message,
);
}
CastMessage::NewBlock(signed_block) => {
self.on_block(signed_block);
}
CastMessage::NewAttestation(attestation) => self.on_gossip_attestation(attestation),
CastMessage::NewAggregatedAttestation(attestation) => {
self.on_gossip_aggregated_attestation(attestation);
}
}
CastResponse::NoReply
msg: block_chain_protocol::NewAggregatedAttestation,
_ctx: &Context<Self>,
) {
self.on_gossip_aggregated_attestation(msg.attestation);
}
}
Loading