diff --git a/Cargo.toml b/Cargo.toml index 5075ad8..d8f4c8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,4 +67,3 @@ default = ["full"] [package.metadata.docs.rs] all-features = true - diff --git a/README.md b/README.md index 8a4586a..85fb204 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,19 @@ If you want to explicitly start in Server mode, because you know you are not run you can call `Dht::builder().server_mode().build()`, and you can optionally add your known public ip so the node doesn't have to depend on, votes from responding nodes: `Dht::builder().server_mode().public_ip().build()`. +## Benchmarks + +There are manual benchmarks in `benches/` for tracking performance of core DHT +operations. Each file has a doc comment describing what it measures and what +regressions it catches. Run them with: + +```sh +cargo run --release --features full --bin latency +cargo run --release --features full --bin throughput +cargo run --release --features full --bin scalability +cargo run --release --features full --bin routing_table +``` + ## Acknowledgment This implementation was possible thanks to [Webtorrent's Bittorrent-dht](https://github.com/webtorrent/bittorrent-dht) as a reference, diff --git a/src/rpc/config.rs b/src/actor/config.rs similarity index 100% rename from src/rpc/config.rs rename to src/actor/config.rs diff --git a/src/actor/handle_request.rs b/src/actor/handle_request.rs new file mode 100644 index 0000000..de56310 --- /dev/null +++ b/src/actor/handle_request.rs @@ -0,0 +1,76 @@ +use std::net::SocketAddrV4; + +use tracing::info; + +use crate::common::{ + FindNodeRequestArguments, Id, MessageType, Node, RequestSpecific, RequestTypeSpecific, + RoutingTable, +}; +use crate::core::iterative_query::GetRequestSpecific; + +use super::Actor; + +impl Actor { + /// Handle an inbound KRPC request: forward to server, detect NAT traversal, + /// and rotate our ID if it doesn't match our public IP. 
+ pub(super) fn handle_request( + &mut self, + from: SocketAddrV4, + transaction_id: u32, + request_specific: RequestSpecific, + ) { + // By default we only add nodes that responds to our requests. + // + // This is the only exception; the first node creating the DHT, + // without this exception, the bootstrapping node's routing table + // will never be populated. + if self.bootstrap.is_empty() { + if let RequestTypeSpecific::FindNode(param) = &request_specific.request_type { + self.routing_table.add(Node::new(param.target, from)); + } + } + + let is_ping = matches!(request_specific.request_type, RequestTypeSpecific::Ping); + + if self.server_mode() { + let server = &mut self.server; + + match server.handle_request(&self.routing_table, from, request_specific) { + Some(MessageType::Error(error)) => { + self.error(from, transaction_id, error); + } + Some(MessageType::Response(response)) => { + self.response(from, transaction_id, response); + } + _ => {} + }; + } + + if let Some(our_address) = self.public_address { + if from == our_address && is_ping { + self.firewalled = false; + + let ipv4 = our_address.ip(); + + // Restarting our routing table with new secure Id if necessary. + if !self.id().is_valid_for_ip(*ipv4) { + let new_id = Id::from_ipv4(*ipv4); + + info!( + "Our current id {} is not valid for address {}. 
Using new id {}", + self.id(), + our_address, + new_id + ); + + self.get( + GetRequestSpecific::FindNode(FindNodeRequestArguments { target: new_id }), + None, + ); + + self.routing_table = RoutingTable::new(new_id); + } + } + } + } +} diff --git a/src/actor/handle_response.rs b/src/actor/handle_response.rs new file mode 100644 index 0000000..e4f717d --- /dev/null +++ b/src/actor/handle_response.rs @@ -0,0 +1,197 @@ +use std::net::SocketAddrV4; + +use tracing::debug; + +use crate::common::{ + validate_immutable, GetImmutableResponseArguments, GetMutableResponseArguments, + GetPeersResponseArguments, Id, Message, MessageType, MutableItem, + NoMoreRecentValueResponseArguments, NoValuesResponseArguments, Node, RequestTypeSpecific, + ResponseSpecific, +}; + +use super::{Actor, Response}; + +impl Actor { + /// Handle an inbound KRPC response: match it to a put or iterative query, + /// validate any returned value, and add the responder to the routing table. + /// + /// Returns `Some((target, response))` when a value is obtained. + pub(super) fn handle_response( + &mut self, + from: SocketAddrV4, + message: Message, + ) -> Option<(Id, Response)> { + // If someone claims to be readonly, then let's not store anything even if they respond. + if message.read_only { + return None; + }; + + // If the response looks like a Ping response, check StoreQueries for the transaction_id. + if let Some(query) = self + .put_queries + .values_mut() + .find(|query| query.inflight(message.transaction_id)) + { + match message.message_type { + MessageType::Response(ResponseSpecific::Ping(_)) => { + // Mark storage at that node as a success. 
+ query.success(); + } + MessageType::Error(error) => query.error(error), + _ => {} + }; + + return None; + } + + let mut should_add_node = false; + let author_id = message.get_author_id(); + let from_version = message.version.to_owned(); + + // Get corresponding query for message.transaction_id + if let Some(query) = self + .iterative_queries + .values_mut() + .find(|query| query.inflight(message.transaction_id)) + { + // KrpcSocket would not give us a response from the wrong address for the transaction_id + should_add_node = true; + + if let Some(nodes) = message.get_closer_nodes() { + for node in nodes { + query.add_candidate(node.clone()); + } + } + + if let Some((responder_id, token)) = message.get_token() { + query.add_responding_node(Node::new_with_token(responder_id, from, token.into())); + } + + if let Some(proposed_ip) = message.requester_ip { + query.add_address_vote(proposed_ip); + } + + let target = query.target(); + + match message.message_type { + MessageType::Response(ResponseSpecific::GetPeers(GetPeersResponseArguments { + values, + .. + })) => { + let response = Response::Peers(values); + query.response(from, response.clone()); + + return Some((target, response)); + } + MessageType::Response(ResponseSpecific::GetImmutable( + GetImmutableResponseArguments { + v, responder_id, .. + }, + )) => { + if validate_immutable(&v, query.target()) { + let response = Response::Immutable(v); + query.response(from, response.clone()); + + return Some((target, response)); + } + + let target = query.target(); + debug!( + ?v, + ?target, + ?responder_id, + ?from, + ?from_version, + "Invalid immutable value" + ); + } + MessageType::Response(ResponseSpecific::GetMutable( + GetMutableResponseArguments { + v, + seq, + sig, + k, + responder_id, + .. 
+ }, + )) => { + let salt = match query.request.request_type.clone() { + RequestTypeSpecific::GetValue(args) => args.salt, + _ => None, + }; + let target = query.target(); + + match MutableItem::from_dht_message(query.target(), &k, v, seq, &sig, salt) { + Ok(item) => { + let response = Response::Mutable(item); + query.response(from, response.clone()); + + return Some((target, response)); + } + Err(error) => { + debug!( + ?error, + ?from, + ?responder_id, + ?from_version, + "Invalid mutable record" + ); + } + } + } + MessageType::Response(ResponseSpecific::NoMoreRecentValue( + NoMoreRecentValueResponseArguments { + seq, responder_id, .. + }, + )) => { + debug!( + target= ?query.target(), + salt= ?match query.request.request_type.clone() { + RequestTypeSpecific::GetValue(args) => args.salt, + _ => None, + }, + ?seq, + ?from, + ?responder_id, + ?from_version, + "No more recent value" + ); + } + MessageType::Response(ResponseSpecific::NoValues(NoValuesResponseArguments { + responder_id, + .. + })) => { + debug!( + target= ?query.target(), + salt= ?match query.request.request_type.clone() { + RequestTypeSpecific::GetValue(args) => args.salt, + _ => None, + }, + ?from, + ?responder_id, + ?from_version , + "No values" + ); + } + MessageType::Error(error) => { + debug!(?error, ?from_version, "Get query got error response"); + } + // Ping/FindNode: no value payload; node is added to routing table below. + // Requests are handled elsewhere. + MessageType::Response(ResponseSpecific::Ping(_)) + | MessageType::Response(ResponseSpecific::FindNode(_)) + | MessageType::Request(_) => {} + }; + }; + + if should_add_node { + // Add a node to our routing table on any expected incoming response. 
+ + if let Some(id) = author_id { + self.routing_table.add(Node::new(id, from)); + } + } + + None + } +} diff --git a/src/rpc/info.rs b/src/actor/info.rs similarity index 84% rename from src/rpc/info.rs rename to src/actor/info.rs index 6f1063e..8156239 100644 --- a/src/rpc/info.rs +++ b/src/actor/info.rs @@ -2,7 +2,7 @@ use std::net::SocketAddrV4; use crate::Id; -use super::Rpc; +use super::Actor; /// Information and statistics about this mainline node. #[derive(Debug, Clone)] @@ -56,15 +56,15 @@ impl Info { } } -impl From<&Rpc> for Info { - fn from(rpc: &Rpc) -> Self { +impl From<&Actor> for Info { + fn from(actor: &Actor) -> Self { Self { - id: *rpc.id(), - local_addr: rpc.local_addr(), - dht_size_estimate: rpc.dht_size_estimate(), - public_address: rpc.public_address(), - firewalled: rpc.firewalled(), - server_mode: rpc.server_mode(), + id: *actor.id(), + local_addr: actor.local_addr(), + dht_size_estimate: actor.dht_size_estimate(), + public_address: actor.public_address(), + firewalled: actor.firewalled(), + server_mode: actor.server_mode(), } } } diff --git a/src/actor/mod.rs b/src/actor/mod.rs new file mode 100644 index 0000000..e1e72f3 --- /dev/null +++ b/src/actor/mod.rs @@ -0,0 +1,649 @@ +//! Actor implementation - I/O orchestration layer for the DHT. +//! +//! This module handles network communication and coordinates the DHT node's +//! event loop. Pure logic (queries, statistics, maintenance decisions) lives +//! in `core/` and is called from here. 
+ +pub(crate) mod config; +mod handle_request; +mod handle_response; +mod info; +pub(crate) mod socket; + +use std::collections::HashMap; +use std::net::{SocketAddr, SocketAddrV4, ToSocketAddrs}; + +use tracing::{debug, error, info}; + +use crate::core::iterative_query::IterativeQuery; +use crate::core::put_query::PutQuery; + +use crate::common::{ + ErrorSpecific, FindNodeRequestArguments, GetValueRequestArguments, Id, MessageType, + MutableItem, Node, PutRequestSpecific, RequestSpecific, RequestTypeSpecific, ResponseSpecific, + RoutingTable, MAX_BUCKET_SIZE_K, +}; +use crate::core::routing_maintenance::RoutingMaintenance; +use crate::core::server::Server; +use crate::core::statistics::DhtStatistics; + +use self::messages::{GetPeersRequestArguments, PutMutableRequestArguments}; +use socket::KrpcSocket; + +pub use crate::common::messages; +pub use crate::core::iterative_query::GetRequestSpecific; +pub use crate::core::put_query::{ConcurrencyError, PutError, PutQueryError}; +pub use crate::core::server::{ + RequestFilter, ServerSettings, MAX_INFO_HASHES, MAX_PEERS, MAX_VALUES, +}; +pub use info::Info; +pub use socket::DEFAULT_REQUEST_TIMEOUT; + +/// Result of `tick_get_queries`: completed queries and whether self-findnode finished. +type GetQueriesResult = (Vec<(Id, Box<[Node]>)>, bool); + +pub const DEFAULT_BOOTSTRAP_NODES: [&str; 4] = [ + "router.bittorrent.com:6881", + "dht.transmissionbt.com:6881", + "dht.libtorrent.org:25401", + "relay.pkarr.org:6881", +]; + +#[derive(Debug)] +/// Internal Actor called in the Dht thread loop, useful to create your own actor setup. 
+pub struct Actor { + // Options + bootstrap: Box<[SocketAddrV4]>, + + socket: KrpcSocket, + + // Routing + /// Closest nodes to this node + routing_table: RoutingTable, + /// Routing table maintenance (refresh/ping timing) + maintenance: RoutingMaintenance, + + // DHT statistics and cached queries + statistics: DhtStatistics, + + // Active IterativeQueries + iterative_queries: HashMap, + /// Put queries are special, since they have to wait for a corresponding + /// get query to finish, update the closest_nodes, then `query_all` these. + put_queries: HashMap, + + server: Server, + + public_address: Option, + firewalled: bool, +} + +impl Actor { + /// Creates a new Actor. Does not perform network I/O; call [`Actor::tick`] to + /// bootstrap and run scheduled maintenance. + pub fn new(config: config::Config) -> Result { + let id = if let Some(ip) = config.public_ip { + Id::from_ip(ip.into()) + } else { + Id::random() + }; + + let socket = KrpcSocket::new(&config)?; + + Ok(Actor { + bootstrap: config + .bootstrap + .unwrap_or_else(|| to_socket_address(&DEFAULT_BOOTSTRAP_NODES)) + .into(), + socket, + + routing_table: RoutingTable::new(id), + maintenance: RoutingMaintenance::new(), + statistics: DhtStatistics::new(), + + iterative_queries: HashMap::new(), + put_queries: HashMap::new(), + + server: Server::new(config.server_settings), + + public_address: None, + firewalled: true, + }) + } + + // === Getters === + + /// Returns the node's Id + pub fn id(&self) -> &Id { + self.routing_table.id() + } + + /// Returns the address the server is listening to. + #[inline] + pub fn local_addr(&self) -> SocketAddrV4 { + self.socket.local_addr() + } + + /// Returns the best guess for this node's Public address. + /// + /// If [crate::DhtBuilder::public_ip] was set, this is what will be returned + /// (plus the local port), otherwise it will rely on consensus from + /// responding nodes voting on our public IP and port. 
+ pub fn public_address(&self) -> Option { + self.public_address + } + + /// Returns `true` if we can't confirm that [Self::public_address] is publicly addressable. + /// + /// If this node is firewalled, it won't switch to server mode if it is in adaptive mode, + /// but if [crate::DhtBuilder::server_mode] was set to true, then whether or not this node is firewalled + /// won't matter. + pub fn firewalled(&self) -> bool { + self.firewalled + } + + /// Returns whether or not this node is running in server mode. + pub fn server_mode(&self) -> bool { + self.socket.server_mode + } + + pub fn routing_table(&self) -> &RoutingTable { + &self.routing_table + } + + pub fn routing_table_mut(&mut self) -> &mut RoutingTable { + &mut self.routing_table + } + + /// Returns: + /// 1. Normal Dht size estimate based on all closer `nodes` in query responses. + /// 2. Standard deviaiton as a function of the number of samples used in this estimate. + /// + /// [Read more](https://github.com/pubky/mainline/blob/main/docs/dht_size_estimate.md) + pub fn dht_size_estimate(&self) -> (usize, f64) { + self.statistics.dht_size_estimate() + } + + /// Returns a thread safe and lightweight summary of this node's + /// information and statistics. + pub fn info(&self) -> Info { + Info::from(self) + } + + // === Public Methods === + + /// Advances routing-table maintenance and in-flight queries by one step. + /// + /// Call periodically; delays degrade query completion and routing table quality. 
+ pub fn tick(&mut self) -> RpcTickReport { + let mut done_put_queries = self.tick_put_queries(); + + let (done_get_queries, finished_self_findnode) = self.tick_get_queries(); + + self.cleanup_done_queries(&done_get_queries, &mut done_put_queries); + + self.periodic_node_maintenance(); + + let new_query_response = self.handle_message(); + + if finished_self_findnode { + self.log_bootstrap(self.id()); + } + + RpcTickReport { + done_get_queries, + done_put_queries, + new_query_response, + } + } + + /// Send a request to the given address and return the transaction_id + pub fn request(&mut self, address: SocketAddrV4, request: RequestSpecific) -> u32 { + self.socket.request(address, request) + } + + /// Send a response to the given address. + pub fn response( + &mut self, + address: SocketAddrV4, + transaction_id: u32, + response: ResponseSpecific, + ) { + self.socket.response(address, transaction_id, response) + } + + /// Send an error to the given address. + pub fn error(&mut self, address: SocketAddrV4, transaction_id: u32, error: ErrorSpecific) { + self.socket.error(address, transaction_id, error) + } + + /// Store a value in the closest nodes, optionally trigger a lookup query if + /// the cached closest_nodes aren't fresh enough. + /// + /// - `request`: the put request. + pub fn put( + &mut self, + request: PutRequestSpecific, + extra_nodes: Option>, + ) -> Result<(), PutError> { + let target = *request.target(); + + if let PutRequestSpecific::PutMutable(PutMutableRequestArguments { + sig, cas, seq, .. + }) = &request + { + if let Some(PutRequestSpecific::PutMutable(inflight_request)) = self + .put_queries + .get(&target) + .map(|existing| &existing.request) + { + debug!(?inflight_request, ?request, "Possible conflict risk"); + + if *sig == inflight_request.sig { + // Noop, the inflight query is sufficient. 
+ return Ok(()); + } else if *seq < inflight_request.seq { + return Err(ConcurrencyError::NotMostRecent)?; + } else if let Some(cas) = cas { + if *cas == inflight_request.seq { + // The user is aware of the inflight query and whiches to overrides it. + // + // Remove the inflight request, and create a new one. + self.put_queries.remove(&target); + } else { + return Err(ConcurrencyError::CasFailed)?; + } + } else { + return Err(ConcurrencyError::ConflictRisk)?; + }; + }; + } + + // Extract salt before moving request (only clone small salt, not entire request) + let salt = match &request { + PutRequestSpecific::PutMutable(args) => args.salt.clone(), + _ => None, + }; + + let mut query = PutQuery::new(target, request, extra_nodes); + + if let Some(cached_nodes) = self.statistics.get_cached_query(&target) { + let closest_nodes: Box<[Node]> = cached_nodes.into(); + if !closest_nodes.is_empty() && closest_nodes.iter().any(|n| n.valid_token()) { + query.start(&mut self.socket, &closest_nodes)?; + self.put_queries.insert(target, query); + return Ok(()); + } + } + + // No cached nodes with valid tokens, need to do a GET first + self.get( + GetRequestSpecific::GetValue(GetValueRequestArguments { + target, + seq: None, + salt, + }), + None, + ); + + self.put_queries.insert(target, query); + + Ok(()) + } + + /// Start an iterative lookup toward `target`. While the query is in flight, + /// repeated calls return cached responses. New responses arrive via + /// [`RpcTickReport::new_query_response`] after [`Actor::tick`]. + pub fn get( + &mut self, + request: GetRequestSpecific, + extra_nodes: Option<&[SocketAddrV4]>, + ) -> Option> { + let target = match request { + GetRequestSpecific::FindNode(FindNodeRequestArguments { target }) => target, + GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash, .. }) => info_hash, + GetRequestSpecific::GetValue(GetValueRequestArguments { target, .. 
}) => target, + }; + + let response_from_inflight_put_mutable_request = + self.put_queries.get(&target).and_then(|existing| { + if let PutRequestSpecific::PutMutable(request) = &existing.request { + Some(Response::Mutable(request.clone().into())) + } else { + None + } + }); + + // If query is still active, no need to create a new one. + if let Some(query) = self.iterative_queries.get(&target) { + let mut responses = query.responses().to_vec(); + + if let Some(response) = response_from_inflight_put_mutable_request { + responses.push(response); + } + + return Some(responses); + } + + let node_id = self.routing_table.id(); + + if target == *node_id { + debug!(?node_id, "Bootstrapping the routing table"); + } + + let mut query = IterativeQuery::new(*self.id(), target, request); + + // Seed the query either with the closest nodes from the routing table, or the + // bootstrapping nodes if the closest nodes are not enough. + + let routing_table_closest = self.routing_table.closest_secure( + target, + self.responders_based_dht_size_estimate(), + self.average_subnets(), + ); + + // If we don't have enough or any closest nodes, call the bootstrapping nodes. + if routing_table_closest.is_empty() || routing_table_closest.len() < self.bootstrap.len() { + for bootstrapping_node in self.bootstrap.clone() { + query.visit(&mut self.socket, bootstrapping_node); + } + } + + if let Some(extra_nodes) = extra_nodes { + for extra_node in extra_nodes { + query.visit(&mut self.socket, *extra_node) + } + } + + // Seed this query with the closest nodes we know about. + for node in routing_table_closest { + query.add_candidate(node) + } + + if let Some(closest_responding_nodes) = self.statistics.get_cached_query(&target) { + for node in closest_responding_nodes { + query.add_candidate(node.clone()) + } + } + + // After adding the nodes, we need to start the query. 
+ query.start(&mut self.socket); + + self.iterative_queries.insert(target, query); + + // If there is an inflight PutQuery for mutable item return its value + if let Some(response) = response_from_inflight_put_mutable_request { + return Some(vec![response]); + } + + None + } + + // === Private Methods === + + /// Run periodic routing-table maintenance (purge, ping, repopulate, server-mode check). + fn periodic_node_maintenance(&mut self) { + let decisions = self + .maintenance + .periodic_maintenance_decisions(&self.routing_table); + + if decisions.should_switch_to_server { + self.try_switching_to_server_mode(); + } + + if decisions.should_ping { + self.purge_nodes(&decisions.nodes_to_purge); + self.ping_nodes(&decisions.nodes_to_ping); + + if !decisions.nodes_to_purge.is_empty() || !decisions.nodes_to_ping.is_empty() { + debug!( + removed = decisions.nodes_to_purge.len(), + pinged = decisions.nodes_to_ping.len(), + "Node maintenance executed" + ); + } + } + + if decisions.should_populate { + self.populate(); + } + } + + /// Switch to server mode if not already active and not firewalled. + fn try_switching_to_server_mode(&mut self) { + if !self.server_mode() && !self.firewalled() { + info!("Adaptive mode: have been running long enough (not firewalled), switching to server mode"); + self.socket.server_mode = true; + } + } + + /// Remove nodes from the routing table. + fn purge_nodes(&mut self, ids: &[Id]) { + for id in ids { + self.routing_table.remove(id); + } + } + + /// Ping nodes. + fn ping_nodes(&mut self, addrs: &[SocketAddrV4]) { + for address in addrs { + self.ping(*address); + } + } + + /// Populate routing table by asking bootstrap nodes to find ourselves, + /// Response will allow to add closest nodes candidates to routing table. + fn populate(&mut self) { + if self.bootstrap.is_empty() { + return; + } + + self.get( + GetRequestSpecific::FindNode(FindNodeRequestArguments { target: *self.id() }), + None, + ); + } + + /// Send a ping request to a node. 
+ fn ping(&mut self, address: SocketAddrV4) { + self.socket.request( + address, + RequestSpecific { + requester_id: *self.id(), + request_type: RequestTypeSpecific::Ping, + }, + ); + } + + fn update_address_votes_from_iterative_query(&mut self, query: &IterativeQuery) { + let Some(new_address) = query.best_address() else { + return; + }; + + let needs_confirm = match self.public_address { + None => true, + Some(current) => current != new_address, + }; + + if needs_confirm { + debug!( + ?new_address, + "Query responses suggest a different public_address, trying to confirm.." + ); + + self.firewalled = true; + self.ping(new_address); + } + + self.public_address = Some(new_address); + } + + fn cache_iterative_query(&mut self, query: &IterativeQuery, closest_responding_nodes: &[Node]) { + self.statistics.cache_query(query, closest_responding_nodes); + } + + fn responders_based_dht_size_estimate(&self) -> usize { + self.statistics.responders_based_dht_size_estimate() + } + + fn average_subnets(&self) -> usize { + self.statistics.average_subnets() + } + + // === tick() helpers === + + /// Advance all PUT queries, return done ones. + fn tick_put_queries(&mut self) -> Vec<(Id, Option)> { + let mut done_put_queries = Vec::with_capacity(self.put_queries.len()); + + for (id, query) in self.put_queries.iter_mut() { + match query.tick(&self.socket) { + Ok(done) => { + if done { + done_put_queries.push((*id, None)); + } + } + Err(error) => done_put_queries.push((*id, Some(error))), + }; + } + + done_put_queries + } + + /// Advance all GET/FIND_NODE queries, return done ones and whether table refresh/find_node to self is finished. 
+ fn tick_get_queries(&mut self) -> GetQueriesResult { + let self_id = *self.id(); + let responders_based_dht_size_estimate = self.responders_based_dht_size_estimate(); + let average_subnets = self.average_subnets(); + + let mut done_get_queries = Vec::with_capacity(self.iterative_queries.len()); + let mut finished_self_findnode = false; + + for (id, query) in self.iterative_queries.iter_mut() { + if !query.tick(&mut self.socket) { + continue; + } + + let closest_nodes = if let RequestTypeSpecific::FindNode(_) = query.request.request_type + { + finished_self_findnode = *id == self_id; + + query + .closest() + .nodes() + .iter() + .take(MAX_BUCKET_SIZE_K) + .cloned() + .collect::>() + } else { + query + .responders() + .take_until_secure(responders_based_dht_size_estimate, average_subnets) + .to_vec() + .into_boxed_slice() + }; + + done_get_queries.push((*id, closest_nodes)); + } + + (done_get_queries, finished_self_findnode) + } + + /// Remove completed GET and PUT queries from internal state. + fn cleanup_done_queries( + &mut self, + done_get: &[(Id, Box<[Node]>)], + done_put: &mut Vec<(Id, Option)>, + ) { + // Has to happen _before_ `self.socket.recv_from()`. + for (id, closest_nodes) in done_get { + let query = match self.iterative_queries.remove(id) { + Some(query) => query, + None => continue, + }; + + self.update_address_votes_from_iterative_query(&query); + self.cache_iterative_query(&query, closest_nodes); + + // Only for get queries, not find node. 
+ if matches!(query.request.request_type, RequestTypeSpecific::FindNode(_)) { + continue; + } + + let put_query = match self.put_queries.get_mut(id) { + Some(put_query) => put_query, + None => continue, + }; + + if put_query.started() { + continue; + } + + if let Err(error) = put_query.start(&mut self.socket, closest_nodes) { + done_put.push((*id, Some(error))) + } + } + + for (id, _) in done_put.iter() { + self.put_queries.remove(id); + } + } + + /// Handle one incoming message, either a request or a response message. One message per tick. + fn handle_message(&mut self) -> Option<(Id, Response)> { + self.socket + .recv_from() + .and_then(|(message, from)| match message.message_type { + MessageType::Request(request_specific) => { + self.handle_request(from, message.transaction_id, request_specific); + None + } + _ => self.handle_response(from, message), + }) + } + + /// Check if routing table is empty and log an error if so. + fn log_bootstrap(&self, self_id: &Id) { + let table_size = self.routing_table.size(); + if table_size == 0 { + error!("Could not bootstrap the routing table"); + } else { + debug!(?self_id, table_size, "Populated the routing table"); + } + } +} + +/// Results from a single [`Actor::tick`] call. +#[derive(Debug, Clone)] +pub struct RpcTickReport { + /// Completed GET queries with their closest nodes. + pub done_get_queries: Vec<(Id, Box<[Node]>)>, + /// Completed PUT queries; `Some(err)` on failure. + pub done_put_queries: Vec<(Id, Option)>, + /// A value response received for an in-flight GET query. 
+ pub new_query_response: Option<(Id, Response)>, +} + +#[derive(Debug, Clone)] +pub enum Response { + Peers(Vec), + Immutable(Box<[u8]>), + Mutable(MutableItem), +} + +pub(crate) fn to_socket_address(bootstrap: &[T]) -> Vec { + bootstrap + .iter() + .flat_map(|s| { + s.to_socket_addrs().map(|addrs| { + addrs + .filter_map(|addr| match addr { + SocketAddr::V4(addr_v4) => Some(addr_v4), + _ => None, + }) + .collect::>() + }) + }) + .flatten() + .collect() +} diff --git a/src/rpc/socket.rs b/src/actor/socket.rs similarity index 100% rename from src/rpc/socket.rs rename to src/actor/socket.rs diff --git a/src/rpc/socket/inflight_requests.rs b/src/actor/socket/inflight_requests.rs similarity index 100% rename from src/rpc/socket/inflight_requests.rs rename to src/actor/socket/inflight_requests.rs diff --git a/src/async_dht.rs b/src/async_dht.rs index 701577b..451111b 100644 --- a/src/async_dht.rs +++ b/src/async_dht.rs @@ -9,13 +9,13 @@ use std::{ use futures_lite::{Stream, StreamExt}; use crate::{ + actor::{GetRequestSpecific, Info, PutError, PutQueryError}, common::{ hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments, GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem, Node, PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, }, dht::{ActorMessage, Dht, PutMutableError, ResponseSender}, - rpc::{GetRequestSpecific, Info, PutError, PutQueryError}, }; impl Dht { @@ -289,7 +289,7 @@ impl AsyncDht { /// ## Errors /// /// In addition to the [PutQueryError] common with all PUT queries, PUT mutable item - /// query has other [Concurrency errors][crate::rpc::ConcurrencyError], that try to detect write conflict + /// query has other [Concurrency errors][crate::actor::ConcurrencyError], that try to detect write conflict /// risks or obvious conflicts. 
/// /// If you are lucky to get one of these errors (which is not guaranteed), then you should @@ -387,7 +387,7 @@ mod test { use ed25519_dalek::SigningKey; use futures::StreamExt; - use crate::{dht::Testnet, rpc::ConcurrencyError}; + use crate::{actor::ConcurrencyError, dht::Testnet}; use super::*; diff --git a/src/common.rs b/src/common.rs index bbc04a1..d59acda 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,5 +1,6 @@ //! Miscellaneous common structs used throughout the library. +mod closest_nodes; mod id; mod immutable; pub mod messages; @@ -7,6 +8,7 @@ mod mutable; mod node; mod routing_table; +pub use closest_nodes::*; pub use id::*; pub use immutable::*; pub use messages::*; diff --git a/src/rpc/closest_nodes.rs b/src/common/closest_nodes.rs similarity index 100% rename from src/rpc/closest_nodes.rs rename to src/common/closest_nodes.rs diff --git a/src/common/routing_table.rs b/src/common/routing_table.rs index 1899513..28c6a91 100644 --- a/src/common/routing_table.rs +++ b/src/common/routing_table.rs @@ -3,8 +3,7 @@ use std::collections::BTreeMap; use std::slice::Iter; -use crate::common::{Id, Node}; -use crate::rpc::ClosestNodes; +use crate::common::{ClosestNodes, Id, Node}; /// K = the default maximum size of a k-bucket. 
pub const MAX_BUCKET_SIZE_K: usize = 20; diff --git a/src/rpc/iterative_query.rs b/src/core/iterative_query.rs similarity index 94% rename from src/rpc/iterative_query.rs rename to src/core/iterative_query.rs index 2436f9f..3d28263 100644 --- a/src/rpc/iterative_query.rs +++ b/src/core/iterative_query.rs @@ -6,12 +6,12 @@ use std::net::SocketAddrV4; use tracing::{debug, trace}; -use super::{socket::KrpcSocket, ClosestNodes}; -use crate::common::{FindNodeRequestArguments, GetPeersRequestArguments, GetValueRequestArguments}; -use crate::{ - common::{Id, Node, RequestSpecific, RequestTypeSpecific, MAX_BUCKET_SIZE_K}, - rpc::Response, +use crate::actor::socket::KrpcSocket; +use crate::actor::Response; +use crate::common::{ + ClosestNodes, Id, Node, RequestSpecific, RequestTypeSpecific, MAX_BUCKET_SIZE_K, }; +use crate::common::{FindNodeRequestArguments, GetPeersRequestArguments, GetValueRequestArguments}; /// An iterative process of concurrently sending a request to the closest known nodes to /// the target, updating the routing table with closer nodes discovered in the responses, and @@ -78,6 +78,11 @@ impl IterativeQuery { self.responders.target() } + /// Check if this is a FindNode query + pub fn is_find_node(&self) -> bool { + matches!(self.request.request_type, RequestTypeSpecific::FindNode(_)) + } + /// Closest nodes according to other nodes. pub fn closest(&self) -> &ClosestNodes { &self.closest diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..1572d9f --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,11 @@ +//! Core DHT logic - pure computation with no direct I/O. +//! +//! Contains query drivers (`iterative_query`, `put_query`), the `server` request +//! handler, and stateful helpers (`statistics`, `routing_maintenance`). +//! All I/O orchestration lives in `actor/`, which calls into this module. 
+ +pub(crate) mod iterative_query; +pub(crate) mod put_query; +pub(crate) mod routing_maintenance; +pub(crate) mod server; +pub(crate) mod statistics; diff --git a/src/rpc/put_query.rs b/src/core/put_query.rs similarity index 98% rename from src/rpc/put_query.rs rename to src/core/put_query.rs index 3f53aac..a441ccf 100644 --- a/src/rpc/put_query.rs +++ b/src/core/put_query.rs @@ -7,10 +7,10 @@ use crate::{ Node, }; -use super::socket::KrpcSocket; +use crate::actor::socket::KrpcSocket; #[derive(Debug)] -/// Once an [super::IterativeQuery] is done, or if a previous cached one was a vailable, +/// Once an [IterativeQuery](super::iterative_query::IterativeQuery) is done, or if a previous cached one was available, /// we can store data at the closest nodes using this PutQuery, that keeps track of /// acknowledging nodes, and or errors. pub struct PutQuery { diff --git a/src/core/routing_maintenance.rs b/src/core/routing_maintenance.rs new file mode 100644 index 0000000..186b3e5 --- /dev/null +++ b/src/core/routing_maintenance.rs @@ -0,0 +1,152 @@ +//! Routing table maintenance logic. 
+
+use std::net::SocketAddrV4;
+use std::time::{Duration, Instant};
+
+use crate::common::{Id, RoutingTable};
+
+const REFRESH_TABLE_INTERVAL: Duration = Duration::from_secs(15 * 60);
+const PING_TABLE_INTERVAL: Duration = Duration::from_secs(5 * 60);
+
+/// Routing table maintenance state
+#[derive(Debug)]
+pub struct RoutingMaintenance {
+    last_table_refresh: Instant,
+    last_table_ping: Instant,
+}
+
+/// Decisions about routing table maintenance
+#[derive(Debug)]
+pub struct MaintenanceDecisions {
+    /// Whether to populate the routing table (bootstrap)
+    pub should_populate: bool,
+
+    /// Whether to ping nodes
+    pub should_ping: bool,
+
+    /// Whether to try switching to server mode
+    pub should_switch_to_server: bool,
+
+    /// Node IDs to purge from the routing table
+    pub nodes_to_purge: Vec<Id>,
+
+    /// Node addresses to ping
+    pub nodes_to_ping: Vec<SocketAddrV4>,
+}
+
+impl RoutingMaintenance {
+    /// Create new routing maintenance tracker
+    pub fn new() -> Self {
+        RoutingMaintenance {
+            last_table_refresh: Instant::now(),
+            last_table_ping: Instant::now(),
+        }
+    }
+
+    /// Determine what maintenance operations should be performed.
+    ///
+    /// Computes decisions and resets internal timers when intervals elapse.
+ pub fn periodic_maintenance_decisions( + &mut self, + routing_table: &RoutingTable, + ) -> MaintenanceDecisions { + self.periodic_maintenance_decisions_at(Instant::now(), routing_table) + } + + fn periodic_maintenance_decisions_at( + &mut self, + now: Instant, + routing_table: &RoutingTable, + ) -> MaintenanceDecisions { + let refresh_is_due = now.duration_since(self.last_table_refresh) >= REFRESH_TABLE_INTERVAL; + let ping_is_due = now.duration_since(self.last_table_ping) >= PING_TABLE_INTERVAL; + + let should_populate = routing_table.is_empty() || refresh_is_due; + let should_switch_to_server = refresh_is_due; + + let (nodes_to_purge, nodes_to_ping) = if ping_is_due { + self.last_table_ping = now; + self.purge_and_ping_candidates(routing_table) + } else { + (Vec::new(), Vec::new()) + }; + + if refresh_is_due { + self.last_table_refresh = now; + } + + MaintenanceDecisions { + should_populate, + should_ping: ping_is_due, + should_switch_to_server, + nodes_to_purge, + nodes_to_ping, + } + } + + /// Determine which nodes to purge and which to ping. + /// + /// Pure function - examines routing table and returns decisions. 
+    fn purge_and_ping_candidates(
+        &self,
+        routing_table: &RoutingTable,
+    ) -> (Vec<Id>, Vec<SocketAddrV4>) {
+        let mut to_purge = Vec::with_capacity(routing_table.size());
+        let mut to_ping = Vec::with_capacity(routing_table.size());
+
+        for node in routing_table.nodes() {
+            if node.is_stale() {
+                to_purge.push(*node.id());
+            } else if node.should_ping() {
+                to_ping.push(node.address());
+            }
+        }
+
+        (to_purge, to_ping)
+    }
+}
+
+impl Default for RoutingMaintenance {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::{Duration, Instant};
+
+    use crate::common::{Id, RoutingTable};
+
+    use super::{RoutingMaintenance, REFRESH_TABLE_INTERVAL};
+
+    #[test]
+    fn empty_table_does_not_reset_refresh_timer() {
+        let mut maintenance = RoutingMaintenance::new();
+        let routing_table = RoutingTable::new(Id::random());
+        let before = maintenance.last_table_refresh;
+
+        let _ = maintenance.periodic_maintenance_decisions(&routing_table);
+
+        assert_eq!(maintenance.last_table_refresh, before);
+    }
+
+    #[test]
+    fn refresh_due_updates_refresh_timer() {
+        let mut maintenance = RoutingMaintenance::new();
+        let routing_table = RoutingTable::new(Id::random());
+        let before = maintenance.last_table_refresh;
+
+        // Advance time forward past the refresh interval.
+        // We add to Instant::now() instead of subtracting, because on Windows
+        // Instant can be close to its internal epoch and subtraction overflows.
+ let future = Instant::now() + REFRESH_TABLE_INTERVAL + Duration::from_secs(1); + + let decisions = maintenance.periodic_maintenance_decisions_at(future, &routing_table); + + assert!(decisions.should_populate); + assert!(decisions.should_switch_to_server); + assert!(maintenance.last_table_refresh > before); + assert_eq!(maintenance.last_table_refresh, future); + } +} diff --git a/src/rpc/server.rs b/src/core/server.rs similarity index 100% rename from src/rpc/server.rs rename to src/core/server.rs diff --git a/src/rpc/server/peers.rs b/src/core/server/peers.rs similarity index 100% rename from src/rpc/server/peers.rs rename to src/core/server/peers.rs diff --git a/src/rpc/server/tokens.rs b/src/core/server/tokens.rs similarity index 100% rename from src/rpc/server/tokens.rs rename to src/core/server/tokens.rs diff --git a/src/core/statistics.rs b/src/core/statistics.rs new file mode 100644 index 0000000..2856439 --- /dev/null +++ b/src/core/statistics.rs @@ -0,0 +1,218 @@ +//! DHT statistics and size estimation. + +use std::num::NonZeroUsize; + +use lru::LruCache; + +use crate::common::{Id, Node}; +use crate::core::iterative_query::IterativeQuery; + +const MAX_CACHED_ITERATIVE_QUERIES: usize = 1000; + +/// Cold-start DHT size assumption to avoid storing to too many nodes before real data arrives. +const INITIAL_DHT_SIZE_ESTIMATE: f64 = 1_000_000.0; + +/// Default subnet diversity assumption until real query data arrives. +const INITIAL_SUBNETS_SUM: usize = 20; + +/// Empirical coefficients for standard deviation of DHT size estimates. 
+/// See <https://github.com/pubky/mainline/blob/main/docs/standard-deviation-vs-lookups.png>
+const STD_DEV_COEFFICIENT: f64 = 0.281;
+const STD_DEV_EXPONENT: f64 = -0.529;
+
+/// Statistics about the DHT network
+#[derive(Debug)]
+pub struct DhtStatistics {
+    /// Sum of DHT size estimates from all queries
+    dht_size_estimates_sum: f64,
+
+    /// Sum of DHT size estimates based on responding nodes
+    responders_based_dht_size_estimates_sum: f64,
+
+    /// Count of queries used for responders-based estimates
+    responders_based_dht_size_estimates_count: usize,
+
+    /// Sum of the number of subnets with 6-bit prefix in the closest nodes' IPv4 addresses
+    subnets_sum: usize,
+
+    /// Cache of completed queries with their results
+    cached_queries: LruCache<Id, CachedIterativeQuery>,
+}
+
+#[derive(Debug)]
+struct CachedIterativeQuery {
+    closest_responding_nodes: Box<[Node]>,
+    dht_size_estimate: f64,
+    responders_dht_size_estimate: f64,
+    subnets: u8,
+    is_find_node: bool,
+}
+
+impl DhtStatistics {
+    /// Create new DHT statistics tracker
+    pub fn new() -> Self {
+        DhtStatistics {
+            dht_size_estimates_sum: 0.0,
+            responders_based_dht_size_estimates_sum: INITIAL_DHT_SIZE_ESTIMATE,
+            responders_based_dht_size_estimates_count: 0,
+            subnets_sum: INITIAL_SUBNETS_SUM,
+            cached_queries: LruCache::new(
+                NonZeroUsize::new(MAX_CACHED_ITERATIVE_QUERIES).expect("valid non-zero"),
+            ),
+        }
+    }
+
+    /// Get DHT size estimate based on all queries
+    pub fn dht_size_estimate(&self) -> (usize, f64) {
+        let sample_count = self.cached_queries.len();
+        if sample_count == 0 {
+            return (0, 0.0);
+        }
+
+        let normal = self.dht_size_estimates_sum as usize / sample_count;
+
+        let std_dev = STD_DEV_COEFFICIENT * (sample_count as f64).powf(STD_DEV_EXPONENT);
+
+        (normal, std_dev)
+    }
+
+    /// Get DHT size estimate based on responding nodes only
+    pub fn responders_based_dht_size_estimate(&self) -> usize {
+        self.responders_based_dht_size_estimates_sum as usize
+            / self.responders_based_dht_size_estimates_count.max(1)
+    }
+
+    /// Get average subnet diversity
+    pub fn average_subnets(&self) -> usize {
self.subnets_sum / self.cached_queries.len().max(1) + } + + /// Cache a completed query + pub fn cache_query(&mut self, query: &IterativeQuery, closest_responding_nodes: &[Node]) { + let closest = query.closest(); + if closest.nodes().is_empty() { + // Node is offline + return; + } + + // Evict LRU if at capacity + if self.cached_queries.len() >= MAX_CACHED_ITERATIVE_QUERIES { + if let Some((_, old_query)) = self.cached_queries.pop_lru() { + self.decrement_stats(&old_query); + } + } + + let responders = query.responders(); + + let dht_size_estimate = closest.dht_size_estimate(); + let responders_dht_size_estimate = responders.dht_size_estimate(); + let subnets_count = closest.subnets_count(); + + let is_find_node = query.is_find_node(); + + let cached = CachedIterativeQuery { + closest_responding_nodes: closest_responding_nodes.into(), + dht_size_estimate, + responders_dht_size_estimate, + subnets: subnets_count, + is_find_node, + }; + + // Update sums before inserting + if let Some(old) = self.cached_queries.put(query.target(), cached) { + self.decrement_stats(&old); + } + + // Increment with new values + self.dht_size_estimates_sum += dht_size_estimate; + self.responders_based_dht_size_estimates_sum += responders_dht_size_estimate; + self.subnets_sum += subnets_count as usize; + + if !is_find_node { + self.responders_based_dht_size_estimates_count += 1; + } + } + + /// Get cached query results and update LRU + pub fn get_cached_query(&mut self, target: &Id) -> Option<&[Node]> { + self.cached_queries + .get(target) + .map(|q| q.closest_responding_nodes.as_ref()) + } + + fn decrement_stats(&mut self, query: &CachedIterativeQuery) { + self.dht_size_estimates_sum -= query.dht_size_estimate; + self.responders_based_dht_size_estimates_sum -= query.responders_dht_size_estimate; + self.subnets_sum -= query.subnets as usize; + + if !query.is_find_node { + self.responders_based_dht_size_estimates_count -= 1; + } + } +} + +impl Default for DhtStatistics { + fn default() -> 
Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use crate::common::GetValueRequestArguments; + use crate::common::{Id, Node}; + use crate::core::iterative_query::{GetRequestSpecific, IterativeQuery}; + + use super::{DhtStatistics, MAX_CACHED_ITERATIVE_QUERIES}; + + fn build_query(target: Id, node: Node) -> IterativeQuery { + let mut query = IterativeQuery::new( + Id::random(), + target, + GetRequestSpecific::GetValue(GetValueRequestArguments { + target, + seq: None, + salt: None, + }), + ); + query.add_candidate(node.clone()); + query.add_responding_node(node); + query + } + + #[test] + fn dht_size_estimate_empty_is_zero() { + let stats = DhtStatistics::new(); + assert_eq!(stats.dht_size_estimate(), (0, 0.0)); + } + + #[test] + fn cache_query_does_not_evict_on_empty_closest() { + let mut stats = DhtStatistics::new(); + + for i in 0..MAX_CACHED_ITERATIVE_QUERIES { + let target = Id::random(); + let node = Node::unique(i); + let query = build_query(target, node); + let closest = query.closest().nodes(); + stats.cache_query(&query, closest); + } + + let (_, std_dev_before) = stats.dht_size_estimate(); + + let empty_target = Id::random(); + let empty_query = IterativeQuery::new( + Id::random(), + empty_target, + GetRequestSpecific::GetValue(GetValueRequestArguments { + target: empty_target, + seq: None, + salt: None, + }), + ); + stats.cache_query(&empty_query, &[]); + + let (_, std_dev_after) = stats.dht_size_estimate(); + assert!((std_dev_before - std_dev_after).abs() < f64::EPSILON); + } +} diff --git a/src/dht.rs b/src/dht.rs index a867fec..7f1ab20 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -12,19 +12,19 @@ use flume::{Receiver, Sender, TryRecvError}; use tracing::info; use crate::{ + actor::{ + to_socket_address, Actor, ConcurrencyError, GetRequestSpecific, Info, PutError, + PutQueryError, Response, + }, common::{ hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments, GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem, 
PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, }, - rpc::{ - to_socket_address, ConcurrencyError, GetRequestSpecific, Info, PutError, PutQueryError, - Response, Rpc, - }, Node, ServerSettings, }; -use crate::rpc::config::Config; +use crate::actor::config::Config; #[derive(Debug, Clone)] /// Mainline Dht node. @@ -501,9 +501,9 @@ impl Iterator for GetIterator { } fn run(config: Config, receiver: Receiver) { - match Rpc::new(config) { - Ok(mut rpc) => { - let address = rpc.local_addr(); + match Actor::new(config) { + Ok(mut actor) => { + let address = actor.local_addr(); info!(?address, "Mainline DHT listening"); let mut put_senders = HashMap::new(); @@ -516,12 +516,12 @@ fn run(config: Config, receiver: Receiver) { let _ = sender.send(Ok(())); } ActorMessage::Info(sender) => { - let _ = sender.send(rpc.info()); + let _ = sender.send(actor.info()); } ActorMessage::Put(request, sender, extra_nodes) => { let target = *request.target(); - match rpc.put(request, extra_nodes) { + match actor.put(request, extra_nodes) { Ok(()) => { let senders = put_senders.entry(target).or_insert(vec![]); @@ -535,7 +535,7 @@ fn run(config: Config, receiver: Receiver) { ActorMessage::Get(request, sender) => { let target = *request.target(); - if let Some(responses) = rpc.get(request, None) { + if let Some(responses) = actor.get(request, None) { for response in responses { send(&sender, response); } @@ -546,11 +546,11 @@ fn run(config: Config, receiver: Receiver) { senders.push(sender); } ActorMessage::ToBootstrap(sender) => { - let _ = sender.send(rpc.routing_table().to_bootstrap()); + let _ = sender.send(actor.routing_table().to_bootstrap()); } ActorMessage::SeedRouting(nodes, sender) => { for node in nodes { - rpc.routing_table_mut().add(node); + actor.routing_table_mut().add(node); } let _ = sender.send(()); } @@ -565,7 +565,7 @@ fn run(config: Config, receiver: Receiver) { } } - let report = rpc.tick(); + let report = actor.tick(); // Response for an 
ongoing GET query if let Some((target, response)) = report.new_query_response { @@ -912,7 +912,7 @@ mod test { use ed25519_dalek::SigningKey; - use crate::rpc::ConcurrencyError; + use crate::actor::ConcurrencyError; use super::*; diff --git a/src/lib.rs b/src/lib.rs index c6b1114..fc13a9e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,10 +7,13 @@ #![deny(rustdoc::broken_intra_doc_links)] #![cfg_attr(not(test), deny(clippy::unwrap_used))] +#[cfg(feature = "node")] +mod actor; mod common; #[cfg(feature = "node")] +mod core; +#[cfg(feature = "node")] mod dht; -mod rpc; // Public modules #[cfg(feature = "async")] @@ -19,24 +22,25 @@ pub mod async_dht; pub use common::{Id, MutableItem, Node, RoutingTable}; #[cfg(feature = "node")] -pub use dht::{Dht, DhtBuilder, Testnet, TestnetBuilder}; -#[cfg(feature = "node")] -pub use rpc::{ +pub use actor::{ messages::{MessageType, PutRequestSpecific, RequestSpecific}, - server::{RequestFilter, ServerSettings, MAX_INFO_HASHES, MAX_PEERS, MAX_VALUES}, - ClosestNodes, DEFAULT_REQUEST_TIMEOUT, + RequestFilter, ServerSettings, DEFAULT_REQUEST_TIMEOUT, MAX_INFO_HASHES, MAX_PEERS, MAX_VALUES, }; +#[cfg(feature = "node")] +pub use common::ClosestNodes; +#[cfg(feature = "node")] +pub use dht::{Dht, DhtBuilder, Testnet, TestnetBuilder}; pub use ed25519_dalek::SigningKey; pub mod errors { //! Exported errors #[cfg(feature = "node")] + pub use super::actor::{ConcurrencyError, PutError, PutQueryError}; + #[cfg(feature = "node")] pub use super::common::ErrorSpecific; #[cfg(feature = "node")] pub use super::dht::PutMutableError; - #[cfg(feature = "node")] - pub use super::rpc::{ConcurrencyError, PutError, PutQueryError}; pub use super::common::DecodeIdError; pub use super::common::MutableError; diff --git a/src/rpc.rs b/src/rpc.rs deleted file mode 100644 index 1503de2..0000000 --- a/src/rpc.rs +++ /dev/null @@ -1,1123 +0,0 @@ -//! K-RPC implementation. 
- -mod closest_nodes; -pub(crate) mod config; -mod info; -mod iterative_query; -mod put_query; -pub(crate) mod server; -mod socket; - -use std::collections::HashMap; -use std::net::{SocketAddr, SocketAddrV4, ToSocketAddrs}; -use std::num::NonZeroUsize; -use std::time::{Duration, Instant}; - -use lru::LruCache; -use tracing::{debug, error, info}; - -use iterative_query::IterativeQuery; -use put_query::PutQuery; - -use crate::common::{ - validate_immutable, ErrorSpecific, FindNodeRequestArguments, GetImmutableResponseArguments, - GetMutableResponseArguments, GetPeersResponseArguments, GetValueRequestArguments, Id, Message, - MessageType, MutableItem, NoMoreRecentValueResponseArguments, NoValuesResponseArguments, Node, - PutRequestSpecific, RequestSpecific, RequestTypeSpecific, ResponseSpecific, RoutingTable, - MAX_BUCKET_SIZE_K, -}; -use server::Server; - -use self::messages::{GetPeersRequestArguments, PutMutableRequestArguments}; -use server::ServerSettings; -use socket::KrpcSocket; - -pub use crate::common::messages; -pub use closest_nodes::ClosestNodes; -pub use info::Info; -pub use iterative_query::GetRequestSpecific; -pub use put_query::{ConcurrencyError, PutError, PutQueryError}; -pub use socket::DEFAULT_REQUEST_TIMEOUT; - -pub const DEFAULT_BOOTSTRAP_NODES: [&str; 4] = [ - "router.bittorrent.com:6881", - "dht.transmissionbt.com:6881", - "dht.libtorrent.org:25401", - "relay.pkarr.org:6881", -]; - -const REFRESH_TABLE_INTERVAL: Duration = Duration::from_secs(15 * 60); -const PING_TABLE_INTERVAL: Duration = Duration::from_secs(5 * 60); - -/// Result of `tick_get_queries`: completed queries and whether self-findnode finished. -type GetQueriesResult = (Vec<(Id, Box<[Node]>)>, bool); - -const MAX_CACHED_ITERATIVE_QUERIES: usize = 1000; - -#[derive(Debug)] -/// Internal Rpc called in the Dht thread loop, useful to create your own actor setup. 
-pub struct Rpc { - // Options - bootstrap: Box<[SocketAddrV4]>, - - socket: KrpcSocket, - - // Routing - /// Closest nodes to this node - routing_table: RoutingTable, - /// Last time we refreshed the routing table with a find_node query. - last_table_refresh: Instant, - /// Last time we pinged nodes in the routing table. - last_table_ping: Instant, - /// Closest responding nodes to specific target - /// - /// as well as the: - /// 1. dht size estimate based on closest claimed nodes, - /// 2. dht size estimate based on closest responding nodes. - /// 3. number of subnets with unique 6 bits prefix in ipv4 - cached_iterative_queries: LruCache, - - // Active IterativeQueries - iterative_queries: HashMap, - /// Put queries are special, since they have to wait for a corresponding - /// get query to finish, update the closest_nodes, then `query_all` these. - put_queries: HashMap, - - /// Sum of Dht size estimates from closest nodes from get queries. - dht_size_estimates_sum: f64, - - /// Sum of Dht size estimates from closest _responding_ nodes from get queries. - responders_based_dht_size_estimates_sum: f64, - responders_based_dht_size_estimates_count: usize, - - /// Sum of the number of subnets with 6 bits prefix in the closest nodes ipv4 - subnets_sum: usize, - - server: Server, - - public_address: Option, - firewalled: bool, -} - -impl Rpc { - /// Creates a new RPC instance and prepares the routing table and socket. - /// - /// This does not perform network IO by itself. Call [`Rpc::tick`] to bootstrap - /// and perform scheduled maintenance. - /// - /// Returns an instance ready to accept `get`/`put` requests and handle incoming - /// messages via [`Rpc::handle_message`] if you integrate it with your socket loop. 
- pub fn new(config: config::Config) -> Result { - let id = if let Some(ip) = config.public_ip { - Id::from_ip(ip.into()) - } else { - Id::random() - }; - - let socket = KrpcSocket::new(&config)?; - - Ok(Rpc { - bootstrap: config - .bootstrap - .unwrap_or_else(|| to_socket_address(&DEFAULT_BOOTSTRAP_NODES)) - .into(), - socket, - - routing_table: RoutingTable::new(id), - iterative_queries: HashMap::new(), - put_queries: HashMap::new(), - - cached_iterative_queries: LruCache::new( - NonZeroUsize::new(MAX_CACHED_ITERATIVE_QUERIES) - .expect("MAX_CACHED_BUCKETS is NonZeroUsize"), - ), - - last_table_refresh: Instant::now(), - last_table_ping: Instant::now(), - - dht_size_estimates_sum: 0.0, - responders_based_dht_size_estimates_count: 0, - - // Don't store to too many nodes just because you are in a cold start. - responders_based_dht_size_estimates_sum: 1_000_000.0, - subnets_sum: 20, - - server: Server::new(config.server_settings), - - public_address: None, - firewalled: true, - }) - } - - // === Getters === - - /// Returns the node's Id - pub fn id(&self) -> &Id { - self.routing_table.id() - } - - /// Returns the address the server is listening to. - #[inline] - pub fn local_addr(&self) -> SocketAddrV4 { - self.socket.local_addr() - } - - /// Returns the best guess for this node's Public address. - /// - /// If [crate::DhtBuilder::public_ip] was set, this is what will be returned - /// (plus the local port), otherwise it will rely on consensus from - /// responding nodes voting on our public IP and port. - pub fn public_address(&self) -> Option { - self.public_address - } - - /// Returns `true` if we can't confirm that [Self::public_address] is publicly addressable. - /// - /// If this node is firewalled, it won't switch to server mode if it is in adaptive mode, - /// but if [crate::DhtBuilder::server_mode] was set to true, then whether or not this node is firewalled - /// won't matter. 
- pub fn firewalled(&self) -> bool { - self.firewalled - } - - /// Returns whether or not this node is running in server mode. - pub fn server_mode(&self) -> bool { - self.socket.server_mode - } - - pub fn routing_table(&self) -> &RoutingTable { - &self.routing_table - } - - pub fn routing_table_mut(&mut self) -> &mut RoutingTable { - &mut self.routing_table - } - - /// Returns: - /// 1. Normal Dht size estimate based on all closer `nodes` in query responses. - /// 2. Standard deviaiton as a function of the number of samples used in this estimate. - /// - /// [Read more](https://github.com/pubky/mainline/blob/main/docs/dht_size_estimate.md) - pub fn dht_size_estimate(&self) -> (usize, f64) { - let normal = - self.dht_size_estimates_sum as usize / self.cached_iterative_queries.len().max(1); - - // See https://github.com/pubky/mainline/blob/main/docs/standard-deviation-vs-lookups.png - let std_dev = 0.281 * (self.cached_iterative_queries.len() as f64).powf(-0.529); - - (normal, std_dev) - } - - /// Returns a thread safe and lightweight summary of this node's - /// information and statistics. - pub fn info(&self) -> Info { - Info::from(self) - } - - // === Public Methods === - - /// Advances maintenance and in-flight queries by one step. - /// - /// - Performs routing-table refreshes and liveness checks on schedule. - /// - Progresses outstanding `get`/`put` queries and evicts completed ones. - /// - May emit newly-available query responses. - /// - /// Returns a [`RpcTickReport`] summarizing work done during this call. - /// - /// Call this periodically; typical intervals are tied to IO loop cadence - /// or a fixed timer. Missing calls will delay query completion and degrade - /// the routing table quality. 
- pub fn tick(&mut self) -> RpcTickReport { - let mut done_put_queries = self.tick_put_queries(); - - let (done_get_queries, finished_self_findnode) = self.tick_get_queries(); - - self.cleanup_done_queries(&done_get_queries, &mut done_put_queries); - - self.periodic_node_maintenance(); - - let new_query_response = self.handle_message(); - - if finished_self_findnode { - self.log_bootstrap(self.id()); - } - - RpcTickReport { - done_get_queries, - done_put_queries, - new_query_response, - } - } - - /// Send a request to the given address and return the transaction_id - pub fn request(&mut self, address: SocketAddrV4, request: RequestSpecific) -> u32 { - self.socket.request(address, request) - } - - /// Send a response to the given address. - pub fn response( - &mut self, - address: SocketAddrV4, - transaction_id: u32, - response: ResponseSpecific, - ) { - self.socket.response(address, transaction_id, response) - } - - /// Send an error to the given address. - pub fn error(&mut self, address: SocketAddrV4, transaction_id: u32, error: ErrorSpecific) { - self.socket.error(address, transaction_id, error) - } - - /// Store a value in the closest nodes, optionally trigger a lookup query if - /// the cached closest_nodes aren't fresh enough. - /// - /// - `request`: the put request. - pub fn put( - &mut self, - request: PutRequestSpecific, - extra_nodes: Option>, - ) -> Result<(), PutError> { - let target = *request.target(); - - if let PutRequestSpecific::PutMutable(PutMutableRequestArguments { - sig, cas, seq, .. - }) = &request - { - if let Some(PutRequestSpecific::PutMutable(inflight_request)) = self - .put_queries - .get(&target) - .map(|existing| &existing.request) - { - debug!(?inflight_request, ?request, "Possible conflict risk"); - - if *sig == inflight_request.sig { - // Noop, the inflight query is sufficient. 
- return Ok(()); - } else if *seq < inflight_request.seq { - return Err(ConcurrencyError::NotMostRecent)?; - } else if let Some(cas) = cas { - if *cas == inflight_request.seq { - // The user is aware of the inflight query and whiches to overrides it. - // - // Remove the inflight request, and create a new one. - self.put_queries.remove(&target); - } else { - return Err(ConcurrencyError::CasFailed)?; - } - } else { - return Err(ConcurrencyError::ConflictRisk)?; - }; - }; - } - - let mut query = PutQuery::new(target, request.clone(), extra_nodes); - - if let Some(closest_nodes) = self - .cached_iterative_queries - .get(&target) - .map(|cached| cached.closest_responding_nodes.clone()) - .filter(|closest_nodes| { - !closest_nodes.is_empty() && closest_nodes.iter().any(|n| n.valid_token()) - }) - { - query.start(&mut self.socket, &closest_nodes)? - } else { - let salt = match request { - PutRequestSpecific::PutMutable(args) => args.salt, - _ => None, - }; - - self.get( - GetRequestSpecific::GetValue(GetValueRequestArguments { - target, - seq: None, - salt, - }), - None, - ); - }; - - self.put_queries.insert(target, query); - - Ok(()) - } - - /// Send a message to closer and closer nodes until we can't find any more nodes. - /// - /// Queries take few seconds to fully traverse the network, once it is done, it will be removed from - /// self.iterative_queries. But until then, calling [Rpc::get] multiple times, will just return the list - /// of responses seen so far. - /// - /// Subsequent responses can be obtained from the [RpcTickReport::new_query_response] you get after calling [Rpc::tick]. - /// - /// Effectively, we are caching responses and backing off the network for the duration it takes - /// to traverse it. - /// - /// - `request` [RequestTypeSpecific], except [RequestTypeSpecific::Ping] and - /// [RequestTypeSpecific::Put] which will be ignored. 
- /// - `extra_nodes` option allows the query to visit specific nodes, that won't necessesarily be visited - /// through the query otherwise. - pub fn get( - &mut self, - request: GetRequestSpecific, - extra_nodes: Option<&[SocketAddrV4]>, - ) -> Option> { - let target = match request { - GetRequestSpecific::FindNode(FindNodeRequestArguments { target }) => target, - GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash, .. }) => info_hash, - GetRequestSpecific::GetValue(GetValueRequestArguments { target, .. }) => target, - }; - - let response_from_inflight_put_mutable_request = - self.put_queries.get(&target).and_then(|existing| { - if let PutRequestSpecific::PutMutable(request) = &existing.request { - Some(Response::Mutable(request.clone().into())) - } else { - None - } - }); - - // If query is still active, no need to create a new one. - if let Some(query) = self.iterative_queries.get(&target) { - let mut responses = query.responses().to_vec(); - - if let Some(response) = response_from_inflight_put_mutable_request { - responses.push(response); - } - - return Some(responses); - } - - let node_id = self.routing_table.id(); - - if target == *node_id { - debug!(?node_id, "Bootstrapping the routing table"); - } - - let mut query = IterativeQuery::new(*self.id(), target, request); - - // Seed the query either with the closest nodes from the routing table, or the - // bootstrapping nodes if the closest nodes are not enough. - - let routing_table_closest = self.routing_table.closest_secure( - target, - self.responders_based_dht_size_estimate(), - self.average_subnets(), - ); - - // If we don't have enough or any closest nodes, call the bootstrapping nodes. 
- if routing_table_closest.is_empty() || routing_table_closest.len() < self.bootstrap.len() { - for bootstrapping_node in self.bootstrap.clone() { - query.visit(&mut self.socket, bootstrapping_node); - } - } - - if let Some(extra_nodes) = extra_nodes { - for extra_node in extra_nodes { - query.visit(&mut self.socket, *extra_node) - } - } - - // Seed this query with the closest nodes we know about. - for node in routing_table_closest { - query.add_candidate(node) - } - - if let Some(CachedIterativeQuery { - closest_responding_nodes, - .. - }) = self.cached_iterative_queries.get(&target) - { - for node in closest_responding_nodes { - query.add_candidate(node.clone()) - } - } - - // After adding the nodes, we need to start the query. - query.start(&mut self.socket); - - self.iterative_queries.insert(target, query); - - // If there is an inflight PutQuery for mutable item return its value - if let Some(response) = response_from_inflight_put_mutable_request { - return Some(vec![response]); - } - - None - } - - // === Private Methods === - - /// Handles a single inbound KRPC request. - /// - /// Responsibilities: - /// - During initial bootstrap (no known bootstrap nodes), adds the requester of - /// a `FindNode` to the routing table to seed it. - /// - If running in server mode, forwards the request to the embedded server and - /// emits a response or error using the provided `transaction_id`. - /// - Detects successful NAT traversal: when a `Ping` arrives from our own public - /// address, clears `firewalled`. - /// - Ensures node ID/IP consistency: if our ID is invalid for the observed - /// public IPv4, generates a new secure ID, resets the routing table, and - /// initiates a `FindNode` to repopulate it. - /// - /// Parameters: - /// - `from`: Source socket address of the requester. - /// - `transaction_id`: Transaction ID to echo in any response/error. - /// - `request_specific`: Parsed request payload and type. 
- /// - /// Side effects: - /// - May add `from` to the routing table (bootstrap exception). - /// - May send a protocol response or error. - /// - May set `firewalled = false` on self-`Ping`. - /// - May rotate this node’s ID, reset the routing table, and trigger a - /// rebootstrap query. - /// - /// Returns: Nothing. - fn handle_request( - &mut self, - from: SocketAddrV4, - transaction_id: u32, - request_specific: RequestSpecific, - ) { - // By default we only add nodes that responds to our requests. - // - // This is the only exception; the first node creating the DHT, - // without this exception, the bootstrapping node's routing table - // will never be populated. - if self.bootstrap.is_empty() { - if let RequestTypeSpecific::FindNode(param) = &request_specific.request_type { - self.routing_table.add(Node::new(param.target, from)); - } - } - - let is_ping = matches!(request_specific.request_type, RequestTypeSpecific::Ping); - - if self.server_mode() { - let server = &mut self.server; - - match server.handle_request(&self.routing_table, from, request_specific) { - Some(MessageType::Error(error)) => { - self.error(from, transaction_id, error); - } - Some(MessageType::Response(response)) => { - self.response(from, transaction_id, response); - } - _ => {} - }; - } - - if let Some(our_address) = self.public_address { - if from == our_address && is_ping { - self.firewalled = false; - - let ipv4 = our_address.ip(); - - // Restarting our routing table with new secure Id if necessary. - if !self.id().is_valid_for_ip(*ipv4) { - let new_id = Id::from_ipv4(*ipv4); - - info!( - "Our current id {} is not valid for adrsess {}. 
Using new id {}", - self.id(), - our_address, - new_id - ); - - self.get( - GetRequestSpecific::FindNode(FindNodeRequestArguments { target: new_id }), - None, - ); - - self.routing_table = RoutingTable::new(new_id); - } - } - } - } - - /// Handles an inbound KRPC response for RPC, updating in-flight queries and optionally - /// returning a final value for the associated target. - /// - /// Behavior: - /// - Ignores responses from read-only nodes. - /// - If it matches an in-flight PutQuery, treats `Ping` as a storage ACK (success/error) - /// and stops further handling. - /// - If it matches an in-flight iterative query: - /// - Incorporates network info: adds closer candidates, records responder token, - /// and votes on the observed requester IP. - /// - On value responses: - /// - `GetPeers` → returns `(target, Response::Peers)` - /// - `GetImmutable` → validates content; on success returns `(target, Response::Immutable)` - /// - `GetMutable` → verifies record (sig/seq/salt); on success returns `(target, Response::Mutable)` - /// - Logs and continues on `NoValues` / `NoMoreRecentValue` / `Error`. - /// - On any expected response, adds the responder (by author ID) to the routing table. - /// - /// Parameters: - /// - `from`: Responder socket address. - /// - `message`: Decoded KRPC message. - /// - /// Returns: - /// - `Some((target, Response))` when a terminal value is obtained for the query. - /// - `None` otherwise. - fn handle_response(&mut self, from: SocketAddrV4, message: Message) -> Option<(Id, Response)> { - // If someone claims to be readonly, then let's not store anything even if they respond. - if message.read_only { - return None; - }; - - // If the response looks like a Ping response, check StoreQueries for the transaction_id. 
- if let Some(query) = self - .put_queries - .values_mut() - .find(|query| query.inflight(message.transaction_id)) - { - match message.message_type { - MessageType::Response(ResponseSpecific::Ping(_)) => { - // Mark storage at that node as a success. - query.success(); - } - MessageType::Error(error) => query.error(error), - _ => {} - }; - - return None; - } - - let mut should_add_node = false; - let author_id = message.get_author_id(); - let from_version = message.version.to_owned(); - - // Get corresponding query for message.transaction_id - if let Some(query) = self - .iterative_queries - .values_mut() - .find(|query| query.inflight(message.transaction_id)) - { - // KrpcSocket would not give us a response from the wrong address for the transaction_id - should_add_node = true; - - if let Some(nodes) = message.get_closer_nodes() { - for node in nodes { - query.add_candidate(node.clone()); - } - } - - if let Some((responder_id, token)) = message.get_token() { - query.add_responding_node(Node::new_with_token(responder_id, from, token.into())); - } - - if let Some(proposed_ip) = message.requester_ip { - query.add_address_vote(proposed_ip); - } - - let target = query.target(); - - match message.message_type { - MessageType::Response(ResponseSpecific::GetPeers(GetPeersResponseArguments { - values, - .. - })) => { - let response = Response::Peers(values); - query.response(from, response.clone()); - - return Some((target, response)); - } - MessageType::Response(ResponseSpecific::GetImmutable( - GetImmutableResponseArguments { - v, responder_id, .. 
- }, - )) => { - if validate_immutable(&v, query.target()) { - let response = Response::Immutable(v); - query.response(from, response.clone()); - - return Some((target, response)); - } - - let target = query.target(); - debug!( - ?v, - ?target, - ?responder_id, - ?from, - ?from_version, - "Invalid immutable value" - ); - } - MessageType::Response(ResponseSpecific::GetMutable( - GetMutableResponseArguments { - v, - seq, - sig, - k, - responder_id, - .. - }, - )) => { - let salt = match query.request.request_type.clone() { - RequestTypeSpecific::GetValue(args) => args.salt, - _ => None, - }; - let target = query.target(); - - match MutableItem::from_dht_message(query.target(), &k, v, seq, &sig, salt) { - Ok(item) => { - let response = Response::Mutable(item); - query.response(from, response.clone()); - - return Some((target, response)); - } - Err(error) => { - debug!( - ?error, - ?from, - ?responder_id, - ?from_version, - "Invalid mutable record" - ); - } - } - } - MessageType::Response(ResponseSpecific::NoMoreRecentValue( - NoMoreRecentValueResponseArguments { - seq, responder_id, .. - }, - )) => { - debug!( - target= ?query.target(), - salt= ?match query.request.request_type.clone() { - RequestTypeSpecific::GetValue(args) => args.salt, - _ => None, - }, - ?seq, - ?from, - ?responder_id, - ?from_version, - "No more recent value" - ); - } - MessageType::Response(ResponseSpecific::NoValues(NoValuesResponseArguments { - responder_id, - .. 
- })) => { - debug!( - target= ?query.target(), - salt= ?match query.request.request_type.clone() { - RequestTypeSpecific::GetValue(args) => args.salt, - _ => None, - }, - ?from, - ?responder_id, - ?from_version , - "No values" - ); - } - MessageType::Error(error) => { - debug!(?error, ?from_version, "Get query got error response"); - } - // Ping response is already handled in add_node() - // FindNode response is already handled in query.add_candidate() - // Requests are handled elsewhere - MessageType::Response(ResponseSpecific::Ping(_)) - | MessageType::Response(ResponseSpecific::FindNode(_)) - | MessageType::Request(_) => {} - }; - }; - - if should_add_node { - // Add a node to our routing table on any expected incoming response. - - if let Some(id) = author_id { - self.routing_table.add(Node::new(id, from)); - } - } - - None - } - - /// Periodically maintain the routing table: - /// - Switches to server mode if eligible (and refresh is due) - /// - Pings nodes and purges stale entries when needed - /// - Repopulates via bootstrap if table is empty or refresh is due - /// - Updates last_table_refresh and last_table_ping timers as needed - fn periodic_node_maintenance(&mut self) { - let refresh_is_due = self.last_table_refresh.elapsed() >= REFRESH_TABLE_INTERVAL; - let ping_is_due = self.last_table_ping.elapsed() >= PING_TABLE_INTERVAL; - - // Decide first, act once: avoid double populate in the same tick. - let should_populate = self.routing_table.is_empty() || refresh_is_due; - - if refresh_is_due { - self.try_switching_to_server_mode(); - } - - if ping_is_due { - self.ping_and_purge(); - } - - if should_populate { - self.populate(); - } - } - - /// Attempts to switch this node into server mode if eligible. - /// - /// If the node is not currently operating - /// in server mode and is not detected as being behind a firewall, it will promote the - /// node into server mode (by setting the server_mode field to `true`). 
- /// - /// Server mode enables the node to answer unsolicited requests and fulfill a key - /// responsibility in the DHT. Nodes that are firewalled, or behind NAT, should not - /// enable server mode unless explicitly configured to do so. - fn try_switching_to_server_mode(&mut self) { - if !self.server_mode() && !self.firewalled() { - info!("Adaptive mode: have been running long enough (not firewalled), switching to server mode"); - self.socket.server_mode = true; - } - } - - /// Purge stale nodes and ping nodes that need probing when due is reached. - /// - /// It will purge stale nodes from the routing table and periodcially ping nodes. - /// It will reset the last_table_ping timer. - fn ping_and_purge(&mut self) { - self.last_table_ping = Instant::now(); - - let (to_purge, to_ping) = self.purge_and_ping_candidates(); - - self.purge_nodes(&to_purge); - self.ping_nodes(&to_ping); - - if to_purge.is_empty() && to_ping.is_empty() { - return; - } - - debug!( - removed = to_purge.len(), - pinged = to_ping.len(), - "Node maintenance executed" - ); - } - - /// Pure decision function: compute which nodes to remove and which to ping. - fn purge_and_ping_candidates(&self) -> (Vec, Vec) { - let mut to_purge = Vec::with_capacity(self.routing_table.size()); - let mut to_ping = Vec::with_capacity(self.routing_table.size()); - - for node in self.routing_table.nodes() { - if node.is_stale() { - to_purge.push(*node.id()) - } else if node.should_ping() { - to_ping.push(node.address()) - } - } - - (to_purge, to_ping) - } - - /// Remove nodes from the routing table. - fn purge_nodes(&mut self, ids: &[Id]) { - for id in ids { - self.routing_table.remove(id); - } - } - - /// Ping nodes. - fn ping_nodes(&mut self, addrs: &[SocketAddrV4]) { - for address in addrs { - self.ping(*address); - } - } - - /// Populate routing table by asking bootstrap nodes to find ourselves, - /// Response will allow to add closest nodes candidates to routing table. 
- /// - /// Reset the last_table_refresh timer. - fn populate(&mut self) { - self.last_table_refresh = Instant::now(); - - if self.bootstrap.is_empty() { - return; - } - - self.get( - GetRequestSpecific::FindNode(FindNodeRequestArguments { target: *self.id() }), - None, - ); - } - - /// Send a ping request to a node. - fn ping(&mut self, address: SocketAddrV4) { - self.socket.request( - address, - RequestSpecific { - requester_id: *self.id(), - request_type: RequestTypeSpecific::Ping, - }, - ); - } - - fn update_address_votes_from_iterative_query(&mut self, query: &IterativeQuery) { - let Some(new_address) = query.best_address() else { - return; - }; - - let needs_confirm = match self.public_address { - None => true, - Some(current) => current != new_address, - }; - - if needs_confirm { - debug!( - ?new_address, - "Query responses suggest a different public_address, trying to confirm.." - ); - - self.firewalled = true; - self.ping(new_address); - } - - self.public_address = Some(new_address); - } - - fn cache_iterative_query(&mut self, query: &IterativeQuery, closest_responding_nodes: &[Node]) { - if self.cached_iterative_queries.len() >= MAX_CACHED_ITERATIVE_QUERIES { - let q = self.cached_iterative_queries.pop_lru(); - self.decrement_cached_iterative_query_stats(q.map(|q| q.1)); - } - - let closest = query.closest(); - let responders = query.responders(); - - if closest.nodes().is_empty() { - // We are clearly offline. 
- return; - } - - let dht_size_estimate = closest.dht_size_estimate(); - let responders_dht_size_estimate = responders.dht_size_estimate(); - let subnets_count = closest.subnets_count(); - - let previous = self.cached_iterative_queries.put( - query.target(), - CachedIterativeQuery { - closest_responding_nodes: closest_responding_nodes.into(), - dht_size_estimate, - responders_dht_size_estimate, - subnets: subnets_count, - - is_find_node: matches!( - query.request.request_type, - RequestTypeSpecific::FindNode(_) - ), - }, - ); - - self.decrement_cached_iterative_query_stats(previous); - - self.dht_size_estimates_sum += dht_size_estimate; - self.responders_based_dht_size_estimates_sum += responders_dht_size_estimate; - self.subnets_sum += subnets_count as usize; - self.responders_based_dht_size_estimates_count += 1; - } - - fn responders_based_dht_size_estimate(&self) -> usize { - self.responders_based_dht_size_estimates_sum as usize - / self.responders_based_dht_size_estimates_count.max(1) - } - - fn average_subnets(&self) -> usize { - self.subnets_sum / self.cached_iterative_queries.len().max(1) - } - - fn decrement_cached_iterative_query_stats(&mut self, query: Option) { - if let Some(CachedIterativeQuery { - dht_size_estimate, - responders_dht_size_estimate, - subnets, - is_find_node, - .. - }) = query - { - self.dht_size_estimates_sum -= dht_size_estimate; - self.responders_based_dht_size_estimates_sum -= responders_dht_size_estimate; - self.subnets_sum -= subnets as usize; - - if !is_find_node { - self.responders_based_dht_size_estimates_count -= 1; - } - }; - } - - // === tick() helpers === - - /// Advance all PUT queries, return done ones. 
- fn tick_put_queries(&mut self) -> Vec<(Id, Option)> { - let mut done_put_queries = Vec::with_capacity(self.put_queries.len()); - - for (id, query) in self.put_queries.iter_mut() { - match query.tick(&self.socket) { - Ok(done) => { - if done { - done_put_queries.push((*id, None)); - } - } - Err(error) => done_put_queries.push((*id, Some(error))), - }; - } - - done_put_queries - } - - /// Advance all GET/FIND_NODE queries, return done ones and whether table refresh/find_node to self is finished. - fn tick_get_queries(&mut self) -> GetQueriesResult { - let self_id = *self.id(); - let responders_based_dht_size_estimate = self.responders_based_dht_size_estimate(); - let average_subnets = self.average_subnets(); - - let mut done_get_queries = Vec::with_capacity(self.iterative_queries.len()); - let mut finished_self_findnode = false; - - for (id, query) in self.iterative_queries.iter_mut() { - if !query.tick(&mut self.socket) { - continue; - } - - let closest_nodes = if let RequestTypeSpecific::FindNode(_) = query.request.request_type - { - finished_self_findnode = *id == self_id; - - query - .closest() - .nodes() - .iter() - .take(MAX_BUCKET_SIZE_K) - .cloned() - .collect::>() - } else { - query - .responders() - .take_until_secure(responders_based_dht_size_estimate, average_subnets) - .to_vec() - .into_boxed_slice() - }; - - done_get_queries.push((*id, closest_nodes)); - } - - (done_get_queries, finished_self_findnode) - } - - /// Remove completed GET and PUT queries from internal state. - fn cleanup_done_queries( - &mut self, - done_get: &[(Id, Box<[Node]>)], - done_put: &mut Vec<(Id, Option)>, - ) { - // Has to happen _before_ `self.socket.recv_from()`. - for (id, closest_nodes) in done_get { - let query = match self.iterative_queries.remove(id) { - Some(query) => query, - None => continue, - }; - - self.update_address_votes_from_iterative_query(&query); - self.cache_iterative_query(&query, closest_nodes); - - // Only for get queries, not find node. 
- if matches!(query.request.request_type, RequestTypeSpecific::FindNode(_)) { - continue; - } - - let put_query = match self.put_queries.get_mut(id) { - Some(put_query) => put_query, - None => continue, - }; - - if put_query.started() { - continue; - } - - if let Err(error) = put_query.start(&mut self.socket, closest_nodes) { - done_put.push((*id, Some(error))) - } - } - - for (id, _) in done_put.iter() { - self.put_queries.remove(id); - } - } - - /// Handle one incoming message, either a request or a response message. One message per tick. - fn handle_message(&mut self) -> Option<(Id, Response)> { - self.socket - .recv_from() - .and_then(|(message, from)| match message.message_type { - MessageType::Request(request_specific) => { - self.handle_request(from, message.transaction_id, request_specific); - None - } - _ => self.handle_response(from, message), - }) - } - - /// Check if routing table is empty and log an error if so. - fn log_bootstrap(&self, self_id: &Id) { - let table_size = self.routing_table.size(); - if table_size == 0 { - error!("Could not bootstrap the routing table"); - } else { - debug!(?self_id, table_size, "Populated the routing table"); - } - } -} - -struct CachedIterativeQuery { - closest_responding_nodes: Box<[Node]>, - dht_size_estimate: f64, - responders_dht_size_estimate: f64, - subnets: u8, - - /// Keeping track of find_node queries, because they shouldn't - /// be counted in `responders_based_dht_size_estimates_count` - is_find_node: bool, -} - -/// State change after a call to [Rpc::tick], including -/// done PUT, GET, and FIND_NODE queries, as well as any -/// incoming value response for any GET query. -#[derive(Debug, Clone)] -pub struct RpcTickReport { - /// All the [Id]s of the done [Rpc::get] queries. - pub done_get_queries: Vec<(Id, Box<[Node]>)>, - /// All the [Id]s of the done [Rpc::put] queries, - /// and optional [PutError] if the query failed. - pub done_put_queries: Vec<(Id, Option)>, - /// Received GET query response. 
- pub new_query_response: Option<(Id, Response)>, -} - -#[derive(Debug, Clone)] -pub enum Response { - Peers(Vec), - Immutable(Box<[u8]>), - Mutable(MutableItem), -} - -pub(crate) fn to_socket_address(bootstrap: &[T]) -> Vec { - bootstrap - .iter() - .flat_map(|s| { - s.to_socket_addrs().map(|addrs| { - addrs - .filter_map(|addr| match addr { - SocketAddr::V4(addr_v4) => Some(addr_v4), - _ => None, - }) - .collect::>() - }) - }) - .flatten() - .collect() -}