diff --git a/.gitignore b/.gitignore index 208f6f5..faa6dc5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .DS_Store Cargo.lock .idea/ +docs/superpowers/ diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index f8e2f95..1be52e0 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -368,10 +368,11 @@ where let removed_blocks = drained .into_iter() .map(|d| { - let header = d.header.into_inner(); - let logs = - d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { header, logs } + let number = d.header.number(); + let hash = d.header.hash(); + let timestamp = d.header.timestamp(); + let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect(); + RemovedBlock { number, hash, timestamp, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs index 9d719ab..94d1b30 100644 --- a/crates/rpc/src/config/chain_notifier.rs +++ b/crates/rpc/src/config/chain_notifier.rs @@ -4,8 +4,21 @@ use crate::{ config::resolve::BlockTags, interest::{ChainEvent, NewBlockNotification, ReorgNotification}, }; +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, + time::Instant, +}; use tokio::sync::broadcast; +/// Maximum number of reorg notifications retained in the ring buffer. +/// At most one reorg per 12 seconds and a 5-minute stale filter TTL +/// gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; + +/// Timestamped reorg ring buffer. +type ReorgBuffer = VecDeque<(Instant, Arc)>; + /// Shared chain state between the node and RPC layer. /// /// Combines block tag tracking and chain event notification into a single @@ -27,6 +40,7 @@ use tokio::sync::broadcast; pub struct ChainNotifier { tags: BlockTags, notif_tx: broadcast::Sender, + reorgs: Arc>, } impl ChainNotifier { @@ -35,7 +49,7 @@ impl ChainNotifier { pub fn new(channel_capacity: usize) -> Self { let tags = BlockTags::new(0, 0, 0); let (notif_tx, _) = broadcast::channel(channel_capacity); - Self { tags, notif_tx } + Self { tags, notif_tx, reorgs: Arc::new(RwLock::new(VecDeque::new())) } } /// Access the block tags. @@ -57,13 +71,21 @@ impl ChainNotifier { /// Send a reorg notification. /// - /// Returns `Ok(receiver_count)` or `Err` if there are no active - /// receivers (which is not usually an error condition). + /// Writes the notification to the authoritative ring buffer before + /// broadcasting to push subscribers. The broadcast `Err` only means + /// "no active push subscribers" — the ring buffer is unaffected. #[allow(clippy::result_large_err)] pub fn send_reorg( &self, notif: ReorgNotification, ) -> Result> { + { + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back((Instant::now(), Arc::new(notif.clone()))); + } self.send_event(ChainEvent::Reorg(notif)) } @@ -81,6 +103,20 @@ impl ChainNotifier { self.notif_tx.subscribe() } + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock. Used by polling + /// filters at poll time to compute removed logs. + pub fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() + } + /// Get a clone of the broadcast sender. /// /// Used by the subscription manager to create its own receiver. @@ -88,3 +124,54 @@ impl ChainNotifier { self.notif_tx.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::ReorgNotification; + use std::time::Duration; + + fn reorg_notification(ancestor: u64) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] } + } + + #[test] + fn push_reorg_evicts_oldest() { + let notifier = ChainNotifier::new(16); + for i in 0..MAX_REORG_ENTRIES + 5 { + notifier.send_reorg(reorg_notification(i as u64)).ok(); + } + let buf = notifier.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + assert_eq!(buf[0].1.common_ancestor, 5); + } + + #[test] + fn reorgs_since_filters_by_time() { + let notifier = ChainNotifier::new(16); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(10)).ok(); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(8)).ok(); + + let all = notifier.reorgs_since(before); + assert_eq!(all.len(), 2); + + let recent = notifier.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); + } + + #[test] + fn reorgs_since_skips_pre_creation_reorgs() { + let notifier = ChainNotifier::new(16); + notifier.send_reorg(reorg_notification(5)).ok(); + std::thread::sleep(Duration::from_millis(5)); + + let after = Instant::now(); + let reorgs = notifier.reorgs_since(after); + assert!(reorgs.is_empty()); + } +} diff --git a/crates/rpc/src/config/ctx.rs b/crates/rpc/src/config/ctx.rs index 5c933ef..27b70f4 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,7 +101,8 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let filter_manager = + FilterManager::new(chain.clone(), config.stale_filter_ttl, config.stale_filter_ttl); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 75d9479..6e818e2 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector; use serde::Serialize; use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead}; -use tracing::{Instrument, debug, trace_span}; +use tracing::{Instrument, debug, trace, trace_span}; use trevm::{ EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker, }; @@ -1078,12 +1078,38 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed logs + // and rewind `next_start_block`. + let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); + if !removed.is_empty() { + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); + } + let latest = ctx.tags().latest(); let start = entry.next_start_block(); + // Implicit reorg detection: if latest has moved backward past our + // window, a reorg occurred that we missed (e.g. broadcast lagged). + // Return any removed logs we do have, then reset. + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); + } + if start > latest { - entry.mark_polled(latest); - return Ok(entry.empty_output()); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } let cold = ctx.cold(); @@ -1110,7 +1136,15 @@ where let stream = cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?; - let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + + // Prepend removed logs so the client sees removals before + // the replacement data. + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index ae09367..3f97ffd 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,9 +1,12 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{InterestKind, buffer::EventBuffer}; +use crate::{ + config::ChainNotifier, + interest::{InterestKind, ReorgNotification, buffer::EventBuffer}, +}; use alloy::{ primitives::{B256, U64}, - rpc::types::Filter, + rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ @@ -24,7 +27,11 @@ pub(crate) type FilterOutput = EventBuffer; /// /// Records the filter details, the [`Instant`] at which the filter was last /// polled, and the first block whose contents should be considered. -#[derive(Debug, Clone, PartialEq, Eq)] +/// +/// `last_poll_time` doubles as the cursor into the global reorg ring +/// buffer — at poll time the filter scans for reorgs received after +/// this instant. +#[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, @@ -38,7 +45,7 @@ impl core::fmt::Display for ActiveFilter { "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), - self.kind + self.kind, ) } } @@ -60,11 +67,25 @@ impl ActiveFilter { self.last_poll_time = Instant::now(); } + /// Update the poll timestamp without advancing `next_start_block`. + /// + /// Used on early-return paths (implicit reorg, no new blocks) where + /// `compute_removed_logs` has already rewound `next_start_block` and + /// we must preserve that position for the next forward scan. + pub(crate) fn touch_poll_time(&mut self) { + self.last_poll_time = Instant::now(); + } + /// Get the next start block for the filter. pub(crate) const fn next_start_block(&self) -> u64 { self.next_start_block } + /// Get the instant at which the filter was last polled (or created). + pub(crate) const fn last_poll_time(&self) -> Instant { + self.last_poll_time + } + /// Get the duration since the filter was last polled. pub(crate) fn time_since_last_poll(&self) -> Duration { self.last_poll_time.elapsed() @@ -74,6 +95,49 @@ impl ActiveFilter { pub(crate) const fn empty_output(&self) -> FilterOutput { self.kind.empty_output() } + + /// Compute removed logs from a sequence of reorg notifications. + /// + /// Walks the reorgs in order, computing snapshots lazily from the + /// filter's current [`next_start_block`]. For each reorg, only logs + /// from blocks the filter had already delivered (block number below + /// the snapshot) are included. + /// + /// Updates `next_start_block` to the rewound value so the subsequent + /// forward scan starts from the correct position. + /// + /// Block filters return an empty vec — the Ethereum JSON-RPC spec + /// does not define `removed` semantics for block filters. + /// + /// [`next_start_block`]: Self::next_start_block + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { + let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } + return Vec::new(); + }; + + let mut removed = Vec::new(); + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); + for block in ¬ification.removed_blocks { + if block.number >= snapshot { + continue; + } + for log in &block.logs { + if !filter.matches(&log.inner) { + continue; + } + let mut log = log.clone(); + log.removed = true; + removed.push(log); + } + } + } + removed + } } /// Inner logic for [`FilterManager`]. @@ -102,10 +166,14 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let next_start_block = current_block + 1; - let _ = self - .filters - .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + let _ = self.filters.insert( + id, + ActiveFilter { + next_start_block: current_block + 1, + last_poll_time: Instant::now(), + kind, + }, + ); id } @@ -136,22 +204,39 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. -/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. -/// See [`DashMap`] documentation for more information. +/// Reorg notifications are read from the [`ChainNotifier`]'s ring buffer +/// at poll time. Filters compute their removed logs lazily by scanning +/// for entries received since the last poll. +/// +/// Calling [`Self::new`] spawns an OS thread that periodically cleans +/// stale filters (using a separate thread to avoid [`DashMap::retain`] +/// deadlock). The worker holds a [`Weak`] reference and self-terminates +/// when the manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc, + chain: ChainNotifier, } impl FilterManager { - /// Create a new filter manager. Spawn a task to clean stale filters. - pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters. Reorg notifications + /// are read directly from the [`ChainNotifier`]'s ring buffer at + /// poll time — no background listener is needed. + pub(crate) fn new(chain: ChainNotifier, clean_interval: Duration, age_limit: Duration) -> Self { let inner = Arc::new(FilterManagerInner::new()); - let manager = Self { inner }; + let manager = Self { inner, chain }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); manager } + + /// Return all reorg notifications received after `since`. + /// + /// Delegates to [`ChainNotifier::reorgs_since`]. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { + self.chain.reorgs_since(since) + } } impl std::ops::Deref for FilterManager { @@ -195,6 +280,167 @@ impl FilterCleanTask { } } +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; + + fn block_filter(start: u64) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Block, + } + } + + fn log_filter(start: u64, addr: Address) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Log(Box::new(Filter::new().address(addr))), + } + } + + fn test_log(addr: Address, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } + } + + fn reorg_notification(ancestor: u64, removed: Vec) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } + } + + fn removed_block(number: u64, logs: Vec) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, + logs, + } + } + + #[test] + fn compute_removed_logs_rewinds_start_block() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(7, vec![])); + + f.compute_removed_logs(&[reorg]); + + assert_eq!(f.next_start_block, 8); + } + + #[test] + fn compute_removed_logs_matches_removed() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + + let reorg = Arc::new(reorg_notification( + 8, + vec![ + removed_block(9, vec![test_log(addr, 9, hash)]), + removed_block(10, vec![test_log(addr, 10, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 2); + assert!(removed.iter().all(|l| l.removed)); + assert!(removed.iter().all(|l| l.inner.address == addr)); + // Verify RPC metadata is preserved through clone-and-set path. + assert!(removed.iter().all(|l| l.transaction_hash == Some(B256::ZERO))); + assert!(removed.iter().all(|l| l.log_index == Some(0))); + } + + #[test] + fn compute_removed_logs_skips_undelivered_blocks() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + // Filter has only delivered up to block 10 (next_start = 11). + let mut f = log_filter(11, addr); + + // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. + let reorg = Arc::new(reorg_notification( + 9, + vec![ + removed_block(10, vec![test_log(addr, 10, hash)]), + removed_block(11, vec![test_log(addr, 11, hash)]), + removed_block(12, vec![test_log(addr, 12, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].block_number, Some(10)); + } + + #[test] + fn compute_removed_logs_cascading() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + // Filter has delivered up to block 100 (next_start = 101). + let mut f = log_filter(101, addr); + + // Reorg A: rewinds to 98, removes 99-100. + let reorg_a = Arc::new(reorg_notification( + 98, + vec![ + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + ], + )); + + // Reorg B: rewinds to 95, removes 96-103. + let reorg_b = Arc::new(reorg_notification( + 95, + vec![ + removed_block(96, vec![test_log(addr, 96, hash)]), + removed_block(97, vec![test_log(addr, 97, hash)]), + removed_block(98, vec![test_log(addr, 98, hash)]), + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + removed_block(101, vec![test_log(addr, 101, hash)]), + removed_block(102, vec![test_log(addr, 102, hash)]), + removed_block(103, vec![test_log(addr, 103, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg_a, reorg_b]); + + // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. + // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. + // Blocks 99-103 from reorg B are >= 99 → skipped. + assert_eq!(removed.len(), 5); + assert_eq!(f.next_start_block, 96); + } + + #[test] + fn compute_removed_logs_block_filter_empty() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); + + let removed = f.compute_removed_logs(&[reorg]); + + assert!(removed.is_empty()); + // But the rewind still happened. + assert_eq!(f.next_start_block, 6); + } +} + // Some code in this file has been copied and modified from reth // // The original license is included below: diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f0ed907..cb09c74 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -109,21 +109,13 @@ impl InterestKind { let logs: VecDeque = reorg .removed_blocks - .into_iter() - .flat_map(|block| { - let block_hash = block.header.hash_slow(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { - inner: log, - block_hash: Some(block_hash), - block_number: Some(block_number), - block_timestamp: Some(block_timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }) + .iter() + .flat_map(|block| block.logs.iter()) + .filter(|log| filter.matches(&log.inner)) + .map(|log| { + let mut log = log.clone(); + log.removed = true; + log }) .collect(); @@ -134,37 +126,51 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; - use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; - fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { - alloy::primitives::Log { - address: addr, - data: LogData::new_unchecked(vec![topic], Bytes::new()), + fn test_log(addr: Address, topic: B256, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, } } - fn test_header(number: u64) -> alloy::consensus::Header { - alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } - } - fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } + } + #[test] fn filter_reorg_for_sub_matches_logs() { let addr = address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); - let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: header.clone(), - logs: vec![test_log(addr, topic)], - }], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(addr, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -173,9 +179,9 @@ mod tests { assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); - assert_eq!(logs[0].block_number.unwrap(), 11); - assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + assert_eq!(logs[0].block_hash, Some(block_hash)); + assert_eq!(logs[0].block_number, Some(11)); + assert_eq!(logs[0].block_timestamp, Some(1_000_011)); } #[test] @@ -183,14 +189,17 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let other = address!("0x0000000000000000000000000000000000000002"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: test_header(11), - logs: vec![test_log(other, topic)], - }], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(other, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -200,9 +209,12 @@ mod tests { #[test] fn filter_reorg_for_sub_block_returns_empty() { + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![])], }; let buf = InterestKind::Block.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3ad2620..ce5472f 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -25,12 +25,18 @@ //! reference to the `Arc`, so they self-terminate once all //! strong references are dropped. //! -//! OS threads are used (rather than tokio tasks) because +//! OS threads are used for cleanup (rather than tokio tasks) because //! [`DashMap::retain`] can deadlock if called from an async context //! that also holds a `DashMap` read guard on the same shard. Running //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! +//! [`FilterManager`] reads reorg notifications directly from the +//! [`ChainNotifier`]'s ring buffer at poll time — no background +//! listener task is needed. +//! +//! [`ChainNotifier`]: crate::ChainNotifier +//! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap //! [`DashMap::retain`]: dashmap::DashMap::retain @@ -69,13 +75,23 @@ pub enum ChainEvent { Reorg(ReorgNotification), } -/// Data from a single block removed during a chain reorganization. +/// A block that was removed during a chain reorganization. #[derive(Debug, Clone)] pub struct RemovedBlock { - /// The header of the removed block. - pub header: alloy::consensus::Header, - /// Logs emitted by the removed block. - pub logs: Vec, + /// The block number. + pub number: u64, + /// The block hash. + pub hash: alloy::primitives::B256, + /// The block timestamp. + pub timestamp: u64, + /// Logs emitted in the removed block. + /// + /// Uses the RPC log type so that `transaction_hash` and `log_index` + /// from the original receipts can be preserved, as required by the + /// Ethereum JSON-RPC spec for removed logs. These fields are + /// populated when cold storage has indexed the block; otherwise the + /// vec may be empty. + pub logs: Vec, } /// Notification sent when a chain reorganization is detected. @@ -83,6 +99,6 @@ pub struct RemovedBlock { pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Blocks removed by the reorg, each carrying its header and logs. + /// The blocks that were removed, ordered by block number. pub removed_blocks: Vec, }