diff --git a/crates/tempo-zone/src/l1/mod.rs b/crates/tempo-zone/src/l1/mod.rs index f2121ea3..cdbd3b3f 100644 --- a/crates/tempo-zone/src/l1/mod.rs +++ b/crates/tempo-zone/src/l1/mod.rs @@ -14,7 +14,8 @@ //! - [`queue`] — the deposit hash-chain queue consumed by the engine. use alloy_consensus::BlockHeader as _; -use alloy_eips::NumHash; +use alloy_eips::{BlockNumberOrTag, NumHash}; +use alloy_network::BlockResponse as _; use alloy_primitives::{Address, B256, Bytes, U256, keccak256}; use alloy_provider::{DynProvider, Provider, ProviderBuilder}; use alloy_rpc_client::RpcClient; diff --git a/crates/tempo-zone/src/l1/subscriber.rs b/crates/tempo-zone/src/l1/subscriber.rs index d6b3a36b..e53e9969 100644 --- a/crates/tempo-zone/src/l1/subscriber.rs +++ b/crates/tempo-zone/src/l1/subscriber.rs @@ -1,7 +1,7 @@ use super::*; -/// Poll interval for the HTTP block filter fallback (500ms, matching L1 block time). -const HTTP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500); +/// Poll interval for finalized L1 block polling (500ms, matching L1 block time). +const FINALIZED_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500); /// Configuration for the L1 subscriber. #[derive(Debug, Clone)] @@ -19,8 +19,11 @@ pub struct L1SubscriberConfig { /// blocks. pub policy_cache: crate::l1_state::tip403::PolicyCache, /// Shared L1 state cache. The subscriber updates the cache anchor on each - /// confirmed block and clears it on reorgs. + /// finalized block and clears it on reorgs. pub l1_state_cache: crate::l1_state::cache::L1StateCache, + /// L1 block tag used for polling. Production nodes use `Finalized`; tests + /// can use `Latest` for local dev chains that do not advance finalized. + pub l1_block_tag: BlockNumberOrTag, /// Maximum number of concurrent L1 RPC receipt fetches. Used directly for /// the live stream and halved for backfill (which sends 2 requests per block). pub l1_fetch_concurrency: usize, @@ -133,60 +136,19 @@ impl L1Subscriber { Ok(provider) } - /// Returns a stream of new L1 block headers, abstracting over the transport. - /// - /// - **WebSocket**: uses `subscribe_blocks` for push-based delivery. - /// - **HTTP**: falls back to `watch_full_blocks` (filter-based polling via - /// `eth_newBlockFilter` + `eth_getFilterChanges`), extracting the header - /// from each block. The fallback is selected when `subscribe_blocks` - /// returns `PubsubUnavailable`. - /// - /// Both paths produce the same header payloads; transport-specific polling - /// failures are surfaced as stream errors so [`run`](Self::run) can - /// reconnect and resync. - async fn header_stream<'a>( - &self, - provider: &'a DynProvider, - ) -> eyre::Result< - Pin< - Box< - dyn Stream< - Item = eyre::Result< - ::HeaderResponse, - >, - > + Send - + 'a, - >, - >, - > { - match provider.subscribe_blocks().await { - Ok(sub) => { - info!("Using WebSocket block subscription"); - Ok(Box::pin(sub.into_stream().map(Ok))) - } - Err(e) => { - if e.as_transport_err() - .is_some_and(|t| t.is_pubsub_unavailable()) - { - info!("Pubsub unavailable, falling back to HTTP polling"); - let mut watcher = provider.watch_full_blocks().await?; - watcher.set_poll_interval(HTTP_POLL_INTERVAL); - let stream = watcher - .into_stream() - .map(|res| res.map(|block| block.header).map_err(Into::into)); - Ok(Box::pin(stream)) - } else { - Err(e.into()) - } - } - } + async fn tagged_block_number(&self, provider: &DynProvider) -> eyre::Result { + let block = provider + .get_block_by_number(self.config.l1_block_tag) + .await? + .ok_or_else(|| eyre::eyre!("tagged L1 block not available"))?; + Ok(block.header().number()) } - /// Build the live L1 block stream, fetching receipts for each new header - /// and buffering requests ahead of processing. - async fn l1_block_stream<'a>( + /// Build the live L1 block stream from the configured finality tag. + async fn l1_block_stream( &self, - provider: &'a DynProvider, + provider: &DynProvider, + start_block: u64, ) -> eyre::Result< Pin< Box< @@ -195,44 +157,77 @@ impl L1Subscriber { ::HeaderResponse, Vec<::ReceiptResponse>, )>, - > + Send - + 'a, + > + Send, >, >, > { - let header_stream = self.header_stream(provider).await?; - let concurrency = self.config.l1_fetch_concurrency.max(1); + use futures::stream; + + let provider = provider.clone(); let subscriber_metrics = self.subscriber_metrics.clone(); - let stream = header_stream - .map_ok(move |header| { - let provider = provider; - let subscriber_metrics = subscriber_metrics.clone(); - async move { - let block_number = header.number(); - let start = std::time::Instant::now(); - let fetch_failures = &subscriber_metrics.fetch_failures; - let receipts = provider - .get_block_receipts(BlockId::number(block_number)) + let l1_block_tag = self.config.l1_block_tag; + let stream = stream::unfold(start_block, move |block_number| { + let provider = provider.clone(); + let subscriber_metrics = subscriber_metrics.clone(); + async move { + loop { + let tagged_number = match provider + .get_block_by_number(l1_block_tag) .await .map_err(eyre::Report::from) - .and_then(|receipts| { - receipts + .and_then(|block| { + block.ok_or_else(|| eyre::eyre!("tagged L1 block not available")) + }) { + Ok(block) => block.header().number(), + Err(err) => return Some((Err(err), block_number)), + }; + + if block_number > tagged_number { + tokio::time::sleep(FINALIZED_POLL_INTERVAL).await; + continue; + } + + let start = std::time::Instant::now(); + let fetch_failures = &subscriber_metrics.fetch_failures; + let fetched = tokio::try_join!( + async { + provider + .get_block_by_number(block_number.into()) + .full() + .await + .map_err(eyre::Report::from)? + .ok_or_else(|| { + eyre::eyre!("L1 block not found for block {block_number}") + }) + }, + async { + provider + .get_block_receipts(BlockId::number(block_number)) + .await + .map_err(eyre::Report::from)? .ok_or_else(|| eyre::eyre!("no receipts for block {block_number}")) - }) - .inspect_err(|_| { - fetch_failures.increment(1); - })?; - let elapsed = start.elapsed(); - debug!( - block_number, - elapsed_ms = elapsed.as_millis() as u64, - receipts = receipts.len(), - "Fetched live block receipts" - ); - Ok::<_, eyre::Report>((header, receipts)) + }, + ) + .inspect_err(|_| { + fetch_failures.increment(1); + }); + + let result = fetched.map(|(block, receipts)| { + let header = block.header().clone(); + let elapsed = start.elapsed(); + debug!( + block_number, + elapsed_ms = elapsed.as_millis() as u64, + receipts = receipts.len(), + "Fetched tagged L1 block data" + ); + (header, receipts) + }); + + return Some((result, block_number + 1)); } - }) - .try_buffered(concurrency); + } + }); Ok(Box::pin(stream)) } @@ -274,11 +269,11 @@ impl L1Subscriber { Ok(Some(on_chain + 1)) } - /// Backfill deposit events from the starting block to the current L1 tip. + /// Backfill deposit events from the starting block to the current tagged L1 block. #[instrument(skip(self, l1_provider))] - async fn sync_to_l1_tip( + async fn sync_to_finalized_l1( &mut self, - l1_provider: &impl Provider, + l1_provider: &DynProvider, ) -> eyre::Result<()> { let Some(mut from) = self.resolve_start_block(l1_provider).await? else { self.subscriber_metrics.current_l1_lag_blocks.set(0.0); @@ -299,10 +294,10 @@ impl L1Subscriber { from = from.max(adjusted); } - let tip = l1_provider.get_block_number().await?; + let tip = self.tagged_block_number(l1_provider).await?; self.record_seen_block(tip, 0); if from > tip { - info!(from, tip, "Already synced to L1 tip"); + info!(from, tip, "Already synced to tagged L1"); self.subscriber_metrics.current_l1_lag_blocks.set(0.0); return Ok(()); } @@ -311,7 +306,7 @@ impl L1Subscriber { from, tip, blocks = tip - from + 1, - "Backfilling deposit events" + "Backfilling tagged L1 deposit events" ); let start = std::time::Instant::now(); let result = self.backfill(l1_provider, from, tip).await; @@ -428,15 +423,10 @@ impl L1Subscriber { /// Run the L1 subscriber until the stream ends or an error occurs. /// - /// Connects to the L1 node (HTTP or WebSocket), backfills deposit events - /// to the current L1 tip, then listens for new block headers. Each block — - /// with or without deposits — is enqueued so the zone engine sees a strict - /// sequential chain. - /// - /// Live-streamed blocks are buffered one block behind: a block is only - /// flushed to the deposit queue once the next block arrives with a - /// matching parent hash, proving the buffered block is canonical. This - /// prevents the zone from committing to an L1 tip that gets reorged away. + /// Connects to the L1 node, backfills deposit events to the current + /// configured L1 block tag, then polls newly available blocks. Each block — with + /// or without deposits — is enqueued so the zone engine sees a strict + /// sequential finalized chain. /// /// Callers should retry on error (see [`Self::spawn`]). pub async fn run(mut self) -> eyre::Result<()> { @@ -446,22 +436,23 @@ impl L1Subscriber { let provider = self.connect().await?; - // Backfill to the current tip before subscribing. - // Backfilled blocks are historical and considered confirmed. - self.sync_to_l1_tip(&provider).await?; + // Backfill to the current tagged block before polling for the next + // available block. + self.sync_to_finalized_l1(&provider).await?; - info!(portal = %self.config.portal_address, "Listening for L1 blocks"); - let mut stream = self.l1_block_stream(&provider).await?; - - // Confirmation buffer: holds the latest unconfirmed L1 block. - // A block is only flushed to the deposit queue once the NEXT block - // arrives with a matching parent hash, proving the buffered block - // is on the canonical chain. - let mut unconfirmed_tip: Option<( - SealedHeader, - L1PortalEvents, - Vec, - )> = None; + let stream_start = if let Some(last) = self.deposit_queue.last_enqueued() { + last.number + 1 + } else if let Some(start) = self.resolve_start_block(&provider).await? { + start + } else { + self.tagged_block_number(&provider).await? + 1 + }; + info!( + portal = %self.config.portal_address, + stream_start, + "Polling tagged L1 blocks" + ); + let mut stream = self.l1_block_stream(&provider, stream_start).await?; loop { let stream_wait_start = std::time::Instant::now(); @@ -477,59 +468,30 @@ impl L1Subscriber { let (events, policy_events) = self.extract_events(block_number, &receipts); self.record_seen_block(block_number, 0); - // If we have a buffered tip, check if the new block confirms it. - if let Some((tip_header, tip_events, tip_policy_events)) = unconfirmed_tip.take() { - if sealed.parent_hash() == tip_header.hash() { - // Confirmed — update the L1 state anchor, apply events, and - // flush to the queue. - let tip_number = tip_header.number(); - let tip_hash = tip_header.hash(); - let tip_parent = tip_header.parent_hash(); - self.update_l1_state_anchor(tip_number, tip_hash, tip_parent); - self.apply_policy_events(tip_number, &tip_policy_events); - self.apply_portal_state_events(tip_number, &tip_events); - match self - .deposit_queue - .try_enqueue(tip_header, tip_events, tip_policy_events) - { - EnqueueOutcome::Accepted => { - self.subscriber_metrics.blocks_enqueued.increment(1); - } - EnqueueOutcome::Duplicate => {} - EnqueueOutcome::NeedBackfill { from, to } => { - // Gap between queue head and confirmed tip — backfill - // the missing range including the tip (re-fetched from - // the provider since try_enqueue consumed ownership). - warn!( - from, - to, - tip = tip_number, - "Backfilling gap before confirmed tip" - ); - self.backfill(&provider, from, tip_number).await?; - } - } - } else { - // Reorg — discard the buffered tip and clear L1 state and - // policy caches. - self.subscriber_metrics.reorgs_detected.increment(1); + self.update_l1_state_anchor(block_number, sealed.hash(), sealed.parent_hash()); + self.apply_policy_events(block_number, &policy_events); + self.apply_portal_state_events(block_number, &events); + match self + .deposit_queue + .try_enqueue(sealed, events, policy_events) + { + EnqueueOutcome::Accepted => { + self.subscriber_metrics.blocks_enqueued.increment(1); + } + EnqueueOutcome::Duplicate => {} + EnqueueOutcome::NeedBackfill { from, to } => { warn!( - discarded_block = tip_header.number(), - discarded_hash = %tip_header.hash(), - new_block = block_number, - new_parent = %sealed.parent_hash(), - "Discarding unconfirmed L1 block (reorg)" + from, + to, + tip = block_number, + "Backfilling gap before tagged block" ); - self.config.l1_state_cache.write().clear(); - self.config.policy_cache.write().clear(); + self.backfill(&provider, from, block_number).await?; } } - - // Buffer the new block as unconfirmed tip. - unconfirmed_tip = Some((sealed, events, policy_events)); } - warn!("L1 block subscription stream ended"); + warn!("Tagged L1 block polling stream ended"); Ok(()) } diff --git a/crates/tempo-zone/src/l1/tests.rs b/crates/tempo-zone/src/l1/tests.rs index 32ccbe93..fb6bb9d4 100644 --- a/crates/tempo-zone/src/l1/tests.rs +++ b/crates/tempo-zone/src/l1/tests.rs @@ -146,6 +146,7 @@ fn test_subscriber( genesis_tempo_block_number, policy_cache: crate::PolicyCache::default(), l1_state_cache: crate::L1StateCache::new(HashSet::from([portal_address])), + l1_block_tag: BlockNumberOrTag::Finalized, l1_fetch_concurrency: 1, retry_connection_interval: Duration::from_secs(1), }, diff --git a/crates/tempo-zone/src/node.rs b/crates/tempo-zone/src/node.rs index 268c556d..b80ae372 100644 --- a/crates/tempo-zone/src/node.rs +++ b/crates/tempo-zone/src/node.rs @@ -146,6 +146,7 @@ impl ZoneNode { genesis_tempo_block_number, policy_cache: policy_cache.clone(), l1_state_cache: l1_state_cache.clone(), + l1_block_tag: alloy_eips::BlockNumberOrTag::Finalized, l1_fetch_concurrency, retry_connection_interval, }; @@ -190,6 +191,12 @@ impl ZoneNode { self } + /// Set the L1 block tag used by the subscriber. + pub fn with_l1_block_tag(mut self, tag: alloy_eips::BlockNumberOrTag) -> Self { + self.l1_config.l1_block_tag = tag; + self + } + /// Returns the current deposit queue pub fn deposit_queue(&self) -> DepositQueue { self.deposit_queue.clone() diff --git a/crates/tempo-zone/tests/it/l1_e2e.rs b/crates/tempo-zone/tests/it/l1_e2e.rs index c50df143..36b7bb6b 100644 --- a/crates/tempo-zone/tests/it/l1_e2e.rs +++ b/crates/tempo-zone/tests/it/l1_e2e.rs @@ -5,7 +5,8 @@ //! subscriber naturally receives blocks and deposits — no synthetic injection. use crate::utils::{ - L1TestNode, STABLECOIN_DEX_ADDRESS, WithdrawalArgs, ZoneAccount, ZoneTestNode, spawn_sequencer, + DEFAULT_POLL, L1TestNode, STABLECOIN_DEX_ADDRESS, WithdrawalArgs, ZoneAccount, ZoneTestNode, + poll_until, spawn_sequencer, }; use alloy::{ primitives::{Address, B256, U256}, @@ -108,12 +109,14 @@ async fn test_zone_advances_with_real_l1() -> eyre::Result<()> { // Verify L1 is producing blocks let l1_block_0 = l1.provider().get_block_number().await?; - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let l1_block_1 = l1.provider().get_block_number().await?; - assert!( - l1_block_1 > l1_block_0, - "L1 should be producing blocks in dev mode" - ); + poll_until(L1_TIMEOUT, DEFAULT_POLL, "L1 block production", || { + let provider = l1.provider(); + async move { + let l1_block_1 = provider.get_block_number().await?; + Ok((l1_block_1 > l1_block_0).then_some(l1_block_1)) + } + }) + .await?; // Start zone node connected to real L1 — genesis is patched from the L1's // current header so TempoState chain continuity works. diff --git a/crates/tempo-zone/tests/it/utils.rs b/crates/tempo-zone/tests/it/utils.rs index e39eec23..947b5570 100644 --- a/crates/tempo-zone/tests/it/utils.rs +++ b/crates/tempo-zone/tests/it/utils.rs @@ -509,7 +509,8 @@ impl ZoneTestNode { 4, std::time::Duration::from_millis(100), ) - .with_initial_tokens(vec![]); + .with_initial_tokens(vec![]) + .with_l1_block_tag(alloy_eips::BlockNumberOrTag::Latest); // Don't use .dev() — it spawns a LocalMiner that conflicts with ZoneEngine. // The ZoneEngine is the sole block producer; it advances the chain when L1