Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.DS_Store
Cargo.lock
.idea/
docs/superpowers/
9 changes: 5 additions & 4 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,11 @@ where
let removed_blocks = drained
.into_iter()
.map(|d| {
let header = d.header.into_inner();
let logs =
d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect();
RemovedBlock { header, logs }
let number = d.header.number();
let hash = d.header.hash();
let timestamp = d.header.timestamp();
let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).collect();
RemovedBlock { number, hash, timestamp, logs }
})
.collect();
let notif = ReorgNotification { common_ancestor, removed_blocks };
Expand Down
93 changes: 90 additions & 3 deletions crates/rpc/src/config/chain_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,21 @@ use crate::{
config::resolve::BlockTags,
interest::{ChainEvent, NewBlockNotification, ReorgNotification},
};
use std::{
collections::VecDeque,
sync::{Arc, RwLock},
time::Instant,
};
use tokio::sync::broadcast;

/// Maximum number of reorg notifications retained in the ring buffer.
/// At most one reorg per 12 seconds and a 5-minute stale filter TTL
/// gives a worst case of 25 entries.
const MAX_REORG_ENTRIES: usize = 25;

/// Timestamped reorg ring buffer.
type ReorgBuffer = VecDeque<(Instant, Arc<ReorgNotification>)>;

/// Shared chain state between the node and RPC layer.
///
/// Combines block tag tracking and chain event notification into a single
Expand All @@ -27,6 +40,7 @@ use tokio::sync::broadcast;
pub struct ChainNotifier {
    /// Block tag state shared between the node and the RPC layer
    /// (initialized to zeros in [`ChainNotifier::new`]; exposes e.g.
    /// `latest()` to callers).
    tags: BlockTags,
    /// Broadcast channel used to push [`ChainEvent`]s to subscribers;
    /// cloned out via `notif_sender` and subscribed to via `subscribe`.
    notif_tx: broadcast::Sender<ChainEvent>,
    /// Authoritative, timestamped ring buffer of recent reorg
    /// notifications, bounded by `MAX_REORG_ENTRIES`. Written by
    /// `send_reorg` before broadcasting; read by `reorgs_since` so
    /// polling filters can recover reorgs even if the broadcast lagged.
    reorgs: Arc<RwLock<ReorgBuffer>>,
}

