diff --git a/src/devnet.rs b/src/devnet.rs index a049be6c..17ded036 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -635,6 +635,11 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Wire the P2PNode into the payment verifier for merkle-closeness checks. + protocol + .payment_verifier_arc() + .attach_p2p_node(Arc::clone(p2p)); + let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/node.rs b/src/node.rs index 43d58298..e16dd662 100644 --- a/src/node.rs +++ b/src/node.rs @@ -130,6 +130,15 @@ impl NodeBuilder { let p2p_arc = Arc::new(p2p_node); + // Wire the P2PNode handle into the payment verifier so merkle-payment + // checks can query the live DHT for peers actually closest to a pool + // midpoint (pay-yourself defence). + if let Some(ref protocol) = ant_protocol { + protocol + .payment_verifier_arc() + .attach_p2p_node(Arc::clone(&p2p_arc)); + } + // Initialize replication engine (if storage is enabled) let replication_engine = if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) { diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index 8a8263d0..26f45212 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -19,9 +19,12 @@ use evmlib::Network as EvmNetwork; use evmlib::ProofOfPayment; use evmlib::RewardsAddress; use lru::LruCache; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use saorsa_core::identity::PeerId; +use saorsa_core::P2PNode; use std::num::NonZeroUsize; +use std::sync::Arc; use std::time::SystemTime; /// Minimum allowed size for a payment proof in bytes. @@ -118,10 +121,101 @@ pub struct PaymentVerifier { cache: VerifiedCache, /// LRU cache of verified merkle pool hashes → on-chain payment info. 
pool_cache: Mutex>, + /// LRU cache of pool hashes whose candidate closeness has already been + /// verified by this node. Collapses the per-chunk Kademlia lookup cost + /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256). + closeness_pass_cache: Mutex>, + /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs + /// for the same pool coalesce onto a single Kademlia lookup AND share + /// its result — on both success and failure — which bounds `DoS` + /// amplification to one lookup per unique `pool_hash` regardless of + /// concurrency. + inflight_closeness: Mutex>>, + /// P2P node handle, attached post-construction so merkle verification can + /// check that candidate `pub_keys` map to peers actually close to the pool + /// midpoint in the live DHT. `None` in unit tests that don't exercise + /// merkle verification; production startup MUST call [`attach_p2p_node`]. + p2p_node: RwLock>>, /// Configuration. config: PaymentVerifierConfig, } +/// Shared state for an inflight closeness verification. The leader publishes +/// its result via the `OnceLock`; waiters read that result directly instead +/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the +/// leader's drop guard and by each waiting task. +struct ClosenessSlot { + notify: Arc, + /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the + /// leader disappeared without publishing (panic, cancellation). + result: std::sync::OnceLock>, +} + +impl ClosenessSlot { + fn new() -> Self { + Self { + notify: Arc::new(tokio::sync::Notify::new()), + result: std::sync::OnceLock::new(), + } + } + + /// Build an owned `Notified` future that snapshots the `notify_waiters` + /// counter at call time. Awaiting this future after dropping external + /// locks is race-free: if `notify_waiters` fires between construction + /// and the first poll, the snapshot mismatch resolves the future + /// immediately. 
+ fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified { + Arc::clone(&self.notify).notified_owned() + } +} + +/// Drop guard that publishes the leader's result, clears the inflight slot, +/// and wakes all waiters. Fires on every exit path: success, failure, panic, +/// future-cancellation. +/// +/// The guard owns its own `Arc` so `notify_waiters` still +/// fires even if LRU pressure evicted the slot before the leader finished. +/// Waiters see the published result via `result.get()`; the `Notify` is only +/// the wake-up signal. +struct InflightGuard<'a> { + slot_cache: &'a Mutex>>, + pool_hash: PoolHash, + slot: Arc, +} + +impl InflightGuard<'_> { + /// Publish the leader's result. Called exactly once by the leader on + /// every successful or explicit-error exit. If dropped without calling + /// (panic, cancellation) the guard still wakes waiters but leaves + /// `result` empty, which waiters treat as a transient failure and retry. + fn publish(&self, result: &Result<()>) { + let stored: std::result::Result<(), String> = match result { + Ok(()) => Ok(()), + Err(e) => Err(e.to_string()), + }; + let _ = self.slot.result.set(stored); + } +} + +impl Drop for InflightGuard<'_> { + fn drop(&mut self) { + // Remove the slot entry if it's still ours. A separate leader may + // have inserted a new slot for the same pool_hash after LRU + // eviction — don't pop someone else's entry. + { + let mut cache = self.slot_cache.lock(); + if let Some(existing) = cache.peek(&self.pool_hash) { + if Arc::ptr_eq(existing, &self.slot) { + cache.pop(&self.pool_hash); + } + } + } + // Wake every waiter registered against OUR slot, regardless of + // whether the cache entry is still ours. + self.slot.notify.notify_waiters(); + } +} + impl PaymentVerifier { /// Create a new payment verifier. 
#[must_use] @@ -134,17 +228,47 @@ impl PaymentVerifier { let pool_cache_size = NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN); let pool_cache = Mutex::new(LruCache::new(pool_cache_size)); + let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size)); + let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size)); let cache_capacity = config.cache_capacity; info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})"); + // Loud warning if a production binary was accidentally built with + // `test-utils`: that feature flips the closeness-check fail-open + // switch, disabling the pay-yourself defence when P2PNode isn't + // attached. Safe in tests, never intended for prod. + #[cfg(feature = "test-utils")] + crate::logging::error!( + "PaymentVerifier: built with `test-utils` feature — merkle closeness \ + defence falls back to fail-open when no P2PNode is attached. This \ + feature is for test binaries only; production nodes must be built \ + without it." + ); + Self { cache, pool_cache, + closeness_pass_cache, + inflight_closeness, + p2p_node: RwLock::new(None), config, } } + /// Attach the node's [`P2PNode`] handle so merkle-payment verification can + /// check candidate `pub_keys` against the DHT's actual closest peers to the + /// pool midpoint. + /// + /// Production startup MUST call this once the `P2PNode` exists. Without + /// it, the closeness check fails CLOSED in release builds (rejects the + /// PUT with a visible error) and fails open in test builds. Idempotent: + /// calling twice replaces the handle. + pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(node); + debug!("PaymentVerifier: P2PNode attached for merkle closeness checks"); + } + /// Check if payment is required for the given `XorName`. 
/// /// This is the main entry point for payment verification: @@ -289,6 +413,16 @@ impl PaymentVerifier { self.cache.insert(xorname); } + /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests + /// bypass the on-chain `completedMerklePayments` lookup when the point of + /// the test is to exercise merkle-verification logic BEFORE the on-chain + /// call (e.g. the pay-yourself closeness check). + #[cfg(any(test, feature = "test-utils"))] + pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) { + let mut cache = self.pool_cache.lock(); + cache.put(pool_hash, info); + } + /// Verify a single-node EVM payment proof. /// /// Verification steps: @@ -455,6 +589,311 @@ impl PaymentVerifier { Ok(()) } + /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId` + /// must match the DHT's actual closest peers to the pool midpoint address. + /// + /// Set below 16/16 to absorb normal routing-table skew between the + /// payer's view and this node's view — on a well-connected network the + /// divergence between two nodes' closest-set views is typically 1-2 + /// peers, occasionally 3 during churn. 13/16 tolerates 3 divergent + /// peers while still limiting how many candidates an attacker can + /// fabricate before the check bites. A lower threshold (e.g. 9/16) + /// would let an attacker who controls 7 real neighbourhood peers plant + /// 7 fabricated candidates and still pass. + /// + /// This is the pure "fabricated key" defence; it does not stop an + /// attacker who can grind the pool midpoint address to land near 13 + /// pre-chosen keys AND run those keys as Sybil DHT participants. That + /// requires an orthogonal Sybil-resistance layer and is out of scope + /// for this check. + const CANDIDATE_CLOSENESS_REQUIRED: usize = 13; + + /// Timeout for the authoritative network lookup used by the closeness + /// check. 
+ /// + /// Iterative Kademlia lookups can cascade through up to 20 iterations, + /// and a single unresponsive peer's dial can take 20-30s before timing + /// out. 60s leaves room for the lookup to converge even under churn + /// while still capping `DoS` amplification at roughly one bounded lookup + /// per forged `pool_hash`. + const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + + /// Maximum waiter → leader retries when the leader's future was cancelled + /// or panicked before publishing a result. Beyond this the waiter returns + /// a visible error rather than spinning indefinitely through a + /// cancellation cascade. + const MAX_LEADER_RETRIES: usize = 4; + + /// Verify that the candidate pool's `pub_keys` correspond to peers that + /// are actually XOR-closest to the pool midpoint address, by querying + /// the DHT for its closest peers to that address and requiring that a + /// majority of the candidates match. + /// + /// **What this blocks**: the "pay yourself" attack. Candidate signatures + /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes — + /// nothing ties a candidate to a network-registered identity or to the + /// pool neighbourhood. Without this check an attacker can generate 16 + /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a + /// single attacker-controlled wallet, submit the merkle payment, and drain + /// their own payment back out. + /// + /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT + /// is the authoritative source of "which peers exist at this XOR + /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among + /// the peers the network actually lists as closest to the pool address, + /// the pool is forged. + /// + /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool` + /// (the pool the smart contract selected for the batch). 
Every storing + /// node that receives the proof independently re-runs this check against + /// that same pool, so a forged pool is rejected at every node it + /// reaches. + /// + /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a + /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root, + /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can + /// grind the midpoint until it lands in a region where 13 of their + /// Sybil keys are the true network-closest — at which point this check + /// passes for the attacker. Closing that gap requires binding the + /// midpoint to an attacker-uncontrolled value (e.g. a block hash at + /// payment time or an on-chain VRF) or a Sybil-resistant identity + /// layer. This defence raises the attack cost from "free" to "run N + /// Sybil nodes AND grind", which is a meaningful but not complete + /// improvement. + async fn verify_merkle_candidate_closeness( + &self, + pool: &evmlib::merkle_payments::MerklePaymentCandidatePool, + pool_hash: PoolHash, + ) -> Result<()> { + // Fast path: this node already verified this pool successfully. + // A batch of 256 chunks shares one winner_pool, so without this cache + // we'd pay a Kademlia lookup per chunk. + if self.closeness_pass_cache.lock().get(&pool_hash).is_some() { + return Ok(()); + } + + // Single-flight: on each attempt, either claim leadership by + // inserting a fresh `ClosenessSlot`, or wait on an existing leader + // and read its published result. The leader holds an `Arc` to the + // slot independent of the LruCache so waiters are still woken if + // eviction pressure kicked the cache entry. 
+        //
+        // The `notified_owned()` future snapshots the `notify_waiters`
+        // counter at the moment of construction. A waiter therefore MUST
+        // construct it while still holding the `inflight_closeness` lock:
+        // the leader's drop guard removes the slot under that same lock and
+        // only afterwards calls `notify_waiters`, so a waiter that found
+        // the slot in the map and registered before unlocking is guaranteed
+        // to be registered before the wake-up fires. Constructing the
+        // future after unlocking would leave a window in which the leader
+        // could publish + notify first, and the late-built future would
+        // then never resolve (`notify_waiters` stores no permit).
+        //
+        // Bounded retry: if we're a waiter and the leader gets cancelled or
+        // panics (slot.result.get() == None after wake-up), we loop back to
+        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
+        // adversarial cancellation cascades cannot spin this indefinitely.
+        for attempt in 0..=Self::MAX_LEADER_RETRIES {
+            // Release the mutex guard explicitly before any await below.
+            // Clippy wants `if let ... else` written as `map_or_else`, but
+            // any such rewrite re-borrows the locked `inflight` inside the
+            // closure and fails the borrow checker — so the lint is
+            // silenced here.
+            #[allow(clippy::option_if_let_else)]
+            let (waiter, leader_slot) = {
+                let mut inflight = self.inflight_closeness.lock();
+                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
+                    let slot = Arc::clone(existing);
+                    // Register for the wake-up BEFORE releasing the lock
+                    // (see the race explanation above the loop).
+                    let notified = slot.notified_owned();
+                    (Some((slot, notified)), None)
+                } else {
+                    let slot = Arc::new(ClosenessSlot::new());
+                    inflight.put(pool_hash, Arc::clone(&slot));
+                    (None, Some(slot))
+                };
+                drop(inflight);
+                chosen
+            };
+
+            if let Some((slot, notified)) = waiter {
+                // The future was built under the lock, so the leader's
+                // `notify_waiters` cannot have fired before our snapshot.
+                // If it fires between now and the first poll, the counter
+                // mismatch resolves the future immediately.
+                notified.await;
+
+                // Leader published a result — use it directly.
+                if let Some(result) = slot.result.get() {
+                    return result.clone().map_err(Error::Payment);
+                }
+                // Leader disappeared without publishing (panic or
+                // cancellation). Slot was cleared by the leader's drop
+                // guard; loop to become the new leader — unless we've
+                // hit the retry bound (see MAX_LEADER_RETRIES).
+                if attempt == Self::MAX_LEADER_RETRIES {
+                    return Err(Error::Payment(
+                        "Merkle candidate pool rejected: closeness leader \
+                         repeatedly failed to publish a result (likely \
+                         repeated cancellation or panic)."
+                            .into(),
+                    ));
+                }
+                continue;
+            }
+
+            // Leader path. Drop guard clears the slot and wakes waiters on
+            // every exit (success, failure, panic, cancellation).
+            let Some(slot) = leader_slot else {
+                // Unreachable by construction.
+                return Err(Error::Payment(
+                    "internal error: neither leader nor waiter in closeness check".into(),
+                ));
+            };
+            let guard = InflightGuard {
+                slot_cache: &self.inflight_closeness,
+                pool_hash,
+                slot,
+            };
+
+            // Publish before the guard drops so that every waiter woken by
+            // the drop observes the result via `slot.result.get()`.
+            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
+            guard.publish(&result);
+            if result.is_ok() {
+                self.closeness_pass_cache.lock().put(pool_hash, ());
+            }
+            return result;
+        }
+        // Unreachable: the for-loop body always either `return`s or `continue`s,
+        // and the waiter branch's `continue` only runs when `attempt <
+        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
+        // via the retry-bound check; the leader branch always returns.
+        Err(Error::Payment(
+            "internal error: closeness retry loop exited without returning".into(),
+        ))
+    }
+
+    /// Inner closeness check: the actual DHT lookup + set-membership test.
+    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
+    /// single-flight guard so a batch of chunks and a storm of forged PUTs
+    /// don't multiply the lookup cost.
+    async fn verify_merkle_candidate_closeness_inner(
+        &self,
+        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
+    ) -> Result<()> {
+        // Release the RwLock guard before any await to avoid holding it
+        // across an iterative Kademlia lookup.
+ let attached = self.p2p_node.read().as_ref().map(Arc::clone); + let Some(p2p_node) = attached else { + // Production must call attach_p2p_node at startup. Fail CLOSED + // to avoid silently disabling the defence if a startup path + // regresses and loses the attach call. Unit-test builds that + // construct a PaymentVerifier directly without exercising merkle + // verification are opted-in via `test-utils` to fall back to + // fail-open. + #[cfg(any(test, feature = "test-utils"))] + { + crate::logging::warn!( + "PaymentVerifier: no P2PNode attached; merkle pay-yourself \ + defence SKIPPED (test build). Production startup MUST call \ + PaymentVerifier::attach_p2p_node." + ); + return Ok(()); + } + #[cfg(not(any(test, feature = "test-utils")))] + { + crate::logging::error!( + "PaymentVerifier: no P2PNode attached; rejecting merkle \ + payment. This is a node-startup bug — \ + PaymentVerifier::attach_p2p_node must be called before \ + any PUT handler runs." + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: verifier is not wired to \ + the P2P layer; cannot verify candidate closeness." + .into(), + )); + } + }; + + let pool_address = pool.midpoint_proof.address(); + + // Derive each candidate's would-be PeerId from its pub_key. Fail + // closed on malformed keys — the candidate signature check ran + // upstream so a valid-looking pool ought to parse cleanly here. 
+ let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len()); + for candidate in &pool.candidate_nodes { + let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| { + Error::Payment(format!( + "Invalid ML-DSA public key in merkle candidate: {e}" + )) + })?; + candidate_peer_ids.push(pid); + } + + let lookup_count = pool.candidate_nodes.len(); + let network_lookup = p2p_node + .dht_manager() + .find_closest_nodes_network(&pool_address.0, lookup_count); + let network_peers = + match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await { + Ok(Ok(peers)) => peers, + Ok(Err(e)) => { + debug!( + "Merkle closeness network-lookup failed for pool midpoint {}: {e}", + hex::encode(pool_address.0), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: could not verify candidate \ + closeness against the authoritative network view." + .into(), + )); + } + Err(_) => { + debug!( + "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}", + Self::CLOSENESS_LOOKUP_TIMEOUT, + hex::encode(pool_address.0), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: authoritative network lookup \ + timed out. Retry once the network lookup completes." + .into(), + )); + } + }; + + // Set-membership check against the returned closest-peers list. The + // lookup may return fewer than `lookup_count` on a sparse network, + // which only tightens the bar — any candidate not in the returned + // list counts as unmatched. 
+ let network_set: std::collections::HashSet = + network_peers.iter().map(|n| n.peer_id).collect(); + let matched = candidate_peer_ids + .iter() + .filter(|pid| network_set.contains(pid)) + .count(); + + if matched < Self::CANDIDATE_CLOSENESS_REQUIRED { + debug!( + "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \ + for pool midpoint {} (required: {}, network returned {} peers)", + pool.candidate_nodes.len(), + hex::encode(pool_address.0), + Self::CANDIDATE_CLOSENESS_REQUIRED, + network_peers.len(), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: candidate pub_keys do not match the \ + network's closest peers to the pool midpoint address. Pools must be \ + collected from the pool-address close group, not fabricated off-network." + .into(), + )); + } + + debug!( + "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \ + for pool midpoint {}", + pool.candidate_nodes.len(), + hex::encode(pool_address.0), + ); + Ok(()) + } + /// Verify a merkle batch payment proof. /// /// This verification flow: @@ -495,6 +934,17 @@ impl PaymentVerifier { } } + // Pay-yourself defence: the candidate pub_keys must map to peers the + // live DHT actually considers closest to the pool midpoint. Without + // this, an attacker can point all 16 reward_address fields at a + // self-owned wallet and drain their own payment. Every storing node + // runs this check against the single `winner_pool` in the proof, so a + // forged pool is rejected everywhere it lands. The pass cache and + // single-flight keyed on pool_hash collapse the Kademlia lookup cost + // within a batch and across concurrent PUTs for the same pool. 
+ self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash) + .await?; + // Check pool cache first let cached_info = { let mut pool_cache = self.pool_cache.lock(); @@ -684,6 +1134,7 @@ impl PaymentVerifier { #[allow(clippy::expect_used)] mod tests { use super::*; + use evmlib::merkle_payments::MerklePaymentCandidatePool; /// Create a verifier for unit tests. EVM is always on, but tests can /// pre-populate the cache to bypass on-chain verification. @@ -1426,6 +1877,140 @@ mod tests { } } + #[tokio::test] + async fn closeness_pass_cache_short_circuits_second_call() { + // When a pool_hash is in the closeness_pass_cache, the outer + // verify_merkle_candidate_closeness must return Ok(()) without + // running the inner lookup — even if no P2PNode is attached. + // That second half (no-p2p → would normally fail-closed in release) + // is the proof the cache short-circuit ran first. + let verifier = create_test_verifier(); + let pool_hash = [0xAAu8; 32]; + verifier.closeness_pass_cache.lock().put(pool_hash, ()); + + // Construct a dummy pool — contents don't matter because the cache + // hit means we never look at them. + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let result = verifier + .verify_merkle_candidate_closeness(&pool, pool_hash) + .await; + assert!( + result.is_ok(), + "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}" + ); + } + + #[tokio::test] + async fn closeness_single_flight_concurrent_readers_share_one_verification() { + // Two concurrent callers for the same pool_hash should produce the + // same outcome, and the cache should end up populated exactly once. + // We use the test-utils fail-open path to short-circuit the inner + // DHT lookup; the purpose of this test is the single-flight + // plumbing, not the lookup itself. 
+ let verifier = Arc::new(create_test_verifier()); + let pool_hash = [0x77u8; 32]; + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let v1 = Arc::clone(&verifier); + let p1 = pool.clone(); + let v2 = Arc::clone(&verifier); + let p2 = pool.clone(); + + let (r1, r2) = tokio::join!( + async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await }, + async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await }, + ); + + assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree"); + assert!( + r1.is_ok(), + "both callers must succeed on the test-utils path" + ); + assert!( + verifier + .closeness_pass_cache + .lock() + .get(&pool_hash) + .is_some(), + "success path must populate the pass cache" + ); + assert!( + verifier.inflight_closeness.lock().get(&pool_hash).is_none(), + "inflight slot must be cleared after the leader finishes" + ); + } + + #[tokio::test] + async fn closeness_waiter_reads_leaders_published_failure() { + // Prove the waiter path actually surfaces a failure published by a + // concurrent leader, without running its own inner check. Insert a + // slot, spawn a waiter (which will park on notified_owned), then + // publish failure + notify from the outside — simulating what the + // leader's `publish` + drop-guard pair does. 
+ let verifier = Arc::new(create_test_verifier()); + let pool_hash = [0x55u8; 32]; + let slot = Arc::new(ClosenessSlot::new()); + verifier + .inflight_closeness + .lock() + .put(pool_hash, Arc::clone(&slot)); + + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let verifier_c = Arc::clone(&verifier); + let pool_c = pool.clone(); + let waiter = tokio::spawn(async move { + verifier_c + .verify_merkle_candidate_closeness(&pool_c, pool_hash) + .await + }); + + // Yield so the waiter can run up to its `notified_owned().await`. + // A few yields cover both single-threaded and multi-threaded tokio + // runtimes regardless of scheduling. + for _ in 0..5 { + tokio::task::yield_now().await; + } + + // Simulate the leader's `publish` + drop-guard: publish the result, + // clear the slot, wake waiters. + slot.result + .set(Err("forged pool: not close enough".to_string())) + .expect("set once"); + verifier.inflight_closeness.lock().pop(&pool_hash); + slot.notify.notify_waiters(); + + let result = waiter.await.expect("task panicked"); + let err = result.expect_err("waiter must return the leader's published failure"); + assert!( + err.to_string().contains("forged pool"), + "waiter must surface the leader's error message, got: {err}" + ); + } + + /// Build a deterministic but otherwise-unused `MidpointProof` so unit + /// tests can construct a `MerklePaymentCandidatePool` without spinning + /// up a real merkle tree. The closeness path only calls `.address()` + /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp — + /// the values don't need to be tree-valid for these tests. + fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof { + // Build a minimal tree of two leaves so we get a real branch. 
+ let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])]; + let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree"); + let candidates = tree.reward_candidates(1_700_000_000).expect("candidates"); + candidates.first().expect("at least one").clone() + } + // ========================================================================= // Merkle verification unit tests // ========================================================================= diff --git a/tests/e2e/merkle_payment.rs b/tests/e2e/merkle_payment.rs index e9bf6bb7..480164a2 100644 --- a/tests/e2e/merkle_payment.rs +++ b/tests/e2e/merkle_payment.rs @@ -9,8 +9,9 @@ //! - Merkle proof with tampered candidate signatures rejected //! - Merkle proof construction, serialization, and size validation //! - Concurrent merkle proof verification across multiple nodes +//! - Pay-yourself attack rejected (fabricated candidate pool) -#![allow(clippy::unwrap_used, clippy::expect_used)] +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] use super::harness::TestHarness; use super::testnet::TestNetworkConfig; @@ -25,7 +26,7 @@ use ant_node::payment::{ use evmlib::common::Amount; use evmlib::merkle_payments::{ MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree, - CANDIDATES_PER_POOL, + OnChainPaymentInfo, CANDIDATES_PER_POOL, }; use evmlib::testnet::Testnet; use evmlib::RewardsAddress; @@ -473,3 +474,118 @@ async fn test_attack_merkle_proof_cross_address_replay() -> Result<(), Box Result<(), Box> +{ + info!("MERKLE ATTACK TEST: pay-yourself via fabricated candidate pool"); + + // Minimal testnet (5 nodes) — attacker-generated candidate pub_keys hash + // to PeerIds uniformly across the 2^256 ID space and will not match any + // of these 5 nodes' PeerIds with any meaningful probability. 
+ let config = TestNetworkConfig::minimal(); + let harness = TestHarness::setup_with_config(config).await?; + // Let routing tables settle so find_closest_nodes_network returns a + // stable set. + sleep(Duration::from_secs(5)).await; + + let proof = build_valid_merkle_proof(); + + // Deserialize to compute the pool hash, then pre-populate the storing + // node's pool cache so the on-chain lookup is bypassed. The fake + // `OnChainPaymentInfo` is crafted so the proof would pass every OTHER + // check (address-proof depth, per-node payment formula, paid-node + // addresses). That way the ONLY check capable of rejecting the proof is + // the pay-yourself closeness check — giving the test a clean signal. + let parsed = ant_node::payment::deserialize_merkle_proof(&proof.tagged_proof) + .expect("deserialize forged proof"); + let pool_hash = parsed.winner_pool_hash(); + let ts = parsed + .winner_pool + .candidate_nodes + .first() + .expect("pool has candidates") + .merkle_payment_timestamp; + // 4-leaf tree → depth 2 (log2(4)). Prices are all 1024 in + // build_candidate_nodes, so per-node payment = 1024 * 2^2 / 2 = 2048. 
+ let depth: u8 = 2; + let per_node = Amount::from(4096u64); + let paid_node_addresses = vec![ + (RewardsAddress::new([0u8; 20]), 0usize, per_node), + (RewardsAddress::new([1u8; 20]), 1usize, per_node), + ]; + let fake_info = OnChainPaymentInfo { + depth, + merkle_payment_timestamp: ts, + paid_node_addresses, + }; + + let node = harness.test_node(0).ok_or("no test node 0")?; + let protocol = node + .ant_protocol + .as_ref() + .ok_or("no ant_protocol on node")?; + protocol + .payment_verifier() + .pool_cache_insert(pool_hash, fake_info); + + let request = ChunkPutRequest::with_payment( + proof.target.address.0, + proof.target.content.clone(), + proof.tagged_proof, + ); + + let response = send_put_to_node(&harness, 0, request) + .await + .map_err(|e| format!("Send failed: {e}"))?; + + // Extract the error message so the test can report WHY verification + // passed or failed. The pay-yourself defence is the ONLY layer capable of + // rejecting this proof once on-chain lookup is bypassed — if the response + // is a Success or an unrelated error, the attack works. + match response.body { + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::PaymentFailed( + ref msg, + ))) if msg.contains("closest peers") + || msg.contains("closeness") + || msg.contains("authoritative network") => + { + info!("Correctly rejected pay-yourself attack: {msg}"); + } + ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) => { + panic!( + "Pay-yourself attack SUCCEEDED: verifier accepted a pool of 16 attacker-generated \ + candidates all rewarding the payer. Merkle closeness defence is missing." + ); + } + other => { + panic!( + "Pay-yourself attack was rejected for the WRONG reason — expected a closeness \ + rejection, got: {other:?}. The closeness defence is missing; the proof only \ + failed because of an unrelated check." 
+ ); + } + } + + harness.teardown().await?; + Ok(()) +} diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 40ba7473..a452fdda 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1176,6 +1176,11 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Wire the P2PNode into the payment verifier so merkle-payment + // verification can run the pay-yourself closeness check against + // the live DHT. + protocol.payment_verifier().attach_p2p_node(Arc::clone(p2p)); + let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol);