Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions crates/tempo-zone/src/l1_state/tip403/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,96 @@ impl PolicyCheck for PolicyProvider {
}

fn policy_id_counter(&self) -> u64 {
let cache = self.cache.read();
cache.policies().keys().max().map_or(2, |max| max + 1)
// The cache only holds policies the zone has observed events for, so its
// highest key is a lower bound on L1's authoritative `policyIdCounter`
// (policies created before the subscriber started, or never referenced
// by a tracked token, are absent). Returning the cache-derived bound
// would hand the in-zone TIP-403 proxy a value lower than L1's.
let cache_derived = self
.cache
.read()
.policies()
.keys()
.max()
.map_or(2, |max| max + 1);

// Read the authoritative counter from L1 at `last_l1_block` for a
// deterministic, cross-node-consistent value (mirroring the other proxy
// reads). Fall back to the cache-derived bound only if the RPC fails.
let block_number = self.cache.read().last_l1_block();
let registry = ITIP403Registry::new(TIP403_REGISTRY_ADDRESS, &self.provider);
let on_chain = tokio::task::block_in_place(|| {
self.runtime_handle.block_on(async {
registry
.policyIdCounter()
.block(BlockId::number(block_number))
.call()
.await
})
});

match on_chain {
Ok(counter) => counter,
Err(e) => {
self.metrics.rpc_errors.increment(1);
warn!(
block_number,
cache_derived,
%e,
"policyIdCounter RPC failed, using cache-derived lower bound"
);
cache_derived
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{Bytes, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_transport::mock::Asserter;

fn abi_encode_u64(value: u64) -> Bytes {
Bytes::copy_from_slice(&U256::from(value).to_be_bytes::<32>())
}

fn provider_with(asserter: Asserter, cache: PolicyCache) -> PolicyProvider {
let provider = ProviderBuilder::new_with_network::<TempoNetwork>()
.connect_mocked_client(asserter)
.erased();
PolicyProvider::new(cache, provider, tokio::runtime::Handle::current())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn policy_id_counter_reads_authoritative_value_from_l1() {
let asserter = Asserter::new();
// `policyIdCounter()` (uint64) resolves to 42 on L1, higher than
// anything the cache has observed.
asserter.push_success(&abi_encode_u64(42));

let provider = provider_with(asserter, PolicyCache::default());
let counter = tokio::task::spawn_blocking(move || provider.policy_id_counter())
.await
.unwrap();

assert_eq!(counter, 42);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn policy_id_counter_falls_back_to_cache_on_rpc_error() {
let cache = PolicyCache::default();
{
let mut inner = cache.write();
inner.set_policy_type(5, PolicyType::WHITELIST);
}
// Empty asserter → the RPC call errors → cache-derived lower bound (5 + 1).
let provider = provider_with(Asserter::new(), cache);
let counter = tokio::task::spawn_blocking(move || provider.policy_id_counter())
.await
.unwrap();

assert_eq!(counter, 6);
}
}