Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dashmap = "6.1"
flume = "0.12.0"
colored = "3.1.1"
chrono = "0.4"
cpu-time = "1.0"

[features]
## Include [Dht] node.
Expand Down
140 changes: 140 additions & 0 deletions examples/bench_cpu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! Measures CPU utilization of a DHT node under homeserver-like conditions.
//!
//! Emulates how pubky-homeserver uses mainline:
//! - Server mode on the real DHT
//! - Periodic put_mutable to republish signed packets (homeserver key + user keys)
//! - get_mutable to verify publication reached enough nodes
//!
//! Reports CPU % for idle maintenance vs active republish cycles.
//!
//! Run: `cargo run --release --example bench_cpu`

use cpu_time::ProcessTime;
use mainline::{Dht, MutableItem, SigningKey};
use std::thread;
use std::time::{Duration, Instant};

/// Create a fresh Ed25519 signing key seeded from the OS RNG.
fn random_signing_key() -> SigningKey {
    let mut seed = [0u8; 32];
    getrandom::fill(&mut seed).expect("getrandom failed");
    SigningKey::from_bytes(&seed)
}

/// Number of simulated user keys to republish (the homeserver key is added on top).
const USER_COUNT: usize = 10;

/// Homeserver republishes user keys with up to 12 concurrent threads
/// (mirrors the homeserver's MultiRepublisher concurrency — TODO confirm).
const REPUBLISH_CONCURRENCY: usize = 12;

fn main() {
    println!("cpu_usage (homeserver simulation, {USER_COUNT} users)\n");

    // Homeserver starts a server-mode DHT node on the real network.
    let dht = Dht::server().expect("failed to create DHT node");

    // Wait for the node to join the live DHT; give up after 30s so the
    // benchmark doesn't hang when the network is unreachable.
    print!("bootstrapping...");
    let start = Instant::now();
    while !dht.bootstrapped() {
        if start.elapsed() > Duration::from_secs(30) {
            println!(" timeout after 30s");
            return;
        }
        thread::sleep(Duration::from_secs(1));
    }
    println!(" done ({:.1}s)\n", start.elapsed().as_secs_f64());

    // Phase 1: idle — node does routing table maintenance and responds to
    // incoming queries from the real network. This is what a homeserver
    // looks like between republish cycles (most of the time).
    let phase_duration = Duration::from_secs(30);

    // Measure process CPU time vs wall-clock over the idle window; the
    // ratio is the background CPU load of a quiescent node.
    let cpu_before = ProcessTime::now();
    let wall_before = Instant::now();
    thread::sleep(phase_duration);
    let idle_cpu = cpu_before.elapsed();
    let idle_wall = wall_before.elapsed();

    println!("idle ({:.0}s)", idle_wall.as_secs_f64());
    println!(
        " cpu: {:.2}s / {:.1}s = {:.1}%\n",
        idle_cpu.as_secs_f64(),
        idle_wall.as_secs_f64(),
        cpu_pct(idle_cpu, idle_wall),
    );

    // Phase 2: republish cycle — simulate what the homeserver does every 4 hours.
    // Generate a homeserver key + N user keys, then publish them all with
    // put_mutable and verify with get_mutable, using 12 concurrent threads
    // (matching the homeserver's MultiRepublisher concurrency).
    let homeserver_key = random_signing_key();
    let user_keys: Vec<SigningKey> = (0..USER_COUNT).map(|_| random_signing_key()).collect();

    // Collect all keys: homeserver + users.
    let mut all_keys = vec![&homeserver_key];
    all_keys.extend(user_keys.iter());

    let cpu_before = ProcessTime::now();
    let wall_before = Instant::now();

    // Republish in batches of REPUBLISH_CONCURRENCY, matching homeserver behavior.
    let mut published = 0usize;
    let mut verified = 0usize;

    for chunk in all_keys.chunks(REPUBLISH_CONCURRENCY) {
        let handles: Vec<_> = chunk
            .iter()
            .map(|key| {
                // Dht handles are cloneable; each worker gets its own.
                let dht = dht.clone();
                // seq=1, no salt — NOTE(review): assumes this matches the
                // homeserver's packet shape; confirm against pubky-homeserver.
                let item = MutableItem::new((*key).clone(), b"bench_cpu_packet", 1, None);
                let pub_key = key.verifying_key().to_bytes();

                thread::spawn(move || {
                    // put_mutable — publish the signed packet.
                    let put_ok = dht.put_mutable(item, None).is_ok();

                    // get_mutable — verify it reached nodes (homeserver counts responses).
                    let mut node_count = 0usize;
                    if put_ok {
                        for _item in dht.get_mutable(&pub_key, None, None) {
                            node_count += 1;
                        }
                    }

                    (put_ok, node_count)
                })
            })
            .collect();

        // Drain the batch before starting the next one; a panicked worker
        // (Err from join) is simply not counted.
        for h in handles {
            if let Ok((put_ok, node_count)) = h.join() {
                if put_ok {
                    published += 1;
                }
                if node_count > 0 {
                    verified += 1;
                }
            }
        }
    }

    let republish_cpu = cpu_before.elapsed();
    let republish_wall = wall_before.elapsed();

    println!(
        "republish cycle ({} keys, {} published, {} verified, {:.0}s)",
        all_keys.len(),
        published,
        verified,
        republish_wall.as_secs_f64(),
    );
    println!(
        " cpu: {:.2}s / {:.1}s = {:.1}%\n",
        republish_cpu.as_secs_f64(),
        republish_wall.as_secs_f64(),
        cpu_pct(republish_cpu, republish_wall),
    );
}

