Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions lading/src/generator/file_gen/logrotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ use tokio::{
io::{AsyncWriteExt, BufWriter},
task::{JoinError, JoinHandle},
};
use tracing::{error, info};
use tracing::{error, info, warn};

use lading_payload::block;

use super::General;
use crate::generator::common::{
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
create_throttle,
};

/// An enum to allow us to determine what operation caused an IO error as the
Expand Down Expand Up @@ -150,6 +151,14 @@ pub struct Config {
/// Throughput profile controlling emission rate (bytes or blocks).
#[serde(default)]
pub throttle: Option<ThrottleConfig>,
/// Force flush every n blocks. This generator uses a `BufWriter` with capacity
/// based on the throttle's maximum capacity. So when the block cache outputs data roughly
/// equal to the throttle's max rate, the `BufWriter` will flush roughly every second.
/// However when blocks are small relative to the maximum possible rate, the `BufWriter`
/// will flush less frequently. This setting allows you to force a flush after writing N blocks
/// to create a more consistent flush interval.
#[serde(default)]
pub flush_every_n_blocks: Option<NonZeroU32>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -240,6 +249,7 @@ impl Server {
throughput_throttle,
shutdown.clone(),
child_labels,
config.flush_every_n_blocks,
);

handles.push(tokio::spawn(child.spin()));
Expand Down Expand Up @@ -290,6 +300,7 @@ struct Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every_n_blocks: Option<NonZeroU32>,
}

impl Child {
Expand All @@ -303,6 +314,7 @@ impl Child {
throttle: BlockThrottle,
shutdown: lading_signal::Watcher,
labels: Vec<(String, String)>,
flush_every_n_blocks: Option<NonZeroU32>,
) -> Self {
let mut names = Vec::with_capacity((total_rotations + 1).into());
names.push(PathBuf::from(basename));
Expand All @@ -325,16 +337,29 @@ impl Child {
throttle,
shutdown,
labels,
flush_every_n_blocks,
}
}

async fn spin(mut self) -> Result<(), Error> {
let mut handle = self.block_cache.handle();
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
// (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write
// bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than
// the throttle's maximum block size, this will flush less frequently and thus it's recommended to use
// the flush_every_n_blocks setting).
if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() {
warn!(
"BufWriter flush frequency can be inconsistent when using block-based throttling - \
consider setting flush_every_n_blocks in your generator config"
);
}
let buffer_capacity = self
.throttle
.maximum_capacity_bytes(self.maximum_block_size);
let mut total_bytes_written: u64 = 0;
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
let mut blocks_since_flush: u32 = 0;

let total_names = self.names.len();
// SAFETY: By construction there is guaranteed to be at least one name.
Expand Down Expand Up @@ -374,7 +399,7 @@ impl Child {
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
match result {
Ok(()) => {
write_bytes(self.block_cache.advance(&mut handle),
let did_rotate = write_bytes(self.block_cache.advance(&mut handle),
&mut fp,
&mut total_bytes_written,
buffer_capacity,
Expand All @@ -383,6 +408,15 @@ impl Child {
&self.names,
last_name,
&self.labels).await?;
if did_rotate {
blocks_since_flush = 0;
} else {
blocks_since_flush += 1;
if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) {
fp.flush().await.map_err(|err| Error::IoFlush { err })?;
blocks_since_flush = 0;
}
}
}
Err(err) => {
error!("Discarding block due to throttle error: {err}");
Expand All @@ -401,6 +435,8 @@ impl Child {
}
}

/// Writes a block to the file, rotating if necessary.
/// Returns `true` if a rotation occurred (and thus the file was flushed).
#[allow(clippy::too_many_arguments)]
async fn write_bytes(
blk: &block::Block,
Expand All @@ -412,7 +448,7 @@ async fn write_bytes(
names: &[PathBuf],
last_name: &Path,
labels: &[(String, String)],
) -> Result<(), Error> {
) -> Result<bool, Error> {
let total_bytes = u64::from(blk.total_bytes.get());

{
Expand Down Expand Up @@ -479,7 +515,8 @@ async fn write_bytes(
})?,
);
*total_bytes_written = 0;
return Ok(true);
}

Ok(())
Ok(false)
}
32 changes: 29 additions & 3 deletions lading/src/generator/file_gen/traditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ use tokio::{
io::{AsyncWriteExt, BufWriter},
task::{JoinError, JoinSet},
};
use tracing::{error, info};
use tracing::{error, info, warn};

use lading_payload::{self, block};

use super::General;
use crate::generator::common::{
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
create_throttle,
};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -121,6 +122,14 @@ pub struct Config {
rotate: bool,
/// The load throttle configuration
pub throttle: Option<ThrottleConfig>,
/// Force flush every n blocks. This generator uses a `BufWriter` with capacity
/// based on the throttle's maximum capacity. So when the block cache outputs data roughly
/// equal to the throttle's max rate, the `BufWriter` will flush roughly every second.
/// However when blocks are small relative to the maximum possible rate, the `BufWriter`
/// will flush less frequently. This setting allows you to force a flush after writing N blocks
/// to create a more consistent flush interval.
#[serde(default)]
pub flush_every_n_blocks: Option<NonZeroU32>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -200,6 +209,7 @@ impl Server {
file_index: Arc::clone(&file_index),
rotate: config.rotate,
shutdown: shutdown.clone(),
flush_every_n_blocks: config.flush_every_n_blocks,
};

handles.spawn(child.spin());
Expand Down Expand Up @@ -275,6 +285,7 @@ struct Child {
rotate: bool,
file_index: Arc<AtomicU32>,
shutdown: lading_signal::Watcher,
flush_every_n_blocks: Option<NonZeroU32>,
}

impl Child {
Expand All @@ -288,7 +299,16 @@ impl Child {

let mut handle = self.block_cache.handle();
// Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
// (converted to bytes if necessary) to approximate flush every second.
// (converted to bytes if necessary) to approximate flush every second (ASSUMING that throttler will write
// bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than
// the throttle's maximum block size, this will flush less frequently and thus it's recommended to use
// the flush_every_n_blocks setting).
if self.throttle.mode == ThrottleMode::Blocks && self.flush_every_n_blocks.is_none() {
warn!(
"BufWriter flush frequency can be inconsistent when using block-based throttling - \
consider setting flush_every_n_blocks in your generator config"
);
}
let buffer_capacity = self
.throttle
.maximum_capacity_bytes(self.maximum_block_size);
Expand All @@ -312,6 +332,7 @@ impl Child {
);
let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut blocks_since_flush = 0;
loop {
tokio::select! {
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
Expand All @@ -324,10 +345,12 @@ impl Child {
fp.write_all(&block.bytes).await?;
counter!("bytes_written").increment(total_bytes);
total_bytes_written += total_bytes;
blocks_since_flush += 1;
}

if total_bytes_written > maximum_bytes_per_file {
fp.flush().await?;
blocks_since_flush = 0;
if self.rotate {
// Delete file, leaving any open file handlers intact. This
// includes our own `fp` for the time being.
Expand Down Expand Up @@ -355,6 +378,9 @@ impl Child {
})?,
);
total_bytes_written = 0;
} else if self.flush_every_n_blocks.is_some_and(|n| blocks_since_flush == n.get()) {
fp.flush().await?;
blocks_since_flush = 0;
}
}
Err(err) => {
Expand Down
Loading