From 9ce162f4411481369ecfa21c4591b6d1570dde83 Mon Sep 17 00:00:00 2001 From: "datadog-datadog-prod-us1[bot]" <88084959+datadog-datadog-prod-us1[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 21:24:26 +0000 Subject: [PATCH 1/4] Extract observer into new crate Co-authored-by: scottopell <996472+scottopell@users.noreply.github.com> --- Cargo.toml | 5 +++ lading/Cargo.toml | 8 ++-- lading/src/lib.rs | 3 +- lading/src/observer/linux/utils.rs | 1 - lading_observer/Cargo.toml | 33 +++++++++++++++ .../observer.rs => lading_observer/src/lib.rs | 41 ++++++++++--------- .../observer => lading_observer/src}/linux.rs | 2 +- .../src}/linux/cgroup.rs | 0 .../src}/linux/cgroup/v2.rs | 0 .../src}/linux/cgroup/v2/cpu.rs | 0 .../src}/linux/cgroup/v2/io.rs | 0 .../src}/linux/cgroup/v2/memory.rs | 0 .../src}/linux/procfs.rs | 2 +- .../src}/linux/procfs/memory.rs | 0 .../src}/linux/procfs/memory/smaps.rs | 2 +- .../src}/linux/procfs/memory/smaps_rollup.rs | 0 .../src}/linux/procfs/stat.rs | 2 +- .../src}/linux/procfs/uptime.rs | 0 .../src}/linux/procfs/vmstat.rs | 0 lading_observer/src/linux/utils.rs | 1 + .../src}/linux/utils/process_descendents.rs | 6 +-- .../linux/utils/tests/create_process_tree.py | 0 .../src}/linux/wss.rs | 2 +- .../src}/linux/wss/pfnset.rs | 0 24 files changed, 74 insertions(+), 34 deletions(-) delete mode 100644 lading/src/observer/linux/utils.rs create mode 100644 lading_observer/Cargo.toml rename lading/src/observer.rs => lading_observer/src/lib.rs (78%) rename {lading/src/observer => lading_observer/src}/linux.rs (98%) rename {lading/src/observer => lading_observer/src}/linux/cgroup.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/cgroup/v2.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/cgroup/v2/cpu.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/cgroup/v2/io.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/cgroup/v2/memory.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/procfs.rs (99%) rename {lading/src/observer => lading_observer/src}/linux/procfs/memory.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/procfs/memory/smaps.rs (99%) rename {lading/src/observer => lading_observer/src}/linux/procfs/memory/smaps_rollup.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/procfs/stat.rs (99%) rename {lading/src/observer => lading_observer/src}/linux/procfs/uptime.rs (100%) rename {lading/src/observer => lading_observer/src}/linux/procfs/vmstat.rs (100%) create mode 100644 lading_observer/src/linux/utils.rs rename {lading/src/observer => lading_observer/src}/linux/utils/process_descendents.rs (93%) rename {lading/src/observer => lading_observer/src}/linux/utils/tests/create_process_tree.py (100%) mode change 100755 => 100644 rename {lading/src/observer => lading_observer/src}/linux/wss.rs (97%) rename {lading/src/observer => lading_observer/src}/linux/wss/pfnset.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 634792332..a345747dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "lading_capture", "lading_fuzz", "lading_payload", + "lading_observer", "lading_throttle", ] @@ -60,6 +61,10 @@ http-body-util = { version = "0.1" } http-serde = { version = "2.1" } hyper = { version = "1.7", default-features = false } hyper-util = { version = "0.1", default-features = false } +heck = { version = "0.5", default-features = false } +nix = { version = "0.30" } +num-traits = { version = "0.2", default-features = false } +regex = { version = "1.12" } sysinfo = { version = "0.37", default-features = false } opentelemetry-proto = "0.31" ddsketch-agent = { git = "https://github.com/DataDog/saluki", rev = "f47a7ef588c53aa1da35dcfd93808595ebeb1291", features = [ diff --git a/lading/Cargo.toml b/lading/Cargo.toml index a53e1ea31..32ca72a1b 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -18,6 +18,7 @@ description = "A tool for load testing daemons." lading-capture = { version = "0.2", path = "../lading_capture" } lading-capture-schema = { path = "../lading_capture_schema" } lading-payload = { version = "0.1", path = "../lading_payload" } +lading-observer = { version = "0.1", path = "../lading_observer" } lading-throttle = { version = "0.1", path = "../lading_throttle" } lading-signal = { version = "0.1", path = "../lading_signal" } @@ -39,7 +40,6 @@ either = { version = "1.15.0" } flate2 = { version = "1.1.2" } futures = { version = "0.3.31" } fuser = { version = "0.16", optional = true, features = ["libfuse"] } -heck = { version = "0.5", default-features = false } http = { workspace = true } http-body-util = { workspace = true } http-serde = { version = "2.1" } @@ -53,14 +53,13 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = [ metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-util = { workspace = true } -nix = { version = "0.30", features = ["fs", "signal"] } +nix = { workspace = true, features = ["fs", "signal"] } num_cpus = { version = "1.17" } -num-traits = { version = "0.2", default-features = false } once_cell = { workspace = true } opentelemetry-proto = { workspace = true } prost = { workspace = true } rand = { workspace = true, features = ["small_rng", "thread_rng"] } -regex = { version = "1.12" } +regex = { workspace = true } reqwest = { version = "0.12", default-features = false, features = [ "default-tls", "json", @@ -95,7 +94,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } zstd = { version = "0.13.3" } [target.'cfg(target_os = "linux")'.dependencies] -procfs = { version = "0.18", default-features = false, features = [] } async-pidfd = { version = "0.1" } [dev-dependencies] diff --git a/lading/src/lib.rs b/lading/src/lib.rs index f8d382aab..a4952fb82 100644 --- a/lading/src/lib.rs +++ b/lading/src/lib.rs @@ -17,7 +17,8 @@ mod common; pub mod config; pub mod generator; pub mod inspector; -pub mod observer; +/// Target observation utilities. +pub use lading_observer as observer; pub(crate) mod proto; pub mod target; pub mod target_metrics; diff --git a/lading/src/observer/linux/utils.rs b/lading/src/observer/linux/utils.rs deleted file mode 100644 index 836f25422..000000000 --- a/lading/src/observer/linux/utils.rs +++ /dev/null @@ -1 +0,0 @@ -pub(in crate::observer::linux) mod process_descendents; diff --git a/lading_observer/Cargo.toml b/lading_observer/Cargo.toml new file mode 100644 index 000000000..07e4b3624 --- /dev/null +++ b/lading_observer/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "lading-observer" +version = "0.1.0" +authors = [ + "Brian L. Troutwine ", + "George Hahn ", +] +edition = "2024" +license = "MIT" +repository = "https://github.com/datadog/lading/" +keywords = ["random_test", "generator"] +categories = ["development-tools::profiling"] +description = "A tool for load testing daemons." + +[dependencies] +lading-signal = { version = "0.1", path = "../lading_signal" } +metrics = { workspace = true } +nix = { workspace = true, features = ["fs", "signal"] } +num-traits = { workspace = true } +procfs = { version = "0.18", default-features = false, features = [] } +regex = { workspace = true } +rustc-hash = { workspace = true } +serde = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["fs", "macros", "sync", "time"] } +tracing = { workspace = true } +heck = { workspace = true } + +[lib] +doctest = false + +[lints] +workspace = true \ No newline at end of file diff --git a/lading/src/observer.rs b/lading_observer/src/lib.rs similarity index 78% rename from lading/src/observer.rs rename to lading_observer/src/lib.rs index 6d1f3495f..da3fdea56 100644 --- a/lading/src/observer.rs +++ b/lading_observer/src/lib.rs @@ -1,21 +1,26 @@ -//! Manage the target observer +//! Observe target resource usage for lading. //! -//! The interogation that lading does of the target sub-process is intentionally -//! limited to in-process concerns, for the most part. The 'inspector' does -//! allow for a sub-process to do out-of-band inspection of the target but -//! cannot incorporate whatever it's doing into the capture data that lading -//! produces. This observer, on Linux, looks up the target process in procfs and -//! writes out key details about memory and CPU consumption into the capture -//! data. On non-Linux systems the observer, if enabled, will emit a warning. +//! This library supports the lading binary found elsewhere in this project. It +//! samples target resource usage, primarily on Linux via procfs, and is not +//! intended for external use. -use std::io; +#![deny(clippy::cargo)] +#![allow(clippy::cast_precision_loss)] +#![allow(clippy::multiple_crate_versions)] -use crate::target::TargetPidReceiver; +use std::{io, time::Duration}; + +#[cfg(target_os = "linux")] +use crate::linux::Sampler; use serde::Deserialize; +use tokio::sync::broadcast; #[cfg(target_os = "linux")] mod linux; +/// Type used to receive the target PID once it is running. +pub type TargetPidReceiver = broadcast::Receiver>; + #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`] pub enum Error { @@ -38,13 +43,13 @@ pub enum Error { pub struct Config {} #[derive(Debug)] -/// The inspector sub-process server. +/// The observer sub-process server. /// /// This struct manages a sub-process that can be used to do further examination -/// of the [`crate::target::Server`] by means of operating system facilities. The -/// sub-process is not created until [`Server::run`] is called. It is assumed -/// that only one instance of this struct will ever exist at a time, although -/// there are no protections for that. +/// of a target process by means of operating system facilities. The sub-process +/// is not created until [`Server::run`] is called. It is assumed that only one +/// instance of this struct will ever exist at a time, although there are no +/// protections for that. pub struct Server { #[allow(dead_code)] // config is not actively used, left as a stub config: Config, @@ -93,10 +98,8 @@ impl Server { pub async fn run( self, mut pid_snd: TargetPidReceiver, - sample_period: std::time::Duration, + sample_period: Duration, ) -> Result<(), Error> { - use crate::observer::linux::Sampler; - let target_pid = pid_snd .recv() .await @@ -143,7 +146,7 @@ impl Server { pub async fn run( self, _pid_snd: TargetPidReceiver, - _sample_period: std::time::Duration, + _sample_period: Duration, ) -> Result<(), Error> { tracing::warn!("observer unavailable on non-Linux system"); Ok(()) diff --git a/lading/src/observer/linux.rs b/lading_observer/src/linux.rs similarity index 98% rename from lading/src/observer/linux.rs rename to lading_observer/src/linux.rs index 78dc5826f..5b1347a1c 100644 --- a/lading/src/observer/linux.rs +++ b/lading_observer/src/linux.rs @@ -3,7 +3,7 @@ mod procfs; mod utils; mod wss; -use tracing::{error, warn}; +use tracing::warn; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module diff --git a/lading/src/observer/linux/cgroup.rs b/lading_observer/src/linux/cgroup.rs similarity index 100% rename from lading/src/observer/linux/cgroup.rs rename to lading_observer/src/linux/cgroup.rs diff --git a/lading/src/observer/linux/cgroup/v2.rs b/lading_observer/src/linux/cgroup/v2.rs similarity index 100% rename from lading/src/observer/linux/cgroup/v2.rs rename to lading_observer/src/linux/cgroup/v2.rs diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading_observer/src/linux/cgroup/v2/cpu.rs similarity index 100% rename from lading/src/observer/linux/cgroup/v2/cpu.rs rename to lading_observer/src/linux/cgroup/v2/cpu.rs diff --git a/lading/src/observer/linux/cgroup/v2/io.rs b/lading_observer/src/linux/cgroup/v2/io.rs similarity index 100% rename from lading/src/observer/linux/cgroup/v2/io.rs rename to lading_observer/src/linux/cgroup/v2/io.rs diff --git a/lading/src/observer/linux/cgroup/v2/memory.rs b/lading_observer/src/linux/cgroup/v2/memory.rs similarity index 100% rename from lading/src/observer/linux/cgroup/v2/memory.rs rename to lading_observer/src/linux/cgroup/v2/memory.rs diff --git a/lading/src/observer/linux/procfs.rs b/lading_observer/src/linux/procfs.rs similarity index 99% rename from lading/src/observer/linux/procfs.rs rename to lading_observer/src/linux/procfs.rs index dabc1f87c..5cbe7c4c1 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading_observer/src/linux/procfs.rs @@ -14,7 +14,7 @@ use procfs::process::Process; use rustc_hash::FxHashMap; use tracing::{error, warn}; -use crate::observer::linux::utils::process_descendents::ProcessDescendantsIterator; +use crate::linux::utils::process_descendents::ProcessDescendantsIterator; const BYTES_PER_KIBIBYTE: u64 = 1024; diff --git a/lading/src/observer/linux/procfs/memory.rs b/lading_observer/src/linux/procfs/memory.rs similarity index 100% rename from lading/src/observer/linux/procfs/memory.rs rename to lading_observer/src/linux/procfs/memory.rs diff --git a/lading/src/observer/linux/procfs/memory/smaps.rs b/lading_observer/src/linux/procfs/memory/smaps.rs similarity index 99% rename from lading/src/observer/linux/procfs/memory/smaps.rs rename to lading_observer/src/linux/procfs/memory/smaps.rs index b8fd18a58..689d7738e 100644 --- a/lading/src/observer/linux/procfs/memory/smaps.rs +++ b/lading_observer/src/linux/procfs/memory/smaps.rs @@ -3,7 +3,7 @@ use std::{fs, io::Read}; use regex::Regex; use rustc_hash::FxHashMap; -use crate::observer::linux::procfs::BYTES_PER_KIBIBYTE; +use crate::linux::procfs::BYTES_PER_KIBIBYTE; use super::next_token; diff --git a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs b/lading_observer/src/linux/procfs/memory/smaps_rollup.rs similarity index 100% rename from lading/src/observer/linux/procfs/memory/smaps_rollup.rs rename to lading_observer/src/linux/procfs/memory/smaps_rollup.rs diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading_observer/src/linux/procfs/stat.rs similarity index 99% rename from lading/src/observer/linux/procfs/stat.rs rename to lading_observer/src/linux/procfs/stat.rs index ad85813c7..f06913c5a 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading_observer/src/linux/procfs/stat.rs @@ -1,7 +1,7 @@ use metrics::gauge; use tokio::fs; -use crate::observer::linux::cgroup; +use crate::linux::cgroup; #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/lading/src/observer/linux/procfs/uptime.rs b/lading_observer/src/linux/procfs/uptime.rs similarity index 100% rename from lading/src/observer/linux/procfs/uptime.rs rename to lading_observer/src/linux/procfs/uptime.rs diff --git a/lading/src/observer/linux/procfs/vmstat.rs b/lading_observer/src/linux/procfs/vmstat.rs similarity index 100% rename from lading/src/observer/linux/procfs/vmstat.rs rename to lading_observer/src/linux/procfs/vmstat.rs diff --git a/lading_observer/src/linux/utils.rs b/lading_observer/src/linux/utils.rs new file mode 100644 index 000000000..8dc781208 --- /dev/null +++ b/lading_observer/src/linux/utils.rs @@ -0,0 +1 @@ +pub(in crate::linux) mod process_descendents; diff --git a/lading/src/observer/linux/utils/process_descendents.rs b/lading_observer/src/linux/utils/process_descendents.rs similarity index 93% rename from lading/src/observer/linux/utils/process_descendents.rs rename to lading_observer/src/linux/utils/process_descendents.rs index a08a80dc9..9507f40f7 100644 --- a/lading/src/observer/linux/utils/process_descendents.rs +++ b/lading_observer/src/linux/utils/process_descendents.rs @@ -1,12 +1,12 @@ use procfs::process::Process; /// Iterator which, given a process ID, returns the process and all its descendants -pub(in crate::observer::linux) struct ProcessDescendantsIterator { +pub(in crate::linux) struct ProcessDescendantsIterator { stack: Vec, } impl ProcessDescendantsIterator { - pub(in crate::observer::linux) fn new(parent_pid: i32) -> Self { + pub(in crate::linux) fn new(parent_pid: i32) -> Self { Self { stack: vec![ Process::new(parent_pid) @@ -59,7 +59,7 @@ mod tests { const NB_PROCESSES: usize = (NB_PROCESSES_PER_LEVEL.pow(NB_LEVELS + 1) - 1) / (NB_PROCESSES_PER_LEVEL - 1); - let mut child = Command::new("src/observer/linux/utils/tests/create_process_tree.py") + let mut child = Command::new("src/linux/utils/tests/create_process_tree.py") .arg(NB_PROCESSES_PER_LEVEL.to_string()) .arg(NB_LEVELS.to_string()) .stdout(Stdio::piped()) diff --git a/lading/src/observer/linux/utils/tests/create_process_tree.py b/lading_observer/src/linux/utils/tests/create_process_tree.py old mode 100755 new mode 100644 similarity index 100% rename from lading/src/observer/linux/utils/tests/create_process_tree.py rename to lading_observer/src/linux/utils/tests/create_process_tree.py diff --git a/lading/src/observer/linux/wss.rs b/lading_observer/src/linux/wss.rs similarity index 97% rename from lading/src/observer/linux/wss.rs rename to lading_observer/src/linux/wss.rs index 4a3ba3795..096ae69d0 100644 --- a/lading/src/observer/linux/wss.rs +++ b/lading_observer/src/linux/wss.rs @@ -7,7 +7,7 @@ use std::{ }; use tracing::debug; -use crate::observer::linux::utils::process_descendents::ProcessDescendantsIterator; +use crate::linux::utils::process_descendents::ProcessDescendantsIterator; mod pfnset; use pfnset::PfnSet; diff --git a/lading/src/observer/linux/wss/pfnset.rs b/lading_observer/src/linux/wss/pfnset.rs similarity index 100% rename from lading/src/observer/linux/wss/pfnset.rs rename to lading_observer/src/linux/wss/pfnset.rs From 7fd67b6eeeff1d66e939a3b007e59ad2a5189adc Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Wed, 14 Jan 2026 22:05:12 +0000 Subject: [PATCH 2/4] feat(observer): make modules and functions public for external consumption Make linux, cgroup, procfs modules and key functions public to allow external crates like fgm-observer to use the polling APIs directly without going through the Server interface. Public API changes: - linux module (lib.rs) - cgroup, procfs modules (linux.rs) - v2 module (cgroup.rs) - memory module (procfs.rs) - smaps_rollup module (procfs/memory.rs) - poll function (cgroup/v2.rs) --- lading_observer/src/lib.rs | 2 +- lading_observer/src/linux.rs | 4 ++-- lading_observer/src/linux/cgroup.rs | 2 +- lading_observer/src/linux/cgroup/v2.rs | 2 +- lading_observer/src/linux/procfs.rs | 2 +- lading_observer/src/linux/procfs/memory.rs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lading_observer/src/lib.rs b/lading_observer/src/lib.rs index da3fdea56..25db58957 100644 --- a/lading_observer/src/lib.rs +++ b/lading_observer/src/lib.rs @@ -16,7 +16,7 @@ use serde::Deserialize; use tokio::sync::broadcast; #[cfg(target_os = "linux")] -mod linux; +pub mod linux; /// Type used to receive the target PID once it is running. pub type TargetPidReceiver = broadcast::Receiver>; diff --git a/lading_observer/src/linux.rs b/lading_observer/src/linux.rs index 5b1347a1c..6fff4cd14 100644 --- a/lading_observer/src/linux.rs +++ b/lading_observer/src/linux.rs @@ -1,5 +1,5 @@ -mod cgroup; -mod procfs; +pub mod cgroup; +pub mod procfs; mod utils; mod wss; diff --git a/lading_observer/src/linux/cgroup.rs b/lading_observer/src/linux/cgroup.rs index 73de6abd5..8b46843e7 100644 --- a/lading_observer/src/linux/cgroup.rs +++ b/lading_observer/src/linux/cgroup.rs @@ -1,5 +1,5 @@ /// Code to read cgroup information. -pub(crate) mod v2; +pub mod v2; use std::{collections::VecDeque, io, path::PathBuf}; diff --git a/lading_observer/src/linux/cgroup/v2.rs b/lading_observer/src/linux/cgroup/v2.rs index 4d435e57d..186277dd5 100644 --- a/lading_observer/src/linux/cgroup/v2.rs +++ b/lading_observer/src/linux/cgroup/v2.rs @@ -52,7 +52,7 @@ pub(crate) async fn get_path(pid: i32) -> Result { /// Polls for any cgroup metrics that can be read, v2 version. #[tracing::instrument(skip_all)] #[allow(clippy::too_many_lines)] -pub(crate) async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { +pub async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { // Read all files in the cgroup `path` and create metrics for them. If we // lack permissions to read we skip the file. We do not use ? to allow for // the maximal number of files to be read. diff --git a/lading_observer/src/linux/procfs.rs b/lading_observer/src/linux/procfs.rs index 5cbe7c4c1..ccc6a212e 100644 --- a/lading_observer/src/linux/procfs.rs +++ b/lading_observer/src/linux/procfs.rs @@ -1,5 +1,5 @@ /// Sampler implementation for procfs filesystems -mod memory; +pub mod memory; mod stat; mod uptime; mod vmstat; diff --git a/lading_observer/src/linux/procfs/memory.rs b/lading_observer/src/linux/procfs/memory.rs index 99775a7fc..1a6edc2b6 100644 --- a/lading_observer/src/linux/procfs/memory.rs +++ b/lading_observer/src/linux/procfs/memory.rs @@ -1,5 +1,5 @@ pub(crate) mod smaps; -pub(crate) mod smaps_rollup; +pub mod smaps_rollup; const BYTES_PER_KIBIBYTE: u64 = 1024; From 508b213bd83f22715e28c5c3c31b0473f1f8dc42 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Wed, 14 Jan 2026 22:05:54 +0000 Subject: [PATCH 3/4] feat(observer): make smaps_rollup types and functions public Make Aggregator struct, poll function, and Error enum public in smaps_rollup module to allow external crates to use memory sampling functionality. --- .../src/linux/procfs/memory/smaps_rollup.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lading_observer/src/linux/procfs/memory/smaps_rollup.rs b/lading_observer/src/linux/procfs/memory/smaps_rollup.rs index 17c6b777d..d3aada3fe 100644 --- a/lading_observer/src/linux/procfs/memory/smaps_rollup.rs +++ b/lading_observer/src/linux/procfs/memory/smaps_rollup.rs @@ -6,7 +6,7 @@ use super::{BYTES_PER_KIBIBYTE, next_token}; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module -pub(crate) enum Error { +pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -17,13 +17,13 @@ pub(crate) enum Error { } #[derive(Debug, Clone, Copy, Default)] -pub(crate) struct Aggregator { - pub(crate) rss: u64, - pub(crate) pss: u64, +pub struct Aggregator { + pub rss: u64, + pub pss: u64, } // Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. -pub(crate) async fn poll( +pub async fn poll( pid: i32, labels: &[(&'static str, String)], aggr: &mut Aggregator, From dc40d1b55842675540f546c7e2a4f7b08a7ed575 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Thu, 15 Jan 2026 18:05:07 +0000 Subject: [PATCH 4/4] Add num_cpus dependency and documentation for public APIs - Add num_cpus to workspace dependencies for CPU count detection - Document all public modules, types, and functions to satisfy linter - Fix missing docs for linux, cgroup, procfs modules - Add comprehensive error variant documentation --- Cargo.lock | 23 ++++++++++++++++--- Cargo.toml | 1 + lading_observer/Cargo.toml | 1 + lading_observer/src/lib.rs | 1 + lading_observer/src/linux.rs | 2 ++ lading_observer/src/linux/cgroup/v2.rs | 6 +++++ lading_observer/src/linux/procfs/memory.rs | 1 + .../src/linux/procfs/memory/smaps_rollup.rs | 11 ++++++++- 8 files changed, 42 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5afdef675..ef9520172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1843,7 +1843,6 @@ dependencies = [ "flate2", "fuser", "futures", - "heck", "http", "http-body-util", "http-serde", @@ -1854,6 +1853,7 @@ dependencies = [ "kube-core", "lading-capture", "lading-capture-schema", + "lading-observer", "lading-payload", "lading-signal", "lading-throttle", @@ -1861,12 +1861,10 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-util", "nix 0.30.1", - "num-traits", "num_cpus", "once_cell", "opentelemetry-proto", "parquet", - "procfs", "proptest", "prost", "prost-build", @@ -1940,6 +1938,25 @@ dependencies = [ name = "lading-fuzz" version = "0.1.0" +[[package]] +name = "lading-observer" +version = "0.1.0" +dependencies = [ + "heck", + "lading-signal", + "metrics", + "nix 0.30.1", + "num-traits", + "num_cpus", + "procfs", + "regex", + "rustc-hash", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + [[package]] name = "lading-payload" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a345747dd..5ac3f938a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ hyper-util = { version = "0.1", default-features = false } heck = { version = "0.5", default-features = false } nix = { version = "0.30" } num-traits = { version = "0.2", default-features = false } +num_cpus = "1.17" regex = { version = "1.12" } sysinfo = { version = "0.37", default-features = false } opentelemetry-proto = "0.31" diff --git a/lading_observer/Cargo.toml b/lading_observer/Cargo.toml index 07e4b3624..fddb1272d 100644 --- a/lading_observer/Cargo.toml +++ b/lading_observer/Cargo.toml @@ -17,6 +17,7 @@ lading-signal = { version = "0.1", path = "../lading_signal" } metrics = { workspace = true } nix = { workspace = true, features = ["fs", "signal"] } num-traits = { workspace = true } +num_cpus = { workspace = true } procfs = { version = "0.18", default-features = false, features = [] } regex = { workspace = true } rustc-hash = { workspace = true } diff --git a/lading_observer/src/lib.rs b/lading_observer/src/lib.rs index 25db58957..c75fb3c15 100644 --- a/lading_observer/src/lib.rs +++ b/lading_observer/src/lib.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use tokio::sync::broadcast; #[cfg(target_os = "linux")] +/// Linux-specific observer functionality for cgroups and procfs. pub mod linux; /// Type used to receive the target PID once it is running. diff --git a/lading_observer/src/linux.rs b/lading_observer/src/linux.rs index 6fff4cd14..b28ccf4c1 100644 --- a/lading_observer/src/linux.rs +++ b/lading_observer/src/linux.rs @@ -1,4 +1,6 @@ +/// Cgroup v2 metrics collection. pub mod cgroup; +/// Procfs metrics collection. pub mod procfs; mod utils; mod wss; diff --git a/lading_observer/src/linux/cgroup/v2.rs b/lading_observer/src/linux/cgroup/v2.rs index 186277dd5..d4239729a 100644 --- a/lading_observer/src/linux/cgroup/v2.rs +++ b/lading_observer/src/linux/cgroup/v2.rs @@ -13,15 +13,21 @@ use tokio::fs; use tracing::{debug, error, warn}; #[derive(thiserror::Error, Debug)] +/// Errors that can occur during cgroup v2 operations. pub enum Error { + /// I/O error. #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error. #[error("Parse int error: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Float parsing error. #[error("Parse float error: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Cgroup v2 not found. #[error("Cgroup v2 not found")] CgroupV2NotFound, + /// PSI parsing error. #[error("Parsing PSI error: {0}")] ParsingPsi(String), } diff --git a/lading_observer/src/linux/procfs/memory.rs b/lading_observer/src/linux/procfs/memory.rs index 1a6edc2b6..629401396 100644 --- a/lading_observer/src/linux/procfs/memory.rs +++ b/lading_observer/src/linux/procfs/memory.rs @@ -1,4 +1,5 @@ pub(crate) mod smaps; +/// Memory metrics from /proc/[pid]/smaps_rollup. pub mod smaps_rollup; const BYTES_PER_KIBIBYTE: u64 = 1024; diff --git a/lading_observer/src/linux/procfs/memory/smaps_rollup.rs b/lading_observer/src/linux/procfs/memory/smaps_rollup.rs index d3aada3fe..923342ee3 100644 --- a/lading_observer/src/linux/procfs/memory/smaps_rollup.rs +++ b/lading_observer/src/linux/procfs/memory/smaps_rollup.rs @@ -10,19 +10,28 @@ pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error. #[error("Number Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Parsing error. #[error("Parsing: {0}")] Parsing(String), } #[derive(Debug, Clone, Copy, Default)] +/// Aggregator for memory metrics from smaps_rollup. pub struct Aggregator { + /// Resident Set Size in bytes. pub rss: u64, + /// Proportional Set Size in bytes. pub pss: u64, } -// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. +/// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. +/// +/// # Errors +/// +/// Returns an error if the file cannot be read or parsed. pub async fn poll( pid: i32, labels: &[(&'static str, String)],