diff --git a/Cargo.toml b/Cargo.toml index 294c861..e31285d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ incremental = false signet-blobber = { version = "0.16.0-rc.7", path = "crates/blobber" } signet-block-processor = { version = "0.16.0-rc.7", path = "crates/block-processor" } signet-genesis = { version = "0.16.0-rc.7", path = "crates/genesis" } +signet-host-rpc = { version = "0.16.0-rc.7", path = "crates/host-rpc" } signet-node = { version = "0.16.0-rc.7", path = "crates/node" } signet-node-config = { version = "0.16.0-rc.7", path = "crates/node-config" } signet-node-tests = { version = "0.16.0-rc.7", path = "crates/node-tests" } diff --git a/crates/host-rpc/Cargo.toml b/crates/host-rpc/Cargo.toml new file mode 100644 index 0000000..5cf20d0 --- /dev/null +++ b/crates/host-rpc/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "signet-host-rpc" +description = "RPC-based implementation of the HostNotifier trait for signet-node." +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +signet-node-types.workspace = true +signet-extract.workspace = true +signet-types.workspace = true + +alloy.workspace = true +futures-util.workspace = true +thiserror.workspace = true +tracing.workspace = true diff --git a/crates/host-rpc/README.md b/crates/host-rpc/README.md new file mode 100644 index 0000000..7d5a8a8 --- /dev/null +++ b/crates/host-rpc/README.md @@ -0,0 +1,6 @@ +# signet-host-rpc + +RPC-based implementation of the `HostNotifier` trait for signet-node. + +Connects to any Ethereum execution layer client via WebSocket, following +the host chain without embedding a full reth node. diff --git a/crates/host-rpc/src/builder.rs b/crates/host-rpc/src/builder.rs new file mode 100644 index 0000000..237d1b5 --- /dev/null +++ b/crates/host-rpc/src/builder.rs @@ -0,0 +1,79 @@ +use crate::{RpcHostError, RpcHostNotifier}; +use alloy::providers::Provider; +use std::collections::VecDeque; + +/// Default block buffer capacity. +const DEFAULT_BUFFER_CAPACITY: usize = 64; +/// Default backfill batch size. +const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32; + +/// Builder for [`RpcHostNotifier`]. +/// +/// # Example +/// +/// ```ignore +/// let notifier = RpcHostNotifierBuilder::new(provider) +/// .with_buffer_capacity(128) +/// .with_backfill_batch_size(64) +/// .build() +/// .await?; +/// ``` +#[derive(Debug)] +pub struct RpcHostNotifierBuilder

{ + provider: P, + buffer_capacity: usize, + backfill_batch_size: u64, + genesis_timestamp: u64, +} + +impl

RpcHostNotifierBuilder

