Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/tempo-zone/src/l1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
//! - [`queue`] — the deposit hash-chain queue consumed by the engine.

use alloy_consensus::BlockHeader as _;
use alloy_eips::NumHash;
use alloy_eips::{BlockNumberOrTag, NumHash};
use alloy_network::BlockResponse as _;
use alloy_primitives::{Address, B256, Bytes, U256, keccak256};
use alloy_provider::{DynProvider, Provider, ProviderBuilder};
use alloy_rpc_client::RpcClient;
Expand Down
284 changes: 123 additions & 161 deletions crates/tempo-zone/src/l1/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;

/// Poll interval for the HTTP block filter fallback (500ms, matching L1 block time).
const HTTP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
/// Poll interval for finalized L1 block polling (500ms, matching L1 block time).
const FINALIZED_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

/// Configuration for the L1 subscriber.
#[derive(Debug, Clone)]
Expand All @@ -19,8 +19,11 @@ pub struct L1SubscriberConfig {
/// blocks.
pub policy_cache: crate::l1_state::tip403::PolicyCache,
/// Shared L1 state cache. The subscriber updates the cache anchor on each
/// confirmed block and clears it on reorgs.
/// finalized block and clears it on reorgs.
pub l1_state_cache: crate::l1_state::cache::L1StateCache,
/// L1 block tag used for polling. Production nodes use `Finalized`; tests
/// can use `Latest` for local dev chains that do not advance finalized.
pub l1_block_tag: BlockNumberOrTag,
/// Maximum number of concurrent L1 RPC receipt fetches. Used directly for
/// the live stream and halved for backfill (which sends 2 requests per block).
pub l1_fetch_concurrency: usize,
Expand Down Expand Up @@ -133,60 +136,19 @@ impl L1Subscriber {
Ok(provider)
}

/// Returns a stream of new L1 block headers, abstracting over the transport.
///
/// - **WebSocket**: uses `subscribe_blocks` for push-based delivery.
/// - **HTTP**: falls back to `watch_full_blocks` (filter-based polling via
/// `eth_newBlockFilter` + `eth_getFilterChanges`), extracting the header
/// from each block. The fallback is selected when `subscribe_blocks`
/// returns `PubsubUnavailable`.
///
/// Both paths produce the same header payloads; transport-specific polling
/// failures are surfaced as stream errors so [`run`](Self::run) can
/// reconnect and resync.
async fn header_stream<'a>(
&self,
provider: &'a DynProvider<TempoNetwork>,
) -> eyre::Result<
Pin<
Box<
dyn Stream<
Item = eyre::Result<
<TempoNetwork as alloy_network::Network>::HeaderResponse,
>,
> + Send
+ 'a,
>,
>,
> {
match provider.subscribe_blocks().await {
Ok(sub) => {
info!("Using WebSocket block subscription");
Ok(Box::pin(sub.into_stream().map(Ok)))
}
Err(e) => {
if e.as_transport_err()
.is_some_and(|t| t.is_pubsub_unavailable())
{
info!("Pubsub unavailable, falling back to HTTP polling");
let mut watcher = provider.watch_full_blocks().await?;
watcher.set_poll_interval(HTTP_POLL_INTERVAL);
let stream = watcher
.into_stream()
.map(|res| res.map(|block| block.header).map_err(Into::into));
Ok(Box::pin(stream))
} else {
Err(e.into())
}
}
}
async fn tagged_block_number(&self, provider: &DynProvider<TempoNetwork>) -> eyre::Result<u64> {
let block = provider
.get_block_by_number(self.config.l1_block_tag)
.await?
.ok_or_else(|| eyre::eyre!("tagged L1 block not available"))?;
Ok(block.header().number())
}

/// Build the live L1 block stream, fetching receipts for each new header
/// and buffering requests ahead of processing.
async fn l1_block_stream<'a>(
/// Build the live L1 block stream from the configured finality tag.
async fn l1_block_stream(
&self,
provider: &'a DynProvider<TempoNetwork>,
provider: &DynProvider<TempoNetwork>,
start_block: u64,
) -> eyre::Result<
Pin<
Box<
Expand All @@ -195,44 +157,77 @@ impl L1Subscriber {
<TempoNetwork as alloy_network::Network>::HeaderResponse,
Vec<<TempoNetwork as alloy_network::Network>::ReceiptResponse>,
)>,
> + Send
+ 'a,
> + Send,
>,
>,
> {
let header_stream = self.header_stream(provider).await?;
let concurrency = self.config.l1_fetch_concurrency.max(1);
use futures::stream;

let provider = provider.clone();
let subscriber_metrics = self.subscriber_metrics.clone();
let stream = header_stream
.map_ok(move |header| {
let provider = provider;
let subscriber_metrics = subscriber_metrics.clone();
async move {
let block_number = header.number();
let start = std::time::Instant::now();
let fetch_failures = &subscriber_metrics.fetch_failures;
let receipts = provider
.get_block_receipts(BlockId::number(block_number))
let l1_block_tag = self.config.l1_block_tag;
let stream = stream::unfold(start_block, move |block_number| {
let provider = provider.clone();
let subscriber_metrics = subscriber_metrics.clone();
async move {
loop {
let tagged_number = match provider
.get_block_by_number(l1_block_tag)
.await
.map_err(eyre::Report::from)
.and_then(|receipts| {
receipts
.and_then(|block| {
block.ok_or_else(|| eyre::eyre!("tagged L1 block not available"))
}) {
Ok(block) => block.header().number(),
Err(err) => return Some((Err(err), block_number)),
};

if block_number > tagged_number {
tokio::time::sleep(FINALIZED_POLL_INTERVAL).await;
continue;
}

let start = std::time::Instant::now();
let fetch_failures = &subscriber_metrics.fetch_failures;
let fetched = tokio::try_join!(
async {
provider
.get_block_by_number(block_number.into())
.full()
.await
.map_err(eyre::Report::from)?
.ok_or_else(|| {
eyre::eyre!("L1 block not found for block {block_number}")
})
},
async {
provider
.get_block_receipts(BlockId::number(block_number))
.await
.map_err(eyre::Report::from)?
.ok_or_else(|| eyre::eyre!("no receipts for block {block_number}"))
})
.inspect_err(|_| {
fetch_failures.increment(1);
})?;
let elapsed = start.elapsed();
debug!(
block_number,
elapsed_ms = elapsed.as_millis() as u64,
receipts = receipts.len(),
"Fetched live block receipts"
);
Ok::<_, eyre::Report>((header, receipts))
},
)
.inspect_err(|_| {
fetch_failures.increment(1);
});

let result = fetched.map(|(block, receipts)| {
let header = block.header().clone();
let elapsed = start.elapsed();
debug!(
block_number,
elapsed_ms = elapsed.as_millis() as u64,
receipts = receipts.len(),
"Fetched tagged L1 block data"
);
(header, receipts)
});

return Some((result, block_number + 1));
}
})
.try_buffered(concurrency);
}
});
Ok(Box::pin(stream))
}

Expand Down Expand Up @@ -274,11 +269,11 @@ impl L1Subscriber {
Ok(Some(on_chain + 1))
}

/// Backfill deposit events from the starting block to the current L1 tip.
/// Backfill deposit events from the starting block to the current tagged L1 block.
#[instrument(skip(self, l1_provider))]
async fn sync_to_l1_tip(
async fn sync_to_finalized_l1(
&mut self,
l1_provider: &impl Provider<TempoNetwork>,
l1_provider: &DynProvider<TempoNetwork>,
) -> eyre::Result<()> {
let Some(mut from) = self.resolve_start_block(l1_provider).await? else {
self.subscriber_metrics.current_l1_lag_blocks.set(0.0);
Expand All @@ -299,10 +294,10 @@ impl L1Subscriber {
from = from.max(adjusted);
}

let tip = l1_provider.get_block_number().await?;
let tip = self.tagged_block_number(l1_provider).await?;
self.record_seen_block(tip, 0);
if from > tip {
info!(from, tip, "Already synced to L1 tip");
info!(from, tip, "Already synced to tagged L1");
self.subscriber_metrics.current_l1_lag_blocks.set(0.0);
return Ok(());
}
Expand All @@ -311,7 +306,7 @@ impl L1Subscriber {
from,
tip,
blocks = tip - from + 1,
"Backfilling deposit events"
"Backfilling tagged L1 deposit events"
);
let start = std::time::Instant::now();
let result = self.backfill(l1_provider, from, tip).await;
Expand Down Expand Up @@ -428,15 +423,10 @@ impl L1Subscriber {

/// Run the L1 subscriber until the stream ends or an error occurs.
///
/// Connects to the L1 node (HTTP or WebSocket), backfills deposit events
/// to the current L1 tip, then listens for new block headers. Each block —
/// with or without deposits — is enqueued so the zone engine sees a strict
/// sequential chain.
///
/// Live-streamed blocks are buffered one block behind: a block is only
/// flushed to the deposit queue once the next block arrives with a
/// matching parent hash, proving the buffered block is canonical. This
/// prevents the zone from committing to an L1 tip that gets reorged away.
/// Connects to the L1 node, backfills deposit events to the current
/// configured L1 block tag, then polls newly available blocks. Each block — with
/// or without deposits — is enqueued so the zone engine sees a strict
/// sequential finalized chain.
///
/// Callers should retry on error (see [`Self::spawn`]).
pub async fn run(mut self) -> eyre::Result<()> {
Expand All @@ -446,22 +436,23 @@ impl L1Subscriber {

let provider = self.connect().await?;

// Backfill to the current tip before subscribing.
// Backfilled blocks are historical and considered confirmed.
self.sync_to_l1_tip(&provider).await?;
// Backfill to the current tagged block before polling for the next
// available block.
self.sync_to_finalized_l1(&provider).await?;

info!(portal = %self.config.portal_address, "Listening for L1 blocks");
let mut stream = self.l1_block_stream(&provider).await?;

// Confirmation buffer: holds the latest unconfirmed L1 block.
// A block is only flushed to the deposit queue once the NEXT block
// arrives with a matching parent hash, proving the buffered block
// is on the canonical chain.
let mut unconfirmed_tip: Option<(
SealedHeader<TempoHeader>,
L1PortalEvents,
Vec<PolicyEvent>,
)> = None;
let stream_start = if let Some(last) = self.deposit_queue.last_enqueued() {
last.number + 1
} else if let Some(start) = self.resolve_start_block(&provider).await? {
start
} else {
self.tagged_block_number(&provider).await? + 1
};
info!(
portal = %self.config.portal_address,
stream_start,
"Polling tagged L1 blocks"
);
let mut stream = self.l1_block_stream(&provider, stream_start).await?;

loop {
let stream_wait_start = std::time::Instant::now();
Expand All @@ -477,59 +468,30 @@ impl L1Subscriber {
let (events, policy_events) = self.extract_events(block_number, &receipts);
self.record_seen_block(block_number, 0);

// If we have a buffered tip, check if the new block confirms it.
if let Some((tip_header, tip_events, tip_policy_events)) = unconfirmed_tip.take() {
if sealed.parent_hash() == tip_header.hash() {
// Confirmed — update the L1 state anchor, apply events, and
// flush to the queue.
let tip_number = tip_header.number();
let tip_hash = tip_header.hash();
let tip_parent = tip_header.parent_hash();
self.update_l1_state_anchor(tip_number, tip_hash, tip_parent);
self.apply_policy_events(tip_number, &tip_policy_events);
self.apply_portal_state_events(tip_number, &tip_events);
match self
.deposit_queue
.try_enqueue(tip_header, tip_events, tip_policy_events)
{
EnqueueOutcome::Accepted => {
self.subscriber_metrics.blocks_enqueued.increment(1);
}
EnqueueOutcome::Duplicate => {}
EnqueueOutcome::NeedBackfill { from, to } => {
// Gap between queue head and confirmed tip — backfill
// the missing range including the tip (re-fetched from
// the provider since try_enqueue consumed ownership).
warn!(
from,
to,
tip = tip_number,
"Backfilling gap before confirmed tip"
);
self.backfill(&provider, from, tip_number).await?;
}
}
} else {
// Reorg — discard the buffered tip and clear L1 state and
// policy caches.
self.subscriber_metrics.reorgs_detected.increment(1);
self.update_l1_state_anchor(block_number, sealed.hash(), sealed.parent_hash());
self.apply_policy_events(block_number, &policy_events);
self.apply_portal_state_events(block_number, &events);
match self
.deposit_queue
.try_enqueue(sealed, events, policy_events)
{
EnqueueOutcome::Accepted => {
self.subscriber_metrics.blocks_enqueued.increment(1);
}
EnqueueOutcome::Duplicate => {}
EnqueueOutcome::NeedBackfill { from, to } => {
warn!(
discarded_block = tip_header.number(),
discarded_hash = %tip_header.hash(),
new_block = block_number,
new_parent = %sealed.parent_hash(),
"Discarding unconfirmed L1 block (reorg)"
from,
to,
tip = block_number,
"Backfilling gap before tagged block"
);
self.config.l1_state_cache.write().clear();
self.config.policy_cache.write().clear();
self.backfill(&provider, from, block_number).await?;
}
}

// Buffer the new block as unconfirmed tip.
unconfirmed_tip = Some((sealed, events, policy_events));
}

warn!("L1 block subscription stream ended");
warn!("Tagged L1 block polling stream ended");
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions crates/tempo-zone/src/l1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ fn test_subscriber(
genesis_tempo_block_number,
policy_cache: crate::PolicyCache::default(),
l1_state_cache: crate::L1StateCache::new(HashSet::from([portal_address])),
l1_block_tag: BlockNumberOrTag::Finalized,
l1_fetch_concurrency: 1,
retry_connection_interval: Duration::from_secs(1),
},
Expand Down
Loading
Loading