diff --git a/CHANGELOG.md b/CHANGELOG.md index 9429fba3b..a642368eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 format. - Resize blocks after payload generation so that when a significant portion of the buffer is unused it frees that memory. +- Add support for throttling based on block rate instead of bytes. This can be useful + for generators (like "Static Chunks") that output variably sized blocks intentionally. ## [0.30.0] ## Added diff --git a/lading/src/generator/common.rs b/lading/src/generator/common.rs index 95829e729..9079a28a7 100644 --- a/lading/src/generator/common.rs +++ b/lading/src/generator/common.rs @@ -1,37 +1,108 @@ //! Common types for generators use byte_unit::Byte; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use std::num::{NonZeroU16, NonZeroU32}; -/// Generator-specific throttle configuration with field names that are specific -/// to byte-oriented generators. +/// Unified rate specification as a "one of" - either byte-based or block-based. +#[derive(Debug, Serialize, PartialEq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +#[serde(untagged)] +pub enum RateSpec { + /// Byte-based rate specification + Bytes { + /// Bytes per second + bytes_per_second: Byte, + }, + /// Block-based rate specification + Blocks { + /// Blocks per second + blocks_per_second: NonZeroU32, + }, +} + +// Custom deserialize implementation to support both legacy (direct Byte) and new (RateSpec struct) formats +impl<'de> Deserialize<'de> for RateSpec { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum RateSpecHelper { + // New format with explicit fields + Bytes { bytes_per_second: Byte }, + Blocks { blocks_per_second: NonZeroU32 }, + // Legacy format: direct Byte value + LegacyByte(Byte), + } + + let helper = RateSpecHelper::deserialize(deserializer)?; + match helper { + RateSpecHelper::Bytes { bytes_per_second } => Ok(RateSpec::Bytes { bytes_per_second }), + RateSpecHelper::Blocks { blocks_per_second } => { + Ok(RateSpec::Blocks { blocks_per_second }) + } + RateSpecHelper::LegacyByte(bytes) => Ok(RateSpec::Bytes { + bytes_per_second: bytes, + }), + } + } +} + +impl RateSpec { + fn resolve(&self) -> Result<(ThrottleMode, NonZeroU32), ThrottleConversionError> { + match self { + RateSpec::Bytes { bytes_per_second } => { + let val = bytes_per_second.as_u128(); + let val = u32::try_from(val) + .map_err(|_| ThrottleConversionError::ValueTooLarge(*bytes_per_second))?; + NonZeroU32::new(val) + .map(|n| (ThrottleMode::Bytes, n)) + .ok_or(ThrottleConversionError::Zero) + } + RateSpec::Blocks { blocks_per_second } => { + Ok((ThrottleMode::Blocks, *blocks_per_second)) + } + } + } +} + +/// Generator-specific throttle configuration unified for bytes or blocks. +/// +/// Note: We intentionally do not use `#[serde(deny_unknown_fields)]` here because the +/// `Stable` variant uses `#[serde(flatten)]` on the `rate` field. Serde's `flatten` +/// attribute is incompatible with `deny_unknown_fields` - the flattened struct consumes +/// fields dynamically, making it impossible for serde to determine what constitutes an +/// "unknown" field during deserialization. #[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy)] #[serde(rename_all = "snake_case")] -#[serde(deny_unknown_fields)] -pub enum BytesThrottleConfig { +pub enum ThrottleConfig { /// A throttle that allows the generator to produce as fast as possible AllOut, /// A throttle that attempts stable load Stable { - /// The bytes per second rate limit (e.g., "1MB", "512KiB") - bytes_per_second: Byte, + /// Rate specification (bytes or blocks). + #[serde(flatten)] + rate: RateSpec, /// The timeout in milliseconds for IO operations. Default is 0. #[serde(default)] timeout_millis: u64, }, /// A throttle that linearly increases load over time Linear { - /// The initial bytes per second (e.g., "100KB") - initial_bytes_per_second: Byte, - /// The maximum bytes per second (e.g., "10MB") - maximum_bytes_per_second: Byte, - /// The rate of change in bytes per second per second - rate_of_change: Byte, + /// The initial rate (bytes or blocks per second) + #[serde(alias = "initial_bytes_per_second")] + initial: RateSpec, + /// The maximum rate (bytes or blocks per second) + #[serde(alias = "maximum_bytes_per_second")] + maximum: RateSpec, + /// The rate of change (must be in same units as initial and maximum) + rate_of_change: RateSpec, }, } -/// Error converting `BytesThrottleConfig` to internal throttle config +/// Error converting `ThrottleConfig` to internal throttle config #[derive(Debug, thiserror::Error, Clone, Copy)] pub enum ThrottleConversionError { /// Value exceeds u32 capacity @@ -43,110 +114,172 @@ pub enum ThrottleConversionError { /// Conflicting configuration provided #[error("Cannot specify both throttle config and bytes_per_second")] ConflictingConfig, + /// Missing rate specification + #[error("Rate must be specified for the selected throttle mode")] + MissingRate, + /// Mixed throttle modes in a linear profile + #[error("All rate specs in a linear throttle must use the same mode")] + MixedModes, } -/// Create a throttle from optional config and `bytes_per_second` fallback -/// -/// This function implements the standard throttle creation logic for -/// byte-oriented generators. It handles the interaction between the new -/// `BytesThrottleConfig` and the legacy `bytes_per_second` field. -/// -/// # Decision Logic -/// -/// | `BytesThrottleConfig` | `bytes_per_second` | Result | -/// |---------------------|------------------|--------| -/// | Some(config) | Some(bps) | Error - Conflicting configuration | -/// | Some(config) | None | Use `BytesThrottleConfig` | -/// | None | Some(bps) | Create Stable throttle with `timeout_micros`: 0 | -/// | None | None | `AllOut` throttle (no rate limiting) | -/// -/// # Errors +/// Indicates how a throttle should interpret its token units. +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +#[serde(deny_unknown_fields)] +pub(super) enum ThrottleMode { + /// Throttle tokens represent bytes. + Bytes, + /// Throttle tokens represent block counts. + Blocks, +} + +/// Wrapper around a throttle and how its tokens should be interpreted. +#[derive(Debug)] +pub(super) struct BlockThrottle { + /// Underlying throttle instance. + inner: lading_throttle::Throttle, + /// Token interpretation mode. + pub mode: ThrottleMode, +} + +impl BlockThrottle { + /// Wait for capacity for a block, interpreting tokens according to `mode`. + pub(super) async fn wait_for_block( + &mut self, + block_cache: &lading_payload::block::Cache, + handle: &lading_payload::block::Handle, + ) -> Result<(), lading_throttle::Error> { + let tokens: NonZeroU32 = match self.mode { + ThrottleMode::Bytes => block_cache.peek_next_size(handle), + ThrottleMode::Blocks => NonZeroU32::new(1).expect("non-zero"), + }; + self.inner.wait_for(tokens).await + } + + /// Divide the underlying throttle capacity by `n`, preserving mode. + pub(super) fn divide(self, n: NonZeroU32) -> Result { + let throttle = self.inner.divide(n)?; + Ok(Self { + inner: throttle, + mode: self.mode, + }) + } + + /// Get the maximum capacity of the underlying throttle in bytes + pub(super) fn maximum_capacity_bytes(&self, maximum_block_size: u32) -> usize { + match self.mode { + ThrottleMode::Bytes => self.inner.maximum_capacity() as usize, + ThrottleMode::Blocks => self + .inner + .maximum_capacity() + .saturating_mul(maximum_block_size) as usize, + } + } +} + +/// Create a throttle from config plus optional legacy bytes-per-second fallback. /// -/// Returns an error if: -/// - Both config and `bytes_per_second` are provided (conflicting configuration) -/// - The `bytes_per_second` value exceeds `u32::MAX` -/// - The `bytes_per_second` value is zero +/// Returns a [`BlockThrottle`] that carries both the throttle and its mode +/// (bytes vs blocks). pub(super) fn create_throttle( - config: Option<&BytesThrottleConfig>, - bytes_per_second: Option<&byte_unit::Byte>, -) -> Result { - let throttle_config = match (config, bytes_per_second) { + config: Option<&ThrottleConfig>, + legacy_bytes_per_second: Option<&byte_unit::Byte>, +) -> Result { + let config_with_fallback = match (config, legacy_bytes_per_second) { (Some(_), Some(_)) => { return Err(ThrottleConversionError::ConflictingConfig); } - (Some(tc), None) => tc.try_into()?, - (None, Some(bps)) => { - let bps_value = bps.as_u128(); - if bps_value > u128::from(u32::MAX) { - return Err(ThrottleConversionError::ValueTooLarge(*bps)); - } - #[allow(clippy::cast_possible_truncation)] - let bps_u32 = NonZeroU32::new(bps_value as u32).ok_or(ThrottleConversionError::Zero)?; - lading_throttle::Config::Stable { - maximum_capacity: bps_u32, - timeout_micros: 0, + (Some(config), None) => *config, + (None, Some(bytes_per_second)) => ThrottleConfig::Stable { + rate: RateSpec::Bytes { + bytes_per_second: *bytes_per_second, + }, + timeout_millis: 0, + }, + (None, None) => ThrottleConfig::AllOut, + }; + + let (throttle, mode) = match config_with_fallback { + ThrottleConfig::AllOut => ( + lading_throttle::Throttle::new_with_config(lading_throttle::Config::AllOut), + ThrottleMode::Bytes, + ), + ThrottleConfig::Stable { + rate, + timeout_millis, + } => { + let (resolved_mode, cap) = rate.resolve()?; + ( + lading_throttle::Throttle::new_with_config(lading_throttle::Config::Stable { + maximum_capacity: cap, + timeout_micros: timeout_millis.saturating_mul(1000), + }), + resolved_mode, + ) + } + ThrottleConfig::Linear { + initial, + maximum, + rate_of_change, + } => { + let (m1, init) = initial.resolve()?; + let (m2, max) = maximum.resolve()?; + let (m3, rate) = rate_of_change.resolve()?; + if m1 != m2 || m1 != m3 { + return Err(ThrottleConversionError::MixedModes); } + ( + lading_throttle::Throttle::new_with_config(lading_throttle::Config::Linear { + initial_capacity: init.get(), + maximum_capacity: max, + rate_of_change: rate.get(), + }), + m1, + ) } - (None, None) => lading_throttle::Config::AllOut, }; - Ok(lading_throttle::Throttle::new_with_config(throttle_config)) + + Ok(BlockThrottle { + inner: throttle, + mode, + }) } -impl TryFrom<&BytesThrottleConfig> for lading_throttle::Config { +impl TryFrom<&ThrottleConfig> for lading_throttle::Config { type Error = ThrottleConversionError; #[allow(clippy::cast_possible_truncation)] - fn try_from(config: &BytesThrottleConfig) -> Result { + fn try_from(config: &ThrottleConfig) -> Result { match config { - BytesThrottleConfig::AllOut => Ok(lading_throttle::Config::AllOut), - BytesThrottleConfig::Stable { - bytes_per_second, + ThrottleConfig::AllOut => Ok(lading_throttle::Config::AllOut), + ThrottleConfig::Stable { + rate, timeout_millis, } => { - let value = bytes_per_second.as_u128(); - if value > u128::from(u32::MAX) { - return Err(ThrottleConversionError::ValueTooLarge(*bytes_per_second)); + let (mode, cap) = rate.resolve()?; + if mode != ThrottleMode::Bytes { + return Err(ThrottleConversionError::MixedModes); } - let value = value as u32; - let value = NonZeroU32::new(value).ok_or(ThrottleConversionError::Zero)?; Ok(lading_throttle::Config::Stable { - maximum_capacity: value, + maximum_capacity: cap, timeout_micros: timeout_millis.saturating_mul(1000), }) } - BytesThrottleConfig::Linear { - initial_bytes_per_second, - maximum_bytes_per_second, + ThrottleConfig::Linear { + initial, + maximum, rate_of_change, } => { - let initial = initial_bytes_per_second.as_u128(); - let maximum = maximum_bytes_per_second.as_u128(); - let rate = rate_of_change.as_u128(); - - if initial > u128::from(u32::MAX) { - return Err(ThrottleConversionError::ValueTooLarge( - *initial_bytes_per_second, - )); - } - if maximum > u128::from(u32::MAX) { - return Err(ThrottleConversionError::ValueTooLarge( - *maximum_bytes_per_second, - )); + let (m1, init) = initial.resolve()?; + let (m2, max) = maximum.resolve()?; + let (m3, rate) = rate_of_change.resolve()?; + if m1 != m2 || m1 != m3 || m1 != ThrottleMode::Bytes { + return Err(ThrottleConversionError::MixedModes); } - if rate > u128::from(u32::MAX) { - return Err(ThrottleConversionError::ValueTooLarge(*rate_of_change)); - } - - let initial = initial as u32; - let maximum = maximum as u32; - let rate = rate as u32; - - let maximum = NonZeroU32::new(maximum).ok_or(ThrottleConversionError::Zero)?; - Ok(lading_throttle::Config::Linear { - initial_capacity: initial, - maximum_capacity: maximum, - rate_of_change: rate, + initial_capacity: init.get(), + maximum_capacity: max, + rate_of_change: rate.get(), }) } } @@ -161,7 +294,7 @@ impl TryFrom<&BytesThrottleConfig> for lading_throttle::Config { /// - Pooled: Multiple concurrent requests with semaphore limiting (HTTP/Splunk /// HEC pattern) /// - Workers: Multiple persistent worker tasks (TCP/UDP/Unix pattern) -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub(super) enum ConcurrencyStrategy { /// Pool of connections with semaphore limiting concurrent requests Pooled { @@ -193,7 +326,7 @@ impl ConcurrencyStrategy { } /// Get the number of parallel connections for this strategy - pub(super) fn connection_count(&self) -> u16 { + pub(super) fn connection_count(self) -> u16 { match self { Self::Pooled { max_connections } => max_connections.get(), Self::Workers { count } => count.get(), @@ -297,13 +430,61 @@ mod tests { assert_eq!(pooled.connection_count(), 1); } + mod rate_spec_parsing { + use crate::generator::common::RateSpec; + + #[test] + fn parse_bytes_new_format() { + let yaml = r#" + bytes_per_second: "10 MiB" + "#; + let rate: RateSpec = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024); + } + } + + #[test] + fn parse_blocks_new_format() { + let yaml = r#" + blocks_per_second: 1000 + "#; + let rate: RateSpec = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(rate, RateSpec::Blocks { .. })); + if let RateSpec::Blocks { blocks_per_second } = rate { + assert_eq!(blocks_per_second.get(), 1000); + } + } + + #[test] + fn parse_legacy_direct_byte_value() { + let yaml = r#""500 KiB""#; + let rate: RateSpec = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 500 * 1024); + } + } + + #[test] + fn parse_legacy_direct_byte_value_no_quotes() { + let yaml = r#"5 MiB"#; + let rate: RateSpec = serde_yaml::from_str(yaml).unwrap(); + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 5 * 1024 * 1024); + } + } + } + mod throttle_config_parsing { - use crate::generator::common::BytesThrottleConfig; + use crate::generator::common::{RateSpec, ThrottleConfig}; use serde_yaml::with::singleton_map_recursive; /// Helper to deserialize ThrottleConfig using singleton_map_recursive /// (matches how the main config deserializes it) - fn parse_throttle_config(yaml: &str) -> BytesThrottleConfig { + fn parse_throttle_config(yaml: &str) -> ThrottleConfig { let value: serde_yaml::Value = serde_yaml::from_str(yaml).unwrap(); singleton_map_recursive::deserialize(value).unwrap() } @@ -312,7 +493,7 @@ mod tests { fn parse_all_out() { let yaml = r#"all_out"#; let config = parse_throttle_config(yaml); - assert!(matches!(config, BytesThrottleConfig::AllOut)); + assert!(matches!(config, ThrottleConfig::AllOut)); } #[test] @@ -323,14 +504,39 @@ mod tests { timeout_millis: 100 "#; let config = parse_throttle_config(yaml); - assert!(matches!(config, BytesThrottleConfig::Stable { .. })); - if let BytesThrottleConfig::Stable { - bytes_per_second, + assert!(matches!(config, ThrottleConfig::Stable { .. })); + if let ThrottleConfig::Stable { + rate, timeout_millis, } = config { assert_eq!(timeout_millis, 100); - assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024); + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024); + } + } + } + + #[test] + fn parse_stable_blocks_per_second() { + let yaml = r#" + stable: + blocks_per_second: 1000 + timeout_millis: 0 + "#; + let config = parse_throttle_config(yaml); + assert!(matches!(config, ThrottleConfig::Stable { .. })); + if let ThrottleConfig::Stable { + rate, + timeout_millis, + } = config + { + assert_eq!(timeout_millis, 0); + assert!(matches!(rate, RateSpec::Blocks { .. })); + if let RateSpec::Blocks { blocks_per_second } = rate { + assert_eq!(blocks_per_second.get(), 1000); + } } } @@ -343,17 +549,268 @@ mod tests { rate_of_change: "1 MiB" "#; let config = parse_throttle_config(yaml); - assert!(matches!(config, BytesThrottleConfig::Linear { .. })); - if let BytesThrottleConfig::Linear { - initial_bytes_per_second, - maximum_bytes_per_second, + assert!(matches!(config, ThrottleConfig::Linear { .. })); + if let ThrottleConfig::Linear { + initial, + maximum, + rate_of_change, + } = config + { + assert!(matches!(initial, RateSpec::Bytes { .. })); + assert!(matches!(maximum, RateSpec::Bytes { .. })); + assert!(matches!(rate_of_change, RateSpec::Bytes { .. })); + + if let RateSpec::Bytes { bytes_per_second } = initial { + assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = maximum { + assert_eq!(bytes_per_second.as_u64(), 100 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = rate_of_change { + assert_eq!(bytes_per_second.as_u64(), 1 * 1024 * 1024); + } + } + } + + #[test] + fn parse_linear_new_format() { + let yaml = r#" + linear: + initial: + bytes_per_second: "1 MiB" + maximum: + bytes_per_second: "50 MiB" + rate_of_change: + bytes_per_second: "500 KiB" + "#; + let config = parse_throttle_config(yaml); + assert!(matches!(config, ThrottleConfig::Linear { .. })); + if let ThrottleConfig::Linear { + initial, + maximum, + rate_of_change, + } = config + { + assert!(matches!(initial, RateSpec::Bytes { .. })); + assert!(matches!(maximum, RateSpec::Bytes { .. })); + assert!(matches!(rate_of_change, RateSpec::Bytes { .. })); + + if let RateSpec::Bytes { bytes_per_second } = initial { + assert_eq!(bytes_per_second.as_u64(), 1 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = maximum { + assert_eq!(bytes_per_second.as_u64(), 50 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = rate_of_change { + assert_eq!(bytes_per_second.as_u64(), 500 * 1024); + } + } + } + + #[test] + fn parse_linear_new_format_flattened_bytes() { + let yaml = r#" + linear: + initial: "1 MiB" + maximum: "50 MiB" + rate_of_change: "500 KiB" + "#; + let config = parse_throttle_config(yaml); + assert!(matches!(config, ThrottleConfig::Linear { .. })); + if let ThrottleConfig::Linear { + initial, + maximum, + rate_of_change, + } = config + { + assert!(matches!(initial, RateSpec::Bytes { .. })); + assert!(matches!(maximum, RateSpec::Bytes { .. })); + assert!(matches!(rate_of_change, RateSpec::Bytes { .. })); + + if let RateSpec::Bytes { bytes_per_second } = initial { + assert_eq!(bytes_per_second.as_u64(), 1 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = maximum { + assert_eq!(bytes_per_second.as_u64(), 50 * 1024 * 1024); + } + if let RateSpec::Bytes { bytes_per_second } = rate_of_change { + assert_eq!(bytes_per_second.as_u64(), 500 * 1024); + } + } + } + + #[test] + fn parse_linear_new_format_blocks() { + let yaml = r#" + linear: + initial: + blocks_per_second: 100 + maximum: + blocks_per_second: 5000 + rate_of_change: + blocks_per_second: 50 + "#; + let config = parse_throttle_config(yaml); + assert!(matches!(config, ThrottleConfig::Linear { .. })); + if let ThrottleConfig::Linear { + initial, + maximum, rate_of_change, } = config { - assert_eq!(initial_bytes_per_second.as_u64(), 10 * 1024 * 1024); - assert_eq!(maximum_bytes_per_second.as_u64(), 100 * 1024 * 1024); - assert_eq!(rate_of_change.as_u64(), 1 * 1024 * 1024); + assert!(matches!(initial, RateSpec::Blocks { .. })); + assert!(matches!(maximum, RateSpec::Blocks { .. })); + assert!(matches!(rate_of_change, RateSpec::Blocks { .. })); + + if let RateSpec::Blocks { blocks_per_second } = initial { + assert_eq!(blocks_per_second.get(), 100); + } + if let RateSpec::Blocks { blocks_per_second } = maximum { + assert_eq!(blocks_per_second.get(), 5000); + } + if let RateSpec::Blocks { blocks_per_second } = rate_of_change { + assert_eq!(blocks_per_second.get(), 50); + } } } } + + mod rate_spec_resolve { + use crate::generator::common::{RateSpec, ThrottleConversionError, ThrottleMode}; + use std::num::NonZeroU32; + + #[test] + fn resolve_bytes_rate() { + let rate = RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_u64(1024 * 1024), + }; + let result = rate.resolve(); + assert!(result.is_ok()); + let (mode, capacity) = result.unwrap(); + assert_eq!(mode, ThrottleMode::Bytes); + assert_eq!(capacity.get(), 1024 * 1024); + } + + #[test] + fn resolve_blocks_rate() { + let rate = RateSpec::Blocks { + blocks_per_second: NonZeroU32::new(1000).unwrap(), + }; + let result = rate.resolve(); + assert!(result.is_ok()); + let (mode, capacity) = result.unwrap(); + assert_eq!(mode, ThrottleMode::Blocks); + assert_eq!(capacity.get(), 1000); + } + + #[test] + fn resolve_zero_bytes_fails() { + let rate = RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_u64(0), + }; + let result = rate.resolve(); + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), ThrottleConversionError::Zero)); + } + } + + mod create_throttle_tests { + use crate::generator::common::{ + RateSpec, ThrottleConfig, ThrottleConversionError, ThrottleMode, create_throttle, + }; + use std::num::NonZeroU32; + use std::str::FromStr; + + #[test] + fn create_throttle_with_stable_bytes() { + let config = ThrottleConfig::Stable { + rate: RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_str("10 MiB").unwrap(), + }, + timeout_millis: 100, + }; + let result = create_throttle(Some(&config), None); + assert!(result.is_ok()); + let throttle = result.unwrap(); + assert_eq!(throttle.mode, ThrottleMode::Bytes); + } + + #[test] + fn create_throttle_with_stable_blocks() { + let config = ThrottleConfig::Stable { + rate: RateSpec::Blocks { + blocks_per_second: NonZeroU32::new(1000).unwrap(), + }, + timeout_millis: 0, + }; + let result = create_throttle(Some(&config), None); + assert!(result.is_ok()); + let throttle = result.unwrap(); + assert_eq!(throttle.mode, ThrottleMode::Blocks); + } + + #[test] + fn create_throttle_with_legacy_bytes_per_second() { + let legacy = byte_unit::Byte::from_str("5 MiB").unwrap(); + let result = create_throttle(None, Some(&legacy)); + assert!(result.is_ok()); + let throttle = result.unwrap(); + assert_eq!(throttle.mode, ThrottleMode::Bytes); + } + + #[test] + fn create_throttle_conflicting_config() { + let config = ThrottleConfig::Stable { + rate: RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_str("10 MiB").unwrap(), + }, + timeout_millis: 100, + }; + let legacy = byte_unit::Byte::from_str("5 MiB").unwrap(); + let result = create_throttle(Some(&config), Some(&legacy)); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + ThrottleConversionError::ConflictingConfig + )); + } + + #[test] + fn create_throttle_linear_mixed_modes_fails() { + let config = ThrottleConfig::Linear { + initial: RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_str("1 MiB").unwrap(), + }, + maximum: RateSpec::Blocks { + blocks_per_second: NonZeroU32::new(1000).unwrap(), + }, + rate_of_change: RateSpec::Bytes { + bytes_per_second: byte_unit::Byte::from_str("100 KiB").unwrap(), + }, + }; + let result = create_throttle(Some(&config), None); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + ThrottleConversionError::MixedModes + )); + } + + #[test] + fn create_throttle_all_out() { + let config = ThrottleConfig::AllOut; + let result = create_throttle(Some(&config), None); + assert!(result.is_ok()); + let throttle = result.unwrap(); + assert_eq!(throttle.mode, ThrottleMode::Bytes); + } + + #[test] + fn create_throttle_none_defaults_to_all_out() { + let result = create_throttle(None, None); + assert!(result.is_ok()); + let throttle = result.unwrap(); + assert_eq!(throttle.mode, ThrottleMode::Bytes); + } + } } diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs index 77101131e..8588bd9b0 100644 --- a/lading/src/generator/file_gen/logrotate.rs +++ b/lading/src/generator/file_gen/logrotate.rs @@ -36,7 +36,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; /// An enum to allow us to determine what operation caused an IO errror as the @@ -147,8 +147,9 @@ pub struct Config { /// Whether to use a fixed or streaming block cache #[serde(default = "lading_payload::block::default_cache_method")] block_cache_method: block::CacheMethod, - /// The load throttle configuration - pub throttle: Option, + /// Throughput profile controlling emission rate (bytes or blocks). + #[serde(default)] + pub throttle: Option, } #[derive(Debug)] @@ -214,7 +215,7 @@ impl Server { let mut handles = Vec::new(); for idx in 0..config.concurrent_logs { - let throttle = + let throughput_throttle = create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?; let mut dir_path = config.root.clone(); @@ -234,8 +235,9 @@ impl Server { &basename, config.total_rotations, maximum_bytes_per_log, + maximum_block_size.get(), Arc::clone(&block_cache), - throttle, + throughput_throttle, shutdown.clone(), child_labels, ); @@ -283,8 +285,9 @@ struct Child { names: Vec, // The soft limit bytes per file that will trigger a rotation. maximum_bytes_per_log: NonZeroU32, + maximum_block_size: u32, block_cache: Arc, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, } @@ -295,8 +298,9 @@ impl Child { basename: &Path, total_rotations: u8, maximum_bytes_per_log: NonZeroU32, + maximum_block_size: u32, block_cache: Arc, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, ) -> Self { @@ -316,6 +320,7 @@ impl Child { Self { names, maximum_bytes_per_log, + maximum_block_size, block_cache, throttle, shutdown, @@ -324,7 +329,10 @@ impl Child { } async fn spin(mut self) -> Result<(), Error> { - let buffer_capacity = self.throttle.maximum_capacity() as usize; + let mut handle = self.block_cache.handle(); + let buffer_capacity = self + .throttle + .maximum_capacity_bytes(self.maximum_block_size); let mut total_bytes_written: u64 = 0; let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get()); @@ -357,21 +365,16 @@ impl Child { })?, ); - let mut handle = self.block_cache.handle(); - let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { // SAFETY: By construction the block cache will never be empty // except in the event of a catastrophic failure. - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { - let block = self.block_cache.advance(&mut handle); - write_bytes(block, + write_bytes(self.block_cache.advance(&mut handle), &mut fp, &mut total_bytes_written, buffer_capacity, diff --git a/lading/src/generator/file_gen/logrotate_fs.rs b/lading/src/generator/file_gen/logrotate_fs.rs index a885ecd93..f9d203050 100644 --- a/lading/src/generator/file_gen/logrotate_fs.rs +++ b/lading/src/generator/file_gen/logrotate_fs.rs @@ -5,6 +5,7 @@ #![allow(clippy::cast_possible_wrap)] use crate::generator; +use crate::generator::common::{RateSpec, ThrottleConversionError, ThrottleMode}; use fuser::{ BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory, ReplyEntry, Request, spawn_mount2, @@ -55,7 +56,7 @@ pub struct Config { maximum_block_size: byte_unit::Byte, /// The mount-point for this filesystem mount_point: PathBuf, - /// The load profile, controlling bytes per second as a function of time. + /// The load profile, controlling bytes or blocks per second as a function of time. load_profile: LoadProfile, } @@ -63,33 +64,66 @@ pub struct Config { #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] #[serde(rename_all = "snake_case")] pub enum LoadProfile { - /// Constant bytes per second - Constant(byte_unit::Byte), - /// Linear growth of bytes per second + /// Constant rate (bytes or blocks per second). + Constant(RateSpec), + /// Linear growth of rate (bytes or blocks per second). Linear { - /// Starting point for bytes per second - initial_bytes_per_second: byte_unit::Byte, - /// Amount to increase per second - rate: byte_unit::Byte, + /// Starting point for the rate. + #[serde(alias = "initial_bytes_per_second")] + initial: RateSpec, + /// Amount to increase per second. + rate: RateSpec, }, } impl LoadProfile { - fn to_model(self) -> model::LoadProfile { + fn to_model(self) -> Result { // For now, one tick is one second. match self { - LoadProfile::Constant(bpt) => model::LoadProfile::Constant(bpt.as_u128() as u64), - LoadProfile::Linear { - initial_bytes_per_second, - rate, - } => model::LoadProfile::Linear { - start: initial_bytes_per_second.as_u128() as u64, - rate: rate.as_u128() as u64, - }, + LoadProfile::Constant(rate) => { + let (mode, cap) = resolve_rate(&rate)?; + match mode { + ThrottleMode::Bytes => Ok(model::LoadProfile::Constant(u64::from(cap.get()))), + ThrottleMode::Blocks => Ok(model::LoadProfile::Blocks { + blocks_per_tick: u64::from(cap.get()), + }), + } + } + LoadProfile::Linear { initial, rate } => { + let (m1, init) = resolve_rate(&initial)?; + let (m2, rate) = resolve_rate(&rate)?; + if m1 != m2 { + return Err(ThrottleConversionError::MixedModes); + } + match m1 { + ThrottleMode::Bytes => Ok(model::LoadProfile::Linear { + start: u64::from(init.get()), + rate: u64::from(rate.get()), + }), + ThrottleMode::Blocks => Ok(model::LoadProfile::BlocksLinear { + start: u64::from(init.get()), + rate: u64::from(rate.get()), + }), + } + } } } } +fn resolve_rate(rate: &RateSpec) -> Result<(ThrottleMode, NonZeroU32), ThrottleConversionError> { + match rate { + RateSpec::Bytes { bytes_per_second } => { + let val = bytes_per_second.as_u128(); + let val = u32::try_from(val) + .map_err(|_| ThrottleConversionError::ValueTooLarge(*bytes_per_second))?; + NonZeroU32::new(val) + .map(|n| (ThrottleMode::Bytes, n)) + .ok_or(ThrottleConversionError::Zero) + } + RateSpec::Blocks { blocks_per_second } => Ok((ThrottleMode::Blocks, *blocks_per_second)), + } +} + #[derive(thiserror::Error, Debug)] /// Error for `LogrotateFs` pub enum Error { @@ -99,6 +133,9 @@ pub enum Error { /// Creation of payload blocks failed. #[error("Block creation error: {0}")] Block(#[from] block::Error), + /// Throttle conversion error + #[error("Throttle configuration error: {0}")] + ThrottleConversion(#[from] ThrottleConversionError), /// Failed to convert, value is 0 #[error("Value provided must not be zero")] Zero, @@ -154,10 +191,18 @@ impl Server { // divvy this up in the future. total_bytes.get() as usize, )?; + let load_profile = config.load_profile.to_model()?; let start_time = Instant::now(); let start_time_system = SystemTime::now(); + let block_cache_size = block_cache.total_size(); + info!( + "LogrotateFS block cache initialized: requested={}, actual={} bytes, blocks={}", + config.maximum_prebuild_cache_size_bytes, + block_cache_size, + block_cache.len() + ); let state = model::State::new( &mut rng, start_time.elapsed().as_secs(), @@ -166,7 +211,7 @@ impl Server { block_cache, config.max_depth, config.concurrent_logs, - config.load_profile.to_model(), + load_profile, ); info!( @@ -481,6 +526,7 @@ impl Filesystem for LogrotateFS { #[cfg(test)] mod tests { use super::LoadProfile; + use crate::generator::common::RateSpec; use serde::Deserialize; use serde_yaml::with::singleton_map_recursive; @@ -504,8 +550,28 @@ mod tests { "#; let w = parse_wrapper(yaml); assert!(matches!(w.load_profile, LoadProfile::Constant(..))); - if let LoadProfile::Constant(bytes) = w.load_profile { - assert_eq!(bytes.as_u64(), 5 * 1024 * 1024); + if let LoadProfile::Constant(rate) = w.load_profile { + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 5 * 1024 * 1024); + } + } + } + + #[test] + fn load_profile_constant_blocks_per_second() { + let yaml = r#" + load_profile: + constant: + blocks_per_second: 100 + "#; + let w = parse_wrapper(yaml); + assert!(matches!(w.load_profile, LoadProfile::Constant(_))); + if let LoadProfile::Constant(rate) = w.load_profile { + assert!(matches!(rate, RateSpec::Blocks { .. })); + if let RateSpec::Blocks { blocks_per_second } = rate { + assert_eq!(blocks_per_second.get(), 100); + } } } @@ -520,13 +586,61 @@ mod tests { let w = parse_wrapper(yaml); assert!(matches!(w.load_profile, LoadProfile::Linear { .. })); - if let LoadProfile::Linear { - initial_bytes_per_second, - rate, - } = w.load_profile - { - assert_eq!(initial_bytes_per_second.as_u64(), 10 * 1024 * 1024); - assert_eq!(rate.as_u64(), 1 * 1024 * 1024); + if let LoadProfile::Linear { initial, rate } = w.load_profile { + assert!(matches!(initial, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = initial { + assert_eq!(bytes_per_second.as_u64(), 10 * 1024 * 1024); + } + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 1 * 1024 * 1024); + } + } + } + + #[test] + fn load_profile_linear_new_format_flattened_bytes() { + let yaml = r#" + load_profile: + linear: + initial: "1 MiB" + rate: "100 KiB" + "#; + let w = parse_wrapper(yaml); + assert!(matches!(w.load_profile, LoadProfile::Linear { .. })); + if let LoadProfile::Linear { initial, rate } = w.load_profile { + assert!(matches!(initial, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = initial { + assert_eq!(bytes_per_second.as_u64(), 1 * 1024 * 1024); + } + assert!(matches!(rate, RateSpec::Bytes { .. })); + if let RateSpec::Bytes { bytes_per_second } = rate { + assert_eq!(bytes_per_second.as_u64(), 100 * 1024); + } + } + } + + #[test] + fn load_profile_linear_blocks_per_second() { + let yaml = r#" + load_profile: + linear: + initial: + blocks_per_second: 100 + rate: + blocks_per_second: 10 + "#; + let w = parse_wrapper(yaml); + assert!(matches!(w.load_profile, LoadProfile::Linear { .. })); + if let LoadProfile::Linear { initial, rate } = w.load_profile { + assert!(matches!(initial, RateSpec::Blocks { .. })); + if let RateSpec::Blocks { blocks_per_second } = initial { + assert_eq!(blocks_per_second.get(), 100); + } + assert!(matches!(rate, RateSpec::Blocks { .. })); + if let RateSpec::Blocks { blocks_per_second } = rate { + assert_eq!(blocks_per_second.get(), 10); + } } } } diff --git a/lading/src/generator/file_gen/logrotate_fs/model.rs b/lading/src/generator/file_gen/logrotate_fs/model.rs index bc5c7d9d9..9b2c593e9 100644 --- a/lading/src/generator/file_gen/logrotate_fs/model.rs +++ b/lading/src/generator/file_gen/logrotate_fs/model.rs @@ -13,9 +13,20 @@ pub(crate) type Tick = u64; /// The identification node number pub(crate) type Inode = usize; +/// Parameters describing a file's position in the rotation hierarchy +#[derive(Debug, Clone, Copy)] +pub(crate) struct FileHierarchy { + /// The parent node of this file + pub(crate) parent: Inode, + /// The peer of this file (next in rotation sequence) + pub(crate) peer: Option, + /// The group ID shared by all files in the same rotation group + pub(crate) group_id: u16, +} + /// Model representation of a `File`. Does not actually contain any bytes but /// stores sufficient metadata to determine access patterns over time. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct File { /// The parent `Node` of this `File`. parent: Inode, @@ -82,6 +93,9 @@ pub(crate) struct File { /// starting positions in the cache. cache_offset: u64, + /// Handle for iterating block sizes when simulating block-based writes. + block_handle: block::Handle, + /// The random number generator used to generate the cache offset. rng: SmallRng, } @@ -112,20 +126,34 @@ pub(crate) fn generate_cache_offset(rng: &mut SmallRng, total_cache_size: u64) - rng.random_range(0..total_cache_size) } +fn generate_block_handle(rng: &mut R, block_cache: &block::Cache) -> block::Handle +where + R: Rng + ?Sized, +{ + let mut handle = block_cache.handle(); + let len = block_cache.len(); + if len == 0 { + return handle; + } + let offset = rng.random_range(0..len); + for _ in 0..offset { + let _ = block_cache.advance(&mut handle); + } + handle +} impl File { /// Create a new instance of `File` pub(crate) fn new( mut rng: SmallRng, - parent: Inode, - group_id: u16, + hierarchy: FileHierarchy, bytes_per_tick: u64, now: Tick, - peer: Option, total_cache_size: u64, + block_handle: block::Handle, ) -> Self { let cache_offset = generate_cache_offset(&mut rng, total_cache_size); Self { - parent, + parent: hierarchy.parent, bytes_written: 0, bytes_read: 0, access_tick: now, @@ -136,12 +164,13 @@ impl File { read_only: false, read_only_since: None, ordinal: 0, - peer, - group_id, + peer: hierarchy.peer, + group_id: hierarchy.group_id, open_handles: 0, unlinked: false, max_offset_observed: 0, cache_offset, + block_handle, rng, } } @@ -295,6 +324,18 @@ pub(crate) enum LoadProfile { /// Amount to increase per tick rate: u64, }, + /// Constant blocks per tick + Blocks { + /// Blocks per tick + blocks_per_tick: u64, + }, + /// Linear growth of blocks per tick + BlocksLinear { + /// Starting point for blocks per tick + start: u64, + /// Amount to increase per tick + rate: u64, + }, } /// The state of the filesystem @@ -505,16 +546,21 @@ impl State { // Generate a new SmallRng instance from the states rng to be used in deterministic offset generation let child_seed: [u8; 32] = rng.random(); - let child_rng = SmallRng::from_seed(child_seed); + let mut child_rng = SmallRng::from_seed(child_seed); + let block_handle = generate_block_handle(&mut child_rng, &state.block_cache); + let hierarchy = FileHierarchy { + parent: current_inode, + peer: None, + group_id, + }; let file = File::new( child_rng, - current_inode, - group_id, + hierarchy, 0, state.now, - None, state.total_cache_size, + block_handle, ); state.nodes.insert(file_inode, Node::File { file }); @@ -581,20 +627,41 @@ impl State { } } + fn blocks_to_bytes( + block_cache: &block::Cache, + blocks_len: u64, + total_cache_size: u64, + handle: &mut block::Handle, + blocks_per_tick: u64, + ) -> u64 { + if blocks_per_tick == 0 { + return 0; + } + if blocks_len == 0 { + return 0; + } + let cycles = blocks_per_tick / blocks_len; + let remainder = blocks_per_tick % blocks_len; + let mut bytes = total_cache_size.saturating_mul(cycles); + for _ in 0..remainder { + let size = block_cache.peek_next_size(handle); + bytes = bytes.saturating_add(u64::from(size.get())); + let _ = block_cache.advance(handle); + } + bytes + } + #[inline] #[allow(clippy::too_many_lines)] fn advance_time_inner(&mut self, now: Tick) { assert!(now >= self.now); - // Compute new global bytes_per_tick, at now - 1. + // Compute new global throughput, at now - 1. let elapsed_ticks = now.saturating_sub(self.initial_tick).saturating_sub(1); - let bytes_per_tick = match &self.load_profile { - LoadProfile::Constant(bytes) => *bytes, - LoadProfile::Linear { start, rate } => { - start.saturating_add(rate.saturating_mul(elapsed_ticks)) - } - }; + let block_cache = &self.block_cache; + let blocks_len = block_cache.len() as u64; + let total_cache_size = block_cache.total_size(); // Update each File's bytes_per_tick but do not advance time, as that is // done later. for node in self.nodes.values_mut() { @@ -602,7 +669,30 @@ impl State { && !file.read_only && !file.unlinked { - file.bytes_per_tick = bytes_per_tick; + file.bytes_per_tick = match &self.load_profile { + LoadProfile::Constant(bytes) => *bytes, + LoadProfile::Linear { start, rate } => { + start.saturating_add(rate.saturating_mul(elapsed_ticks)) + } + LoadProfile::Blocks { blocks_per_tick } => Self::blocks_to_bytes( + block_cache, + blocks_len, + total_cache_size, + &mut file.block_handle, + *blocks_per_tick, + ), + LoadProfile::BlocksLinear { start, rate } => { + let blocks_per_tick = + start.saturating_add(rate.saturating_mul(elapsed_ticks)); + Self::blocks_to_bytes( + block_cache, + blocks_len, + total_cache_size, + &mut file.block_handle, + blocks_per_tick, + ) + } + }; } } @@ -611,7 +701,15 @@ impl State { } for inode in self.inode_scratch.drain(..) { - let (rotated_inode, parent_inode, group_id, ordinal, file_rng, cache_offset) = { + let ( + rotated_inode, + parent_inode, + group_id, + ordinal, + file_rng, + cache_offset, + bytes_per_tick, + ) = { // If the node pointed to by inode doesn't exist, that's a // catastrophic programming error. We just copied all inode to node // pairs. @@ -656,6 +754,7 @@ impl State { file.ordinal, file.rng.clone(), file.cache_offset, + file.bytes_per_tick, ) }; @@ -666,14 +765,20 @@ impl State { // Set bytes_per_tick to current and now to now-1 else we'll never // ramp properly. let new_file_inode = self.next_inode; + let mut file_rng = file_rng; + let block_handle = generate_block_handle(&mut file_rng, &self.block_cache); + let hierarchy = FileHierarchy { + parent: parent_inode, + peer: Some(rotated_inode), + group_id, + }; let mut new_file = File::new( file_rng, - parent_inode, - group_id, + hierarchy, bytes_per_tick, self.now.saturating_sub(1), - Some(rotated_inode), self.total_cache_size, + block_handle, ); let new_file_cache_offset = new_file.cache_offset; @@ -1277,20 +1382,25 @@ mod test { } // Property 7: bytes_written are tick accurate - for (&inode, node) in &state.nodes { - if let Node::File { file } = node { - let end_tick = file.read_only_since.unwrap_or(state.now); - let expected_bytes = compute_expected_bytes_written( - &state.load_profile, - state.initial_tick, - file.created_tick, - end_tick, - ); - assert_eq!( - file.bytes_written, expected_bytes, - "bytes_written ({}) does not match expected_bytes_written ({expected_bytes}) for file with inode {inode}", - file.bytes_written, - ); + if !matches!( + state.load_profile, + LoadProfile::Blocks { .. } | LoadProfile::BlocksLinear { .. } + ) { + for (&inode, node) in &state.nodes { + if let Node::File { file } = node { + let end_tick = file.read_only_since.unwrap_or(state.now); + let expected_bytes = compute_expected_bytes_written( + &state.load_profile, + state.initial_tick, + file.created_tick, + end_tick, + ); + assert_eq!( + file.bytes_written, expected_bytes, + "bytes_written ({}) does not match expected_bytes_written ({expected_bytes}) for file with inode {inode}", + file.bytes_written, + ); + } } } @@ -1392,6 +1502,7 @@ mod test { .saturating_add(rate.saturating_mul(sum_of_terms)); total_bytes } + LoadProfile::Blocks { .. } | LoadProfile::BlocksLinear { .. } => 0, } } diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs index 02b246670..bc55f15b5 100644 --- a/lading/src/generator/file_gen/traditional.rs +++ b/lading/src/generator/file_gen/traditional.rs @@ -37,7 +37,7 @@ use lading_payload::{self, block}; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; #[derive(thiserror::Error, Debug)] @@ -103,6 +103,7 @@ pub struct Config { /// written _continuously_ per second from this target. Higher bursts are /// possible as the internal governor accumulates, up to /// `maximum_bytes_burst`. + #[deprecated(note = "Use throttle.stable.bytes_per_second instead")] bytes_per_second: Option, /// Defines the maximum internal cache of this log target. `file_gen` will /// pre-build its outputs up to the byte capacity specified here. @@ -119,7 +120,7 @@ pub struct Config { #[serde(default = "default_rotation")] rotate: bool, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(Debug)] @@ -166,6 +167,9 @@ impl Server { let file_index = Arc::new(AtomicU32::new(0)); for _ in 0..config.duplicates { + // Accept the deprecated `bytes_per_second` path for legacy configs while + // newer callers migrate to `throttle`. + #[allow(deprecated)] let throttle = create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?; @@ -192,6 +196,7 @@ impl Server { maximum_bytes_per_file, throttle, block_cache: Arc::new(block_cache), + maximum_block_size: maximum_block_size as u32, file_index: Arc::clone(&file_index), rotate: config.rotate, shutdown: shutdown.clone(), @@ -264,22 +269,29 @@ impl Server { struct Child { path_template: String, maximum_bytes_per_file: NonZeroU32, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, + maximum_block_size: u32, rotate: bool, file_index: Arc, shutdown: lading_signal::Watcher, } impl Child { + #[allow(clippy::too_many_lines)] pub(crate) async fn spin(mut self) -> Result<(), Error> { - let buffer_capacity = self.throttle.maximum_capacity() as usize; let mut total_bytes_written: u64 = 0; let maximum_bytes_per_file: u64 = u64::from(self.maximum_bytes_per_file.get()); let mut file_index = self.file_index.fetch_add(1, Ordering::Relaxed); let mut path = path_from_template(&self.path_template, file_index); + let mut handle = self.block_cache.handle(); + // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity + // (converted to bytes if necessary) to approximate flush every second. + let buffer_capacity = self + .throttle + .maximum_capacity_bytes(self.maximum_block_size); let mut fp = BufWriter::with_capacity( buffer_capacity, fs::OpenOptions::new() @@ -298,20 +310,15 @@ impl Child { } })?, ); - - let mut handle = self.block_cache.handle(); - let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); - let total_bytes = u64::from(total_bytes.get()); + let total_bytes = u64::from(block.total_bytes.get()); { fp.write_all(&block.bytes).await?; diff --git a/lading/src/generator/grpc.rs b/lading/src/generator/grpc.rs index 8cda3eab1..12b449074 100644 --- a/lading/src/generator/grpc.rs +++ b/lading/src/generator/grpc.rs @@ -36,7 +36,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; /// Errors produced by [`Grpc`] @@ -115,7 +115,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// No-op tonic codec. Sends raw bytes and returns the number of bytes received. @@ -175,7 +175,7 @@ pub struct Grpc { target_uri: Uri, rpc_path: PathAndQuery, shutdown: lading_signal::Watcher, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: block::Cache, metric_labels: Vec<(String, String)>, } @@ -304,10 +304,9 @@ impl Grpc { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - _ = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { + let _ = result; let block = self.block_cache.advance(&mut handle); let block_length = block.bytes.len(); counter!("requests_sent", &self.metric_labels).increment(1); diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs index c94ce97d7..efdf068bf 100644 --- a/lading/src/generator/http.rs +++ b/lading/src/generator/http.rs @@ -30,7 +30,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; @@ -75,7 +75,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(thiserror::Error, Debug)] @@ -123,7 +123,7 @@ pub struct Http { method: hyper::Method, headers: hyper::HeaderMap, concurrency: ConcurrencyStrategy, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -225,9 +225,8 @@ impl Http { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); diff --git a/lading/src/generator/passthru_file.rs b/lading/src/generator/passthru_file.rs index 5baddd096..8eca8f5bc 100644 --- a/lading/src/generator/passthru_file.rs +++ b/lading/src/generator/passthru_file.rs @@ -14,7 +14,6 @@ use std::{num::NonZeroU32, path::PathBuf, time::Duration}; use tokio::{fs, io::AsyncWriteExt}; use byte_unit::Byte; -use lading_throttle::Throttle; use metrics::{counter, gauge}; use rand::{SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; @@ -24,7 +23,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] @@ -45,7 +44,7 @@ pub struct Config { /// The maximum size in bytes of the cache of prebuilt messages pub maximum_prebuild_cache_size_bytes: Byte, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`PassthruFile`]. @@ -74,7 +73,7 @@ pub enum Error { /// This generator is responsible for sending data to a file on disk. pub struct PassthruFile { path: PathBuf, - throttle: Throttle, + throttle: BlockThrottle, block_cache: block::Cache, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -183,9 +182,8 @@ impl PassthruFile { continue; }; - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - _ = self.throttle.wait_for(total_bytes) => { + _ = self.throttle.wait_for_block(&self.block_cache, &handle) => { let block = self.block_cache.advance(&mut handle); match current_file.write_all(&block.bytes).await { Ok(()) => { diff --git a/lading/src/generator/splunk_hec.rs b/lading/src/generator/splunk_hec.rs index 779e35486..39ae0fe5b 100644 --- a/lading/src/generator/splunk_hec.rs +++ b/lading/src/generator/splunk_hec.rs @@ -45,7 +45,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; static CONNECTION_SEMAPHORE: OnceCell = OnceCell::new(); @@ -93,7 +93,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(thiserror::Error, Debug)] @@ -152,7 +152,7 @@ pub struct SplunkHec { uri: Uri, token: String, parallel_connections: u16, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, channels: Channels, @@ -288,10 +288,9 @@ impl SplunkHec { .next() .expect("channel should never be empty") .clone(); - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let client = client.clone(); diff --git a/lading/src/generator/tcp.rs b/lading/src/generator/tcp.rs index c089f539e..b65093baf 100644 --- a/lading/src/generator/tcp.rs +++ b/lading/src/generator/tcp.rs @@ -32,7 +32,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; @@ -61,7 +61,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(thiserror::Error, Debug)] @@ -216,7 +216,7 @@ impl Tcp { struct TcpWorker { addr: SocketAddr, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -250,9 +250,8 @@ impl TcpWorker { continue; }; - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); diff --git a/lading/src/generator/trace_agent.rs b/lading/src/generator/trace_agent.rs index a4068b413..b183a1a46 100644 --- a/lading/src/generator/trace_agent.rs +++ b/lading/src/generator/trace_agent.rs @@ -17,7 +17,7 @@ //! Additional metrics may be emitted by this generator's [throttle]. use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; use bytes::Bytes; @@ -175,7 +175,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(thiserror::Error, Debug)] @@ -246,7 +246,7 @@ pub struct TraceAgent { trace_endpoint: Uri, backoff_behavior: BackoffBehavior, concurrency: ConcurrencyStrategy, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -353,9 +353,8 @@ impl TraceAgent { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); diff --git a/lading/src/generator/udp.rs b/lading/src/generator/udp.rs index 3e84560b8..08f1e9bd3 100644 --- a/lading/src/generator/udp.rs +++ b/lading/src/generator/udp.rs @@ -32,7 +32,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; @@ -66,7 +66,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`Udp`]. @@ -219,7 +219,7 @@ impl Udp { struct UdpWorker { addr: SocketAddr, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -234,8 +234,6 @@ impl UdpWorker { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { conn = UdpSocket::bind("127.0.0.1:0"), if connection.is_none() => { match conn { @@ -253,7 +251,7 @@ impl UdpWorker { } } } - result = self.throttle.wait_for(total_bytes), if connection.is_some() => { + result = self.throttle.wait_for_block(&self.block_cache, &handle), if connection.is_some() => { match result { Ok(()) => { let sock = connection.expect("connection failed"); diff --git a/lading/src/generator/unix_datagram.rs b/lading/src/generator/unix_datagram.rs index b513bb302..35b6cd3f9 100644 --- a/lading/src/generator/unix_datagram.rs +++ b/lading/src/generator/unix_datagram.rs @@ -14,7 +14,6 @@ use byte_unit::{Byte, Unit}; use futures::future::join_all; use lading_payload::block; -use lading_throttle::Throttle; use metrics::counter; use rand::{SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; @@ -29,7 +28,7 @@ use tracing::{debug, error, info}; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; fn default_parallel_connections() -> u16 { @@ -68,7 +67,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`UnixDatagram`]. @@ -233,7 +232,7 @@ impl UnixDatagram { #[derive(Debug)] struct Child { path: PathBuf, - throttle: Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -269,10 +268,8 @@ impl Child { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { // NOTE When we write into a unix socket it may be that only diff --git a/lading/src/generator/unix_stream.rs b/lading/src/generator/unix_stream.rs index f2808ef11..b79723e64 100644 --- a/lading/src/generator/unix_stream.rs +++ b/lading/src/generator/unix_stream.rs @@ -12,7 +12,6 @@ //! use lading_payload::block; -use lading_throttle::Throttle; use metrics::counter; use rand::{SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; @@ -27,7 +26,7 @@ use tracing::{debug, error, info, warn}; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; fn default_parallel_connections() -> u16 { @@ -59,7 +58,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`UnixStream`]. @@ -229,7 +228,7 @@ impl UnixStream { #[derive(Debug)] struct Child { path: PathBuf, - throttle: Throttle, + throttle: BlockThrottle, block_cache: block::Cache, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -268,10 +267,8 @@ impl Child { continue; }; - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { // NOTE When we write into a unix stream it may be that only diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs index 3684b711b..4aff3baa8 100644 --- a/lading_payload/src/block.rs +++ b/lading_payload/src/block.rs @@ -408,6 +408,20 @@ impl Cache { Handle { idx: 0 } } + /// Number of blocks in the cache. + #[must_use] + pub fn len(&self) -> usize { + match self { + Self::Fixed { blocks, .. } => blocks.len(), + } + } + + /// Whether the cache is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Get the total size of the cache in bytes. #[must_use] pub fn total_size(&self) -> u64 {