/// CPU time expressed as a percentage of wall-clock time.
fn cpu_pct(cpu: Duration, wall: Duration) -> f64 {
    let cpu_s = cpu.as_secs_f64();
    let wall_s = wall.as_secs_f64();
    cpu_s / wall_s * 100.0
}
65 changes: 65 additions & 0 deletions examples/bench_latency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! Single-operation latency on a 100-node local testnet.
//!
//! Measures wall-clock time for individual get_immutable and put_immutable
//! calls. Catches regressions in the iterative query algorithm, routing
//! table lookups, or actor tick processing.
//!
//! Run: `cargo run --release --example bench_latency`

use mainline::Testnet;
use std::time::{Duration, Instant};

fn main() {
    println!("latency\n");

    let size = 100;
    let testnet = Testnet::builder(size).build().unwrap();
    let nodes = &testnet.nodes;

    // GET: publish a value, then time gets from different nodes.
    let target = nodes[0].put_immutable(b"bench_latency_get").unwrap();

    let samples = 30;
    let mut timings = Vec::with_capacity(samples);

    for i in 0..samples {
        // Cycle through nodes 1..size, skipping node 0 (the publisher)
        // so every get is a real query rather than a local hit.
        let node_idx = (i % (size - 1)) + 1;
        let start = Instant::now();
        let _ = nodes[node_idx].get_immutable(target);
        timings.push(start.elapsed());
    }

    println!("get_immutable ({size} nodes, {samples} samples)");
    print_stats(&timings);

    // PUT: time each put_immutable individually.
    let samples = 10;
    let mut timings = Vec::with_capacity(samples);

    for i in 0..samples {
        // Unique value per iteration so each put stores a distinct target.
        let value = format!("bench_latency_put_{i}");
        let node_idx = (i % (size - 1)) + 1;
        let start = Instant::now();
        let _ = nodes[node_idx].put_immutable(value.as_bytes());
        timings.push(start.elapsed());
    }

    println!("put_immutable ({size} nodes, {samples} samples)");
    print_stats(&timings);
}

/// Print min/mean/p50/p95/max of the given timings, in milliseconds.
///
/// Percentiles are nearest-rank on the sorted samples (for small n,
/// p95 collapses toward max). An empty slice prints a placeholder
/// instead of panicking on `ms[0]` / dividing by zero for the mean.
fn print_stats(timings: &[Duration]) {
    if timings.is_empty() {
        println!("  (no samples)\n");
        return;
    }

    let mut ms: Vec<f64> = timings.iter().map(|d| d.as_secs_f64() * 1000.0).collect();
    // total_cmp is a total order over f64, so no unwrap/panic path even
    // if a NaN ever sneaked in; sort_unstable avoids the extra allocation.
    ms.sort_unstable_by(f64::total_cmp);
    let n = ms.len();
    let mean: f64 = ms.iter().sum::<f64>() / n as f64;

    println!(
        " min={:.1}ms mean={:.1}ms p50={:.1}ms p95={:.1}ms max={:.1}ms\n",
        ms[0],
        mean,
        ms[n / 2],
        // n * 95 / 100 <= n - 1 for all n >= 1, so this index is in bounds.
        ms[n * 95 / 100],
        ms[n - 1],
    );
}
49 changes: 49 additions & 0 deletions examples/bench_scalability.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Query cost at varying network sizes.
//!
//! Runs the same workload across testnets of 10 to 100 nodes and reports
//! init time and get latency (p50/p95). In a Kademlia DHT, query cost
//! should grow O(log n) — if these numbers grow faster, there is a
//! scaling bug in the tick loop or routing table maintenance.
//!
//! Run: `cargo run --release --example bench_scalability`