impl ChainNotifier {
Expand All @@ -35,7 +49,7 @@ impl ChainNotifier {
/// Build a notifier whose broadcast channel buffers up to
/// `channel_capacity` in-flight events.
///
/// Block tags start at zero and the reorg ring buffer starts empty.
pub fn new(channel_capacity: usize) -> Self {
    let (notif_tx, _rx) = broadcast::channel(channel_capacity);
    Self {
        tags: BlockTags::new(0, 0, 0),
        notif_tx,
        reorgs: Arc::new(RwLock::new(VecDeque::new())),
    }
}

/// Access the block tags.
Expand All @@ -57,13 +71,21 @@ impl ChainNotifier {

/// Record and broadcast a reorg notification.
///
/// The notification is first appended to the authoritative ring
/// buffer — evicting the oldest entry once `MAX_REORG_ENTRIES` is
/// reached — and only then pushed to broadcast subscribers. A
/// broadcast `Err` therefore means nothing more than "no active push
/// subscribers"; the ring buffer has already been updated.
#[allow(clippy::result_large_err)]
pub fn send_reorg(
    &self,
    notif: ReorgNotification,
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
    {
        // Scope the write guard so it is released before broadcasting.
        let mut ring = self.reorgs.write().unwrap();
        if ring.len() >= MAX_REORG_ENTRIES {
            ring.pop_front();
        }
        // The buffer stores an Arc'd clone because `ChainEvent::Reorg`
        // takes the notification by value below.
        let stamped = (Instant::now(), Arc::new(notif.clone()));
        ring.push_back(stamped);
    }
    self.send_event(ChainEvent::Reorg(notif))
}

Expand All @@ -81,10 +103,75 @@ impl ChainNotifier {
self.notif_tx.subscribe()
}

/// Collect every reorg notification recorded strictly after `since`.
///
/// Holds the read lock only long enough to clone the matching
/// [`Arc`]s. Polling filters call this at poll time to compute
/// removed logs.
pub fn reorgs_since(&self, since: Instant) -> Vec<Arc<ReorgNotification>> {
    let ring = self.reorgs.read().unwrap();
    let mut matching = Vec::new();
    for (received_at, reorg) in ring.iter() {
        if *received_at > since {
            matching.push(Arc::clone(reorg));
        }
    }
    matching
}

/// Hand out a clone of the underlying broadcast sender.
///
/// The subscription manager uses this to create receivers of its own
/// without going through [`ChainNotifier::subscribe`].
pub fn notif_sender(&self) -> broadcast::Sender<ChainEvent> {
    self.notif_tx.clone()
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::interest::ReorgNotification;
    use std::time::Duration;

    /// Minimal reorg notification with the given common ancestor and
    /// no removed blocks.
    fn reorg_notification(ancestor: u64) -> ReorgNotification {
        ReorgNotification { common_ancestor: ancestor, removed_blocks: vec![] }
    }

    #[test]
    fn push_reorg_evicts_oldest() {
        let notifier = ChainNotifier::new(16);
        // Overfill by five entries so the five oldest must be evicted.
        for ancestor in 0..(MAX_REORG_ENTRIES as u64 + 5) {
            notifier.send_reorg(reorg_notification(ancestor)).ok();
        }
        let ring = notifier.reorgs.read().unwrap();
        assert_eq!(ring.len(), MAX_REORG_ENTRIES);
        // Ancestors 0..=4 were dropped; 5 is now the oldest survivor.
        assert_eq!(ring.front().unwrap().1.common_ancestor, 5);
    }

    #[test]
    fn reorgs_since_filters_by_time() {
        let notifier = ChainNotifier::new(16);

        let before = Instant::now();
        std::thread::sleep(Duration::from_millis(5));
        notifier.send_reorg(reorg_notification(10)).ok();

        let mid = Instant::now();
        std::thread::sleep(Duration::from_millis(5));
        notifier.send_reorg(reorg_notification(8)).ok();

        // Both reorgs land after `before`...
        assert_eq!(notifier.reorgs_since(before).len(), 2);

        // ...but only the second lands after `mid`.
        let recent = notifier.reorgs_since(mid);
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].common_ancestor, 8);
    }

    #[test]
    fn reorgs_since_skips_pre_creation_reorgs() {
        let notifier = ChainNotifier::new(16);
        notifier.send_reorg(reorg_notification(5)).ok();
        std::thread::sleep(Duration::from_millis(5));

        // A cutoff taken after the only reorg yields nothing.
        let after = Instant::now();
        assert!(notifier.reorgs_since(after).is_empty());
    }
}
3 changes: 2 additions & 1 deletion crates/rpc/src/config/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ impl<H: HotKv> StorageRpcCtx<H> {
config: StorageRpcConfig,
) -> Self {
let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests));
let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl);
let filter_manager =
FilterManager::new(chain.clone(), config.stale_filter_ttl, config.stale_filter_ttl);
let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl);
let gas_cache = GasOracleCache::new();
Self {
Expand Down
42 changes: 38 additions & 4 deletions crates/rpc/src/eth/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector;
use serde::Serialize;
use signet_cold::{HeaderSpecifier, ReceiptSpecifier};
use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead};
use tracing::{Instrument, debug, trace_span};
use tracing::{Instrument, debug, trace, trace_span};
use trevm::{
EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker,
};
Expand Down Expand Up @@ -1078,12 +1078,38 @@ where
let fm = ctx.filter_manager();
let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?;

// Scan the global reorg ring buffer for notifications received
// since this filter's last poll, then lazily compute removed logs
// and rewind `next_start_block`.
let reorgs = fm.reorgs_since(entry.last_poll_time());
let removed = entry.compute_removed_logs(&reorgs);
if !removed.is_empty() {
trace!(count = removed.len(), "computed removed logs from reorg ring buffer");
}

let latest = ctx.tags().latest();
let start = entry.next_start_block();

// Implicit reorg detection: if latest has moved backward past our
// window, a reorg occurred that we missed (e.g. broadcast lagged).
// Return any removed logs we do have, then reset.
if latest + 1 < start {
trace!(latest, start, "implicit reorg detected, resetting filter");
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
}

if start > latest {
entry.mark_polled(latest);
return Ok(entry.empty_output());
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
}

let cold = ctx.cold();
Expand All @@ -1110,7 +1136,15 @@ where
let stream =
cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?;

let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?;
let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?;

// Prepend removed logs so the client sees removals before
// the replacement data.
if !removed.is_empty() {
let mut combined = removed;
combined.append(&mut logs);
logs = combined;
}

entry.mark_polled(latest);
Ok(FilterOutput::from(logs))
Expand Down
Loading