diff --git a/Cargo.toml b/Cargo.toml index d8f4c8e..bb8972f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ dashmap = "6.1" flume = "0.12.0" colored = "3.1.1" chrono = "0.4" +cpu-time = "1.0" [features] ## Include [Dht] node. diff --git a/examples/bench_cpu.rs b/examples/bench_cpu.rs new file mode 100644 index 0000000..177312e --- /dev/null +++ b/examples/bench_cpu.rs @@ -0,0 +1,140 @@ +//! Measures CPU utilization of a DHT node under homeserver-like conditions. +//! +//! Emulates how pubky-homeserver uses mainline: +//! - Server mode on the real DHT +//! - Periodic put_mutable to republish signed packets (homeserver key + user keys) +//! - get_mutable to verify publication reached enough nodes +//! +//! Reports CPU % for idle maintenance vs active republish cycles. +//! +//! Run: `cargo run --release --example bench_cpu` + +use cpu_time::ProcessTime; +use mainline::{Dht, MutableItem, SigningKey}; +use std::thread; +use std::time::{Duration, Instant}; + +fn random_signing_key() -> SigningKey { + let mut bytes = [0u8; 32]; + getrandom::fill(&mut bytes).expect("getrandom failed"); + SigningKey::from_bytes(&bytes) +} + +/// Number of simulated user keys to republish. +const USER_COUNT: usize = 10; + +/// Homeserver republishes user keys with up to 12 concurrent threads. +const REPUBLISH_CONCURRENCY: usize = 12; + +fn main() { + println!("cpu_usage (homeserver simulation, {USER_COUNT} users)\n"); + + // Homeserver starts a server-mode DHT node on the real network. + let dht = Dht::server().expect("failed to create DHT node"); + + print!("bootstrapping..."); + let start = Instant::now(); + while !dht.bootstrapped() { + if start.elapsed() > Duration::from_secs(30) { + println!(" timeout after 30s"); + return; + } + thread::sleep(Duration::from_secs(1)); + } + println!(" done ({:.1}s)\n", start.elapsed().as_secs_f64()); + + // Phase 1: idle — node does routing table maintenance and responds to + // incoming queries from the real network. 
This is what a homeserver + // looks like between republish cycles (most of the time). + let phase_duration = Duration::from_secs(30); + + let cpu_before = ProcessTime::now(); + let wall_before = Instant::now(); + thread::sleep(phase_duration); + let idle_cpu = cpu_before.elapsed(); + let idle_wall = wall_before.elapsed(); + + println!("idle ({:.0}s)", idle_wall.as_secs_f64()); + println!( + " cpu: {:.2}s / {:.1}s = {:.1}%\n", + idle_cpu.as_secs_f64(), + idle_wall.as_secs_f64(), + cpu_pct(idle_cpu, idle_wall), + ); + + // Phase 2: republish cycle — simulate what the homeserver does every 4 hours. + // Generate a homeserver key + N user keys, then publish them all with + // put_mutable and verify with get_mutable, using 12 concurrent threads + // (matching the homeserver's MultiRepublisher concurrency). + let homeserver_key = random_signing_key(); + let user_keys: Vec<SigningKey> = (0..USER_COUNT).map(|_| random_signing_key()).collect(); + + // Collect all keys: homeserver + users. + let mut all_keys = vec![&homeserver_key]; + all_keys.extend(user_keys.iter()); + + let cpu_before = ProcessTime::now(); + let wall_before = Instant::now(); + + // Republish in batches of REPUBLISH_CONCURRENCY, matching homeserver behavior. + let mut published = 0usize; + let mut verified = 0usize; + + for chunk in all_keys.chunks(REPUBLISH_CONCURRENCY) { + let handles: Vec<_> = chunk + .iter() + .map(|key| { + let dht = dht.clone(); + let item = MutableItem::new((*key).clone(), b"bench_cpu_packet", 1, None); + let pub_key = key.verifying_key().to_bytes(); + + thread::spawn(move || { + // put_mutable — publish the signed packet. + let put_ok = dht.put_mutable(item, None).is_ok(); + + // get_mutable — verify it reached nodes (homeserver counts responses).
+ let mut node_count = 0usize; + if put_ok { + for _item in dht.get_mutable(&pub_key, None, None) { + node_count += 1; + } + } + + (put_ok, node_count) + }) + }) + .collect(); + + for h in handles { + if let Ok((put_ok, node_count)) = h.join() { + if put_ok { + published += 1; + } + if node_count > 0 { + verified += 1; + } + } + } + } + + let republish_cpu = cpu_before.elapsed(); + let republish_wall = wall_before.elapsed(); + + println!( + "republish cycle ({} keys, {} published, {} verified, {:.0}s)", + all_keys.len(), + published, + verified, + republish_wall.as_secs_f64(), + ); + println!( + " cpu: {:.2}s / {:.1}s = {:.1}%\n", + republish_cpu.as_secs_f64(), + republish_wall.as_secs_f64(), + cpu_pct(republish_cpu, republish_wall), + ); +} + +fn cpu_pct(cpu: Duration, wall: Duration) -> f64 { + cpu.as_secs_f64() / wall.as_secs_f64() * 100.0 +} diff --git a/examples/bench_latency.rs b/examples/bench_latency.rs new file mode 100644 index 0000000..e8ace5b --- /dev/null +++ b/examples/bench_latency.rs @@ -0,0 +1,65 @@ +//! Single-operation latency on a 100-node local testnet. +//! +//! Measures wall-clock time for individual get_immutable and put_immutable +//! calls. Catches regressions in the iterative query algorithm, routing +//! table lookups, or actor tick processing. +//! +//! Run: `cargo run --release --example bench_latency` + +use mainline::Testnet; +use std::time::{Duration, Instant}; + +fn main() { + println!("latency\n"); + + let size = 100; + let testnet = Testnet::builder(size).build().unwrap(); + let nodes = &testnet.nodes; + + // GET: publish a value, then time gets from different nodes. 
+ let target = nodes[0].put_immutable(b"bench_latency_get").unwrap(); + + let samples = 30; + let mut timings = Vec::with_capacity(samples); + + for i in 0..samples { + let node_idx = (i % (size - 1)) + 1; + let start = Instant::now(); + let _ = nodes[node_idx].get_immutable(target); + timings.push(start.elapsed()); + } + + println!("get_immutable ({size} nodes, {samples} samples)"); + print_stats(&timings); + + // PUT: time each put_immutable individually. + let samples = 10; + let mut timings = Vec::with_capacity(samples); + + for i in 0..samples { + let value = format!("bench_latency_put_{i}"); + let node_idx = (i % (size - 1)) + 1; + let start = Instant::now(); + let _ = nodes[node_idx].put_immutable(value.as_bytes()); + timings.push(start.elapsed()); + } + + println!("put_immutable ({size} nodes, {samples} samples)"); + print_stats(&timings); +} + +fn print_stats(timings: &[Duration]) { + let mut ms: Vec<f64> = timings.iter().map(|d| d.as_secs_f64() * 1000.0).collect(); + ms.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let n = ms.len(); + let mean: f64 = ms.iter().sum::<f64>() / n as f64; + + println!( + " min={:.1}ms mean={:.1}ms p50={:.1}ms p95={:.1}ms max={:.1}ms\n", + ms[0], + mean, + ms[n / 2], + ms[n * 95 / 100], + ms[n - 1], + ); +} diff --git a/examples/bench_scalability.rs b/examples/bench_scalability.rs new file mode 100644 index 0000000..4e8ee14 --- /dev/null +++ b/examples/bench_scalability.rs @@ -0,0 +1,49 @@ +//! Query cost at varying network sizes. +//! +//! Runs the same workload across testnets of 10 to 100 nodes and reports +//! init time and get latency (p50/p95). In a Kademlia DHT, query cost +//! should grow O(log n) — if these numbers grow faster, there is a +//! scaling bug in the tick loop or routing table maintenance. +//! +//!
Run: `cargo run --release --example bench_scalability` + +use mainline::Testnet; +use std::time::Instant; + +fn main() { + println!("scalability\n"); + println!( + "{:<8} {:<10} {:<10} {:<10}", + "nodes", "init", "get_p50", "get_p95" + ); + + for size in [10, 25, 50, 100] { + let init_start = Instant::now(); + let testnet = Testnet::builder(size).build().unwrap(); + let init = init_start.elapsed(); + let nodes = &testnet.nodes; + + // Publish a value, then measure gets from different nodes. + let target = nodes[0].put_immutable(b"bench_scaling").unwrap(); + + let samples = 20; + let mut ms: Vec<f64> = Vec::with_capacity(samples); + + for i in 0..samples { + let node_idx = if size > 1 { (i % (size - 1)) + 1 } else { 0 }; + let start = Instant::now(); + let _ = nodes[node_idx].get_immutable(target); + ms.push(start.elapsed().as_secs_f64() * 1000.0); + } + ms.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let n = ms.len(); + + println!( + "{:<8} {:<10} {:<10} {:<10}", + size, + format!("{:.2}s", init.as_secs_f64()), + format!("{:.1}ms", ms[n / 2]), + format!("{:.1}ms", ms[n * 95 / 100]), + ); + } +} diff --git a/examples/bench_throughput.rs b/examples/bench_throughput.rs new file mode 100644 index 0000000..51b0502 --- /dev/null +++ b/examples/bench_throughput.rs @@ -0,0 +1,83 @@ +//! Concurrent query throughput on a 100-node local testnet. +//! +//! Fires N parallel get_immutable calls and measures total wall-clock time +//! and ops/sec. Increasing concurrency should scale roughly linearly — +//! if ops/sec drops or wall time grows disproportionately, there is +//! contention in the actor loop or channel backpressure. +//! +//! Each concurrency level is run 5 times; the median is reported to +//! reduce noise from CPU scheduling and localhost contention. +//! +//!
Run: `cargo run --release --example bench_throughput` + +use mainline::Testnet; +use std::thread; +use std::time::Instant; + +const RUNS: usize = 5; + +fn main() { + println!("throughput ({RUNS} runs, median)\n"); + + let testnet = Testnet::builder(100).build().unwrap(); + + // Publish values upfront so all gets have something to find. + let max_concurrency = 100; + let mut targets = Vec::with_capacity(max_concurrency); + for i in 0..max_concurrency { + let value = format!("bench_throughput_{i}"); + let target = testnet.nodes[0].put_immutable(value.as_bytes()).unwrap(); + targets.push(target); + } + + println!( + "{:<12} {:<10} {:<10} {:<10}", + "concurrency", "wall", "ops/s", "ok" + ); + + for concurrency in [1, 10, 50, 100] { + let mut ops_per_sec = Vec::with_capacity(RUNS); + let mut walls = Vec::with_capacity(RUNS); + let mut all_ok = true; + + for _ in 0..RUNS { + let start = Instant::now(); + + let handles: Vec<_> = targets[..concurrency] + .iter() + .enumerate() + .map(|(i, &target)| { + let dht = testnet.nodes[(i % 99) + 1].clone(); + thread::spawn(move || dht.get_immutable(target).is_some()) + }) + .collect(); + + let ok = handles + .into_iter() + .filter_map(|h| h.join().ok()) + .filter(|&ok| ok) + .count(); + + let wall = start.elapsed(); + ops_per_sec.push(concurrency as f64 / wall.as_secs_f64()); + walls.push(wall.as_secs_f64()); + + if ok != concurrency { + all_ok = false; + } + } + + ops_per_sec.sort_by(|a, b| a.partial_cmp(b).unwrap()); + walls.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let median_ops = ops_per_sec[RUNS / 2]; + let median_wall = walls[RUNS / 2]; + + println!( + "{:<12} {:<10} {:<10} {}", + concurrency, + format!("{:.2}s", median_wall), + format!("{:.0}", median_ops), + if all_ok { "ok" } else { "MISS" }, + ); + } +}