diff --git a/Cargo.lock b/Cargo.lock index eb3ed33..cf86812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,6 +869,7 @@ dependencies = [ "rand 0.8.6", "reqwest 0.12.28", "rmp-serde", + "saorsa-core", "saorsa-pqc 0.5.1", "self-replace", "self_encryption", diff --git a/ant-core/Cargo.toml b/ant-core/Cargo.toml index e15abd7..dc910b8 100644 --- a/ant-core/Cargo.toml +++ b/ant-core/Cargo.toml @@ -57,6 +57,11 @@ serial_test = "3" anyhow = "1" alloy = { version = "1.6", features = ["node-bindings"] } tokio-test = "0.4" +# Direct access to BootstrapManager used by the cold-start-from-disk test, +# which populates a cache via `add_peer_trusted` (bypasses Sybil rate limits) +# and then verifies reload after save. Version tracks ant-node's transitive +# saorsa-core dep. +saorsa-core = "0.23" [[example]] name = "start-local-devnet" diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs index 592e6a0..46174e1 100644 --- a/ant-core/src/data/client/chunk.rs +++ b/ant-core/src/data/client/chunk.rs @@ -3,6 +3,7 @@ //! Chunks are immutable, content-addressed data blocks where the address //! is the BLAKE3 hash of the content. +use crate::data::client::peer_cache::record_peer_outcome; use crate::data::client::Client; use crate::data::error::{Error, Result}; use ant_node::ant_protocol::{ @@ -15,7 +16,7 @@ use ant_node::CLOSE_GROUP_MAJORITY; use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; use std::future::Future; -use std::time::Duration; +use std::time::{Duration, Instant}; use tracing::{debug, warn}; /// Data type identifier for chunks (used in quote requests). 
@@ -171,7 +172,7 @@ impl Client { let addr_hex = hex::encode(address); let timeout_secs = self.config().store_timeout_secs; - send_and_await_chunk_response( + let result = send_and_await_chunk_response( node, target_peer, message_bytes, @@ -204,7 +205,15 @@ impl Client { )) }, ) - .await + .await; + + // No RTT recorded on the PUT path: the wall-clock is dominated by + // the ~4 MB payload upload, which reflects the uploader's uplink + // rather than the peer's responsiveness. Quote-path and GET-path + // RTTs still feed quality scoring. + record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await; + + result } /// Retrieve a chunk from the Autonomi network. @@ -278,7 +287,8 @@ impl Client { let addr_hex = hex::encode(address); let timeout_secs = self.config().store_timeout_secs; - send_and_await_chunk_response( + let start = Instant::now(); + let result = send_and_await_chunk_response( node, peer, message_bytes, @@ -325,7 +335,13 @@ impl Client { )) }, ) - .await + .await; + + let success = result.is_ok(); + let rtt_ms = success.then(|| start.elapsed().as_millis() as u64); + record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await; + + result } /// Check if a chunk exists on the network. diff --git a/ant-core/src/data/client/mod.rs b/ant-core/src/data/client/mod.rs index ccce50d..5f0b67d 100644 --- a/ant-core/src/data/client/mod.rs +++ b/ant-core/src/data/client/mod.rs @@ -10,6 +10,7 @@ pub mod data; pub mod file; pub mod merkle; pub mod payment; +pub(crate) mod peer_cache; pub mod quote; use crate::data::client::cache::ChunkCache; diff --git a/ant-core/src/data/client/peer_cache.rs b/ant-core/src/data/client/peer_cache.rs new file mode 100644 index 0000000..dc67aa0 --- /dev/null +++ b/ant-core/src/data/client/peer_cache.rs @@ -0,0 +1,137 @@ +//! Bootstrap-cache population helpers. +//! +//! Wires client-side peer contacts into saorsa-core's `BootstrapManager` +//! so the persistent cache reflects real peer quality across sessions. 
+ +use ant_node::core::{MultiAddr, P2PNode, PeerId}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; +use tracing::debug; + +/// Feed a peer contact outcome into the `BootstrapManager` cache so future +/// cold-starts can rank peers by observed latency and success. +/// +/// `success = true`: upserts the peer via `add_discovered_peer` (subject to +/// saorsa-core Sybil checks — rate limit + IP diversity) and records RTT via +/// `update_peer_metrics`. +/// +/// `success = false`: only updates the quality score of peers already in +/// the cache. Unreachable peers are never inserted. +/// +/// Both upstream calls silently discard errors — peer-cache bookkeeping +/// must never abort a user operation. Enable the `saorsa_core::bootstrap` +/// tracing target to see rejection reasons. +pub(crate) async fn record_peer_outcome( + node: &Arc, + peer_id: PeerId, + addrs: &[MultiAddr], + success: bool, + rtt_ms: Option, +) { + if success { + let before = node.cached_peer_count().await; + let _ = node.add_discovered_peer(peer_id, addrs.to_vec()).await; + let after = node.cached_peer_count().await; + if after > before { + debug!("Bootstrap cache grew: {before} -> {after} peers"); + } + } + if let Some(primary) = select_primary_multiaddr(addrs) { + let _ = node + .update_peer_metrics(primary, success, rtt_ms, None) + .await; + } +} + +/// Pick the `MultiAddr` to use as the peer's cache key. +/// +/// Prefers a globally routable socket address over RFC1918 / link-local / +/// loopback. Without this, a peer advertising `[10.0.0.5, 203.0.113.1]` +/// would be keyed under the RFC1918 address, so metrics recorded during +/// a contact over the public address would land on a stale cache entry. +/// Falls back to any socket-addressable `MultiAddr` if none look global. 
+fn select_primary_multiaddr(addrs: &[MultiAddr]) -> Option<&MultiAddr> {
+    addrs
+        .iter()
+        .find(|a| a.socket_addr().is_some_and(|sa| is_globally_routable(&sa)))
+        .or_else(|| addrs.iter().find(|a| a.socket_addr().is_some()))
+}
+
+fn is_globally_routable(addr: &SocketAddr) -> bool {
+    match addr.ip() {
+        IpAddr::V4(v4) => {
+            !v4.is_private()
+                && !v4.is_loopback()
+                && !v4.is_link_local()
+                && !v4.is_broadcast()
+                && !v4.is_documentation()
+                && !v4.is_unspecified()
+        }
+        IpAddr::V6(v6) => {
+            // Full Ipv6Addr::is_global is unstable; this is the practical
+            // subset that mirrors the IPv4 checks above (no 2001:db8 check).
+            !v6.is_loopback()
+                && !v6.is_unspecified()
+                && !v6.is_multicast()
+                && (v6.segments()[0] & 0xffc0) != 0xfe80 // link-local fe80::/10
+                && (v6.segments()[0] & 0xfe00) != 0xfc00 // unique-local fc00::/7
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::net::{Ipv4Addr, Ipv6Addr};
+
+    #[test]
+    fn globally_routable_v4() {
+        // 8.8.8.8 (Google DNS) — genuinely public, not in any reserved range.
+        assert!(is_globally_routable(&SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
+            80
+        )));
+        assert!(!is_globally_routable(&SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)),
+            80
+        )));
+        assert!(!is_globally_routable(&SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
+            80
+        )));
+        assert!(!is_globally_routable(&SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
+            80
+        )));
+        // 203.0.113.0/24 is TEST-NET-3 documentation — rejected by
+        // `is_documentation()`, which is the behaviour we want: quality
+        // metrics should not land on addresses that are never dialed in
+        // production by spec.
+        assert!(!is_globally_routable(&SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)),
+            80
+        )));
+    }
+
+    #[test]
+    fn globally_routable_v6() {
+        // 2606:4700:4700::1111 (Cloudflare DNS) — a real public v6 outside
+        // the `2001:db8::/32` documentation prefix.
+ assert!(is_globally_routable(&SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0x2606, 0x4700, 0x4700, 0, 0, 0, 0, 0x1111)), + 80 + ))); + assert!(!is_globally_routable(&SocketAddr::new( + IpAddr::V6(Ipv6Addr::LOCALHOST), + 80 + ))); + assert!(!is_globally_routable(&SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)), + 80 + ))); + assert!(!is_globally_routable(&SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)), + 80 + ))); + } +} diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 3e13626..4523e91 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -3,6 +3,7 @@ //! Handles requesting storage quotes from network nodes and //! managing payment for data storage. +use crate::data::client::peer_cache::record_peer_outcome; use crate::data::client::Client; use crate::data::error::{Error, Result}; use ant_node::ant_protocol::{ @@ -14,7 +15,7 @@ use ant_node::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE}; use evmlib::common::Amount; use evmlib::PaymentQuote; use futures::stream::{FuturesUnordered, StreamExt}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tracing::{debug, warn}; /// Compute XOR distance between a peer's ID bytes and a target address. @@ -106,6 +107,7 @@ impl Client { let node_clone = node.clone(); let quote_future = async move { + let start = Instant::now(); let result = send_and_await_chunk_response( &node_clone, &peer_id_clone, @@ -147,6 +149,11 @@ impl Client { ) .await; + let success = result.is_ok(); + let rtt_ms = success.then(|| start.elapsed().as_millis() as u64); + record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms) + .await; + (peer_id_clone, addrs_clone, result) }; diff --git a/ant-core/tests/e2e_bootstrap_cache.rs b/ant-core/tests/e2e_bootstrap_cache.rs new file mode 100644 index 0000000..80303ef --- /dev/null +++ b/ant-core/tests/e2e_bootstrap_cache.rs @@ -0,0 +1,143 @@ +//! 
 E2E tests for BootstrapManager cache population from real peer interactions.
