diff --git a/Cargo.toml b/Cargo.toml
index 294c861..e31285d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ incremental = false
signet-blobber = { version = "0.16.0-rc.7", path = "crates/blobber" }
signet-block-processor = { version = "0.16.0-rc.7", path = "crates/block-processor" }
signet-genesis = { version = "0.16.0-rc.7", path = "crates/genesis" }
+signet-host-rpc = { version = "0.16.0-rc.7", path = "crates/host-rpc" }
signet-node = { version = "0.16.0-rc.7", path = "crates/node" }
signet-node-config = { version = "0.16.0-rc.7", path = "crates/node-config" }
signet-node-tests = { version = "0.16.0-rc.7", path = "crates/node-tests" }
diff --git a/crates/host-rpc/Cargo.toml b/crates/host-rpc/Cargo.toml
new file mode 100644
index 0000000..5cf20d0
--- /dev/null
+++ b/crates/host-rpc/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "signet-host-rpc"
+description = "RPC-based implementation of the HostNotifier trait for signet-node."
+version.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+
+[dependencies]
+signet-node-types.workspace = true
+signet-extract.workspace = true
+signet-types.workspace = true
+
+alloy.workspace = true
+futures-util.workspace = true
+thiserror.workspace = true
+tracing.workspace = true
diff --git a/crates/host-rpc/README.md b/crates/host-rpc/README.md
new file mode 100644
index 0000000..7d5a8a8
--- /dev/null
+++ b/crates/host-rpc/README.md
@@ -0,0 +1,6 @@
+# signet-host-rpc
+
+RPC-based implementation of the `HostNotifier` trait for signet-node.
+
+Connects to any Ethereum execution layer client via WebSocket, following
+the host chain without embedding a full reth node.
diff --git a/crates/host-rpc/src/builder.rs b/crates/host-rpc/src/builder.rs
new file mode 100644
index 0000000..237d1b5
--- /dev/null
+++ b/crates/host-rpc/src/builder.rs
@@ -0,0 +1,79 @@
+use crate::{RpcHostError, RpcHostNotifier};
+use alloy::providers::Provider;
+use std::collections::VecDeque;
+
+/// Default block buffer capacity.
+const DEFAULT_BUFFER_CAPACITY: usize = 64;
+/// Default backfill batch size.
+const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
+
+/// Builder for [`RpcHostNotifier`].
+///
+/// # Example
+///
+/// ```ignore
+/// let notifier = RpcHostNotifierBuilder::new(provider)
+/// .with_buffer_capacity(128)
+/// .with_backfill_batch_size(64)
+/// .build()
+/// .await?;
+/// ```
+#[derive(Debug)]
+pub struct RpcHostNotifierBuilder<P> {
+ provider: P,
+ buffer_capacity: usize,
+ backfill_batch_size: u64,
+ genesis_timestamp: u64,
+}
+
+impl<P> RpcHostNotifierBuilder<P>
+where
+ P: Provider + Clone,
+{
+ /// Create a new builder with the given provider.
+ pub const fn new(provider: P) -> Self {
+ Self {
+ provider,
+ buffer_capacity: DEFAULT_BUFFER_CAPACITY,
+ backfill_batch_size: DEFAULT_BACKFILL_BATCH_SIZE,
+ genesis_timestamp: 0,
+ }
+ }
+
+ /// Set the block buffer capacity (default: 64).
+ pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self {
+ self.buffer_capacity = capacity;
+ self
+ }
+
+ /// Set the backfill batch size (default: 32).
+ pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self {
+ self.backfill_batch_size = batch_size;
+ self
+ }
+
+ /// Set the genesis timestamp for epoch calculation.
+ pub const fn with_genesis_timestamp(mut self, timestamp: u64) -> Self {
+ self.genesis_timestamp = timestamp;
+ self
+ }
+
+ /// Build the notifier, establishing the `newHeads` WebSocket subscription.
+    pub async fn build(self) -> Result<RpcHostNotifier<P>, RpcHostError> {
+ let sub = self.provider.subscribe_blocks().await?;
+ let header_sub = sub.into_stream();
+
+ Ok(RpcHostNotifier {
+ provider: self.provider,
+ header_sub,
+ block_buffer: VecDeque::with_capacity(self.buffer_capacity),
+ buffer_capacity: self.buffer_capacity,
+ cached_safe: None,
+ cached_finalized: None,
+ last_tag_epoch: None,
+ backfill_from: None,
+ backfill_batch_size: self.backfill_batch_size,
+ genesis_timestamp: self.genesis_timestamp,
+ })
+ }
+}
diff --git a/crates/host-rpc/src/error.rs b/crates/host-rpc/src/error.rs
new file mode 100644
index 0000000..012e69c
--- /dev/null
+++ b/crates/host-rpc/src/error.rs
@@ -0,0 +1,26 @@
+use alloy::transports::{RpcError, TransportErrorKind};
+
+/// Errors from the RPC host notifier.
+#[derive(Debug, thiserror::Error)]
+pub enum RpcHostError {
+ /// The WebSocket subscription was dropped unexpectedly.
+ #[error("subscription closed")]
+ SubscriptionClosed,
+
+ /// An RPC call failed.
+ #[error("rpc error: {0}")]
+    Rpc(#[from] RpcError<TransportErrorKind>),
+
+ /// The RPC node returned no block for the requested number.
+ #[error("missing block {0}")]
+ MissingBlock(u64),
+
+ /// Reorg deeper than the block buffer.
+ #[error("reorg depth {depth} exceeds buffer capacity {capacity}")]
+ ReorgTooDeep {
+ /// The detected reorg depth.
+ depth: u64,
+ /// The configured buffer capacity.
+ capacity: usize,
+ },
+}
diff --git a/crates/host-rpc/src/lib.rs b/crates/host-rpc/src/lib.rs
new file mode 100644
index 0000000..dad7dbb
--- /dev/null
+++ b/crates/host-rpc/src/lib.rs
@@ -0,0 +1,24 @@
+#![doc = include_str!("../README.md")]
+#![warn(
+ missing_copy_implementations,
+ missing_debug_implementations,
+ missing_docs,
+ unreachable_pub,
+ clippy::missing_const_for_fn,
+ rustdoc::all
+)]
+#![cfg_attr(not(test), warn(unused_crate_dependencies))]
+#![deny(unused_must_use, rust_2018_idioms)]
+#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
+
+mod builder;
+pub use builder::RpcHostNotifierBuilder;
+
+mod error;
+pub use error::RpcHostError;
+
+mod notifier;
+pub use notifier::RpcHostNotifier;
+
+mod segment;
+pub use segment::{RpcBlock, RpcChainSegment};
diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs
new file mode 100644
index 0000000..9f0a603
--- /dev/null
+++ b/crates/host-rpc/src/notifier.rs
@@ -0,0 +1,409 @@
+use crate::{RpcBlock, RpcChainSegment, RpcHostError};
+use alloy::{
+ consensus::{BlockHeader, transaction::Recovered},
+ eips::BlockNumberOrTag,
+ network::BlockResponse,
+ primitives::{B256, Sealed},
+ providers::Provider,
+ pubsub::SubscriptionStream,
+ rpc::types::Header as RpcHeader,
+};
+use futures_util::StreamExt;
+use signet_extract::Extractable;
+use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier};
+use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned};
+use std::{collections::VecDeque, sync::Arc};
+use tracing::debug;
+
+/// Seconds per Ethereum slot.
+const SLOT_SECONDS: u64 = 12;
+/// Slots per Ethereum epoch.
+const SLOTS_PER_EPOCH: u64 = 32;
+
+/// RPC-based implementation of [`HostNotifier`].
+///
+/// Follows a host chain via WebSocket `newHeads` subscription, fetching full
+/// blocks and receipts on demand. Detects reorgs via a ring buffer of recent
+/// block hashes.
+///
+/// Generic over `P`: any alloy provider that supports subscriptions.
+pub struct RpcHostNotifier<P> {
+ /// The alloy provider.
+ pub(crate) provider: P,
+
+ /// Subscription stream of new block headers.
+    pub(crate) header_sub: SubscriptionStream<RpcHeader>,
+
+ /// Recent blocks for reorg detection and caching.
+    pub(crate) block_buffer: VecDeque<Arc<RpcBlock>>,
+
+ /// Maximum entries in the block buffer.
+ pub(crate) buffer_capacity: usize,
+
+ /// Cached safe block number, refreshed at epoch boundaries.
+    pub(crate) cached_safe: Option<u64>,
+
+ /// Cached finalized block number, refreshed at epoch boundaries.
+    pub(crate) cached_finalized: Option<u64>,
+
+ /// Last epoch number for which safe/finalized were fetched.
+    pub(crate) last_tag_epoch: Option<u64>,
+
+ /// If set, backfill from this block number before processing
+ /// subscription events.
+    pub(crate) backfill_from: Option<u64>,
+
+ /// Max blocks per backfill batch.
+ pub(crate) backfill_batch_size: u64,
+
+ /// Genesis timestamp, used for epoch calculation.
+ pub(crate) genesis_timestamp: u64,
+}
+
+impl<P> core::fmt::Debug for RpcHostNotifier<P> {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ f.debug_struct("RpcHostNotifier")
+ .field("buffer_len", &self.block_buffer.len())
+ .field("buffer_capacity", &self.buffer_capacity)
+ .field("backfill_from", &self.backfill_from)
+ .finish_non_exhaustive()
+ }
+}
+
+impl<P> RpcHostNotifier<P>
+where
+ P: Provider + Clone,
+{
+ /// Current tip block number from the buffer.
+    fn tip(&self) -> Option<u64> {
+ self.block_buffer.back().map(|b| b.number())
+ }
+
+ /// Look up a block hash in the buffer by block number.
+    fn buffered_hash(&self, number: u64) -> Option<B256> {
+ self.block_buffer.iter().rev().find(|b| b.number() == number).map(|b| b.hash())
+ }
+
+ /// Fetch a single block with its receipts from the provider.
+    async fn fetch_block(&self, number: u64) -> Result<RpcBlock, RpcHostError> {
+ let rpc_block = self
+ .provider
+ .get_block_by_number(number.into())
+ .full()
+ .await?
+ .ok_or(RpcHostError::MissingBlock(number))?;
+
+ let rpc_receipts =
+ self.provider.get_block_receipts(number.into()).await?.unwrap_or_default();
+
+ // Convert RPC block to our RecoveredBlock type.
+ let hash = rpc_block.header.hash;
+ let block = rpc_block
+ .map_transactions(|tx| {
+ let recovered = tx.inner;
+ let signer = recovered.signer();
+ let tx: TransactionSigned = recovered.into_inner().into();
+ Recovered::new_unchecked(tx, signer)
+ })
+ .into_consensus();
+ let sealed_header = Sealed::new_unchecked(block.header, hash);
+ let block: RecoveredBlock = SealedBlock::new(sealed_header, block.body.transactions);
+
+ // Convert RPC receipts to consensus receipts.
+ let receipts =
+ rpc_receipts.into_iter().map(|r| r.inner.into_primitives_receipt()).collect();
+
+ Ok(RpcBlock { block, receipts })
+ }
+
+ /// Fetch a range of blocks concurrently.
+    async fn fetch_range(&self, from: u64, to: u64) -> Result<Vec<Arc<RpcBlock>>, RpcHostError> {
+ let mut blocks = Vec::with_capacity((to - from + 1) as usize);
+ // Fetch sequentially for now; can be parallelized later with
+ // futures::stream::FuturesOrdered.
+ for number in from..=to {
+ let block = self.fetch_block(number).await?;
+ blocks.push(Arc::new(block));
+ }
+ Ok(blocks)
+ }
+
+ /// Derive the epoch number from a block timestamp.
+ const fn epoch_of(&self, timestamp: u64) -> u64 {
+ timestamp.saturating_sub(self.genesis_timestamp) / (SLOT_SECONDS * SLOTS_PER_EPOCH)
+ }
+
+ /// Refresh safe/finalized block numbers if an epoch boundary was crossed.
+ async fn maybe_refresh_tags(&mut self, timestamp: u64) -> Result<(), RpcHostError> {
+ let epoch = self.epoch_of(timestamp);
+ if self.last_tag_epoch == Some(epoch) {
+ return Ok(());
+ }
+
+ let safe = self
+ .provider
+ .get_block_by_number(BlockNumberOrTag::Safe)
+ .await?
+ .map(|b| b.header().number());
+ let finalized = self
+ .provider
+ .get_block_by_number(BlockNumberOrTag::Finalized)
+ .await?
+ .map(|b| b.header().number());
+
+ self.cached_safe = safe;
+ self.cached_finalized = finalized;
+ self.last_tag_epoch = Some(epoch);
+
+ debug!(epoch, safe, finalized, "refreshed block tags at epoch boundary");
+ Ok(())
+ }
+
+ /// Remove invalidated entries from the buffer on reorg, then add new
+ /// blocks.
+ fn update_buffer_reorg(&mut self, fork_number: u64, new_blocks: &[Arc]) {
+ // Remove all blocks at or above the fork point.
+ while self.block_buffer.back().is_some_and(|b| b.number() >= fork_number) {
+ self.block_buffer.pop_back();
+ }
+ self.push_to_buffer(new_blocks);
+ }
+
+ /// Push blocks to the buffer, evicting oldest if over capacity.
+ fn push_to_buffer(&mut self, blocks: &[Arc]) {
+ for block in blocks {
+ self.block_buffer.push_back(Arc::clone(block));
+ if self.block_buffer.len() > self.buffer_capacity {
+ self.block_buffer.pop_front();
+ }
+ }
+ }
+
+ /// Find the fork point by walking backward through the buffer.
+ ///
+ /// Returns the block number where the chain diverged (the first block
+ /// that differs), or `None` if the fork is deeper than the buffer.
+ ///
+ /// **Limitation:** This queries the RPC node for blocks on the new chain.
+ /// If the node hasn't fully switched to the new chain, it may return
+ /// stale blocks from the old chain, producing an incorrect fork point.
+ /// This is an inherent limitation of RPC-based reorg detection.
+    async fn find_fork_point(&self, new_header: &RpcHeader) -> Result<Option<u64>, RpcHostError> {
+ // Walk the new chain backward from the new header's parent.
+ let mut check_hash = new_header.parent_hash();
+ let mut check_number = new_header.number().saturating_sub(1);
+
+ loop {
+ match self.buffered_hash(check_number) {
+ Some(buffered) if buffered == check_hash => {
+ // Found the common ancestor at check_number.
+ // The fork point is the next block (first divergence).
+ return Ok(Some(check_number + 1));
+ }
+ Some(_) => {
+ // Mismatch — keep walking backward.
+ if check_number == 0 {
+ return Ok(None);
+ }
+ // Fetch the parent of this block on the new chain.
+ let parent = self
+ .provider
+ .get_block_by_number(check_number.into())
+ .await?
+ .ok_or(RpcHostError::MissingBlock(check_number))?;
+ check_hash = parent.header().parent_hash();
+ check_number = check_number.saturating_sub(1);
+ }
+ None => {
+ // Beyond our buffer — can't determine fork point.
+ return Ok(None);
+ }
+ }
+ }
+ }
+
+ /// Process a backfill batch if pending.
+ ///
+ /// Returns `Some(notification)` if a batch was emitted, `None` if no
+ /// backfill is pending.
+ async fn drain_backfill(
+ &mut self,
+    ) -> Option<Result<HostNotification<RpcChainSegment>, RpcHostError>> {
+ let from = self.backfill_from?;
+ let tip = match self.provider.get_block_number().await {
+ Ok(n) => n,
+ Err(e) => return Some(Err(e.into())),
+ };
+
+ if from > tip {
+ self.backfill_from = None;
+ return None;
+ }
+
+ let to = tip.min(from + self.backfill_batch_size - 1);
+
+ let blocks = match self.fetch_range(from, to).await {
+ Ok(b) => b,
+ Err(e) => return Some(Err(e)),
+ };
+
+ self.push_to_buffer(&blocks);
+
+ // Advance or clear backfill.
+ if to >= tip {
+ self.backfill_from = None;
+ } else {
+ self.backfill_from = Some(to + 1);
+ }
+
+ // Refresh tags using the last block's timestamp.
+ if let Some(last) = blocks.last()
+ && let Err(e) = self.maybe_refresh_tags(last.block.timestamp()).await
+ {
+ return Some(Err(e));
+ }
+
+ let segment = Arc::new(RpcChainSegment::new(blocks));
+ Some(Ok(HostNotification {
+ kind: HostNotificationKind::ChainCommitted { new: segment },
+ safe_block_number: self.cached_safe,
+ finalized_block_number: self.cached_finalized,
+ }))
+ }
+
+ /// Handle a new header from the subscription stream.
+ async fn handle_new_head(
+ &mut self,
+ header: RpcHeader,
+    ) -> Result<HostNotification<RpcChainSegment>, RpcHostError> {
+ let new_number = header.number();
+ let new_parent = header.parent_hash();
+
+ // Check parent hash continuity.
+ let is_reorg = self.tip().is_some_and(|tip_num| {
+ self.buffered_hash(tip_num).is_some_and(|tip_hash| {
+ // Parent should point to our tip, and the new block
+ // should be exactly one ahead.
+ new_parent != tip_hash || new_number != tip_num + 1
+ })
+ });
+
+ let kind = if is_reorg {
+ self.handle_reorg(header).await?
+ } else {
+ self.handle_advance(header).await?
+ };
+
+ // Refresh block tags.
+ let timestamp = match &kind {
+ HostNotificationKind::ChainCommitted { new } => {
+ new.blocks_and_receipts().last().map(|bar| bar.block.timestamp()).unwrap_or(0)
+ }
+ HostNotificationKind::ChainReorged { new, .. } => {
+ new.blocks_and_receipts().last().map(|bar| bar.block.timestamp()).unwrap_or(0)
+ }
+ HostNotificationKind::ChainReverted { .. } => 0,
+ };
+ if timestamp > 0 {
+ self.maybe_refresh_tags(timestamp).await?;
+ }
+
+ Ok(HostNotification {
+ kind,
+ safe_block_number: self.cached_safe,
+ finalized_block_number: self.cached_finalized,
+ })
+ }
+
+ /// Handle a normal chain advance (no reorg).
+ async fn handle_advance(
+ &mut self,
+ header: RpcHeader,
+    ) -> Result<HostNotificationKind<RpcChainSegment>, RpcHostError> {
+ let new_number = header.number();
+ let from = self.tip().map_or(new_number, |t| t + 1);
+
+ let blocks = self.fetch_range(from, new_number).await?;
+ self.push_to_buffer(&blocks);
+
+ Ok(HostNotificationKind::ChainCommitted { new: Arc::new(RpcChainSegment::new(blocks)) })
+ }
+
+ /// Handle a reorg: find fork point, emit `ChainReorged`.
+ async fn handle_reorg(
+ &mut self,
+ header: RpcHeader,
+    ) -> Result<HostNotificationKind<RpcChainSegment>, RpcHostError> {
+ let new_number = header.number();
+
+ let fork_number = self.find_fork_point(&header).await?.ok_or_else(|| {
+ let depth = self
+ .tip()
+ .unwrap_or(0)
+ .saturating_sub(self.block_buffer.front().map_or(0, |b| b.number()));
+ RpcHostError::ReorgTooDeep { depth, capacity: self.buffer_capacity }
+ })?;
+
+ // Collect reverted blocks from the buffer before removing them.
+        let old_blocks: Vec<Arc<RpcBlock>> = self
+ .block_buffer
+ .iter()
+ .filter(|b| b.number() >= fork_number)
+ .map(Arc::clone)
+ .collect();
+
+ // Fetch new chain from fork point to new head.
+ let blocks = self.fetch_range(fork_number, new_number).await?;
+ self.update_buffer_reorg(fork_number, &blocks);
+
+ Ok(HostNotificationKind::ChainReorged {
+ old: Arc::new(RpcChainSegment::new(old_blocks)),
+ new: Arc::new(RpcChainSegment::new(blocks)),
+ })
+ }
+}
+
+/// [`HostNotifier`] implementation for [`RpcHostNotifier`].
+///
+/// Note: this implementation never emits
+/// [`HostNotificationKind::ChainReverted`]. The `newHeads` WebSocket
+/// subscription only fires when a new block appears — a pure revert
+/// (blocks removed without a replacement chain) produces no new header and
+/// is therefore invisible to the subscription. Only
+/// [`HostNotificationKind::ChainCommitted`] and
+/// [`HostNotificationKind::ChainReorged`] are produced.
+impl<P> HostNotifier for RpcHostNotifier<P>
+where
+ P: Provider + Clone + Send + Sync + 'static,
+{
+ type Chain = RpcChainSegment;
+ type Error = RpcHostError;
+
+ async fn next_notification(
+ &mut self,
+    ) -> Option<Result<HostNotification<Self::Chain>, Self::Error>> {
+ // Drain pending backfill first.
+ if let Some(result) = self.drain_backfill().await {
+ return Some(result);
+ }
+
+ // Await next header from subscription.
+ let header = self.header_sub.next().await?;
+ Some(self.handle_new_head(header).await)
+ }
+
+ fn set_head(&mut self, block_number: u64) {
+ self.backfill_from = Some(block_number);
+ }
+
+    fn set_backfill_thresholds(&mut self, max_blocks: Option<u64>) {
+ if let Some(max) = max_blocks {
+ self.backfill_batch_size = max.max(1);
+ }
+ }
+
+ fn send_finished_height(&self, _block_number: u64) -> Result<(), Self::Error> {
+ // No-op: no ExEx to notify for an RPC follower.
+ Ok(())
+ }
+}
diff --git a/crates/host-rpc/src/segment.rs b/crates/host-rpc/src/segment.rs
new file mode 100644
index 0000000..460a84a
--- /dev/null
+++ b/crates/host-rpc/src/segment.rs
@@ -0,0 +1,74 @@
+use alloy::consensus::{BlockHeader, ReceiptEnvelope};
+use signet_extract::{BlockAndReceipts, Extractable};
+use signet_types::primitives::RecoveredBlock;
+use std::sync::Arc;
+
+/// A block with its receipts, fetched via RPC.
+#[derive(Debug)]
+pub struct RpcBlock {
+ /// The recovered block (with senders).
+ pub(crate) block: RecoveredBlock,
+ /// The receipts for this block's transactions.
+    pub(crate) receipts: Vec<ReceiptEnvelope>,
+}
+
+impl RpcBlock {
+ /// The block number.
+ pub fn number(&self) -> u64 {
+ self.block.number()
+ }
+
+ /// The block hash.
+ pub const fn hash(&self) -> alloy::primitives::B256 {
+ self.block.header.hash()
+ }
+
+ /// The parent block hash.
+ pub fn parent_hash(&self) -> alloy::primitives::B256 {
+ self.block.parent_hash()
+ }
+}
+
+/// A chain segment fetched via RPC.
+///
+/// Contains one or more blocks with their receipts, ordered by block number
+/// ascending. Blocks are wrapped in [`Arc`] for cheap sharing with the
+/// notifier's internal buffer.
+#[derive(Debug)]
+pub struct RpcChainSegment {
+    blocks: Vec<Arc<RpcBlock>>,
+}
+
+impl RpcChainSegment {
+ /// Create a new segment from a list of blocks.
+    pub const fn new(blocks: Vec<Arc<RpcBlock>>) -> Self {
+ Self { blocks }
+ }
+}
+
+impl Extractable for RpcChainSegment {
+ type Block = RecoveredBlock;
+ type Receipt = ReceiptEnvelope;
+
+ fn blocks_and_receipts(
+ &self,
+    ) -> impl Iterator<Item = BlockAndReceipts<'_, Self>> {
+ self.blocks.iter().map(|b| BlockAndReceipts { block: &b.block, receipts: &b.receipts })
+ }
+
+    fn first_number(&self) -> Option<u64> {
+ self.blocks.first().map(|b| b.number())
+ }
+
+    fn tip_number(&self) -> Option<u64> {
+ self.blocks.last().map(|b| b.number())
+ }
+
+ fn len(&self) -> usize {
+ self.blocks.len()
+ }
+
+ fn is_empty(&self) -> bool {
+ self.blocks.is_empty()
+ }
+}
diff --git a/crates/node/src/builder.rs b/crates/node/src/builder.rs
index 57f6222..3ef5d93 100644
--- a/crates/node/src/builder.rs
+++ b/crates/node/src/builder.rs
@@ -33,50 +33,18 @@ pub struct NotAStorage;
/// - A [`StorageRpcConfig`], via [`Self::with_rpc_config`].
/// - A `reqwest::Client`, via [`Self::with_client`].
/// - If not set, a default client will be created.
-///
-/// # Examples
-///
-/// ```no_run
-/// # use signet_node::builder::SignetNodeBuilder;
-/// # fn example(
-/// # config: signet_node_config::SignetNodeConfig,
-/// # notifier: impl signet_node_types::HostNotifier,
-/// # storage: std::sync::Arc>,
-/// # alias_oracle: impl signet_block_processor::AliasOracleFactory,
-/// # blob_cacher: signet_blobber::CacheHandle,
-/// # serve_config: signet_rpc::ServeConfig,
-/// # rpc_config: signet_rpc::StorageRpcConfig,
-/// # ) {
-/// let builder = SignetNodeBuilder::new(config)
-/// .with_notifier(notifier)
-/// .with_storage(storage)
-/// .with_alias_oracle(alias_oracle)
-/// .with_blob_cacher(blob_cacher)
-/// .with_serve_config(serve_config)
-/// .with_rpc_config(rpc_config);
-/// # }
-/// ```
-pub struct SignetNodeBuilder<
- Notifier = (),
- Storage = NotAStorage,
- Aof = NotAnAof,
- Bc = (),
- Sc = (),
- Rc = (),
-> {
+pub struct SignetNodeBuilder<Notifier = (), Storage = NotAStorage, Aof = NotAnAof> {
config: SignetNodeConfig,
    alias_oracle: Option<Aof>,
    notifier: Option<Notifier>,
    storage: Option<Storage>,
    client: Option<reqwest::Client>,
-    blob_cacher: Option<Bc>,
-    serve_config: Option<Sc>,
-    rpc_config: Option<Rc>,
+    blob_cacher: Option<CacheHandle>,
+    serve_config: Option<ServeConfig>,
+    rpc_config: Option<StorageRpcConfig>,
}
-impl<Notifier, Storage, Aof, Bc, Sc, Rc> core::fmt::Debug
-    for SignetNodeBuilder<Notifier, Storage, Aof, Bc, Sc, Rc>
-{
+impl<Notifier, Storage, Aof> core::fmt::Debug for SignetNodeBuilder<Notifier, Storage, Aof> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("SignetNodeBuilder").finish_non_exhaustive()
}
@@ -98,12 +66,12 @@ impl SignetNodeBuilder {
}
}
-impl<Notifier, Storage, Aof, Bc, Sc, Rc> SignetNodeBuilder<Notifier, Storage, Aof, Bc, Sc, Rc> {
+impl<Notifier, Storage, Aof> SignetNodeBuilder<Notifier, Storage, Aof> {
/// Set the [`UnifiedStorage`] backend for the signet node.
    pub fn with_storage<H, C>(
        self,
        storage: Arc<UnifiedStorage<H, C>>,
-    ) -> SignetNodeBuilder<Notifier, Arc<UnifiedStorage<H, C>>, Aof, Bc, Sc, Rc> {
+    ) -> SignetNodeBuilder<Notifier, Arc<UnifiedStorage<H, C>>, Aof> {
SignetNodeBuilder {
config: self.config,
alias_oracle: self.alias_oracle,
@@ -117,10 +85,7 @@ impl SignetNodeBuilder(
- self,
- notifier: N,
-    ) -> SignetNodeBuilder<N, Storage, Aof, Bc, Sc, Rc> {
+    pub fn with_notifier<N>(self, notifier: N) -> SignetNodeBuilder<N, Storage, Aof> {
SignetNodeBuilder {
config: self.config,
alias_oracle: self.alias_oracle,
@@ -137,7 +102,7 @@ impl SignetNodeBuilder(
self,
alias_oracle: NewAof,
-    ) -> SignetNodeBuilder<Notifier, Storage, NewAof, Bc, Sc, Rc> {
+    ) -> SignetNodeBuilder<Notifier, Storage, NewAof> {
SignetNodeBuilder {
config: self.config,
alias_oracle: Some(alias_oracle),
@@ -157,59 +122,25 @@ impl SignetNodeBuilder SignetNodeBuilder {
- SignetNodeBuilder {
- config: self.config,
- alias_oracle: self.alias_oracle,
- notifier: self.notifier,
- storage: self.storage,
- client: self.client,
- blob_cacher: Some(blob_cacher),
- serve_config: self.serve_config,
- rpc_config: self.rpc_config,
- }
+ pub fn with_blob_cacher(mut self, blob_cacher: CacheHandle) -> Self {
+ self.blob_cacher = Some(blob_cacher);
+ self
}
/// Set the RPC transport configuration.
- pub fn with_serve_config(
- self,
- serve_config: ServeConfig,
- ) -> SignetNodeBuilder {
- SignetNodeBuilder {
- config: self.config,
- alias_oracle: self.alias_oracle,
- notifier: self.notifier,
- storage: self.storage,
- client: self.client,
- blob_cacher: self.blob_cacher,
- serve_config: Some(serve_config),
- rpc_config: self.rpc_config,
- }
+ pub fn with_serve_config(mut self, serve_config: ServeConfig) -> Self {
+ self.serve_config = Some(serve_config);
+ self
}
/// Set the RPC behaviour configuration.
- pub fn with_rpc_config(
- self,
- rpc_config: StorageRpcConfig,
- ) -> SignetNodeBuilder {
- SignetNodeBuilder {
- config: self.config,
- alias_oracle: self.alias_oracle,
- notifier: self.notifier,
- storage: self.storage,
- client: self.client,
- blob_cacher: self.blob_cacher,
- serve_config: self.serve_config,
- rpc_config: Some(rpc_config),
- }
+ pub const fn with_rpc_config(mut self, rpc_config: StorageRpcConfig) -> Self {
+ self.rpc_config = Some(rpc_config);
+ self
}
}
-impl<N, H, C, Aof>
-    SignetNodeBuilder<N, Arc<UnifiedStorage<H, C>>, Aof, CacheHandle, ServeConfig, StorageRpcConfig>
+impl<N, H, C, Aof> SignetNodeBuilder<N, Arc<UnifiedStorage<H, C>>, Aof>
where
N: HostNotifier,
H: HotKv + Clone + Send + Sync + 'static,
@@ -268,9 +199,9 @@ where
self.storage.unwrap(),
self.alias_oracle.unwrap(),
self.client.unwrap(),
- self.blob_cacher.unwrap(),
- self.serve_config.unwrap(),
- self.rpc_config.unwrap(),
+ self.blob_cacher.ok_or_eyre("Blob cacher must be set")?,
+ self.serve_config.ok_or_eyre("Serve config must be set")?,
+ self.rpc_config.ok_or_eyre("RPC config must be set")?,
)
}
}
diff --git a/crates/node/src/metrics.rs b/crates/node/src/metrics.rs
index 1c93380..f43d25f 100644
--- a/crates/node/src/metrics.rs
+++ b/crates/node/src/metrics.rs
@@ -68,10 +68,14 @@ fn inc_reorgs_processed() {
pub(crate) fn record_notification_received(notification: &HostNotification) {
inc_notifications_received();
- notification.reverted_chain().inspect(|_| inc_reorgs_received());
+ if notification.reverted_chain().is_some() {
+ inc_reorgs_received();
+ }
}
pub(crate) fn record_notification_processed(notification: &HostNotification) {
inc_notifications_processed();
- notification.reverted_chain().inspect(|_| inc_reorgs_processed());
+ if notification.reverted_chain().is_some() {
+ inc_reorgs_processed();
+ }
}
diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs
index 29330b8..646b313 100644
--- a/crates/node/src/node.rs
+++ b/crates/node/src/node.rs
@@ -225,7 +225,7 @@ where
reverted = notification.reverted_chain().map(|c| c.len()).unwrap_or_default(),
committed = notification.committed_chain().map(|c| c.len()).unwrap_or_default(),
))]
- pub(crate) async fn on_notification(
+ pub async fn on_notification(
&self,
notification: &HostNotification,
) -> eyre::Result {
@@ -389,7 +389,7 @@ where
first = chain.first_number().unwrap_or(0),
tip = chain.tip_number().unwrap_or(0),
))]
- pub(crate) async fn on_host_revert(&self, chain: &Arc) -> eyre::Result {
+ pub async fn on_host_revert(&self, chain: &Arc) -> eyre::Result {
// NB: `unwrap_or(0)` is safe here because a non-empty chain always
// has a first/tip block. If an invariant violation causes `None`,
// the `0` fallback results in `drain_above(0)` which drains all
diff --git a/crates/node/src/rpc.rs b/crates/node/src/rpc.rs
index 45a5688..e143f97 100644
--- a/crates/node/src/rpc.rs
+++ b/crates/node/src/rpc.rs
@@ -15,7 +15,7 @@ where
AliasOracle: AliasOracleFactory,
{
/// Start the RPC server.
- pub(crate) async fn start_rpc(&mut self) -> eyre::Result<()> {
+ pub async fn start_rpc(&mut self) -> eyre::Result<()> {
let guard = self.launch_rpc().await?;
self.rpc_handle = Some(guard);
info!("launched rpc server");