From 379d1c6b4e54a89c37716e9656c59aa35c923b91 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 10 Mar 2026 08:02:53 -0400 Subject: [PATCH 01/13] feat: handle reorgs in `get_filter_changes` with reorg watermark Add a `reorg_watermark` field to `ActiveFilter` that records the common ancestor block when a chain reorganization occurs. `FilterManager` now subscribes to `ChainEvent::Reorg` broadcasts and eagerly propagates watermarks to all active filters. On the next poll, `get_filter_changes` rewinds `next_start_block` so re-fetched data reflects the new chain. An implicit reorg detection check (latest < start) provides a belt-and-suspenders fallback when the explicit watermark is missed. Co-Authored-By: Claude Opus 4.6 --- crates/rpc/src/config/ctx.rs | 6 +- crates/rpc/src/eth/endpoints.rs | 15 ++- crates/rpc/src/interest/filters.rs | 184 +++++++++++++++++++++++++++-- 3 files changed, 195 insertions(+), 10 deletions(-) diff --git a/crates/rpc/src/config/ctx.rs b/crates/rpc/src/config/ctx.rs index 5c933ef..166979b 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,7 +101,11 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let filter_manager = FilterManager::new( + &chain.notif_sender(), + config.stale_filter_ttl, + config.stale_filter_ttl, + ); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 75d9479..bfa0a15 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector; use serde::Serialize; use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead}; -use tracing::{Instrument, debug, trace_span}; +use tracing::{Instrument, debug, trace, trace_span}; use trevm::{ EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker, }; @@ -1078,9 +1078,22 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + // Handle any pending reorg watermark. + if let Some(watermark) = entry.handle_reorg() { + trace!(watermark, "filter reset due to reorg"); + } + let latest = ctx.tags().latest(); let start = entry.next_start_block(); + // Implicit reorg detection: if latest has moved backward past our + // window, a reorg occurred that we missed. Reset to avoid skipping. + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + entry.mark_polled(latest); + return Ok(entry.empty_output()); + } + if start > latest { entry.mark_polled(latest); return Ok(entry.empty_output()); diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index ae09367..89f51ce 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,6 +1,6 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{InterestKind, buffer::EventBuffer}; +use crate::interest::{ChainEvent, InterestKind, buffer::EventBuffer}; use alloy::{ primitives::{B256, U64}, rpc::types::Filter, @@ -13,6 +13,7 @@ use std::{ }, time::{Duration, Instant}, }; +use tokio::sync::broadcast; use tracing::trace; type FilterId = U64; @@ -29,17 +30,22 @@ pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, kind: InterestKind, + reorg_watermark: Option, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), self.kind - ) + )?; + if let Some(w) = self.reorg_watermark { + write!(f, ", reorg_watermark: {w}")?; + } + write!(f, " }}") } } @@ -74,6 +80,31 @@ impl ActiveFilter { pub(crate) const fn empty_output(&self) -> FilterOutput { self.kind.empty_output() } + + /// Record that a reorg occurred back to this ancestor block. + /// + /// If multiple reorgs arrive before the filter is polled, the lowest + /// (most conservative) watermark is kept. + pub(crate) fn set_reorg_watermark(&mut self, common_ancestor: u64) { + self.reorg_watermark = + Some(self.reorg_watermark.map_or(common_ancestor, |w| w.min(common_ancestor))); + } + + /// Reset filter state if a pending reorg affected this filter's window. + /// + /// Takes and clears the watermark. If the watermark is below + /// `next_start_block`, rewinds the start block so the next poll + /// re-fetches from just after the common ancestor. Returns the + /// watermark value when a reset occurred, `None` otherwise. + pub(crate) fn handle_reorg(&mut self) -> Option { + let watermark = self.reorg_watermark.take()?; + if watermark < self.next_start_block { + self.next_start_block = watermark + 1; + Some(watermark) + } else { + None + } + } } /// Inner logic for [`FilterManager`]. @@ -103,9 +134,15 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); let next_start_block = current_block + 1; - let _ = self - .filters - .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + let _ = self.filters.insert( + id, + ActiveFilter { + next_start_block, + last_poll_time: Instant::now(), + kind, + reorg_watermark: None, + }, + ); id } @@ -124,6 +161,13 @@ impl FilterManagerInner { self.filters.remove(&id) } + /// Set a reorg watermark on all active filters. + pub(crate) fn set_reorg_watermark_all(&self, common_ancestor: u64) { + self.filters + .iter_mut() + .for_each(|mut entry| entry.value_mut().set_reorg_watermark(common_ancestor)); + } + /// Clean stale filters that have not been polled in a while. fn clean_stale(&self, older_than: Duration) { self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than); @@ -145,11 +189,20 @@ pub(crate) struct FilterManager { } impl FilterManager { - /// Create a new filter manager. Spawn a task to clean stale filters. - pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters and a tokio task that + /// listens for [`ChainEvent::Reorg`] events and propagates watermarks + /// to all active filters. + pub(crate) fn new( + chain_events: &broadcast::Sender, + clean_interval: Duration, + age_limit: Duration, + ) -> Self { let inner = Arc::new(FilterManagerInner::new()); let manager = Self { inner }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); + FilterReorgTask::new(Arc::downgrade(&manager.inner), chain_events.subscribe()).spawn(); manager } } @@ -195,6 +248,121 @@ impl FilterCleanTask { } } +/// Task that listens for reorg events and propagates watermarks to all +/// active filters. +/// +/// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] +/// is dropped. +struct FilterReorgTask { + manager: Weak, + rx: broadcast::Receiver, +} + +impl FilterReorgTask { + const fn new(manager: Weak, rx: broadcast::Receiver) -> Self { + Self { manager, rx } + } + + /// Spawn the listener as a tokio task. + fn spawn(self) { + tokio::spawn(self.run()); + } + + async fn run(mut self) { + loop { + let event = match self.rx.recv().await { + Ok(event) => event, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + trace!(skipped, "filter reorg listener missed notifications"); + continue; + } + Err(_) => break, + }; + + let ChainEvent::Reorg(reorg) = event else { continue }; + + let Some(manager) = self.manager.upgrade() else { break }; + manager.set_reorg_watermark_all(reorg.common_ancestor); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::InterestKind; + + fn block_filter(start: u64) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Block, + reorg_watermark: None, + } + } + + #[test] + fn set_reorg_watermark_keeps_minimum() { + let mut f = block_filter(10); + f.set_reorg_watermark(8); + assert_eq!(f.reorg_watermark, Some(8)); + + // A higher watermark does not overwrite the lower one. + f.set_reorg_watermark(9); + assert_eq!(f.reorg_watermark, Some(8)); + + // A lower watermark replaces the current one. + f.set_reorg_watermark(5); + assert_eq!(f.reorg_watermark, Some(5)); + } + + #[test] + fn handle_reorg_resets_start_block() { + let mut f = block_filter(10); + f.set_reorg_watermark(7); + + let result = f.handle_reorg(); + assert_eq!(result, Some(7)); + assert_eq!(f.next_start_block, 8); + assert!(f.reorg_watermark.is_none()); + } + + #[test] + fn handle_reorg_noop_when_watermark_at_or_above_start() { + let mut f = block_filter(10); + f.set_reorg_watermark(10); + + let result = f.handle_reorg(); + assert!(result.is_none()); + // next_start_block unchanged. + assert_eq!(f.next_start_block, 10); + assert!(f.reorg_watermark.is_none()); + } + + #[test] + fn handle_reorg_clears_watermark() { + let mut f = block_filter(10); + f.set_reorg_watermark(5); + f.handle_reorg(); + + // Second call returns None — watermark already consumed. + assert!(f.handle_reorg().is_none()); + } + + #[test] + fn set_reorg_watermark_all_propagates() { + let inner = FilterManagerInner::new(); + inner.install_block_filter(20); + inner.install_block_filter(30); + + inner.set_reorg_watermark_all(15); + + inner.filters.iter().for_each(|entry| { + assert_eq!(entry.value().reorg_watermark, Some(15)); + }); + } +} + // Some code in this file has been copied and modified from reth // // The original license is included below: From cb9d35cf10600dd6dcb37f4fb85e2f052bd91e07 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 13 Mar 2026 10:14:15 -0400 Subject: [PATCH 02/13] refactor: redesign filter reorg handling with Arc-shared notifications Replace the simple u64 watermark with a Vec of Arc paired with next_start_block snapshots. This fixes two issues: 1. Race condition: filters created after a reorg no longer receive false watermarks, guarded by a created_at timestamp comparison. 2. Missing removed logs: get_filter_changes now emits `removed: true` logs per the Ethereum JSON-RPC spec. Each pending reorg's snapshot determines which removed blocks the client already saw. Restructure ReorgNotification to group logs per block (RemovedBlock) so filters can determine relevance by block number. The Arc sharing means no log data is cloned until poll time, and automatic drop eliminates the need for cleanup passes. Co-Authored-By: Claude Opus 4.6 --- crates/node/src/node.rs | 5 +- crates/rpc/src/eth/endpoints.rs | 23 +- crates/rpc/src/interest/filters.rs | 331 ++++++++++++++++++++++------- crates/rpc/src/interest/kind.rs | 67 +++--- crates/rpc/src/interest/mod.rs | 12 +- 5 files changed, 321 insertions(+), 117 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index f8e2f95..22bd3b9 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -368,10 +368,11 @@ where let removed_blocks = drained .into_iter() .map(|d| { - let header = d.header.into_inner(); + let number = d.header.number(); + let hash = d.header.hash(); let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { header, logs } + RemovedBlock { number, hash, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index bfa0a15..d697492 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1078,16 +1078,21 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; - // Handle any pending reorg watermark. - if let Some(watermark) = entry.handle_reorg() { - trace!(watermark, "filter reset due to reorg"); + // Drain any pending reorg notifications, producing removed logs + // for log filters. `next_start_block` was already rewound eagerly + // when the reorg was received. + let removed = entry.drain_reorgs(); + if !removed.is_empty() { + trace!(count = removed.len(), "drained removed logs from pending reorgs"); } let latest = ctx.tags().latest(); let start = entry.next_start_block(); // Implicit reorg detection: if latest has moved backward past our - // window, a reorg occurred that we missed. Reset to avoid skipping. + // window, a reorg occurred that we missed (e.g. broadcast lagged). + // Reset to avoid skipping. Removed logs are unavailable in this + // degraded path. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); entry.mark_polled(latest); @@ -1123,7 +1128,15 @@ where let stream = cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?; - let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + + // Prepend removed logs so the client sees removals before + // the replacement data. + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 89f51ce..fd60b4c 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,9 +1,9 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{ChainEvent, InterestKind, buffer::EventBuffer}; +use crate::interest::{ChainEvent, InterestKind, ReorgNotification, buffer::EventBuffer}; use alloy::{ primitives::{B256, U64}, - rpc::types::Filter, + rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ @@ -21,31 +21,38 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; +/// A pending reorg notification paired with the filter's +/// `next_start_block` at the time the reorg was received. +/// +/// The snapshot records which blocks the filter had already delivered, +/// so [`ActiveFilter::drain_reorgs`] can determine which removed logs +/// are relevant. +type PendingReorg = (Arc, u64); + /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last /// polled, and the first block whose contents should be considered. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, + created_at: Instant, kind: InterestKind, - reorg_watermark: Option, + pending_reorgs: Vec, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}, \ + pending_reorgs: {} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), - self.kind - )?; - if let Some(w) = self.reorg_watermark { - write!(f, ", reorg_watermark: {w}")?; - } - write!(f, " }}") + self.kind, + self.pending_reorgs.len(), + ) } } @@ -64,6 +71,7 @@ impl ActiveFilter { pub(crate) fn mark_polled(&mut self, current_block: u64) { self.next_start_block = current_block + 1; self.last_poll_time = Instant::now(); + self.pending_reorgs.clear(); } /// Get the next start block for the filter. @@ -81,29 +89,67 @@ impl ActiveFilter { self.kind.empty_output() } - /// Record that a reorg occurred back to this ancestor block. + /// Record a reorg notification, eagerly rewinding `next_start_block`. + /// + /// The notification is stored (behind an [`Arc`]) alongside a snapshot + /// of the filter's `next_start_block` at the time of the reorg, so + /// that [`drain_reorgs`] can later determine which removed logs the + /// client has already seen. /// - /// If multiple reorgs arrive before the filter is polled, the lowest - /// (most conservative) watermark is kept. - pub(crate) fn set_reorg_watermark(&mut self, common_ancestor: u64) { - self.reorg_watermark = - Some(self.reorg_watermark.map_or(common_ancestor, |w| w.min(common_ancestor))); + /// If `received_at` is before the filter's creation time, the reorg + /// is silently skipped — the filter was installed after the reorg + /// occurred and its `next_start_block` already reflects the post-reorg + /// chain state. + /// + /// [`drain_reorgs`]: Self::drain_reorgs + fn push_reorg(&mut self, reorg: Arc, received_at: Instant) { + if self.created_at > received_at { + return; + } + + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + self.pending_reorgs.push((reorg, snapshot)); } - /// Reset filter state if a pending reorg affected this filter's window. + /// Drain pending reorgs, returning matched removed logs with + /// `removed: true`. /// - /// Takes and clears the watermark. If the watermark is below - /// `next_start_block`, rewinds the start block so the next poll - /// re-fetches from just after the common ancestor. Returns the - /// watermark value when a reset occurred, `None` otherwise. - pub(crate) fn handle_reorg(&mut self) -> Option { - let watermark = self.reorg_watermark.take()?; - if watermark < self.next_start_block { - self.next_start_block = watermark + 1; - Some(watermark) - } else { - None + /// For each pending reorg, only logs from blocks the filter had + /// already delivered (block number below the snapshot) are included. + /// Block filters return an empty vec — the Ethereum JSON-RPC spec + /// does not define `removed` semantics for block filters. + pub(crate) fn drain_reorgs(&mut self) -> Vec { + let reorgs = std::mem::take(&mut self.pending_reorgs); + + let Some(filter) = self.kind.as_filter() else { + return Vec::new(); + }; + + let mut removed = Vec::new(); + for (notification, snapshot_start) in reorgs { + for block in ¬ification.removed_blocks { + if block.number >= snapshot_start { + continue; + } + for log in &block.logs { + if !filter.matches(log) { + continue; + } + removed.push(Log { + inner: log.clone(), + block_hash: Some(block.hash), + block_number: Some(block.number), + block_timestamp: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }); + } + } } + removed } } @@ -133,14 +179,15 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let next_start_block = current_block + 1; + let now = Instant::now(); let _ = self.filters.insert( id, ActiveFilter { - next_start_block, - last_poll_time: Instant::now(), + next_start_block: current_block + 1, + last_poll_time: now, + created_at: now, kind, - reorg_watermark: None, + pending_reorgs: Vec::new(), }, ); id @@ -161,11 +208,15 @@ impl FilterManagerInner { self.filters.remove(&id) } - /// Set a reorg watermark on all active filters. - pub(crate) fn set_reorg_watermark_all(&self, common_ancestor: u64) { + /// Apply a reorg notification to all active filters. + /// + /// Each filter records the shared `Arc` alongside + /// a snapshot of its current `next_start_block`, then eagerly rewinds. + /// Filters created after `received_at` are skipped (race guard). + pub(crate) fn apply_reorg(&self, reorg: Arc, received_at: Instant) { self.filters .iter_mut() - .for_each(|mut entry| entry.value_mut().set_reorg_watermark(common_ancestor)); + .for_each(|mut entry| entry.value_mut().push_reorg(Arc::clone(&reorg), received_at)); } /// Clean stale filters that have not been polled in a while. @@ -192,8 +243,8 @@ impl FilterManager { /// Create a new filter manager. /// /// Spawns a cleanup thread for stale filters and a tokio task that - /// listens for [`ChainEvent::Reorg`] events and propagates watermarks - /// to all active filters. + /// listens for [`ChainEvent::Reorg`] events and propagates reorg + /// notifications to all active filters. pub(crate) fn new( chain_events: &broadcast::Sender, clean_interval: Duration, @@ -248,8 +299,8 @@ impl FilterCleanTask { } } -/// Task that listens for reorg events and propagates watermarks to all -/// active filters. +/// Task that listens for reorg events and propagates them to all active +/// filters. /// /// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] /// is dropped. @@ -281,8 +332,10 @@ impl FilterReorgTask { let ChainEvent::Reorg(reorg) = event else { continue }; + let received_at = Instant::now(); + let reorg = Arc::new(reorg); let Some(manager) = self.manager.upgrade() else { break }; - manager.set_reorg_watermark_all(reorg.common_ancestor); + manager.apply_reorg(reorg, received_at); } } } @@ -290,76 +343,208 @@ impl FilterReorgTask { #[cfg(test)] mod tests { use super::*; - use crate::interest::InterestKind; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, Bytes, LogData, address, b256}; fn block_filter(start: u64) -> ActiveFilter { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), + created_at: Instant::now(), kind: InterestKind::Block, - reorg_watermark: None, + pending_reorgs: Vec::new(), + } + } + + fn log_filter(start: u64, addr: Address) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + created_at: Instant::now(), + kind: InterestKind::Log(Box::new(Filter::new().address(addr))), + pending_reorgs: Vec::new(), + } + } + + fn test_log(addr: Address) -> alloy::primitives::Log { + alloy::primitives::Log { address: addr, data: LogData::new_unchecked(vec![], Bytes::new()) } + } + + fn reorg_notification(ancestor: u64, removed: Vec) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } + } + + fn removed_block(number: u64, logs: Vec) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + logs, } } #[test] - fn set_reorg_watermark_keeps_minimum() { + fn push_reorg_skips_future_filters() { let mut f = block_filter(10); - f.set_reorg_watermark(8); - assert_eq!(f.reorg_watermark, Some(8)); + // received_at is before the filter was created. + let received_at = f.created_at - Duration::from_secs(1); + let reorg = Arc::new(reorg_notification(5, vec![])); - // A higher watermark does not overwrite the lower one. - f.set_reorg_watermark(9); - assert_eq!(f.reorg_watermark, Some(8)); + f.push_reorg(reorg, received_at); - // A lower watermark replaces the current one. - f.set_reorg_watermark(5); - assert_eq!(f.reorg_watermark, Some(5)); + assert!(f.pending_reorgs.is_empty()); + assert_eq!(f.next_start_block, 10); } #[test] - fn handle_reorg_resets_start_block() { + fn push_reorg_rewinds_start_block() { let mut f = block_filter(10); - f.set_reorg_watermark(7); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(7, vec![])); + + f.push_reorg(reorg, received_at); - let result = f.handle_reorg(); - assert_eq!(result, Some(7)); assert_eq!(f.next_start_block, 8); - assert!(f.reorg_watermark.is_none()); + assert_eq!(f.pending_reorgs.len(), 1); } #[test] - fn handle_reorg_noop_when_watermark_at_or_above_start() { - let mut f = block_filter(10); - f.set_reorg_watermark(10); + fn drain_reorgs_matches_removed_logs() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + + let reorg = Arc::new(reorg_notification( + 8, + vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], + )); + + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert_eq!(removed.len(), 2); + assert!(removed.iter().all(|l| l.removed)); + assert!(removed.iter().all(|l| l.inner.address == addr)); + } - let result = f.handle_reorg(); - assert!(result.is_none()); - // next_start_block unchanged. - assert_eq!(f.next_start_block, 10); - assert!(f.reorg_watermark.is_none()); + #[test] + fn drain_reorgs_skips_undelivered_blocks() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has only delivered up to block 10 (next_start = 11). + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + + // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. + let reorg = Arc::new(reorg_notification( + 9, + vec![ + removed_block(10, vec![test_log(addr)]), + removed_block(11, vec![test_log(addr)]), + removed_block(12, vec![test_log(addr)]), + ], + )); + + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].block_number, Some(10)); + } + + #[test] + fn drain_reorgs_cascading() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has delivered up to block 100 (next_start = 101). + let mut f = log_filter(101, addr); + let received_at = Instant::now(); + + // Reorg A: rewinds to 98, removes 99-100. + let reorg_a = Arc::new(reorg_notification( + 98, + vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], + )); + f.push_reorg(reorg_a, received_at); + assert_eq!(f.next_start_block, 99); + + // Reorg B: rewinds to 95, removes 96-103. + let reorg_b = Arc::new(reorg_notification( + 95, + vec![ + removed_block(96, vec![test_log(addr)]), + removed_block(97, vec![test_log(addr)]), + removed_block(98, vec![test_log(addr)]), + removed_block(99, vec![test_log(addr)]), + removed_block(100, vec![test_log(addr)]), + removed_block(101, vec![test_log(addr)]), + removed_block(102, vec![test_log(addr)]), + removed_block(103, vec![test_log(addr)]), + ], + )); + f.push_reorg(reorg_b, received_at); + assert_eq!(f.next_start_block, 96); + + let removed = f.drain_reorgs(); + + // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. + // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. + // Blocks 99-103 from reorg B are >= 99 → skipped. + assert_eq!(removed.len(), 5); } #[test] - fn handle_reorg_clears_watermark() { + fn drain_reorgs_block_filter_empty() { let mut f = block_filter(10); - f.set_reorg_watermark(5); - f.handle_reorg(); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); - // Second call returns None — watermark already consumed. - assert!(f.handle_reorg().is_none()); + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert!(removed.is_empty()); + // But the rewind still happened. + assert_eq!(f.next_start_block, 6); } #[test] - fn set_reorg_watermark_all_propagates() { + fn drain_reorgs_clears_pending() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(8, vec![removed_block(9, vec![test_log(addr)])])); + + f.push_reorg(reorg, received_at); + let first = f.drain_reorgs(); + assert_eq!(first.len(), 1); + + let second = f.drain_reorgs(); + assert!(second.is_empty()); + } + + #[test] + fn apply_reorg_propagates_with_race_guard() { let inner = FilterManagerInner::new(); inner.install_block_filter(20); inner.install_block_filter(30); - inner.set_reorg_watermark_all(15); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(15, vec![])); + inner.apply_reorg(reorg, received_at); inner.filters.iter().for_each(|entry| { - assert_eq!(entry.value().reorg_watermark, Some(15)); + assert_eq!(entry.value().pending_reorgs.len(), 1); + assert_eq!(entry.value().next_start_block, 16); }); + + // A filter installed after the reorg should not have it. + let late_id = inner.install_block_filter(50); + // Re-apply same reorg with the old timestamp. + let reorg2 = Arc::new(reorg_notification(15, vec![])); + inner.apply_reorg(reorg2, received_at); + + let late = inner.filters.get(&late_id).unwrap(); + // The late filter was created after received_at, so it should + // have no pending reorgs from this event. + assert!(late.pending_reorgs.is_empty()); + assert_eq!(late.next_start_block, 51); } } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f0ed907..a9814b6 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -109,21 +109,22 @@ impl InterestKind { let logs: VecDeque = reorg .removed_blocks - .into_iter() + .iter() .flat_map(|block| { - let block_hash = block.header.hash_slow(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { - inner: log, - block_hash: Some(block_hash), - block_number: Some(block_number), - block_timestamp: Some(block_timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }) + let hash = block.hash; + let number = block.number; + block.logs.iter().map(move |log| (hash, number, log)) + }) + .filter(|(_, _, log)| filter.matches(log)) + .map(|(hash, number, log)| Log { + inner: log.clone(), + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, }) .collect(); @@ -134,7 +135,6 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; - use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { @@ -144,27 +144,29 @@ mod tests { } } - fn test_header(number: u64) -> alloy::consensus::Header { - alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } - } - fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, logs } + } + #[test] fn filter_reorg_for_sub_matches_logs() { let addr = address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); - let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: header.clone(), - logs: vec![test_log(addr, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(addr, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -173,9 +175,8 @@ mod tests { assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); - assert_eq!(logs[0].block_number.unwrap(), 11); - assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + assert_eq!(logs[0].block_hash, Some(block_hash)); + assert_eq!(logs[0].block_number, Some(11)); } #[test] @@ -183,14 +184,13 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let other = address!("0x0000000000000000000000000000000000000002"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: test_header(11), - logs: vec![test_log(other, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(other, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -200,9 +200,12 @@ mod tests { #[test] fn filter_reorg_for_sub_block_returns_empty() { + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![])], }; let buf = InterestKind::Block.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3ad2620..c79ecc6 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -69,12 +69,14 @@ pub enum ChainEvent { Reorg(ReorgNotification), } -/// Data from a single block removed during a chain reorganization. +/// A block that was removed during a chain reorganization. #[derive(Debug, Clone)] pub struct RemovedBlock { - /// The header of the removed block. - pub header: alloy::consensus::Header, - /// Logs emitted by the removed block. + /// The block number. + pub number: u64, + /// The block hash. + pub hash: alloy::primitives::B256, + /// Logs emitted in the removed block. pub logs: Vec, } @@ -83,6 +85,6 @@ pub struct RemovedBlock { pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Blocks removed by the reorg, each carrying its header and logs. + /// The blocks that were removed, ordered by block number. pub removed_blocks: Vec, } From cfa280cbbb50087a7a9bfd58654114ea92440829 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 13 Mar 2026 15:32:30 -0400 Subject: [PATCH 03/13] ci: trigger CI run From 993d5500517e87c83dd919afb9aca7d2f87dcf28 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 08:56:16 -0400 Subject: [PATCH 04/13] docs: update FilterManager docs for reorg task, remove unnecessary guard Update module-level and struct-level documentation to reflect the new FilterReorgTask tokio worker spawned alongside the existing OS cleanup thread. Remove unnecessary `if !removed.is_empty()` guard around the prepend logic in get_filter_changes. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 8 +++----- crates/rpc/src/interest/filters.rs | 14 ++++++++++---- crates/rpc/src/interest/mod.rs | 7 ++++++- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index d697492..060d647 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1132,11 +1132,9 @@ where // Prepend removed logs so the client sees removals before // the replacement data. - if !removed.is_empty() { - let mut combined = removed; - combined.append(&mut logs); - logs = combined; - } + let mut combined = removed; + combined.append(&mut logs); + logs = combined; entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index fd60b4c..6f81f47 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -32,7 +32,8 @@ type PendingReorg = (Arc, u64); /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last -/// polled, and the first block whose contents should be considered. +/// polled, and the first block whose contents should be considered. Tracks +/// a `created_at` timestamp used to guard against stale reorg notifications. #[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, @@ -231,9 +232,14 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. -/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. -/// See [`DashMap`] documentation for more information. +/// Calling [`Self::new`] spawns two background workers: +/// - An OS thread that periodically cleans stale filters (using a separate +/// thread to avoid [`DashMap::retain`] deadlock). +/// - A tokio task that listens for reorg broadcasts and eagerly propagates +/// them to all active filters. +/// +/// Both workers hold [`Weak`] references and self-terminate when the +/// manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc, diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index c79ecc6..8b20187 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -25,12 +25,17 @@ //! reference to the `Arc`, so they self-terminate once all //! strong references are dropped. //! -//! OS threads are used (rather than tokio tasks) because +//! OS threads are used for cleanup (rather than tokio tasks) because //! [`DashMap::retain`] can deadlock if called from an async context //! that also holds a `DashMap` read guard on the same shard. Running //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! +//! [`FilterManager`] additionally spawns a tokio task that listens +//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg +//! notifications to all active filters. This task does not call +//! `retain`, so it is safe to run in an async context. +//! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap //! [`DashMap::retain`]: dashmap::DashMap::retain From 74cad4062c2b0817e5e5cb1dd510ed90d9230680 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 09:03:05 -0400 Subject: [PATCH 05/13] fix: restore block_timestamp on removed logs for spec compliance Thread timestamp from DrainedBlock.header through RemovedBlock so that drain_reorgs and filter_reorg_for_sub populate block_timestamp on removed logs, restoring Ethereum JSON-RPC spec compliance lost during the reorg redesign. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/node/src/node.rs | 3 ++- crates/rpc/src/interest/filters.rs | 3 ++- crates/rpc/src/interest/kind.rs | 11 ++++++----- crates/rpc/src/interest/mod.rs | 2 ++ 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 22bd3b9..b2ddc9a 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -370,9 +370,10 @@ where .map(|d| { let number = d.header.number(); let hash = d.header.hash(); + let timestamp = d.header.timestamp(); let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { number, hash, logs } + RemovedBlock { number, hash, timestamp, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 6f81f47..0a036be 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -141,7 +141,7 @@ impl ActiveFilter { inner: log.clone(), block_hash: Some(block.hash), block_number: Some(block.number), - block_timestamp: None, + block_timestamp: Some(block.timestamp), transaction_hash: None, transaction_index: None, log_index: None, @@ -384,6 +384,7 @@ mod tests { RemovedBlock { number, hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, logs, } } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index a9814b6..f9f7bf7 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -113,14 +113,15 @@ impl InterestKind { .flat_map(|block| { let hash = block.hash; let number = block.number; - block.logs.iter().map(move |log| (hash, number, log)) + let timestamp = block.timestamp; + block.logs.iter().map(move |log| (hash, number, timestamp, log)) }) - .filter(|(_, _, log)| filter.matches(log)) - .map(|(hash, number, log)| Log { + .filter(|(_, _, _, log)| filter.matches(log)) + .map(|(hash, number, timestamp, log)| Log { inner: log.clone(), block_hash: Some(hash), block_number: Some(number), - block_timestamp: None, + block_timestamp: Some(timestamp), transaction_hash: None, transaction_index: None, log_index: None, @@ -153,7 +154,7 @@ mod tests { hash: B256, logs: Vec, ) -> crate::interest::RemovedBlock { - crate::interest::RemovedBlock { number, hash, logs } + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } } #[test] diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 8b20187..2403037 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -81,6 +81,8 @@ pub struct RemovedBlock { pub number: u64, /// The block hash. pub hash: alloy::primitives::B256, + /// The block timestamp. + pub timestamp: u64, /// Logs emitted in the removed block. pub logs: Vec, } From 4c9f2f1cecf9a8028e25cc0911a4148555909f8c Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 09:51:59 -0400 Subject: [PATCH 06/13] refactor: replace per-filter reorg storage with global ring buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-filter `pending_reorgs` accumulated `Arc` eagerly via O(n) iteration on every reorg. Two early-return paths in `get_filter_changes` then discarded the drained removed logs — notably `start > latest` fires every post-reorg poll before new blocks arrive. Replace with a global `RwLock>` ring buffer (cap 25) on FilterManagerInner. Filters compute removed logs lazily at poll time by scanning entries received since `last_poll_time`, walking reorgs in order to derive snapshots. Both early-return paths now return removed logs instead of `empty_output()`. - Remove `pending_reorgs`, `created_at`, `push_reorg`, `drain_reorgs` from ActiveFilter - Add `compute_removed_logs`, `last_poll_time` accessor - Add `push_reorg` (ring buffer append) and `reorgs_since` to FilterManagerInner - Simplify FilterReorgTask to just append (no per-filter iteration) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 34 ++-- crates/rpc/src/interest/filters.rs | 259 +++++++++++++---------------- 2 files changed, 141 insertions(+), 152 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 060d647..765556a 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1078,12 +1078,13 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; - // Drain any pending reorg notifications, producing removed logs - // for log filters. `next_start_block` was already rewound eagerly - // when the reorg was received. - let removed = entry.drain_reorgs(); + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed logs + // and rewind `next_start_block`. + let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); if !removed.is_empty() { - trace!(count = removed.len(), "drained removed logs from pending reorgs"); + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); } let latest = ctx.tags().latest(); @@ -1091,17 +1092,24 @@ where // Implicit reorg detection: if latest has moved backward past our // window, a reorg occurred that we missed (e.g. broadcast lagged). - // Reset to avoid skipping. Removed logs are unavailable in this - // degraded path. + // Return any removed logs we do have, then reset. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); entry.mark_polled(latest); - return Ok(entry.empty_output()); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } if start > latest { entry.mark_polled(latest); - return Ok(entry.empty_output()); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } let cold = ctx.cold(); @@ -1132,9 +1140,11 @@ where // Prepend removed logs so the client sees removals before // the replacement data. - let mut combined = removed; - combined.append(&mut logs); - logs = combined; + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 0a036be..59297b9 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -7,8 +7,9 @@ use alloy::{ }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ + collections::VecDeque, sync::{ - Arc, Weak, + Arc, RwLock, Weak, atomic::{AtomicU64, Ordering}, }, time::{Duration, Instant}, @@ -21,38 +22,34 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; -/// A pending reorg notification paired with the filter's -/// `next_start_block` at the time the reorg was received. -/// -/// The snapshot records which blocks the filter had already delivered, -/// so [`ActiveFilter::drain_reorgs`] can determine which removed logs -/// are relevant. -type PendingReorg = (Arc, u64); +/// Maximum number of reorg notifications retained in the global ring +/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter +/// TTL gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last -/// polled, and the first block whose contents should be considered. Tracks -/// a `created_at` timestamp used to guard against stale reorg notifications. +/// polled, and the first block whose contents should be considered. +/// +/// `last_poll_time` doubles as the cursor into the global reorg ring +/// buffer — at poll time the filter scans for reorgs received after +/// this instant. #[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, - created_at: Instant, kind: InterestKind, - pending_reorgs: Vec, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}, \ - pending_reorgs: {} }}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), self.kind, - self.pending_reorgs.len(), ) } } @@ -72,7 +69,6 @@ impl ActiveFilter { pub(crate) fn mark_polled(&mut self, current_block: u64) { self.next_start_block = current_block + 1; self.last_poll_time = Instant::now(); - self.pending_reorgs.clear(); } /// Get the next start block for the filter. @@ -80,6 +76,11 @@ impl ActiveFilter { self.next_start_block } + /// Get the instant at which the filter was last polled (or created). + pub(crate) const fn last_poll_time(&self) -> Instant { + self.last_poll_time + } + /// Get the duration since the filter was last polled. pub(crate) fn time_since_last_poll(&self) -> Duration { self.last_poll_time.elapsed() @@ -90,47 +91,34 @@ impl ActiveFilter { self.kind.empty_output() } - /// Record a reorg notification, eagerly rewinding `next_start_block`. - /// - /// The notification is stored (behind an [`Arc`]) alongside a snapshot - /// of the filter's `next_start_block` at the time of the reorg, so - /// that [`drain_reorgs`] can later determine which removed logs the - /// client has already seen. + /// Compute removed logs from a sequence of reorg notifications. /// - /// If `received_at` is before the filter's creation time, the reorg - /// is silently skipped — the filter was installed after the reorg - /// occurred and its `next_start_block` already reflects the post-reorg - /// chain state. + /// Walks the reorgs in order, computing snapshots lazily from the + /// filter's current [`next_start_block`]. For each reorg, only logs + /// from blocks the filter had already delivered (block number below + /// the snapshot) are included. /// - /// [`drain_reorgs`]: Self::drain_reorgs - fn push_reorg(&mut self, reorg: Arc, received_at: Instant) { - if self.created_at > received_at { - return; - } - - let snapshot = self.next_start_block; - self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); - self.pending_reorgs.push((reorg, snapshot)); - } - - /// Drain pending reorgs, returning matched removed logs with - /// `removed: true`. + /// Updates `next_start_block` to the rewound value so the subsequent + /// forward scan starts from the correct position. /// - /// For each pending reorg, only logs from blocks the filter had - /// already delivered (block number below the snapshot) are included. /// Block filters return an empty vec — the Ethereum JSON-RPC spec /// does not define `removed` semantics for block filters. - pub(crate) fn drain_reorgs(&mut self) -> Vec { - let reorgs = std::mem::take(&mut self.pending_reorgs); - + /// + /// [`next_start_block`]: Self::next_start_block + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } return Vec::new(); }; let mut removed = Vec::new(); - for (notification, snapshot_start) in reorgs { + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); for block in ¬ification.removed_blocks { - if block.number >= snapshot_start { + if block.number >= snapshot { continue; } for log in &block.logs { @@ -159,13 +147,18 @@ impl ActiveFilter { pub(crate) struct FilterManagerInner { current_id: AtomicU64, filters: DashMap, + reorgs: RwLock)>>, } impl FilterManagerInner { /// Create a new filter manager. fn new() -> Self { // Start from 1, as 0 is weird in quantity encoding. - Self { current_id: AtomicU64::new(1), filters: DashMap::new() } + Self { + current_id: AtomicU64::new(1), + filters: DashMap::new(), + reorgs: RwLock::new(VecDeque::new()), + } } /// Get the next filter ID. @@ -180,15 +173,12 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let now = Instant::now(); let _ = self.filters.insert( id, ActiveFilter { next_start_block: current_block + 1, - last_poll_time: now, - created_at: now, + last_poll_time: Instant::now(), kind, - pending_reorgs: Vec::new(), }, ); id @@ -209,15 +199,30 @@ impl FilterManagerInner { self.filters.remove(&id) } - /// Apply a reorg notification to all active filters. + /// Append a reorg notification to the global ring buffer. /// - /// Each filter records the shared `Arc` alongside - /// a snapshot of its current `next_start_block`, then eagerly rewinds. - /// Filters created after `received_at` are skipped (race guard). - pub(crate) fn apply_reorg(&self, reorg: Arc, received_at: Instant) { - self.filters - .iter_mut() - .for_each(|mut entry| entry.value_mut().push_reorg(Arc::clone(&reorg), received_at)); + /// Evicts the oldest entry when the buffer is full. The + /// [`Arc`] is shared across all poll-time readers. + pub(crate) fn push_reorg(&self, reorg: ReorgNotification) { + let entry = (Instant::now(), Arc::new(reorg)); + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back(entry); + } + + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock and returns them. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() } /// Clean stale filters that have not been polled in a while. @@ -232,11 +237,16 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// +/// Reorg notifications are stored in a global ring buffer (capped at +/// [`MAX_REORG_ENTRIES`]). Filters compute their removed logs lazily at +/// poll time by scanning the buffer for entries received since the last +/// poll. +/// /// Calling [`Self::new`] spawns two background workers: /// - An OS thread that periodically cleans stale filters (using a separate /// thread to avoid [`DashMap::retain`] deadlock). -/// - A tokio task that listens for reorg broadcasts and eagerly propagates -/// them to all active filters. +/// - A tokio task that listens for reorg broadcasts and appends them to +/// the global ring buffer. /// /// Both workers hold [`Weak`] references and self-terminate when the /// manager is dropped. @@ -249,8 +259,8 @@ impl FilterManager { /// Create a new filter manager. /// /// Spawns a cleanup thread for stale filters and a tokio task that - /// listens for [`ChainEvent::Reorg`] events and propagates reorg - /// notifications to all active filters. + /// listens for [`ChainEvent::Reorg`] events and appends them to the + /// global ring buffer. pub(crate) fn new( chain_events: &broadcast::Sender, clean_interval: Duration, @@ -305,8 +315,8 @@ impl FilterCleanTask { } } -/// Task that listens for reorg events and propagates them to all active -/// filters. +/// Task that listens for reorg events and appends them to the global +/// ring buffer. /// /// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] /// is dropped. @@ -338,10 +348,8 @@ impl FilterReorgTask { let ChainEvent::Reorg(reorg) = event else { continue }; - let received_at = Instant::now(); - let reorg = Arc::new(reorg); let Some(manager) = self.manager.upgrade() else { break }; - manager.apply_reorg(reorg, received_at); + manager.push_reorg(reorg); } } } @@ -356,9 +364,7 @@ mod tests { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), - created_at: Instant::now(), kind: InterestKind::Block, - pending_reorgs: Vec::new(), } } @@ -366,9 +372,7 @@ mod tests { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), - created_at: Instant::now(), kind: InterestKind::Log(Box::new(Filter::new().address(addr))), - pending_reorgs: Vec::new(), } } @@ -390,43 +394,26 @@ mod tests { } #[test] - fn push_reorg_skips_future_filters() { + fn compute_removed_logs_rewinds_start_block() { let mut f = block_filter(10); - // received_at is before the filter was created. - let received_at = f.created_at - Duration::from_secs(1); - let reorg = Arc::new(reorg_notification(5, vec![])); - - f.push_reorg(reorg, received_at); - - assert!(f.pending_reorgs.is_empty()); - assert_eq!(f.next_start_block, 10); - } - - #[test] - fn push_reorg_rewinds_start_block() { - let mut f = block_filter(10); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification(7, vec![])); - f.push_reorg(reorg, received_at); + f.compute_removed_logs(&[reorg]); assert_eq!(f.next_start_block, 8); - assert_eq!(f.pending_reorgs.len(), 1); } #[test] - fn drain_reorgs_matches_removed_logs() { + fn compute_removed_logs_matches_removed() { let addr = address!("0x0000000000000000000000000000000000000001"); let mut f = log_filter(11, addr); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification( 8, vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], )); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert_eq!(removed.len(), 2); assert!(removed.iter().all(|l| l.removed)); @@ -434,11 +421,10 @@ mod tests { } #[test] - fn drain_reorgs_skips_undelivered_blocks() { + fn compute_removed_logs_skips_undelivered_blocks() { let addr = address!("0x0000000000000000000000000000000000000001"); // Filter has only delivered up to block 10 (next_start = 11). let mut f = log_filter(11, addr); - let received_at = Instant::now(); // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. let reorg = Arc::new(reorg_notification( @@ -450,27 +436,23 @@ mod tests { ], )); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert_eq!(removed.len(), 1); assert_eq!(removed[0].block_number, Some(10)); } #[test] - fn drain_reorgs_cascading() { + fn compute_removed_logs_cascading() { let addr = address!("0x0000000000000000000000000000000000000001"); // Filter has delivered up to block 100 (next_start = 101). let mut f = log_filter(101, addr); - let received_at = Instant::now(); // Reorg A: rewinds to 98, removes 99-100. let reorg_a = Arc::new(reorg_notification( 98, vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], )); - f.push_reorg(reorg_a, received_at); - assert_eq!(f.next_start_block, 99); // Reorg B: rewinds to 95, removes 96-103. let reorg_b = Arc::new(reorg_notification( @@ -486,25 +468,22 @@ mod tests { removed_block(103, vec![test_log(addr)]), ], )); - f.push_reorg(reorg_b, received_at); - assert_eq!(f.next_start_block, 96); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg_a, reorg_b]); // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. // Blocks 99-103 from reorg B are >= 99 → skipped. assert_eq!(removed.len(), 5); + assert_eq!(f.next_start_block, 96); } #[test] - fn drain_reorgs_block_filter_empty() { + fn compute_removed_logs_block_filter_empty() { let mut f = block_filter(10); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert!(removed.is_empty()); // But the rewind still happened. @@ -512,46 +491,46 @@ mod tests { } #[test] - fn drain_reorgs_clears_pending() { - let addr = address!("0x0000000000000000000000000000000000000001"); - let mut f = log_filter(11, addr); - let received_at = Instant::now(); - let reorg = Arc::new(reorg_notification(8, vec![removed_block(9, vec![test_log(addr)])])); + fn push_reorg_evicts_oldest() { + let inner = FilterManagerInner::new(); + for i in 0..MAX_REORG_ENTRIES + 5 { + inner.push_reorg(reorg_notification(i as u64, vec![])); + } + let buf = inner.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + // Oldest surviving entry should be the 6th push (index 5). + assert_eq!(buf[0].1.common_ancestor, 5); + } - f.push_reorg(reorg, received_at); - let first = f.drain_reorgs(); - assert_eq!(first.len(), 1); + #[test] + fn reorgs_since_filters_by_time() { + let inner = FilterManagerInner::new(); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(10, vec![])); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(8, vec![])); + + let all = inner.reorgs_since(before); + assert_eq!(all.len(), 2); - let second = f.drain_reorgs(); - assert!(second.is_empty()); + let recent = inner.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); } #[test] - fn apply_reorg_propagates_with_race_guard() { + fn reorgs_since_skips_pre_creation_reorgs() { let inner = FilterManagerInner::new(); - inner.install_block_filter(20); - inner.install_block_filter(30); + inner.push_reorg(reorg_notification(5, vec![])); + std::thread::sleep(Duration::from_millis(5)); - let received_at = Instant::now(); - let reorg = Arc::new(reorg_notification(15, vec![])); - inner.apply_reorg(reorg, received_at); - - inner.filters.iter().for_each(|entry| { - assert_eq!(entry.value().pending_reorgs.len(), 1); - assert_eq!(entry.value().next_start_block, 16); - }); + let id = inner.install_block_filter(20); + let filter = inner.filters.get(&id).unwrap(); - // A filter installed after the reorg should not have it. - let late_id = inner.install_block_filter(50); - // Re-apply same reorg with the old timestamp. - let reorg2 = Arc::new(reorg_notification(15, vec![])); - inner.apply_reorg(reorg2, received_at); - - let late = inner.filters.get(&late_id).unwrap(); - // The late filter was created after received_at, so it should - // have no pending reorgs from this event. - assert!(late.pending_reorgs.is_empty()); - assert_eq!(late.next_start_block, 51); + let reorgs = inner.reorgs_since(filter.last_poll_time); + assert!(reorgs.is_empty()); } } From 10bb2fba2aee00aee7f6d62d23147f64a3d65f3e Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 10:22:28 -0400 Subject: [PATCH 07/13] fix: preserve rewound next_start_block on early-return paths Replace mark_polled(latest) with touch_poll_time() on the implicit-reorg and no-new-blocks early-return paths. mark_polled unconditionally overwrites next_start_block, discarding the rewind applied by compute_removed_logs. The new touch_poll_time method updates only last_poll_time so the next forward scan starts from the correct position. Also restore the block_timestamp assertion in filter_reorg_for_sub test that was dropped during the ring buffer refactor. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 4 ++-- crates/rpc/src/interest/filters.rs | 9 +++++++++ crates/rpc/src/interest/kind.rs | 1 + 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 765556a..6e818e2 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1095,7 +1095,7 @@ where // Return any removed logs we do have, then reset. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); - entry.mark_polled(latest); + entry.touch_poll_time(); return Ok(if removed.is_empty() { entry.empty_output() } else { @@ -1104,7 +1104,7 @@ where } if start > latest { - entry.mark_polled(latest); + entry.touch_poll_time(); return Ok(if removed.is_empty() { entry.empty_output() } else { diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 59297b9..24fecea 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -71,6 +71,15 @@ impl ActiveFilter { self.last_poll_time = Instant::now(); } + /// Update the poll timestamp without advancing `next_start_block`. + /// + /// Used on early-return paths (implicit reorg, no new blocks) where + /// `compute_removed_logs` has already rewound `next_start_block` and + /// we must preserve that position for the next forward scan. + pub(crate) fn touch_poll_time(&mut self) { + self.last_poll_time = Instant::now(); + } + /// Get the next start block for the filter. pub(crate) const fn next_start_block(&self) -> u64 { self.next_start_block diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f9f7bf7..72cd78f 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -178,6 +178,7 @@ mod tests { assert_eq!(logs[0].inner.address, addr); assert_eq!(logs[0].block_hash, Some(block_hash)); assert_eq!(logs[0].block_number, Some(11)); + assert_eq!(logs[0].block_timestamp, Some(1_000_011)); } #[test] From 9c7f6c05622ebc80c541fb14b76e4e58d59578b9 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 09:02:55 -0400 Subject: [PATCH 08/13] docs: add design spec for reorg ring buffer and log type changes Addresses Fraser's PR #98 review feedback with two changes: 1. Move reorg ring buffer from FilterManagerInner to ChainNotifier 2. Preserve alloy::rpc::types::Log in RemovedBlock for spec compliance Co-Authored-By: Claude Opus 4.6 (1M context) --- ...7-reorg-ring-buffer-and-log-type-design.md | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md diff --git a/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md b/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md new file mode 100644 index 0000000..b297c5b --- /dev/null +++ b/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md @@ -0,0 +1,149 @@ +# Reorg Ring Buffer and Removed Log Type + +**Date:** 2026-03-17 +**PR:** #98 (james/eng-1971) +**Reviewer feedback:** Fraser + +## Context + +PR #98 adds reorg handling to `eth_getFilterChanges`. Fraser's review +identified two improvements: + +1. The reorg ring buffer lives in `FilterManagerInner` and is fed by + `FilterReorgTask` via a `broadcast::Receiver`. If the receiver lags, + reorg notifications are silently dropped — a correctness bug. + +2. `RemovedBlock.logs` uses `alloy::primitives::Log`, discarding + `transaction_hash` and `log_index` that the Ethereum spec requires on + removed logs returned to clients. + +## Change 1: Move ring buffer into ChainNotifier + +### Current flow + +``` +ChainNotifier::send_reorg() + -> broadcast::Sender + -> FilterReorgTask (broadcast::Receiver) + -> FilterManagerInner::push_reorg() (ring buffer) +``` + +`FilterReorgTask` listens on a `broadcast::Receiver`, filters +for `ChainEvent::Reorg`, and appends to a `VecDeque` on +`FilterManagerInner`. If the receiver lags, reorgs are lost. + +### New flow + +``` +ChainNotifier::send_reorg() + -> write to ring buffer (authoritative) + -> broadcast to subscribers (push-based only) +``` + +Polling filters read the ring buffer directly from `ChainNotifier` at +poll time. The broadcast channel remains for push subscriptions only. + +### ChainNotifier changes + +Add fields: + +- `reorgs: RwLock)>>` + +Add constant: + +- `MAX_REORG_ENTRIES: usize = 25` + +Modify `send_reorg`: + +- Write `Arc` to the ring buffer (evicting oldest if + full) **before** broadcasting on the channel. + +Add method: + +- `reorgs_since(since: Instant) -> Vec>` — scans + the buffer under a brief read lock. Moved from `FilterManagerInner`. + +### FilterManager changes + +- `FilterManager::new` takes `ChainNotifier` instead of + `&broadcast::Sender`. +- Store `ChainNotifier` (cheap clone) alongside the `Arc`. +- Remove `reorgs` field, `push_reorg`, and `reorgs_since` from + `FilterManagerInner`. +- Delete `FilterReorgTask` entirely. +- Poll-time reorg reads call `chain_notifier.reorgs_since(...)` instead + of `self.inner.reorgs_since(...)`. + +### Call site (ctx.rs) + +`FilterManager::new` call changes from `&chain.notif_sender()` to +`chain.clone()`. + +### SubscriptionManager + +No changes. Continues using the broadcast channel for push delivery. + +## Change 2: Preserve rpc::types::Log in RemovedBlock + +### RemovedBlock type change + +```rust +// Before +pub logs: Vec + +// After +pub logs: Vec +``` + +### Producer (node.rs::notify_reorg) + +Stop stripping the rpc wrapper. Currently: + +```rust +d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect() +``` + +Becomes: + +```rust +d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect() +``` + +The `ColdReceipt` already stores `ConsensusReceipt` with +`transaction_hash` and `log_index` populated. + +### Consumer: compute_removed_logs (filters.rs) + +Currently reconstructs `alloy::rpc::types::Log` with `transaction_hash: +None`. Instead, clone the log from `RemovedBlock`, set `removed: true`, +and preserve the existing fields. Filter matching calls +`filter.matches(&log.inner)` since `Filter::matches` takes +`&alloy::primitives::Log`. + +### Consumer: filter_reorg_for_sub (kind.rs) + +Same change — use the log directly instead of reconstructing with `None` +fields. Set `removed: true` on the cloned log. + +### Tests + +Update test helpers (`removed_block` in filters.rs, `test_removed_block` +in kind.rs) to construct `alloy::rpc::types::Log` instead of +`alloy::primitives::Log`. + +## Files changed + +| File | Change | +|------|--------| +| `crates/rpc/src/config/chain_notifier.rs` | Add ring buffer, `reorgs_since`, modify `send_reorg` | +| `crates/rpc/src/interest/filters.rs` | Remove `FilterReorgTask`, reorg fields; store `ChainNotifier`; update `compute_removed_logs` | +| `crates/rpc/src/interest/mod.rs` | Change `RemovedBlock.logs` type | +| `crates/rpc/src/interest/kind.rs` | Update `filter_reorg_for_sub` to use rpc log directly | +| `crates/rpc/src/config/ctx.rs` | Pass `ChainNotifier` to `FilterManager::new` | +| `crates/node/src/node.rs` | Stop stripping log wrapper in `notify_reorg` | + +## Ordering + +Change 2 (log type) is independent of Change 1 (ring buffer move) and +can be implemented first. Change 1 depends on Change 2 only in that +test helpers need to agree on the log type. From 4df3f7821472214b4d283c9fd7d99729036e3dea Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 09:08:47 -0400 Subject: [PATCH 09/13] docs: address spec review feedback - Add Arc wrapper on reorgs field (ChainNotifier is Clone) - Fix ColdReceipt type name (Receipt, not ConsensusReceipt) - Note module-level and struct-level doc updates needed - Clarify send_reorg return semantics with ring buffer Co-Authored-By: Claude Opus 4.6 (1M context) --- ...-17-reorg-ring-buffer-and-log-type-design.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md b/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md index b297c5b..ccb7e45 100644 --- a/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md +++ b/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md @@ -47,7 +47,10 @@ poll time. The broadcast channel remains for push subscriptions only. Add fields: -- `reorgs: RwLock)>>` +- `reorgs: Arc)>>>` + +The `Arc` wrapper is required because `ChainNotifier` derives `Clone` +and all existing fields are `Arc`-backed. Add constant: @@ -56,7 +59,10 @@ Add constant: Modify `send_reorg`: - Write `Arc` to the ring buffer (evicting oldest if - full) **before** broadcasting on the channel. + full) **before** broadcasting on the channel. The ring buffer write is + the authoritative store; the broadcast `Err` only means "no push + subscribers" and does not affect the buffer. Return type stays the same + for backward compatibility. Add method: @@ -109,7 +115,8 @@ Becomes: d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect() ``` -The `ColdReceipt` already stores `ConsensusReceipt` with +The `ColdReceipt` already stores `Receipt` (where `RpcLog` is +re-exported from alloy as `alloy::rpc::types::Log`) with `transaction_hash` and `log_index` populated. ### Consumer: compute_removed_logs (filters.rs) @@ -136,8 +143,8 @@ in kind.rs) to construct `alloy::rpc::types::Log` instead of | File | Change | |------|--------| | `crates/rpc/src/config/chain_notifier.rs` | Add ring buffer, `reorgs_since`, modify `send_reorg` | -| `crates/rpc/src/interest/filters.rs` | Remove `FilterReorgTask`, reorg fields; store `ChainNotifier`; update `compute_removed_logs` | -| `crates/rpc/src/interest/mod.rs` | Change `RemovedBlock.logs` type | +| `crates/rpc/src/interest/filters.rs` | Remove `FilterReorgTask`, reorg fields; store `ChainNotifier`; update `compute_removed_logs`; update `FilterManager` doc comment (now one background worker, not two) | +| `crates/rpc/src/interest/mod.rs` | Change `RemovedBlock.logs` type; update module-level docs (remove `FilterReorgTask` description) | | `crates/rpc/src/interest/kind.rs` | Update `filter_reorg_for_sub` to use rpc log directly | | `crates/rpc/src/config/ctx.rs` | Pass `ChainNotifier` to `FilterManager::new` | | `crates/node/src/node.rs` | Stop stripping log wrapper in `notify_reorg` | From e7c646038b54c05fc8895224d0b52086c4b6c1a0 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 09:23:49 -0400 Subject: [PATCH 10/13] docs: add implementation plan for reorg ring buffer and log type Addresses spec review feedback: - Preserve Deref impl on FilterManager - Populate block metadata in test log helpers - Add missing reorgs_since_skips_pre_creation test - Simplify send_reorg (clone instead of Arc::try_unwrap) - Clean up confusing false-start text Co-Authored-By: Claude Opus 4.6 (1M context) --- ...26-03-17-reorg-ring-buffer-and-log-type.md | 734 ++++++++++++++++++ 1 file changed, 734 insertions(+) create mode 100644 docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md diff --git a/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md b/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md new file mode 100644 index 0000000..de4c8b1 --- /dev/null +++ b/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md @@ -0,0 +1,734 @@ +# Reorg Ring Buffer & Log Type Implementation Plan + +> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Address Fraser's PR #98 review: move the reorg ring buffer from `FilterManagerInner` to `ChainNotifier`, and change `RemovedBlock.logs` to `Vec` for Ethereum spec compliance. + +**Architecture:** Two independent changes. Change 2 (log type) is implemented first because it's simpler and Change 1's tests must agree on the log type. Change 1 moves the ring buffer upstream into `ChainNotifier`, deletes `FilterReorgTask`, and has `FilterManager` delegate `reorgs_since` to `ChainNotifier`. + +**Tech Stack:** Rust, alloy, tokio broadcast, signet-rpc, signet-node + +**Spec:** `docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md` + +--- + +## Chunk 1: Preserve rpc::types::Log in RemovedBlock + +### Task 1: Change RemovedBlock.logs type and update producer + +**Files:** +- Modify: `crates/rpc/src/interest/mod.rs:87` +- Modify: `crates/node/src/node.rs:367-381` + +- [ ] **Step 1: Change `RemovedBlock.logs` type** + +In `crates/rpc/src/interest/mod.rs`, change: + +```rust + /// Logs emitted in the removed block. + pub logs: Vec, +``` + +To: + +```rust + /// Logs emitted in the removed block. + /// + /// Uses the RPC log type to preserve `transaction_hash` and + /// `log_index` from the original receipts, as required by the + /// Ethereum JSON-RPC spec for removed logs. + pub logs: Vec, +``` + +- [ ] **Step 2: Update the producer in node.rs** + +In `crates/node/src/node.rs`, the `notify_reorg` method currently strips the RPC wrapper. Change: + +```rust + let logs = + d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); +``` + +To: + +```rust + let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect(); +``` + +- [ ] **Step 3: Verify compilation** + +Run: `cargo clippy -p signet-node --all-features --all-targets 2>&1 | head -30` + +Expected: Compilation errors in `filters.rs` and `kind.rs` because they +still construct `Log` from `primitives::Log` — these are fixed in Tasks 2 +and 3. + +### Task 2: Update compute_removed_logs consumer (filters.rs) + +**Files:** +- Modify: `crates/rpc/src/interest/filters.rs:117-151` +- Modify: `crates/rpc/src/interest/filters.rs:388-403` (test helpers) + +- [ ] **Step 1: Update `compute_removed_logs`** + +In `crates/rpc/src/interest/filters.rs`, replace the body of +`compute_removed_logs` (lines 117-151). The key changes: +- `filter.matches(log)` becomes `filter.matches(&log.inner)` (rpc Log wraps primitives Log) +- Instead of constructing a new `Log`, clone the existing one and set `removed: true` + +```rust + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { + let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } + return Vec::new(); + }; + + let mut removed = Vec::new(); + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); + for block in ¬ification.removed_blocks { + if block.number >= snapshot { + continue; + } + for log in &block.logs { + if !filter.matches(&log.inner) { + continue; + } + let mut log = log.clone(); + log.removed = true; + removed.push(log); + } + } + } + removed + } +``` + +- [ ] **Step 2: Update test helpers** + +In the `mod tests` block of `filters.rs`, update `test_log` and +`removed_block` to use `alloy::rpc::types::Log`: + +Replace the `test_log` helper: + +```rust + fn test_log(addr: Address, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } + } +``` + +Replace the `removed_block` helper: + +```rust + fn removed_block(number: u64, logs: Vec) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, + logs, + } + } +``` + +Add `B256` to the test imports (already available via +`alloy::primitives`). The `Log` type comes from `super::*`. + +Update all test call sites that use `test_log` to pass block number and +hash. For example, `test_log(addr)` becomes +`test_log(addr, 9, b256!("0x...0001"))` where the number and hash match +the `removed_block` they appear in. Each `removed_block` call already +passes a block number — the logs inside should use the same number and +the same hash constant. + +- [ ] **Step 3: Run tests** + +Run: `cargo t -p signet-rpc -- filters` + +Expected: All `compute_removed_logs_*` and `push_reorg_*` and +`reorgs_since_*` tests pass. + +- [ ] **Step 4: Lint** + +Run: `cargo clippy -p signet-rpc --all-features --all-targets` + +Expected: Clean. + +### Task 3: Update filter_reorg_for_sub consumer (kind.rs) + +**Files:** +- Modify: `crates/rpc/src/interest/kind.rs:105-133` +- Modify: `crates/rpc/src/interest/kind.rs:141-158` (test helpers) + +- [ ] **Step 1: Update `filter_reorg_for_sub`** + +In `crates/rpc/src/interest/kind.rs`, replace the body of +`filter_reorg_for_sub`: + +```rust + pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer { + let Some(filter) = self.as_filter() else { + return self.empty_sub_buffer(); + }; + + let logs: VecDeque = reorg + .removed_blocks + .iter() + .flat_map(|block| block.logs.iter()) + .filter(|log| filter.matches(&log.inner)) + .map(|log| { + let mut log = log.clone(); + log.removed = true; + log + }) + .collect(); + + SubscriptionBuffer::Log(logs) + } +``` + +- [ ] **Step 2: Update test helpers** + +Replace the `test_log` helper: + +```rust + fn test_log(addr: Address, topic: B256, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } + } +``` + +Replace the `test_removed_block` helper: + +```rust + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } + } +``` + +Update all test call sites: `test_log(addr, topic)` becomes +`test_log(addr, topic, number, hash)` where number and hash match the +`test_removed_block` the log appears in. The `Log` type comes from +`use super::*`. Existing imports stay the same. + +- [ ] **Step 3: Run tests** + +Run: `cargo t -p signet-rpc -- kind` + +Expected: All `filter_reorg_for_sub_*` tests pass. + +- [ ] **Step 4: Lint both crates** + +Run: `cargo clippy -p signet-rpc --all-features --all-targets && cargo clippy -p signet-node --all-features --all-targets` + +Expected: Clean. + +- [ ] **Step 5: Format** + +Run: `cargo +nightly fmt` + +- [ ] **Step 6: Commit** + +```bash +git add crates/rpc/src/interest/mod.rs crates/rpc/src/interest/filters.rs crates/rpc/src/interest/kind.rs crates/node/src/node.rs +git commit -m "fix: preserve tx_hash and log_index on removed logs + +Change RemovedBlock.logs from Vec to +Vec. The producer (notify_reorg) now passes +through the full RPC log from ColdReceipt instead of stripping the +wrapper. Consumers (compute_removed_logs, filter_reorg_for_sub) clone +and set removed=true instead of reconstructing with None fields. + +This satisfies the Ethereum JSON-RPC spec requirement that removed +logs include transaction_hash and log_index." +``` + +--- + +## Chunk 2: Move ring buffer into ChainNotifier + +### Task 4: Add ring buffer and reorgs_since to ChainNotifier + +**Files:** +- Modify: `crates/rpc/src/config/chain_notifier.rs` + +- [ ] **Step 1: Add imports and constant** + +In `crates/rpc/src/config/chain_notifier.rs`, add imports and constant. +Replace the existing imports: + +```rust +use crate::{ + config::resolve::BlockTags, + interest::{ChainEvent, NewBlockNotification, ReorgNotification}, +}; +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, + time::Instant, +}; +use tokio::sync::broadcast; +``` + +Add constant after imports: + +```rust +/// Maximum number of reorg notifications retained in the ring buffer. +/// At most one reorg per 12 seconds and a 5-minute stale filter TTL +/// gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; +``` + +- [ ] **Step 2: Add `reorgs` field to `ChainNotifier`** + +```rust +pub struct ChainNotifier { + tags: BlockTags, + notif_tx: broadcast::Sender, + reorgs: Arc)>>>, +} +``` + +- [ ] **Step 3: Update `new` to initialize the field** + +```rust + pub fn new(channel_capacity: usize) -> Self { + let tags = BlockTags::new(0, 0, 0); + let (notif_tx, _) = broadcast::channel(channel_capacity); + Self { tags, notif_tx, reorgs: Arc::new(RwLock::new(VecDeque::new())) } + } +``` + +- [ ] **Step 4: Update `send_reorg` to write ring buffer first** + +Replace `send_reorg`: + +```rust + /// Send a reorg notification. + /// + /// Writes the notification to the authoritative ring buffer before + /// broadcasting to push subscribers. The broadcast `Err` only means + /// "no active push subscribers" — the ring buffer is unaffected. + #[allow(clippy::result_large_err)] + pub fn send_reorg( + &self, + notif: ReorgNotification, + ) -> Result> { + { + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back((Instant::now(), Arc::new(notif.clone()))); + } + self.send_event(ChainEvent::Reorg(notif)) + } +``` + +- [ ] **Step 5: Add `reorgs_since` method** + +Add after the `notif_sender` method: + +```rust + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock. Used by polling + /// filters at poll time to compute removed logs. + pub fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() + } +``` + +- [ ] **Step 6: Update the doctest** + +The construction doctest needs no changes — `ChainNotifier::new(128)` +still works. But verify: + +Run: `cargo t -p signet-rpc --doc` + +Expected: Doctest passes. + +- [ ] **Step 7: Lint** + +Run: `cargo clippy -p signet-rpc --all-features --all-targets` + +Expected: Errors in `filters.rs` (still references old ring buffer code). +That's fine — fixed in Task 5. + +### Task 5: Remove ring buffer and FilterReorgTask from FilterManager + +**Files:** +- Modify: `crates/rpc/src/interest/filters.rs` + +- [ ] **Step 1: Add ChainNotifier import, remove broadcast import** + +Replace the imports at the top of `filters.rs`: + +```rust +use crate::{ + config::ChainNotifier, + interest::{InterestKind, ReorgNotification, buffer::EventBuffer}, +}; +use alloy::{ + primitives::{B256, U64}, + rpc::types::{Filter, Log}, +}; +use dashmap::{DashMap, mapref::one::RefMut}; +use std::{ + sync::{ + Arc, Weak, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, Instant}, +}; +use tracing::trace; +``` + +Removed vs. current: `tokio::sync::broadcast` gone, `RwLock` gone, +`VecDeque` gone, `ChainEvent` gone from crate import. Added +`ChainNotifier`. Kept `ReorgNotification` (used by `compute_removed_logs`). + +- [ ] **Step 2: Remove `MAX_REORG_ENTRIES` constant** + +Delete: + +```rust +/// Maximum number of reorg notifications retained in the global ring +/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter +/// TTL gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; +``` + +- [ ] **Step 3: Remove reorg fields from `FilterManagerInner`** + +Update the struct: + +```rust +/// Inner logic for [`FilterManager`]. +#[derive(Debug)] +pub(crate) struct FilterManagerInner { + current_id: AtomicU64, + filters: DashMap, +} +``` + +Update `new`: + +```rust + fn new() -> Self { + // Start from 1, as 0 is weird in quantity encoding. + Self { + current_id: AtomicU64::new(1), + filters: DashMap::new(), + } + } +``` + +- [ ] **Step 4: Remove `push_reorg` and `reorgs_since` from `FilterManagerInner`** + +Delete these two methods entirely (lines 211-235 in the current file). + +- [ ] **Step 5: Update `FilterManager` struct and `new`** + +Replace the `FilterManager` struct, its doc comment, and `new`: + +```rust +/// Manager for filters. +/// +/// The manager tracks active filters, and periodically cleans stale filters. +/// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. +/// Filter IDs are assigned sequentially, starting from 1. +/// +/// Reorg notifications are read from the [`ChainNotifier`]'s ring buffer +/// at poll time. Filters compute their removed logs lazily by scanning +/// for entries received since the last poll. +/// +/// Calling [`Self::new`] spawns an OS thread that periodically cleans +/// stale filters (using a separate thread to avoid [`DashMap::retain`] +/// deadlock). The worker holds a [`Weak`] reference and self-terminates +/// when the manager is dropped. +#[derive(Debug, Clone)] +pub(crate) struct FilterManager { + inner: Arc, + chain: ChainNotifier, +} + +impl FilterManager { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters. Reorg notifications + /// are read directly from the [`ChainNotifier`]'s ring buffer at + /// poll time — no background listener is needed. + pub(crate) fn new( + chain: ChainNotifier, + clean_interval: Duration, + age_limit: Duration, + ) -> Self { + let inner = Arc::new(FilterManagerInner::new()); + let manager = Self { inner, chain }; + FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); + manager + } + + /// Return all reorg notifications received after `since`. + /// + /// Delegates to [`ChainNotifier::reorgs_since`]. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { + self.chain.reorgs_since(since) + } +} + +impl std::ops::Deref for FilterManager { + type Target = FilterManagerInner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} +``` + +The `Deref` impl is preserved from the existing code — call sites like +`fm.get_mut(id)`, `fm.install_log_filter(...)`, etc. in `endpoints.rs` +depend on it to reach `FilterManagerInner` methods. + +- [ ] **Step 6: Delete `FilterReorgTask`** + +Delete the entire `FilterReorgTask` struct and impl block (lines 327-364 +in the current file). + +- [ ] **Step 7: Update test module** + +The tests `push_reorg_evicts_oldest`, `reorgs_since_filters_by_time`, +and `reorgs_since_skips_pre_creation_reorgs` test ring buffer behavior +that now lives on `ChainNotifier`. Move them to a new `#[cfg(test)] mod +tests` block in `chain_notifier.rs` (Task 6). For now, delete them from +`filters.rs`. + +Remove these three tests and update the test imports. The remaining tests +only need: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, Bytes, LogData, address, b256}; +``` + +(Same as before — the `RemovedBlock` import stays; the `Log` type comes +from `super::*`.) + +Also update `removed_block` test helper to match the new log type from +Task 2 (should already be done if Task 2 was completed first). + +- [ ] **Step 8: Lint** + +Run: `cargo clippy -p signet-rpc --all-features --all-targets` + +Expected: Errors in `ctx.rs` (wrong `FilterManager::new` signature). +Fixed in Task 7. + +### Task 6: Add ring buffer tests to ChainNotifier + +**Files:** +- Modify: `crates/rpc/src/config/chain_notifier.rs` + +- [ ] **Step 1: Add test module** + +Add at the bottom of `chain_notifier.rs`: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::ReorgNotification; + + fn reorg_notification(ancestor: u64) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] } + } + + #[test] + fn push_reorg_evicts_oldest() { + let notifier = ChainNotifier::new(16); + for i in 0..MAX_REORG_ENTRIES + 5 { + notifier.send_reorg(reorg_notification(i as u64)).ok(); + } + let buf = notifier.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + assert_eq!(buf[0].1.common_ancestor, 5); + } + + #[test] + fn reorgs_since_filters_by_time() { + let notifier = ChainNotifier::new(16); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(10)).ok(); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(8)).ok(); + + let all = notifier.reorgs_since(before); + assert_eq!(all.len(), 2); + + let recent = notifier.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); + } + + #[test] + fn reorgs_since_skips_pre_creation_reorgs() { + let notifier = ChainNotifier::new(16); + notifier.send_reorg(reorg_notification(5)).ok(); + std::thread::sleep(Duration::from_millis(5)); + + let after = Instant::now(); + let reorgs = notifier.reorgs_since(after); + assert!(reorgs.is_empty()); + } +} +``` + +Add `Duration` to the existing `std` import in the file's main imports +(it's already brought in by the test module via `super::*` if we add it +to the main imports). Alternatively, add `use std::time::Duration;` +inside the test module. + +- [ ] **Step 2: Run tests** + +Run: `cargo t -p signet-rpc -- chain_notifier` + +Expected: All three new tests pass. + +### Task 7: Update call site in ctx.rs + +**Files:** +- Modify: `crates/rpc/src/config/ctx.rs:104-108` + +- [ ] **Step 1: Update `FilterManager::new` call** + +Replace: + +```rust + let filter_manager = FilterManager::new( + &chain.notif_sender(), + config.stale_filter_ttl, + config.stale_filter_ttl, + ); +``` + +With: + +```rust + let filter_manager = FilterManager::new( + chain.clone(), + config.stale_filter_ttl, + config.stale_filter_ttl, + ); +``` + +- [ ] **Step 2: Lint** + +Run: `cargo clippy -p signet-rpc --all-features --all-targets` + +Expected: Clean. + +### Task 8: Update module-level docs + +**Files:** +- Modify: `crates/rpc/src/interest/mod.rs:34-37` + +- [ ] **Step 1: Remove FilterReorgTask mention from module docs** + +Replace lines 34-37: + +```rust +//! [`FilterManager`] additionally spawns a tokio task that listens +//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg +//! notifications to all active filters. This task does not call +//! `retain`, so it is safe to run in an async context. +``` + +With: + +```rust +//! [`FilterManager`] reads reorg notifications directly from the +//! [`ChainNotifier`]'s ring buffer at poll time — no background +//! listener task is needed. +//! +//! [`ChainNotifier`]: crate::ChainNotifier +``` + +- [ ] **Step 2: Full lint, test, format** + +Run: + +```bash +cargo clippy -p signet-rpc --all-features --all-targets +cargo clippy -p signet-node --all-features --all-targets +cargo t -p signet-rpc +cargo +nightly fmt +``` + +Expected: All clean, all tests pass. + +- [ ] **Step 3: Commit** + +```bash +git add crates/rpc/src/config/chain_notifier.rs crates/rpc/src/interest/filters.rs crates/rpc/src/interest/mod.rs crates/rpc/src/config/ctx.rs +git commit -m "refactor: move reorg ring buffer from FilterManager to ChainNotifier + +ChainNotifier now writes to an authoritative ring buffer before +broadcasting reorg events. Polling filters read it directly via +reorgs_since(), eliminating the broadcast::Receiver lag bug where +FilterReorgTask could silently drop reorg notifications. + +Deletes FilterReorgTask entirely. FilterManager::new now takes a +ChainNotifier instead of a broadcast::Sender." +``` + +- [ ] **Step 4: Final verification** + +Run full test suite: + +```bash +cargo t -p signet-rpc +cargo t -p signet-node +``` + +Expected: All tests pass. From 65679ab53154febf44e165a8848847e5374bdbd2 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 09:36:36 -0400 Subject: [PATCH 11/13] fix: preserve tx_hash and log_index on removed logs Change RemovedBlock.logs from Vec to Vec. The producer (notify_reorg) now passes through the full RPC log from ColdReceipt instead of stripping the wrapper. Consumers (compute_removed_logs, filter_reorg_for_sub) clone and set removed=true instead of reconstructing with None fields. This satisfies the Ethereum JSON-RPC spec requirement that removed logs include transaction_hash and log_index. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/node/src/node.rs | 3 +- crates/rpc/src/interest/filters.rs | 73 ++++++++++++++++++------------ crates/rpc/src/interest/kind.rs | 53 ++++++++++++---------- crates/rpc/src/interest/mod.rs | 8 +++- 4 files changed, 83 insertions(+), 54 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index b2ddc9a..1be52e0 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -371,8 +371,7 @@ where let number = d.header.number(); let hash = d.header.hash(); let timestamp = d.header.timestamp(); - let logs = - d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); + let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect(); RemovedBlock { number, hash, timestamp, logs } }) .collect(); diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 24fecea..e57b2a4 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -131,19 +131,12 @@ impl ActiveFilter { continue; } for log in &block.logs { - if !filter.matches(log) { + if !filter.matches(&log.inner) { continue; } - removed.push(Log { - inner: log.clone(), - block_hash: Some(block.hash), - block_number: Some(block.number), - block_timestamp: Some(block.timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }); + let mut log = log.clone(); + log.removed = true; + removed.push(log); } } } @@ -367,7 +360,7 @@ impl FilterReorgTask { mod tests { use super::*; use crate::interest::{InterestKind, RemovedBlock}; - use alloy::primitives::{Address, Bytes, LogData, address, b256}; + use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; fn block_filter(start: u64) -> ActiveFilter { ActiveFilter { @@ -385,15 +378,27 @@ mod tests { } } - fn test_log(addr: Address) -> alloy::primitives::Log { - alloy::primitives::Log { address: addr, data: LogData::new_unchecked(vec![], Bytes::new()) } + fn test_log(addr: Address, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } } fn reorg_notification(ancestor: u64, removed: Vec) -> ReorgNotification { ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } } - fn removed_block(number: u64, logs: Vec) -> RemovedBlock { + fn removed_block(number: u64, logs: Vec) -> RemovedBlock { RemovedBlock { number, hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), @@ -415,11 +420,15 @@ mod tests { #[test] fn compute_removed_logs_matches_removed() { let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); let mut f = log_filter(11, addr); let reorg = Arc::new(reorg_notification( 8, - vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], + vec![ + removed_block(9, vec![test_log(addr, 9, hash)]), + removed_block(10, vec![test_log(addr, 10, hash)]), + ], )); let removed = f.compute_removed_logs(&[reorg]); @@ -427,11 +436,15 @@ mod tests { assert_eq!(removed.len(), 2); assert!(removed.iter().all(|l| l.removed)); assert!(removed.iter().all(|l| l.inner.address == addr)); + // Verify RPC metadata is preserved through clone-and-set path. + assert!(removed.iter().all(|l| l.transaction_hash == Some(B256::ZERO))); + assert!(removed.iter().all(|l| l.log_index == Some(0))); } #[test] fn compute_removed_logs_skips_undelivered_blocks() { let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); // Filter has only delivered up to block 10 (next_start = 11). let mut f = log_filter(11, addr); @@ -439,9 +452,9 @@ mod tests { let reorg = Arc::new(reorg_notification( 9, vec![ - removed_block(10, vec![test_log(addr)]), - removed_block(11, vec![test_log(addr)]), - removed_block(12, vec![test_log(addr)]), + removed_block(10, vec![test_log(addr, 10, hash)]), + removed_block(11, vec![test_log(addr, 11, hash)]), + removed_block(12, vec![test_log(addr, 12, hash)]), ], )); @@ -454,27 +467,31 @@ mod tests { #[test] fn compute_removed_logs_cascading() { let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); // Filter has delivered up to block 100 (next_start = 101). let mut f = log_filter(101, addr); // Reorg A: rewinds to 98, removes 99-100. let reorg_a = Arc::new(reorg_notification( 98, - vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], + vec![ + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + ], )); // Reorg B: rewinds to 95, removes 96-103. let reorg_b = Arc::new(reorg_notification( 95, vec![ - removed_block(96, vec![test_log(addr)]), - removed_block(97, vec![test_log(addr)]), - removed_block(98, vec![test_log(addr)]), - removed_block(99, vec![test_log(addr)]), - removed_block(100, vec![test_log(addr)]), - removed_block(101, vec![test_log(addr)]), - removed_block(102, vec![test_log(addr)]), - removed_block(103, vec![test_log(addr)]), + removed_block(96, vec![test_log(addr, 96, hash)]), + removed_block(97, vec![test_log(addr, 97, hash)]), + removed_block(98, vec![test_log(addr, 98, hash)]), + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + removed_block(101, vec![test_log(addr, 101, hash)]), + removed_block(102, vec![test_log(addr, 102, hash)]), + removed_block(103, vec![test_log(addr, 103, hash)]), ], )); diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index 72cd78f..cb09c74 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -110,22 +110,12 @@ impl InterestKind { let logs: VecDeque = reorg .removed_blocks .iter() - .flat_map(|block| { - let hash = block.hash; - let number = block.number; - let timestamp = block.timestamp; - block.logs.iter().map(move |log| (hash, number, timestamp, log)) - }) - .filter(|(_, _, _, log)| filter.matches(log)) - .map(|(hash, number, timestamp, log)| Log { - inner: log.clone(), - block_hash: Some(hash), - block_number: Some(number), - block_timestamp: Some(timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, + .flat_map(|block| block.logs.iter()) + .filter(|log| filter.matches(&log.inner)) + .map(|log| { + let mut log = log.clone(); + log.removed = true; + log }) .collect(); @@ -138,10 +128,19 @@ mod tests { use super::*; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; - fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { - alloy::primitives::Log { - address: addr, - data: LogData::new_unchecked(vec![topic], Bytes::new()), + fn test_log(addr: Address, topic: B256, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, } } @@ -152,7 +151,7 @@ mod tests { fn test_removed_block( number: u64, hash: B256, - logs: Vec, + logs: Vec, ) -> crate::interest::RemovedBlock { crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } } @@ -167,7 +166,11 @@ mod tests { let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(addr, topic)])], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(addr, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -192,7 +195,11 @@ mod tests { let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(other, topic)])], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(other, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 2403037..3feda5c 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -84,7 +84,13 @@ pub struct RemovedBlock { /// The block timestamp. pub timestamp: u64, /// Logs emitted in the removed block. - pub logs: Vec, + /// + /// Uses the RPC log type so that `transaction_hash` and `log_index` + /// from the original receipts can be preserved, as required by the + /// Ethereum JSON-RPC spec for removed logs. These fields are + /// populated when cold storage has indexed the block; otherwise the + /// vec may be empty. + pub logs: Vec, } /// Notification sent when a chain reorganization is detected. From 2a74c5431d27dd9e765966346cf32262821f9135 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 09:43:47 -0400 Subject: [PATCH 12/13] refactor: move reorg ring buffer from FilterManager to ChainNotifier ChainNotifier now writes to an authoritative ring buffer before broadcasting reorg events. Polling filters read it directly via reorgs_since(), eliminating the broadcast::Receiver lag bug where FilterReorgTask could silently drop reorg notifications. Deletes FilterReorgTask entirely. FilterManager::new now takes a ChainNotifier instead of a broadcast::Sender. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/config/chain_notifier.rs | 93 ++++++++++++- crates/rpc/src/config/ctx.rs | 7 +- crates/rpc/src/interest/filters.rs | 171 ++++-------------------- crates/rpc/src/interest/mod.rs | 9 +- 4 files changed, 123 insertions(+), 157 deletions(-) diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs index 9d719ab..94d1b30 100644 --- a/crates/rpc/src/config/chain_notifier.rs +++ b/crates/rpc/src/config/chain_notifier.rs @@ -4,8 +4,21 @@ use crate::{ config::resolve::BlockTags, interest::{ChainEvent, NewBlockNotification, ReorgNotification}, }; +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, + time::Instant, +}; use tokio::sync::broadcast; +/// Maximum number of reorg notifications retained in the ring buffer. +/// At most one reorg per 12 seconds and a 5-minute stale filter TTL +/// gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; + +/// Timestamped reorg ring buffer. +type ReorgBuffer = VecDeque<(Instant, Arc)>; + /// Shared chain state between the node and RPC layer. /// /// Combines block tag tracking and chain event notification into a single @@ -27,6 +40,7 @@ use tokio::sync::broadcast; pub struct ChainNotifier { tags: BlockTags, notif_tx: broadcast::Sender, + reorgs: Arc>, } impl ChainNotifier { @@ -35,7 +49,7 @@ impl ChainNotifier { pub fn new(channel_capacity: usize) -> Self { let tags = BlockTags::new(0, 0, 0); let (notif_tx, _) = broadcast::channel(channel_capacity); - Self { tags, notif_tx } + Self { tags, notif_tx, reorgs: Arc::new(RwLock::new(VecDeque::new())) } } /// Access the block tags. @@ -57,13 +71,21 @@ impl ChainNotifier { /// Send a reorg notification. /// - /// Returns `Ok(receiver_count)` or `Err` if there are no active - /// receivers (which is not usually an error condition). + /// Writes the notification to the authoritative ring buffer before + /// broadcasting to push subscribers. The broadcast `Err` only means + /// "no active push subscribers" — the ring buffer is unaffected. #[allow(clippy::result_large_err)] pub fn send_reorg( &self, notif: ReorgNotification, ) -> Result> { + { + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back((Instant::now(), Arc::new(notif.clone()))); + } self.send_event(ChainEvent::Reorg(notif)) } @@ -81,6 +103,20 @@ impl ChainNotifier { self.notif_tx.subscribe() } + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock. Used by polling + /// filters at poll time to compute removed logs. + pub fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() + } + /// Get a clone of the broadcast sender. /// /// Used by the subscription manager to create its own receiver. @@ -88,3 +124,54 @@ impl ChainNotifier { self.notif_tx.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::ReorgNotification; + use std::time::Duration; + + fn reorg_notification(ancestor: u64) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] } + } + + #[test] + fn push_reorg_evicts_oldest() { + let notifier = ChainNotifier::new(16); + for i in 0..MAX_REORG_ENTRIES + 5 { + notifier.send_reorg(reorg_notification(i as u64)).ok(); + } + let buf = notifier.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + assert_eq!(buf[0].1.common_ancestor, 5); + } + + #[test] + fn reorgs_since_filters_by_time() { + let notifier = ChainNotifier::new(16); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(10)).ok(); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(8)).ok(); + + let all = notifier.reorgs_since(before); + assert_eq!(all.len(), 2); + + let recent = notifier.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); + } + + #[test] + fn reorgs_since_skips_pre_creation_reorgs() { + let notifier = ChainNotifier::new(16); + notifier.send_reorg(reorg_notification(5)).ok(); + std::thread::sleep(Duration::from_millis(5)); + + let after = Instant::now(); + let reorgs = notifier.reorgs_since(after); + assert!(reorgs.is_empty()); + } +} diff --git a/crates/rpc/src/config/ctx.rs b/crates/rpc/src/config/ctx.rs index 166979b..27b70f4 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,11 +101,8 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new( - &chain.notif_sender(), - config.stale_filter_ttl, - config.stale_filter_ttl, - ); + let filter_manager = + FilterManager::new(chain.clone(), config.stale_filter_ttl, config.stale_filter_ttl); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index e57b2a4..3f97ffd 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,20 +1,21 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{ChainEvent, InterestKind, ReorgNotification, buffer::EventBuffer}; +use crate::{ + config::ChainNotifier, + interest::{InterestKind, ReorgNotification, buffer::EventBuffer}, +}; use alloy::{ primitives::{B256, U64}, rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ - collections::VecDeque, sync::{ - Arc, RwLock, Weak, + Arc, Weak, atomic::{AtomicU64, Ordering}, }, time::{Duration, Instant}, }; -use tokio::sync::broadcast; use tracing::trace; type FilterId = U64; @@ -22,11 +23,6 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; -/// Maximum number of reorg notifications retained in the global ring -/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter -/// TTL gives a worst case of 25 entries. -const MAX_REORG_ENTRIES: usize = 25; - /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last @@ -149,18 +145,13 @@ impl ActiveFilter { pub(crate) struct FilterManagerInner { current_id: AtomicU64, filters: DashMap, - reorgs: RwLock)>>, } impl FilterManagerInner { /// Create a new filter manager. fn new() -> Self { // Start from 1, as 0 is weird in quantity encoding. - Self { - current_id: AtomicU64::new(1), - filters: DashMap::new(), - reorgs: RwLock::new(VecDeque::new()), - } + Self { current_id: AtomicU64::new(1), filters: DashMap::new() } } /// Get the next filter ID. @@ -201,32 +192,6 @@ impl FilterManagerInner { self.filters.remove(&id) } - /// Append a reorg notification to the global ring buffer. - /// - /// Evicts the oldest entry when the buffer is full. The - /// [`Arc`] is shared across all poll-time readers. - pub(crate) fn push_reorg(&self, reorg: ReorgNotification) { - let entry = (Instant::now(), Arc::new(reorg)); - let mut buf = self.reorgs.write().unwrap(); - if buf.len() >= MAX_REORG_ENTRIES { - buf.pop_front(); - } - buf.push_back(entry); - } - - /// Return all reorg notifications received after `since`. - /// - /// Clones the [`Arc`]s under a brief read lock and returns them. - pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { - self.reorgs - .read() - .unwrap() - .iter() - .filter(|(received_at, _)| *received_at > since) - .map(|(_, reorg)| Arc::clone(reorg)) - .collect() - } - /// Clean stale filters that have not been polled in a while. fn clean_stale(&self, older_than: Duration) { self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than); @@ -239,41 +204,39 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Reorg notifications are stored in a global ring buffer (capped at -/// [`MAX_REORG_ENTRIES`]). Filters compute their removed logs lazily at -/// poll time by scanning the buffer for entries received since the last -/// poll. -/// -/// Calling [`Self::new`] spawns two background workers: -/// - An OS thread that periodically cleans stale filters (using a separate -/// thread to avoid [`DashMap::retain`] deadlock). -/// - A tokio task that listens for reorg broadcasts and appends them to -/// the global ring buffer. +/// Reorg notifications are read from the [`ChainNotifier`]'s ring buffer +/// at poll time. Filters compute their removed logs lazily by scanning +/// for entries received since the last poll. /// -/// Both workers hold [`Weak`] references and self-terminate when the -/// manager is dropped. +/// Calling [`Self::new`] spawns an OS thread that periodically cleans +/// stale filters (using a separate thread to avoid [`DashMap::retain`] +/// deadlock). The worker holds a [`Weak`] reference and self-terminates +/// when the manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc, + chain: ChainNotifier, } impl FilterManager { /// Create a new filter manager. /// - /// Spawns a cleanup thread for stale filters and a tokio task that - /// listens for [`ChainEvent::Reorg`] events and appends them to the - /// global ring buffer. - pub(crate) fn new( - chain_events: &broadcast::Sender, - clean_interval: Duration, - age_limit: Duration, - ) -> Self { + /// Spawns a cleanup thread for stale filters. Reorg notifications + /// are read directly from the [`ChainNotifier`]'s ring buffer at + /// poll time — no background listener is needed. + pub(crate) fn new(chain: ChainNotifier, clean_interval: Duration, age_limit: Duration) -> Self { let inner = Arc::new(FilterManagerInner::new()); - let manager = Self { inner }; + let manager = Self { inner, chain }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); - FilterReorgTask::new(Arc::downgrade(&manager.inner), chain_events.subscribe()).spawn(); manager } + + /// Return all reorg notifications received after `since`. + /// + /// Delegates to [`ChainNotifier::reorgs_since`]. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { + self.chain.reorgs_since(since) + } } impl std::ops::Deref for FilterManager { @@ -317,45 +280,6 @@ impl FilterCleanTask { } } -/// Task that listens for reorg events and appends them to the global -/// ring buffer. -/// -/// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] -/// is dropped. -struct FilterReorgTask { - manager: Weak, - rx: broadcast::Receiver, -} - -impl FilterReorgTask { - const fn new(manager: Weak, rx: broadcast::Receiver) -> Self { - Self { manager, rx } - } - - /// Spawn the listener as a tokio task. - fn spawn(self) { - tokio::spawn(self.run()); - } - - async fn run(mut self) { - loop { - let event = match self.rx.recv().await { - Ok(event) => event, - Err(broadcast::error::RecvError::Lagged(skipped)) => { - trace!(skipped, "filter reorg listener missed notifications"); - continue; - } - Err(_) => break, - }; - - let ChainEvent::Reorg(reorg) = event else { continue }; - - let Some(manager) = self.manager.upgrade() else { break }; - manager.push_reorg(reorg); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -515,49 +439,6 @@ mod tests { // But the rewind still happened. assert_eq!(f.next_start_block, 6); } - - #[test] - fn push_reorg_evicts_oldest() { - let inner = FilterManagerInner::new(); - for i in 0..MAX_REORG_ENTRIES + 5 { - inner.push_reorg(reorg_notification(i as u64, vec![])); - } - let buf = inner.reorgs.read().unwrap(); - assert_eq!(buf.len(), MAX_REORG_ENTRIES); - // Oldest surviving entry should be the 6th push (index 5). - assert_eq!(buf[0].1.common_ancestor, 5); - } - - #[test] - fn reorgs_since_filters_by_time() { - let inner = FilterManagerInner::new(); - let before = Instant::now(); - std::thread::sleep(Duration::from_millis(5)); - inner.push_reorg(reorg_notification(10, vec![])); - let mid = Instant::now(); - std::thread::sleep(Duration::from_millis(5)); - inner.push_reorg(reorg_notification(8, vec![])); - - let all = inner.reorgs_since(before); - assert_eq!(all.len(), 2); - - let recent = inner.reorgs_since(mid); - assert_eq!(recent.len(), 1); - assert_eq!(recent[0].common_ancestor, 8); - } - - #[test] - fn reorgs_since_skips_pre_creation_reorgs() { - let inner = FilterManagerInner::new(); - inner.push_reorg(reorg_notification(5, vec![])); - std::thread::sleep(Duration::from_millis(5)); - - let id = inner.install_block_filter(20); - let filter = inner.filters.get(&id).unwrap(); - - let reorgs = inner.reorgs_since(filter.last_poll_time); - assert!(reorgs.is_empty()); - } } // Some code in this file has been copied and modified from reth diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3feda5c..ce5472f 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -31,10 +31,11 @@ //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! -//! [`FilterManager`] additionally spawns a tokio task that listens -//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg -//! notifications to all active filters. This task does not call -//! `retain`, so it is safe to run in an async context. +//! [`FilterManager`] reads reorg notifications directly from the +//! [`ChainNotifier`]'s ring buffer at poll time — no background +//! listener task is needed. +//! +//! [`ChainNotifier`]: crate::ChainNotifier //! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap From 8df8d45d8121eaf993f61e6b68691dfdbc7e615d Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 10:39:15 -0400 Subject: [PATCH 13/13] chore: remove plan/spec files and gitignore docs/superpowers/ Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 1 + ...26-03-17-reorg-ring-buffer-and-log-type.md | 734 ------------------ ...7-reorg-ring-buffer-and-log-type-design.md | 156 ---- 3 files changed, 1 insertion(+), 890 deletions(-) delete mode 100644 docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md delete mode 100644 docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md diff --git a/.gitignore b/.gitignore index 208f6f5..faa6dc5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .DS_Store Cargo.lock .idea/ +docs/superpowers/ diff --git a/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md b/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md deleted file mode 100644 index de4c8b1..0000000 --- a/docs/superpowers/plans/2026-03-17-reorg-ring-buffer-and-log-type.md +++ /dev/null @@ -1,734 +0,0 @@ -# Reorg Ring Buffer & Log Type Implementation Plan - -> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Address Fraser's PR #98 review: move the reorg ring buffer from `FilterManagerInner` to `ChainNotifier`, and change `RemovedBlock.logs` to `Vec` for Ethereum spec compliance. - -**Architecture:** Two independent changes. Change 2 (log type) is implemented first because it's simpler and Change 1's tests must agree on the log type. Change 1 moves the ring buffer upstream into `ChainNotifier`, deletes `FilterReorgTask`, and has `FilterManager` delegate `reorgs_since` to `ChainNotifier`. - -**Tech Stack:** Rust, alloy, tokio broadcast, signet-rpc, signet-node - -**Spec:** `docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md` - ---- - -## Chunk 1: Preserve rpc::types::Log in RemovedBlock - -### Task 1: Change RemovedBlock.logs type and update producer - -**Files:** -- Modify: `crates/rpc/src/interest/mod.rs:87` -- Modify: `crates/node/src/node.rs:367-381` - -- [ ] **Step 1: Change `RemovedBlock.logs` type** - -In `crates/rpc/src/interest/mod.rs`, change: - -```rust - /// Logs emitted in the removed block. - pub logs: Vec, -``` - -To: - -```rust - /// Logs emitted in the removed block. - /// - /// Uses the RPC log type to preserve `transaction_hash` and - /// `log_index` from the original receipts, as required by the - /// Ethereum JSON-RPC spec for removed logs. - pub logs: Vec, -``` - -- [ ] **Step 2: Update the producer in node.rs** - -In `crates/node/src/node.rs`, the `notify_reorg` method currently strips the RPC wrapper. Change: - -```rust - let logs = - d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); -``` - -To: - -```rust - let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect(); -``` - -- [ ] **Step 3: Verify compilation** - -Run: `cargo clippy -p signet-node --all-features --all-targets 2>&1 | head -30` - -Expected: Compilation errors in `filters.rs` and `kind.rs` because they -still construct `Log` from `primitives::Log` — these are fixed in Tasks 2 -and 3. - -### Task 2: Update compute_removed_logs consumer (filters.rs) - -**Files:** -- Modify: `crates/rpc/src/interest/filters.rs:117-151` -- Modify: `crates/rpc/src/interest/filters.rs:388-403` (test helpers) - -- [ ] **Step 1: Update `compute_removed_logs`** - -In `crates/rpc/src/interest/filters.rs`, replace the body of -`compute_removed_logs` (lines 117-151). The key changes: -- `filter.matches(log)` becomes `filter.matches(&log.inner)` (rpc Log wraps primitives Log) -- Instead of constructing a new `Log`, clone the existing one and set `removed: true` - -```rust - pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { - let Some(filter) = self.kind.as_filter() else { - for reorg in reorgs { - self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); - } - return Vec::new(); - }; - - let mut removed = Vec::new(); - for notification in reorgs { - let snapshot = self.next_start_block; - self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); - for block in ¬ification.removed_blocks { - if block.number >= snapshot { - continue; - } - for log in &block.logs { - if !filter.matches(&log.inner) { - continue; - } - let mut log = log.clone(); - log.removed = true; - removed.push(log); - } - } - } - removed - } -``` - -- [ ] **Step 2: Update test helpers** - -In the `mod tests` block of `filters.rs`, update `test_log` and -`removed_block` to use `alloy::rpc::types::Log`: - -Replace the `test_log` helper: - -```rust - fn test_log(addr: Address, number: u64, hash: B256) -> Log { - Log { - inner: alloy::primitives::Log { - address: addr, - data: LogData::new_unchecked(vec![], Bytes::new()), - }, - block_hash: Some(hash), - block_number: Some(number), - block_timestamp: Some(1_000_000 + number), - transaction_hash: Some(B256::ZERO), - transaction_index: Some(0), - log_index: Some(0), - removed: false, - } - } -``` - -Replace the `removed_block` helper: - -```rust - fn removed_block(number: u64, logs: Vec) -> RemovedBlock { - RemovedBlock { - number, - hash: b256!("0x0000000000000000000000000000000000000001"), - timestamp: 1_000_000 + number, - logs, - } - } -``` - -Add `B256` to the test imports (already available via -`alloy::primitives`). The `Log` type comes from `super::*`. - -Update all test call sites that use `test_log` to pass block number and -hash. For example, `test_log(addr)` becomes -`test_log(addr, 9, b256!("0x...0001"))` where the number and hash match -the `removed_block` they appear in. Each `removed_block` call already -passes a block number — the logs inside should use the same number and -the same hash constant. - -- [ ] **Step 3: Run tests** - -Run: `cargo t -p signet-rpc -- filters` - -Expected: All `compute_removed_logs_*` and `push_reorg_*` and -`reorgs_since_*` tests pass. - -- [ ] **Step 4: Lint** - -Run: `cargo clippy -p signet-rpc --all-features --all-targets` - -Expected: Clean. - -### Task 3: Update filter_reorg_for_sub consumer (kind.rs) - -**Files:** -- Modify: `crates/rpc/src/interest/kind.rs:105-133` -- Modify: `crates/rpc/src/interest/kind.rs:141-158` (test helpers) - -- [ ] **Step 1: Update `filter_reorg_for_sub`** - -In `crates/rpc/src/interest/kind.rs`, replace the body of -`filter_reorg_for_sub`: - -```rust - pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer { - let Some(filter) = self.as_filter() else { - return self.empty_sub_buffer(); - }; - - let logs: VecDeque = reorg - .removed_blocks - .iter() - .flat_map(|block| block.logs.iter()) - .filter(|log| filter.matches(&log.inner)) - .map(|log| { - let mut log = log.clone(); - log.removed = true; - log - }) - .collect(); - - SubscriptionBuffer::Log(logs) - } -``` - -- [ ] **Step 2: Update test helpers** - -Replace the `test_log` helper: - -```rust - fn test_log(addr: Address, topic: B256, number: u64, hash: B256) -> Log { - Log { - inner: alloy::primitives::Log { - address: addr, - data: LogData::new_unchecked(vec![topic], Bytes::new()), - }, - block_hash: Some(hash), - block_number: Some(number), - block_timestamp: Some(1_000_000 + number), - transaction_hash: Some(B256::ZERO), - transaction_index: Some(0), - log_index: Some(0), - removed: false, - } - } -``` - -Replace the `test_removed_block` helper: - -```rust - fn test_removed_block( - number: u64, - hash: B256, - logs: Vec, - ) -> crate::interest::RemovedBlock { - crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } - } -``` - -Update all test call sites: `test_log(addr, topic)` becomes -`test_log(addr, topic, number, hash)` where number and hash match the -`test_removed_block` the log appears in. The `Log` type comes from -`use super::*`. Existing imports stay the same. - -- [ ] **Step 3: Run tests** - -Run: `cargo t -p signet-rpc -- kind` - -Expected: All `filter_reorg_for_sub_*` tests pass. - -- [ ] **Step 4: Lint both crates** - -Run: `cargo clippy -p signet-rpc --all-features --all-targets && cargo clippy -p signet-node --all-features --all-targets` - -Expected: Clean. - -- [ ] **Step 5: Format** - -Run: `cargo +nightly fmt` - -- [ ] **Step 6: Commit** - -```bash -git add crates/rpc/src/interest/mod.rs crates/rpc/src/interest/filters.rs crates/rpc/src/interest/kind.rs crates/node/src/node.rs -git commit -m "fix: preserve tx_hash and log_index on removed logs - -Change RemovedBlock.logs from Vec to -Vec. The producer (notify_reorg) now passes -through the full RPC log from ColdReceipt instead of stripping the -wrapper. Consumers (compute_removed_logs, filter_reorg_for_sub) clone -and set removed=true instead of reconstructing with None fields. - -This satisfies the Ethereum JSON-RPC spec requirement that removed -logs include transaction_hash and log_index." -``` - ---- - -## Chunk 2: Move ring buffer into ChainNotifier - -### Task 4: Add ring buffer and reorgs_since to ChainNotifier - -**Files:** -- Modify: `crates/rpc/src/config/chain_notifier.rs` - -- [ ] **Step 1: Add imports and constant** - -In `crates/rpc/src/config/chain_notifier.rs`, add imports and constant. -Replace the existing imports: - -```rust -use crate::{ - config::resolve::BlockTags, - interest::{ChainEvent, NewBlockNotification, ReorgNotification}, -}; -use std::{ - collections::VecDeque, - sync::{Arc, RwLock}, - time::Instant, -}; -use tokio::sync::broadcast; -``` - -Add constant after imports: - -```rust -/// Maximum number of reorg notifications retained in the ring buffer. -/// At most one reorg per 12 seconds and a 5-minute stale filter TTL -/// gives a worst case of 25 entries. -const MAX_REORG_ENTRIES: usize = 25; -``` - -- [ ] **Step 2: Add `reorgs` field to `ChainNotifier`** - -```rust -pub struct ChainNotifier { - tags: BlockTags, - notif_tx: broadcast::Sender, - reorgs: Arc)>>>, -} -``` - -- [ ] **Step 3: Update `new` to initialize the field** - -```rust - pub fn new(channel_capacity: usize) -> Self { - let tags = BlockTags::new(0, 0, 0); - let (notif_tx, _) = broadcast::channel(channel_capacity); - Self { tags, notif_tx, reorgs: Arc::new(RwLock::new(VecDeque::new())) } - } -``` - -- [ ] **Step 4: Update `send_reorg` to write ring buffer first** - -Replace `send_reorg`: - -```rust - /// Send a reorg notification. - /// - /// Writes the notification to the authoritative ring buffer before - /// broadcasting to push subscribers. The broadcast `Err` only means - /// "no active push subscribers" — the ring buffer is unaffected. - #[allow(clippy::result_large_err)] - pub fn send_reorg( - &self, - notif: ReorgNotification, - ) -> Result> { - { - let mut buf = self.reorgs.write().unwrap(); - if buf.len() >= MAX_REORG_ENTRIES { - buf.pop_front(); - } - buf.push_back((Instant::now(), Arc::new(notif.clone()))); - } - self.send_event(ChainEvent::Reorg(notif)) - } -``` - -- [ ] **Step 5: Add `reorgs_since` method** - -Add after the `notif_sender` method: - -```rust - /// Return all reorg notifications received after `since`. - /// - /// Clones the [`Arc`]s under a brief read lock. Used by polling - /// filters at poll time to compute removed logs. - pub fn reorgs_since(&self, since: Instant) -> Vec> { - self.reorgs - .read() - .unwrap() - .iter() - .filter(|(received_at, _)| *received_at > since) - .map(|(_, reorg)| Arc::clone(reorg)) - .collect() - } -``` - -- [ ] **Step 6: Update the doctest** - -The construction doctest needs no changes — `ChainNotifier::new(128)` -still works. But verify: - -Run: `cargo t -p signet-rpc --doc` - -Expected: Doctest passes. - -- [ ] **Step 7: Lint** - -Run: `cargo clippy -p signet-rpc --all-features --all-targets` - -Expected: Errors in `filters.rs` (still references old ring buffer code). -That's fine — fixed in Task 5. - -### Task 5: Remove ring buffer and FilterReorgTask from FilterManager - -**Files:** -- Modify: `crates/rpc/src/interest/filters.rs` - -- [ ] **Step 1: Add ChainNotifier import, remove broadcast import** - -Replace the imports at the top of `filters.rs`: - -```rust -use crate::{ - config::ChainNotifier, - interest::{InterestKind, ReorgNotification, buffer::EventBuffer}, -}; -use alloy::{ - primitives::{B256, U64}, - rpc::types::{Filter, Log}, -}; -use dashmap::{DashMap, mapref::one::RefMut}; -use std::{ - sync::{ - Arc, Weak, - atomic::{AtomicU64, Ordering}, - }, - time::{Duration, Instant}, -}; -use tracing::trace; -``` - -Removed vs. current: `tokio::sync::broadcast` gone, `RwLock` gone, -`VecDeque` gone, `ChainEvent` gone from crate import. Added -`ChainNotifier`. Kept `ReorgNotification` (used by `compute_removed_logs`). - -- [ ] **Step 2: Remove `MAX_REORG_ENTRIES` constant** - -Delete: - -```rust -/// Maximum number of reorg notifications retained in the global ring -/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter -/// TTL gives a worst case of 25 entries. -const MAX_REORG_ENTRIES: usize = 25; -``` - -- [ ] **Step 3: Remove reorg fields from `FilterManagerInner`** - -Update the struct: - -```rust -/// Inner logic for [`FilterManager`]. -#[derive(Debug)] -pub(crate) struct FilterManagerInner { - current_id: AtomicU64, - filters: DashMap, -} -``` - -Update `new`: - -```rust - fn new() -> Self { - // Start from 1, as 0 is weird in quantity encoding. - Self { - current_id: AtomicU64::new(1), - filters: DashMap::new(), - } - } -``` - -- [ ] **Step 4: Remove `push_reorg` and `reorgs_since` from `FilterManagerInner`** - -Delete these two methods entirely (lines 211-235 in the current file). - -- [ ] **Step 5: Update `FilterManager` struct and `new`** - -Replace the `FilterManager` struct, its doc comment, and `new`: - -```rust -/// Manager for filters. -/// -/// The manager tracks active filters, and periodically cleans stale filters. -/// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. -/// Filter IDs are assigned sequentially, starting from 1. -/// -/// Reorg notifications are read from the [`ChainNotifier`]'s ring buffer -/// at poll time. Filters compute their removed logs lazily by scanning -/// for entries received since the last poll. -/// -/// Calling [`Self::new`] spawns an OS thread that periodically cleans -/// stale filters (using a separate thread to avoid [`DashMap::retain`] -/// deadlock). The worker holds a [`Weak`] reference and self-terminates -/// when the manager is dropped. -#[derive(Debug, Clone)] -pub(crate) struct FilterManager { - inner: Arc, - chain: ChainNotifier, -} - -impl FilterManager { - /// Create a new filter manager. - /// - /// Spawns a cleanup thread for stale filters. Reorg notifications - /// are read directly from the [`ChainNotifier`]'s ring buffer at - /// poll time — no background listener is needed. - pub(crate) fn new( - chain: ChainNotifier, - clean_interval: Duration, - age_limit: Duration, - ) -> Self { - let inner = Arc::new(FilterManagerInner::new()); - let manager = Self { inner, chain }; - FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); - manager - } - - /// Return all reorg notifications received after `since`. - /// - /// Delegates to [`ChainNotifier::reorgs_since`]. - pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { - self.chain.reorgs_since(since) - } -} - -impl std::ops::Deref for FilterManager { - type Target = FilterManagerInner; - - fn deref(&self) -> &Self::Target { - self.inner.deref() - } -} -``` - -The `Deref` impl is preserved from the existing code — call sites like -`fm.get_mut(id)`, `fm.install_log_filter(...)`, etc. in `endpoints.rs` -depend on it to reach `FilterManagerInner` methods. - -- [ ] **Step 6: Delete `FilterReorgTask`** - -Delete the entire `FilterReorgTask` struct and impl block (lines 327-364 -in the current file). - -- [ ] **Step 7: Update test module** - -The tests `push_reorg_evicts_oldest`, `reorgs_since_filters_by_time`, -and `reorgs_since_skips_pre_creation_reorgs` test ring buffer behavior -that now lives on `ChainNotifier`. Move them to a new `#[cfg(test)] mod -tests` block in `chain_notifier.rs` (Task 6). For now, delete them from -`filters.rs`. - -Remove these three tests and update the test imports. The remaining tests -only need: - -```rust -#[cfg(test)] -mod tests { - use super::*; - use crate::interest::{InterestKind, RemovedBlock}; - use alloy::primitives::{Address, Bytes, LogData, address, b256}; -``` - -(Same as before — the `RemovedBlock` import stays; the `Log` type comes -from `super::*`.) - -Also update `removed_block` test helper to match the new log type from -Task 2 (should already be done if Task 2 was completed first). - -- [ ] **Step 8: Lint** - -Run: `cargo clippy -p signet-rpc --all-features --all-targets` - -Expected: Errors in `ctx.rs` (wrong `FilterManager::new` signature). -Fixed in Task 7. - -### Task 6: Add ring buffer tests to ChainNotifier - -**Files:** -- Modify: `crates/rpc/src/config/chain_notifier.rs` - -- [ ] **Step 1: Add test module** - -Add at the bottom of `chain_notifier.rs`: - -```rust -#[cfg(test)] -mod tests { - use super::*; - use crate::interest::ReorgNotification; - - fn reorg_notification(ancestor: u64) -> ReorgNotification { - ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] } - } - - #[test] - fn push_reorg_evicts_oldest() { - let notifier = ChainNotifier::new(16); - for i in 0..MAX_REORG_ENTRIES + 5 { - notifier.send_reorg(reorg_notification(i as u64)).ok(); - } - let buf = notifier.reorgs.read().unwrap(); - assert_eq!(buf.len(), MAX_REORG_ENTRIES); - assert_eq!(buf[0].1.common_ancestor, 5); - } - - #[test] - fn reorgs_since_filters_by_time() { - let notifier = ChainNotifier::new(16); - let before = Instant::now(); - std::thread::sleep(Duration::from_millis(5)); - notifier.send_reorg(reorg_notification(10)).ok(); - let mid = Instant::now(); - std::thread::sleep(Duration::from_millis(5)); - notifier.send_reorg(reorg_notification(8)).ok(); - - let all = notifier.reorgs_since(before); - assert_eq!(all.len(), 2); - - let recent = notifier.reorgs_since(mid); - assert_eq!(recent.len(), 1); - assert_eq!(recent[0].common_ancestor, 8); - } - - #[test] - fn reorgs_since_skips_pre_creation_reorgs() { - let notifier = ChainNotifier::new(16); - notifier.send_reorg(reorg_notification(5)).ok(); - std::thread::sleep(Duration::from_millis(5)); - - let after = Instant::now(); - let reorgs = notifier.reorgs_since(after); - assert!(reorgs.is_empty()); - } -} -``` - -Add `Duration` to the existing `std` import in the file's main imports -(it's already brought in by the test module via `super::*` if we add it -to the main imports). Alternatively, add `use std::time::Duration;` -inside the test module. - -- [ ] **Step 2: Run tests** - -Run: `cargo t -p signet-rpc -- chain_notifier` - -Expected: All three new tests pass. - -### Task 7: Update call site in ctx.rs - -**Files:** -- Modify: `crates/rpc/src/config/ctx.rs:104-108` - -- [ ] **Step 1: Update `FilterManager::new` call** - -Replace: - -```rust - let filter_manager = FilterManager::new( - &chain.notif_sender(), - config.stale_filter_ttl, - config.stale_filter_ttl, - ); -``` - -With: - -```rust - let filter_manager = FilterManager::new( - chain.clone(), - config.stale_filter_ttl, - config.stale_filter_ttl, - ); -``` - -- [ ] **Step 2: Lint** - -Run: `cargo clippy -p signet-rpc --all-features --all-targets` - -Expected: Clean. - -### Task 8: Update module-level docs - -**Files:** -- Modify: `crates/rpc/src/interest/mod.rs:34-37` - -- [ ] **Step 1: Remove FilterReorgTask mention from module docs** - -Replace lines 34-37: - -```rust -//! [`FilterManager`] additionally spawns a tokio task that listens -//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg -//! notifications to all active filters. This task does not call -//! `retain`, so it is safe to run in an async context. -``` - -With: - -```rust -//! [`FilterManager`] reads reorg notifications directly from the -//! [`ChainNotifier`]'s ring buffer at poll time — no background -//! listener task is needed. -//! -//! [`ChainNotifier`]: crate::ChainNotifier -``` - -- [ ] **Step 2: Full lint, test, format** - -Run: - -```bash -cargo clippy -p signet-rpc --all-features --all-targets -cargo clippy -p signet-node --all-features --all-targets -cargo t -p signet-rpc -cargo +nightly fmt -``` - -Expected: All clean, all tests pass. - -- [ ] **Step 3: Commit** - -```bash -git add crates/rpc/src/config/chain_notifier.rs crates/rpc/src/interest/filters.rs crates/rpc/src/interest/mod.rs crates/rpc/src/config/ctx.rs -git commit -m "refactor: move reorg ring buffer from FilterManager to ChainNotifier - -ChainNotifier now writes to an authoritative ring buffer before -broadcasting reorg events. Polling filters read it directly via -reorgs_since(), eliminating the broadcast::Receiver lag bug where -FilterReorgTask could silently drop reorg notifications. - -Deletes FilterReorgTask entirely. FilterManager::new now takes a -ChainNotifier instead of a broadcast::Sender." -``` - -- [ ] **Step 4: Final verification** - -Run full test suite: - -```bash -cargo t -p signet-rpc -cargo t -p signet-node -``` - -Expected: All tests pass. diff --git a/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md b/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md deleted file mode 100644 index ccb7e45..0000000 --- a/docs/superpowers/specs/2026-03-17-reorg-ring-buffer-and-log-type-design.md +++ /dev/null @@ -1,156 +0,0 @@ -# Reorg Ring Buffer and Removed Log Type - -**Date:** 2026-03-17 -**PR:** #98 (james/eng-1971) -**Reviewer feedback:** Fraser - -## Context - -PR #98 adds reorg handling to `eth_getFilterChanges`. Fraser's review -identified two improvements: - -1. The reorg ring buffer lives in `FilterManagerInner` and is fed by - `FilterReorgTask` via a `broadcast::Receiver`. If the receiver lags, - reorg notifications are silently dropped — a correctness bug. - -2. `RemovedBlock.logs` uses `alloy::primitives::Log`, discarding - `transaction_hash` and `log_index` that the Ethereum spec requires on - removed logs returned to clients. - -## Change 1: Move ring buffer into ChainNotifier - -### Current flow - -``` -ChainNotifier::send_reorg() - -> broadcast::Sender - -> FilterReorgTask (broadcast::Receiver) - -> FilterManagerInner::push_reorg() (ring buffer) -``` - -`FilterReorgTask` listens on a `broadcast::Receiver`, filters -for `ChainEvent::Reorg`, and appends to a `VecDeque` on -`FilterManagerInner`. If the receiver lags, reorgs are lost. - -### New flow - -``` -ChainNotifier::send_reorg() - -> write to ring buffer (authoritative) - -> broadcast to subscribers (push-based only) -``` - -Polling filters read the ring buffer directly from `ChainNotifier` at -poll time. The broadcast channel remains for push subscriptions only. - -### ChainNotifier changes - -Add fields: - -- `reorgs: Arc)>>>` - -The `Arc` wrapper is required because `ChainNotifier` derives `Clone` -and all existing fields are `Arc`-backed. - -Add constant: - -- `MAX_REORG_ENTRIES: usize = 25` - -Modify `send_reorg`: - -- Write `Arc` to the ring buffer (evicting oldest if - full) **before** broadcasting on the channel. The ring buffer write is - the authoritative store; the broadcast `Err` only means "no push - subscribers" and does not affect the buffer. Return type stays the same - for backward compatibility. - -Add method: - -- `reorgs_since(since: Instant) -> Vec>` — scans - the buffer under a brief read lock. Moved from `FilterManagerInner`. - -### FilterManager changes - -- `FilterManager::new` takes `ChainNotifier` instead of - `&broadcast::Sender`. -- Store `ChainNotifier` (cheap clone) alongside the `Arc`. -- Remove `reorgs` field, `push_reorg`, and `reorgs_since` from - `FilterManagerInner`. -- Delete `FilterReorgTask` entirely. -- Poll-time reorg reads call `chain_notifier.reorgs_since(...)` instead - of `self.inner.reorgs_since(...)`. - -### Call site (ctx.rs) - -`FilterManager::new` call changes from `&chain.notif_sender()` to -`chain.clone()`. - -### SubscriptionManager - -No changes. Continues using the broadcast channel for push delivery. - -## Change 2: Preserve rpc::types::Log in RemovedBlock - -### RemovedBlock type change - -```rust -// Before -pub logs: Vec - -// After -pub logs: Vec -``` - -### Producer (node.rs::notify_reorg) - -Stop stripping the rpc wrapper. Currently: - -```rust -d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect() -``` - -Becomes: - -```rust -d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect() -``` - -The `ColdReceipt` already stores `Receipt` (where `RpcLog` is -re-exported from alloy as `alloy::rpc::types::Log`) with -`transaction_hash` and `log_index` populated. - -### Consumer: compute_removed_logs (filters.rs) - -Currently reconstructs `alloy::rpc::types::Log` with `transaction_hash: -None`. Instead, clone the log from `RemovedBlock`, set `removed: true`, -and preserve the existing fields. Filter matching calls -`filter.matches(&log.inner)` since `Filter::matches` takes -`&alloy::primitives::Log`. - -### Consumer: filter_reorg_for_sub (kind.rs) - -Same change — use the log directly instead of reconstructing with `None` -fields. Set `removed: true` on the cloned log. - -### Tests - -Update test helpers (`removed_block` in filters.rs, `test_removed_block` -in kind.rs) to construct `alloy::rpc::types::Log` instead of -`alloy::primitives::Log`. - -## Files changed - -| File | Change | -|------|--------| -| `crates/rpc/src/config/chain_notifier.rs` | Add ring buffer, `reorgs_since`, modify `send_reorg` | -| `crates/rpc/src/interest/filters.rs` | Remove `FilterReorgTask`, reorg fields; store `ChainNotifier`; update `compute_removed_logs`; update `FilterManager` doc comment (now one background worker, not two) | -| `crates/rpc/src/interest/mod.rs` | Change `RemovedBlock.logs` type; update module-level docs (remove `FilterReorgTask` description) | -| `crates/rpc/src/interest/kind.rs` | Update `filter_reorg_for_sub` to use rpc log directly | -| `crates/rpc/src/config/ctx.rs` | Pass `ChainNotifier` to `FilterManager::new` | -| `crates/node/src/node.rs` | Stop stripping log wrapper in `notify_reorg` | - -## Ordering - -Change 2 (log type) is independent of Change 1 (ring buffer move) and -can be implemented first. Change 1 depends on Change 2 only in that -test helpers need to agree on the log type.