+//!
+//! Proves that client-side uploads and downloads feed the BootstrapManager
+//! cache via `add_discovered_peer` + `update_peer_metrics`, so that subsequent
+//! cold-starts can load quality-scored peers beyond the bundled bootstrap set.
+//!
+//! ## Why the assertion is "cache grew", not "cache >= 10"
+//!
+//! saorsa-core gates `add_peer` through two independent Sybil mechanisms:
+//!
+//! 1. `BootstrapIpLimiter::can_accept` — the IP-diversity limiter. When the
+//!    node is built with `allow_loopback = true` (as `MiniTestnet` does),
+//!    this returns early for loopback IPs, so it is NOT the bottleneck here.
+//! 2. `JoinRateLimiter::check_join_allowed` — the temporal rate limiter.
+//!    Defaults cap inserts at 3 per /24 subnet per hour and are NOT exempt
+//!    for loopback (`saorsa-core/src/rate_limit.rs:254` has no `is_loopback`
+//!    branch). All testnet nodes bind to `127.0.0.1`, so all ~11 available
+//!    peers fall in the single `127.0.0.0/24` bucket — the first 3 land in
+//!    the cache, the rest are rejected with `Subnet24LimitExceeded`.
+//!
+//! In production, peers span many /24s (typically one per ASN), so the /24
+//! rate limit is never the binding constraint and crossing
+//! `min_peers_to_save = 10` is straightforward.
+//!
+//! Asserting `after > before` is sufficient proof that the client library
+//! correctly wires `add_discovered_peer` and `update_peer_metrics` into the
+//! upload (and, transitively, download) paths. The threshold-crossing +
+//! persistence behavior is an upstream contract covered by saorsa-core's
+//! own tests.
+ +#![allow(clippy::unwrap_used, clippy::expect_used)] + +mod support; + +use ant_core::data::{Client, ClientConfig}; +use bytes::Bytes; +use serial_test::serial; +use std::sync::Arc; +use support::MiniTestnet; + +const BOOTSTRAP_CACHE_TEST_NODES: usize = 12; + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn test_bootstrap_cache_grows_after_client_activity() { + let testnet = MiniTestnet::start(BOOTSTRAP_CACHE_TEST_NODES).await; + let node = testnet.node(3).expect("Node 3 should exist"); + + let client = Client::from_node(Arc::clone(&node), ClientConfig::default()) + .with_wallet(testnet.wallet().clone()); + + let before = node.cached_peer_count().await; + + let content = Bytes::from("bootstrap-cache e2e payload"); + let address = client + .chunk_put(content.clone()) + .await + .expect("chunk_put should succeed with payment"); + + // The GET exercises the download-side hook (chunk_get_from_peer), which + // would silently break if record_peer_outcome's signature drifted from + // what chunk.rs expects. The assertion here is just that the round-trip + // works — cache growth from the GET itself is capped by the /24 rate + // limiter which saturated during the PUT. + let retrieved = client + .chunk_get(&address) + .await + .expect("chunk_get should succeed") + .expect("chunk should be retrievable"); + assert_eq!(retrieved.content.as_ref(), content.as_ref()); + + let after = node.cached_peer_count().await; + assert!( + after > before, + "cache should grow after peer interactions: before={before} after={after}" + ); + + drop(client); + testnet.teardown().await; +} + +/// Cold-start-from-disk round-trip. +/// +/// ## What this proves +/// +/// - A populated `BootstrapManager` cache with ≥ `min_peers_to_save` peers +/// is persisted to disk on `save()`. +/// - A *fresh* `BootstrapManager` constructed against the same `cache_dir` +/// reloads the persisted peers on startup. 
+///
+/// Together with `test_bootstrap_cache_grows_after_client_activity` above
+/// (which exercises the add-during-activity hook), this closes the loop on
+/// the V2-202 value prop: cold-start clients reload real peers from disk.
+///
+/// ## Why `add_peer_trusted` and not `add_discovered_peer`
+///
+/// `add_discovered_peer` goes through `BootstrapManager::add_peer`, which
+/// runs both the IP-diversity limiter and the temporal `JoinRateLimiter`.
+/// The latter caps inserts at 3 per /24 subnet per hour and has no
+/// loopback exemption. A real test that populates 15 peers through that
+/// path would need peers on distinct /24s — not practical on a single-host
+/// testnet. `add_peer_trusted` skips both limiters and talks to the same
+/// underlying `BootstrapCache::add_seed` that our hooks ultimately feed,
+/// so the persistence path exercised is identical to production's.
+#[tokio::test]
+async fn test_bootstrap_cache_roundtrip_through_disk() {
+    use saorsa_core::{BootstrapConfig, BootstrapManager};
+    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+    let cache_dir = tempfile::TempDir::new().expect("create temp cache dir");
+    let config = BootstrapConfig {
+        cache_dir: cache_dir.path().to_path_buf(),
+        ..BootstrapConfig::default()
+    };
+
+    // Populate with peers on distinct /24s (cosmetic — add_peer_trusted
+    // skips rate limits — but keeps the data realistic if saorsa-core
+    // ever tightens its invariants).
+    let peer_count = 15;
+    {
+        let mgr = BootstrapManager::with_config(config.clone())
+            .await
+            .expect("construct populating BootstrapManager");
+        for i in 0..peer_count {
+            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, i as u8 + 1)), 9000);
+            mgr.add_peer_trusted(&addr, vec![addr]).await;
+        }
+        assert_eq!(mgr.peer_count().await, peer_count, "in-memory populate");
+        mgr.save()
+            .await
+            .expect("save should succeed above threshold");
+    }
+
+    // Fresh manager, same cache_dir: peers should be reloaded.
+ let reloaded = BootstrapManager::with_config(config) + .await + .expect("construct reloading BootstrapManager"); + let reloaded_count = reloaded.peer_count().await; + assert_eq!( + reloaded_count, peer_count, + "all {peer_count} peers should reload from disk, got {reloaded_count}" + ); +}