diff --git a/crates/tempo-zone/src/l1_state/tip403/provider.rs b/crates/tempo-zone/src/l1_state/tip403/provider.rs index 51fd58a9..37df5e2f 100644 --- a/crates/tempo-zone/src/l1_state/tip403/provider.rs +++ b/crates/tempo-zone/src/l1_state/tip403/provider.rs @@ -659,7 +659,96 @@ impl PolicyCheck for PolicyProvider { } fn policy_id_counter(&self) -> u64 { - let cache = self.cache.read(); - cache.policies().keys().max().map_or(2, |max| max + 1) + // The cache only holds policies the zone has observed events for, so its + // highest key is a lower bound on L1's authoritative `policyIdCounter` + // (policies created before the subscriber started, or never referenced + // by a tracked token, are absent). Returning the cache-derived bound + // would hand the in-zone TIP-403 proxy a value lower than L1's. + let cache_derived = self + .cache + .read() + .policies() + .keys() + .max() + .map_or(2, |max| max + 1); + + // Read the authoritative counter from L1 at `last_l1_block` for a + // deterministic, cross-node-consistent value (mirroring the other proxy + // reads). Fall back to the cache-derived bound only if the RPC fails. + let block_number = self.cache.read().last_l1_block(); + let registry = ITIP403Registry::new(TIP403_REGISTRY_ADDRESS, &self.provider); + let on_chain = tokio::task::block_in_place(|| { + self.runtime_handle.block_on(async { + registry + .policyIdCounter() + .block(BlockId::number(block_number)) + .call() + .await + }) + }); + + match on_chain { + Ok(counter) => counter, + Err(e) => { + self.metrics.rpc_errors.increment(1); + warn!( + block_number, + cache_derived, + %e, + "policyIdCounter RPC failed, using cache-derived lower bound" + ); + cache_derived + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{Bytes, U256}; + use alloy_provider::{Provider, ProviderBuilder}; + use alloy_transport::mock::Asserter; + + fn abi_encode_u64(value: u64) -> Bytes { + Bytes::copy_from_slice(&U256::from(value).to_be_bytes::<32>()) + } + + fn provider_with(asserter: Asserter, cache: PolicyCache) -> PolicyProvider { + let provider = ProviderBuilder::new_with_network::() + .connect_mocked_client(asserter) + .erased(); + PolicyProvider::new(cache, provider, tokio::runtime::Handle::current()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn policy_id_counter_reads_authoritative_value_from_l1() { + let asserter = Asserter::new(); + // `policyIdCounter()` (uint64) resolves to 42 on L1, higher than + // anything the cache has observed. + asserter.push_success(&abi_encode_u64(42)); + + let provider = provider_with(asserter, PolicyCache::default()); + let counter = tokio::task::spawn_blocking(move || provider.policy_id_counter()) + .await + .unwrap(); + + assert_eq!(counter, 42); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn policy_id_counter_falls_back_to_cache_on_rpc_error() { + let cache = PolicyCache::default(); + { + let mut inner = cache.write(); + inner.set_policy_type(5, PolicyType::WHITELIST); + } + // Empty asserter → the RPC call errors → cache-derived lower bound (5 + 1). + let provider = provider_with(Asserter::new(), cache); + let counter = tokio::task::spawn_blocking(move || provider.policy_id_counter()) + .await + .unwrap(); + + assert_eq!(counter, 6); } }