
feat: handle reorgs in get_filter_changes with reorg watermark #98

Merged
prestwich merged 13 commits into develop from james/eng-1971
Mar 17, 2026

Conversation

@prestwich
Member

@prestwich prestwich commented Mar 10, 2026

Summary

  • Replaces per-filter reorg storage with a global ring buffer on FilterManagerInner that retains recent ReorgNotifications
  • Spawns a FilterReorgTask that subscribes to ChainEvent::Reorg broadcasts and eagerly populates the ring buffer
  • On poll, get_filter_changes scans the ring buffer for notifications received since the filter's last poll, lazily computes removed logs, and rewinds next_start_block for the subsequent forward scan
  • Adds an implicit reorg detection fallback (latest + 1 < start) for cases where the broadcast was missed
  • Restructures RemovedBlock to flat number/hash/timestamp fields (replaces full Header)
  • Preserves block_timestamp on removed logs for Ethereum JSON-RPC spec compliance
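
The ring-buffer mechanics in the first three bullets can be sketched in a few lines of std-only Rust. All names and layouts here (ReorgBuffer, push_reorg, reorgs_since, the (Instant, Arc) pairing) are assumptions read off this summary, not the PR's actual code; the real buffer sits behind a lock on FilterManagerInner:

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;

// Hypothetical stand-in for the PR's ReorgNotification; only the field the
// sketch needs. The real type also carries the removed blocks and logs.
pub struct ReorgNotification {
    pub common_ancestor: u64,
}

// Capped ring buffer of (arrival time, notification) pairs, oldest first.
pub struct ReorgBuffer {
    reorgs: VecDeque<(Instant, Arc<ReorgNotification>)>,
    cap: usize,
}

impl ReorgBuffer {
    pub fn new(cap: usize) -> Self {
        Self { reorgs: VecDeque::with_capacity(cap), cap }
    }

    // Append a notification, evicting the oldest entry when full.
    pub fn push_reorg(&mut self, n: Arc<ReorgNotification>) {
        if self.reorgs.len() == self.cap {
            self.reorgs.pop_front();
        }
        self.reorgs.push_back((Instant::now(), n));
    }

    // Notifications that arrived strictly after the filter's last poll.
    pub fn reorgs_since(&self, last_poll: Instant) -> Vec<Arc<ReorgNotification>> {
        self.reorgs
            .iter()
            .filter(|(t, _)| *t > last_poll)
            .map(|(_, n)| Arc::clone(n))
            .collect()
    }
}
```

Because each filter only records its last poll time, a single shared buffer serves every filter without per-filter bookkeeping.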

Closes ENG-1971

Stack

This PR includes commits from #96 and #97. Review only the top commits.

  1. feat: update BlockTags during reorgs to prevent stale tag window #96 — BlockTags::rewind_to for reorg tag updates
  2. feat: handle ChainEvent::Reorg in SubscriptionTask #97 — SubscriptionTask reorg handling
  3. feat: handle reorgs in get_filter_changes with reorg watermark #98 ← this PR — get_filter_changes reorg handling with a global ring buffer
  4. test: integration tests for reorg tracking in RPC subscriptions and filters #99 — Integration tests (includes #96, #97, #98)

Test plan

  • cargo clippy -p signet-rpc --all-features --all-targets — clean
  • cargo clippy -p signet-rpc --no-default-features --all-targets — clean
  • cargo +nightly fmt — clean
  • cargo t -p signet-rpc — 35 tests + 4 doc-tests pass
  • Review that FilterReorgTask self-terminates when FilterManager is dropped (uses Weak)
  • Review implicit reorg detection edge cases
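
The Weak-based self-termination called out above follows a standard pattern. This is a hedged, thread-based sketch (the real FilterReorgTask is a tokio task, and ManagerInner is a stand-in type invented for illustration):

```rust
use std::sync::{Arc, Mutex, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-in for FilterManagerInner; the real type holds the
// filter map and reorg state. This sketch shows only the lifetime trick.
pub struct ManagerInner {
    pub ticks: Mutex<u64>,
}

// The worker holds a Weak, not an Arc, so it does not keep the manager
// alive. Once every strong reference is dropped, upgrade() returns None
// and the loop exits on its own, with no shutdown channel needed.
pub fn spawn_reorg_task(inner: Weak<ManagerInner>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        let Some(strong) = inner.upgrade() else {
            break; // manager dropped: self-terminate
        };
        *strong.ticks.lock().unwrap() += 1;
        drop(strong); // release the Arc before sleeping
        thread::sleep(Duration::from_millis(1));
    })
}
```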

🤖 Generated with Claude Code

@prestwich prestwich requested a review from a team as a code owner March 10, 2026 12:03
Member

@Evalir Evalir left a comment

I think this is fine. It requires thinking quite a bit through the possible reorg cases, but the flow is clear to me. I'm good with filter changes returning the empty output instead of querying an impossible range.

@Evalir Evalir self-requested a review March 13, 2026 14:22
prestwich and others added 2 commits March 13, 2026 13:47
Add a `reorg_watermark` field to `ActiveFilter` that records the common
ancestor block when a chain reorganization occurs. `FilterManager` now
subscribes to `ChainEvent::Reorg` broadcasts and eagerly propagates
watermarks to all active filters. On the next poll, `get_filter_changes`
rewinds `next_start_block` so re-fetched data reflects the new chain.

An implicit reorg detection check (latest < start) provides a
belt-and-suspenders fallback when the explicit watermark is missed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the simple u64 watermark with a Vec of Arc<ReorgNotification>
paired with next_start_block snapshots. This fixes two issues:

1. Race condition: filters created after a reorg no longer receive
   false watermarks, guarded by a created_at timestamp comparison.

2. Missing removed logs: get_filter_changes now emits `removed: true`
   logs per the Ethereum JSON-RPC spec. Each pending reorg's snapshot
   determines which removed blocks the client already saw.

Restructure ReorgNotification to group logs per block (RemovedBlock)
so filters can determine relevance by block number. The Arc sharing
means no log data is cloned until poll time, and automatic drop
eliminates the need for cleanup passes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@prestwich prestwich changed the base branch from james/eng-1970 to develop March 13, 2026 17:50
@prestwich prestwich marked this pull request as ready for review March 13, 2026 17:50
@prestwich
Member Author

@Evalir we have redesigned the watermarking to store pending notifications in each filter

@prestwich prestwich requested a review from Fraser999 March 13, 2026 17:50
@prestwich
Member Author

[Claude Code]

Code review

Found 2 issues:

  1. Removed logs silently discarded on early-return paths in get_filter_changes.
    drain_reorgs() is called unconditionally at the top and populates removed, but two early-return paths discard those logs without sending them to the client:

    • The implicit reorg guard (if latest + 1 < start) returns entry.empty_output(). The comment says "Removed logs are unavailable in this degraded path," but they are available in removed when the explicit broadcast was received before the implicit check fires.
    • The existing if start > latest guard also returns entry.empty_output(). After a reorg rewinds next_start_block to common_ancestor + 1, if latest == common_ancestor this path fires and the drained removed logs are dropped.

    In both cases the client never receives the removed: true logs it needs to invalidate stale state.

    let latest = ctx.tags().latest();
    let start = entry.next_start_block();
    // Implicit reorg detection: if latest has moved backward past our
    // window, a reorg occurred that we missed (e.g. broadcast lagged).
    // Reset to avoid skipping. Removed logs are unavailable in this
    // degraded path.
    if latest + 1 < start {
        trace!(latest, start, "implicit reorg detected, resetting filter");
        entry.mark_polled(latest);
        return Ok(entry.empty_output());
    }
    if start > latest {
        entry.mark_polled(latest);
        return Ok(entry.empty_output());
    }
    let cold = ctx.cold();

  2. block_timestamp regression from PR #97 review feedback.
    PR #97 added full headers to RemovedBlock specifically to populate block_timestamp on removed logs, per Fraser999's review comment requesting Ethereum JSON-RPC spec compliance. This PR replaces header: Header with flat number/hash fields, dropping timestamp entirely. Both drain_reorgs and filter_reorg_for_sub now set block_timestamp: None. The test that previously asserted block_timestamp.unwrap() was also removed. Consider adding a timestamp: u64 field to RemovedBlock to preserve spec compliance.

    /// A block that was removed during a chain reorganization.
    #[derive(Debug, Clone)]
    pub struct RemovedBlock {
        /// The block number.
        pub number: u64,
        /// The block hash.
        pub hash: alloy::primitives::B256,
        /// Logs emitted in the removed block.
        pub logs: Vec<alloy::primitives::Log>,
    }

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

prestwich and others added 2 commits March 14, 2026 08:56
Update module-level and struct-level documentation to reflect the new
FilterReorgTask tokio worker spawned alongside the existing OS cleanup
thread. Remove unnecessary `if !removed.is_empty()` guard around the
prepend logic in get_filter_changes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Thread timestamp from DrainedBlock.header through RemovedBlock so that
drain_reorgs and filter_reorg_for_sub populate block_timestamp on
removed logs, restoring Ethereum JSON-RPC spec compliance lost during
the reorg redesign.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@prestwich
Member Author

block timestamp regression addressed in 74cad40

Per-filter `pending_reorgs` accumulated `Arc<ReorgNotification>`
eagerly via O(n) iteration on every reorg. Two early-return paths in
`get_filter_changes` then discarded the drained removed logs —
notably `start > latest` fires every post-reorg poll before new
blocks arrive.

Replace with a global `RwLock<VecDeque<(Instant, Arc)>>` ring buffer
(cap 25) on FilterManagerInner. Filters compute removed logs lazily
at poll time by scanning entries received since `last_poll_time`,
walking reorgs in order to derive snapshots. Both early-return paths
now return removed logs instead of `empty_output()`.

- Remove `pending_reorgs`, `created_at`, `push_reorg`, `drain_reorgs`
  from ActiveFilter
- Add `compute_removed_logs`, `last_poll_time` accessor
- Add `push_reorg` (ring buffer append) and `reorgs_since` to
  FilterManagerInner
- Simplify FilterReorgTask to just append (no per-filter iteration)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
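
The lazy poll-time computation this commit describes can be illustrated with a small sketch. Block numbers stand in for full RemovedBlock entries, and compute_removed is a hypothetical distillation of compute_removed_logs, not the PR's code:

```rust
// Hypothetical reduction of a ReorgNotification: the common ancestor plus
// the numbers of the blocks the reorg removed.
pub struct Reorg {
    pub common_ancestor: u64,
    pub removed_blocks: Vec<u64>,
}

// Walk reorgs in arrival order: each contributes the removed blocks the
// client has already seen, then rewinds the scan position to just past
// its common ancestor so the next forward scan re-fetches the new chain.
pub fn compute_removed(next_start: &mut u64, reorgs: &[Reorg]) -> Vec<u64> {
    let mut removed = Vec::new();
    for r in reorgs {
        for &b in &r.removed_blocks {
            // Blocks at or above next_start were never reported to the
            // client, so no `removed: true` log is owed for them.
            if b < *next_start {
                removed.push(b);
            }
        }
        *next_start = (*next_start).min(r.common_ancestor + 1);
    }
    removed
}
```

Walking in order is what derives the per-reorg snapshots: after the first rewind, a later reorg below the new scan position contributes nothing the client was ever told about.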
@prestwich
Member Author

4c9f2f1 includes a further refactor that addresses the dropped logs

@prestwich
Member Author

[Claude Code]

Code review

Found 2 issues:

  1. mark_polled(latest) overwrites the reorg rewind on the implicit-reorg path. compute_removed_logs rewinds next_start_block to common_ancestor + 1, then mark_polled(latest) unconditionally sets it to latest + 1. Since latest < common_ancestor on this branch, the rewind is lost and the next poll skips blocks between latest + 1 and common_ancestor.

// Return any removed logs we do have, then reset.
if latest + 1 < start {
    trace!(latest, start, "implicit reorg detected, resetting filter");
    entry.mark_polled(latest);
    return Ok(if removed.is_empty() {
        entry.empty_output()
    } else {
        FilterOutput::from(removed)
    });
}

  2. block_timestamp regression guard removed from filter_reorg_for_sub_matches_logs. PR #97 added this assertion per Fraser999's review for spec compliance. Commit 74cad40 on this branch fixed the production code, but the test assertion was dropped in 4c9f2f1 — leaving the fix untested.

assert_eq!(logs.len(), 1);
assert!(logs[0].removed);
assert_eq!(logs[0].inner.address, addr);
assert_eq!(logs[0].block_hash, Some(block_hash));
assert_eq!(logs[0].block_number, Some(11));
}

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

Replace mark_polled(latest) with touch_poll_time() on the implicit-reorg
and no-new-blocks early-return paths. mark_polled unconditionally
overwrites next_start_block, discarding the rewind applied by
compute_removed_logs. The new touch_poll_time method updates only
last_poll_time so the next forward scan starts from the correct position.

Also restore the block_timestamp assertion in filter_reorg_for_sub test
that was dropped during the ring buffer refactor.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
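
The mark_polled vs. touch_poll_time distinction can be shown concretely. The field layout of ActiveFilter below is assumed from the names in the commit message, not taken from the PR's code:

```rust
use std::time::Instant;

// Assumed minimal shape of ActiveFilter for this sketch.
pub struct ActiveFilter {
    pub next_start_block: u64,
    pub last_poll_time: Instant,
}

impl ActiveFilter {
    // Normal path: advance the forward-scan window past `latest`.
    pub fn mark_polled(&mut self, latest: u64) {
        self.next_start_block = latest + 1;
        self.last_poll_time = Instant::now();
    }

    // Early-return paths: record the poll time without clobbering a
    // rewind that compute_removed_logs may already have applied.
    pub fn touch_poll_time(&mut self) {
        self.last_poll_time = Instant::now();
    }
}
```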
@prestwich
Member Author

[Claude Code]

Ring buffer enthusiasm score for this PR: 11/10. We have graduated from "store everything per-filter" to "one ring buffer to rule them all." Next PR will presumably replace the database with a very large ring buffer.

pub(crate) struct FilterManagerInner {
    current_id: AtomicU64,
    filters: DashMap<FilterId, ActiveFilter>,
    reorgs: RwLock<VecDeque<(Instant, Arc<ReorgNotification>)>>,
Contributor

If we put the ring buffer in the ChainNotifier rather than here, we'd eliminate a class of errors by avoiding the need for a broadcast receiver in the FilterReorgTask (i.e. no possibility of the receiver lagging).

The ChainNotifier would write to the ring buffer before broadcasting, and the broadcasts would only be consumed by push-based subscriptions. Polling filters would directly hit the ring buffer, which would be guaranteed complete (assuming the size of the ring buffer is suitable).
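
The proposed ordering, writing to the authoritative buffer before any broadcast, can be sketched as follows. An mpsc channel stands in for the broadcast channel, and Notifier is a hypothetical reduction of ChainNotifier:

```rust
use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};

// Stand-in for ReorgNotification.
pub struct Reorg(pub u64);

pub struct Notifier {
    ring: Mutex<VecDeque<Arc<Reorg>>>,
    cap: usize,
    tx: mpsc::Sender<Arc<Reorg>>,
}

impl Notifier {
    pub fn new(cap: usize, tx: mpsc::Sender<Arc<Reorg>>) -> Self {
        Self { ring: Mutex::new(VecDeque::new()), cap, tx }
    }

    pub fn send_reorg(&self, r: Reorg) {
        let r = Arc::new(r);
        let mut ring = self.ring.lock().unwrap();
        if ring.len() == self.cap {
            ring.pop_front();
        }
        ring.push_back(Arc::clone(&r)); // 1. record in the ring buffer first
        drop(ring);
        let _ = self.tx.send(r); // 2. then notify push-based subscribers
    }

    // What a polling filter would read instead of a broadcast receiver.
    pub fn reorgs(&self) -> Vec<Arc<Reorg>> {
        self.ring.lock().unwrap().iter().cloned().collect()
    }
}
```

Because the buffer write happens before the send, a poller that misses or lags the channel still sees a complete history up to the buffer's capacity.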

Member Author

[Claude Code]

Done. Moved the ring buffer from FilterManagerInner into ChainNotifier. send_reorg now writes to the buffer before broadcasting, and FilterManager reads it directly via ChainNotifier::reorgs_since(). FilterReorgTask and the broadcast receiver are deleted entirely — no more lag risk.

    /// The block timestamp.
    pub timestamp: u64,
    /// Logs emitted in the removed block.
    pub logs: Vec<alloy::primitives::Log>,
Contributor

I see this wasn't changed in this PR, but Claude has picked up that this should be alloy::rpc::types::Log so that we retain the tx hash and log index, which are supposed to be returned to clients for removed logs according to the Ethereum spec.

Member Author

[Claude Code]

Done. RemovedBlock.logs is now Vec<alloy::rpc::types::Log>. The producer in notify_reorg no longer strips .inner, so transaction_hash and log_index from ColdReceipt flow through to consumers. Both compute_removed_logs and filter_reorg_for_sub clone the log and set removed: true instead of reconstructing with None fields.
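
The clone-and-flip consumer behavior described here can be sketched with a stand-in log type (RpcLog below is not alloy's type; only the relevant fields are modeled):

```rust
// Stand-in for alloy::rpc::types::Log with only the fields this sketch
// touches; the real type carries the full receipt metadata.
#[derive(Clone, Debug, PartialEq)]
pub struct RpcLog {
    pub removed: bool,
    pub transaction_hash: Option<[u8; 32]>,
    pub log_index: Option<u64>,
}

// Clone the stored RPC log so transaction_hash and log_index survive, and
// flip only `removed`, instead of rebuilding the log with None metadata.
pub fn mark_removed(logs: &[RpcLog]) -> Vec<RpcLog> {
    logs.iter()
        .map(|l| {
            let mut l = l.clone();
            l.removed = true;
            l
        })
        .collect()
}
```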

prestwich and others added 5 commits March 17, 2026 09:02
Addresses Fraser's PR #98 review feedback with two changes:
1. Move reorg ring buffer from FilterManagerInner to ChainNotifier
2. Preserve alloy::rpc::types::Log in RemovedBlock for spec compliance

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add Arc wrapper on reorgs field (ChainNotifier is Clone)
- Fix ColdReceipt type name (Receipt<RpcLog>, not ConsensusReceipt)
- Note module-level and struct-level doc updates needed
- Clarify send_reorg return semantics with ring buffer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Addresses spec review feedback:
- Preserve Deref impl on FilterManager
- Populate block metadata in test log helpers
- Add missing reorgs_since_skips_pre_creation test
- Simplify send_reorg (clone instead of Arc::try_unwrap)
- Clean up confusing false-start text

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Change RemovedBlock.logs from Vec<alloy::primitives::Log> to
Vec<alloy::rpc::types::Log>. The producer (notify_reorg) now passes
through the full RPC log from ColdReceipt instead of stripping the
wrapper. Consumers (compute_removed_logs, filter_reorg_for_sub) clone
and set removed=true instead of reconstructing with None fields.

This satisfies the Ethereum JSON-RPC spec requirement that removed
logs include transaction_hash and log_index.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ChainNotifier now writes to an authoritative ring buffer before
broadcasting reorg events. Polling filters read it directly via
reorgs_since(), eliminating the broadcast::Receiver lag bug where
FilterReorgTask could silently drop reorg notifications.

Deletes FilterReorgTask entirely. FilterManager::new now takes a
ChainNotifier instead of a broadcast::Sender.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@prestwich
Member Author

[Claude Code]

Pushed 3 new commits addressing both review comments:

  1. fix: preserve tx_hash and log_index on removed logs — RemovedBlock.logs is now Vec<alloy::rpc::types::Log>. The producer stops stripping .inner, so transaction_hash and log_index from ColdReceipt flow through. Both consumers clone and set removed: true instead of reconstructing with None fields.

  2. refactor: move reorg ring buffer from FilterManager to ChainNotifier — ChainNotifier::send_reorg writes to an authoritative ring buffer before broadcasting. Polling filters read it directly via reorgs_since(). FilterReorgTask and the broadcast receiver are deleted entirely — no more lag/correctness risk.

  3. chore: remove plan/spec files — Accidentally committed some design docs from a new Claude skill we're trying out. Blame Swanny for that one.

@prestwich prestwich enabled auto-merge (squash) March 17, 2026 14:42
@prestwich prestwich merged commit 72aa2dc into develop Mar 17, 2026
6 checks passed