From 17a885af5903c05570ac0d46e640e51b4324c824 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 22 Apr 2026 16:57:52 +0900 Subject: [PATCH 1/4] fix(payment): block merkle pay-yourself via DHT closeness check Merkle verification never tied the 16 candidate pub_keys to the pool midpoint's actual close group on the network. An attacker could generate 16 ML-DSA keypairs locally, point every reward_address at a self-owned wallet, fund the on-chain merkle payment, and receive their own rewards back. Add a closeness defence: for each MerklePaymentProof, the verifier queries the DHT for the closest peers to the winner_pool's midpoint address and requires a majority of the 16 candidate pub_keys to hash to PeerIds that appear in that returned set. Hooked in between the cheap signature check and the on-chain RPC so a forged pool is rejected without paying for an on-chain round-trip. Every storing node that receives the proof runs the same check against the single winner_pool. PaymentVerifier grows an optional Arc attached at startup via attach_p2p_node; node.rs, devnet.rs, and the e2e testnet harness all wire it after P2PNode construction. Unit tests that don't exercise merkle verification keep a None handle and log a warning. New e2e test test_attack_merkle_pay_yourself_fabricated_pool fails on main (verifier accepts 16 attacker-generated keys all rewarding the payer) and passes with the fix. --- src/devnet.rs | 5 + src/node.rs | 9 ++ src/payment/verifier.rs | 185 +++++++++++++++++++++++++++++++++++- tests/e2e/merkle_payment.rs | 120 ++++++++++++++++++++++- tests/e2e/testnet.rs | 5 + 5 files changed, 321 insertions(+), 3 deletions(-) diff --git a/src/devnet.rs b/src/devnet.rs index a049be6c..17ded036 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -635,6 +635,11 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Wire the P2PNode into the payment verifier for merkle-closeness checks. + protocol + .payment_verifier_arc() + .attach_p2p_node(Arc::clone(p2p)); + let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/node.rs b/src/node.rs index 43d58298..e16dd662 100644 --- a/src/node.rs +++ b/src/node.rs @@ -130,6 +130,15 @@ impl NodeBuilder { let p2p_arc = Arc::new(p2p_node); + // Wire the P2PNode handle into the payment verifier so merkle-payment + // checks can query the live DHT for peers actually closest to a pool + // midpoint (pay-yourself defence). + if let Some(ref protocol) = ant_protocol { + protocol + .payment_verifier_arc() + .attach_p2p_node(Arc::clone(&p2p_arc)); + } + // Initialize replication engine (if storage is enabled) let replication_engine = if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) { diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index 8a8263d0..6aba03fa 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -19,9 +19,12 @@ use evmlib::Network as EvmNetwork; use evmlib::ProofOfPayment; use evmlib::RewardsAddress; use lru::LruCache; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use saorsa_core::identity::PeerId; +use saorsa_core::P2PNode; use std::num::NonZeroUsize; +use std::sync::Arc; use std::time::SystemTime; /// Minimum allowed size for a payment proof in bytes. @@ -118,6 +121,11 @@ pub struct PaymentVerifier { cache: VerifiedCache, /// LRU cache of verified merkle pool hashes → on-chain payment info. pool_cache: Mutex>, + /// P2P node handle, attached post-construction so merkle verification can + /// check that candidate `pub_keys` map to peers actually close to the pool + /// midpoint in the live DHT. `None` in unit tests that don't exercise + /// merkle verification; production startup MUST call [`attach_p2p_node`]. + p2p_node: RwLock>>, /// Configuration. config: PaymentVerifierConfig, } @@ -141,10 +149,22 @@ impl PaymentVerifier { Self { cache, pool_cache, + p2p_node: RwLock::new(None), config, } } + /// Attach the node's [`P2PNode`] handle so merkle-payment verification can + /// check candidate `pub_keys` against the DHT's actual closest peers to the + /// pool midpoint. + /// + /// Production startup MUST call this once the `P2PNode` exists. Without + /// it, the closeness check fails open (logs a warning and skips) so that + /// unit tests which never exercise merkle verification still work. + pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(node); + } + /// Check if payment is required for the given `XorName`. /// /// This is the main entry point for payment verification: @@ -289,6 +309,16 @@ impl PaymentVerifier { self.cache.insert(xorname); } + /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests + /// bypass the on-chain `completedMerklePayments` lookup when the point of + /// the test is to exercise merkle-verification logic BEFORE the on-chain + /// call (e.g. the pay-yourself closeness check). + #[cfg(any(test, feature = "test-utils"))] + pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) { + let mut cache = self.pool_cache.lock(); + cache.put(pool_hash, info); + } + /// Verify a single-node EVM payment proof. /// /// Verification steps: @@ -455,6 +485,150 @@ impl PaymentVerifier { Ok(()) } + /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId` + /// must match the DHT's actual closest peers to the pool midpoint address. + /// + /// Set to a majority rather than 16/16 so routing-table skew between the + /// payer's view and this node's view is tolerated. 9/16 absorbs small + /// divergence while still biting on fabricated pools: BLAKE3 of a random + /// ML-DSA key lands uniformly in the 2^256 ID space, so an attacker + /// without real peer identities cannot plant 9 `pub_keys` into the exact + /// close group of an arbitrary target address. + const CANDIDATE_CLOSENESS_REQUIRED: usize = 9; + + /// Timeout for the authoritative network lookup used by the closeness + /// check. A forged pool can trigger exactly one bounded Kademlia lookup; + /// subsequent chunks from the same batch hit the pool cache and pay no + /// extra cost. + const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + + /// Verify that the candidate pool's `pub_keys` correspond to peers that + /// are actually XOR-closest to the pool midpoint address, by querying + /// the DHT for its closest peers to that address and requiring that a + /// majority of the candidates match. + /// + /// **What this blocks**: the "pay yourself" attack. Candidate signatures + /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes — + /// nothing ties a candidate to a network-registered identity or to the + /// pool neighbourhood. Without this check an attacker can generate 16 + /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a + /// single attacker-controlled wallet, submit the merkle payment, and drain + /// their own payment back out. + /// + /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT + /// is the authoritative source of "which peers exist at this XOR + /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among + /// the peers the network actually lists as closest to the pool address, + /// the pool is forged. + /// + /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool` + /// (the pool the smart contract selected for the batch). Every storing + /// node that receives the proof independently re-runs this check against + /// that same pool, so a forged pool is rejected at every node it + /// reaches. + async fn verify_merkle_candidate_closeness( + &self, + pool: &evmlib::merkle_payments::MerklePaymentCandidatePool, + ) -> Result<()> { + // Release the RwLock guard before any await to avoid holding it + // across an iterative Kademlia lookup. + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + let Some(p2p_node) = attached else { + // Production startup must call attach_p2p_node. The fail-open + // path is only reachable in unit-test setups that construct a + // PaymentVerifier directly without exercising merkle checks. + crate::logging::warn!( + "PaymentVerifier: no P2PNode attached; merkle pay-yourself \ + defence DISABLED. Call PaymentVerifier::attach_p2p_node \ + during node startup." + ); + return Ok(()); + }; + + let pool_address = pool.midpoint_proof.address(); + + // Derive each candidate's would-be PeerId from its pub_key. Fail + // closed on malformed keys — the candidate signature check ran + // upstream so a valid-looking pool ought to parse cleanly here. + let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len()); + for candidate in &pool.candidate_nodes { + let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| { + Error::Payment(format!( + "Invalid ML-DSA public key in merkle candidate: {e}" + )) + })?; + candidate_peer_ids.push(pid); + } + + let lookup_count = pool.candidate_nodes.len(); + let network_lookup = p2p_node + .dht_manager() + .find_closest_nodes_network(&pool_address.0, lookup_count); + let network_peers = + match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await { + Ok(Ok(peers)) => peers, + Ok(Err(e)) => { + debug!( + "Merkle closeness network-lookup failed for pool midpoint {}: {e}", + hex::encode(pool_address.0), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: could not verify candidate \ + closeness against the authoritative network view." + .into(), + )); + } + Err(_) => { + debug!( + "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}", + Self::CLOSENESS_LOOKUP_TIMEOUT, + hex::encode(pool_address.0), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: authoritative network lookup \ + timed out. Retry once the network lookup completes." + .into(), + )); + } + }; + + // Set-membership check against the returned closest-peers list. The + // lookup may return fewer than `lookup_count` on a sparse network, + // which only tightens the bar — any candidate not in the returned + // list counts as unmatched. + let network_set: std::collections::HashSet = + network_peers.iter().map(|n| n.peer_id).collect(); + let matched = candidate_peer_ids + .iter() + .filter(|pid| network_set.contains(pid)) + .count(); + + if matched < Self::CANDIDATE_CLOSENESS_REQUIRED { + debug!( + "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \ + for pool midpoint {} (required: {}, network returned {} peers)", + pool.candidate_nodes.len(), + hex::encode(pool_address.0), + Self::CANDIDATE_CLOSENESS_REQUIRED, + network_peers.len(), + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: candidate pub_keys do not match the \ + network's closest peers to the pool midpoint address. Pools must be \ + collected from the pool-address close group, not fabricated off-network." + .into(), + )); + } + + debug!( + "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \ + for pool midpoint {}", + pool.candidate_nodes.len(), + hex::encode(pool_address.0), + ); + Ok(()) + } + /// Verify a merkle batch payment proof. /// /// This verification flow: @@ -495,6 +669,15 @@ impl PaymentVerifier { } } + // Pay-yourself defence: the candidate pub_keys must map to peers the + // live DHT actually considers closest to the pool midpoint. Without + // this, an attacker can point all 16 reward_address fields at a + // self-owned wallet and drain their own payment. Every storing node + // runs this check against the single `winner_pool` in the proof, so a + // forged pool is rejected everywhere it lands. + self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool) + .await?; + // Check pool cache first let cached_info = { let mut pool_cache = self.pool_cache.lock(); diff --git a/tests/e2e/merkle_payment.rs b/tests/e2e/merkle_payment.rs index e9bf6bb7..480164a2 100644 --- a/tests/e2e/merkle_payment.rs +++ b/tests/e2e/merkle_payment.rs @@ -9,8 +9,9 @@ //! - Merkle proof with tampered candidate signatures rejected //! - Merkle proof construction, serialization, and size validation //! - Concurrent merkle proof verification across multiple nodes +//! - Pay-yourself attack rejected (fabricated candidate pool) -#![allow(clippy::unwrap_used, clippy::expect_used)] +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] use super::harness::TestHarness; use super::testnet::TestNetworkConfig; @@ -25,7 +26,7 @@ use ant_node::payment::{ use evmlib::common::Amount; use evmlib::merkle_payments::{ MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree, - CANDIDATES_PER_POOL, + OnChainPaymentInfo, CANDIDATES_PER_POOL, }; use evmlib::testnet::Testnet; use evmlib::RewardsAddress; @@ -473,3 +474,118 @@ async fn test_attack_merkle_proof_cross_address_replay() -> Result<(), Box Result<(), Box> +{ + info!("MERKLE ATTACK TEST: pay-yourself via fabricated candidate pool"); + + // Minimal testnet (5 nodes) — attacker-generated candidate pub_keys hash + // to PeerIds uniformly across the 2^256 ID space and will not match any + // of these 5 nodes' PeerIds with any meaningful probability. + let config = TestNetworkConfig::minimal(); + let harness = TestHarness::setup_with_config(config).await?; + // Let routing tables settle so find_closest_nodes_network returns a + // stable set. + sleep(Duration::from_secs(5)).await; + + let proof = build_valid_merkle_proof(); + + // Deserialize to compute the pool hash, then pre-populate the storing + // node's pool cache so the on-chain lookup is bypassed. The fake + // `OnChainPaymentInfo` is crafted so the proof would pass every OTHER + // check (address-proof depth, per-node payment formula, paid-node + // addresses). That way the ONLY check capable of rejecting the proof is + // the pay-yourself closeness check — giving the test a clean signal. + let parsed = ant_node::payment::deserialize_merkle_proof(&proof.tagged_proof) + .expect("deserialize forged proof"); + let pool_hash = parsed.winner_pool_hash(); + let ts = parsed + .winner_pool + .candidate_nodes + .first() + .expect("pool has candidates") + .merkle_payment_timestamp; + // 4-leaf tree → depth 2 (log2(4)). Prices are all 1024 in + // build_candidate_nodes, so per-node payment = 1024 * 2^2 / 2 = 2048. + let depth: u8 = 2; + let per_node = Amount::from(4096u64); + let paid_node_addresses = vec![ + (RewardsAddress::new([0u8; 20]), 0usize, per_node), + (RewardsAddress::new([1u8; 20]), 1usize, per_node), + ]; + let fake_info = OnChainPaymentInfo { + depth, + merkle_payment_timestamp: ts, + paid_node_addresses, + }; + + let node = harness.test_node(0).ok_or("no test node 0")?; + let protocol = node + .ant_protocol + .as_ref() + .ok_or("no ant_protocol on node")?; + protocol + .payment_verifier() + .pool_cache_insert(pool_hash, fake_info); + + let request = ChunkPutRequest::with_payment( + proof.target.address.0, + proof.target.content.clone(), + proof.tagged_proof, + ); + + let response = send_put_to_node(&harness, 0, request) + .await + .map_err(|e| format!("Send failed: {e}"))?; + + // Extract the error message so the test can report WHY verification + // passed or failed. The pay-yourself defence is the ONLY layer capable of + // rejecting this proof once on-chain lookup is bypassed — if the response + // is a Success or an unrelated error, the attack works. + match response.body { + ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::PaymentFailed( + ref msg, + ))) if msg.contains("closest peers") + || msg.contains("closeness") + || msg.contains("authoritative network") => + { + info!("Correctly rejected pay-yourself attack: {msg}"); + } + ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) => { + panic!( + "Pay-yourself attack SUCCEEDED: verifier accepted a pool of 16 attacker-generated \ + candidates all rewarding the payer. Merkle closeness defence is missing." + ); + } + other => { + panic!( + "Pay-yourself attack was rejected for the WRONG reason — expected a closeness \ + rejection, got: {other:?}. The closeness defence is missing; the proof only \ + failed because of an unrelated check." + ); + } + } + + harness.teardown().await?; + Ok(()) +} diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 40ba7473..a452fdda 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1176,6 +1176,11 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Wire the P2PNode into the payment verifier so merkle-payment + // verification can run the pay-yourself closeness check against + // the live DHT. + protocol.payment_verifier().attach_p2p_node(Arc::clone(p2p)); + let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); From e385b7f2ef65be67ec196f45d348f6c88b866ff2 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 22 Apr 2026 17:12:08 +0900 Subject: [PATCH 2/4] harden(payment): fail-closed, tighter threshold, single-flight cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Security review flagged three issues with the initial closeness check: 1. **Fail-open on missing P2PNode** was unsafe in production: any startup path that forgot to call attach_p2p_node would silently disable the defence and only log a warning. Fail CLOSED in release builds — the warn->error and Ok->Err flip surfaces the wiring bug as a rejected PUT rather than a silent accept. Test-utils builds keep fail-open so unit tests don't need a real DHT. 2. **9/16 majority was too permissive**: an attacker controlling 7 neighbourhood peers could fabricate the remaining 7 and still pass. Raise to 13/16 — tolerates normal routing-table skew (1-3 peers) without letting an attacker plant 7 fabricated candidates. 3. **Per-chunk Kademlia lookups were a DoS amplifier**: a 256-chunk batch sharing one winner_pool would trigger 256 iterative lookups, and N concurrent forged-pool PUTs for the same hash would trigger N parallel lookups. Add: - closeness_pass_cache: pool_hash -> () so within-batch repeats and cross-connection replays are free after the first check. - inflight_closeness: pool_hash -> Notify so concurrent PUTs for the same pool coalesce to a single lookup behind a drop-guarded waker. Also bump the per-lookup timeout from 10s to 60s. Iterative Kademlia lookups with dial cascades of 20-30s per unresponsive peer were timing out under normal churn at 10s — false-rejecting honest proofs. 60s keeps DoS bounded while being wide enough for real-world convergence. Document the known Sybil-grinding limitation: midpoint_proof.address() is BLAKE3 of attacker-controllable inputs, so an attacker who also runs Sybil DHT nodes can grind it to land in their own neighbourhood. Closing that gap needs a Sybil-resistance layer or an attacker-uncontrolled midpoint binding (on-chain VRF / block hash). --- src/payment/verifier.rs | 185 ++++++++++++++++++++++++++++++++++------ 1 file changed, 161 insertions(+), 24 deletions(-) diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index 6aba03fa..fa8ec8a1 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -121,6 +121,15 @@ pub struct PaymentVerifier { cache: VerifiedCache, /// LRU cache of verified merkle pool hashes → on-chain payment info. pool_cache: Mutex>, + /// LRU cache of pool hashes whose candidate closeness has already been + /// verified by this node. Collapses the per-chunk Kademlia lookup cost + /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256). + closeness_pass_cache: Mutex>, + /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs + /// for the same pool coalesce onto a single Kademlia lookup, which bounds + /// `DoS` amplification: `N` concurrent forged-pool PUTs cost at most one + /// lookup, not `N`. + inflight_closeness: Mutex>>, /// P2P node handle, attached post-construction so merkle verification can /// check that candidate `pub_keys` map to peers actually close to the pool /// midpoint in the live DHT. `None` in unit tests that don't exercise @@ -130,6 +139,23 @@ pub struct PaymentVerifier { config: PaymentVerifierConfig, } +/// Drop guard that clears the inflight-closeness slot and wakes all waiters +/// when the leader of a pool-hash verification finishes (success, failure, +/// panic, or early return all run the guard). +struct InflightGuard<'a> { + slot: &'a Mutex>>, + pool_hash: PoolHash, +} + +impl Drop for InflightGuard<'_> { + fn drop(&mut self) { + let mut slot = self.slot.lock(); + if let Some(notify) = slot.pop(&self.pool_hash) { + notify.notify_waiters(); + } + } +} + impl PaymentVerifier { /// Create a new payment verifier. #[must_use] @@ -142,6 +168,8 @@ impl PaymentVerifier { let pool_cache_size = NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN); let pool_cache = Mutex::new(LruCache::new(pool_cache_size)); + let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size)); + let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size)); let cache_capacity = config.cache_capacity; info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})"); @@ -149,6 +177,8 @@ impl PaymentVerifier { Self { cache, pool_cache, + closeness_pass_cache, + inflight_closeness, p2p_node: RwLock::new(None), config, } @@ -159,10 +189,12 @@ impl PaymentVerifier { /// pool midpoint. /// /// Production startup MUST call this once the `P2PNode` exists. Without - /// it, the closeness check fails open (logs a warning and skips) so that - /// unit tests which never exercise merkle verification still work. + /// it, the closeness check fails CLOSED in release builds (rejects the + /// PUT with a visible error) and fails open in test builds. Idempotent: + /// calling twice replaces the handle. pub fn attach_p2p_node(&self, node: Arc) { *self.p2p_node.write() = Some(node); + debug!("PaymentVerifier: P2PNode attached for merkle closeness checks"); } /// Check if payment is required for the given `XorName`. @@ -488,19 +520,31 @@ impl PaymentVerifier { /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId` /// must match the DHT's actual closest peers to the pool midpoint address. /// - /// Set to a majority rather than 16/16 so routing-table skew between the - /// payer's view and this node's view is tolerated. 9/16 absorbs small - /// divergence while still biting on fabricated pools: BLAKE3 of a random - /// ML-DSA key lands uniformly in the 2^256 ID space, so an attacker - /// without real peer identities cannot plant 9 `pub_keys` into the exact - /// close group of an arbitrary target address. - const CANDIDATE_CLOSENESS_REQUIRED: usize = 9; + /// Set below 16/16 to absorb normal routing-table skew between the + /// payer's view and this node's view — on a well-connected network the + /// divergence between two nodes' closest-set views is typically 1-2 + /// peers, occasionally 3 during churn. 13/16 tolerates 3 divergent + /// peers while still limiting how many candidates an attacker can + /// fabricate before the check bites. A lower threshold (e.g. 9/16) + /// would let an attacker who controls 7 real neighbourhood peers plant + /// 7 fabricated candidates and still pass. + /// + /// This is the pure "fabricated key" defence; it does not stop an + /// attacker who can grind the pool midpoint address to land near 13 + /// pre-chosen keys AND run those keys as Sybil DHT participants. That + /// requires an orthogonal Sybil-resistance layer and is out of scope + /// for this check. + const CANDIDATE_CLOSENESS_REQUIRED: usize = 13; /// Timeout for the authoritative network lookup used by the closeness - /// check. A forged pool can trigger exactly one bounded Kademlia lookup; - /// subsequent chunks from the same batch hit the pool cache and pay no - /// extra cost. - const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + /// check. + /// + /// Iterative Kademlia lookups can cascade through up to 20 iterations, + /// and a single unresponsive peer's dial can take 20-30s before timing + /// out. 60s leaves room for the lookup to converge even under churn + /// while still capping `DoS` amplification at roughly one bounded lookup + /// per forged `pool_hash`. + const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); /// Verify that the candidate pool's `pub_keys` correspond to peers that /// are actually XOR-closest to the pool midpoint address, by querying @@ -526,23 +570,114 @@ impl PaymentVerifier { /// node that receives the proof independently re-runs this check against /// that same pool, so a forged pool is rejected at every node it /// reaches. + /// + /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a + /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root, + /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can + /// grind the midpoint until it lands in a region where 13 of their + /// Sybil keys are the true network-closest — at which point this check + /// passes for the attacker. Closing that gap requires binding the + /// midpoint to an attacker-uncontrolled value (e.g. a block hash at + /// payment time or an on-chain VRF) or a Sybil-resistant identity + /// layer. This defence raises the attack cost from "free" to "run N + /// Sybil nodes AND grind", which is a meaningful but not complete + /// improvement. async fn verify_merkle_candidate_closeness( &self, pool: &evmlib::merkle_payments::MerklePaymentCandidatePool, + pool_hash: PoolHash, + ) -> Result<()> { + // Fast path: this node already verified closeness for this pool. + // A batch of 256 chunks shares one winner_pool, so without this cache + // we'd pay a Kademlia lookup per chunk. + { + let mut cache = self.closeness_pass_cache.lock(); + if cache.get(&pool_hash).is_some() { + return Ok(()); + } + } + + // Single-flight: if another task is already verifying this pool, + // wait on its completion and re-check the cache. Collapses a + // concurrent storm of forged-pool PUTs to at most one live Kademlia + // lookup per unique pool_hash. + let wait_for = { + let mut inflight = self.inflight_closeness.lock(); + let existing = inflight.get(&pool_hash).map(Arc::clone); + if existing.is_none() { + let notify = Arc::new(tokio::sync::Notify::new()); + inflight.put(pool_hash, notify); + } + existing + }; + + if let Some(notify) = wait_for { + notify.notified().await; + // Leader finished. Re-check the pass cache: hit = leader passed; + // miss = leader failed or cache pressure evicted. Fall through on + // miss and run our own verification (rare; bounded because the + // inflight slot was cleared when the leader completed). + let mut cache = self.closeness_pass_cache.lock(); + if cache.get(&pool_hash).is_some() { + return Ok(()); + } + } + + // We are the leader for this pool_hash. Wake waiters and clear the + // inflight slot on all exits via a drop guard. + let _guard = InflightGuard { + slot: &self.inflight_closeness, + pool_hash, + }; + + let result = self.verify_merkle_candidate_closeness_inner(pool).await; + if result.is_ok() { + self.closeness_pass_cache.lock().put(pool_hash, ()); + } + result + } + + /// Inner closeness check: the actual DHT lookup + set-membership test. + /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and + /// single-flight guard so a batch of chunks and a storm of forged PUTs + /// don't multiply the lookup cost. + async fn verify_merkle_candidate_closeness_inner( + &self, + pool: &evmlib::merkle_payments::MerklePaymentCandidatePool, ) -> Result<()> { // Release the RwLock guard before any await to avoid holding it // across an iterative Kademlia lookup. let attached = self.p2p_node.read().as_ref().map(Arc::clone); let Some(p2p_node) = attached else { - // Production startup must call attach_p2p_node. The fail-open - // path is only reachable in unit-test setups that construct a - // PaymentVerifier directly without exercising merkle checks. - crate::logging::warn!( - "PaymentVerifier: no P2PNode attached; merkle pay-yourself \ - defence DISABLED. Call PaymentVerifier::attach_p2p_node \ - during node startup." - ); - return Ok(()); + // Production must call attach_p2p_node at startup. Fail CLOSED + // to avoid silently disabling the defence if a startup path + // regresses and loses the attach call. Unit-test builds that + // construct a PaymentVerifier directly without exercising merkle + // verification are opted-in via `test-utils` to fall back to + // fail-open. + #[cfg(any(test, feature = "test-utils"))] + { + crate::logging::warn!( + "PaymentVerifier: no P2PNode attached; merkle pay-yourself \ + defence SKIPPED (test build). Production startup MUST call \ + PaymentVerifier::attach_p2p_node." + ); + return Ok(()); + } + #[cfg(not(any(test, feature = "test-utils")))] + { + crate::logging::error!( + "PaymentVerifier: no P2PNode attached; rejecting merkle \ + payment. This is a node-startup bug — \ + PaymentVerifier::attach_p2p_node must be called before \ + any PUT handler runs." + ); + return Err(Error::Payment( + "Merkle candidate pool rejected: verifier is not wired to \ + the P2P layer; cannot verify candidate closeness." + .into(), + )); + } }; let pool_address = pool.midpoint_proof.address(); @@ -674,8 +809,10 @@ impl PaymentVerifier { // this, an attacker can point all 16 reward_address fields at a // self-owned wallet and drain their own payment. Every storing node // runs this check against the single `winner_pool` in the proof, so a - // forged pool is rejected everywhere it lands. - self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool) + // forged pool is rejected everywhere it lands. The pass cache and + // single-flight keyed on pool_hash collapse the Kademlia lookup cost + // within a batch and across concurrent PUTs for the same pool. + self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash) .await?; // Check pool cache first From 8a7139285278bec32778542b88d351ccf93ebb2b Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 22 Apr 2026 17:26:54 +0900 Subject: [PATCH 3/4] harden(payment): fix lost-wakeup, LRU-eviction, and failure-fanout in single-flight MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second concurrency review flagged three real problems with the single-flight closeness check: 1. **Lost-wakeup race**. `notify.notified().await` creates the Notified future AFTER the inflight lock is released. If the leader calls `notify_waiters()` between the waiter's lock drop and the future's first poll, the notification is lost and the waiter parks forever — there was no outer timeout around the wait. Fixed by switching to `Arc::notified_owned()`, which snapshots the `notify_waiters` counter at call time; the counter check on the first poll resolves the future immediately even if the leader already fired. 2. **LRU eviction orphans waiters**. When `DEFAULT_POOL_CACHE_CAPACITY` (1000) unique pool_hashes are in flight, LruCache evicts the oldest `Notify` entry. The leader's drop guard did `slot.pop(&pool_hash)` which, for an evicted entry, popped nothing and called `notify_waiters` on — nothing. Worse, a subsequent leader for the same pool_hash inserts a fresh `Notify`, which the original leader then erroneously pops. Both failure modes orphan waiters. Fixed by having the leader own an `Arc` independent of the cache and calling `notify_waiters` on THAT; the pop only runs if `Arc::ptr_eq` confirms the cache still holds our slot. 3. **Failure-path waiters each re-run the lookup**. On leader failure the pass cache wasn't written, waiters woke, saw cache miss, and all became new leaders running their own Kademlia lookups. That defeats the documented DoS bound of "N forged PUTs cost at most one lookup". Fixed by sharing the leader's `Result<(), String>` through an `Arc` with a `OnceLock`; waiters read the stored result directly and skip re-verification on both success AND failure. Other changes: - Loud startup error log if a binary is accidentally built with the `test-utils` feature, since that flips the closeness defence to fail-open when P2PNode isn't attached. - Two new unit tests exercise the cache short-circuit and the concurrent-waiter single-flight path. --- src/payment/verifier.rs | 287 +++++++++++++++++++++++++++++++++------- 1 file changed, 237 insertions(+), 50 deletions(-) diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index fa8ec8a1..59c7b19b 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -126,10 +126,11 @@ pub struct PaymentVerifier { /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256). closeness_pass_cache: Mutex>, /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs - /// for the same pool coalesce onto a single Kademlia lookup, which bounds - /// `DoS` amplification: `N` concurrent forged-pool PUTs cost at most one - /// lookup, not `N`. - inflight_closeness: Mutex>>, + /// for the same pool coalesce onto a single Kademlia lookup AND share + /// its result — on both success and failure — which bounds `DoS` + /// amplification to one lookup per unique `pool_hash` regardless of + /// concurrency. + inflight_closeness: Mutex>>, /// P2P node handle, attached post-construction so merkle verification can /// check that candidate `pub_keys` map to peers actually close to the pool /// midpoint in the live DHT. `None` in unit tests that don't exercise @@ -139,20 +140,79 @@ pub struct PaymentVerifier { config: PaymentVerifierConfig, } -/// Drop guard that clears the inflight-closeness slot and wakes all waiters -/// when the leader of a pool-hash verification finishes (success, failure, -/// panic, or early return all run the guard). +/// Shared state for an inflight closeness verification. The leader publishes +/// its result via the `OnceLock`; waiters read that result directly instead +/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the +/// leader's drop guard and by each waiting task. +struct ClosenessSlot { + notify: Arc, + /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the + /// leader disappeared without publishing (panic, cancellation). + result: std::sync::OnceLock>, +} + +impl ClosenessSlot { + fn new() -> Self { + Self { + notify: Arc::new(tokio::sync::Notify::new()), + result: std::sync::OnceLock::new(), + } + } + + /// Build an owned `Notified` future that snapshots the `notify_waiters` + /// counter at call time. Awaiting this future after dropping external + /// locks is race-free: if `notify_waiters` fires between construction + /// and the first poll, the snapshot mismatch resolves the future + /// immediately. + fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified { + Arc::clone(&self.notify).notified_owned() + } +} + +/// Drop guard that publishes the leader's result, clears the inflight slot, +/// and wakes all waiters. Fires on every exit path: success, failure, panic, +/// future-cancellation. +/// +/// The guard owns its own `Arc` so `notify_waiters` still +/// fires even if LRU pressure evicted the slot before the leader finished. +/// Waiters see the published result via `result.get()`; the `Notify` is only +/// the wake-up signal. struct InflightGuard<'a> { - slot: &'a Mutex>>, + slot_cache: &'a Mutex>>, pool_hash: PoolHash, + slot: Arc, +} + +impl InflightGuard<'_> { + /// Publish the leader's result. Called exactly once by the leader on + /// every successful or explicit-error exit. If dropped without calling + /// (panic, cancellation) the guard still wakes waiters but leaves + /// `result` empty, which waiters treat as a transient failure and retry. + fn publish(&self, result: &Result<()>) { + let stored: std::result::Result<(), String> = match result { + Ok(()) => Ok(()), + Err(e) => Err(e.to_string()), + }; + let _ = self.slot.result.set(stored); + } } impl Drop for InflightGuard<'_> { fn drop(&mut self) { - let mut slot = self.slot.lock(); - if let Some(notify) = slot.pop(&self.pool_hash) { - notify.notify_waiters(); + // Remove the slot entry if it's still ours. A separate leader may + // have inserted a new slot for the same pool_hash after LRU + // eviction — don't pop someone else's entry. + { + let mut cache = self.slot_cache.lock(); + if let Some(existing) = cache.peek(&self.pool_hash) { + if Arc::ptr_eq(existing, &self.slot) { + cache.pop(&self.pool_hash); + } + } } + // Wake every waiter registered against OUR slot, regardless of + // whether the cache entry is still ours. + self.slot.notify.notify_waiters(); } } @@ -174,6 +234,18 @@ impl PaymentVerifier { let cache_capacity = config.cache_capacity; info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})"); + // Loud warning if a production binary was accidentally built with + // `test-utils`: that feature flips the closeness-check fail-open + // switch, disabling the pay-yourself defence when P2PNode isn't + // attached. Safe in tests, never intended for prod. + #[cfg(feature = "test-utils")] + crate::logging::error!( + "PaymentVerifier: built with `test-utils` feature — merkle closeness \ + defence falls back to fail-open when no P2PNode is attached. This \ + feature is for test binaries only; production nodes must be built \ + without it." + ); + Self { cache, pool_cache, @@ -587,54 +659,85 @@ impl PaymentVerifier { pool: &evmlib::merkle_payments::MerklePaymentCandidatePool, pool_hash: PoolHash, ) -> Result<()> { - // Fast path: this node already verified closeness for this pool. + // Fast path: this node already verified this pool successfully. // A batch of 256 chunks shares one winner_pool, so without this cache // we'd pay a Kademlia lookup per chunk. - { - let mut cache = self.closeness_pass_cache.lock(); - if cache.get(&pool_hash).is_some() { - return Ok(()); - } + if self.closeness_pass_cache.lock().get(&pool_hash).is_some() { + return Ok(()); } - // Single-flight: if another task is already verifying this pool, - // wait on its completion and re-check the cache. Collapses a - // concurrent storm of forged-pool PUTs to at most one live Kademlia - // lookup per unique pool_hash. - let wait_for = { - let mut inflight = self.inflight_closeness.lock(); - let existing = inflight.get(&pool_hash).map(Arc::clone); - if existing.is_none() { - let notify = Arc::new(tokio::sync::Notify::new()); - inflight.put(pool_hash, notify); - } - existing - }; + // Single-flight: on each attempt, either claim leadership by + // inserting a fresh `ClosenessSlot`, or wait on an existing leader + // and read its published result. The leader holds an `Arc` to the + // slot independent of the LruCache so waiters are still woken if + // eviction pressure kicked the cache entry. + // + // The `notified_owned()` future snapshots the `notify_waiters` + // counter at the moment of construction (while we hold the lock), + // which makes the subsequent `.await` race-free: if the leader + // calls `notify_waiters` between our construction and our poll, the + // counter has advanced and the future resolves immediately on first + // poll. + loop { + // Release the mutex guard explicitly before any await below. + // Clippy wants `if let ... else` written as `map_or_else`, but + // any such rewrite re-borrows the locked `inflight` inside the + // closure and fails the borrow checker — so the lint is + // silenced here. + #[allow(clippy::option_if_let_else)] + let (waiter_slot, leader_slot) = { + let mut inflight = self.inflight_closeness.lock(); + let chosen = if let Some(existing) = inflight.get(&pool_hash) { + (Some(Arc::clone(existing)), None) + } else { + let slot = Arc::new(ClosenessSlot::new()); + inflight.put(pool_hash, Arc::clone(&slot)); + (None, Some(slot)) + }; + drop(inflight); + chosen + }; - if let Some(notify) = wait_for { - notify.notified().await; - // Leader finished. Re-check the pass cache: hit = leader passed; - // miss = leader failed or cache pressure evicted. Fall through on - // miss and run our own verification (rare; bounded because the - // inflight slot was cleared when the leader completed). - let mut cache = self.closeness_pass_cache.lock(); - if cache.get(&pool_hash).is_some() { - return Ok(()); + if let Some(slot) = waiter_slot { + // Build the owned-notified future BEFORE awaiting, so it + // snapshots the `notify_waiters` counter now. The slot + // already existed when we locked, so the leader is either + // running or finished; in both cases the snapshot + counter + // check ensures we wake up correctly. + let notified = slot.notified_owned(); + notified.await; + + // Leader published a result — use it directly. + if let Some(result) = slot.result.get() { + return result.clone().map_err(Error::Payment); + } + // Leader disappeared without publishing (panic or + // cancellation). Slot was cleared by the leader's drop + // guard; loop to become the new leader. + continue; } - } - // We are the leader for this pool_hash. Wake waiters and clear the - // inflight slot on all exits via a drop guard. - let _guard = InflightGuard { - slot: &self.inflight_closeness, - pool_hash, - }; + // Leader path. Drop guard clears the slot and wakes waiters on + // every exit (success, failure, panic, cancellation). + let Some(slot) = leader_slot else { + // Unreachable by construction. + return Err(Error::Payment( + "internal error: neither leader nor waiter in closeness check".into(), + )); + }; + let guard = InflightGuard { + slot_cache: &self.inflight_closeness, + pool_hash, + slot, + }; - let result = self.verify_merkle_candidate_closeness_inner(pool).await; - if result.is_ok() { - self.closeness_pass_cache.lock().put(pool_hash, ()); + let result = self.verify_merkle_candidate_closeness_inner(pool).await; + guard.publish(&result); + if result.is_ok() { + self.closeness_pass_cache.lock().put(pool_hash, ()); + } + return result; } - result } /// Inner closeness check: the actual DHT lookup + set-membership test. @@ -1004,6 +1107,7 @@ impl PaymentVerifier { #[allow(clippy::expect_used)] mod tests { use super::*; + use evmlib::merkle_payments::MerklePaymentCandidatePool; /// Create a verifier for unit tests. EVM is always on, but tests can /// pre-populate the cache to bypass on-chain verification. @@ -1746,6 +1850,89 @@ mod tests { } } + #[tokio::test] + async fn closeness_pass_cache_short_circuits_second_call() { + // When a pool_hash is in the closeness_pass_cache, the outer + // verify_merkle_candidate_closeness must return Ok(()) without + // running the inner lookup — even if no P2PNode is attached. + // That second half (no-p2p → would normally fail-closed in release) + // is the proof the cache short-circuit ran first. + let verifier = create_test_verifier(); + let pool_hash = [0xAAu8; 32]; + verifier.closeness_pass_cache.lock().put(pool_hash, ()); + + // Construct a dummy pool — contents don't matter because the cache + // hit means we never look at them. + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let result = verifier + .verify_merkle_candidate_closeness(&pool, pool_hash) + .await; + assert!( + result.is_ok(), + "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}" + ); + } + + #[tokio::test] + async fn closeness_single_flight_concurrent_readers_share_one_verification() { + // Two concurrent callers for the same pool_hash should produce the + // same outcome, and the cache should end up populated exactly once. + // We use the test-utils fail-open path to short-circuit the inner + // DHT lookup; the purpose of this test is the single-flight + // plumbing, not the lookup itself. + let verifier = Arc::new(create_test_verifier()); + let pool_hash = [0x77u8; 32]; + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let v1 = Arc::clone(&verifier); + let p1 = pool.clone(); + let v2 = Arc::clone(&verifier); + let p2 = pool.clone(); + + let (r1, r2) = tokio::join!( + async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await }, + async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await }, + ); + + assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree"); + assert!( + r1.is_ok(), + "both callers must succeed on the test-utils path" + ); + assert!( + verifier + .closeness_pass_cache + .lock() + .get(&pool_hash) + .is_some(), + "success path must populate the pass cache" + ); + assert!( + verifier.inflight_closeness.lock().get(&pool_hash).is_none(), + "inflight slot must be cleared after the leader finishes" + ); + } + + /// Build a deterministic but otherwise-unused `MidpointProof` so unit + /// tests can construct a `MerklePaymentCandidatePool` without spinning + /// up a real merkle tree. The closeness path only calls `.address()` + /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp — + /// the values don't need to be tree-valid for these tests. + fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof { + // Build a minimal tree of two leaves so we get a real branch. + let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])]; + let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree"); + let candidates = tree.reward_candidates(1_700_000_000).expect("candidates"); + candidates.first().expect("at least one").clone() + } + // ========================================================================= // Merkle verification unit tests // ========================================================================= From 8f69ec654100f0b239f51d5b36f48ef962478aa0 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 22 Apr 2026 17:31:30 +0900 Subject: [PATCH 4/4] harden(payment): bound leader retries and add a real waiter-path test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third review pass found two remaining issues: 1. **Unbounded retry loop on leader cancellation cascade**. The loop retries forever if every leader gets cancelled before publishing. Add `MAX_LEADER_RETRIES = 4` so waiters return a visible error rather than spinning through indefinite cancellation cycles. 2. **Single-flight test didn't actually exercise the waiter path**. The existing concurrent-readers test relies on timing and might run both calls serially through the leader path. Add `closeness_waiter_reads_leaders_published_failure`, which seeds the inflight slot, spawns a waiter task, yields until the waiter is parked on `notified_owned().await`, then externally publishes a failure + notifies — proving the waiter reads the leader's published result directly without running its own inner check. --- src/payment/verifier.rs | 82 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index 59c7b19b..26f45212 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -618,6 +618,12 @@ impl PaymentVerifier { /// per forged `pool_hash`. const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + /// Maximum waiter → leader retries when the leader's future was cancelled + /// or panicked before publishing a result. Beyond this the waiter returns + /// a visible error rather than spinning indefinitely through a + /// cancellation cascade. + const MAX_LEADER_RETRIES: usize = 4; + /// Verify that the candidate pool's `pub_keys` correspond to peers that /// are actually XOR-closest to the pool midpoint address, by querying /// the DHT for its closest peers to that address and requiring that a @@ -678,7 +684,12 @@ impl PaymentVerifier { // calls `notify_waiters` between our construction and our poll, the // counter has advanced and the future resolves immediately on first // poll. - loop { + // + // Bounded retry: if we're a waiter and the leader gets cancelled or + // panics (slot.result.get() == None after wake-up), we loop back to + // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so + // adversarial cancellation cascades cannot spin this indefinitely. + for attempt in 0..=Self::MAX_LEADER_RETRIES { // Release the mutex guard explicitly before any await below. // Clippy wants `if let ... else` written as `map_or_else`, but // any such rewrite re-borrows the locked `inflight` inside the @@ -713,7 +724,16 @@ impl PaymentVerifier { } // Leader disappeared without publishing (panic or // cancellation). Slot was cleared by the leader's drop - // guard; loop to become the new leader. + // guard; loop to become the new leader — unless we've + // hit the retry bound (see MAX_LEADER_RETRIES). + if attempt == Self::MAX_LEADER_RETRIES { + return Err(Error::Payment( + "Merkle candidate pool rejected: closeness leader \ + repeatedly failed to publish a result (likely \ + repeated cancellation or panic)." + .into(), + )); + } continue; } @@ -738,6 +758,13 @@ impl PaymentVerifier { } return result; } + // Unreachable: the for-loop body always either `return`s or `continue`s, + // and the waiter branch's `continue` only runs when `attempt < + // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns + // via the retry-bound check; the leader branch always returns. + Err(Error::Payment( + "internal error: closeness retry loop exited without returning".into(), + )) } /// Inner closeness check: the actual DHT lookup + set-membership test. @@ -1920,6 +1947,57 @@ mod tests { ); } + #[tokio::test] + async fn closeness_waiter_reads_leaders_published_failure() { + // Prove the waiter path actually surfaces a failure published by a + // concurrent leader, without running its own inner check. Insert a + // slot, spawn a waiter (which will park on notified_owned), then + // publish failure + notify from the outside — simulating what the + // leader's `publish` + drop-guard pair does. + let verifier = Arc::new(create_test_verifier()); + let pool_hash = [0x55u8; 32]; + let slot = Arc::new(ClosenessSlot::new()); + verifier + .inflight_closeness + .lock() + .put(pool_hash, Arc::clone(&slot)); + + let pool = MerklePaymentCandidatePool { + midpoint_proof: fake_midpoint_proof(), + candidate_nodes: make_candidate_nodes(1_700_000_000), + }; + + let verifier_c = Arc::clone(&verifier); + let pool_c = pool.clone(); + let waiter = tokio::spawn(async move { + verifier_c + .verify_merkle_candidate_closeness(&pool_c, pool_hash) + .await + }); + + // Yield so the waiter can run up to its `notified_owned().await`. + // A few yields cover both single-threaded and multi-threaded tokio + // runtimes regardless of scheduling. + for _ in 0..5 { + tokio::task::yield_now().await; + } + + // Simulate the leader's `publish` + drop-guard: publish the result, + // clear the slot, wake waiters. + slot.result + .set(Err("forged pool: not close enough".to_string())) + .expect("set once"); + verifier.inflight_closeness.lock().pop(&pool_hash); + slot.notify.notify_waiters(); + + let result = waiter.await.expect("task panicked"); + let err = result.expect_err("waiter must return the leader's published failure"); + assert!( + err.to_string().contains("forged pool"), + "waiter must surface the leader's error message, got: {err}" + ); + } + /// Build a deterministic but otherwise-unused `MidpointProof` so unit /// tests can construct a `MerklePaymentCandidatePool` without spinning /// up a real merkle tree. The closeness path only calls `.address()`