diff --git a/src/aggregator.rs b/src/aggregator.rs new file mode 100644 index 00000000..418f4d30 --- /dev/null +++ b/src/aggregator.rs @@ -0,0 +1,289 @@ +use std::hash::DefaultHasher; +use std::{collections::HashMap, hash::Hash, hash::Hasher}; + +use tracing::warn; + +use crate::profile::{RawAggregatedProfile, RawAggregatedSample, RawSample}; + +#[derive(Default)] +pub struct Aggregator {} + +impl Aggregator { + pub fn aggregate(&self, raw_samples: Vec) -> RawAggregatedProfile { + if raw_samples.is_empty() { + return Vec::new(); + } + + let mut sample_hash_to_aggregated: HashMap = HashMap::new(); + for sample in raw_samples { + if sample.ustack.is_none() & sample.kstack.is_none() { + warn!( + "No stack present in provided sample={}, skipping...", + sample + ); + continue; + } + + let mut hasher = DefaultHasher::new(); + sample.hash(&mut hasher); + let sample_hash = hasher.finish(); + + sample_hash_to_aggregated + .entry(sample_hash) + .and_modify(|aggregated_sample| aggregated_sample.count += 1) + .or_insert(RawAggregatedSample { sample, count: 1 }); + } + sample_hash_to_aggregated.into_values().collect() + } +} + +#[cfg(test)] +mod tests { + use crate::aggregator::Aggregator; + use crate::bpf::profiler_bindings::native_stack_t; + use crate::profile::RawSample; + + #[test] + fn test_aggregate_raw_samples() { + // Given + let mut ustack1_data = [0; 127]; + ustack1_data[0] = 0xffff; + ustack1_data[1] = 0xdeadbeef; + let ustack1 = Some(native_stack_t { + addresses: ustack1_data, + len: 2, + }); + + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: kstack1, + }; + + let mut ustack2_data = [0; 127]; + ustack2_data[0] = 0xdddd; + ustack2_data[1] = 0xfeedbee; + ustack2_data[0] = 0xddddef; + ustack2_data[1] = 0xbeefdad; + let ustack2 = Some(native_stack_t { + addresses: ustack2_data, + len: 4, + }); + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack2, + kstack: None, + }; + + let raw_samples = vec![ + raw_sample_1, + raw_sample_2, + raw_sample_1, + raw_sample_2, + raw_sample_2, + raw_sample_2, + ]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 2); + } else { + assert_eq!(sample.count, 4); + } + } + } + + #[test] + fn test_aggregate_raw_samples_same_ustack_diff_kstack() { + let mut ustack_data = [0; 127]; + ustack_data[0] = 0xffff; + ustack_data[1] = 0xdeadbeef; + let ustack = Some(native_stack_t { + addresses: ustack_data, + len: 2, + }); + + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack, + kstack: kstack1, + }; + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack, + kstack: None, + }; + + let raw_samples = vec![raw_sample_1, raw_sample_2, raw_sample_2]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 1); + } else { + assert_eq!(sample.count, 2); + } + } + } + + #[test] + fn test_aggregate_raw_samples_diff_ustack_same_kstack() { + let mut ustack1_data = [0; 127]; + ustack1_data[0] = 0xffff; + ustack1_data[1] = 0xdeadbeef; + let ustack1 = Some(native_stack_t { + addresses: ustack1_data, + len: 2, + }); + + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: kstack1, + }; + + let mut ustack2_data = [0; 127]; + ustack2_data[0] = 0xdddd; + ustack2_data[1] = 0xfeedbee; + ustack2_data[0] = 0xddddef; + ustack2_data[1] = 0xbeefdad; + let ustack2 = Some(native_stack_t { + addresses: ustack2_data, + len: 4, + }); + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack2, + kstack: kstack1, + }; + + let raw_samples = vec![ + raw_sample_1, + raw_sample_2, + raw_sample_1, + raw_sample_2, + raw_sample_1, + ]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 3); + } else { + assert_eq!(sample.count, 2); + } + } + } + + #[test] + fn test_aggregate_same_stack_traces_different_pid_tid() { + let mut ustack_data = [0; 127]; + ustack_data[0] = 0xffff; + ustack_data[1] = 0xdeadbeef; + let ustack = Some(native_stack_t { + addresses: ustack_data, + len: 2, + }); + + let mut kstack_data = [0; 127]; + kstack_data[0] = 0xffff; + kstack_data[1] = 0xdddd; + kstack_data[2] = 0xaaaa; + let kstack = Some(native_stack_t { + addresses: kstack_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack, + kstack, + }; + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1236, + ustack, + kstack, + }; + + let raw_sample_3 = RawSample { + pid: 123, + tid: 124, + ustack, + kstack, + }; + + let raw_samples = vec![raw_sample_1, raw_sample_2, raw_sample_3]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 3); + } +} diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index a0cb5d8c..756f203a 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -14,18 +14,16 @@ #include struct { - __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, 100000); - __type(key, u64); - __type(value, native_stack_t); -} stacks SEC(".maps"); + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024 /* 256 KB */); // adjust max entries based on frequency on userspace -- todo +} stacks_rb SEC(".maps"); struct { - __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, MAX_AGGREGATED_STACKS_ENTRIES); - __type(key, stack_count_key_t); - __type(value, u64); -} aggregated_stacks SEC(".maps"); + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(u32)); + __uint(value_size, sizeof(u32)); + // adjust max entries based on frequency on userspace - todo +} stacks SEC(".maps"); struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); @@ -317,75 +315,37 @@ static __always_inline bool retrieve_task_registers(u64 *ip, u64 *sp, u64 *bp, u return true; } -static __always_inline void * -bpf_map_lookup_or_try_init(void *map, const void *key, const void *init) { - void *val; - long err; - - val = bpf_map_lookup_elem(map, key); - if (val) { - return val; - } - - err = bpf_map_update_elem(map, key, init, BPF_NOEXIST); - if (err && !STACK_COLLISION(err)) { - LOG("[error] bpf_map_lookup_or_try_init with ret: %d", err); - return 0; - } - - return bpf_map_lookup_elem(map, key); -} - -// Aggregate the given stacktrace. static __always_inline void add_stack(struct bpf_perf_event_data *ctx, - u64 pid_tgid, - unwind_state_t *unwind_state) { - stack_count_key_t *stack_key = &unwind_state->stack_key; +u64 pid_tgid, +unwind_state_t *unwind_state) { + // Get the kernel stack + int ret = bpf_get_stack(ctx, unwind_state->raw_stack.kernel_stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); + if (ret >= 0) { + unwind_state->raw_stack.kernel_stack.len = ret / sizeof(u64); + } int per_process_id = pid_tgid >> 32; int per_thread_id = pid_tgid; - stack_key->pid = per_process_id; - stack_key->task_id = per_thread_id; - - // Hash and add user stack. - if (unwind_state->stack.len >= 1) { - u64 user_stack_id = hash_stack(&unwind_state->stack); - int err = bpf_map_update_elem(&stacks, &user_stack_id, &unwind_state->stack, - BPF_ANY); - if (err == 0) { - stack_key->user_stack_id = user_stack_id; - } else { - LOG("[error] failed to insert user stack: %d", err); - } - } + unwind_state->raw_stack.stack_key.pid = per_process_id; + unwind_state->raw_stack.stack_key.task_id = per_thread_id; - // Walk, hash and add kernel stack. - int ret = bpf_get_stack(ctx, unwind_state->stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); - if (ret >= 0) { - unwind_state->stack.len = ret / sizeof(u64); - } + u64 timestamp = bpf_ktime_get_boot_ns() + walltime_at_system_boot_ns; + unwind_state->raw_stack.stack_key.collected_at = timestamp; - if (unwind_state->stack.len >= 1) { - u64 kernel_stack_id = hash_stack(&unwind_state->stack); - int err = bpf_map_update_elem(&stacks, &kernel_stack_id, &unwind_state->stack, - BPF_ANY); - if (err == 0) { - stack_key->kernel_stack_id = kernel_stack_id; - } else { - LOG("[error] failed to insert kernel stack: %d", err); - } + if (lightswitch_config.use_ring_buffers) { + ret = bpf_ringbuf_output(&stacks_rb, &(unwind_state->raw_stack), sizeof(raw_stack_t), 0); + } else { + ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, &(unwind_state->raw_stack), sizeof(raw_stack_t)); } - // Insert aggregated stack. - u64 zero = 0; - u64 *count = bpf_map_lookup_or_try_init(&aggregated_stacks, - &unwind_state->stack_key, &zero); - if (count) { - __sync_fetch_and_add(count, 1); + if (ret < 0) { + bpf_printk("add_stack failed ret=%d, use_ring_buffers=", ret); + bump_unwind_error_failure_adding_stack(); } } + // The unwinding machinery lives here. SEC("perf_event") int dwarf_unwind(struct bpf_perf_event_data *ctx) { @@ -404,7 +364,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { for (int i = 0; i < MAX_STACK_DEPTH_PER_PROGRAM; i++) { // LOG("[debug] Within unwinding machinery loop"); - LOG("## frame: %d", unwind_state->stack.len); + LOG("## frame: %d",unwind_state->raw_stack.stack.len); LOG("\tcurrent pc: %llx", unwind_state->ip); LOG("\tcurrent sp: %llx", unwind_state->sp); @@ -522,11 +482,11 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { } // Add address to stack. - u64 len = unwind_state->stack.len; + u64 len =unwind_state->raw_stack.stack.len; // Appease the verifier. if (len >= 0 && len < MAX_STACK_DEPTH) { - unwind_state->stack.addresses[len] = unwind_state->ip; - unwind_state->stack.len++; + unwind_state->raw_stack.stack.addresses[len] = unwind_state->ip; + unwind_state->raw_stack.stack.len++; } if (found_rbp_type == RBP_TYPE_REGISTER || @@ -610,7 +570,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { #ifdef __TARGET_ARCH_arm64 // Special handling for leaf frame. - if (unwind_state->stack.len == 0) { + if (unwind_state->raw_stack.stack.len == 0) { previous_rip = unwind_state->lr; } else { // This is guaranteed by the Aarch64 ABI. @@ -671,7 +631,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { bump_unwind_success_dwarf(); return 0; - } else if (unwind_state->stack.len < MAX_STACK_DEPTH && + } else if (unwind_state->raw_stack.stack.len < MAX_STACK_DEPTH && unwind_state->tail_calls < MAX_TAIL_CALLS) { LOG("Continuing walking the stack in a tail call, current tail %d", unwind_state->tail_calls); @@ -687,13 +647,13 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { // Set up the initial unwinding state. static __always_inline bool set_initial_state(unwind_state_t *unwind_state, bpf_user_pt_regs_t *regs) { - unwind_state->stack.len = 0; - unwind_state->tail_calls = 0; + unwind_state->raw_stack.stack.len = 0; + unwind_state->raw_stack.kernel_stack.len = 0; + unwind_state->tail_calls = 0; - unwind_state->stack_key.pid = 0; - unwind_state->stack_key.task_id = 0; - unwind_state->stack_key.user_stack_id = 0; - unwind_state->stack_key.kernel_stack_id = 0; + unwind_state->raw_stack.stack_key.pid = 0; + unwind_state->raw_stack.stack_key.task_id = 0; + unwind_state->raw_stack.stack_key.collected_at = 0; if (in_kernel(PT_REGS_IP(regs))) { if (!retrieve_task_registers(&unwind_state->ip, &unwind_state->sp, &unwind_state->bp, &unwind_state->lr)) { diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 38ed16e3..41cb7623 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -50,7 +50,6 @@ typedef struct { u32 high_index; } page_value_t; -#define MAX_AGGREGATED_STACKS_ENTRIES 10000 // Values for dwarf expressions. #define DWARF_EXPRESSION_UNKNOWN 0 @@ -109,6 +108,7 @@ struct unwinder_stats_t { u64 error_sending_new_process_event; u64 error_cfa_offset_did_not_fit; u64 error_rbp_offset_did_not_fit; + u64 error_failure_adding_stack; u64 bp_non_zero_for_bottom_frame; u64 vdso_encountered; u64 jit_encountered; @@ -120,10 +120,7 @@ const volatile struct lightswitch_config_t lightswitch_config = { .use_task_pt_regs_helper = false, }; -// A different stack produced the same hash. -#define STACK_COLLISION(err) (err == -EEXIST) -// Tried to read a kernel stack from a non-kernel context. -#define IN_USERSPACE(err) (err == -EFAULT) +const volatile u64 walltime_at_system_boot_ns = 0; #define LOG(fmt, ...) \ ({ \ @@ -172,12 +169,17 @@ typedef struct { typedef struct { int task_id; int pid; - unsigned long long user_stack_id; - unsigned long long kernel_stack_id; -} stack_count_key_t; + u64 collected_at; +} stack_key_t; typedef struct { + stack_key_t stack_key; native_stack_t stack; + native_stack_t kernel_stack; +} raw_stack_t; + +typedef struct { + raw_stack_t raw_stack; unsigned long long ip; unsigned long long sp; @@ -185,7 +187,6 @@ typedef struct { unsigned long long lr; int tail_calls; - stack_count_key_t stack_key; } unwind_state_t; enum event_type { @@ -202,32 +203,3 @@ typedef struct { enum program { PROGRAM_NATIVE_UNWINDER = 0, }; - -#define BIG_CONSTANT(x) (x##LLU) -unsigned long long hash_stack(native_stack_t *stack) { - const unsigned long long m = BIG_CONSTANT(0xc6a4a7935bd1e995); - const int r = 47; - const int seed = 123; - - unsigned long long hash = seed ^ (stack->len * m); - - for (unsigned long long i = 0; i < MAX_STACK_DEPTH; i++) { - // The stack is not zeroed when we initialise it, we simply - // set the length to zero. This is a workaround to produce - // the same hash for two stacks that might have garbage values - // after their length. - unsigned long long k = 0; - if (i < stack->len) { - k = stack->addresses[i]; - } - - k *= m; - k ^= k >> r; - k *= m; - - hash ^= k; - hash *= m; - } - - return hash; -} diff --git a/src/bpf/profiler_bindings.rs b/src/bpf/profiler_bindings.rs index 0a8bbbf9..0c30f716 100644 --- a/src/bpf/profiler_bindings.rs +++ b/src/bpf/profiler_bindings.rs @@ -9,7 +9,8 @@ use crate::unwind_info::types::CompactUnwindRow; include!(concat!(env!("OUT_DIR"), "/profiler_bindings.rs")); -unsafe impl Plain for stack_count_key_t {} +unsafe impl Plain for raw_stack_t {} +unsafe impl Plain for stack_key_t {} unsafe impl Plain for native_stack_t {} unsafe impl Plain for Event {} unsafe impl Plain for unwinder_stats_t {} @@ -67,6 +68,8 @@ impl Add for unwinder_stats_t { + other.error_cfa_offset_did_not_fit, error_rbp_offset_did_not_fit: self.error_rbp_offset_did_not_fit + other.error_rbp_offset_did_not_fit, + error_failure_adding_stack: self.error_failure_adding_stack + + other.error_failure_adding_stack, bp_non_zero_for_bottom_frame: self.bp_non_zero_for_bottom_frame + other.bp_non_zero_for_bottom_frame, vdso_encountered: self.vdso_encountered + other.vdso_encountered, diff --git a/src/bpf/shared_maps.h b/src/bpf/shared_maps.h index fe44ef09..527346e7 100644 --- a/src/bpf/shared_maps.h +++ b/src/bpf/shared_maps.h @@ -45,6 +45,7 @@ DEFINE_COUNTER(error_binary_search_exhausted_iterations); DEFINE_COUNTER(error_sending_new_process_event); DEFINE_COUNTER(error_cfa_offset_did_not_fit); DEFINE_COUNTER(error_rbp_offset_did_not_fit); +DEFINE_COUNTER(error_failure_adding_stack); DEFINE_COUNTER(bp_non_zero_for_bottom_frame); DEFINE_COUNTER(vdso_encountered); DEFINE_COUNTER(jit_encountered); diff --git a/src/cli/args.rs b/src/cli/args.rs index 8795f72d..50184ee8 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -107,18 +107,6 @@ pub(crate) struct CliArgs { // Print out info on eBPF map sizes #[arg(long, help = "Print eBPF map sizes after creation")] pub(crate) mapsize_info: bool, - #[arg( - long, - default_value_t = ProfilerConfig::default().mapsize_stacks, - help = "max number of individual stacks to capture before aggregation" - )] - pub(crate) mapsize_stacks: u32, - #[arg( - long, - default_value_t = ProfilerConfig::default().mapsize_aggregated_stacks, - help = "max number of unique stacks after aggregation" - )] - pub(crate) mapsize_aggregated_stacks: u32, #[arg( long, default_value_t = ProfilerConfig::default().mapsize_rate_limits, diff --git a/src/cli/main.rs b/src/cli/main.rs index 89d93bf4..b09f7edb 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -192,8 +192,6 @@ fn main() -> Result<(), Box> { sample_freq: args.sample_freq, perf_buffer_bytes: args.perf_buffer_bytes, mapsize_info: args.mapsize_info, - mapsize_stacks: args.mapsize_stacks, - mapsize_aggregated_stacks: args.mapsize_aggregated_stacks, mapsize_rate_limits: args.mapsize_rate_limits, exclude_self: args.exclude_self, debug_info_manager, @@ -353,7 +351,7 @@ mod tests { cmd.arg("--help"); cmd.assert().success(); let actual = String::from_utf8(cmd.unwrap().stdout).unwrap(); - insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS] [COMMAND]\n\nCommands:\n object-info \n show-unwind \n system-info \n help Print this message or the help of the given subcommand(s)\n\nOptions:\n --pids \n Specific PIDs to profile\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path \n Path for the generated profile\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n --server-url \n \n\n --token \n \n\n --perf-buffer-bytes \n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-stacks \n max number of individual stacks to capture before aggregation\n \n [default: 100000]\n\n --mapsize-aggregated-stacks \n max number of unique stacks after aggregation\n \n [default: 10000]\n\n --mapsize-rate-limits \n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer \n [default: local]\n [possible values: local, none]\n\n --debug-info-backend \n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb \n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base \n [default: /tmp]\n\n --killswitch-path-override \n Override the default path to the killswitch file (/tmp/lighswitch/killswitch) which prevents the profiler from starting\n\n --unsafe-start\n Force the profiler to start even if the system killswitch is enabled\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#); + insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS] [COMMAND]\n\nCommands:\n object-info \n show-unwind \n system-info \n help Print this message or the help of the given subcommand(s)\n\nOptions:\n --pids \n Specific PIDs to profile\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path \n Path for the generated profile\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n --server-url \n \n\n --token \n \n\n --perf-buffer-bytes \n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-rate-limits \n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer \n [default: local]\n [possible values: local, none]\n\n --debug-info-backend \n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb \n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base \n [default: /tmp]\n\n --killswitch-path-override \n Override the default path to the killswitch file (/tmp/lighswitch/killswitch) which prevents the profiler from starting\n\n --unsafe-start\n Force the profiler to start even if the system killswitch is enabled\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#); } #[rstest] diff --git a/src/lib.rs b/src/lib.rs index b1416091..c1a169e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod aggregator; pub mod bpf; pub mod collector; pub mod debug_info; diff --git a/src/profile/sample.rs b/src/profile/sample.rs index f76d97c0..8e46b4bc 100644 --- a/src/profile/sample.rs +++ b/src/profile/sample.rs @@ -12,7 +12,7 @@ use crate::process::Pid; use crate::process::ProcessInfo; use crate::profile::Frame; -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, Copy, Clone)] pub struct RawSample { pub pid: Pid, pub tid: Pid, @@ -47,7 +47,7 @@ impl fmt::Display for RawSample { } } -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, PartialEq)] pub struct RawAggregatedSample { pub sample: RawSample, pub count: u64, diff --git a/src/profiler.rs b/src/profiler.rs index 2333bf53..b6e4fe4f 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -34,6 +34,7 @@ use memmap2::MmapOptions; use procfs; use tracing::{debug, error, info, span, warn, Level}; +use crate::aggregator::Aggregator; use crate::bpf::profiler_bindings::*; use crate::bpf::profiler_skel::{OpenProfilerSkel, ProfilerSkel, ProfilerSkelBuilder}; use crate::bpf::tracers_bindings::*; @@ -131,6 +132,11 @@ pub struct Profiler { // Profile channel profile_send: Arc>, profile_receive: Arc>, + // A vector of raw samples received from bpf in the current profiling session + raw_samples: Vec, + // Raw sample channel. Used for receiving raw samples from the rinbuf/perfbuf poll thread + raw_sample_send: Arc>, + raw_sample_receive: Arc>, /// For how long to profile. duration: Duration, /// Per-CPU sampling frequency in Hz. @@ -151,6 +157,7 @@ pub struct Profiler { max_native_unwind_info_size_mb: i32, unwind_info_manager: UnwindInfoManager, use_ring_buffers: bool, + aggregator: Aggregator, metadata_provider: ThreadSafeGlobalMetadataProvider, } @@ -163,8 +170,6 @@ pub struct ProfilerConfig { pub perf_buffer_bytes: usize, pub session_duration: Duration, pub mapsize_info: bool, - pub mapsize_stacks: u32, - pub mapsize_aggregated_stacks: u32, pub mapsize_rate_limits: u32, pub exclude_self: bool, pub native_unwind_info_bucket_sizes: Vec, @@ -185,8 +190,6 @@ impl Default for ProfilerConfig { perf_buffer_bytes: 512 * 1024, session_duration: Duration::from_secs(5), mapsize_info: false, - mapsize_stacks: 100000, - mapsize_aggregated_stacks: 10000, mapsize_rate_limits: 5000, exclude_self: false, native_unwind_info_bucket_sizes: vec![ @@ -321,20 +324,7 @@ impl Profiler { map_shapes } - pub fn set_profiler_map_sizes( - open_skel: &mut OpenProfilerSkel, - profiler_config: &ProfilerConfig, - ) { - open_skel - .maps - .stacks - .set_max_entries(profiler_config.mapsize_stacks) - .expect("Unable to set stacks map max_entries"); - open_skel - .maps - .aggregated_stacks - .set_max_entries(profiler_config.mapsize_aggregated_stacks) - .expect("Unable to set aggregated_stacks map max_entries"); + pub fn setup_profiler_maps(open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig) { open_skel .maps .rate_limits @@ -353,7 +343,22 @@ impl Profiler { .use_ring_buffers .write(profiler_config.use_ring_buffers); + // Set baseline for calculating raw_sample collection wall clock time + // as bpf currently only supports getting the offset since system boot. + open_skel.maps.rodata_data.walltime_at_system_boot_ns = Self::walltime_at_system_boot(); + + let max_raw_sample_entries = Profiler::get_stacks_sampling_buffer_size( + profiler_config.sample_freq as u32, + profiler_config.session_duration, + ); + if profiler_config.use_ring_buffers { + open_skel + .maps + .stacks_rb + .set_max_entries(max_raw_sample_entries) + .expect("failed to set stacks_rb max entries"); + // Even set to zero it will create as many entries as CPUs. open_skel .maps @@ -361,6 +366,12 @@ impl Profiler { .set_max_entries(0) .expect("set perf buffer entries to zero as it's unused"); } else { + open_skel + .maps + .stacks + .set_max_entries(max_raw_sample_entries) + .expect("failed to set stacks max entries"); + // Seems like ring buffers need to have size of at least 1... // It will use at least a page. open_skel @@ -395,20 +406,16 @@ impl Profiler { pub fn show_actual_profiler_map_sizes(bpf: &ProfilerSkel) { info!("BPF map sizes:"); - info!( - "stacks: {}", - bpf.maps.stacks.info().unwrap().info.max_entries - ); - info!( - "aggregated_stacks: {}", - bpf.maps.aggregated_stacks.info().unwrap().info.max_entries - ); info!( "rate_limits: {}", bpf.maps.rate_limits.info().unwrap().info.max_entries ); } + fn walltime_at_system_boot() -> u64 { + procfs::boot_time().unwrap().timestamp_nanos_opt().unwrap() as u64 + } + pub fn new( profiler_config: ProfilerConfig, stop_signal_receive: Receiver<()>, @@ -452,7 +459,7 @@ impl Profiler { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Self::set_profiler_map_sizes(&mut open_skel, &profiler_config); + Self::setup_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = ManuallyDrop::new(open_skel.load().expect("load skel")); @@ -524,6 +531,10 @@ impl Profiler { let profile_send = Arc::new(sender); let profile_receive = Arc::new(receiver); + let (sender, receiver) = unbounded(); + let raw_sample_sender = Arc::new(sender); + let raw_sample_receiver = Arc::new(receiver); + Profiler { cache_dir, _links: Vec::new(), @@ -542,6 +553,9 @@ impl Profiler { filter_pids: HashMap::new(), profile_send, profile_receive, + raw_samples: Vec::new(), + raw_sample_send: raw_sample_sender, + raw_sample_receive: raw_sample_receiver, duration: profiler_config.duration, sample_freq: profiler_config.sample_freq, perf_buffer_bytes: profiler_config.perf_buffer_bytes, @@ -552,6 +566,7 @@ impl Profiler { max_native_unwind_info_size_mb: profiler_config.max_native_unwind_info_size_mb, unwind_info_manager: UnwindInfoManager::new(&unwind_cache_dir, None), use_ring_buffers: profiler_config.use_ring_buffers, + aggregator: Aggregator::default(), metadata_provider, } } @@ -702,15 +717,24 @@ impl Profiler { } } - pub fn run(mut self, collector: ThreadSafeCollector) -> Duration { - // In this case, we only want to calculate maximum sampling buffer sizes based on the + fn get_stacks_sampling_buffer_size(sample_freq: u32, session_duration: Duration) -> u32 { + // In this case, we only want to calculate maximum sampling buffer size based on the // number of "online" CPUs, not "possible" CPUs, which they sometimes differ. - let num_cpus = get_online_cpus().expect("get online CPUs").len() as u64; - let max_samples_per_session = self.sample_freq * num_cpus * self.session_duration.as_secs(); - if max_samples_per_session >= MAX_AGGREGATED_STACKS_ENTRIES.into() { - warn!("samples might be lost due to too many samples in a profile session"); - } + let num_cpus: u32 = get_online_cpus().expect("get online CPUs").len() as u32; + let max_entries: u32 = sample_freq * num_cpus * session_duration.as_secs() as u32; + info!( + "num_cpus={} sample_freq={} duration={} max_entries={}", + num_cpus, + sample_freq, + session_duration.as_secs(), + max_entries + ); + // max_entries + 4096 + } + + pub fn run(mut self, collector: ThreadSafeCollector) -> Duration { self.setup_perf_events(); self.set_bpf_map_info(); self.add_kernel_modules(); @@ -718,6 +742,16 @@ impl Profiler { self.tracers.attach().expect("attach tracers"); let chan_send = self.new_proc_chan_send.clone(); + let raw_sample_send = self.raw_sample_send.clone(); + + self.start_poll_thread( + "raw_samples", + &self.native_unwinder.maps.stacks_rb, + &self.native_unwinder.maps.stacks, + move |data| Self::handle_stack(&raw_sample_send, data), + Self::handle_lost_stack, + ); + self.start_poll_thread( "unwinder_events", &self.native_unwinder.maps.events_rb, @@ -790,7 +824,15 @@ impl Profiler { debug!("collecting profiles on schedule"); let profile = self.collect_profile(); self.send_profile(profile); - } + }, + recv(self.raw_sample_receive) -> raw_sample => { + if let Ok(raw_sample) = raw_sample { + self.raw_samples.push(raw_sample); + } + else { + warn!("Failed to receive raw sample, what={:?}", raw_sample.err()); + } + }, recv(self.tracers_chan_receive) -> read => { match read { Ok(TracerEvent::Munmap(pid, start_address)) => { @@ -1049,86 +1091,21 @@ impl Profiler { .expect("zero percpu_stats"); } - /// Clear the `percpu_stats`, `stacks`, and `aggregated_stacks` maps one entry at a time. + /// Clear the `percpu_stats` maps one entry at a time. pub fn clear_maps(&mut self) { let _span = span!(Level::DEBUG, "clear_maps").entered(); - self.clear_map("stacks"); - self.clear_map("aggregated_stacks"); self.clear_map("rate_limits"); } pub fn collect_profile(&mut self) -> RawAggregatedProfile { debug!("collecting profile"); - - self.teardown_perf_events(); - - let mut result = Vec::new(); - let maps = &self.native_unwinder.maps; - let aggregated_stacks = &maps.aggregated_stacks; - let stacks = &maps.stacks; - - let mut all_stacks_bytes = Vec::new(); - for aggregated_stack_key_bytes in aggregated_stacks.keys() { - match aggregated_stacks.lookup(&aggregated_stack_key_bytes, MapFlags::ANY) { - Ok(Some(aggregated_value_bytes)) => { - let mut result_ustack: Option = None; - let mut result_kstack: Option = None; - - let key: &stack_count_key_t = - plain::from_bytes(&aggregated_stack_key_bytes).unwrap(); - let count: &u64 = plain::from_bytes(&aggregated_value_bytes).unwrap(); - - all_stacks_bytes.push(aggregated_stack_key_bytes.clone()); - - // Maybe check if procinfo is up to date - // Fetch actual stacks - // Handle errors later - if key.user_stack_id > 0 { - match stacks.lookup(&key.user_stack_id.to_ne_bytes(), MapFlags::ANY) { - Ok(Some(stack_bytes)) => { - result_ustack = Some(*plain::from_bytes(&stack_bytes).unwrap()); - } - Ok(None) => { - warn!("NO USER STACK FOUND"); - } - Err(e) => { - error!("\tfailed getting user stack {}", e); - } - } - } - if key.kernel_stack_id > 0 { - match stacks.lookup(&key.kernel_stack_id.to_ne_bytes(), MapFlags::ANY) { - Ok(Some(stack_bytes)) => { - result_kstack = Some(*plain::from_bytes(&stack_bytes).unwrap()); - } - _ => { - error!("\tfailed getting kernel stack"); - } - } - } - - let raw_aggregated_sample = RawAggregatedSample { - sample: RawSample { - pid: key.pid, - tid: key.task_id, - ustack: result_ustack, - kstack: result_kstack, - }, - count: *count, - }; - result.push(raw_aggregated_sample); - } - _ => continue, - } - } - - debug!("===== got {} unique stacks", all_stacks_bytes.len()); + let result = self.aggregator.aggregate(self.raw_samples.clone()); + self.raw_samples.clear(); self.bump_last_used(&result); self.collect_unwinder_stats(); self.clear_maps(); - self.setup_perf_events(); result } @@ -2132,6 +2109,23 @@ impl Profiler { Ok(()) } + fn handle_stack(raw_sample_send: &Arc>, data: &[u8]) { + let mut raw_stack = raw_stack_t::default(); + plain::copy_from_bytes(&mut raw_stack, data).expect("handle stack serde"); + + let raw_sample = RawSample { + pid: raw_stack.stack_key.pid, + tid: raw_stack.stack_key.task_id, + ustack: Some(raw_stack.stack), + kstack: Some(raw_stack.kernel_stack), + }; + raw_sample_send.send(raw_sample).expect("Send raw sample"); + } + + fn handle_lost_stack(cpu: i32, count: u64) { + error!("lost {count} stacks on cpu {cpu}"); + } + fn handle_event(sender: &Arc>, data: &[u8]) { let mut event = Event::default(); plain::copy_from_bytes(&mut event, data).expect("handle event serde"); @@ -2199,8 +2193,7 @@ mod tests { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Profiler::set_profiler_map_sizes(&mut open_skel, &profiler_config); - + Profiler::setup_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = open_skel.load().expect("load skel"); // add and delete bpf process works