Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"lading_capture",
"lading_fuzz",
"lading_payload",
"lading_observer",
"lading_throttle",
]

Expand Down Expand Up @@ -60,6 +61,11 @@ http-body-util = { version = "0.1" }
http-serde = { version = "2.1" }
hyper = { version = "1.7", default-features = false }
hyper-util = { version = "0.1", default-features = false }
heck = { version = "0.5", default-features = false }
nix = { version = "0.30" }
num-traits = { version = "0.2", default-features = false }
num_cpus = "1.17"
regex = { version = "1.12" }
sysinfo = { version = "0.37", default-features = false }
opentelemetry-proto = "0.31"
ddsketch-agent = { git = "https://github.com/DataDog/saluki", rev = "f47a7ef588c53aa1da35dcfd93808595ebeb1291", features = [
Expand Down
8 changes: 3 additions & 5 deletions lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ description = "A tool for load testing daemons."
lading-capture = { version = "0.2", path = "../lading_capture" }
lading-capture-schema = { path = "../lading_capture_schema" }
lading-payload = { version = "0.1", path = "../lading_payload" }
lading-observer = { version = "0.1", path = "../lading_observer" }
lading-throttle = { version = "0.1", path = "../lading_throttle" }
lading-signal = { version = "0.1", path = "../lading_signal" }

Expand All @@ -39,7 +40,6 @@ either = { version = "1.15.0" }
flate2 = { version = "1.1.2" }
futures = { version = "0.3.31" }
fuser = { version = "0.16", optional = true, features = ["libfuse"] }
heck = { version = "0.5", default-features = false }
http = { workspace = true }
http-body-util = { workspace = true }
http-serde = { version = "2.1" }
Expand All @@ -53,14 +53,13 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = [
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-util = { workspace = true }
nix = { version = "0.30", features = ["fs", "signal"] }
nix = { workspace = true, features = ["fs", "signal"] }
num_cpus = { version = "1.17" }
num-traits = { version = "0.2", default-features = false }
once_cell = { workspace = true }
opentelemetry-proto = { workspace = true }
prost = { workspace = true }
rand = { workspace = true, features = ["small_rng", "thread_rng"] }
regex = { version = "1.12" }
regex = { workspace = true }
reqwest = { version = "0.12", default-features = false, features = [
"default-tls",
"json",
Expand Down Expand Up @@ -95,7 +94,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
zstd = { version = "0.13.3" }

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.18", default-features = false, features = [] }
async-pidfd = { version = "0.1" }

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion lading/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ mod common;
pub mod config;
pub mod generator;
pub mod inspector;
pub mod observer;
/// Target observation utilities.
pub use lading_observer as observer;
pub(crate) mod proto;
pub mod target;
pub mod target_metrics;
Expand Down
1 change: 0 additions & 1 deletion lading/src/observer/linux/utils.rs

This file was deleted.

34 changes: 34 additions & 0 deletions lading_observer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "lading-observer"
version = "0.1.0"
authors = [
"Brian L. Troutwine <brian.troutwine@datadoghq.com>",
"George Hahn <george.hahn@datadoghq.com>",
]
edition = "2024"
license = "MIT"
repository = "https://github.com/datadog/lading/"
keywords = ["random_test", "generator"]
categories = ["development-tools::profiling"]
description = "A tool for load testing daemons."

[dependencies]
lading-signal = { version = "0.1", path = "../lading_signal" }
metrics = { workspace = true }
nix = { workspace = true, features = ["fs", "signal"] }
num-traits = { workspace = true }
num_cpus = { workspace = true }
procfs = { version = "0.18", default-features = false, features = [] }
regex = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["fs", "macros", "sync", "time"] }
tracing = { workspace = true }
heck = { workspace = true }

[lib]
doctest = false

[lints]
workspace = true
44 changes: 24 additions & 20 deletions lading/src/observer.rs → lading_observer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
//! Manage the target observer
//! Observe target resource usage for lading.
//!
//! The interogation that lading does of the target sub-process is intentionally
//! limited to in-process concerns, for the most part. The 'inspector' does
//! allow for a sub-process to do out-of-band inspection of the target but
//! cannot incorporate whatever it's doing into the capture data that lading
//! produces. This observer, on Linux, looks up the target process in procfs and
//! writes out key details about memory and CPU consumption into the capture
//! data. On non-Linux systems the observer, if enabled, will emit a warning.
//! This library supports the lading binary found elsewhere in this project. It
//! samples target resource usage, primarily on Linux via procfs, and is not
//! intended for external use.

use std::io;
#![deny(clippy::cargo)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::multiple_crate_versions)]

use crate::target::TargetPidReceiver;
use std::{io, time::Duration};

#[cfg(target_os = "linux")]
use crate::linux::Sampler;
use serde::Deserialize;
use tokio::sync::broadcast;

#[cfg(target_os = "linux")]
mod linux;
/// Linux-specific observer functionality for cgroups and procfs.
pub mod linux;

/// Type used to receive the target PID once it is running.
pub type TargetPidReceiver = broadcast::Receiver<Option<i32>>;

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`Server`]
Expand All @@ -38,13 +44,13 @@ pub enum Error {
pub struct Config {}

#[derive(Debug)]
/// The inspector sub-process server.
/// The observer sub-process server.
///
/// This struct manages a sub-process that can be used to do further examination
/// of the [`crate::target::Server`] by means of operating system facilities. The
/// sub-process is not created until [`Server::run`] is called. It is assumed
/// that only one instance of this struct will ever exist at a time, although
/// there are no protections for that.
/// of a target process by means of operating system facilities. The sub-process
/// is not created until [`Server::run`] is called. It is assumed that only one
/// instance of this struct will ever exist at a time, although there are no
/// protections for that.
pub struct Server {
#[allow(dead_code)] // config is not actively used, left as a stub
config: Config,
Expand Down Expand Up @@ -93,10 +99,8 @@ impl Server {
pub async fn run(
self,
mut pid_snd: TargetPidReceiver,
sample_period: std::time::Duration,
sample_period: Duration,
) -> Result<(), Error> {
use crate::observer::linux::Sampler;

let target_pid = pid_snd
.recv()
.await
Expand Down Expand Up @@ -143,7 +147,7 @@ impl Server {
pub async fn run(
self,
_pid_snd: TargetPidReceiver,
_sample_period: std::time::Duration,
_sample_period: Duration,
) -> Result<(), Error> {
tracing::warn!("observer unavailable on non-Linux system");
Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod cgroup;
mod procfs;
/// Cgroup v2 metrics collection.
pub mod cgroup;
/// Procfs metrics collection.
pub mod procfs;
mod utils;
mod wss;

use tracing::{error, warn};
use tracing::warn;

#[derive(thiserror::Error, Debug)]
/// Errors produced by functions in this module
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// Code to read cgroup information.
pub(crate) mod v2;
pub mod v2;

use std::{collections::VecDeque, io, path::PathBuf};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@
use tracing::{debug, error, warn};

#[derive(thiserror::Error, Debug)]
/// Errors that can occur during cgroup v2 operations.
pub enum Error {
/// I/O error.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Integer parsing error.
#[error("Parse int error: {0}")]
ParseInt(#[from] std::num::ParseIntError),
/// Float parsing error.
#[error("Parse float error: {0}")]
ParseFloat(#[from] std::num::ParseFloatError),
/// Cgroup v2 not found.
#[error("Cgroup v2 not found")]
CgroupV2NotFound,
/// PSI parsing error.
#[error("Parsing PSI error: {0}")]
ParsingPsi(String),
}
Expand Down Expand Up @@ -52,7 +58,7 @@
/// Polls for any cgroup metrics that can be read, v2 version.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_lines)]
pub(crate) async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> {
pub async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> {

Check failure on line 61 in lading_observer/src/linux/cgroup/v2.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

docs for function returning `Result` missing `# Errors` section
// Read all files in the cgroup `path` and create metrics for them. If we
// lack permissions to read we skip the file. We do not use ? to allow for
// the maximal number of files to be read.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// Sampler implementation for procfs filesystems
mod memory;
pub mod memory;
mod stat;
mod uptime;
mod vmstat;
Expand All @@ -14,7 +14,7 @@
use rustc_hash::FxHashMap;
use tracing::{error, warn};

use crate::observer::linux::utils::process_descendents::ProcessDescendantsIterator;
use crate::linux::utils::process_descendents::ProcessDescendantsIterator;

const BYTES_PER_KIBIBYTE: u64 = 1024;

Expand Down Expand Up @@ -410,10 +410,10 @@

#[cfg(test)]
mod tests {
use super::*;

Check failure on line 413 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

unused import: `super::*`

Check failure on line 413 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `super::*`

Check failure on line 413 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

unused import: `super::*`
use proptest::prelude::*;

Check failure on line 414 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

failed to resolve: use of unresolved module or unlinked crate `proptest`

Check failure on line 414 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

failed to resolve: use of unresolved module or unlinked crate `proptest`

Check failure on line 414 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

failed to resolve: use of unresolved module or unlinked crate `proptest`

prop_compose! {

Check failure on line 416 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

cannot find macro `prop_compose` in this scope

Check failure on line 416 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find macro `prop_compose` in this scope

Check failure on line 416 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

cannot find macro `prop_compose` in this scope
/// Generate a valid executable path
fn arb_exe_path()(
components in prop::collection::vec("[a-zA-Z0-9_-]+", 1..5),
Expand All @@ -422,7 +422,7 @@
}
}

prop_compose! {

Check failure on line 425 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

cannot find macro `prop_compose` in this scope

Check failure on line 425 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find macro `prop_compose` in this scope

Check failure on line 425 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

cannot find macro `prop_compose` in this scope
/// Generate a command line with arguments
fn arb_cmdline()(
cmd in "[a-zA-Z0-9_-]+",
Expand All @@ -436,7 +436,7 @@
}
}

prop_compose! {

Check failure on line 439 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

cannot find macro `prop_compose` in this scope

Check failure on line 439 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find macro `prop_compose` in this scope

Check failure on line 439 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

cannot find macro `prop_compose` in this scope
/// Generate process info for testing
fn arb_process_info()(
exe in arb_exe_path(),
Expand All @@ -454,7 +454,7 @@
}
}

proptest! {

Check failure on line 457 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, check)

cannot find macro `proptest` in this scope

Check failure on line 457 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find macro `proptest` in this scope

Check failure on line 457 in lading_observer/src/linux/procfs.rs

View workflow job for this annotation

GitHub Actions / Check Benchmarks Compile

cannot find macro `proptest` in this scope
#[test]
fn identical_processes_are_forked_but_not_execed(
exe in arb_exe_path(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(crate) mod smaps;
pub(crate) mod smaps_rollup;
/// Memory metrics from /proc/[pid]/smaps_rollup.

Check failure on line 2 in lading_observer/src/linux/procfs/memory.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks
pub mod smaps_rollup;

const BYTES_PER_KIBIBYTE: u64 = 1024;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fs, io::Read};
use regex::Regex;
use rustc_hash::FxHashMap;

use crate::observer::linux::procfs::BYTES_PER_KIBIBYTE;
use crate::linux::procfs::BYTES_PER_KIBIBYTE;

use super::next_token;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,33 @@

#[derive(thiserror::Error, Debug)]
/// Errors produced by functions in this module
pub(crate) enum Error {
pub enum Error {
/// Wrapper for [`std::io::Error`]
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Integer parsing error.
#[error("Number Parsing: {0}")]
ParseInt(#[from] std::num::ParseIntError),
/// Parsing error.
#[error("Parsing: {0}")]
Parsing(String),
}

#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Aggregator {
pub(crate) rss: u64,
pub(crate) pss: u64,
/// Aggregator for memory metrics from smaps_rollup.

Check failure on line 22 in lading_observer/src/linux/procfs/memory/smaps_rollup.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks
pub struct Aggregator {
/// Resident Set Size in bytes.
pub rss: u64,
/// Proportional Set Size in bytes.
pub pss: u64,
}

// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics.
pub(crate) async fn poll(
/// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics.
///
/// # Errors
///
/// Returns an error if the file cannot be read or parsed.
pub async fn poll(
pid: i32,
labels: &[(&'static str, String)],
aggr: &mut Aggregator,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use metrics::gauge;
use tokio::fs;

use crate::observer::linux::cgroup;
use crate::linux::cgroup;

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down
1 change: 1 addition & 0 deletions lading_observer/src/linux/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(in crate::linux) mod process_descendents;
Loading
Loading