diff --git a/Cargo.toml b/Cargo.toml index 1d01544..294c861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ signet-genesis = { version = "0.16.0-rc.7", path = "crates/genesis" } signet-node = { version = "0.16.0-rc.7", path = "crates/node" } signet-node-config = { version = "0.16.0-rc.7", path = "crates/node-config" } signet-node-tests = { version = "0.16.0-rc.7", path = "crates/node-tests" } +signet-node-types = { version = "0.16.0-rc.7", path = "crates/node-types" } signet-rpc = { version = "0.16.0-rc.7", path = "crates/rpc" } init4-bin-base = { version = "0.18.0-rc.8", features = ["alloy"] } diff --git a/crates/node-config/Cargo.toml b/crates/node-config/Cargo.toml index 877f503..f906131 100644 --- a/crates/node-config/Cargo.toml +++ b/crates/node-config/Cargo.toml @@ -18,8 +18,6 @@ init4-bin-base.workspace = true reth.workspace = true reth-chainspec.workspace = true -reth-exex.workspace = true -reth-node-api.workspace = true alloy.workspace = true eyre.workspace = true diff --git a/crates/node-config/src/core.rs b/crates/node-config/src/core.rs index 2bb5840..185f7fd 100644 --- a/crates/node-config/src/core.rs +++ b/crates/node-config/src/core.rs @@ -1,9 +1,9 @@ use crate::StorageConfig; use alloy::genesis::Genesis; use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv}; +use reth::primitives::NodePrimitives; use reth::providers::providers::StaticFileProvider; use reth_chainspec::ChainSpec; -use reth_node_api::NodePrimitives; use signet_blobber::BlobFetcherConfig; use signet_genesis::GenesisSpec; use signet_types::constants::{ConfigError, SignetSystemConstants}; diff --git a/crates/node-config/src/lib.rs b/crates/node-config/src/lib.rs index 26071cc..abd9c7c 100644 --- a/crates/node-config/src/lib.rs +++ b/crates/node-config/src/lib.rs @@ -14,7 +14,8 @@ mod core; pub use core::{SIGNET_NODE_DEFAULT_HTTP_PORT, SignetNodeConfig}; -mod rpc; +// NB: RPC config merging (previously `merge_rpc_configs`) is now the +// responsibility of the host adapter crate (e.g. `signet-host-reth`). mod storage; pub use storage::StorageConfig; diff --git a/crates/node-config/src/rpc.rs b/crates/node-config/src/rpc.rs deleted file mode 100644 index 44e4258..0000000 --- a/crates/node-config/src/rpc.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::SignetNodeConfig; -use reth::args::RpcServerArgs; -use reth_exex::ExExContext; -use reth_node_api::FullNodeComponents; - -impl SignetNodeConfig { - /// Inherits the IP host address from the Reth RPC server configuration, - /// and change the configured port for the RPC server. If the host server - /// is configured to use IPC, Signet Node will use the endpoint specified by the - /// environment variable `IPC_ENDPOINT`. - fn modify_args(&self, sc: &RpcServerArgs) -> eyre::Result { - let mut args = sc.clone(); - - args.http_port = self.http_port(); - args.ws_port = self.ws_port(); - args.ipcpath = self.ipc_endpoint().map(ToOwned::to_owned).unwrap_or_default(); - - Ok(args) - } - - /// Merges Signet Node configurations over the transport and rpc server - /// configurations, and returns the modified configs. - pub fn merge_rpc_configs(&self, exex: &ExExContext) -> eyre::Result - where - Node: FullNodeComponents, - { - self.modify_args(&exex.config.rpc) - } -} diff --git a/crates/node-types/Cargo.toml b/crates/node-types/Cargo.toml new file mode 100644 index 0000000..6815a66 --- /dev/null +++ b/crates/node-types/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "signet-node-types" +description = "Trait abstractions for the signet node's host chain interface." +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +signet-extract.workspace = true diff --git a/crates/node-types/README.md b/crates/node-types/README.md new file mode 100644 index 0000000..5f02143 --- /dev/null +++ b/crates/node-types/README.md @@ -0,0 +1,3 @@ +# signet-node-types + +Trait abstractions for the signet node's host chain interface. diff --git a/crates/node-types/src/lib.rs b/crates/node-types/src/lib.rs new file mode 100644 index 0000000..6572742 --- /dev/null +++ b/crates/node-types/src/lib.rs @@ -0,0 +1,18 @@ +#![doc = include_str!("../README.md")] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod notification; +pub use notification::{HostNotification, HostNotificationKind}; + +mod notifier; +pub use notifier::HostNotifier; diff --git a/crates/node-types/src/notification.rs b/crates/node-types/src/notification.rs new file mode 100644 index 0000000..abfb63f --- /dev/null +++ b/crates/node-types/src/notification.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +/// A notification from the host chain, bundling a chain event with +/// point-in-time block tag data. The safe/finalized block numbers are +/// intentionally snapshotted at notification creation time rather than +/// fetched live, because rollup safe/finalized tags are only updated +/// after block processing completes. +/// +/// # Examples +/// +/// ``` +/// # use std::sync::Arc; +/// # use signet_node_types::{HostNotification, HostNotificationKind}; +/// # fn example(chain: Arc) { +/// let notification = HostNotification { +/// kind: HostNotificationKind::ChainCommitted { new: chain }, +/// safe_block_number: Some(100), +/// finalized_block_number: Some(90), +/// }; +/// +/// // Access the committed chain via the shortcut method. +/// assert!(notification.committed_chain().is_some()); +/// assert!(notification.reverted_chain().is_none()); +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct HostNotification { + /// The chain event (commit, revert, or reorg). + pub kind: HostNotificationKind, + /// The host chain "safe" block number at the time of this notification. + pub safe_block_number: Option, + /// The host chain "finalized" block number at the time of this + /// notification. + pub finalized_block_number: Option, +} + +impl HostNotification { + /// Returns the committed chain, if any. Shortcut for + /// `self.kind.committed_chain()`. + pub const fn committed_chain(&self) -> Option<&Arc> { + self.kind.committed_chain() + } + + /// Returns the reverted chain, if any. Shortcut for + /// `self.kind.reverted_chain()`. + pub const fn reverted_chain(&self) -> Option<&Arc> { + self.kind.reverted_chain() + } +} + +/// The kind of chain event in a [`HostNotification`]. +/// +/// # Examples +/// +/// ``` +/// # use std::sync::Arc; +/// # use signet_node_types::HostNotificationKind; +/// # fn example(old: Arc, new: Arc) { +/// let kind = HostNotificationKind::ChainReorged { +/// old: old.clone(), +/// new: new.clone(), +/// }; +/// # } +/// ``` +#[derive(Debug, Clone)] +pub enum HostNotificationKind { + /// A new chain segment was committed. + ChainCommitted { + /// The newly committed chain segment. + new: Arc, + }, + /// A chain segment was reverted. + ChainReverted { + /// The reverted chain segment. + old: Arc, + }, + /// A chain reorg occurred: one segment was reverted and replaced by + /// another. + ChainReorged { + /// The reverted chain segment. + old: Arc, + /// The newly committed chain segment. + new: Arc, + }, +} + +impl HostNotificationKind { + /// Returns the committed chain, if any. + /// + /// Returns `Some` for [`ChainCommitted`] and [`ChainReorged`], `None` + /// for [`ChainReverted`]. + /// + /// [`ChainCommitted`]: HostNotificationKind::ChainCommitted + /// [`ChainReorged`]: HostNotificationKind::ChainReorged + /// [`ChainReverted`]: HostNotificationKind::ChainReverted + pub const fn committed_chain(&self) -> Option<&Arc> { + match self { + Self::ChainCommitted { new } | Self::ChainReorged { new, .. } => Some(new), + Self::ChainReverted { .. } => None, + } + } + + /// Returns the reverted chain, if any. + /// + /// Returns `Some` for [`ChainReverted`] and [`ChainReorged`], `None` + /// for [`ChainCommitted`]. + /// + /// [`ChainReverted`]: HostNotificationKind::ChainReverted + /// [`ChainReorged`]: HostNotificationKind::ChainReorged + /// [`ChainCommitted`]: HostNotificationKind::ChainCommitted + pub const fn reverted_chain(&self) -> Option<&Arc> { + match self { + Self::ChainReverted { old } | Self::ChainReorged { old, .. } => Some(old), + Self::ChainCommitted { .. } => None, + } + } +} diff --git a/crates/node-types/src/notifier.rs b/crates/node-types/src/notifier.rs new file mode 100644 index 0000000..b3bddd9 --- /dev/null +++ b/crates/node-types/src/notifier.rs @@ -0,0 +1,56 @@ +use crate::HostNotification; +use core::future::Future; +use signet_extract::Extractable; + +/// Abstraction over a host chain notification source. +/// +/// Drives the signet node's main loop: yielding chain events, controlling +/// backfill, and sending feedback. All block data comes from notifications; +/// the backend handles hash resolution internally. +/// +/// # Implementors +/// +/// - `signet-host-reth`: wraps reth's `ExExContext` +/// +/// # Implementing +/// +/// Implementations must uphold the following contract: +/// +/// 1. **`set_head`** — called once at startup before the first +/// [`next_notification`]. The backend must resolve the block number to a +/// hash (falling back to genesis if the number is not yet available) and +/// begin delivering notifications from that point. +/// 2. **`next_notification`** — must yield notifications in host-chain order. +/// Returning `None` signals a clean shutdown. +/// 3. **`set_backfill_thresholds`** — may be called at any time. Passing +/// `None` should restore the backend's default batch size. +/// 4. **`send_finished_height`** — may be called after processing each +/// notification batch. The backend resolves the block number to a hash +/// internally. Sending a height that has already been acknowledged is a +/// no-op. +/// +/// [`next_notification`]: HostNotifier::next_notification +pub trait HostNotifier: Send + Sync { + /// A chain segment — contiguous blocks with receipts. + type Chain: Extractable; + + /// The error type for fallible operations. + type Error: core::error::Error + Send + Sync + 'static; + + /// Yield the next notification. `None` signals host shutdown. + fn next_notification( + &mut self, + ) -> impl Future, Self::Error>>> + Send; + + /// Set the head position, requesting backfill from this block number. + /// The backend resolves the block number to a block hash internally. + fn set_head(&mut self, block_number: u64); + + /// Configure backfill batch size limits. `None` means use the backend's + /// default. + fn set_backfill_thresholds(&mut self, max_blocks: Option); + + /// Signal that processing is complete up to this host block number. + /// The backend resolves the block number to a block hash internally. + fn send_finished_height(&self, block_number: u64) -> Result<(), Self::Error>; +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index ad24182..29292a9 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -9,31 +9,24 @@ homepage.workspace = true repository.workspace = true [dependencies] +signet-blobber.workspace = true signet-block-processor.workspace = true signet-cold.workspace = true signet-evm.workspace = true signet-extract.workspace = true signet-genesis.workspace = true +signet-hot.workspace = true signet-node-config.workspace = true +signet-node-types.workspace = true signet-rpc.workspace = true -signet-hot.workspace = true signet-storage.workspace = true - -signet-blobber.workspace = true signet-tx-cache.workspace = true signet-types.workspace = true alloy.workspace = true - -reth.workspace = true -reth-exex.workspace = true -reth-node-api.workspace = true -reth-stages-types.workspace = true - trevm.workspace = true eyre.workspace = true -futures-util.workspace = true metrics.workspace = true reqwest.workspace = true tokio.workspace = true diff --git a/crates/node/src/alias.rs b/crates/node/src/alias.rs deleted file mode 100644 index c682c87..0000000 --- a/crates/node/src/alias.rs +++ /dev/null @@ -1,114 +0,0 @@ -use alloy::{consensus::constants::KECCAK_EMPTY, primitives::Address}; -use core::{ - fmt, - future::{self, Future}, -}; -use eyre::OptionExt; -use reth::providers::{StateProviderBox, StateProviderFactory}; -use signet_block_processor::{AliasOracle, AliasOracleFactory}; - -/// An [`AliasOracle`] backed by a reth [`StateProviderBox`]. -/// -/// Checks whether an address has non-delegation bytecode, indicating it -/// should be aliased during transaction processing. -pub struct RethAliasOracle(StateProviderBox); - -impl fmt::Debug for RethAliasOracle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RethAliasOracle").finish_non_exhaustive() - } -} - -impl RethAliasOracle { - /// Synchronously check whether the given address should be aliased. - fn check_alias(&self, address: Address) -> eyre::Result { - let Some(acct) = self.0.basic_account(&address)? else { return Ok(false) }; - // Get the bytecode hash for this account. - let bch = match acct.bytecode_hash { - Some(hash) => hash, - // No bytecode hash; not a contract. - None => return Ok(false), - }; - // No code at this address. - if bch == KECCAK_EMPTY { - return Ok(false); - } - // Fetch the code associated with this bytecode hash. - let code = self - .0 - .bytecode_by_hash(&bch)? - .ok_or_eyre("code not found. This indicates a corrupted database")?; - - // If not a 7702 delegation contract, alias it. - Ok(!code.is_eip7702()) - } -} - -impl AliasOracle for RethAliasOracle { - fn should_alias(&self, address: Address) -> impl Future> + Send { - let result = self.check_alias(address); - future::ready(result) - } -} - -/// An [`AliasOracleFactory`] backed by a `Box`. -/// -/// Creates [`RethAliasOracle`] instances from the latest host chain state. -pub struct RethAliasOracleFactory(Box); - -impl fmt::Debug for RethAliasOracleFactory { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RethAliasOracleFactory").finish_non_exhaustive() - } -} - -impl RethAliasOracleFactory { - /// Create a new [`RethAliasOracleFactory`] from a boxed state provider - /// factory. - pub fn new(provider: Box) -> Self { - Self(provider) - } -} - -impl AliasOracleFactory for RethAliasOracleFactory { - type Oracle = RethAliasOracle; - - fn create(&self) -> eyre::Result { - // ## Why `Latest` instead of a pinned host height - // - // We use `Latest` rather than pinning to a specific host block - // height because pinning would require every node to be an archive - // node in order to sync historical state, which is impractical. - // - // ## Why `Latest` is safe - // - // An EOA cannot become a non-delegation contract without a birthday - // attack (c.f. EIP-3607). CREATE and CREATE2 addresses are - // deterministic and cannot target an existing EOA. EIP-7702 - // delegations are explicitly excluded by the `is_eip7702()` check - // in `should_alias`, so delegated EOAs are never aliased. This - // means the alias status of an address is stable across blocks - // under normal conditions, making `Latest` equivalent to any - // pinned height. - // - // ## The only risk: birthday attacks - // - // A birthday attack could produce a CREATE/CREATE2 collision with - // an existing EOA, causing `should_alias` to return a false - // positive. This is computationally infeasible for the foreseeable - // future (~2^80 work), and if it ever becomes practical we can - // revisit this decision. - // - // ## Over-aliasing vs under-aliasing - // - // Even in the birthday attack scenario, the result is - // over-aliasing (a false positive), which is benign: a transaction - // sender gets an aliased address when it shouldn't. The dangerous - // failure mode — under-aliasing — cannot occur here because - // contract bytecode is never removed once deployed. - self.0 - .state_by_block_number_or_tag(alloy::eips::BlockNumberOrTag::Latest) - .map(RethAliasOracle) - .map_err(Into::into) - } -} diff --git a/crates/node/src/builder.rs b/crates/node/src/builder.rs index 490c542..57f6222 100644 --- a/crates/node/src/builder.rs +++ b/crates/node/src/builder.rs @@ -1,14 +1,14 @@ #![allow(clippy::type_complexity)] -use crate::{NodeStatus, RethAliasOracleFactory, SignetNode}; +use crate::{NodeStatus, SignetNode}; use eyre::OptionExt; -use reth::{primitives::EthPrimitives, providers::StateProviderFactory}; -use reth_exex::ExExContext; -use reth_node_api::{FullNodeComponents, NodeTypes}; +use signet_blobber::CacheHandle; use signet_block_processor::AliasOracleFactory; use signet_cold::BlockData; use signet_hot::db::{HotDbRead, UnsafeDbWrite}; use signet_node_config::SignetNodeConfig; +use signet_node_types::HostNotifier; +use signet_rpc::{ServeConfig, StorageRpcConfig}; use signet_storage::{HistoryRead, HistoryWrite, HotKv, HotKvRead, UnifiedStorage}; use std::sync::Arc; use tracing::info; @@ -25,22 +25,58 @@ pub struct NotAStorage; /// Builder for [`SignetNode`]. This is the main way to create a signet node. /// /// The builder requires the following components to be set before building: -/// - An [`ExExContext`], via [`Self::with_ctx`]. +/// - A [`HostNotifier`], via [`Self::with_notifier`]. /// - An [`Arc>`], via [`Self::with_storage`]. /// - An [`AliasOracleFactory`], via [`Self::with_alias_oracle`]. -/// - If not set, a default one will be created from the [`ExExContext`]'s -/// provider. +/// - A [`CacheHandle`], via [`Self::with_blob_cacher`]. +/// - A [`ServeConfig`], via [`Self::with_serve_config`]. +/// - A [`StorageRpcConfig`], via [`Self::with_rpc_config`]. /// - A `reqwest::Client`, via [`Self::with_client`]. /// - If not set, a default client will be created. -pub struct SignetNodeBuilder { +/// +/// # Examples +/// +/// ```no_run +/// # use signet_node::builder::SignetNodeBuilder; +/// # fn example( +/// # config: signet_node_config::SignetNodeConfig, +/// # notifier: impl signet_node_types::HostNotifier, +/// # storage: std::sync::Arc>, +/// # alias_oracle: impl signet_block_processor::AliasOracleFactory, +/// # blob_cacher: signet_blobber::CacheHandle, +/// # serve_config: signet_rpc::ServeConfig, +/// # rpc_config: signet_rpc::StorageRpcConfig, +/// # ) { +/// let builder = SignetNodeBuilder::new(config) +/// .with_notifier(notifier) +/// .with_storage(storage) +/// .with_alias_oracle(alias_oracle) +/// .with_blob_cacher(blob_cacher) +/// .with_serve_config(serve_config) +/// .with_rpc_config(rpc_config); +/// # } +/// ``` +pub struct SignetNodeBuilder< + Notifier = (), + Storage = NotAStorage, + Aof = NotAnAof, + Bc = (), + Sc = (), + Rc = (), +> { config: SignetNodeConfig, alias_oracle: Option, - ctx: Option, + notifier: Option, storage: Option, client: Option, + blob_cacher: Option, + serve_config: Option, + rpc_config: Option, } -impl core::fmt::Debug for SignetNodeBuilder { +impl core::fmt::Debug + for SignetNodeBuilder +{ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("SignetNodeBuilder").finish_non_exhaustive() } @@ -49,40 +85,51 @@ impl core::fmt::Debug for SignetNodeBuilder Self { - Self { config, alias_oracle: None, ctx: None, storage: None, client: None } + Self { + config, + alias_oracle: None, + notifier: None, + storage: None, + client: None, + blob_cacher: None, + serve_config: None, + rpc_config: None, + } } } -impl SignetNodeBuilder { +impl SignetNodeBuilder { /// Set the [`UnifiedStorage`] backend for the signet node. pub fn with_storage( self, storage: Arc>, - ) -> SignetNodeBuilder>, Aof> { + ) -> SignetNodeBuilder>, Aof, Bc, Sc, Rc> { SignetNodeBuilder { config: self.config, alias_oracle: self.alias_oracle, - ctx: self.ctx, + notifier: self.notifier, storage: Some(storage), client: self.client, + blob_cacher: self.blob_cacher, + serve_config: self.serve_config, + rpc_config: self.rpc_config, } } - /// Set the [`ExExContext`] for the signet node. - pub fn with_ctx( + /// Set the [`HostNotifier`] for the signet node. + pub fn with_notifier( self, - ctx: ExExContext, - ) -> SignetNodeBuilder, Storage, Aof> - where - NewHost: FullNodeComponents, - NewHost::Types: NodeTypes, - { + notifier: N, + ) -> SignetNodeBuilder { SignetNodeBuilder { config: self.config, alias_oracle: self.alias_oracle, - ctx: Some(ctx), + notifier: Some(notifier), storage: self.storage, client: self.client, + blob_cacher: self.blob_cacher, + serve_config: self.serve_config, + rpc_config: self.rpc_config, } } @@ -90,13 +137,16 @@ impl SignetNodeBuilder { pub fn with_alias_oracle( self, alias_oracle: NewAof, - ) -> SignetNodeBuilder { + ) -> SignetNodeBuilder { SignetNodeBuilder { config: self.config, alias_oracle: Some(alias_oracle), - ctx: self.ctx, + notifier: self.notifier, storage: self.storage, client: self.client, + blob_cacher: self.blob_cacher, + serve_config: self.serve_config, + rpc_config: self.rpc_config, } } @@ -105,19 +155,72 @@ impl SignetNodeBuilder { self.client = Some(client); self } + + /// Set the pre-built blob cacher handle. + pub fn with_blob_cacher( + self, + blob_cacher: CacheHandle, + ) -> SignetNodeBuilder { + SignetNodeBuilder { + config: self.config, + alias_oracle: self.alias_oracle, + notifier: self.notifier, + storage: self.storage, + client: self.client, + blob_cacher: Some(blob_cacher), + serve_config: self.serve_config, + rpc_config: self.rpc_config, + } + } + + /// Set the RPC transport configuration. + pub fn with_serve_config( + self, + serve_config: ServeConfig, + ) -> SignetNodeBuilder { + SignetNodeBuilder { + config: self.config, + alias_oracle: self.alias_oracle, + notifier: self.notifier, + storage: self.storage, + client: self.client, + blob_cacher: self.blob_cacher, + serve_config: Some(serve_config), + rpc_config: self.rpc_config, + } + } + + /// Set the RPC behaviour configuration. + pub fn with_rpc_config( + self, + rpc_config: StorageRpcConfig, + ) -> SignetNodeBuilder { + SignetNodeBuilder { + config: self.config, + alias_oracle: self.alias_oracle, + notifier: self.notifier, + storage: self.storage, + client: self.client, + blob_cacher: self.blob_cacher, + serve_config: self.serve_config, + rpc_config: Some(rpc_config), + } + } } -impl SignetNodeBuilder, Arc>, Aof> +impl + SignetNodeBuilder>, Aof, CacheHandle, ServeConfig, StorageRpcConfig> where - Host: FullNodeComponents, - Host::Types: NodeTypes, - H: HotKv, + N: HostNotifier, + H: HotKv + Clone + Send + Sync + 'static, + ::Error: DBErrorMarker, + Aof: AliasOracleFactory, { /// Prebuild checks for the signet node builder. Shared by all build /// commands. async fn prebuild(&mut self) -> eyre::Result<()> { self.client.get_or_insert_default(); - self.ctx.as_ref().ok_or_eyre("Launch context must be set")?; + self.notifier.as_ref().ok_or_eyre("Notifier must be set")?; let storage = self.storage.as_ref().ok_or_eyre("Storage must be set")?; // Load genesis into hot storage if absent. @@ -150,60 +253,24 @@ where Ok(()) } -} - -impl SignetNodeBuilder, Arc>, NotAnAof> -where - Host: FullNodeComponents, - Host::Types: NodeTypes, - H: HotKv + Clone + Send + Sync + 'static, - ::Error: DBErrorMarker, -{ - /// Build the node. This performs the following steps: - /// - /// - Runs prebuild checks. - /// - Inits storage from genesis if needed. - /// - Creates a default `AliasOracleFactory` from the host DB. - pub async fn build( - mut self, - ) -> eyre::Result<(SignetNode, tokio::sync::watch::Receiver)> { - self.prebuild().await?; - let ctx = self.ctx.unwrap(); - let provider = ctx.provider().clone(); - let alias_oracle = RethAliasOracleFactory::new(Box::new(provider)); - - SignetNode::new_unsafe( - ctx, - self.config, - self.storage.unwrap(), - alias_oracle, - self.client.unwrap(), - ) - } -} -impl SignetNodeBuilder, Arc>, Aof> -where - Host: FullNodeComponents, - Host::Types: NodeTypes, - H: HotKv + Clone + Send + Sync + 'static, - ::Error: DBErrorMarker, - Aof: AliasOracleFactory, -{ /// Build the node. This performs the following steps: /// /// - Runs prebuild checks. /// - Inits storage from genesis if needed. pub async fn build( mut self, - ) -> eyre::Result<(SignetNode, tokio::sync::watch::Receiver)> { + ) -> eyre::Result<(SignetNode, tokio::sync::watch::Receiver)> { self.prebuild().await?; SignetNode::new_unsafe( - self.ctx.unwrap(), + self.notifier.unwrap(), self.config, self.storage.unwrap(), self.alias_oracle.unwrap(), self.client.unwrap(), + self.blob_cacher.unwrap(), + self.serve_config.unwrap(), + self.rpc_config.unwrap(), ) } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index c8e7c60..4af3110 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -11,9 +11,6 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod alias; -pub use alias::{RethAliasOracle, RethAliasOracleFactory}; - mod builder; pub use builder::SignetNodeBuilder; diff --git a/crates/node/src/metrics.rs b/crates/node/src/metrics.rs index d775464..1c93380 100644 --- a/crates/node/src/metrics.rs +++ b/crates/node/src/metrics.rs @@ -7,8 +7,8 @@ //! - Number of reorgs processed use metrics::{Counter, counter, describe_counter}; -use reth::primitives::NodePrimitives; -use reth_exex::ExExNotification; +use signet_extract::Extractable; +use signet_node_types::HostNotification; use std::sync::LazyLock; const NOTIFICATION_RECEIVED: &str = "signet.node.notification_received"; @@ -66,16 +66,12 @@ fn inc_reorgs_processed() { reorgs_processed().increment(1); } -pub(crate) fn record_notification_received(notification: &ExExNotification) { +pub(crate) fn record_notification_received(notification: &HostNotification) { inc_notifications_received(); - if notification.reverted_chain().is_some() { - inc_reorgs_received(); - } + notification.reverted_chain().inspect(|_| inc_reorgs_received()); } -pub(crate) fn record_notification_processed(notification: &ExExNotification) { +pub(crate) fn record_notification_processed(notification: &HostNotification) { inc_notifications_processed(); - if notification.reverted_chain().is_some() { - inc_reorgs_processed(); - } + notification.reverted_chain().inspect(|_| inc_reorgs_processed()); } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 1be52e0..29330b8 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1,22 +1,15 @@ -use crate::{NodeStatus, RethAliasOracleFactory, metrics}; +use crate::{NodeStatus, metrics}; use alloy::consensus::BlockHeader; use eyre::Context; -use futures_util::StreamExt; -use reth::{ - chainspec::EthChainSpec, - primitives::EthPrimitives, - providers::{BlockIdReader, BlockReader, HeaderProvider}, -}; -use reth_exex::{ExExContext, ExExEvent, ExExHead, ExExNotificationsStream}; -use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeTypes}; -use reth_stages_types::ExecutionStageThresholds; -use signet_blobber::{CacheHandle, ExtractableChainShim}; +use signet_blobber::CacheHandle; use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1}; use signet_evm::EthereumHardfork; -use signet_extract::Extractor; +use signet_extract::{Extractable, Extractor}; use signet_node_config::SignetNodeConfig; +use signet_node_types::{HostNotification, HostNotifier}; use signet_rpc::{ ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, + ServeConfig, StorageRpcConfig, }; use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; @@ -25,20 +18,14 @@ use tokio::sync::watch; use tracing::{debug, info, instrument}; use trevm::revm::database::DBErrorMarker; -/// Type alias for the host primitives. -type PrimitivesOf = <::Types as NodeTypes>::Primitives; -type ExExNotification = reth_exex::ExExNotification>; -type Chain = reth::providers::Chain>; - /// Signet context and configuration. -pub struct SignetNode +pub struct SignetNode where - Host: FullNodeComponents, - Host::Types: NodeTypes, + N: HostNotifier, H: HotKv, { - /// The host context, which manages provider access and notifications. - pub(crate) host: ExExContext, + /// The host notifier, which yields chain notifications. + pub(crate) notifier: N, /// Signet node configuration. pub(crate) config: Arc, @@ -68,12 +55,17 @@ where /// A reqwest client, used by the blob fetch and the tx cache forwarder. pub(crate) client: reqwest::Client, + + /// RPC transport configuration. + pub(crate) serve_config: ServeConfig, + + /// RPC behaviour configuration. + pub(crate) rpc_config: StorageRpcConfig, } -impl fmt::Debug for SignetNode +impl fmt::Debug for SignetNode where - Host: FullNodeComponents, - Host::Types: NodeTypes, + N: HostNotifier, H: HotKv, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -81,10 +73,9 @@ where } } -impl SignetNode +impl SignetNode where - Host: FullNodeComponents, - Host::Types: NodeTypes, + N: HostNotifier, H: HotKv + Clone + Send + Sync + 'static, ::Error: DBErrorMarker, AliasOracle: AliasOracleFactory, @@ -102,12 +93,16 @@ where /// /// [`SignetNodeBuilder`]: crate::builder::SignetNodeBuilder #[doc(hidden)] + #[allow(clippy::too_many_arguments)] pub fn new_unsafe( - ctx: ExExContext, + notifier: N, config: SignetNodeConfig, storage: Arc>, alias_oracle: AliasOracle, client: reqwest::Client, + blob_cacher: CacheHandle, + serve_config: ServeConfig, + rpc_config: StorageRpcConfig, ) -> eyre::Result<(Self, watch::Receiver)> { let constants = config.constants().wrap_err("failed to load signet constants from genesis")?; @@ -115,17 +110,9 @@ where let (status, receiver) = watch::channel(NodeStatus::Booting); let chain = ChainNotifier::new(128); - let blob_cacher = signet_blobber::BlobFetcher::builder() - .with_config(config.block_extractor())? - .with_pool(ctx.pool().clone()) - .with_client(client.clone()) - .build_cache() - .wrap_err("failed to create blob cacher")? - .spawn(); - let this = Self { config: config.into(), - host: ctx, + notifier, storage, chain, rpc_handle: None, @@ -134,6 +121,8 @@ where alias_oracle: Arc::new(alias_oracle), blob_cacher, client, + serve_config, + rpc_config, }; Ok((this, receiver)) } @@ -144,9 +133,9 @@ where Ok(reader.last_block_number()?.unwrap_or(0)) } - /// Start the Signet instance, listening for ExEx notifications. Trace any + /// Start the Signet instance, listening for host notifications. Trace any /// errors. - #[instrument(skip(self), fields(host = ?self.host.config.chain.chain()))] + #[instrument(skip(self))] pub async fn start(mut self) -> eyre::Result<()> { // Ensure hot and cold storage are at the same height. If either // is ahead, unwind to the minimum so the host re-delivers blocks. @@ -175,13 +164,12 @@ where let last_block = self.storage.reader().ok().and_then(|r| r.last_block_number().ok().flatten()); - let exex_head = last_block.and_then(|h| self.set_exex_head(h).ok()); - tracing::error!(err, last_block, ?exex_head, "Signet node crashed"); + tracing::error!(err, last_block, "Signet node crashed"); }) } - /// Start the Signet instance, listening for ExEx notifications. + /// Start the Signet instance, listening for host notifications. async fn start_inner(&mut self) -> eyre::Result<()> { debug!(constants = ?self.constants, "signet starting"); @@ -195,135 +183,83 @@ where // Update the node status channel with last block height self.status.send_modify(|s| *s = NodeStatus::AtHeight(last_rollup_block)); - // Sets the ExEx head position relative to that last block - let exex_head = self.set_exex_head(last_rollup_block)?; + // Set the head position and backfill thresholds on the notifier + let host_height = match last_rollup_block { + 0 => self.constants.host_deploy_height(), + n => self.constants.pair_ru(n).host, + }; + self.notifier.set_head(host_height); + self.notifier.set_backfill_thresholds(self.config.backfill_max_blocks()); + info!( - host_head = exex_head.block.number, - host_hash = %exex_head.block.hash, + host_height, rollup_head_height = last_rollup_block, "signet listening for notifications" ); - // Handle incoming ExEx notifications - while let Some(notification) = self.host.notifications.next().await { - let notification = notification.wrap_err("error in reth host notifications stream")?; - self.on_notification(notification) + // Handle incoming host notifications + while let Some(notification) = self.notifier.next_notification().await { + let notification = notification.wrap_err("error in host notifications stream")?; + let changed = self + .on_notification(¬ification) .await .wrap_err("error while processing notification")?; + if changed { + let ru_height = self.last_rollup_block()?; + self.update_block_tags( + ru_height, + notification.safe_block_number, + notification.finalized_block_number, + )?; + } } info!("signet shutting down"); Ok(()) } - /// Sets the head of the Exex chain from the last rollup block, handling - /// genesis conditions if necessary. - fn set_exex_head(&mut self, last_rollup_block: u64) -> eyre::Result { - // If the last rollup block is 0, shortcut to the host rollup - // deployment block. - if last_rollup_block == 0 { - let host_deployment_block = - self.host.provider().block_by_number(self.constants.host_deploy_height())?; - match host_deployment_block { - Some(genesis_block) => { - let exex_head = ExExHead { block: genesis_block.num_hash_slow() }; - self.host.notifications.set_with_head(exex_head); - self.set_backfill_thresholds(); - return Ok(exex_head); - } - None => { - let host_ru_deploy_block = self.constants.host_deploy_height(); - debug!( - host_ru_deploy_block, - "Host deploy height not found. Falling back to genesis block" - ); - let genesis_block = self - .host - .provider() - .block_by_number(0)? - .expect("failed to find genesis block"); - let exex_head = ExExHead { block: genesis_block.num_hash_slow() }; - self.host.notifications.set_with_head(exex_head); - self.set_backfill_thresholds(); - return Ok(exex_head); - } - } - } - - // Find the corresponding host block for the rollup block number. - let host_height = self.constants.pair_ru(last_rollup_block).host; - - match self.host.provider().block_by_number(host_height)? { - Some(host_block) => { - debug!(host_height, "found host block for height"); - let exex_head = ExExHead { block: host_block.num_hash_slow() }; - self.host.notifications.set_with_head(exex_head); - self.set_backfill_thresholds(); - Ok(exex_head) - } - None => { - debug!(host_height, "no host block found for host height"); - let genesis_block = - self.host.provider().block_by_number(0)?.expect("failed to find genesis block"); - let exex_head = ExExHead { block: genesis_block.num_hash_slow() }; - self.host.notifications.set_with_head(exex_head); - self.set_backfill_thresholds(); - Ok(exex_head) - } - } - } - - /// Sets backfill thresholds to limit memory usage during sync. - /// This should be called after `set_with_head` to configure how many - /// blocks can be processed per backfill batch. - fn set_backfill_thresholds(&mut self) { - if let Some(max_blocks) = self.config.backfill_max_blocks() { - self.host.notifications.set_backfill_thresholds(ExecutionStageThresholds { - max_blocks: Some(max_blocks), - ..Default::default() - }); - debug!(max_blocks, "configured backfill thresholds"); - } - } - - /// Runs on any notification received from the ExEx context. + /// Runs on any notification received from the host. + /// + /// Returns `true` if any rollup state changed. #[instrument(parent = None, skip_all, fields( reverted = notification.reverted_chain().map(|c| c.len()).unwrap_or_default(), committed = notification.committed_chain().map(|c| c.len()).unwrap_or_default(), ))] - pub async fn on_notification(&self, notification: ExExNotification) -> eyre::Result<()> { - metrics::record_notification_received(¬ification); + pub(crate) async fn on_notification( + &self, + notification: &HostNotification, + ) -> eyre::Result { + metrics::record_notification_received(notification); let mut changed = false; // NB: REVERTS MUST RUN FIRST if let Some(chain) = notification.reverted_chain() { changed |= - self.on_host_revert(&chain).await.wrap_err("error encountered during revert")?; + self.on_host_revert(chain).await.wrap_err("error encountered during revert")?; } if let Some(chain) = notification.committed_chain() { changed |= self - .process_committed_chain(&chain) + .process_committed_chain(chain) .await .wrap_err("error encountered during commit")?; } if changed { - self.update_status()?; + self.update_status_channel()?; } - metrics::record_notification_processed(¬ification); - Ok(()) + metrics::record_notification_processed(notification); + Ok(changed) } /// Process a committed chain by extracting and executing blocks. /// /// Returns `true` if any rollup blocks were processed. - async fn process_committed_chain(&self, chain: &Arc>) -> eyre::Result { - let shim = ExtractableChainShim::new(chain); + async fn process_committed_chain(&self, chain: &Arc) -> eyre::Result { let extractor = Extractor::new(self.constants.clone()); - let extracts: Vec<_> = extractor.extract_signet(&shim).collect(); + let extracts: Vec<_> = extractor.extract_signet(chain.as_ref()).collect(); let last_height = self.last_rollup_block()?; @@ -380,26 +316,28 @@ where let _ = self.chain.send_reorg(notif); } - /// Update the status channel and block tags. This keeps the RPC node - /// in sync with the latest block information. - fn update_status(&self) -> eyre::Result<()> { + /// Update the status channel with the current rollup height. + fn update_status_channel(&self) -> eyre::Result<()> { let ru_height = self.last_rollup_block()?; - - self.update_block_tags(ru_height)?; self.status.send_modify(|s| *s = NodeStatus::AtHeight(ru_height)); Ok(()) } - /// Update block tags (latest/safe/finalized) and notify reth of processed - /// height. - fn update_block_tags(&self, ru_height: u64) -> eyre::Result<()> { + /// Update block tags (latest/safe/finalized) and notify the host of + /// processed height. + fn update_block_tags( + &self, + ru_height: u64, + safe_block_number: Option, + finalized_block_number: Option, + ) -> eyre::Result<()> { // Safe height - let safe_heights = self.load_safe_block_heights(ru_height)?; + let safe_heights = self.clamp_host_heights(ru_height, safe_block_number); let safe_ru_height = safe_heights.rollup; debug!(safe_ru_height, "calculated safe ru height"); // Finalized height - let finalized_heights = self.load_finalized_block_heights(ru_height)?; + let finalized_heights = self.clamp_host_heights(ru_height, finalized_block_number); debug!( finalized_host_height = finalized_heights.host, finalized_ru_height = finalized_heights.rollup, @@ -409,7 +347,7 @@ where // Atomically update all three tags self.chain.tags().update_all(ru_height, safe_ru_height, finalized_heights.rollup); - // Notify reth that we've finished processing up to the finalized + // Notify the host that we've finished processing up to the finalized // height. Skip if finalized rollup height is still at genesis. if finalized_heights.rollup > 0 { self.update_highest_processed_height(finalized_heights.host)?; @@ -424,88 +362,51 @@ where Ok(()) } - /// Load the host chain "safe" block number and determine the rollup "safe" - /// block number. - /// - /// There are three cases: - /// 1. The host chain "safe" block number is below the rollup genesis. - /// 2. The safe rollup equivalent is beyond the current rollup height. - /// 3. The safe rollup equivalent is below the current rollup height (normal - /// case). - fn load_safe_block_heights(&self, ru_height: u64) -> eyre::Result { - let Some(safe_heights) = - self.host.provider().safe_block_number()?.and_then(|h| self.constants.pair_host(h)) - else { - // Host safe block is below rollup genesis — use genesis. - return Ok(PairedHeights { host: self.constants.host_deploy_height(), rollup: 0 }); + /// Map a host block number to a [`PairedHeights`], clamping to the + /// current rollup height. Returns genesis heights when the host block + /// is below the rollup deploy height. + fn clamp_host_heights(&self, ru_height: u64, host_block_number: Option) -> PairedHeights { + let Some(heights) = host_block_number.and_then(|h| self.constants.pair_host(h)) else { + return PairedHeights { host: self.constants.host_deploy_height(), rollup: 0 }; }; // Clamp to current rollup height if ahead. - if safe_heights.rollup > ru_height { - Ok(self.constants.pair_ru(ru_height)) - } else { - Ok(safe_heights) - } + if heights.rollup > ru_height { self.constants.pair_ru(ru_height) } else { heights } } - /// Load the host chain "finalized" block number and determine the rollup - /// "finalized" block number. - /// - /// There are three cases: - /// 1. The host chain "finalized" block is below the rollup genesis. - /// 2. The finalized rollup equivalent is beyond the current rollup height. - /// 3. The finalized rollup equivalent is below the current rollup height - /// (normal case). - fn load_finalized_block_heights(&self, ru_height: u64) -> eyre::Result { - let Some(finalized_ru) = self - .host - .provider() - .finalized_block_number()? - .and_then(|h| self.constants.host_block_to_rollup_block_num(h)) - else { - // Host finalized block is below rollup genesis — use genesis. - return Ok(PairedHeights { host: self.constants.host_deploy_height(), rollup: 0 }); - }; - - // Clamp to current rollup height if ahead. - let ru = finalized_ru.min(ru_height); - Ok(self.constants.pair_ru(ru)) - } - - /// Update the host node with the highest processed host height for the - /// ExEx. + /// Update the host node with the highest processed host height. fn update_highest_processed_height(&self, finalized_host_height: u64) -> eyre::Result<()> { let adjusted_height = finalized_host_height.saturating_sub(1); - let adjusted_header = self - .host - .provider() - .sealed_header(adjusted_height)? - .expect("db inconsistent. no host header for adjusted height"); - - let hash = adjusted_header.hash(); - debug!(finalized_host_height = adjusted_height, "Sending FinishedHeight notification"); - self.host.events.send(ExExEvent::FinishedHeight(alloy::eips::NumHash { - number: adjusted_height, - hash, - }))?; + self.notifier.send_finished_height(adjusted_height).map_err(|e| eyre::eyre!(e))?; Ok(()) } /// Called when the host chain has reverted a block or set of blocks. /// /// Returns `true` if any rollup state was unwound. - #[instrument(skip_all, fields(first = chain.first().number(), tip = chain.tip().number()))] - pub async fn on_host_revert(&self, chain: &Arc>) -> eyre::Result { + #[instrument(skip_all, fields( + first = chain.first_number().unwrap_or(0), + tip = chain.tip_number().unwrap_or(0), + ))] + pub(crate) async fn on_host_revert(&self, chain: &Arc) -> eyre::Result { + // NB: `unwrap_or(0)` is safe here because a non-empty chain always + // has a first/tip block. If an invariant violation causes `None`, + // the `0` fallback results in `drain_above(0)` which drains all + // rollup state — a loud, obvious failure rather than silent + // corruption. + let tip = chain.tip_number().unwrap_or(0); + let first = chain.first_number().unwrap_or(0); + // If the end is before the RU genesis, nothing to do. - if chain.tip().number() <= self.constants.host_deploy_height() { + if tip <= self.constants.host_deploy_height() { return Ok(false); } // Target is the block BEFORE the first block in the chain, or 0. let target = self .constants - .host_block_to_rollup_block_num(chain.first().number()) + .host_block_to_rollup_block_num(first) .unwrap_or_default() .saturating_sub(1); diff --git a/crates/node/src/rpc.rs b/crates/node/src/rpc.rs index 6ed6ef9..45a5688 100644 --- a/crates/node/src/rpc.rs +++ b/crates/node/src/rpc.rs @@ -1,23 +1,21 @@ use crate::SignetNode; -use reth::{args::RpcServerArgs, primitives::EthPrimitives}; -use reth_node_api::{FullNodeComponents, NodeTypes}; use signet_block_processor::AliasOracleFactory; -use signet_rpc::{RpcServerGuard, ServeConfig, StorageRpcConfig, StorageRpcCtx}; +use signet_node_types::HostNotifier; +use signet_rpc::{RpcServerGuard, StorageRpcCtx}; use signet_storage::HotKv; use signet_tx_cache::TxCache; -use std::{net::SocketAddr, sync::Arc}; +use std::sync::Arc; use tracing::info; -impl SignetNode +impl SignetNode where - Host: FullNodeComponents, - Host::Types: NodeTypes, + N: HostNotifier, H: HotKv + Send + Sync + 'static, ::Error: trevm::revm::database::DBErrorMarker, AliasOracle: AliasOracleFactory, { /// Start the RPC server. - pub async fn start_rpc(&mut self) -> eyre::Result<()> { + pub(crate) async fn start_rpc(&mut self) -> eyre::Result<()> { let guard = self.launch_rpc().await?; self.rpc_handle = Some(guard); info!("launched rpc server"); @@ -28,45 +26,16 @@ where let tx_cache = self.config.forward_url().map(|url| TxCache::new_with_client(url, self.client.clone())); - let args = self.config.merge_rpc_configs(&self.host)?; - let rpc_config = rpc_config_from_args(&args); - let rpc_ctx = StorageRpcCtx::new( Arc::clone(&self.storage), self.constants.clone(), self.config.genesis().config.clone(), self.chain.clone(), tx_cache, - rpc_config, + self.rpc_config, ); let router = signet_rpc::router::().with_state(rpc_ctx); - let serve_config = serve_config_from_args(args); - serve_config.serve(router).await.map_err(Into::into) + self.serve_config.clone().serve(router).await.map_err(Into::into) } } - -/// Extract [`StorageRpcConfig`] values from reth's host RPC settings. -/// -/// Fields with no reth equivalent retain their defaults. -fn rpc_config_from_args(args: &RpcServerArgs) -> StorageRpcConfig { - let gpo = &args.gas_price_oracle; - StorageRpcConfig::builder() - .rpc_gas_cap(args.rpc_gas_cap) - .max_tracing_requests(args.rpc_max_tracing_requests) - .gas_oracle_block_count(gpo.blocks as u64) - .gas_oracle_percentile(gpo.percentile as f64) - .ignore_price(Some(gpo.ignore_price as u128)) - .max_price(Some(gpo.max_price as u128)) - .build() -} - -/// Convert reth [`RpcServerArgs`] into a reth-free [`ServeConfig`]. -fn serve_config_from_args(args: RpcServerArgs) -> ServeConfig { - let http = - if args.http { vec![SocketAddr::from((args.http_addr, args.http_port))] } else { vec![] }; - let ws = if args.ws { vec![SocketAddr::from((args.ws_addr, args.ws_port))] } else { vec![] }; - let ipc = if !args.ipcdisable { Some(args.ipcpath) } else { None }; - - ServeConfig { http, http_cors: args.http_corsdomain, ws, ws_cors: args.ws_allowed_origins, ipc } -}