Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions ant-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ serial_test = "3"
anyhow = "1"
alloy = { version = "1.6", features = ["node-bindings"] }
tokio-test = "0.4"
# Direct access to BootstrapManager used by the cold-start-from-disk test,
# which populates a cache via `add_peer_trusted` (bypasses Sybil rate limits)
# and then verifies reload after save. Version tracks ant-node's transitive
# saorsa-core dep.
saorsa-core = "0.23"

[[example]]
name = "start-local-devnet"
Expand Down
26 changes: 21 additions & 5 deletions ant-core/src/data/client/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Chunks are immutable, content-addressed data blocks where the address
//! is the BLAKE3 hash of the content.

use crate::data::client::peer_cache::record_peer_outcome;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_node::ant_protocol::{
Expand All @@ -15,7 +16,7 @@ use ant_node::CLOSE_GROUP_MAJORITY;
use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};
use std::future::Future;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

/// Data type identifier for chunks (used in quote requests).
Expand Down Expand Up @@ -171,7 +172,7 @@ impl Client {
let addr_hex = hex::encode(address);
let timeout_secs = self.config().store_timeout_secs;

send_and_await_chunk_response(
let result = send_and_await_chunk_response(
node,
target_peer,
message_bytes,
Expand Down Expand Up @@ -204,7 +205,15 @@ impl Client {
))
},
)
.await
.await;

// No RTT recorded on the PUT path: the wall-clock is dominated by
// the ~4 MB payload upload, which reflects the uploader's uplink
// rather than the peer's responsiveness. Quote-path and GET-path
// RTTs still feed quality scoring.
record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;

result
}

/// Retrieve a chunk from the Autonomi network.
Expand Down Expand Up @@ -278,7 +287,8 @@ impl Client {
let addr_hex = hex::encode(address);
let timeout_secs = self.config().store_timeout_secs;

send_and_await_chunk_response(
let start = Instant::now();
let result = send_and_await_chunk_response(
node,
peer,
message_bytes,
Expand Down Expand Up @@ -325,7 +335,13 @@ impl Client {
))
},
)
.await
.await;

let success = result.is_ok();
let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;

result
}

/// Check if a chunk exists on the network.
Expand Down
1 change: 1 addition & 0 deletions ant-core/src/data/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod data;
pub mod file;
pub mod merkle;
pub mod payment;
pub(crate) mod peer_cache;
pub mod quote;

use crate::data::client::cache::ChunkCache;
Expand Down
137 changes: 137 additions & 0 deletions ant-core/src/data/client/peer_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! Bootstrap-cache population helpers.
//!
//! Wires client-side peer contacts into saorsa-core's `BootstrapManager`
//! so the persistent cache reflects real peer quality across sessions.

use ant_node::core::{MultiAddr, P2PNode, PeerId};
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tracing::debug;

/// Feed a peer contact outcome into the `BootstrapManager` cache so future
/// cold-starts can rank peers by observed latency and success.
///
/// `success = true`: upserts the peer via `add_discovered_peer` (subject to
/// saorsa-core Sybil checks — rate limit + IP diversity) and records RTT via
/// `update_peer_metrics`.
///
/// `success = false`: only updates the quality score of peers already in
/// the cache. Unreachable peers are never inserted.
///
/// Both upstream calls silently discard errors — peer-cache bookkeeping
/// must never abort a user operation. Enable the `saorsa_core::bootstrap`
/// tracing target to see rejection reasons.
pub(crate) async fn record_peer_outcome(
node: &Arc<P2PNode>,
peer_id: PeerId,
addrs: &[MultiAddr],
success: bool,
rtt_ms: Option<u64>,
) {
if success {
let before = node.cached_peer_count().await;
let _ = node.add_discovered_peer(peer_id, addrs.to_vec()).await;
let after = node.cached_peer_count().await;
if after > before {
debug!("Bootstrap cache grew: {before} -> {after} peers");
}
}
if let Some(primary) = select_primary_multiaddr(addrs) {
let _ = node
.update_peer_metrics(primary, success, rtt_ms, None)
.await;
}
}

/// Pick the `MultiAddr` to use as the peer's cache key.
///
/// Prefers a globally routable socket address over RFC1918 / link-local /
/// loopback. Without this, a peer advertising `[10.0.0.5, 203.0.113.1]`
/// would be keyed under the RFC1918 address, so metrics recorded during
/// a contact over the public address would land on a stale cache entry.
/// Falls back to any socket-addressable `MultiAddr` if none look global.
fn select_primary_multiaddr(addrs: &[MultiAddr]) -> Option<&MultiAddr> {
addrs
.iter()
.find(|a| a.socket_addr().is_some_and(|sa| is_globally_routable(&sa)))
.or_else(|| addrs.iter().find(|a| a.socket_addr().is_some()))
}

fn is_globally_routable(addr: &SocketAddr) -> bool {
match addr.ip() {
IpAddr::V4(v4) => {
!v4.is_private()
&& !v4.is_loopback()
&& !v4.is_link_local()
&& !v4.is_broadcast()
&& !v4.is_documentation()
&& !v4.is_unspecified()
}
IpAddr::V6(v6) => {
// Full Ipv6Addr::is_global is unstable; this is the practical
// subset that mirrors the IPv4 checks above.
!v6.is_loopback()
&& !v6.is_unspecified()
&& !v6.is_multicast()
&& !v6.segments()[0].eq(&0xfe80) // link-local fe80::/10 (approx)
&& !matches!(v6.segments()[0] & 0xfe00, 0xfc00) // unique-local fc00::/7
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr};

#[test]
fn globally_routable_v4() {
// 8.8.8.8 (Google DNS) — genuinely public, not in any reserved range.
assert!(is_globally_routable(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
80
)));
// 203.0.113.0/24 is TEST-NET-3 documentation — rejected by
// `is_documentation()`, which is the behaviour we want: quality
// metrics should not land on addresses that are never dialed in
// production by spec.
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)),
80
)));
}

#[test]
fn globally_routable_v6() {
// 2606:4700:4700::1111 (Cloudflare DNS) — a real public v6 outside
// the `2001:db8::/32` documentation prefix.
assert!(is_globally_routable(&SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0x2606, 0x4700, 0x4700, 0, 0, 0, 0, 0x1111)),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V6(Ipv6Addr::LOCALHOST),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)),
80
)));
assert!(!is_globally_routable(&SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)),
80
)));
}
}
9 changes: 8 additions & 1 deletion ant-core/src/data/client/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Handles requesting storage quotes from network nodes and
//! managing payment for data storage.

use crate::data::client::peer_cache::record_peer_outcome;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_node::ant_protocol::{
Expand All @@ -14,7 +15,7 @@ use ant_node::{CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE};
use evmlib::common::Amount;
use evmlib::PaymentQuote;
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

/// Compute XOR distance between a peer's ID bytes and a target address.
Expand Down Expand Up @@ -106,6 +107,7 @@ impl Client {
let node_clone = node.clone();

let quote_future = async move {
let start = Instant::now();
let result = send_and_await_chunk_response(
&node_clone,
&peer_id_clone,
Expand Down Expand Up @@ -147,6 +149,11 @@ impl Client {
)
.await;

let success = result.is_ok();
let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
.await;

(peer_id_clone, addrs_clone, result)
};

Expand Down
Loading
Loading