diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs index 8588bd9b0..1114bbb08 100644 --- a/lading/src/generator/file_gen/logrotate.rs +++ b/lading/src/generator/file_gen/logrotate.rs @@ -30,13 +30,14 @@ use tokio::{ io::{AsyncWriteExt, BufWriter}, task::{JoinError, JoinHandle}, }; -use tracing::{error, info}; +use tracing::{error, info, warn}; use lading_payload::block; use super::General; use crate::generator::common::{ - BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode, + create_throttle, }; /// An enum to allow us to determine what operation caused an IO errror as the @@ -150,6 +151,14 @@ pub struct Config { /// Throughput profile controlling emission rate (bytes or blocks). #[serde(default)] pub throttle: Option, + /// Force flush every n blocks. This generator uses a `BufWriter` with capacity + /// based on the throttle's maximum capacity. So when the block cache outputs data roughly + /// equal to the throttle's max rate, the `BufWriter` will flush roughly every second. + /// However when blocks are small relative to the maximum possible rate, the `BufWriter` + /// will flush less frequently. This setting allows you to force a flush after writing to N blocks + /// to create a more consistent flush interval. 
+ #[serde(default)] + pub flush_every_n_blocks: Option, } #[derive(Debug)] @@ -240,6 +249,7 @@ impl Server { throughput_throttle, shutdown.clone(), child_labels, + config.flush_every_n_blocks, ); handles.push(tokio::spawn(child.spin())); @@ -290,6 +300,7 @@ struct Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every_n_blocks: Option, } impl Child { @@ -303,6 +314,7 @@ impl Child { throttle: BlockThrottle, shutdown: lading_signal::Watcher, labels: Vec<(String, String)>, + flush_every_n_blocks: Option, ) -> Self { let mut names = Vec::with_capacity((total_rotations + 1).into()); names.push(PathBuf::from(basename)); @@ -325,16 +337,29 @@ impl Child { throttle, shutdown, labels, + flush_every_n_blocks, } } async fn spin(mut self) -> Result<(), Error> { let mut handle = self.block_cache.handle(); + // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity + // (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write + // bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than + // the throttle's maximum block size, this will flush less frequently and thus it's recommended to use + // the flush_every_n_blocks setting). + if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() { + warn!( + "BufWriter flush frequency can be inconsistent when using block-based throttling - \ + consider setting flush_every_n_blocks in your generator config" + ); + } let buffer_capacity = self .throttle .maximum_capacity_bytes(self.maximum_block_size); let mut total_bytes_written: u64 = 0; let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get()); + let mut blocks_since_flush: u32 = 0; let total_names = self.names.len(); // SAFETY: By construction there is guaranteed to be at least one name. 
@@ -374,7 +399,7 @@ impl Child { result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { - write_bytes(self.block_cache.advance(&mut handle), + let did_rotate = write_bytes(self.block_cache.advance(&mut handle), &mut fp, &mut total_bytes_written, buffer_capacity, @@ -383,6 +408,15 @@ impl Child { &self.names, last_name, &self.labels).await?; + if did_rotate { + blocks_since_flush = 0; + } else { + blocks_since_flush += 1; + if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) { + fp.flush().await.map_err(|err| Error::IoFlush { err })?; + blocks_since_flush = 0; + } + } } Err(err) => { error!("Discarding block due to throttle error: {err}"); @@ -401,6 +435,8 @@ impl Child { } } +/// Writes a block to the file, rotating if necessary. +/// Returns `true` if a rotation occurred (and thus the file was flushed). #[allow(clippy::too_many_arguments)] async fn write_bytes( blk: &block::Block, @@ -412,7 +448,7 @@ async fn write_bytes( names: &[PathBuf], last_name: &Path, labels: &[(String, String)], -) -> Result<(), Error> { +) -> Result { let total_bytes = u64::from(blk.total_bytes.get()); { @@ -479,7 +515,8 @@ async fn write_bytes( })?, ); *total_bytes_written = 0; + return Ok(true); } - Ok(()) + Ok(false) } diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs index bc55f15b5..91d493772 100644 --- a/lading/src/generator/file_gen/traditional.rs +++ b/lading/src/generator/file_gen/traditional.rs @@ -31,13 +31,14 @@ use tokio::{ io::{AsyncWriteExt, BufWriter}, task::{JoinError, JoinSet}, }; -use tracing::{error, info}; +use tracing::{error, info, warn}; use lading_payload::{self, block}; use super::General; use crate::generator::common::{ - BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode, + create_throttle, }; 
#[derive(thiserror::Error, Debug)] @@ -121,6 +122,14 @@ pub struct Config { rotate: bool, /// The load throttle configuration pub throttle: Option, + /// Force flush every n blocks. This generator uses a `BufWriter` with capacity + /// based on the throttle's maximum capacity. So when the block cache outputs data roughly + /// equal to the throttle's max rate, the `BufWriter` will flush roughly every second. + /// However when blocks are small relative to the maximum possible rate, the `BufWriter` + /// will flush less frequently. This setting allows you to force a flush after writing to N blocks + /// to create a more consistent flush interval. + #[serde(default)] + pub flush_every_n_blocks: Option, } #[derive(Debug)] @@ -200,6 +209,7 @@ impl Server { file_index: Arc::clone(&file_index), rotate: config.rotate, shutdown: shutdown.clone(), + flush_every_n_blocks: config.flush_every_n_blocks, }; handles.spawn(child.spin()); @@ -275,6 +285,7 @@ struct Child { rotate: bool, file_index: Arc, shutdown: lading_signal::Watcher, + flush_every_n_blocks: Option, } impl Child { @@ -288,7 +299,16 @@ impl Child { let mut handle = self.block_cache.handle(); // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity - // (converted to bytes if necessary) to approximate flush every second. + // (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write + // bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than + // the throttle's maximum block size, this will flush less frequently and thus it's recommended to use + // the flush_every_n_blocks setting). 
+ if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() { + warn!( + "BufWriter flush frequency can be inconsistent when using block-based throttling - + consider setting flush_every_n_blocks in your generator config" + ); + } let buffer_capacity = self .throttle .maximum_capacity_bytes(self.maximum_block_size); @@ -312,6 +332,7 @@ impl Child { ); let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); + let mut blocks_since_flush = 0; loop { tokio::select! { result = self.throttle.wait_for_block(&self.block_cache, &handle) => { @@ -324,10 +345,12 @@ impl Child { fp.write_all(&block.bytes).await?; counter!("bytes_written").increment(total_bytes); total_bytes_written += total_bytes; + blocks_since_flush += 1; } if total_bytes_written > maximum_bytes_per_file { fp.flush().await?; + blocks_since_flush = 0; if self.rotate { // Delete file, leaving any open file handlers intact. This // includes our own `fp` for the time being. @@ -355,6 +378,9 @@ impl Child { })?, ); total_bytes_written = 0; + } else if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) { + fp.flush().await?; + blocks_since_flush = 0; } } Err(err) => {