Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,3 @@ default = ["full"]

[package.metadata.docs.rs]
all-features = true

13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ If you want to explicitly start in Server mode, because you know you are not run
you can call `Dht::builder().server_mode().build()`, and you can optionally add your known public ip so the node doesn't have to depend on,
votes from responding nodes: `Dht::builder().server_mode().public_ip().build()`.

## Benchmarks

There are manual benchmarks in `benches/` for tracking performance of core DHT
operations. Each file has a doc comment describing what it measures and what
regressions it catches. Run them with:

```sh
cargo run --release --features full --bin latency
cargo run --release --features full --bin throughput
cargo run --release --features full --bin scalability
cargo run --release --features full --bin routing_table
```

## Acknowledgment

This implementation was possible thanks to [Webtorrent's Bittorrent-dht](https://github.com/webtorrent/bittorrent-dht) as a reference,
Expand Down
File renamed without changes.
76 changes: 76 additions & 0 deletions src/actor/handle_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::net::SocketAddrV4;

use tracing::info;

use crate::common::{
FindNodeRequestArguments, Id, MessageType, Node, RequestSpecific, RequestTypeSpecific,
RoutingTable,
};
use crate::core::iterative_query::GetRequestSpecific;

use super::Actor;

impl Actor {
/// Handle an inbound KRPC request: forward to server, detect NAT traversal,
/// and rotate our ID if it doesn't match our public IP.
pub(super) fn handle_request(
&mut self,
from: SocketAddrV4,
transaction_id: u32,
request_specific: RequestSpecific,
) {
// By default we only add nodes that responds to our requests.
//
// This is the only exception; the first node creating the DHT,
// without this exception, the bootstrapping node's routing table
// will never be populated.
if self.bootstrap.is_empty() {
if let RequestTypeSpecific::FindNode(param) = &request_specific.request_type {
self.routing_table.add(Node::new(param.target, from));
}
}

let is_ping = matches!(request_specific.request_type, RequestTypeSpecific::Ping);

if self.server_mode() {
let server = &mut self.server;

match server.handle_request(&self.routing_table, from, request_specific) {
Some(MessageType::Error(error)) => {
self.error(from, transaction_id, error);
}
Some(MessageType::Response(response)) => {
self.response(from, transaction_id, response);
}
_ => {}
};
}

if let Some(our_address) = self.public_address {
if from == our_address && is_ping {
self.firewalled = false;

let ipv4 = our_address.ip();

// Restarting our routing table with new secure Id if necessary.
if !self.id().is_valid_for_ip(*ipv4) {
let new_id = Id::from_ipv4(*ipv4);

info!(
"Our current id {} is not valid for address {}. Using new id {}",
self.id(),
our_address,
new_id
);

self.get(
GetRequestSpecific::FindNode(FindNodeRequestArguments { target: new_id }),
None,
);

self.routing_table = RoutingTable::new(new_id);
}
}
}
}
}
197 changes: 197 additions & 0 deletions src/actor/handle_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use std::net::SocketAddrV4;

use tracing::debug;

use crate::common::{
validate_immutable, GetImmutableResponseArguments, GetMutableResponseArguments,
GetPeersResponseArguments, Id, Message, MessageType, MutableItem,
NoMoreRecentValueResponseArguments, NoValuesResponseArguments, Node, RequestTypeSpecific,
ResponseSpecific,
};

use super::{Actor, Response};

impl Actor {
/// Handle an inbound KRPC response: match it to a put or iterative query,
/// validate any returned value, and add the responder to the routing table.
///
/// Returns `Some((target, response))` when a value is obtained.
pub(super) fn handle_response(
&mut self,
from: SocketAddrV4,
message: Message,
) -> Option<(Id, Response)> {
// If someone claims to be readonly, then let's not store anything even if they respond.
if message.read_only {
return None;
};

// If the response looks like a Ping response, check StoreQueries for the transaction_id.
if let Some(query) = self
.put_queries
.values_mut()
.find(|query| query.inflight(message.transaction_id))
{
match message.message_type {
MessageType::Response(ResponseSpecific::Ping(_)) => {
// Mark storage at that node as a success.
query.success();
}
MessageType::Error(error) => query.error(error),
_ => {}
};

return None;
}

let mut should_add_node = false;
let author_id = message.get_author_id();
let from_version = message.version.to_owned();

// Get corresponding query for message.transaction_id
if let Some(query) = self
.iterative_queries
.values_mut()
.find(|query| query.inflight(message.transaction_id))
{
// KrpcSocket would not give us a response from the wrong address for the transaction_id
should_add_node = true;

if let Some(nodes) = message.get_closer_nodes() {
for node in nodes {
query.add_candidate(node.clone());
}
}

if let Some((responder_id, token)) = message.get_token() {
query.add_responding_node(Node::new_with_token(responder_id, from, token.into()));
}

if let Some(proposed_ip) = message.requester_ip {
query.add_address_vote(proposed_ip);
}

let target = query.target();

match message.message_type {
MessageType::Response(ResponseSpecific::GetPeers(GetPeersResponseArguments {
values,
..
})) => {
let response = Response::Peers(values);
query.response(from, response.clone());

return Some((target, response));
}
MessageType::Response(ResponseSpecific::GetImmutable(
GetImmutableResponseArguments {
v, responder_id, ..
},
)) => {
if validate_immutable(&v, query.target()) {
let response = Response::Immutable(v);
query.response(from, response.clone());

return Some((target, response));
}

let target = query.target();
debug!(
?v,
?target,
?responder_id,
?from,
?from_version,
"Invalid immutable value"
);
}
MessageType::Response(ResponseSpecific::GetMutable(
GetMutableResponseArguments {
v,
seq,
sig,
k,
responder_id,
..
},
)) => {
let salt = match query.request.request_type.clone() {
RequestTypeSpecific::GetValue(args) => args.salt,
_ => None,
};
let target = query.target();

match MutableItem::from_dht_message(query.target(), &k, v, seq, &sig, salt) {
Ok(item) => {
let response = Response::Mutable(item);
query.response(from, response.clone());

return Some((target, response));
}
Err(error) => {
debug!(
?error,
?from,
?responder_id,
?from_version,
"Invalid mutable record"
);
}
}
}
MessageType::Response(ResponseSpecific::NoMoreRecentValue(
NoMoreRecentValueResponseArguments {
seq, responder_id, ..
},
)) => {
debug!(
target= ?query.target(),
salt= ?match query.request.request_type.clone() {
RequestTypeSpecific::GetValue(args) => args.salt,
_ => None,
},
?seq,
?from,
?responder_id,
?from_version,
"No more recent value"
);
}
MessageType::Response(ResponseSpecific::NoValues(NoValuesResponseArguments {
responder_id,
..
})) => {
debug!(
target= ?query.target(),
salt= ?match query.request.request_type.clone() {
RequestTypeSpecific::GetValue(args) => args.salt,
_ => None,
},
?from,
?responder_id,
?from_version ,
"No values"
);
}
MessageType::Error(error) => {
debug!(?error, ?from_version, "Get query got error response");
}
// Ping/FindNode: no value payload; node is added to routing table below.
// Requests are handled elsewhere.
MessageType::Response(ResponseSpecific::Ping(_))
| MessageType::Response(ResponseSpecific::FindNode(_))
| MessageType::Request(_) => {}
};
};

if should_add_node {
// Add a node to our routing table on any expected incoming response.

if let Some(id) = author_id {
self.routing_table.add(Node::new(id, from));
}
}

None
}
}
18 changes: 9 additions & 9 deletions src/rpc/info.rs → src/actor/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::net::SocketAddrV4;

use crate::Id;

use super::Rpc;
use super::Actor;

/// Information and statistics about this mainline node.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -56,15 +56,15 @@ impl Info {
}
}

impl From<&Rpc> for Info {
fn from(rpc: &Rpc) -> Self {
impl From<&Actor> for Info {
fn from(actor: &Actor) -> Self {
Self {
id: *rpc.id(),
local_addr: rpc.local_addr(),
dht_size_estimate: rpc.dht_size_estimate(),
public_address: rpc.public_address(),
firewalled: rpc.firewalled(),
server_mode: rpc.server_mode(),
id: *actor.id(),
local_addr: actor.local_addr(),
dht_size_estimate: actor.dht_size_estimate(),
public_address: actor.public_address(),
firewalled: actor.firewalled(),
server_mode: actor.server_mode(),
}
}
}
Loading