From 7f1cd2925f6a1cfc5d75282cf6305894e9f06a27 Mon Sep 17 00:00:00 2001 From: Francisco Javier Honduvilla Coto Date: Fri, 4 Apr 2025 13:33:56 +0100 Subject: [PATCH 01/13] wip --- src/bpf/profiler.bpf.c | 51 +++++++++++++++++++++++++++++------ src/bpf/profiler.h | 5 ++-- src/bpf/profiler_bindings.rs | 2 ++ src/profiler.rs | 52 +++++++++++++++++++++++++----------- today.md | 21 +++++++++++++++ 5 files changed, 106 insertions(+), 25 deletions(-) create mode 100644 today.md diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 9c7e3689..20639ee4 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -12,20 +12,34 @@ #include #include #include - +/* struct { __uint(type, BPF_MAP_TYPE_HASH); __uint(max_entries, 100000); __type(key, u64); __type(value, native_stack_t); -} stacks SEC(".maps"); +} stacks SEC(".maps"); */ -struct { +/* struct { __uint(type, BPF_MAP_TYPE_HASH); __uint(max_entries, MAX_AGGREGATED_STACKS_ENTRIES); __type(key, stack_count_key_t); __type(value, u64); } aggregated_stacks SEC(".maps"); + */ + +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024 /* 256 KB */); // adjust max entries based on frequency on userspace +} stacks_rb SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(key_size, sizeof(u32)); + __uint(value_size, sizeof(u32)); + __uint(max_entries, 1); + // adjust max entries based on frequency on userspace +} stacks SEC(".maps"); struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); @@ -312,7 +326,7 @@ static __always_inline bool retrieve_task_registers(u64 *ip, u64 *sp, u64 *bp, u return true; } -static __always_inline void * +/* static __always_inline void * bpf_map_lookup_or_try_init(void *map, const void *key, const void *init) { void *val; long err; @@ -330,9 +344,29 @@ bpf_map_lookup_or_try_init(void *map, const void *key, const void *init) { return bpf_map_lookup_elem(map, key); } + */ + -// Aggregate the given stacktrace. static __always_inline void add_stack(struct bpf_perf_event_data *ctx, +u64 pid_tgid, +unwind_state_t *unwind_state) { + int ret = bpf_get_stack(ctx, unwind_state->kernel_stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); + if (ret >= 0) { + unwind_state->kernel_stack.len = ret / sizeof(u64); + } + + ret = bpf_ringbuf_output(&stacks_rb, unwind_state, sizeof(unwind_state_t), 0); + (void)pid_tgid; + (void)ctx; + if (ret < 0) { + bpf_printk("add_stack ringbuf failed ret=%d", ret); + // TODO: add counter + } +} + + +// Aggregate the given stacktrace. +/* static __always_inline void add_stack(struct bpf_perf_event_data *ctx, u64 pid_tgid, unwind_state_t *unwind_state) { stack_count_key_t *stack_key = &unwind_state->stack_key; @@ -379,7 +413,7 @@ static __always_inline void add_stack(struct bpf_perf_event_data *ctx, if (count) { __sync_fetch_and_add(count, 1); } -} +} */ // The unwinding machinery lives here. SEC("perf_event") @@ -679,12 +713,13 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { // Set up the initial unwinding state. static __always_inline bool set_initial_state(unwind_state_t *unwind_state, bpf_user_pt_regs_t *regs) { unwind_state->stack.len = 0; + unwind_state->kernel_stack.len = 0; unwind_state->tail_calls = 0; unwind_state->stack_key.pid = 0; unwind_state->stack_key.task_id = 0; - unwind_state->stack_key.user_stack_id = 0; - unwind_state->stack_key.kernel_stack_id = 0; +/* unwind_state->stack_key.user_stack_id = 0; + unwind_state->stack_key.kernel_stack_id = 0; */ if (in_kernel(PT_REGS_IP(regs))) { if (!retrieve_task_registers(&unwind_state->ip, &unwind_state->sp, &unwind_state->bp, &unwind_state->lr)) { diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 8c976d48..e2d2ced3 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -170,12 +170,13 @@ typedef struct { typedef struct { int task_id; int pid; - unsigned long long user_stack_id; - unsigned long long kernel_stack_id; +/* unsigned long long user_stack_id; + unsigned long long kernel_stack_id; */ } stack_count_key_t; typedef struct { native_stack_t stack; + native_stack_t kernel_stack; unsigned long long ip; unsigned long long sp; diff --git a/src/bpf/profiler_bindings.rs b/src/bpf/profiler_bindings.rs index 0a8bbbf9..ad868b51 100644 --- a/src/bpf/profiler_bindings.rs +++ b/src/bpf/profiler_bindings.rs @@ -9,6 +9,8 @@ use crate::unwind_info::types::CompactUnwindRow; include!(concat!(env!("OUT_DIR"), "/profiler_bindings.rs")); + +unsafe impl Plain for unwind_state_t {} // @nocommit: this is a hack, we should create a new structure for this unsafe impl Plain for stack_count_key_t {} unsafe impl Plain for native_stack_t {} unsafe impl Plain for Event {} diff --git a/src/profiler.rs b/src/profiler.rs index 687de12e..130c9873 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -316,16 +316,16 @@ impl Profiler { open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig, ) { - open_skel +/* open_skel .maps .stacks .set_max_entries(profiler_config.mapsize_stacks) - .expect("Unable to set stacks map max_entries"); - open_skel + .expect("Unable to set stacks map max_entries"); */ +/* open_skel .maps .aggregated_stacks .set_max_entries(profiler_config.mapsize_aggregated_stacks) - .expect("Unable to set aggregated_stacks map max_entries"); + .expect("Unable to set aggregated_stacks map max_entries"); */ open_skel .maps .rate_limits @@ -386,14 +386,14 @@ impl Profiler { pub fn show_actual_profiler_map_sizes(bpf: &ProfilerSkel) { info!("BPF map sizes:"); - info!( +/* info!( "stacks: {}", bpf.maps.stacks.info().unwrap().info.max_entries ); info!( "aggregated_stacks: {}", bpf.maps.aggregated_stacks.info().unwrap().info.max_entries - ); + ); */ info!( "rate_limits: {}", bpf.maps.rate_limits.info().unwrap().info.max_entries @@ -710,6 +710,15 @@ impl Profiler { self.tracers.attach().expect("attach tracers"); let chan_send = self.new_proc_chan_send.clone(); + + self.start_poll_thread( + "aaaaaa", + &self.native_unwinder.maps.stacks_rb, + &self.native_unwinder.maps.stacks, + move |data| Self::handle_stack(&data), + Self::handle_lost_stack, + ); + self.start_poll_thread( "unwinder_events", &self.native_unwinder.maps.events_rb, @@ -1043,22 +1052,24 @@ impl Profiler { pub fn clear_maps(&mut self) { let _span = span!(Level::DEBUG, "clear_maps").entered(); - self.clear_map("stacks"); - self.clear_map("aggregated_stacks"); +/* self.clear_map("stacks"); + self.clear_map("aggregated_stacks"); */ self.clear_map("rate_limits"); } pub fn collect_profile(&mut self) -> RawAggregatedProfile { debug!("collecting profile"); - self.teardown_perf_events(); - - let mut result = Vec::new(); + // self.teardown_perf_events(); + let result = Vec::new(); + /* let maps = &self.native_unwinder.maps; let aggregated_stacks = &maps.aggregated_stacks; let stacks = &maps.stacks; - - let mut all_stacks_bytes = Vec::new(); + */ + /* let mut all_stacks_bytes = Vec::new(); + // -- storage (stack_hash) => [addr1, addr2] + // -- agg (pid, tid, stack_hash) => count for aggregated_stack_key_bytes in aggregated_stacks.keys() { match aggregated_stacks.lookup(&aggregated_stack_key_bytes, MapFlags::ANY) { Ok(Some(aggregated_value_bytes)) => { @@ -1113,10 +1124,10 @@ impl Profiler { debug!("===== got {} unique stacks", all_stacks_bytes.len()); - self.bump_last_used(&result); + self.bump_last_used(&result); */ self.collect_unwinder_stats(); self.clear_maps(); - self.setup_perf_events(); + // self.setup_perf_events(); result } @@ -2106,6 +2117,17 @@ impl Profiler { Ok(()) } + fn handle_stack(data: &[u8]) { + let mut unwind_state = unwind_state_t::default(); + plain::copy_from_bytes(&mut unwind_state, data).expect("handle stack serde"); + + println!("!!!! got a stack !!!! user len: {} kernel len: {}", unwind_state.stack.len, unwind_state.kernel_stack.len); + } + + fn handle_lost_stack(_cpu: i32, _count: u64) { + + } + fn handle_event(sender: &Arc>, data: &[u8]) { let mut event = Event::default(); plain::copy_from_bytes(&mut event, data).expect("handle event serde"); diff --git a/today.md b/today.md new file mode 100644 index 00000000..ebfbc056 --- /dev/null +++ b/today.md @@ -0,0 +1,21 @@ +## Rework stack storage + +### Benefits +- reduce the amount of work we need to do in BPF (less hashing, etc) +- removing 2 BPF maps that are statically pre-allocated (stack storage, and aggregated map) +- reduce race conditions due to profiling happening while stacks are collected, while not having any gaps in profiling +- more granular samples so we can aggregate in whichever way we prefer in userspace +- only pay for what you use: only use the minimum necessary memory for the profiling frequency, never lose any events! + +### Downsides + +- more data sent in streaming from kernel -> user (!!! we can send just the amount of frames we have) + + + + +### To do + +- implement this (lol) +- test performance of user and kernel space +- check correctness \ No newline at end of file From 4e1256818cf1d53c73521ca4f36479e8b58cdb5e Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Wed, 16 Apr 2025 09:50:46 +0100 Subject: [PATCH 02/13] Create RawSample struct --- src/profile/convert.rs | 2 +- src/profile/mod.rs | 4 +- src/profile/{aggregated.rs => sample.rs} | 107 +++++++++++++---------- src/profiler.rs | 22 ++--- 4 files changed, 78 insertions(+), 57 deletions(-) rename src/profile/{aggregated.rs => sample.rs} (79%) diff --git a/src/profile/convert.rs b/src/profile/convert.rs index ba6c1374..b545c0b3 100644 --- a/src/profile/convert.rs +++ b/src/profile/convert.rs @@ -16,7 +16,7 @@ use crate::ksym::Ksym; use crate::ksym::KsymIter; use crate::process::ObjectFileInfo; use crate::process::ProcessInfo; -use crate::profile::aggregated::RawAggregatedProfile; +use crate::profile::sample::RawAggregatedProfile; use crate::profile::{AggregatedProfile, AggregatedSample, Frame, FrameAddress}; use crate::usym::symbolize_native_stack_blaze; use lightswitch_object::ExecutableId; diff --git a/src/profile/mod.rs b/src/profile/mod.rs index 6b68082f..fbb337e6 100644 --- a/src/profile/mod.rs +++ b/src/profile/mod.rs @@ -1,7 +1,7 @@ -mod aggregated; mod convert; mod frame; +mod sample; -pub use aggregated::*; pub use convert::*; pub use frame::*; +pub use sample::*; diff --git a/src/profile/aggregated.rs b/src/profile/sample.rs similarity index 79% rename from src/profile/aggregated.rs rename to src/profile/sample.rs index 7ef4bfea..f76d97c0 100644 --- a/src/profile/aggregated.rs +++ b/src/profile/sample.rs @@ -13,11 +13,43 @@ use crate::process::ProcessInfo; use crate::profile::Frame; #[derive(Debug, Hash, Eq, PartialEq)] -pub struct RawAggregatedSample { +pub struct RawSample { pub pid: Pid, pub tid: Pid, pub ustack: Option, pub kstack: Option, +} + +impl fmt::Display for RawSample { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let format_native_stack = |native_stack: Option| -> String { + let mut res: Vec = Vec::new(); + match native_stack { + Some(native_stack) => { + for (i, addr) in native_stack.addresses.into_iter().enumerate() { + if native_stack.len <= i.try_into().unwrap() { + break; + } + res.push(format!("{:3}: {:#018x}", i, addr)); + } + } + None => res.push("NONE".into()), + }; + format!("[{}]", res.join(",")) + }; + + fmt.debug_struct("RawSample") + .field("pid", &self.pid) + .field("tid", &self.tid) + .field("ustack", &format_native_stack(self.ustack)) + .field("kstack", &format_native_stack(self.kstack)) + .finish() + } +} + +#[derive(Debug, Hash, Eq, PartialEq)] +pub struct RawAggregatedSample { + pub sample: RawSample, pub count: u64, } @@ -32,15 +64,15 @@ impl RawAggregatedSample { objs: &HashMap, ) -> Result { let mut processed_sample = AggregatedSample { - pid: self.pid, - tid: self.tid, + pid: self.sample.pid, + tid: self.sample.tid, ustack: Vec::new(), kstack: Vec::new(), count: self.count, }; - if let Some(native_stack) = self.ustack { - let Some(info) = procs.get(&self.pid) else { + if let Some(native_stack) = self.sample.ustack { + let Some(info) = procs.get(&self.sample.pid) else { return Err(anyhow!("process not found")); }; @@ -69,7 +101,7 @@ impl RawAggregatedSample { } } - if let Some(kernel_stack) = self.kstack { + if let Some(kernel_stack) = self.sample.kstack { let Some(info) = procs.get(&KERNEL_PID) else { return Err(anyhow!("kernel process not found")); }; @@ -111,27 +143,8 @@ impl RawAggregatedSample { impl fmt::Display for RawAggregatedSample { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let format_native_stack = |native_stack: Option| -> String { - let mut res: Vec = Vec::new(); - match native_stack { - Some(native_stack) => { - for (i, addr) in native_stack.addresses.into_iter().enumerate() { - if native_stack.len <= i.try_into().unwrap() { - break; - } - res.push(format!("{:3}: {:#018x}", i, addr)); - } - } - None => res.push("NONE".into()), - }; - format!("[{}]", res.join(",")) - }; - fmt.debug_struct("RawAggregatedSample") - .field("pid", &self.pid) - .field("tid", &self.tid) - .field("ustack", &format_native_stack(self.ustack)) - .field("kstack", &format_native_stack(self.kstack)) + .field("sample", &format!("{}", self.sample)) .field("count", &self.count) .finish() } @@ -203,24 +216,28 @@ mod tests { len: 2, }); - let sample = RawAggregatedSample { - pid: 1234, - tid: 1235, - ustack: ustack_data, - kstack: None, + let raw_aggregated_sample = RawAggregatedSample { + sample: RawSample { + pid: 1234, + tid: 1235, + ustack: ustack_data, + kstack: None, + }, count: 1, }; - insta::assert_yaml_snapshot!(format!("{}", sample), @r#""RawAggregatedSample { pid: 1234, tid: 1235, ustack: \"[ 0: 0x000000000000ffff, 1: 0x00000000deadbeef]\", kstack: \"[NONE]\", count: 1 }""#); + insta::assert_yaml_snapshot!(format!("{}", raw_aggregated_sample), @r#""RawAggregatedSample { sample: \"RawSample { pid: 1234, tid: 1235, ustack: \\\"[ 0: 0x000000000000ffff, 1: 0x00000000deadbeef]\\\", kstack: \\\"[NONE]\\\" }\", count: 1 }""#); // No user or kernel stacks - let sample = RawAggregatedSample { - pid: 1234, - tid: 1235, - ustack: None, - kstack: None, + let raw_aggregated_sample = RawAggregatedSample { + sample: RawSample { + pid: 1234, + tid: 1235, + ustack: None, + kstack: None, + }, count: 1, }; - insta::assert_yaml_snapshot!(format!("{}", sample), @r#""RawAggregatedSample { pid: 1234, tid: 1235, ustack: \"[NONE]\", kstack: \"[NONE]\", count: 1 }""#); + insta::assert_yaml_snapshot!(format!("{}", raw_aggregated_sample), @r#""RawAggregatedSample { sample: \"RawSample { pid: 1234, tid: 1235, ustack: \\\"[NONE]\\\", kstack: \\\"[NONE]\\\" }\", count: 1 }""#); // user and kernel stacks let mut ustack = addrs; @@ -262,14 +279,16 @@ mod tests { len: kreplace.len() as u64, }); - let sample = RawAggregatedSample { - pid: 128821, - tid: 128822, - ustack: ustack_data, - kstack: kstack_data, + let raw_aggregated_sample = RawAggregatedSample { + sample: RawSample { + pid: 128821, + tid: 128822, + ustack: ustack_data, + kstack: kstack_data, + }, count: 42, }; - insta::assert_yaml_snapshot!(format!("{}", sample), @r#""RawAggregatedSample { pid: 128821, tid: 128822, ustack: \"[ 0: 0x00007f7c91c82314, 1: 0x00007f7c91c4ff93, 2: 0x00007f7c91c5d8ae, 3: 0x00007f7c91c4d2c3, 4: 0x00007f7c91c45400, 5: 0x00007f7c91c10933, 6: 0x00007f7c91c38153, 7: 0x00007f7c91c331d9, 8: 0x00007f7c91dfa501, 9: 0x00007f7c91c16b05, 10: 0x00007f7c91e22038, 11: 0x00007f7c91e23fc6]\", kstack: \"[ 0: 0xffffffff8749ae51, 1: 0xffffffffc04c4804, 2: 0xffffffff874ddfd0, 3: 0xffffffff874e0843, 4: 0xffffffff874e0b8a, 5: 0xffffffff8727f600, 6: 0xffffffff8727f8a7, 7: 0xffffffff87e0116e]\", count: 42 }""#); + insta::assert_yaml_snapshot!(format!("{}", raw_aggregated_sample), @r#""RawAggregatedSample { sample: \"RawSample { pid: 128821, tid: 128822, ustack: \\\"[ 0: 0x00007f7c91c82314, 1: 0x00007f7c91c4ff93, 2: 0x00007f7c91c5d8ae, 3: 0x00007f7c91c4d2c3, 4: 0x00007f7c91c45400, 5: 0x00007f7c91c10933, 6: 0x00007f7c91c38153, 7: 0x00007f7c91c331d9, 8: 0x00007f7c91dfa501, 9: 0x00007f7c91c16b05, 10: 0x00007f7c91e22038, 11: 0x00007f7c91e23fc6]\\\", kstack: \\\"[ 0: 0xffffffff8749ae51, 1: 0xffffffffc04c4804, 2: 0xffffffff874ddfd0, 3: 0xffffffff874e0843, 4: 0xffffffff874e0b8a, 5: 0xffffffff8727f600, 6: 0xffffffff8727f8a7, 7: 0xffffffff87e0116e]\\\" }\", count: 42 }""#); } #[test] diff --git a/src/profiler.rs b/src/profiler.rs index 745e3197..eefadfaa 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -937,12 +937,12 @@ impl Profiler { } /// Updates the last time processes and executables were seen. This is used during evictions. - pub fn bump_last_used(&mut self, raw_samples: &[RawAggregatedSample]) { + pub fn bump_last_used(&mut self, raw_aggregated_samples: &[RawAggregatedSample]) { let now = Instant::now(); - for raw_sample in raw_samples { - let pid = raw_sample.pid; - let ustack = raw_sample.ustack; + for aggregated_sample in raw_aggregated_samples { + let pid = aggregated_sample.sample.pid; + let ustack = aggregated_sample.sample.ustack; let Some(ustack) = ustack else { continue; }; @@ -1103,14 +1103,16 @@ impl Profiler { } } - let raw_sample = RawAggregatedSample { - pid: key.pid, - tid: key.task_id, - ustack: result_ustack, - kstack: result_kstack, + let raw_aggregated_sample = RawAggregatedSample { + sample: RawSample { + pid: key.pid, + tid: key.task_id, + ustack: result_ustack, + kstack: result_kstack, + }, count: *count, }; - result.push(raw_sample); + result.push(raw_aggregated_sample); } _ => continue, } From 3705523578a7c8bbf1eaf0010fd84455ee9392d8 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Wed, 16 Apr 2025 09:54:07 +0100 Subject: [PATCH 03/13] . --- src/profile/convert.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/profile/convert.rs b/src/profile/convert.rs index b545c0b3..50eef6c3 100644 --- a/src/profile/convert.rs +++ b/src/profile/convert.rs @@ -16,8 +16,9 @@ use crate::ksym::Ksym; use crate::ksym::KsymIter; use crate::process::ObjectFileInfo; use crate::process::ProcessInfo; -use crate::profile::sample::RawAggregatedProfile; -use crate::profile::{AggregatedProfile, AggregatedSample, Frame, FrameAddress}; +use crate::profile::{ + AggregatedProfile, AggregatedSample, Frame, FrameAddress, RawAggregatedProfile, +}; use crate::usym::symbolize_native_stack_blaze; use lightswitch_object::ExecutableId; From be02cfdfbe7e0f2edb8a22b6a92d9b99c5dc7d23 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Sun, 20 Apr 2025 17:51:42 +0100 Subject: [PATCH 04/13] Implement uspace aggregation --- src/aggregator.rs | 114 ++++++++++++++++++++++++++ src/bpf/profiler.bpf.c | 107 ++++-------------------- src/bpf/profiler.h | 12 +-- src/bpf/profiler_bindings.rs | 3 +- src/lib.rs | 1 + src/profile/sample.rs | 5 +- src/profiler.rs | 153 +++++++++++++---------------------- vm.nix | 3 +- 8 files changed, 193 insertions(+), 205 deletions(-) create mode 100644 src/aggregator.rs diff --git a/src/aggregator.rs b/src/aggregator.rs new file mode 100644 index 00000000..949835b9 --- /dev/null +++ b/src/aggregator.rs @@ -0,0 +1,114 @@ +use std::hash::DefaultHasher; +use std::{collections::HashMap, hash::Hash, hash::Hasher}; + +use tracing::warn; + +use crate::profile::{RawAggregatedProfile, RawAggregatedSample, RawSample}; + +#[derive(Default)] +pub struct Aggregator {} + +impl Aggregator { + pub fn aggregate(&self, raw_samples: Vec) -> RawAggregatedProfile { + if raw_samples.is_empty() { + return Vec::new(); // TODO: return error if nothing to aggregate? + } + + let mut sample_hash_to_aggregated: HashMap = HashMap::new(); + for sample in raw_samples { + if sample.ustack.is_none() & sample.kstack.is_none() { + warn!( + "No stack present in provided sample={}, skipping...", + sample + ); + continue; + } + + let mut hasher = DefaultHasher::new(); + sample.hash(&mut hasher); + let sample_hash = hasher.finish(); + + sample_hash_to_aggregated + .entry(sample_hash) + .and_modify(|aggregated_sample| aggregated_sample.count += 1) + .or_insert(RawAggregatedSample { sample, count: 1 }); + } + sample_hash_to_aggregated.into_values().collect() + } +} + +#[cfg(test)] +mod tests { + use crate::aggregator::Aggregator; + use crate::bpf::profiler_bindings::native_stack_t; + use crate::profile::RawSample; + + #[test] + fn test_aggregate_raw_samples() { + // Given + let mut ustack1_data = [0; 127]; + ustack1_data[0] = 0xffff; + ustack1_data[1] = 0xdeadbeef; + let ustack1 = Some(native_stack_t { + addresses: ustack1_data, + len: 2, + }); + + // todo: opatnebe (add kstack) + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: None, + }; + + let mut ustack2_data = [0; 127]; + ustack2_data[0] = 0xdddd; + ustack2_data[1] = 0xfeedbee; + ustack2_data[0] = 0xddddef; + ustack2_data[1] = 0xbeefdad; + let ustack2 = Some(native_stack_t { + addresses: ustack2_data, + len: 4, + }); + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack2, + kstack: None, + }; + + let raw_samples = vec![ + raw_sample_1, + raw_sample_2, + raw_sample_1, + raw_sample_2, + raw_sample_2, + raw_sample_2, + ]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + } + + #[test] + fn test_aggregate_raw_samples_diff_ustack_same_kstack() {} + + #[test] + fn test_aggregate_raw_samples_same_ustack_diff_kstack() {} + + #[test] + fn test_aggregate_raw_samples_no_ustack() {} + + #[test] + fn test_aggregate_raw_samples_no_kstack() {} + + #[test] + fn test_aggregate_same_stack_traces_different_pid_tid() {} +} diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 2a7a0e80..f91bb66e 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -12,33 +12,18 @@ #include #include #include -/* -struct { - __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, 100000); - __type(key, u64); - __type(value, native_stack_t); -} stacks SEC(".maps"); */ -/* struct { - __uint(type, BPF_MAP_TYPE_HASH); - __uint(max_entries, MAX_AGGREGATED_STACKS_ENTRIES); - __type(key, stack_count_key_t); - __type(value, u64); -} aggregated_stacks SEC(".maps"); - */ struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 256 * 1024 /* 256 KB */); // adjust max entries based on frequency on userspace + __uint(max_entries, 256 * 1024 /* 256 KB */); // adjust max entries based on frequency on userspace -- todo } stacks_rb SEC(".maps"); struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); __uint(key_size, sizeof(u32)); __uint(value_size, sizeof(u32)); - __uint(max_entries, 1); - // adjust max entries based on frequency on userspace + // adjust max entries based on frequency on userspace - todo (what does this mean?) } stacks SEC(".maps"); struct { @@ -331,94 +316,34 @@ static __always_inline bool retrieve_task_registers(u64 *ip, u64 *sp, u64 *bp, u return true; } -/* static __always_inline void * -bpf_map_lookup_or_try_init(void *map, const void *key, const void *init) { - void *val; - long err; - - val = bpf_map_lookup_elem(map, key); - if (val) { - return val; - } - - err = bpf_map_update_elem(map, key, init, BPF_NOEXIST); - if (err && !STACK_COLLISION(err)) { - LOG("[error] bpf_map_lookup_or_try_init with ret: %d", err); - return 0; - } - - return bpf_map_lookup_elem(map, key); -} - */ - - static __always_inline void add_stack(struct bpf_perf_event_data *ctx, u64 pid_tgid, unwind_state_t *unwind_state) { + // Get the kernel stack int ret = bpf_get_stack(ctx, unwind_state->kernel_stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); if (ret >= 0) { unwind_state->kernel_stack.len = ret / sizeof(u64); } - ret = bpf_ringbuf_output(&stacks_rb, unwind_state, sizeof(unwind_state_t), 0); - (void)pid_tgid; - (void)ctx; - if (ret < 0) { - bpf_printk("add_stack ringbuf failed ret=%d", ret); - // TODO: add counter - } -} - - -// Aggregate the given stacktrace. -/* static __always_inline void add_stack(struct bpf_perf_event_data *ctx, - u64 pid_tgid, - unwind_state_t *unwind_state) { - stack_count_key_t *stack_key = &unwind_state->stack_key; - int per_process_id = pid_tgid >> 32; int per_thread_id = pid_tgid; - stack_key->pid = per_process_id; - stack_key->task_id = per_thread_id; - - // Hash and add user stack. - if (unwind_state->stack.len >= 1) { - u64 user_stack_id = hash_stack(&unwind_state->stack); - int err = bpf_map_update_elem(&stacks, &user_stack_id, &unwind_state->stack, - BPF_ANY); - if (err == 0) { - stack_key->user_stack_id = user_stack_id; - } else { - LOG("[error] failed to insert user stack: %d", err); - } - } - - // Walk, hash and add kernel stack. - int ret = bpf_get_stack(ctx, unwind_state->stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); - if (ret >= 0) { - unwind_state->stack.len = ret / sizeof(u64); - } + unwind_state->stack_key.pid = per_process_id; + unwind_state->stack_key.task_id = per_thread_id; - if (unwind_state->stack.len >= 1) { - u64 kernel_stack_id = hash_stack(&unwind_state->stack); - int err = bpf_map_update_elem(&stacks, &kernel_stack_id, &unwind_state->stack, - BPF_ANY); - if (err == 0) { - stack_key->kernel_stack_id = kernel_stack_id; - } else { - LOG("[error] failed to insert kernel stack: %d", err); + if (lightswitch_config.use_ring_buffers) { + ret = bpf_ringbuf_output(&stacks_rb, unwind_state, sizeof(unwind_state_t), 0); + if (ret < 0) { + bpf_printk("add_stack ringbuf failed ret=%d", ret); + // TODO: add counter + // What will the counter be used for? } + } else { + ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, unwind_state, sizeof(unwind_state_t)); + // todo: What is ret used for here? } +} - // Insert aggregated stack. - u64 zero = 0; - u64 *count = bpf_map_lookup_or_try_init(&aggregated_stacks, - &unwind_state->stack_key, &zero); - if (count) { - __sync_fetch_and_add(count, 1); - } -} */ // The unwinding machinery lives here. SEC("perf_event") @@ -727,8 +652,6 @@ static __always_inline bool set_initial_state(unwind_state_t *unwind_state, bpf_ unwind_state->stack_key.pid = 0; unwind_state->stack_key.task_id = 0; -/* unwind_state->stack_key.user_stack_id = 0; - unwind_state->stack_key.kernel_stack_id = 0; */ if (in_kernel(PT_REGS_IP(regs))) { if (!retrieve_task_registers(&unwind_state->ip, &unwind_state->sp, &unwind_state->bp, &unwind_state->lr)) { diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 90814c13..16ac4525 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -50,7 +50,6 @@ typedef struct { u32 high_index; } page_value_t; -#define MAX_AGGREGATED_STACKS_ENTRIES 10000 // Values for dwarf expressions. #define DWARF_EXPRESSION_UNKNOWN 0 @@ -120,11 +119,6 @@ const volatile struct lightswitch_config_t lightswitch_config = { .use_task_pt_regs_helper = false, }; -// A different stack produced the same hash. -#define STACK_COLLISION(err) (err == -EEXIST) -// Tried to read a kernel stack from a non-kernel context. -#define IN_USERSPACE(err) (err == -EFAULT) - #define LOG(fmt, ...) \ ({ \ if (lightswitch_config.verbose_logging) { \ @@ -172,9 +166,7 @@ typedef struct { typedef struct { int task_id; int pid; -/* unsigned long long user_stack_id; - unsigned long long kernel_stack_id; */ -} stack_count_key_t; +} stack_key_t; typedef struct { native_stack_t stack; @@ -186,7 +178,7 @@ typedef struct { unsigned long long lr; int tail_calls; - stack_count_key_t stack_key; + stack_key_t stack_key; } unwind_state_t; enum event_type { diff --git a/src/bpf/profiler_bindings.rs b/src/bpf/profiler_bindings.rs index ad868b51..808ef4fb 100644 --- a/src/bpf/profiler_bindings.rs +++ b/src/bpf/profiler_bindings.rs @@ -9,9 +9,8 @@ use crate::unwind_info::types::CompactUnwindRow; include!(concat!(env!("OUT_DIR"), "/profiler_bindings.rs")); - unsafe impl Plain for unwind_state_t {} // @nocommit: this is a hack, we should create a new structure for this -unsafe impl Plain for stack_count_key_t {} +unsafe impl Plain for stack_key_t {} unsafe impl Plain for native_stack_t {} unsafe impl Plain for Event {} unsafe impl Plain for unwinder_stats_t {} diff --git a/src/lib.rs b/src/lib.rs index b1416091..c1a169e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod aggregator; pub mod bpf; pub mod collector; pub mod debug_info; diff --git a/src/profile/sample.rs b/src/profile/sample.rs index f76d97c0..3f8ef6fa 100644 --- a/src/profile/sample.rs +++ b/src/profile/sample.rs @@ -12,7 +12,7 @@ use crate::process::Pid; use crate::process::ProcessInfo; use crate::profile::Frame; -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, Copy, Clone)] pub struct RawSample { pub pid: Pid, pub tid: Pid, @@ -47,7 +47,8 @@ impl fmt::Display for RawSample { } } -#[derive(Debug, Hash, Eq, PartialEq)] +// todo - opatnebe (do we need to derive hash for this?) +#[derive(Debug, PartialEq)] pub struct RawAggregatedSample { pub sample: RawSample, pub count: u64, diff --git a/src/profiler.rs b/src/profiler.rs index b7440ef0..1d494bc6 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -34,6 +34,7 @@ use memmap2::MmapOptions; use procfs; use tracing::{debug, error, info, span, warn, Level}; +use crate::aggregator::Aggregator; use crate::bpf::profiler_bindings::*; use crate::bpf::profiler_skel::{OpenProfilerSkel, ProfilerSkel, ProfilerSkelBuilder}; use crate::bpf::tracers_bindings::*; @@ -131,6 +132,11 @@ pub struct Profiler { // Profile channel profile_send: Arc>, profile_receive: Arc>, + // A vector of raw samples received from bpf in the current profiling session + raw_samples: Vec, + // Raw sample channel + raw_sample_send: Arc>, + raw_sample_receive: Arc>, // TODO: Does this need to be ARCed /// For how long to profile. duration: Duration, /// Per-CPU sampling frequency in Hz. @@ -151,6 +157,7 @@ pub struct Profiler { max_native_unwind_info_size_mb: i32, unwind_info_manager: UnwindInfoManager, use_ring_buffers: bool, + aggregator: Aggregator, metadata_provider: ThreadSafeGlobalMetadataProvider, } @@ -281,6 +288,8 @@ enum AddUnwindInformationError { BpfPages(String), } +static MAX_AGGREGATED_STACKS_ENTRIES: u64 = 10000; + impl Profiler { pub fn create_unwind_info_maps( open_skel: &mut OpenProfilerSkel, @@ -325,16 +334,16 @@ impl Profiler { open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig, ) { -/* open_skel - .maps - .stacks - .set_max_entries(profiler_config.mapsize_stacks) - .expect("Unable to set stacks map max_entries"); */ -/* open_skel - .maps - .aggregated_stacks - .set_max_entries(profiler_config.mapsize_aggregated_stacks) - .expect("Unable to set aggregated_stacks map max_entries"); */ + /* open_skel + .maps + .stacks + .set_max_entries(profiler_config.mapsize_stacks) + .expect("Unable to set stacks map max_entries"); */ + /* open_skel + .maps + .aggregated_stacks + .set_max_entries(profiler_config.mapsize_aggregated_stacks) + .expect("Unable to set aggregated_stacks map max_entries"); */ open_skel .maps .rate_limits @@ -395,14 +404,6 @@ impl Profiler { pub fn show_actual_profiler_map_sizes(bpf: &ProfilerSkel) { info!("BPF map sizes:"); -/* info!( - "stacks: {}", - bpf.maps.stacks.info().unwrap().info.max_entries - ); - info!( - "aggregated_stacks: {}", - bpf.maps.aggregated_stacks.info().unwrap().info.max_entries - ); */ info!( "rate_limits: {}", bpf.maps.rate_limits.info().unwrap().info.max_entries @@ -524,6 +525,10 @@ impl Profiler { let profile_send = Arc::new(sender); let profile_receive = Arc::new(receiver); + let (sender, receiver) = unbounded(); + let raw_sample_sender = Arc::new(sender); + let raw_sample_receiver = Arc::new(receiver); + Profiler { cache_dir, _links: Vec::new(), @@ -542,6 +547,9 @@ impl Profiler { filter_pids: HashMap::new(), profile_send, profile_receive, + raw_samples: Vec::new(), + raw_sample_send: raw_sample_sender, + raw_sample_receive: raw_sample_receiver, duration: profiler_config.duration, sample_freq: profiler_config.sample_freq, perf_buffer_bytes: profiler_config.perf_buffer_bytes, @@ -552,6 +560,7 @@ impl Profiler { max_native_unwind_info_size_mb: profiler_config.max_native_unwind_info_size_mb, unwind_info_manager: UnwindInfoManager::new(&unwind_cache_dir, None), use_ring_buffers: profiler_config.use_ring_buffers, + aggregator: Aggregator::default(), metadata_provider, } } @@ -708,7 +717,7 @@ impl Profiler { // number of "online" CPUs, not "possible" CPUs, which they sometimes differ. let num_cpus = get_online_cpus().expect("get online CPUs").len() as u64; let max_samples_per_session = self.sample_freq * num_cpus * self.session_duration.as_secs(); - if max_samples_per_session >= MAX_AGGREGATED_STACKS_ENTRIES.into() { + if max_samples_per_session >= MAX_AGGREGATED_STACKS_ENTRIES { warn!("samples might be lost due to too many samples in a profile session"); } @@ -719,12 +728,13 @@ impl Profiler { self.tracers.attach().expect("attach tracers"); let chan_send = self.new_proc_chan_send.clone(); + let raw_sample_send = self.raw_sample_send.clone(); self.start_poll_thread( - "aaaaaa", + "raw_samples", &self.native_unwinder.maps.stacks_rb, &self.native_unwinder.maps.stacks, - move |data| Self::handle_stack(&data), + move |data| Self::handle_stack(&raw_sample_send, data), Self::handle_lost_stack, ); @@ -783,6 +793,7 @@ impl Profiler { let session_tick = tick(self.session_duration); loop { + // TODO: Should we use select_biased here? select! { recv(self.stop_chan_receive) -> _ => { debug!("received ctrl+c"); @@ -800,7 +811,15 @@ impl Profiler { debug!("collecting profiles on schedule"); let profile = self.collect_profile(); self.send_profile(profile); - } + }, + recv(self.raw_sample_receive) -> raw_sample => { + if let Ok(raw_sample) = raw_sample { + self.raw_samples.push(raw_sample); + } + else { + warn!("Failed to receive raw sample, what={:?}", raw_sample.err()); + } + }, recv(self.tracers_chan_receive) -> read => { match read { Ok(TracerEvent::Munmap(pid, start_address)) => { @@ -1057,88 +1076,22 @@ impl Profiler { .expect("zero percpu_stats"); } - /// Clear the `percpu_stats`, `stacks`, and `aggregated_stacks` maps one entry at a time. + /// Clear the `percpu_stats` maps one entry at a time. pub fn clear_maps(&mut self) { let _span = span!(Level::DEBUG, "clear_maps").entered(); -/* self.clear_map("stacks"); - self.clear_map("aggregated_stacks"); */ self.clear_map("rate_limits"); } pub fn collect_profile(&mut self) -> RawAggregatedProfile { debug!("collecting profile"); + println!("Num raw samples {}", self.raw_samples.len()); + let result = self.aggregator.aggregate(self.raw_samples.clone()); + self.raw_samples.clear(); - // self.teardown_perf_events(); - let result = Vec::new(); - /* - let maps = &self.native_unwinder.maps; - let aggregated_stacks = &maps.aggregated_stacks; - let stacks = &maps.stacks; - */ - /* let mut all_stacks_bytes = Vec::new(); - // -- storage (stack_hash) => [addr1, addr2] - // -- agg (pid, tid, stack_hash) => count - for aggregated_stack_key_bytes in aggregated_stacks.keys() { - match aggregated_stacks.lookup(&aggregated_stack_key_bytes, MapFlags::ANY) { - Ok(Some(aggregated_value_bytes)) => { - let mut result_ustack: Option = None; - let mut result_kstack: Option = None; - - let key: &stack_count_key_t = - plain::from_bytes(&aggregated_stack_key_bytes).unwrap(); - let count: &u64 = plain::from_bytes(&aggregated_value_bytes).unwrap(); - - all_stacks_bytes.push(aggregated_stack_key_bytes.clone()); - - // Maybe check if procinfo is up to date - // Fetch actual stacks - // Handle errors later - if key.user_stack_id > 0 { - match stacks.lookup(&key.user_stack_id.to_ne_bytes(), MapFlags::ANY) { - Ok(Some(stack_bytes)) => { - result_ustack = Some(*plain::from_bytes(&stack_bytes).unwrap()); - } - Ok(None) => { - warn!("NO USER STACK FOUND"); - } - Err(e) => { - error!("\tfailed getting user stack {}", e); - } - } - } - if key.kernel_stack_id > 0 { - match stacks.lookup(&key.kernel_stack_id.to_ne_bytes(), MapFlags::ANY) { - Ok(Some(stack_bytes)) => { - result_kstack = Some(*plain::from_bytes(&stack_bytes).unwrap()); - } - _ => { - error!("\tfailed getting kernel stack"); - } - } - } - - let raw_aggregated_sample = RawAggregatedSample { - sample: RawSample { - pid: key.pid, - tid: key.task_id, - ustack: result_ustack, - kstack: result_kstack, - }, - count: *count, - }; - result.push(raw_aggregated_sample); - } - _ => continue, - } - } - - debug!("===== got {} unique stacks", all_stacks_bytes.len()); - - self.bump_last_used(&result); */ + self.bump_last_used(&result); self.collect_unwinder_stats(); self.clear_maps(); - // self.setup_perf_events(); result } @@ -2117,16 +2070,20 @@ impl Profiler { Ok(()) } - fn handle_stack(data: &[u8]) { + fn handle_stack(raw_sample_send: &Arc>, data: &[u8]) { let mut unwind_state = unwind_state_t::default(); plain::copy_from_bytes(&mut unwind_state, data).expect("handle stack serde"); - println!("!!!! got a stack !!!! user len: {} kernel len: {}", unwind_state.stack.len, unwind_state.kernel_stack.len); + let raw_sample = RawSample { + pid: unwind_state.stack_key.pid, + tid: unwind_state.stack_key.task_id, + ustack: Some(unwind_state.stack), + kstack: Some(unwind_state.kernel_stack), + }; + raw_sample_send.send(raw_sample).expect("Send raw sample"); } - fn handle_lost_stack(_cpu: i32, _count: u64) { - - } + fn handle_lost_stack(_cpu: i32, _count: u64) {} fn handle_event(sender: &Arc>, data: &[u8]) { let mut event = Event::default(); diff --git a/vm.nix b/vm.nix index 976a435d..0022c4cd 100644 --- a/vm.nix +++ b/vm.nix @@ -135,7 +135,8 @@ let rev = "51f11bf301fea054342996802a16ed21fb5054f4"; sha256 = "sha256-qtTq0dnDHi1ITfQzKrXz+1dRMymAFBivWpjXntD09+A="; }; - cargoHash = "sha256-SHjjCWz4FVVk1cczkMltRVEB3GK8jz2tVABNSlSZiUc="; + useFetchCargoVendor = true; + cargoHash = "sha256-1U6Dn0MCyNsdbd7MrE7a6k8dJBtjyquOrW6RvhvCW7Y="; # nativeCheckInputs = [ pkgs.qemu ]; # There are some errors trying to access `/build/source/tests/*`. From f12184721f0ad6a5abd08d8bda3029f33a97fdd9 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Sun, 20 Apr 2025 20:17:08 +0100 Subject: [PATCH 05/13] . --- src/profile/sample.rs | 1 - src/profiler.rs | 8 +++----- today.md | 21 --------------------- 3 files changed, 3 insertions(+), 27 deletions(-) delete mode 100644 today.md diff --git a/src/profile/sample.rs b/src/profile/sample.rs index 3f8ef6fa..8e46b4bc 100644 --- a/src/profile/sample.rs +++ b/src/profile/sample.rs @@ -47,7 +47,6 @@ impl fmt::Display for RawSample { } } -// todo - opatnebe (do we need to derive hash for this?) #[derive(Debug, PartialEq)] pub struct RawAggregatedSample { pub sample: RawSample, diff --git a/src/profiler.rs b/src/profiler.rs index 1d494bc6..2db4d817 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -134,9 +134,9 @@ pub struct Profiler { profile_receive: Arc>, // A vector of raw samples received from bpf in the current profiling session raw_samples: Vec, - // Raw sample channel + // Raw sample channel. Used for receiving raw samples from the rinbuf/perfbuf poll thread raw_sample_send: Arc>, - raw_sample_receive: Arc>, // TODO: Does this need to be ARCed + raw_sample_receive: Arc>, /// For how long to profile. duration: Duration, /// Per-CPU sampling frequency in Hz. @@ -731,7 +731,7 @@ impl Profiler { let raw_sample_send = self.raw_sample_send.clone(); self.start_poll_thread( - "raw_samples", + "raw-samples-poll-thread", &self.native_unwinder.maps.stacks_rb, &self.native_unwinder.maps.stacks, move |data| Self::handle_stack(&raw_sample_send, data), @@ -793,7 +793,6 @@ impl Profiler { let session_tick = tick(self.session_duration); loop { - // TODO: Should we use select_biased here? select! { recv(self.stop_chan_receive) -> _ => { debug!("received ctrl+c"); @@ -1085,7 +1084,6 @@ impl Profiler { pub fn collect_profile(&mut self) -> RawAggregatedProfile { debug!("collecting profile"); - println!("Num raw samples {}", self.raw_samples.len()); let result = self.aggregator.aggregate(self.raw_samples.clone()); self.raw_samples.clear(); diff --git a/today.md b/today.md deleted file mode 100644 index ebfbc056..00000000 --- a/today.md +++ /dev/null @@ -1,21 +0,0 @@ -## Rework stack storage - -### Benefits -- reduce the amount of work we need to do in BPF (less hashing, etc) -- removing 2 BPF maps that are statically pre-allocated (stack storage, and aggregated map) -- reduce race conditions due to profiling happening while stacks are collected, while not having any gaps in profiling -- more granular samples so we can aggregate in whichever way we prefer in userspace -- only pay for what you use: only use the minimum necessary memory for the profiling frequency, never lose any events! - -### Downsides - -- more data sent in streaming from kernel -> user (!!! we can send just the amount of frames we have) - - - - -### To do - -- implement this (lol) -- test performance of user and kernel space -- check correctness \ No newline at end of file From b1cacbfb5fce201329dc85520112e70f00df20bd Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Tue, 22 Apr 2025 08:31:40 +0100 Subject: [PATCH 06/13] Unit test the aggregator --- src/aggregator.rs | 195 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 185 insertions(+), 10 deletions(-) diff --git a/src/aggregator.rs b/src/aggregator.rs index 949835b9..033303da 100644 --- a/src/aggregator.rs +++ b/src/aggregator.rs @@ -11,7 +11,7 @@ pub struct Aggregator {} impl Aggregator { pub fn aggregate(&self, raw_samples: Vec) -> RawAggregatedProfile { if raw_samples.is_empty() { - return Vec::new(); // TODO: return error if nothing to aggregate? + return Vec::new(); } let mut sample_hash_to_aggregated: HashMap = HashMap::new(); @@ -54,12 +54,22 @@ mod tests { len: 2, }); - // todo: opatnebe (add kstack) + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); + let raw_sample_1 = RawSample { pid: 1234, tid: 1235, ustack: ustack1, - kstack: None, + kstack: kstack1, }; let mut ustack2_data = [0; 127]; @@ -95,20 +105,185 @@ mod tests { // Then assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 2); + } else { + assert_eq!(sample.count, 4); + } + } } #[test] - fn test_aggregate_raw_samples_diff_ustack_same_kstack() {} + fn test_aggregate_raw_samples_same_ustack_diff_kstack() { + let mut ustack1_data = [0; 127]; + ustack1_data[0] = 0xffff; + ustack1_data[1] = 0xdeadbeef; + let ustack1 = Some(native_stack_t { + addresses: ustack1_data, + len: 2, + }); - #[test] - fn test_aggregate_raw_samples_same_ustack_diff_kstack() {} + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); - #[test] - fn test_aggregate_raw_samples_no_ustack() {} + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: kstack1, + }; + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: None, + }; + + let raw_samples = vec![raw_sample_1, raw_sample_2, raw_sample_2]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 1); + } else { + assert_eq!(sample.count, 2); + } + } + } #[test] - fn test_aggregate_raw_samples_no_kstack() {} + fn test_aggregate_raw_samples_diff_ustack_same_kstack() { + let mut ustack1_data = [0; 127]; + ustack1_data[0] = 0xffff; + ustack1_data[1] = 0xdeadbeef; + let ustack1 = Some(native_stack_t { + addresses: ustack1_data, + len: 2, + }); + + let mut kstack1_data = [0; 127]; + kstack1_data[0] = 0xffff; + kstack1_data[1] = 0xdddd; + kstack1_data[2] = 0xaaaa; + kstack1_data[3] = 0xeeee; + kstack1_data[4] = 0xaaae; + let kstack1 = Some(native_stack_t { + addresses: kstack1_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack1, + kstack: kstack1, + }; + + let mut ustack2_data = [0; 127]; + ustack2_data[0] = 0xdddd; + ustack2_data[1] = 0xfeedbee; + ustack2_data[0] = 0xddddef; + ustack2_data[1] = 0xbeefdad; + let ustack2 = Some(native_stack_t { + addresses: ustack2_data, + len: 4, + }); + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack2, + kstack: kstack1, + }; + + let raw_samples = vec![ + raw_sample_1, + raw_sample_2, + raw_sample_1, + raw_sample_2, + raw_sample_1, + ]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 2); + for sample in raw_aggregated_profile { + if sample.sample == raw_sample_1 { + assert_eq!(sample.count, 3); + } else { + assert_eq!(sample.count, 2); + } + } + } #[test] - fn test_aggregate_same_stack_traces_different_pid_tid() {} + fn test_aggregate_same_stack_traces_different_pid_tid() { + let mut ustack_data = [0; 127]; + ustack_data[0] = 0xffff; + ustack_data[1] = 0xdeadbeef; + let ustack = Some(native_stack_t { + addresses: ustack_data, + len: 2, + }); + + let mut kstack_data = [0; 127]; + kstack_data[0] = 0xffff; + kstack_data[1] = 0xdddd; + kstack_data[2] = 0xaaaa; + let kstack = Some(native_stack_t { + addresses: kstack_data, + len: 5, + }); + + let raw_sample_1 = RawSample { + pid: 1234, + tid: 1235, + ustack: ustack, + kstack: kstack, + }; + + let raw_sample_2 = RawSample { + pid: 1234, + tid: 1236, + ustack: ustack, + kstack: kstack, + }; + + let raw_sample_3 = RawSample { + pid: 123, + tid: 124, + ustack: ustack, + kstack: kstack, + }; + + let raw_samples = vec![raw_sample_1, raw_sample_2, raw_sample_3]; + + let aggregator = Aggregator::default(); + + // When + let raw_aggregated_profile = aggregator.aggregate(raw_samples); + + // Then + assert_eq!(raw_aggregated_profile.len(), 3); + } } From 6fca22be7f78d604ad60204f734ab277ea079de9 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Mon, 28 Apr 2025 23:41:08 +0100 Subject: [PATCH 07/13] . --- src/aggregator.rs | 26 ++++++++--------- src/bpf/profiler.bpf.c | 12 ++++---- src/bpf/profiler.h | 1 + src/bpf/profiler_bindings.rs | 2 ++ src/bpf/shared_maps.h | 1 + src/cli/args.rs | 12 -------- src/cli/main.rs | 4 +-- src/profiler.rs | 56 +++++++++++++++++++++--------------- 8 files changed, 57 insertions(+), 57 deletions(-) diff --git a/src/aggregator.rs b/src/aggregator.rs index 033303da..418f4d30 100644 --- a/src/aggregator.rs +++ b/src/aggregator.rs @@ -116,11 +116,11 @@ mod tests { #[test] fn test_aggregate_raw_samples_same_ustack_diff_kstack() { - let mut ustack1_data = [0; 127]; - ustack1_data[0] = 0xffff; - ustack1_data[1] = 0xdeadbeef; - let ustack1 = Some(native_stack_t { - addresses: ustack1_data, + let mut ustack_data = [0; 127]; + ustack_data[0] = 0xffff; + ustack_data[1] = 0xdeadbeef; + let ustack = Some(native_stack_t { + addresses: ustack_data, len: 2, }); @@ -138,14 +138,14 @@ mod tests { let raw_sample_1 = RawSample { pid: 1234, tid: 1235, - ustack: ustack1, + ustack, kstack: kstack1, }; let raw_sample_2 = RawSample { pid: 1234, tid: 1235, - ustack: ustack1, + ustack, kstack: None, }; @@ -258,22 +258,22 @@ mod tests { let raw_sample_1 = RawSample { pid: 1234, tid: 1235, - ustack: ustack, - kstack: kstack, + ustack, + kstack, }; let raw_sample_2 = RawSample { pid: 1234, tid: 1236, - ustack: ustack, - kstack: kstack, + ustack, + kstack, }; let raw_sample_3 = RawSample { pid: 123, tid: 124, - ustack: ustack, - kstack: kstack, + ustack, + kstack, }; let raw_samples = vec![raw_sample_1, raw_sample_2, raw_sample_3]; diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index f91bb66e..88ff9cd6 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -333,14 +333,14 @@ unwind_state_t *unwind_state) { if (lightswitch_config.use_ring_buffers) { ret = bpf_ringbuf_output(&stacks_rb, unwind_state, sizeof(unwind_state_t), 0); - if (ret < 0) { - bpf_printk("add_stack ringbuf failed ret=%d", ret); - // TODO: add counter - // What will the counter be used for? - } + } else { ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, unwind_state, sizeof(unwind_state_t)); - // todo: What is ret used for here? + } + + if (ret < 0) { + bpf_printk("add_stack failed ret=%d", ret); + bump_unwind_error_failure_adding_stack(); } } diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 16ac4525..06d6308b 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -108,6 +108,7 @@ struct unwinder_stats_t { u64 error_sending_new_process_event; u64 error_cfa_offset_did_not_fit; u64 error_rbp_offset_did_not_fit; + u64 error_failure_adding_stack; u64 bp_non_zero_for_bottom_frame; u64 vdso_encountered; u64 jit_encountered; diff --git a/src/bpf/profiler_bindings.rs b/src/bpf/profiler_bindings.rs index 808ef4fb..8bbca4b3 100644 --- a/src/bpf/profiler_bindings.rs +++ b/src/bpf/profiler_bindings.rs @@ -68,6 +68,8 @@ impl Add for unwinder_stats_t { + other.error_cfa_offset_did_not_fit, error_rbp_offset_did_not_fit: self.error_rbp_offset_did_not_fit + other.error_rbp_offset_did_not_fit, + error_failure_adding_stack: self.error_failure_adding_stack + + other.error_failure_adding_stack, bp_non_zero_for_bottom_frame: self.bp_non_zero_for_bottom_frame + other.bp_non_zero_for_bottom_frame, vdso_encountered: self.vdso_encountered + other.vdso_encountered, diff --git a/src/bpf/shared_maps.h b/src/bpf/shared_maps.h index fe44ef09..527346e7 100644 --- a/src/bpf/shared_maps.h +++ b/src/bpf/shared_maps.h @@ -45,6 +45,7 @@ DEFINE_COUNTER(error_binary_search_exhausted_iterations); DEFINE_COUNTER(error_sending_new_process_event); DEFINE_COUNTER(error_cfa_offset_did_not_fit); DEFINE_COUNTER(error_rbp_offset_did_not_fit); +DEFINE_COUNTER(error_failure_adding_stack); DEFINE_COUNTER(bp_non_zero_for_bottom_frame); DEFINE_COUNTER(vdso_encountered); DEFINE_COUNTER(jit_encountered); diff --git a/src/cli/args.rs b/src/cli/args.rs index 8795f72d..50184ee8 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -107,18 +107,6 @@ pub(crate) struct CliArgs { // Print out info on eBPF map sizes #[arg(long, help = "Print eBPF map sizes after creation")] pub(crate) mapsize_info: bool, - #[arg( - long, - default_value_t = ProfilerConfig::default().mapsize_stacks, - help = "max number of individual stacks to capture before aggregation" - )] - pub(crate) mapsize_stacks: u32, - #[arg( - long, - default_value_t = ProfilerConfig::default().mapsize_aggregated_stacks, - help = "max number of unique stacks after aggregation" - )] - pub(crate) mapsize_aggregated_stacks: u32, #[arg( long, default_value_t = ProfilerConfig::default().mapsize_rate_limits, diff --git a/src/cli/main.rs b/src/cli/main.rs index 6bc777fa..18b996db 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -192,8 +192,6 @@ fn main() -> Result<(), Box> { sample_freq: args.sample_freq, perf_buffer_bytes: args.perf_buffer_bytes, mapsize_info: args.mapsize_info, - mapsize_stacks: args.mapsize_stacks, - mapsize_aggregated_stacks: args.mapsize_aggregated_stacks, mapsize_rate_limits: args.mapsize_rate_limits, exclude_self: args.exclude_self, debug_info_manager, @@ -353,7 +351,7 @@ mod tests { cmd.arg("--help"); cmd.assert().success(); let actual = String::from_utf8(cmd.unwrap().stdout).unwrap(); - insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS] [COMMAND]\n\nCommands:\n object-info \n show-unwind \n system-info \n help Print this message or the help of the given subcommand(s)\n\nOptions:\n --pids \n Specific PIDs to profile\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path \n Path for the generated profile\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n --server-url \n \n\n --token \n \n\n --perf-buffer-bytes \n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-stacks \n max number of individual stacks to capture before aggregation\n \n [default: 100000]\n\n --mapsize-aggregated-stacks \n max number of unique stacks after aggregation\n \n [default: 10000]\n\n --mapsize-rate-limits \n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer \n [default: local]\n [possible values: local, none]\n\n --debug-info-backend \n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb \n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base \n [default: /tmp]\n\n --killswitch-path-override \n Override the default path to the killswitch file (/tmp/lighswitch/killswitch) which prevents the profiler from starting\n\n --unsafe-start\n Force the profiler to start even if the system killswitch is enabled\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#); + insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS] [COMMAND]\n\nCommands:\n object-info \n show-unwind \n system-info \n help Print this message or the help of the given subcommand(s)\n\nOptions:\n --pids \n Specific PIDs to profile\n\n -D, --duration \n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging \n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq \n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format \n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path \n Path for the generated profile\n\n --profile-name \n Name for the generated profile\n\n --sender \n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. Used for kernel tests\n - local-disk\n - remote\n\n --server-url \n \n\n --token \n \n\n --perf-buffer-bytes \n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-rate-limits \n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer \n [default: local]\n [possible values: local, none]\n\n --debug-info-backend \n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb \n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base \n [default: /tmp]\n\n --killswitch-path-override \n Override the default path to the killswitch file (/tmp/lighswitch/killswitch) which prevents the profiler from starting\n\n --unsafe-start\n Force the profiler to start even if the system killswitch is enabled\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#); } #[rstest] diff --git a/src/profiler.rs b/src/profiler.rs index 2db4d817..79540755 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -170,8 +170,6 @@ pub struct ProfilerConfig { pub perf_buffer_bytes: usize, pub session_duration: Duration, pub mapsize_info: bool, - pub mapsize_stacks: u32, - pub mapsize_aggregated_stacks: u32, pub mapsize_rate_limits: u32, pub exclude_self: bool, pub native_unwind_info_bucket_sizes: Vec, @@ -192,8 +190,6 @@ impl Default for ProfilerConfig { perf_buffer_bytes: 512 * 1024, session_duration: Duration::from_secs(5), mapsize_info: false, - mapsize_stacks: 100000, - mapsize_aggregated_stacks: 10000, mapsize_rate_limits: 5000, exclude_self: false, native_unwind_info_bucket_sizes: vec![ @@ -288,8 +284,6 @@ enum AddUnwindInformationError { BpfPages(String), } -static MAX_AGGREGATED_STACKS_ENTRIES: u64 = 10000; - impl Profiler { pub fn create_unwind_info_maps( open_skel: &mut OpenProfilerSkel, @@ -334,16 +328,6 @@ impl Profiler { open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig, ) { - /* open_skel - .maps - .stacks - .set_max_entries(profiler_config.mapsize_stacks) - .expect("Unable to set stacks map max_entries"); */ - /* open_skel - .maps - .aggregated_stacks - .set_max_entries(profiler_config.mapsize_aggregated_stacks) - .expect("Unable to set aggregated_stacks map max_entries"); */ open_skel .maps .rate_limits @@ -362,7 +346,18 @@ impl Profiler { .use_ring_buffers .write(profiler_config.use_ring_buffers); + let max_raw_sample_entries = Profiler::get_stacks_sampling_buffer_size( + profiler_config.sample_freq as u32, + profiler_config.session_duration, + ); + if profiler_config.use_ring_buffers { + open_skel + .maps + .stacks_rb + .set_max_entries(max_raw_sample_entries) + .expect("failed to set stacks_rb max entries"); + // Even set to zero it will create as many entries as CPUs. open_skel .maps @@ -370,6 +365,12 @@ impl Profiler { .set_max_entries(0) .expect("set perf buffer entries to zero as it's unused"); } else { + open_skel + .maps + .stacks + .set_max_entries(max_raw_sample_entries) + .expect("failed to set stacks max entries"); + // Seems like ring buffers need to have size of at least 1... // It will use at least a page. open_skel @@ -712,15 +713,24 @@ impl Profiler { } } - pub fn run(mut self, collector: ThreadSafeCollector) -> Duration { - // In this case, we only want to calculate maximum sampling buffer sizes based on the + fn get_stacks_sampling_buffer_size(sample_freq: u32, session_duration: Duration) -> u32 { + // In this case, we only want to calculate maximum sampling buffer size based on the // number of "online" CPUs, not "possible" CPUs, which they sometimes differ. - let num_cpus = get_online_cpus().expect("get online CPUs").len() as u64; - let max_samples_per_session = self.sample_freq * num_cpus * self.session_duration.as_secs(); - if max_samples_per_session >= MAX_AGGREGATED_STACKS_ENTRIES { - warn!("samples might be lost due to too many samples in a profile session"); - } + let num_cpus: u32 = get_online_cpus().expect("get online CPUs").len() as u32; + let max_entries: u32 = sample_freq * num_cpus * session_duration.as_secs() as u32; + + info!( + "num_cpus={} sample_freq={} duration={} max_entries={}", + num_cpus, + sample_freq, + session_duration.as_secs(), + max_entries + ); + // max_entries + 4096 + } + pub fn run(mut self, collector: ThreadSafeCollector) -> Duration { self.setup_perf_events(); self.set_bpf_map_info(); self.add_kernel_modules(); From 199d6a6c4b07cbb8c495917bedbf7c5110941d8b Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Wed, 7 May 2025 08:09:16 +0100 Subject: [PATCH 08/13] Send raw stack instead of whole unwind_state to uspace --- src/bpf/profiler.bpf.c | 34 +++++++++++++++++----------------- src/bpf/profiler.h | 7 ++++++- src/bpf/profiler_bindings.rs | 2 +- src/profiler.rs | 12 ++++++------ 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 88ff9cd6..8ebcf01c 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -320,22 +320,22 @@ static __always_inline void add_stack(struct bpf_perf_event_data *ctx, u64 pid_tgid, unwind_state_t *unwind_state) { // Get the kernel stack - int ret = bpf_get_stack(ctx, unwind_state->kernel_stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); + int ret = bpf_get_stack(ctx, unwind_state->raw_stack.kernel_stack.addresses, MAX_STACK_DEPTH * sizeof(u64), 0); if (ret >= 0) { - unwind_state->kernel_stack.len = ret / sizeof(u64); + unwind_state->raw_stack.kernel_stack.len = ret / sizeof(u64); } int per_process_id = pid_tgid >> 32; int per_thread_id = pid_tgid; - unwind_state->stack_key.pid = per_process_id; - unwind_state->stack_key.task_id = per_thread_id; + unwind_state->raw_stack.stack_key.pid = per_process_id; + unwind_state->raw_stack.stack_key.task_id = per_thread_id; if (lightswitch_config.use_ring_buffers) { - ret = bpf_ringbuf_output(&stacks_rb, unwind_state, sizeof(unwind_state_t), 0); + ret = bpf_ringbuf_output(&stacks_rb, &(unwind_state->raw_stack), sizeof(raw_stack_t), 0); } else { - ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, unwind_state, sizeof(unwind_state_t)); + ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, &(unwind_state->raw_stack), sizeof(raw_stack_t)); } if (ret < 0) { @@ -363,7 +363,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { for (int i = 0; i < MAX_STACK_DEPTH_PER_PROGRAM; i++) { // LOG("[debug] Within unwinding machinery loop"); - LOG("## frame: %d", unwind_state->stack.len); + LOG("## frame: %d",unwind_state->raw_stack.stack.len); LOG("\tcurrent pc: %llx", unwind_state->ip); LOG("\tcurrent sp: %llx", unwind_state->sp); @@ -481,11 +481,11 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { } // Add address to stack. - u64 len = unwind_state->stack.len; + u64 len =unwind_state->raw_stack.stack.len; // Appease the verifier. if (len >= 0 && len < MAX_STACK_DEPTH) { - unwind_state->stack.addresses[len] = unwind_state->ip; - unwind_state->stack.len++; + unwind_state->raw_stack.stack.addresses[len] = unwind_state->ip; + unwind_state->raw_stack.stack.len++; } if (found_rbp_type == RBP_TYPE_REGISTER || @@ -569,7 +569,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { #ifdef __TARGET_ARCH_arm64 // Special handling for leaf frame. - if (unwind_state->stack.len == 0) { + if (unwind_state->raw_stack.stack.len == 0) { previous_rip = unwind_state->lr; } else { // This is guaranteed by the Aarch64 ABI. @@ -630,7 +630,7 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { bump_unwind_success_dwarf(); return 0; - } else if (unwind_state->stack.len < MAX_STACK_DEPTH && + } else if (unwind_state->raw_stack.stack.len < MAX_STACK_DEPTH && unwind_state->tail_calls < MAX_TAIL_CALLS) { LOG("Continuing walking the stack in a tail call, current tail %d", unwind_state->tail_calls); @@ -646,12 +646,12 @@ int dwarf_unwind(struct bpf_perf_event_data *ctx) { // Set up the initial unwinding state. static __always_inline bool set_initial_state(unwind_state_t *unwind_state, bpf_user_pt_regs_t *regs) { - unwind_state->stack.len = 0; - unwind_state->kernel_stack.len = 0; - unwind_state->tail_calls = 0; + unwind_state->raw_stack.stack.len = 0; + unwind_state->raw_stack.kernel_stack.len = 0; + unwind_state->tail_calls = 0; - unwind_state->stack_key.pid = 0; - unwind_state->stack_key.task_id = 0; + unwind_state->raw_stack.stack_key.pid = 0; + unwind_state->raw_stack.stack_key.task_id = 0; if (in_kernel(PT_REGS_IP(regs))) { if (!retrieve_task_registers(&unwind_state->ip, &unwind_state->sp, &unwind_state->bp, &unwind_state->lr)) { diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 06d6308b..3fda7382 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -170,8 +170,14 @@ typedef struct { } stack_key_t; typedef struct { + stack_key_t stack_key; native_stack_t stack; native_stack_t kernel_stack; +} raw_stack_t; + + +typedef struct { + raw_stack_t raw_stack; unsigned long long ip; unsigned long long sp; @@ -179,7 +185,6 @@ typedef struct { unsigned long long lr; int tail_calls; - stack_key_t stack_key; } unwind_state_t; enum event_type { diff --git a/src/bpf/profiler_bindings.rs b/src/bpf/profiler_bindings.rs index 8bbca4b3..0c30f716 100644 --- a/src/bpf/profiler_bindings.rs +++ b/src/bpf/profiler_bindings.rs @@ -9,7 +9,7 @@ use crate::unwind_info::types::CompactUnwindRow; include!(concat!(env!("OUT_DIR"), "/profiler_bindings.rs")); -unsafe impl Plain for unwind_state_t {} // @nocommit: this is a hack, we should create a new structure for this +unsafe impl Plain for raw_stack_t {} unsafe impl Plain for stack_key_t {} unsafe impl Plain for native_stack_t {} unsafe impl Plain for Event {} diff --git a/src/profiler.rs b/src/profiler.rs index 4faf3182..61383f4e 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -2105,14 +2105,14 @@ impl Profiler { } fn handle_stack(raw_sample_send: &Arc>, data: &[u8]) { - let mut unwind_state = unwind_state_t::default(); - plain::copy_from_bytes(&mut unwind_state, data).expect("handle stack serde"); + let mut raw_stack = raw_stack_t::default(); + plain::copy_from_bytes(&mut raw_stack, data).expect("handle stack serde"); let raw_sample = RawSample { - pid: unwind_state.stack_key.pid, - tid: unwind_state.stack_key.task_id, - ustack: Some(unwind_state.stack), - kstack: Some(unwind_state.kernel_stack), + pid: raw_stack.stack_key.pid, + tid: raw_stack.stack_key.task_id, + ustack: Some(raw_stack.stack), + kstack: Some(raw_stack.kernel_stack), }; raw_sample_send.send(raw_sample).expect("Send raw sample"); } From d5a9cb0aa4ac6a117d16ef634a60cfba5485a62c Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Wed, 7 May 2025 08:12:04 +0100 Subject: [PATCH 09/13] rm hash_stack --- src/bpf/profiler.h | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 3fda7382..f6b723f6 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -175,7 +175,6 @@ typedef struct { native_stack_t kernel_stack; } raw_stack_t; - typedef struct { raw_stack_t raw_stack; @@ -201,32 +200,3 @@ typedef struct { enum program { PROGRAM_NATIVE_UNWINDER = 0, }; - -#define BIG_CONSTANT(x) (x##LLU) -unsigned long long hash_stack(native_stack_t *stack) { - const unsigned long long m = BIG_CONSTANT(0xc6a4a7935bd1e995); - const int r = 47; - const int seed = 123; - - unsigned long long hash = seed ^ (stack->len * m); - - for (unsigned long long i = 0; i < MAX_STACK_DEPTH; i++) { - // The stack is not zeroed when we initialise it, we simply - // set the length to zero. This is a workaround to produce - // the same hash for two stacks that might have garbage values - // after their length. - unsigned long long k = 0; - if (i < stack->len) { - k = stack->addresses[i]; - } - - k *= m; - k ^= k >> r; - k *= m; - - hash ^= k; - hash *= m; - } - - return hash; -} From b86cfdb3777135be0af02001d9ec4c6fa9f48ded Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Mon, 12 May 2025 12:42:48 +0100 Subject: [PATCH 10/13] Add sample collection timestamp (nanoseconds) to stack_key --- src/bpf/profiler.bpf.c | 5 +++++ src/bpf/profiler.h | 1 + src/profiler.rs | 31 +++++++++++++++++++------------ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 8ebcf01c..1a03d944 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -13,6 +13,7 @@ #include #include +volatile u64 walltime_at_system_boot_ns = 0; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); @@ -331,6 +332,9 @@ unwind_state_t *unwind_state) { unwind_state->raw_stack.stack_key.pid = per_process_id; unwind_state->raw_stack.stack_key.task_id = per_thread_id; + u64 timestamp = bpf_ktime_get_boot_ns() + walltime_at_system_boot_ns; + unwind_state->raw_stack.stack_key.collected_at = timestamp; + if (lightswitch_config.use_ring_buffers) { ret = bpf_ringbuf_output(&stacks_rb, &(unwind_state->raw_stack), sizeof(raw_stack_t), 0); @@ -652,6 +656,7 @@ static __always_inline bool set_initial_state(unwind_state_t *unwind_state, bpf_ unwind_state->raw_stack.stack_key.pid = 0; unwind_state->raw_stack.stack_key.task_id = 0; + unwind_state->raw_stack.stack_key.collected_at = 0; if (in_kernel(PT_REGS_IP(regs))) { if (!retrieve_task_registers(&unwind_state->ip, &unwind_state->sp, &unwind_state->bp, &unwind_state->lr)) { diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index f6b723f6..143e5ea7 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -167,6 +167,7 @@ typedef struct { typedef struct { int task_id; int pid; + u64 collected_at; } stack_key_t; typedef struct { diff --git a/src/profiler.rs b/src/profiler.rs index 61383f4e..d36a64d0 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -324,10 +324,7 @@ impl Profiler { map_shapes } - pub fn set_profiler_map_sizes( - open_skel: &mut OpenProfilerSkel, - profiler_config: &ProfilerConfig, - ) { + pub fn init_profiler_maps(open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig) { open_skel .maps .rate_limits @@ -411,6 +408,10 @@ impl Profiler { ); } + fn walltime_at_system_boot() -> u64 { + procfs::boot_time().unwrap().timestamp_nanos_opt().unwrap() as u64 + } + pub fn new( profiler_config: ProfilerConfig, stop_signal_receive: Receiver<()>, @@ -454,20 +455,18 @@ impl Profiler { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Self::set_profiler_map_sizes(&mut open_skel, &profiler_config); + Self::init_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = ManuallyDrop::new(open_skel.load().expect("load skel")); // SAFETY: native_unwinder never outlives native_unwinder_open_object - let native_unwinder = unsafe { + let mut native_unwinder = unsafe { std::mem::transmute::>, ManuallyDrop>>( native_unwinder, ) }; info!("native unwinder BPF program loaded"); - let native_unwinder_maps = &native_unwinder.maps; - let exec_mappings_fd = native_unwinder_maps.exec_mappings.as_fd(); // BPF map sizes can be overriden, this is a debugging option to print the actual size once // the maps are created and the BPF program is loaded. @@ -475,6 +474,13 @@ impl Profiler { Self::show_actual_profiler_map_sizes(&native_unwinder); } + let native_unwinder_maps = &mut native_unwinder.maps; + let exec_mappings_fd = native_unwinder_maps.exec_mappings.as_fd(); + + // Set baseline for calculating raw_sample wallclock collection time + // using offset since boot. + native_unwinder_maps.bss_data.walltime_at_system_boot_ns = Self::walltime_at_system_boot(); + let mut tracers_builder = TracersSkelBuilder::default(); tracers_builder .obj_builder @@ -740,7 +746,7 @@ impl Profiler { let raw_sample_send = self.raw_sample_send.clone(); self.start_poll_thread( - "raw-samples-poll-thread", + "raw_samples", &self.native_unwinder.maps.stacks_rb, &self.native_unwinder.maps.stacks, move |data| Self::handle_stack(&raw_sample_send, data), @@ -2117,7 +2123,9 @@ impl Profiler { raw_sample_send.send(raw_sample).expect("Send raw sample"); } - fn handle_lost_stack(_cpu: i32, _count: u64) {} + fn handle_lost_stack(cpu: i32, count: u64) { + error!("lost {count} stacks on cpu {cpu}"); + } fn handle_event(sender: &Arc>, data: &[u8]) { let mut event = Event::default(); @@ -2186,8 +2194,7 @@ mod tests { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Profiler::set_profiler_map_sizes(&mut open_skel, &profiler_config); - + Profiler::init_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = open_skel.load().expect("load skel"); // add and delete bpf process works From e566136b1ef90d92898de1e5cd53f99ab772ddb1 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Mon, 12 May 2025 13:03:06 +0100 Subject: [PATCH 11/13] Move walltime_at_system_boot_ns to rodata --- src/bpf/profiler.bpf.c | 5 +---- src/bpf/profiler.h | 2 ++ src/profiler.rs | 18 ++++++++++-------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 1a03d944..5fb52a33 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -13,8 +13,6 @@ #include #include -volatile u64 walltime_at_system_boot_ns = 0; - struct { __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, 256 * 1024 /* 256 KB */); // adjust max entries based on frequency on userspace -- todo @@ -337,13 +335,12 @@ unwind_state_t *unwind_state) { if (lightswitch_config.use_ring_buffers) { ret = bpf_ringbuf_output(&stacks_rb, &(unwind_state->raw_stack), sizeof(raw_stack_t), 0); - } else { ret = bpf_perf_event_output(ctx, &stacks, BPF_F_CURRENT_CPU, &(unwind_state->raw_stack), sizeof(raw_stack_t)); } if (ret < 0) { - bpf_printk("add_stack failed ret=%d", ret); + bpf_printk("add_stack failed ret=%d, use_ring_buffers=", ret); bump_unwind_error_failure_adding_stack(); } } diff --git a/src/bpf/profiler.h b/src/bpf/profiler.h index 143e5ea7..41cb7623 100644 --- a/src/bpf/profiler.h +++ b/src/bpf/profiler.h @@ -120,6 +120,8 @@ const volatile struct lightswitch_config_t lightswitch_config = { .use_task_pt_regs_helper = false, }; +const volatile u64 walltime_at_system_boot_ns = 0; + #define LOG(fmt, ...) \ ({ \ if (lightswitch_config.verbose_logging) { \ diff --git a/src/profiler.rs b/src/profiler.rs index d36a64d0..fcc6720e 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -343,6 +343,10 @@ impl Profiler { .use_ring_buffers .write(profiler_config.use_ring_buffers); + // Set baseline for calculating raw_sample collection wall clock time + // using offset since boot. + open_skel.maps.rodata_data.walltime_at_system_boot_ns = Self::walltime_at_system_boot(); + let max_raw_sample_entries = Profiler::get_stacks_sampling_buffer_size( profiler_config.sample_freq as u32, profiler_config.session_duration, @@ -460,7 +464,7 @@ impl Profiler { let native_unwinder = ManuallyDrop::new(open_skel.load().expect("load skel")); // SAFETY: native_unwinder never outlives native_unwinder_open_object - let mut native_unwinder = unsafe { + let native_unwinder = unsafe { std::mem::transmute::>, ManuallyDrop>>( native_unwinder, ) @@ -468,19 +472,15 @@ impl Profiler { info!("native unwinder BPF program loaded"); + let native_unwinder_maps = &native_unwinder.maps; + let exec_mappings_fd = native_unwinder_maps.exec_mappings.as_fd(); + // BPF map sizes can be overriden, this is a debugging option to print the actual size once // the maps are created and the BPF program is loaded. if profiler_config.mapsize_info { Self::show_actual_profiler_map_sizes(&native_unwinder); } - let native_unwinder_maps = &mut native_unwinder.maps; - let exec_mappings_fd = native_unwinder_maps.exec_mappings.as_fd(); - - // Set baseline for calculating raw_sample wallclock collection time - // using offset since boot. - native_unwinder_maps.bss_data.walltime_at_system_boot_ns = Self::walltime_at_system_boot(); - let mut tracers_builder = TracersSkelBuilder::default(); tracers_builder .obj_builder @@ -2114,6 +2114,8 @@ impl Profiler { let mut raw_stack = raw_stack_t::default(); plain::copy_from_bytes(&mut raw_stack, data).expect("handle stack serde"); + info!("*************** {}", raw_stack.stack_key.collected_at); + let raw_sample = RawSample { pid: raw_stack.stack_key.pid, tid: raw_stack.stack_key.task_id, From fe5553184bf0af71e595741780fb485399d1554f Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Mon, 12 May 2025 13:07:05 +0100 Subject: [PATCH 12/13] . --- src/profiler.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/profiler.rs b/src/profiler.rs index fcc6720e..c9834d4f 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -324,7 +324,7 @@ impl Profiler { map_shapes } - pub fn init_profiler_maps(open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig) { + pub fn setup_profiler_maps(open_skel: &mut OpenProfilerSkel, profiler_config: &ProfilerConfig) { open_skel .maps .rate_limits @@ -344,7 +344,7 @@ impl Profiler { .write(profiler_config.use_ring_buffers); // Set baseline for calculating raw_sample collection wall clock time - // using offset since boot. + // as bpf currently only supports getting the offset since system boot. open_skel.maps.rodata_data.walltime_at_system_boot_ns = Self::walltime_at_system_boot(); let max_raw_sample_entries = Profiler::get_stacks_sampling_buffer_size( @@ -459,7 +459,7 @@ impl Profiler { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Self::init_profiler_maps(&mut open_skel, &profiler_config); + Self::setup_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = ManuallyDrop::new(open_skel.load().expect("load skel")); @@ -2114,8 +2114,6 @@ impl Profiler { let mut raw_stack = raw_stack_t::default(); plain::copy_from_bytes(&mut raw_stack, data).expect("handle stack serde"); - info!("*************** {}", raw_stack.stack_key.collected_at); - let raw_sample = RawSample { pid: raw_stack.stack_key.pid, tid: raw_stack.stack_key.task_id, @@ -2196,7 +2194,7 @@ mod tests { &mut open_skel, &profiler_config.native_unwind_info_bucket_sizes, ); - Profiler::init_profiler_maps(&mut open_skel, &profiler_config); + Profiler::setup_profiler_maps(&mut open_skel, &profiler_config); let native_unwinder = open_skel.load().expect("load skel"); // add and delete bpf process works From 9b22412c20f9df9099bc0fda90320e55e02a93c6 Mon Sep 17 00:00:00 2001 From: Okwudili Pat-Nebe Date: Tue, 13 May 2025 07:48:55 +0100 Subject: [PATCH 13/13] . --- src/bpf/profiler.bpf.c | 2 +- src/profiler.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/bpf/profiler.bpf.c b/src/bpf/profiler.bpf.c index 5fb52a33..756f203a 100644 --- a/src/bpf/profiler.bpf.c +++ b/src/bpf/profiler.bpf.c @@ -22,7 +22,7 @@ struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); __uint(key_size, sizeof(u32)); __uint(value_size, sizeof(u32)); - // adjust max entries based on frequency on userspace - todo (what does this mean?) + // adjust max entries based on frequency on userspace - todo } stacks SEC(".maps"); struct { diff --git a/src/profiler.rs b/src/profiler.rs index c9834d4f..b6e4fe4f 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -471,7 +471,6 @@ impl Profiler { }; info!("native unwinder BPF program loaded"); - let native_unwinder_maps = &native_unwinder.maps; let exec_mappings_fd = native_unwinder_maps.exec_mappings.as_fd();