+where + P: Provider + Clone, +{ + /// Create a new builder with the given provider. + pub const fn new(provider: P) -> Self { + Self { + provider, + buffer_capacity: DEFAULT_BUFFER_CAPACITY, + backfill_batch_size: DEFAULT_BACKFILL_BATCH_SIZE, + genesis_timestamp: 0, + } + } + + /// Set the block buffer capacity (default: 64). + pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self { + self.buffer_capacity = capacity; + self + } + + /// Set the backfill batch size (default: 32). + pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self { + self.backfill_batch_size = batch_size; + self + } + + /// Set the genesis timestamp for epoch calculation. + pub const fn with_genesis_timestamp(mut self, timestamp: u64) -> Self { + self.genesis_timestamp = timestamp; + self + } + + /// Build the notifier, establishing the `newHeads` WebSocket subscription. + pub async fn build(self) -> Result, RpcHostError> { + let sub = self.provider.subscribe_blocks().await?; + let header_sub = sub.into_stream(); + + Ok(RpcHostNotifier { + provider: self.provider, + header_sub, + block_buffer: VecDeque::with_capacity(self.buffer_capacity), + buffer_capacity: self.buffer_capacity, + cached_safe: None, + cached_finalized: None, + last_tag_epoch: None, + backfill_from: None, + backfill_batch_size: self.backfill_batch_size, + genesis_timestamp: self.genesis_timestamp, + }) + } +} diff --git a/crates/host-rpc/src/error.rs b/crates/host-rpc/src/error.rs new file mode 100644 index 0000000..012e69c --- /dev/null +++ b/crates/host-rpc/src/error.rs @@ -0,0 +1,26 @@ +use alloy::transports::{RpcError, TransportErrorKind}; + +/// Errors from the RPC host notifier. +#[derive(Debug, thiserror::Error)] +pub enum RpcHostError { + /// The WebSocket subscription was dropped unexpectedly. + #[error("subscription closed")] + SubscriptionClosed, + + /// An RPC call failed. + #[error("rpc error: {0}")] + Rpc(#[from] RpcError), + + /// The RPC node returned no block for the requested number. + #[error("missing block {0}")] + MissingBlock(u64), + + /// Reorg deeper than the block buffer. + #[error("reorg depth {depth} exceeds buffer capacity {capacity}")] + ReorgTooDeep { + /// The detected reorg depth. + depth: u64, + /// The configured buffer capacity. + capacity: usize, + }, +} diff --git a/crates/host-rpc/src/lib.rs b/crates/host-rpc/src/lib.rs new file mode 100644 index 0000000..dad7dbb --- /dev/null +++ b/crates/host-rpc/src/lib.rs @@ -0,0 +1,24 @@ +#![doc = include_str!("../README.md")] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod builder; +pub use builder::RpcHostNotifierBuilder; + +mod error; +pub use error::RpcHostError; + +mod notifier; +pub use notifier::RpcHostNotifier; + +mod segment; +pub use segment::{RpcBlock, RpcChainSegment}; diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs new file mode 100644 index 0000000..9f0a603 --- /dev/null +++ b/crates/host-rpc/src/notifier.rs @@ -0,0 +1,409 @@ +use crate::{RpcBlock, RpcChainSegment, RpcHostError}; +use alloy::{ + consensus::{BlockHeader, transaction::Recovered}, + eips::BlockNumberOrTag, + network::BlockResponse, + primitives::{B256, Sealed}, + providers::Provider, + pubsub::SubscriptionStream, + rpc::types::Header as RpcHeader, +}; +use futures_util::StreamExt; +use signet_extract::Extractable; +use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier}; +use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned}; +use std::{collections::VecDeque, sync::Arc}; +use tracing::debug; + +/// Seconds per Ethereum slot. +const SLOT_SECONDS: u64 = 12; +/// Slots per Ethereum epoch. +const SLOTS_PER_EPOCH: u64 = 32; + +/// RPC-based implementation of [`HostNotifier`]. +/// +/// Follows a host chain via WebSocket `newHeads` subscription, fetching full +/// blocks and receipts on demand. Detects reorgs via a ring buffer of recent +/// block hashes. +/// +/// Generic over `P`: any alloy provider that supports subscriptions. +pub struct RpcHostNotifier

{ + /// The alloy provider. + pub(crate) provider: P, + + /// Subscription stream of new block headers. + pub(crate) header_sub: SubscriptionStream, + + /// Recent blocks for reorg detection and caching. + pub(crate) block_buffer: VecDeque>, + + /// Maximum entries in the block buffer. + pub(crate) buffer_capacity: usize, + + /// Cached safe block number, refreshed at epoch boundaries. + pub(crate) cached_safe: Option, + + /// Cached finalized block number, refreshed at epoch boundaries. + pub(crate) cached_finalized: Option, + + /// Last epoch number for which safe/finalized were fetched. + pub(crate) last_tag_epoch: Option, + + /// If set, backfill from this block number before processing + /// subscription events. + pub(crate) backfill_from: Option, + + /// Max blocks per backfill batch. + pub(crate) backfill_batch_size: u64, + + /// Genesis timestamp, used for epoch calculation. + pub(crate) genesis_timestamp: u64, +} + +impl

core::fmt::Debug for RpcHostNotifier

{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("RpcHostNotifier") + .field("buffer_len", &self.block_buffer.len()) + .field("buffer_capacity", &self.buffer_capacity) + .field("backfill_from", &self.backfill_from) + .finish_non_exhaustive() + } +} + +impl

RpcHostNotifier

+where + P: Provider + Clone, +{ + /// Current tip block number from the buffer. + fn tip(&self) -> Option { + self.block_buffer.back().map(|b| b.number()) + } + + /// Look up a block hash in the buffer by block number. + fn buffered_hash(&self, number: u64) -> Option { + self.block_buffer.iter().rev().find(|b| b.number() == number).map(|b| b.hash()) + } + + /// Fetch a single block with its receipts from the provider. + async fn fetch_block(&self, number: u64) -> Result { + let rpc_block = self + .provider + .get_block_by_number(number.into()) + .full() + .await? + .ok_or(RpcHostError::MissingBlock(number))?; + + let rpc_receipts = + self.provider.get_block_receipts(number.into()).await?.unwrap_or_default(); + + // Convert RPC block to our RecoveredBlock type. + let hash = rpc_block.header.hash; + let block = rpc_block + .map_transactions(|tx| { + let recovered = tx.inner; + let signer = recovered.signer(); + let tx: TransactionSigned = recovered.into_inner().into(); + Recovered::new_unchecked(tx, signer) + }) + .into_consensus(); + let sealed_header = Sealed::new_unchecked(block.header, hash); + let block: RecoveredBlock = SealedBlock::new(sealed_header, block.body.transactions); + + // Convert RPC receipts to consensus receipts. + let receipts = + rpc_receipts.into_iter().map(|r| r.inner.into_primitives_receipt()).collect(); + + Ok(RpcBlock { block, receipts }) + } + + /// Fetch a range of blocks concurrently. + async fn fetch_range(&self, from: u64, to: u64) -> Result>, RpcHostError> { + let mut blocks = Vec::with_capacity((to - from + 1) as usize); + // Fetch sequentially for now; can be parallelized later with + // futures::stream::FuturesOrdered. + for number in from..=to { + let block = self.fetch_block(number).await?; + blocks.push(Arc::new(block)); + } + Ok(blocks) + } + + /// Derive the epoch number from a block timestamp. + const fn epoch_of(&self, timestamp: u64) -> u64 { + timestamp.saturating_sub(self.genesis_timestamp) / (SLOT_SECONDS * SLOTS_PER_EPOCH) + } + + /// Refresh safe/finalized block numbers if an epoch boundary was crossed. + async fn maybe_refresh_tags(&mut self, timestamp: u64) -> Result<(), RpcHostError> { + let epoch = self.epoch_of(timestamp); + if self.last_tag_epoch == Some(epoch) { + return Ok(()); + } + + let safe = self + .provider + .get_block_by_number(BlockNumberOrTag::Safe) + .await? + .map(|b| b.header().number()); + let finalized = self + .provider + .get_block_by_number(BlockNumberOrTag::Finalized) + .await? + .map(|b| b.header().number()); + + self.cached_safe = safe; + self.cached_finalized = finalized; + self.last_tag_epoch = Some(epoch); + + debug!(epoch, safe, finalized, "refreshed block tags at epoch boundary"); + Ok(()) + } + + /// Remove invalidated entries from the buffer on reorg, then add new + /// blocks. + fn update_buffer_reorg(&mut self, fork_number: u64, new_blocks: &[Arc]) { + // Remove all blocks at or above the fork point. + while self.block_buffer.back().is_some_and(|b| b.number() >= fork_number) { + self.block_buffer.pop_back(); + } + self.push_to_buffer(new_blocks); + } + + /// Push blocks to the buffer, evicting oldest if over capacity. + fn push_to_buffer(&mut self, blocks: &[Arc]) { + for block in blocks { + self.block_buffer.push_back(Arc::clone(block)); + if self.block_buffer.len() > self.buffer_capacity { + self.block_buffer.pop_front(); + } + } + } + + /// Find the fork point by walking backward through the buffer. + /// + /// Returns the block number where the chain diverged (the first block + /// that differs), or `None` if the fork is deeper than the buffer. + /// + /// **Limitation:** This queries the RPC node for blocks on the new chain. + /// If the node hasn't fully switched to the new chain, it may return + /// stale blocks from the old chain, producing an incorrect fork point. + /// This is an inherent limitation of RPC-based reorg detection. + async fn find_fork_point(&self, new_header: &RpcHeader) -> Result, RpcHostError> { + // Walk the new chain backward from the new header's parent. + let mut check_hash = new_header.parent_hash(); + let mut check_number = new_header.number().saturating_sub(1); + + loop { + match self.buffered_hash(check_number) { + Some(buffered) if buffered == check_hash => { + // Found the common ancestor at check_number. + // The fork point is the next block (first divergence). + return Ok(Some(check_number + 1)); + } + Some(_) => { + // Mismatch — keep walking backward. + if check_number == 0 { + return Ok(None); + } + // Fetch the parent of this block on the new chain. + let parent = self + .provider + .get_block_by_number(check_number.into()) + .await? + .ok_or(RpcHostError::MissingBlock(check_number))?; + check_hash = parent.header().parent_hash(); + check_number = check_number.saturating_sub(1); + } + None => { + // Beyond our buffer — can't determine fork point. + return Ok(None); + } + } + } + } + + /// Process a backfill batch if pending. + /// + /// Returns `Some(notification)` if a batch was emitted, `None` if no + /// backfill is pending. + async fn drain_backfill( + &mut self, + ) -> Option, RpcHostError>> { + let from = self.backfill_from?; + let tip = match self.provider.get_block_number().await { + Ok(n) => n, + Err(e) => return Some(Err(e.into())), + }; + + if from > tip { + self.backfill_from = None; + return None; + } + + let to = tip.min(from + self.backfill_batch_size - 1); + + let blocks = match self.fetch_range(from, to).await { + Ok(b) => b, + Err(e) => return Some(Err(e)), + }; + + self.push_to_buffer(&blocks); + + // Advance or clear backfill. + if to >= tip { + self.backfill_from = None; + } else { + self.backfill_from = Some(to + 1); + } + + // Refresh tags using the last block's timestamp. + if let Some(last) = blocks.last() + && let Err(e) = self.maybe_refresh_tags(last.block.timestamp()).await + { + return Some(Err(e)); + } + + let segment = Arc::new(RpcChainSegment::new(blocks)); + Some(Ok(HostNotification { + kind: HostNotificationKind::ChainCommitted { new: segment }, + safe_block_number: self.cached_safe, + finalized_block_number: self.cached_finalized, + })) + } + + /// Handle a new header from the subscription stream. + async fn handle_new_head( + &mut self, + header: RpcHeader, + ) -> Result, RpcHostError> { + let new_number = header.number(); + let new_parent = header.parent_hash(); + + // Check parent hash continuity. + let is_reorg = self.tip().is_some_and(|tip_num| { + self.buffered_hash(tip_num).is_some_and(|tip_hash| { + // Parent should point to our tip, and the new block + // should be exactly one ahead. + new_parent != tip_hash || new_number != tip_num + 1 + }) + }); + + let kind = if is_reorg { + self.handle_reorg(header).await? + } else { + self.handle_advance(header).await? + }; + + // Refresh block tags. + let timestamp = match &kind { + HostNotificationKind::ChainCommitted { new } => { + new.blocks_and_receipts().last().map(|bar| bar.block.timestamp()).unwrap_or(0) + } + HostNotificationKind::ChainReorged { new, .. } => { + new.blocks_and_receipts().last().map(|bar| bar.block.timestamp()).unwrap_or(0) + } + HostNotificationKind::ChainReverted { .. } => 0, + }; + if timestamp > 0 { + self.maybe_refresh_tags(timestamp).await?; + } + + Ok(HostNotification { + kind, + safe_block_number: self.cached_safe, + finalized_block_number: self.cached_finalized, + }) + } + + /// Handle a normal chain advance (no reorg). + async fn handle_advance( + &mut self, + header: RpcHeader, + ) -> Result, RpcHostError> { + let new_number = header.number(); + let from = self.tip().map_or(new_number, |t| t + 1); + + let blocks = self.fetch_range(from, new_number).await?; + self.push_to_buffer(&blocks); + + Ok(HostNotificationKind::ChainCommitted { new: Arc::new(RpcChainSegment::new(blocks)) }) + } + + /// Handle a reorg: find fork point, emit `ChainReorged`. + async fn handle_reorg( + &mut self, + header: RpcHeader, + ) -> Result, RpcHostError> { + let new_number = header.number(); + + let fork_number = self.find_fork_point(&header).await?.ok_or_else(|| { + let depth = self + .tip() + .unwrap_or(0) + .saturating_sub(self.block_buffer.front().map_or(0, |b| b.number())); + RpcHostError::ReorgTooDeep { depth, capacity: self.buffer_capacity } + })?; + + // Collect reverted blocks from the buffer before removing them. + let old_blocks: Vec> = self + .block_buffer + .iter() + .filter(|b| b.number() >= fork_number) + .map(Arc::clone) + .collect(); + + // Fetch new chain from fork point to new head. + let blocks = self.fetch_range(fork_number, new_number).await?; + self.update_buffer_reorg(fork_number, &blocks); + + Ok(HostNotificationKind::ChainReorged { + old: Arc::new(RpcChainSegment::new(old_blocks)), + new: Arc::new(RpcChainSegment::new(blocks)), + }) + } +} + +/// [`HostNotifier`] implementation for [`RpcHostNotifier`]. +/// +/// Note: this implementation never emits +/// [`HostNotificationKind::ChainReverted`]. The `newHeads` WebSocket +/// subscription only fires when a new block appears — a pure revert +/// (blocks removed without a replacement chain) produces no new header and +/// is therefore invisible to the subscription. Only +/// [`HostNotificationKind::ChainCommitted`] and +/// [`HostNotificationKind::ChainReorged`] are produced. +impl

HostNotifier for RpcHostNotifier

+where + P: Provider + Clone + Send + Sync + 'static, +{ + type Chain = RpcChainSegment; + type Error = RpcHostError; + + async fn next_notification( + &mut self, + ) -> Option, Self::Error>> { + // Drain pending backfill first. + if let Some(result) = self.drain_backfill().await { + return Some(result); + } + + // Await next header from subscription. + let header = self.header_sub.next().await?; + Some(self.handle_new_head(header).await) + } + + fn set_head(&mut self, block_number: u64) { + self.backfill_from = Some(block_number); + } + + fn set_backfill_thresholds(&mut self, max_blocks: Option) { + if let Some(max) = max_blocks { + self.backfill_batch_size = max.max(1); + } + } + + fn send_finished_height(&self, _block_number: u64) -> Result<(), Self::Error> { + // No-op: no ExEx to notify for an RPC follower. + Ok(()) + } +} diff --git a/crates/host-rpc/src/segment.rs b/crates/host-rpc/src/segment.rs new file mode 100644 index 0000000..460a84a --- /dev/null +++ b/crates/host-rpc/src/segment.rs @@ -0,0 +1,74 @@ +use alloy::consensus::{BlockHeader, ReceiptEnvelope}; +use signet_extract::{BlockAndReceipts, Extractable}; +use signet_types::primitives::RecoveredBlock; +use std::sync::Arc; + +/// A block with its receipts, fetched via RPC. +#[derive(Debug)] +pub struct RpcBlock { + /// The recovered block (with senders). + pub(crate) block: RecoveredBlock, + /// The receipts for this block's transactions. + pub(crate) receipts: Vec, +} + +impl RpcBlock { + /// The block number. + pub fn number(&self) -> u64 { + self.block.number() + } + + /// The block hash. + pub const fn hash(&self) -> alloy::primitives::B256 { + self.block.header.hash() + } + + /// The parent block hash. + pub fn parent_hash(&self) -> alloy::primitives::B256 { + self.block.parent_hash() + } +} + +/// A chain segment fetched via RPC. +/// +/// Contains one or more blocks with their receipts, ordered by block number +/// ascending. Blocks are wrapped in [`Arc`] for cheap sharing with the +/// notifier's internal buffer. +#[derive(Debug)] +pub struct RpcChainSegment { + blocks: Vec>, +} + +impl RpcChainSegment { + /// Create a new segment from a list of blocks. + pub const fn new(blocks: Vec>) -> Self { + Self { blocks } + } +} + +impl Extractable for RpcChainSegment { + type Block = RecoveredBlock; + type Receipt = ReceiptEnvelope; + + fn blocks_and_receipts( + &self, + ) -> impl Iterator> { + self.blocks.iter().map(|b| BlockAndReceipts { block: &b.block, receipts: &b.receipts }) + } + + fn first_number(&self) -> Option { + self.blocks.first().map(|b| b.number()) + } + + fn tip_number(&self) -> Option { + self.blocks.last().map(|b| b.number()) + } + + fn len(&self) -> usize { + self.blocks.len() + } + + fn is_empty(&self) -> bool { + self.blocks.is_empty() + } +} diff --git a/crates/node/src/builder.rs b/crates/node/src/builder.rs index 57f6222..3ef5d93 100644 --- a/crates/node/src/builder.rs +++ b/crates/node/src/builder.rs @@ -33,50 +33,18 @@ pub struct NotAStorage; /// - A [`StorageRpcConfig`], via [`Self::with_rpc_config`]. /// - A `reqwest::Client`, via [`Self::with_client`]. /// - If not set, a default client will be created. -/// -/// # Examples -/// -/// ```no_run -/// # use signet_node::builder::SignetNodeBuilder; -/// # fn example( -/// # config: signet_node_config::SignetNodeConfig, -/// # notifier: impl signet_node_types::HostNotifier, -/// # storage: std::sync::Arc>, -/// # alias_oracle: impl signet_block_processor::AliasOracleFactory, -/// # blob_cacher: signet_blobber::CacheHandle, -/// # serve_config: signet_rpc::ServeConfig, -/// # rpc_config: signet_rpc::StorageRpcConfig, -/// # ) { -/// let builder = SignetNodeBuilder::new(config) -/// .with_notifier(notifier) -/// .with_storage(storage) -/// .with_alias_oracle(alias_oracle) -/// .with_blob_cacher(blob_cacher) -/// .with_serve_config(serve_config) -/// .with_rpc_config(rpc_config); -/// # } -/// ``` -pub struct SignetNodeBuilder< - Notifier = (), - Storage = NotAStorage, - Aof = NotAnAof, - Bc = (), - Sc = (), - Rc = (), -> { +pub struct SignetNodeBuilder { config: SignetNodeConfig, alias_oracle: Option, notifier: Option, storage: Option, client: Option, - blob_cacher: Option, - serve_config: Option, - rpc_config: Option, + blob_cacher: Option, + serve_config: Option, + rpc_config: Option, } -impl core::fmt::Debug - for SignetNodeBuilder -{ +impl core::fmt::Debug for SignetNodeBuilder { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("SignetNodeBuilder").finish_non_exhaustive() } @@ -98,12 +66,12 @@ impl SignetNodeBuilder { } } -impl SignetNodeBuilder { +impl SignetNodeBuilder { /// Set the [`UnifiedStorage`] backend for the signet node. pub fn with_storage( self, storage: Arc>, - ) -> SignetNodeBuilder>, Aof, Bc, Sc, Rc> { + ) -> SignetNodeBuilder>, Aof> { SignetNodeBuilder { config: self.config, alias_oracle: self.alias_oracle, @@ -117,10 +85,7 @@ impl SignetNodeBuilder( - self, - notifier: N, - ) -> SignetNodeBuilder { + pub fn with_notifier(self, notifier: N) -> SignetNodeBuilder { SignetNodeBuilder { config: self.config, alias_oracle: self.alias_oracle, @@ -137,7 +102,7 @@ impl SignetNodeBuilder( self, alias_oracle: NewAof, - ) -> SignetNodeBuilder { + ) -> SignetNodeBuilder { SignetNodeBuilder { config: self.config, alias_oracle: Some(alias_oracle), @@ -157,59 +122,25 @@ impl SignetNodeBuilder SignetNodeBuilder { - SignetNodeBuilder { - config: self.config, - alias_oracle: self.alias_oracle, - notifier: self.notifier, - storage: self.storage, - client: self.client, - blob_cacher: Some(blob_cacher), - serve_config: self.serve_config, - rpc_config: self.rpc_config, - } + pub fn with_blob_cacher(mut self, blob_cacher: CacheHandle) -> Self { + self.blob_cacher = Some(blob_cacher); + self } /// Set the RPC transport configuration. - pub fn with_serve_config( - self, - serve_config: ServeConfig, - ) -> SignetNodeBuilder { - SignetNodeBuilder { - config: self.config, - alias_oracle: self.alias_oracle, - notifier: self.notifier, - storage: self.storage, - client: self.client, - blob_cacher: self.blob_cacher, - serve_config: Some(serve_config), - rpc_config: self.rpc_config, - } + pub fn with_serve_config(mut self, serve_config: ServeConfig) -> Self { + self.serve_config = Some(serve_config); + self } /// Set the RPC behaviour configuration. - pub fn with_rpc_config( - self, - rpc_config: StorageRpcConfig, - ) -> SignetNodeBuilder { - SignetNodeBuilder { - config: self.config, - alias_oracle: self.alias_oracle, - notifier: self.notifier, - storage: self.storage, - client: self.client, - blob_cacher: self.blob_cacher, - serve_config: self.serve_config, - rpc_config: Some(rpc_config), - } + pub const fn with_rpc_config(mut self, rpc_config: StorageRpcConfig) -> Self { + self.rpc_config = Some(rpc_config); + self } } -impl - SignetNodeBuilder>, Aof, CacheHandle, ServeConfig, StorageRpcConfig> +impl SignetNodeBuilder>, Aof> where N: HostNotifier, H: HotKv + Clone + Send + Sync + 'static, @@ -268,9 +199,9 @@ where self.storage.unwrap(), self.alias_oracle.unwrap(), self.client.unwrap(), - self.blob_cacher.unwrap(), - self.serve_config.unwrap(), - self.rpc_config.unwrap(), + self.blob_cacher.ok_or_eyre("Blob cacher must be set")?, + self.serve_config.ok_or_eyre("Serve config must be set")?, + self.rpc_config.ok_or_eyre("RPC config must be set")?, ) } } diff --git a/crates/node/src/metrics.rs b/crates/node/src/metrics.rs index 1c93380..f43d25f 100644 --- a/crates/node/src/metrics.rs +++ b/crates/node/src/metrics.rs @@ -68,10 +68,14 @@ fn inc_reorgs_processed() { pub(crate) fn record_notification_received(notification: &HostNotification) { inc_notifications_received(); - notification.reverted_chain().inspect(|_| inc_reorgs_received()); + if notification.reverted_chain().is_some() { + inc_reorgs_received(); + } } pub(crate) fn record_notification_processed(notification: &HostNotification) { inc_notifications_processed(); - notification.reverted_chain().inspect(|_| inc_reorgs_processed()); + if notification.reverted_chain().is_some() { + inc_reorgs_processed(); + } } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 29330b8..646b313 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -225,7 +225,7 @@ where reverted = notification.reverted_chain().map(|c| c.len()).unwrap_or_default(), committed = notification.committed_chain().map(|c| c.len()).unwrap_or_default(), ))] - pub(crate) async fn on_notification( + pub async fn on_notification( &self, notification: &HostNotification, ) -> eyre::Result { @@ -389,7 +389,7 @@ where first = chain.first_number().unwrap_or(0), tip = chain.tip_number().unwrap_or(0), ))] - pub(crate) async fn on_host_revert(&self, chain: &Arc) -> eyre::Result { + pub async fn on_host_revert(&self, chain: &Arc) -> eyre::Result { // NB: `unwrap_or(0)` is safe here because a non-empty chain always // has a first/tip block. If an invariant violation causes `None`, // the `0` fallback results in `drain_above(0)` which drains all diff --git a/crates/node/src/rpc.rs b/crates/node/src/rpc.rs index 45a5688..e143f97 100644 --- a/crates/node/src/rpc.rs +++ b/crates/node/src/rpc.rs @@ -15,7 +15,7 @@ where AliasOracle: AliasOracleFactory, { /// Start the RPC server. - pub(crate) async fn start_rpc(&mut self) -> eyre::Result<()> { + pub async fn start_rpc(&mut self) -> eyre::Result<()> { let guard = self.launch_rpc().await?; self.rpc_handle = Some(guard); info!("launched rpc server");