use mainline::Testnet;
use std::time::Instant;

fn main() {
    println!("scalability\n");
    println!(
        "{:<8} {:<10} {:<10} {:<10}",
        "nodes", "init", "get_p50", "get_p95"
    );

    // Same workload at each network size; columns should grow ~O(log n).
    for size in [10, 25, 50, 100] {
        // Time testnet construction separately from query latency.
        let init_start = Instant::now();
        let testnet = Testnet::builder(size).build().unwrap();
        let init = init_start.elapsed();
        let nodes = &testnet.nodes;

        // Publish a value, then measure gets from different nodes.
        let target = nodes[0].put_immutable(b"bench_scaling").unwrap();

        let samples = 20;
        let mut ms: Vec<f64> = Vec::with_capacity(samples);

        for i in 0..samples {
            // Skip node 0 (the publisher) when possible so gets are real
            // queries; a 1-node net has no choice but to self-query.
            let node_idx = if size > 1 { (i % (size - 1)) + 1 } else { 0 };
            let start = Instant::now();
            let _ = nodes[node_idx].get_immutable(target);
            ms.push(start.elapsed().as_secs_f64() * 1000.0);
        }
        ms.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let n = ms.len();

        // Nearest-rank p50/p95 over the sorted samples.
        println!(
            "{:<8} {:<10} {:<10} {:<10}",
            size,
            format!("{:.2}s", init.as_secs_f64()),
            format!("{:.1}ms", ms[n / 2]),
            format!("{:.1}ms", ms[n * 95 / 100]),
        );
    }
}
83 changes: 83 additions & 0 deletions examples/bench_throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Concurrent query throughput on a 100-node local testnet.
//!
//! Fires N parallel get_immutable calls and measures total wall-clock time
//! and ops/sec. Increasing concurrency should scale roughly linearly —
//! if ops/sec drops or wall time grows disproportionately, there is
//! contention in the actor loop or channel backpressure.
//!
//! Each concurrency level is run 5 times; the median is reported to
//! reduce noise from CPU scheduling and localhost contention.
//!
//! Run: `cargo run --release --example bench_throughput`

use mainline::Testnet;
use std::thread;
use std::time::Instant;

/// Runs per concurrency level; the median is reported to reduce noise.
const RUNS: usize = 5;

fn main() {
    println!("throughput ({RUNS} runs, median)\n");

    let testnet = Testnet::builder(100).build().unwrap();

    // Publish values upfront so all gets have something to find.
    let max_concurrency = 100;
    let mut targets = Vec::with_capacity(max_concurrency);
    for i in 0..max_concurrency {
        let value = format!("bench_throughput_{i}");
        let target = testnet.nodes[0].put_immutable(value.as_bytes()).unwrap();
        targets.push(target);
    }

    println!(
        "{:<12} {:<10} {:<10} {:<10}",
        "concurrency", "wall", "ops/s", "ok"
    );

    for concurrency in [1, 10, 50, 100] {
        let mut ops_per_sec = Vec::with_capacity(RUNS);
        let mut walls = Vec::with_capacity(RUNS);
        let mut all_ok = true;

        for _ in 0..RUNS {
            let start = Instant::now();

            // One thread per get; each target is fetched from a different
            // node. NOTE(review): the 99 here is (testnet size - 1) and
            // skips node 0, the publisher — it is hard-coded to the
            // builder(100) above; keep the two in sync.
            let handles: Vec<_> = targets[..concurrency]
                .iter()
                .enumerate()
                .map(|(i, &target)| {
                    let dht = testnet.nodes[(i % 99) + 1].clone();
                    thread::spawn(move || dht.get_immutable(target).is_some())
                })
                .collect();

            // Count threads that both joined cleanly and found their value.
            let ok = handles
                .into_iter()
                .filter_map(|h| h.join().ok())
                .filter(|&ok| ok)
                .count();

            let wall = start.elapsed();
            ops_per_sec.push(concurrency as f64 / wall.as_secs_f64());
            walls.push(wall.as_secs_f64());

            if ok != concurrency {
                all_ok = false;
            }
        }

        // Report the median run to damp scheduler/localhost noise.
        ops_per_sec.sort_by(|a, b| a.partial_cmp(b).unwrap());
        walls.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let median_ops = ops_per_sec[RUNS / 2];
        let median_wall = walls[RUNS / 2];

        println!(
            "{:<12} {:<10} {:<10} {}",
            concurrency,
            format!("{:.2}s", median_wall),
            format!("{:.0}", median_ops),
            if all_ok { "ok" } else { "MISS" },
        );
    }
}
Loading