From 5c3b1aaed736e626b882c2cea59060cc0935463f Mon Sep 17 00:00:00 2001 From: James Date: Tue, 17 Mar 2026 11:36:25 -0400 Subject: [PATCH] feat: handle reorgs in get_filter_changes with reorg watermark Add reorg tracking to RPC subscriptions and filters using a ring buffer design. Includes integration tests for multi-block reorgs, watermark min() coverage, and removed-log emission. Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 1 + crates/node-tests/tests/reorg.rs | 555 ++++++++++++++++++++++++ crates/node/src/node.rs | 9 +- crates/rpc/src/config/chain_notifier.rs | 93 +++- crates/rpc/src/config/ctx.rs | 3 +- crates/rpc/src/eth/endpoints.rs | 42 +- crates/rpc/src/interest/filters.rs | 274 +++++++++++- crates/rpc/src/interest/kind.rs | 86 ++-- crates/rpc/src/interest/mod.rs | 30 +- 9 files changed, 1023 insertions(+), 70 deletions(-) create mode 100644 crates/node-tests/tests/reorg.rs diff --git a/.gitignore b/.gitignore index 208f6f5..faa6dc5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .DS_Store Cargo.lock .idea/ +docs/superpowers/ diff --git a/crates/node-tests/tests/reorg.rs b/crates/node-tests/tests/reorg.rs new file mode 100644 index 0000000..fbc7a94 --- /dev/null +++ b/crates/node-tests/tests/reorg.rs @@ -0,0 +1,555 @@ +use alloy::{ + primitives::{Address, B256, LogData}, + providers::Provider, + rpc::types::eth::{Filter, Log}, + sol_types::{SolCall, SolEvent}, +}; +use serial_test::serial; +use signet_node_tests::{HostBlockSpec, SignetTestContext, rpc_test, run_test, types::Counter}; +use std::time::Duration; + +const SOME_USER: Address = Address::repeat_byte(0x39); + +/// Helper: build and process an increment block using a host system +/// transaction (`simple_transact`). This avoids the transaction pool +/// entirely, which is important for reorg tests where we revert and +/// rebuild blocks. +/// +/// Returns the `HostBlockSpec` so it can later be reverted. 
+fn increment_block(ctx: &SignetTestContext, contract_address: Address) -> HostBlockSpec { + ctx.start_host_block().simple_transact( + ctx.addresses[1], + contract_address, + Counter::incrementCall::SELECTOR, + 0, + ) +} + +/// Process an increment block and return the spec for later revert. +async fn process_increment(ctx: &SignetTestContext, contract_address: Address) -> HostBlockSpec { + let block = increment_block(ctx, contract_address); + let for_revert = block.clone(); + ctx.process_block(block).await.unwrap(); + for_revert +} + +// --------------------------------------------------------------------------- +// 1. Block tags +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_block_tags_reorg() { + run_test(|ctx| async move { + // Process two blocks via enter events. + let block1 = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 1000, + ctx.constants().host().tokens().usdc(), + ); + let block1_clone = block1.clone(); + ctx.process_block(block1).await.unwrap(); + + let block2 = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 2000, + ctx.constants().host().tokens().usdc(), + ); + let block2_clone = block2.clone(); + ctx.process_block(block2).await.unwrap(); + + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); + + // Revert block 2. + ctx.revert_block(block2_clone).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1); + + // Revert block 1. + ctx.revert_block(block1_clone).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 0); + + // Rebuild two new blocks. 
+ let new_block1 = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 500, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(new_block1).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1); + + let new_block2 = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 600, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(new_block2).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); + + // Verify the new block 2 is accessible. + let block = ctx.alloy_provider.get_block_by_number(2.into()).await.unwrap(); + assert!(block.is_some()); + }) + .await; +} + +// --------------------------------------------------------------------------- +// 2. Block filter + reorg +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_block_filter_reorg() { + rpc_test(|ctx, contract| async move { + // Install a block filter (starts after block 1, where contract was deployed). + let filter_id = ctx.alloy_provider.new_block_filter().await.unwrap(); + + // Process block 2 (increment via system tx). + let _block2 = process_increment(&ctx, *contract.address()).await; + + // Poll: should have 1 block hash. + let hashes: Vec = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(hashes.len(), 1); + + // Process block 3 (increment), keep clone for revert. + let block3 = process_increment(&ctx, *contract.address()).await; + + // Revert block 3. + ctx.revert_block(block3).await.unwrap(); + + // Poll: reorg watermark resets start to ancestor+1 (= 3), but latest + // is now 2, so start > latest -> empty. + let hashes: Vec = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert!(hashes.is_empty()); + + // Process a new block 3. + let _new_block3 = process_increment(&ctx, *contract.address()).await; + + // Poll: should return the new block 3 hash. 
+ let hashes: Vec = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(hashes.len(), 1); + let new_block3_hash = + ctx.alloy_provider.get_block_by_number(3.into()).await.unwrap().unwrap().hash(); + assert_eq!(hashes[0], new_block3_hash); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 3. Log filter + reorg +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_log_filter_reorg() { + rpc_test(|ctx, contract| async move { + // Install a log filter on the Counter address. + let filter_id = ctx + .alloy_provider + .new_filter(&Filter::new().address(*contract.address())) + .await + .unwrap(); + + // Process block 2 (increment -> count=1). + let _block2 = process_increment(&ctx, *contract.address()).await; + + // Poll: 1 log. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].inner.topics()[0], Counter::Count::SIGNATURE_HASH); + assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1)); + + // Process block 3 (increment -> count=2), clone for revert. + let block3 = process_increment(&ctx, *contract.address()).await; + + // Revert block 3. + ctx.revert_block(block3).await.unwrap(); + + // Poll: empty (watermark rewinds, but latest < start). + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert!(logs.is_empty()); + + // Process a new block 3 (increment -> count=2 again). + let _new_block3 = process_increment(&ctx, *contract.address()).await; + + // Poll: 1 log with count=2. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(2)); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 4. 
Block subscription + reorg +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_block_subscription_reorg() { + rpc_test(|ctx, contract| async move { + let mut sub = ctx.alloy_provider.subscribe_blocks().await.unwrap(); + + // Process block 2. + let block2 = process_increment(&ctx, *contract.address()).await; + + let header = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert_eq!(header.number, 2); + + // Revert block 2. Block subs do not emit anything for reorgs. + ctx.revert_block(block2).await.unwrap(); + + // Process a new block 2. + let _new_block2 = process_increment(&ctx, *contract.address()).await; + + let header = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert_eq!(header.number, 2); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 5. Log subscription + reorg (removed: true) +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_log_subscription_reorg() { + rpc_test(|ctx, contract| async move { + let mut sub = ctx + .alloy_provider + .subscribe_logs(&Filter::new().address(*contract.address())) + .await + .unwrap(); + + // Process block 2 (increment -> count=1). + let block2 = process_increment(&ctx, *contract.address()).await; + + // Receive the normal log. + let log = tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(!log.removed); + assert_eq!(log.inner.address, *contract.address()); + assert_eq!(log.inner.topics()[0], Counter::Count::SIGNATURE_HASH); + assert_eq!(log.inner.topics()[1], B256::with_last_byte(1)); + + // Revert block 2. + ctx.revert_block(block2).await.unwrap(); + + // Receive the removed log. 
+ let removed_log = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(removed_log.removed); + assert_eq!(removed_log.inner.address, *contract.address()); + assert_eq!(removed_log.inner.topics()[0], Counter::Count::SIGNATURE_HASH); + + // Process a new block 2 (increment -> count=1 again). + let _new_block2 = process_increment(&ctx, *contract.address()).await; + + // Receive the new log. + let new_log = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(!new_log.removed); + assert_eq!(new_log.inner.address, *contract.address()); + assert_eq!(new_log.inner.topics()[1], B256::with_last_byte(1)); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 6. Log subscription filter selectivity during reorg +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_log_subscription_reorg_filter_selectivity() { + rpc_test(|ctx, contract| async move { + // Subscribe to logs on the Counter address (should receive events). + let mut matching_sub = ctx + .alloy_provider + .subscribe_logs(&Filter::new().address(*contract.address())) + .await + .unwrap(); + + // Subscribe to logs on a non-matching address (should receive nothing). + let mut non_matching_sub = + ctx.alloy_provider.subscribe_logs(&Filter::new().address(SOME_USER)).await.unwrap(); + + // Process a block with an increment system tx. + let block2 = process_increment(&ctx, *contract.address()).await; + + // The matching subscription should receive the log. + let log = tokio::time::timeout(Duration::from_secs(5), matching_sub.recv()) + .await + .unwrap() + .unwrap(); + assert!(!log.removed); + assert_eq!(log.inner.address, *contract.address()); + + // The non-matching subscription should receive nothing. 
+ let extra = tokio::time::timeout(Duration::from_millis(200), non_matching_sub.recv()).await; + assert!(extra.is_err(), "non-matching sub should not receive the log"); + + // Revert: only the matching subscription should get a removed log. + ctx.revert_block(block2).await.unwrap(); + + let removed = tokio::time::timeout(Duration::from_secs(5), matching_sub.recv()) + .await + .unwrap() + .unwrap(); + assert!(removed.removed); + assert_eq!(removed.inner.address, *contract.address()); + + // The non-matching subscription should still receive nothing. + let extra = tokio::time::timeout(Duration::from_millis(200), non_matching_sub.recv()).await; + assert!(extra.is_err(), "non-matching sub should not receive removed log"); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 7. No-regression: normal progression with filters and subscriptions +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_no_regression_filters_and_subscriptions() { + rpc_test(|ctx, contract| async move { + // Install filters. + let block_filter = ctx.alloy_provider.new_block_filter().await.unwrap(); + let log_filter = ctx + .alloy_provider + .new_filter(&Filter::new().address(*contract.address())) + .await + .unwrap(); + + // Subscribe. + let mut block_sub = ctx.alloy_provider.subscribe_blocks().await.unwrap(); + let mut log_sub = ctx + .alloy_provider + .subscribe_logs(&Filter::new().address(*contract.address())) + .await + .unwrap(); + + // Process 2 increments via system transactions. + let _b2 = process_increment(&ctx, *contract.address()).await; + let _b3 = process_increment(&ctx, *contract.address()).await; + + // Poll block filter: 2 hashes. + let hashes: Vec = ctx.alloy_provider.get_filter_changes(block_filter).await.unwrap(); + assert_eq!(hashes.len(), 2); + + // Poll log filter: 2 logs with sequential counter values. 
+ let logs: Vec> = + ctx.alloy_provider.get_filter_changes(log_filter).await.unwrap(); + assert_eq!(logs.len(), 2); + assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1)); + assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(2)); + + // Receive 2 block headers. + for expected_num in [2, 3] { + let header = tokio::time::timeout(Duration::from_secs(5), block_sub.recv()) + .await + .unwrap() + .unwrap(); + assert_eq!(header.number, expected_num); + } + + // Receive 2 log events, all removed=false. + for expected_count in [1u8, 2] { + let log = tokio::time::timeout(Duration::from_secs(5), log_sub.recv()) + .await + .unwrap() + .unwrap(); + assert!(!log.removed); + assert_eq!(log.inner.address, *contract.address()); + assert_eq!(log.inner.topics()[1], B256::with_last_byte(expected_count)); + } + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 8. Multi-block reorg with log filter +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_multi_block_reorg_log_filter() { + rpc_test(|ctx, contract| async move { + let addr = *contract.address(); + + // Install a log filter on the Counter address. + let filter_id = ctx.alloy_provider.new_filter(&Filter::new().address(addr)).await.unwrap(); + + // Process blocks 2, 3, 4 (increment each → count 1, 2, 3). + let block2 = process_increment(&ctx, addr).await; + let block3 = process_increment(&ctx, addr).await; + let block4 = process_increment(&ctx, addr).await; + + // Poll: expect 3 logs. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 3); + for (i, log) in logs.iter().enumerate() { + assert_eq!(log.inner.topics()[1], B256::with_last_byte(i as u8 + 1)); + } + + // Revert blocks 4, 3, 2 (back to block 1). 
+ ctx.revert_block(block4).await.unwrap(); + ctx.revert_block(block3).await.unwrap(); + ctx.revert_block(block2).await.unwrap(); + + // Poll: removed logs for blocks 4, 3, 2 (reorg ring buffer). + // Watermark rewinds start to 2, latest=1 → no forward scan, + // but the removed logs are still returned. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 3); + assert!(logs.iter().all(|l| l.removed)); + + // Rebuild blocks 2, 3 (increment each → count 1, 2). + let _new_b2 = process_increment(&ctx, addr).await; + let _new_b3 = process_increment(&ctx, addr).await; + + // Poll: expect 2 logs with count 1, 2. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 2); + assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1)); + assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(2)); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 9. Multi-block reorg with log subscription +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_multi_block_reorg_log_subscription() { + rpc_test(|ctx, contract| async move { + let addr = *contract.address(); + let mut sub = + ctx.alloy_provider.subscribe_logs(&Filter::new().address(addr)).await.unwrap(); + + // Process blocks 2, 3 (increment each → count 1, 2). + let block2 = process_increment(&ctx, addr).await; + let block3 = process_increment(&ctx, addr).await; + + // Receive 2 normal logs. + for expected in [1u8, 2] { + let log = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(!log.removed); + assert_eq!(log.inner.topics()[1], B256::with_last_byte(expected)); + } + + // Revert blocks 3, 2. + ctx.revert_block(block3).await.unwrap(); + ctx.revert_block(block2).await.unwrap(); + + // Receive 2 removed logs (one per reverted block). 
+ for _ in 0..2 { + let log = + tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(log.removed); + assert_eq!(log.inner.address, addr); + } + + // Rebuild block 2 (increment → count 1). + let _new_b2 = process_increment(&ctx, addr).await; + + // Receive the new normal log. + let log = tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap(); + assert!(!log.removed); + assert_eq!(log.inner.topics()[1], B256::with_last_byte(1)); + + ctx + }) + .await; +} + +// --------------------------------------------------------------------------- +// 10. Multiple reorgs between polls (watermark min path) +// --------------------------------------------------------------------------- + +#[serial] +#[tokio::test] +async fn test_multiple_reorgs_between_polls() { + rpc_test(|ctx, contract| async move { + let addr = *contract.address(); + + let filter_id = ctx.alloy_provider.new_filter(&Filter::new().address(addr)).await.unwrap(); + + // Process blocks 2, 3, 4, 5 (count 1, 2, 3, 4). + let _b2 = process_increment(&ctx, addr).await; + let _b3 = process_increment(&ctx, addr).await; + let b4 = process_increment(&ctx, addr).await; + let b5 = process_increment(&ctx, addr).await; + + // Poll to advance the filter cursor past block 5. + let logs: Vec> = + ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap(); + assert_eq!(logs.len(), 4); + + // --- Reorg 1: revert block 5 only (common_ancestor=4, watermark=4) --- + ctx.revert_block(b5).await.unwrap(); + + // Rebuild block 5 (count=5). + let new_b5 = process_increment(&ctx, addr).await; + + // --- Reorg 2 (deeper): revert blocks 5 AND 4 --- + // common_ancestor for block 5 revert = 4, for block 4 revert = 3 + // watermark = min(4, min(4, 3)) = 3 + ctx.revert_block(new_b5).await.unwrap(); + ctx.revert_block(b4).await.unwrap(); + + // DO NOT POLL between reorg 1 and reorg 2 — this is the key. + // The filter now has watermark=3 from the min() of both reorgs. 
+
+        // Poll: removed logs from both reorgs (block 5 from reorg 1,
+        // block 4 from reorg 2). Reorg 2's block 5 is skipped because
+        // it was never delivered (number >= snapshot after rewind).
+        let logs: Vec> =
+            ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
+        assert_eq!(logs.len(), 2);
+        assert!(logs.iter().all(|l| l.removed));
+
+        // Rebuild blocks 4, 5 (count 3, 4).
+        let _new_b4 = process_increment(&ctx, addr).await;
+        let _new_b5 = process_increment(&ctx, addr).await;
+
+        // Poll: 2 logs from blocks 4 and 5 (count 3, 4).
+        // After reverting blocks 4+5, count was back to 2. Rebuilding
+        // increments to 3 then 4. This proves the deeper watermark (3)
+        // was kept — if only the shallow watermark (4) had been kept,
+        // we'd see 1 log from block 5 only.
+        let logs: Vec> =
+            ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
+        assert_eq!(logs.len(), 2);
+        assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(3));
+        assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(4));
+
+        ctx
+    })
+    .await;
+}
diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs
index f8e2f95..1be52e0 100644
--- a/crates/node/src/node.rs
+++ b/crates/node/src/node.rs
@@ -368,10 +368,11 @@ where
         let removed_blocks = drained
             .into_iter()
             .map(|d| {
-                let header = d.header.into_inner();
-                let logs =
-                    d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect();
-                RemovedBlock { header, logs }
+                let number = d.header.number();
+                let hash = d.header.hash();
+                let timestamp = d.header.timestamp();
+                let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect();
+                RemovedBlock { number, hash, timestamp, logs }
             })
             .collect();
         let notif = ReorgNotification { common_ancestor, removed_blocks };
diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs
index 9d719ab..94d1b30 100644
--- a/crates/rpc/src/config/chain_notifier.rs
+++ b/crates/rpc/src/config/chain_notifier.rs
@@ -4,8 
+4,21 @@ use crate::{ config::resolve::BlockTags, interest::{ChainEvent, NewBlockNotification, ReorgNotification}, }; +use std::{ + collections::VecDeque, + sync::{Arc, RwLock}, + time::Instant, +}; use tokio::sync::broadcast; +/// Maximum number of reorg notifications retained in the ring buffer. +/// At most one reorg per 12 seconds and a 5-minute stale filter TTL +/// gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; + +/// Timestamped reorg ring buffer. +type ReorgBuffer = VecDeque<(Instant, Arc)>; + /// Shared chain state between the node and RPC layer. /// /// Combines block tag tracking and chain event notification into a single @@ -27,6 +40,7 @@ use tokio::sync::broadcast; pub struct ChainNotifier { tags: BlockTags, notif_tx: broadcast::Sender, + reorgs: Arc>, } impl ChainNotifier { @@ -35,7 +49,7 @@ impl ChainNotifier { pub fn new(channel_capacity: usize) -> Self { let tags = BlockTags::new(0, 0, 0); let (notif_tx, _) = broadcast::channel(channel_capacity); - Self { tags, notif_tx } + Self { tags, notif_tx, reorgs: Arc::new(RwLock::new(VecDeque::new())) } } /// Access the block tags. @@ -57,13 +71,21 @@ impl ChainNotifier { /// Send a reorg notification. /// - /// Returns `Ok(receiver_count)` or `Err` if there are no active - /// receivers (which is not usually an error condition). + /// Writes the notification to the authoritative ring buffer before + /// broadcasting to push subscribers. The broadcast `Err` only means + /// "no active push subscribers" — the ring buffer is unaffected. #[allow(clippy::result_large_err)] pub fn send_reorg( &self, notif: ReorgNotification, ) -> Result> { + { + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back((Instant::now(), Arc::new(notif.clone()))); + } self.send_event(ChainEvent::Reorg(notif)) } @@ -81,6 +103,20 @@ impl ChainNotifier { self.notif_tx.subscribe() } + /// Return all reorg notifications received after `since`. 
+ /// + /// Clones the [`Arc`]s under a brief read lock. Used by polling + /// filters at poll time to compute removed logs. + pub fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() + } + /// Get a clone of the broadcast sender. /// /// Used by the subscription manager to create its own receiver. @@ -88,3 +124,54 @@ impl ChainNotifier { self.notif_tx.clone() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::ReorgNotification; + use std::time::Duration; + + fn reorg_notification(ancestor: u64) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] } + } + + #[test] + fn push_reorg_evicts_oldest() { + let notifier = ChainNotifier::new(16); + for i in 0..MAX_REORG_ENTRIES + 5 { + notifier.send_reorg(reorg_notification(i as u64)).ok(); + } + let buf = notifier.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + assert_eq!(buf[0].1.common_ancestor, 5); + } + + #[test] + fn reorgs_since_filters_by_time() { + let notifier = ChainNotifier::new(16); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(10)).ok(); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + notifier.send_reorg(reorg_notification(8)).ok(); + + let all = notifier.reorgs_since(before); + assert_eq!(all.len(), 2); + + let recent = notifier.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); + } + + #[test] + fn reorgs_since_skips_pre_creation_reorgs() { + let notifier = ChainNotifier::new(16); + notifier.send_reorg(reorg_notification(5)).ok(); + std::thread::sleep(Duration::from_millis(5)); + + let after = Instant::now(); + let reorgs = notifier.reorgs_since(after); + assert!(reorgs.is_empty()); + } +} diff --git a/crates/rpc/src/config/ctx.rs 
b/crates/rpc/src/config/ctx.rs index 5c933ef..27b70f4 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,7 +101,8 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let filter_manager = + FilterManager::new(chain.clone(), config.stale_filter_ttl, config.stale_filter_ttl); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 75d9479..6e818e2 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector; use serde::Serialize; use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead}; -use tracing::{Instrument, debug, trace_span}; +use tracing::{Instrument, debug, trace, trace_span}; use trevm::{ EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker, }; @@ -1078,12 +1078,38 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed logs + // and rewind `next_start_block`. + let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); + if !removed.is_empty() { + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); + } + let latest = ctx.tags().latest(); let start = entry.next_start_block(); + // Implicit reorg detection: if latest has moved backward past our + // window, a reorg occurred that we missed (e.g. broadcast lagged). 
+ // Return any removed logs we do have, then reset. + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); + } + if start > latest { - entry.mark_polled(latest); - return Ok(entry.empty_output()); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } let cold = ctx.cold(); @@ -1110,7 +1136,15 @@ where let stream = cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?; - let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + + // Prepend removed logs so the client sees removals before + // the replacement data. + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index ae09367..3f97ffd 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,9 +1,12 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{InterestKind, buffer::EventBuffer}; +use crate::{ + config::ChainNotifier, + interest::{InterestKind, ReorgNotification, buffer::EventBuffer}, +}; use alloy::{ primitives::{B256, U64}, - rpc::types::Filter, + rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ @@ -24,7 +27,11 @@ pub(crate) type FilterOutput = EventBuffer; /// /// Records the filter details, the [`Instant`] at which the filter was last /// polled, and the first block whose contents should be considered. 
-#[derive(Debug, Clone, PartialEq, Eq)] +/// +/// `last_poll_time` doubles as the cursor into the global reorg ring +/// buffer — at poll time the filter scans for reorgs received after +/// this instant. +#[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, @@ -38,7 +45,7 @@ impl core::fmt::Display for ActiveFilter { "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), - self.kind + self.kind, ) } } @@ -60,11 +67,25 @@ impl ActiveFilter { self.last_poll_time = Instant::now(); } + /// Update the poll timestamp without advancing `next_start_block`. + /// + /// Used on early-return paths (implicit reorg, no new blocks) where + /// `compute_removed_logs` has already rewound `next_start_block` and + /// we must preserve that position for the next forward scan. + pub(crate) fn touch_poll_time(&mut self) { + self.last_poll_time = Instant::now(); + } + /// Get the next start block for the filter. pub(crate) const fn next_start_block(&self) -> u64 { self.next_start_block } + /// Get the instant at which the filter was last polled (or created). + pub(crate) const fn last_poll_time(&self) -> Instant { + self.last_poll_time + } + /// Get the duration since the filter was last polled. pub(crate) fn time_since_last_poll(&self) -> Duration { self.last_poll_time.elapsed() @@ -74,6 +95,49 @@ impl ActiveFilter { pub(crate) const fn empty_output(&self) -> FilterOutput { self.kind.empty_output() } + + /// Compute removed logs from a sequence of reorg notifications. + /// + /// Walks the reorgs in order, computing snapshots lazily from the + /// filter's current [`next_start_block`]. For each reorg, only logs + /// from blocks the filter had already delivered (block number below + /// the snapshot) are included. + /// + /// Updates `next_start_block` to the rewound value so the subsequent + /// forward scan starts from the correct position. 
+ /// + /// Block filters return an empty vec — the Ethereum JSON-RPC spec + /// does not define `removed` semantics for block filters. + /// + /// [`next_start_block`]: Self::next_start_block + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { + let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } + return Vec::new(); + }; + + let mut removed = Vec::new(); + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); + for block in ¬ification.removed_blocks { + if block.number >= snapshot { + continue; + } + for log in &block.logs { + if !filter.matches(&log.inner) { + continue; + } + let mut log = log.clone(); + log.removed = true; + removed.push(log); + } + } + } + removed + } } /// Inner logic for [`FilterManager`]. @@ -102,10 +166,14 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let next_start_block = current_block + 1; - let _ = self - .filters - .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + let _ = self.filters.insert( + id, + ActiveFilter { + next_start_block: current_block + 1, + last_poll_time: Instant::now(), + kind, + }, + ); id } @@ -136,22 +204,39 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. -/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. -/// See [`DashMap`] documentation for more information. +/// Reorg notifications are read from the [`ChainNotifier`]'s ring buffer +/// at poll time. 
Filters compute their removed logs lazily by scanning +/// for entries received since the last poll. +/// +/// Calling [`Self::new`] spawns an OS thread that periodically cleans +/// stale filters (using a separate thread to avoid [`DashMap::retain`] +/// deadlock). The worker holds a [`Weak`] reference and self-terminates +/// when the manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc<FilterManagerInner>, + chain: ChainNotifier, } impl FilterManager { - /// Create a new filter manager. Spawn a task to clean stale filters. - pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters. Reorg notifications + /// are read directly from the [`ChainNotifier`]'s ring buffer at + /// poll time — no background listener is needed. + pub(crate) fn new(chain: ChainNotifier, clean_interval: Duration, age_limit: Duration) -> Self { let inner = Arc::new(FilterManagerInner::new()); - let manager = Self { inner }; + let manager = Self { inner, chain }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); manager } + + /// Return all reorg notifications received after `since`. + /// + /// Delegates to [`ChainNotifier::reorgs_since`].
+ pub(crate) fn reorgs_since(&self, since: Instant) -> Vec<Arc<ReorgNotification>> { + self.chain.reorgs_since(since) + } } impl std::ops::Deref for FilterManager { @@ -195,6 +280,167 @@ impl FilterCleanTask { } } +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; + + fn block_filter(start: u64) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Block, + } + } + + fn log_filter(start: u64, addr: Address) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Log(Box::new(Filter::new().address(addr))), + } + } + + fn test_log(addr: Address, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } + } + + fn reorg_notification(ancestor: u64, removed: Vec<RemovedBlock>) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } + } + + fn removed_block(number: u64, logs: Vec<Log>) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, + logs, + } + } + + #[test] + fn compute_removed_logs_rewinds_start_block() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(7, vec![])); + + f.compute_removed_logs(&[reorg]); + + assert_eq!(f.next_start_block, 8); + } + + #[test] + fn compute_removed_logs_matches_removed() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let mut f = log_filter(11,
addr); + + let reorg = Arc::new(reorg_notification( + 8, + vec![ + removed_block(9, vec![test_log(addr, 9, hash)]), + removed_block(10, vec![test_log(addr, 10, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 2); + assert!(removed.iter().all(|l| l.removed)); + assert!(removed.iter().all(|l| l.inner.address == addr)); + // Verify RPC metadata is preserved through clone-and-set path. + assert!(removed.iter().all(|l| l.transaction_hash == Some(B256::ZERO))); + assert!(removed.iter().all(|l| l.log_index == Some(0))); + } + + #[test] + fn compute_removed_logs_skips_undelivered_blocks() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + // Filter has only delivered up to block 10 (next_start = 11). + let mut f = log_filter(11, addr); + + // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. + let reorg = Arc::new(reorg_notification( + 9, + vec![ + removed_block(10, vec![test_log(addr, 10, hash)]), + removed_block(11, vec![test_log(addr, 11, hash)]), + removed_block(12, vec![test_log(addr, 12, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].block_number, Some(10)); + } + + #[test] + fn compute_removed_logs_cascading() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let hash = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + // Filter has delivered up to block 100 (next_start = 101). + let mut f = log_filter(101, addr); + + // Reorg A: rewinds to 98, removes 99-100. + let reorg_a = Arc::new(reorg_notification( + 98, + vec![ + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + ], + )); + + // Reorg B: rewinds to 95, removes 96-103. 
+ let reorg_b = Arc::new(reorg_notification( + 95, + vec![ + removed_block(96, vec![test_log(addr, 96, hash)]), + removed_block(97, vec![test_log(addr, 97, hash)]), + removed_block(98, vec![test_log(addr, 98, hash)]), + removed_block(99, vec![test_log(addr, 99, hash)]), + removed_block(100, vec![test_log(addr, 100, hash)]), + removed_block(101, vec![test_log(addr, 101, hash)]), + removed_block(102, vec![test_log(addr, 102, hash)]), + removed_block(103, vec![test_log(addr, 103, hash)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg_a, reorg_b]); + + // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. + // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. + // Blocks 99-103 from reorg B are >= 99 → skipped. + assert_eq!(removed.len(), 5); + assert_eq!(f.next_start_block, 96); + } + + #[test] + fn compute_removed_logs_block_filter_empty() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); + + let removed = f.compute_removed_logs(&[reorg]); + + assert!(removed.is_empty()); + // But the rewind still happened. 
+ assert_eq!(f.next_start_block, 6); + } +} + // Some code in this file has been copied and modified from reth // // The original license is included below: diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f0ed907..cb09c74 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -109,21 +109,13 @@ impl InterestKind { let logs: VecDeque<Log> = reorg .removed_blocks - .into_iter() - .flat_map(|block| { - let block_hash = block.header.hash_slow(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { - inner: log, - block_hash: Some(block_hash), - block_number: Some(block_number), - block_timestamp: Some(block_timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }) + .iter() + .flat_map(|block| block.logs.iter()) + .filter(|log| filter.matches(&log.inner)) + .map(|log| { + let mut log = log.clone(); + log.removed = true; + log }) .collect(); @@ -134,37 +126,51 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; - use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; - fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { - alloy::primitives::Log { - address: addr, - data: LogData::new_unchecked(vec![topic], Bytes::new()), + fn test_log(addr: Address, topic: B256, number: u64, hash: B256) -> Log { + Log { + inner: alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + }, + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(1_000_000 + number), + transaction_hash: Some(B256::ZERO), + transaction_index: Some(0), + log_index: Some(0), + removed: false, + } } - fn test_header(number: u64) -> alloy::consensus::Header { - alloy::consensus::Header { number, timestamp: 1_000_000 + number,
..Default::default() } - } - fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec<Log>, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } + } + #[test] fn filter_reorg_for_sub_matches_logs() { let addr = address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); - let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: header.clone(), - logs: vec![test_log(addr, topic)], - }], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(addr, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -173,9 +179,9 @@ mod tests { assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); - assert_eq!(logs[0].block_number.unwrap(), 11); - assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + assert_eq!(logs[0].block_hash, Some(block_hash)); + assert_eq!(logs[0].block_number, Some(11)); + assert_eq!(logs[0].block_timestamp, Some(1_000_011)); } @@ -183,14 +189,17 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let other = address!("0x0000000000000000000000000000000000000002"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header:
test_header(11), - logs: vec![test_log(other, topic)], - }], + removed_blocks: vec![test_removed_block( + 11, + block_hash, + vec![test_log(other, topic, 11, block_hash)], + )], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -200,9 +209,12 @@ mod tests { #[test] fn filter_reorg_for_sub_block_returns_empty() { + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![])], }; let buf = InterestKind::Block.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3ad2620..ce5472f 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -25,12 +25,18 @@ //! reference to the `Arc`, so they self-terminate once all //! strong references are dropped. //! -//! OS threads are used (rather than tokio tasks) because +//! OS threads are used for cleanup (rather than tokio tasks) because //! [`DashMap::retain`] can deadlock if called from an async context //! that also holds a `DashMap` read guard on the same shard. Running //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! +//! [`FilterManager`] reads reorg notifications directly from the +//! [`ChainNotifier`]'s ring buffer at poll time — no background +//! listener task is needed. +//! +//! [`ChainNotifier`]: crate::ChainNotifier +//! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap //! [`DashMap::retain`]: dashmap::DashMap::retain @@ -69,13 +75,23 @@ pub enum ChainEvent { Reorg(ReorgNotification), } -/// Data from a single block removed during a chain reorganization. +/// A block that was removed during a chain reorganization. #[derive(Debug, Clone)] pub struct RemovedBlock { - /// The header of the removed block. 
- pub header: alloy::consensus::Header, - /// Logs emitted by the removed block. - pub logs: Vec<alloy::primitives::Log>, + /// The block number. + pub number: u64, + /// The block hash. + pub hash: alloy::primitives::B256, + /// The block timestamp. + pub timestamp: u64, + /// Logs emitted in the removed block. + /// + /// Uses the RPC log type so that `transaction_hash` and `log_index` + /// from the original receipts can be preserved, as required by the + /// Ethereum JSON-RPC spec for removed logs. These fields are + /// populated when cold storage has indexed the block; otherwise the + /// vec may be empty. + pub logs: Vec<alloy::rpc::types::eth::Log>, } /// Notification sent when a chain reorganization is detected. @@ -83,6 +99,6 @@ pub struct RemovedBlock { pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Blocks removed by the reorg, each carrying its header and logs. + /// The blocks that were removed, ordered by block number. pub removed_blocks: Vec<RemovedBlock>, }