From f4e795936f96ef4581f7a1f2d3606a3f07e40fbe Mon Sep 17 00:00:00 2001
From: Jake Saferstein
Date: Tue, 30 Dec 2025 10:26:58 -0500
Subject: [PATCH 1/3] resize blocks

---
 CHANGELOG.md | 2 ++
 lading_payload/src/block.rs | 11 ++++++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 491b2b6ec..9429fba3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Fingerprint mechanism now calculates Shannon entropy.
 - Lading now supports a '--json-logs' flag to output logs in structured JSON format.
+- Blocks are now resized after payload generation, freeing buffer memory when a
+  significant portion of the allocation is unused.

 ## [0.30.0]

 ## Added
diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs
index ab2c4f99d..3684b711b 100644
--- a/lading_payload/src/block.rs
+++ b/lading_payload/src/block.rs
@@ -675,7 +675,16 @@ where
 {
     let mut block: Writer<BytesMut> = BytesMut::with_capacity(chunk_size as usize).writer();
     serializer.to_bytes(&mut rng, chunk_size as usize, &mut block)?;
-    let bytes: Bytes = block.into_inner().freeze();
+    let inner = block.into_inner();
+    // When the actual block data usage is under half of its allocated capacity (chunk_size),
+    // shrink its buffer to the actual size to avoid holding onto excess capacity.
+    // This ensures that generators with lots of small blocks respect the total cache size, and
+    // no block cache will hold more than 2x the total cache size in allocated buffers.
+    let bytes: Bytes = if inner.len() < inner.capacity() / 2 {
+        Bytes::copy_from_slice(&inner)
+    } else {
+        inner.freeze()
+    };
     if bytes.is_empty() {
         // Blocks should not be empty and if they are empty this is an
         // error. Caller may choose to handle this however they wish, often it

From 413e3a8a2cbf733aad9029603b5a0c26e1bad31c Mon Sep 17 00:00:00 2001
From: Jake Saferstein
Date: Mon, 5 Jan 2026 10:54:39 -0500
Subject: [PATCH 2/3] throttler error cleanup

---
 lading/src/generator/file_gen/logrotate.rs | 2 +-
 lading/src/generator/file_gen/traditional.rs | 2 +-
 lading/src/generator/http.rs | 2 +-
 lading/src/generator/splunk_hec.rs | 2 +-
 lading/src/generator/tcp.rs | 2 +-
 lading/src/generator/trace_agent.rs | 2 +-
 lading/src/generator/udp.rs | 2 +-
 lading/src/generator/unix_datagram.rs | 2 +-
 lading/src/generator/unix_stream.rs | 2 +-
 lading_throttle/src/linear.rs | 14 +++++++++++---
 lading_throttle/src/stable.rs | 19 +++++++++++++++----
 11 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs
index 5efdbbbb5..77101131e 100644
--- a/lading/src/generator/file_gen/logrotate.rs
+++ b/lading/src/generator/file_gen/logrotate.rs
@@ -382,7 +382,7 @@ impl Child {
                             &self.labels).await?;
                         }
                         Err(err) => {
-                            error!("Throttle request of {} is larger than throttle capacity. Block will be discarded. Error: {}", total_bytes, err);
+                            error!("Discarding block due to throttle error: {err}");
                         }
                     }
                 }
diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs
index 72ea23fd3..02b246670 100644
--- a/lading/src/generator/file_gen/traditional.rs
+++ b/lading/src/generator/file_gen/traditional.rs
@@ -351,7 +351,7 @@ impl Child {
                         }
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs
index 3ba49a64d..c94ce97d7 100644
--- a/lading/src/generator/http.rs
+++ b/lading/src/generator/http.rs
@@ -285,7 +285,7 @@ impl Http {
                         }

                         Err(err) => {
-                            error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                            error!("Discarding block due to throttle error: {err}");
                         }
                     }
                 },
diff --git a/lading/src/generator/splunk_hec.rs b/lading/src/generator/splunk_hec.rs
index 9cc03e7a3..779e35486 100644
--- a/lading/src/generator/splunk_hec.rs
+++ b/lading/src/generator/splunk_hec.rs
@@ -319,7 +319,7 @@ impl SplunkHec {
                         tokio::spawn(send_hec_request(permit, block_length, labels, channel, client, request, request_shutdown.clone(), uri_clone));
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading/src/generator/tcp.rs b/lading/src/generator/tcp.rs
index 8bcdb3efe..c089f539e 100644
--- a/lading/src/generator/tcp.rs
+++ b/lading/src/generator/tcp.rs
@@ -272,7 +272,7 @@ impl TcpWorker {
                         }
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading/src/generator/trace_agent.rs b/lading/src/generator/trace_agent.rs
index c31e8ef73..a4068b413 100644
--- a/lading/src/generator/trace_agent.rs
+++ b/lading/src/generator/trace_agent.rs
@@ -378,7 +378,7 @@ impl TraceAgent {
                         });
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}", total_bytes = total_bytes);
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             },
diff --git a/lading/src/generator/udp.rs b/lading/src/generator/udp.rs
index 3062ae8e5..3e84560b8 100644
--- a/lading/src/generator/udp.rs
+++ b/lading/src/generator/udp.rs
@@ -275,7 +275,7 @@ impl UdpWorker {
                         }
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading/src/generator/unix_datagram.rs b/lading/src/generator/unix_datagram.rs
index 006e0cc78..b513bb302 100644
--- a/lading/src/generator/unix_datagram.rs
+++ b/lading/src/generator/unix_datagram.rs
@@ -298,7 +298,7 @@ impl Child {
                         }
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading/src/generator/unix_stream.rs b/lading/src/generator/unix_stream.rs
index 366f20662..f2808ef11 100644
--- a/lading/src/generator/unix_stream.rs
+++ b/lading/src/generator/unix_stream.rs
@@ -321,7 +321,7 @@ impl Child {
                         }
                     }
                     Err(err) => {
-                        error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                        error!("Discarding block due to throttle error: {err}");
                     }
                 }
             }
diff --git a/lading_throttle/src/linear.rs b/lading_throttle/src/linear.rs
index b06f6b576..5df496445 100644
--- a/lading_throttle/src/linear.rs
+++ b/lading_throttle/src/linear.rs
@@ -12,8 +12,13 @@ use super::{Clock, RealClock};
 #[derive(thiserror::Error, Debug, Clone, Copy)]
 pub enum Error {
     /// Requested capacity is greater than maximum allowed capacity.
-    #[error("Capacity")]
-    Capacity,
+    #[error("capacity request {requested} exceeds maximum {maximum}")]
+    Capacity {
+        /// The requested capacity that exceeded the maximum.
+        requested: u32,
+        /// The maximum capacity permitted by the throttle.
+        maximum: u32,
+    },
 }

 #[derive(Debug)]
@@ -133,7 +138,10 @@ impl Valve {
         // ticker if the caller makes a zero request. It's strange but it's a
         // valid thing to do.
         if capacity_request > self.maximum_capacity {
-            return Err(Error::Capacity);
+            return Err(Error::Capacity {
+                requested: capacity_request,
+                maximum: self.maximum_capacity,
+            });
         }

         let current_interval = ticks_elapsed / INTERVAL_TICKS;
diff --git a/lading_throttle/src/stable.rs b/lading_throttle/src/stable.rs
index c09895328..ab138c0bc 100644
--- a/lading_throttle/src/stable.rs
+++ b/lading_throttle/src/stable.rs
@@ -10,8 +10,13 @@ use super::{Clock, INTERVAL_TICKS, RealClock};
 #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq)]
 pub enum Error {
     /// Requested capacity is greater than maximum allowed capacity.
-    #[error("Capacity")]
-    Capacity,
+    #[error("capacity request {requested} exceeds throttle's maximum {maximum}")]
+    Capacity {
+        /// The requested capacity that exceeded the maximum.
+        requested: u32,
+        /// The maximum capacity permitted by the throttle.
+        maximum: u32,
+    },
 }

 #[derive(Debug)]
@@ -144,7 +149,10 @@ impl Valve {
         // ticker if the caller makes a zero request. It's strange but it's a
         // valid thing to do.
         if capacity_request > self.maximum_capacity {
-            return Err(Error::Capacity);
+            return Err(Error::Capacity {
+                requested: capacity_request,
+                maximum: self.maximum_capacity,
+            });
         }

         let current_interval = tick_to_interval(ticks_elapsed);
@@ -353,7 +361,10 @@ mod verification {
         capacity_request: u32,
     ) -> Result {
         if capacity_request > self.maximum_capacity {
-            return Err(super::Error::Capacity);
+            return Err(super::Error::Capacity {
+                requested: capacity_request,
+                maximum: self.maximum_capacity,
+            });
         }

         let current_interval = tick_to_interval(ticks_elapsed);

From a1be97ca274939c3632f5c7dbe21d0f8456785a5 Mon Sep 17 00:00:00 2001
From: Jake Saferstein
Date: Tue, 6 Jan 2026 16:36:31 -0500
Subject: [PATCH 3/3] config tests

---
 lading/src/generator/common.rs | 60 +++++++++++++++++++
 lading/src/generator/file_gen/logrotate_fs.rs | 53 ++++++++++++++++
 2 files changed, 113 insertions(+)

diff --git a/lading/src/generator/common.rs b/lading/src/generator/common.rs
index eeee3d86c..95829e729 100644
--- a/lading/src/generator/common.rs
+++ b/lading/src/generator/common.rs
@@ -296,4 +296,64 @@ mod tests {
         let pooled = ConcurrencyStrategy::new(None, false);
         assert_eq!(pooled.connection_count(), 1);
     }
+
+    mod throttle_config_parsing {
+        use crate::generator::common::BytesThrottleConfig;
+        use serde_yaml::with::singleton_map_recursive;
+
+        /// Helper to deserialize ThrottleConfig using singleton_map_recursive
+        /// (matches how the main config deserializes it)
+        fn parse_throttle_config(yaml: &str) -> BytesThrottleConfig {
+            let value: serde_yaml::Value = serde_yaml::from_str(yaml).unwrap();
+            singleton_map_recursive::deserialize(value).unwrap()
+        }
+
+        #[test]
+        fn parse_all_out() {
+            let yaml = r#"all_out"#;
+            let config = parse_throttle_config(yaml);
+            assert!(matches!(config, BytesThrottleConfig::AllOut));
+        }
+
+        #[test]
+        fn parse_stable_bytes_per_second() {
+            let yaml = r#"
+                stable:
+                  bytes_per_second: "10 MiB"
+                  timeout_millis: 100
+            "#;
+            let config = parse_throttle_config(yaml);
+            assert!(matches!(config, BytesThrottleConfig::Stable { .. }));
+            if let BytesThrottleConfig::Stable {
+                bytes_per_second,
+                timeout_millis,
+            } = config
+            {
+                assert_eq!(timeout_millis, 100);
+                assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024);
+            }
+        }
+
+        #[test]
+        fn parse_linear_bytes_per_second() {
+            let yaml = r#"
+                linear:
+                  initial_bytes_per_second: "10 MiB"
+                  maximum_bytes_per_second: "100 MiB"
+                  rate_of_change: "1 MiB"
+            "#;
+            let config = parse_throttle_config(yaml);
+            assert!(matches!(config, BytesThrottleConfig::Linear { .. }));
+            if let BytesThrottleConfig::Linear {
+                initial_bytes_per_second,
+                maximum_bytes_per_second,
+                rate_of_change,
+            } = config
+            {
+                assert_eq!(initial_bytes_per_second.as_u64(), 10 * 1024 * 1024);
+                assert_eq!(maximum_bytes_per_second.as_u64(), 100 * 1024 * 1024);
+                assert_eq!(rate_of_change.as_u64(), 1 * 1024 * 1024);
+            }
+        }
+    }
 }
diff --git a/lading/src/generator/file_gen/logrotate_fs.rs b/lading/src/generator/file_gen/logrotate_fs.rs
index c07d58ffc..a885ecd93 100644
--- a/lading/src/generator/file_gen/logrotate_fs.rs
+++ b/lading/src/generator/file_gen/logrotate_fs.rs
@@ -477,3 +477,56 @@ impl Filesystem for LogrotateFS {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::LoadProfile;
+    use serde::Deserialize;
+    use serde_yaml::with::singleton_map_recursive;
+
+    #[derive(Debug, Deserialize)]
+    #[serde(deny_unknown_fields)]
+    struct Wrapper {
+        load_profile: LoadProfile,
+    }
+    /// Helper to deserialize Wrapper using singleton_map_recursive
+    /// (matches how the main config deserializes nested enums)
+    fn parse_wrapper(yaml: &str) -> Wrapper {
+        let value: serde_yaml::Value = serde_yaml::from_str(yaml).unwrap();
+        singleton_map_recursive::deserialize(value).unwrap()
+    }
+
+    #[test]
+    fn load_profile_constant_bytes() {
+        let yaml = r#"
+            load_profile:
+              constant: "5 MiB"
+        "#;
+        let w = parse_wrapper(yaml);
+        assert!(matches!(w.load_profile, LoadProfile::Constant(..)));
+        if let LoadProfile::Constant(bytes) = w.load_profile {
+            assert_eq!(bytes.as_u64(), 5 * 1024 * 1024);
+        }
+    }
+
+    #[test]
+    fn load_profile_linear_bytes_per_second() {
+        let yaml = r#"
+            load_profile:
+              linear:
+                initial_bytes_per_second: "10 MiB"
+                rate: "1 MiB"
+        "#;
+
+        let w = parse_wrapper(yaml);
+        assert!(matches!(w.load_profile, LoadProfile::Linear { .. }));
+        if let LoadProfile::Linear {
+            initial_bytes_per_second,
+            rate,
+        } = w.load_profile
+        {
+            assert_eq!(initial_bytes_per_second.as_u64(), 10 * 1024 * 1024);
+            assert_eq!(rate.as_u64(), 1 * 1024 * 1024);
+        }
+    }
+}
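
For context on the block.rs change in patch 1, below is a minimal standalone sketch of the copy-versus-freeze decision, using only the bytes crate; it is not taken from the patch, and the helper name shrink_if_sparse is illustrative. BytesMut::freeze hands the full original allocation to the resulting Bytes, while Bytes::copy_from_slice copies into a buffer sized to the data; copying only when less than half the capacity is used is what bounds a block cache to roughly 2x its configured total size while avoiding a copy for mostly-full buffers.

use bytes::{BufMut, Bytes, BytesMut};

// Hypothetical helper mirroring the heuristic in block.rs: copy into a
// right-sized buffer when less than half the allocation is used, otherwise
// freeze in place and keep the original allocation.
fn shrink_if_sparse(inner: BytesMut) -> Bytes {
    if inner.len() < inner.capacity() / 2 {
        Bytes::copy_from_slice(&inner)
    } else {
        inner.freeze()
    }
}

fn main() {
    let mut buf = BytesMut::with_capacity(1024);
    buf.put_slice(b"short payload"); // uses 13 of 1024 bytes, so it gets copied
    let bytes = shrink_if_sparse(buf);
    assert_eq!(&bytes[..], b"short payload");
}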