From 7e1a732e92227a1d62652c6f10e5e4f7f8f19d43 Mon Sep 17 00:00:00 2001 From: Jake Saferstein Date: Wed, 7 Jan 2026 16:39:01 -0500 Subject: [PATCH] static-timestamped --- Cargo.lock | 4 + Cargo.toml | 1 + lading_payload/Cargo.toml | 2 + lading_payload/src/block.rs | 24 ++ lading_payload/src/lib.rs | 22 + lading_payload/src/static_timestamped.rs | 515 +++++++++++++++++++++++ 6 files changed, 568 insertions(+) create mode 100644 lading_payload/src/static_timestamped.rs diff --git a/Cargo.lock b/Cargo.lock index 5afdef675..eb5796795 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -605,8 +605,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link 0.2.1", ] @@ -1947,6 +1949,7 @@ dependencies = [ "arbitrary", "byte-unit", "bytes", + "chrono", "criterion", "opentelemetry-proto", "proptest", @@ -1957,6 +1960,7 @@ dependencies = [ "serde", "serde_json", "serde_tuple", + "tempfile", "thiserror 2.0.17", "time", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 634792332..565112c11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ byte-unit = { version = "5.2", default-features = false, features = [ "byte", "serde", ] } +chrono = "0.4.42" sha2 = { version = "0.10", default-features = false } metrics = { version = "0.24" } metrics-util = "0.20" diff --git a/lading_payload/Cargo.toml b/lading_payload/Cargo.toml index aee2d9018..dfca6d585 100644 --- a/lading_payload/Cargo.toml +++ b/lading_payload/Cargo.toml @@ -15,6 +15,7 @@ description = "A tool for load testing daemons." [dependencies] bytes = { workspace = true } byte-unit = { workspace = true } +chrono = { workspace = true } opentelemetry-proto = { workspace = true } prost = { workspace = true } rand = { workspace = true, features = ["small_rng", "std", "std_rng"] } @@ -32,6 +33,7 @@ arbitrary = { version = "1", optional = true, features = ["derive"] } proptest = { workspace = true } proptest-derive = { workspace = true } criterion = { version = "0.8", features = ["html_reports"] } +tempfile = { workspace = true } [features] default = [] diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs index 4aff3baa8..eadcc2bd5 100644 --- a/lading_payload/src/block.rs +++ b/lading_payload/src/block.rs @@ -61,6 +61,9 @@ pub enum Error { /// `StaticChunks` payload creation error #[error(transparent)] StaticChunks(#[from] crate::static_chunks::Error), + /// Static timestamp-grouped payload creation error + #[error(transparent)] + StaticTimestamped(#[from] crate::static_timestamped::Error), /// Error for crate deserialization #[error("Deserialization error: {0}")] Deserialize(#[from] crate::Error), @@ -354,6 +357,27 @@ impl Cache { total_bytes.get(), )? } + crate::Config::StaticTimestamped { + static_path, + timestamp_format, + emit_placeholder, + start_line_index, + } => { + let span = span!(Level::INFO, "fixed", payload = "static-second"); + let _guard = span.enter(); + let mut serializer = crate::StaticTimestamped::new( + static_path, + timestamp_format, + *emit_placeholder, + *start_line_index, + )?; + construct_block_cache_inner( + &mut rng, + &mut serializer, + maximum_block_bytes, + total_bytes.get(), + )? + } crate::Config::OpentelemetryTraces(config) => { let mut pyld = crate::OpentelemetryTraces::with_config(*config, &mut rng)?; let span = span!(Level::INFO, "fixed", payload = "otel-traces"); diff --git a/lading_payload/src/lib.rs b/lading_payload/src/lib.rs index 034f182b6..2a18c0926 100644 --- a/lading_payload/src/lib.rs +++ b/lading_payload/src/lib.rs @@ -27,6 +27,7 @@ pub use opentelemetry::metric::OpentelemetryMetrics; pub use opentelemetry::trace::OpentelemetryTraces; pub use splunk_hec::SplunkHec; pub use static_chunks::StaticChunks; +pub use static_timestamped::StaticTimestamped; pub use statik::Static; pub use syslog::Syslog5424; @@ -41,6 +42,7 @@ pub mod opentelemetry; pub mod procfs; pub mod splunk_hec; pub mod static_chunks; +pub mod static_timestamped; pub mod statik; pub mod syslog; pub mod trace_agent; @@ -139,6 +141,23 @@ pub enum Config { /// all files under it (non-recursively) will be read line by line. static_path: PathBuf, }, + /// Generates static data grouped by second; each block contains one + /// second's worth of logs as determined by a parsed timestamp prefix. + StaticTimestamped { + /// Defines the file path to read static variant data from. Content is + /// assumed to be line-oriented. + static_path: PathBuf, + /// Chrono-compatible timestamp format string used to parse the leading + /// timestamp in each line. + timestamp_format: String, + /// Emit a minimal placeholder block (single newline) for seconds with + /// no lines. When false, empty seconds are skipped. + #[serde(default)] + emit_placeholder: bool, + /// Optional starting line offset; lines before this index are skipped. + #[serde(default)] + start_line_index: Option, + }, /// Generates a line of printable ascii characters Ascii, /// Generates a json encoded line @@ -179,6 +198,8 @@ pub enum Payload { Static(Static), /// Static file content, chunked into lines that fill blocks as closely as possible. StaticChunks(StaticChunks), + /// Static file content grouped into one-second blocks based on timestamps + StaticTimestamped(StaticTimestamped), /// Syslog RFC 5424 format Syslog(Syslog5424), /// OpenTelemetry traces @@ -208,6 +229,7 @@ impl Serialize for Payload { Payload::SplunkHec(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::Static(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::StaticChunks(ser) => ser.to_bytes(rng, max_bytes, writer), + Payload::StaticTimestamped(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::Syslog(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::OtelTraces(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer), diff --git a/lading_payload/src/static_timestamped.rs b/lading_payload/src/static_timestamped.rs new file mode 100644 index 000000000..76d06246f --- /dev/null +++ b/lading_payload/src/static_timestamped.rs @@ -0,0 +1,515 @@ +//! Static file payload that emits one second of data per block, calculated by +//! parsing a timestamp at the start of each line. The parsed timestamp is +//! stripped from emitted lines; only the message body is replayed. + +use std::{ + fs::File, + io::{BufRead, BufReader, Write}, + path::{Path, PathBuf}, +}; + +use chrono::{NaiveDateTime, TimeZone, Utc}; +use rand::Rng; +use tracing::info; + +#[derive(Debug)] +struct BlockLines { + lines: Vec>, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`StaticTimestamped`]. +pub enum Error { + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), + /// No lines were discovered in the provided path + #[error("No lines found in static path")] + NoLines, + /// Timestamp parsing failed for a line + #[error("Failed to parse timestamp from line: {0}")] + Timestamp(String), + /// Timestamps are not in ascending order + #[error( + "Out-of-order timestamp at line {line_number}: expected >= {previous_timestamp}, got {current_timestamp}" + )] + OutOfOrder { + /// The 0-indexed line number where the out-of-order timestamp was found + line_number: u64, + /// The timestamp of the previous line + previous_timestamp: i64, + /// The timestamp of the current (out-of-order) line + current_timestamp: i64, + }, +} + +#[derive(Debug)] +/// Static payload grouped by second boundaries. +pub struct StaticTimestamped { + /// The path to the static file. + path: PathBuf, + /// The timestamp format to use. + timestamp_format: String, + /// Whether to emit a placeholder for empty seconds. + emit_placeholder: bool, + /// The initial line to start reading from (0-indexed). None means a random offset will be chosen on first use. + initial_offset: Option, + + total_lines: u64, + reader: BufReader, + + /// Line (and its parsed timestamp) "carried" over while reading the previous block. + carry_first_line: Option<(i64, Vec)>, + /// The number of seconds to skip before reading the next block. E.g. if `pending_gap` is 1, + /// we will skip the first second before reading the second block. + pending_gap: u64, + /// The next block to read. + next_block: Option, +} + +impl StaticTimestamped { + /// Create a new instance of `StaticTimestamped` + /// + /// Lines are grouped into blocks by the second of their timestamp. The + /// timestamp is parsed from the start of the line up to the first + /// whitespace, using `timestamp_format` (chrono strftime syntax). The + /// parsed timestamp is removed from the emitted line, leaving only the + /// remainder of the message. `start_line_index`, when provided, skips that + /// many lines (modulo the total number of available lines) before + /// returning payloads. + /// + /// # Errors + /// + /// Returns an error if the file cannot be read, contains no lines, or a + /// timestamp fails to parse. + pub fn new( + path: &Path, + timestamp_format: &str, + emit_placeholder: bool, + start_line_index: Option, + ) -> Result { + let file_size_bytes = File::open(path) + .ok() + .and_then(|f| f.metadata().ok()) + .map_or(0, |m| m.len()); + + // Validation pass: verify this file has lines with valid, ascending timestamps. + let mut validation_reader = BufReader::new(File::open(path)?); + let mut total_lines: u64 = 0; + let mut prev_timestamp: Option = None; + let mut buf = String::new(); + loop { + buf.clear(); + let bytes = validation_reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + let (ts, _) = Self::parse_line_with_format(&buf, timestamp_format)?; + if let Some(prev_ts) = prev_timestamp + && ts < prev_ts + { + return Err(Error::OutOfOrder { + line_number: total_lines, + previous_timestamp: prev_ts, + current_timestamp: ts, + }); + } + prev_timestamp = Some(ts); + total_lines += 1; + } + if total_lines == 0 { + return Err(Error::NoLines); + } + + let initial_offset = start_line_index.map(|idx| idx % total_lines); + let reader = BufReader::new(File::open(path)?); + + info!( + "StaticTimestamped streaming from {} ({} bytes, {} total lines) with options emit_placeholder={}, start_line_index={:?}", + path.display(), + file_size_bytes, + total_lines, + emit_placeholder, + initial_offset + ); + + let mut instance = Self { + path: path.to_path_buf(), + timestamp_format: timestamp_format.to_owned(), + emit_placeholder, + initial_offset, + total_lines, + reader, + carry_first_line: None, + pending_gap: 0, + next_block: None, + }; + + // Reset reader state to the initial offset if specified. + if let Some(offset) = initial_offset { + instance.reset_reader(offset)?; + } + + Ok(instance) + } + + fn parse_line_with_format(line: &str, timestamp_format: &str) -> Result<(i64, Vec), Error> { + let mut parts = line.splitn(2, char::is_whitespace); + let ts_token = parts.next().unwrap_or(""); + let payload = parts + .next() + .unwrap_or("") + .trim_start() + // Strip trailing newlines so we don't double-append in `to_bytes`. + .trim_end_matches(['\r', '\n']) + .as_bytes() + .to_vec(); + let ts = NaiveDateTime::parse_from_str(ts_token, timestamp_format) + .map_err(|_| Error::Timestamp(line.to_string()))?; + let sec = Utc.from_utc_datetime(&ts).timestamp(); + Ok((sec, payload)) + } + + fn reset_reader(&mut self, offset: u64) -> Result<(), Error> { + let file = File::open(&self.path)?; + self.reader = BufReader::new(file); + self.carry_first_line = None; + self.pending_gap = 0; + self.next_block = None; + + // Skip to the specified offset. + let mut buf = String::new(); + let mut lines_skipped = 0u64; + while lines_skipped < offset { + buf.clear(); + let bytes = self.reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + lines_skipped += 1; + } + Ok(()) + } + + /// Given a target line offset, find the start of the timestamp group it belongs to. + /// This ensures we always begin reading from the first line of a timestamp group. + fn find_group_start(&self, target_offset: u64) -> Result { + let file = File::open(&self.path)?; + let mut reader = BufReader::new(file); + let mut buf = String::new(); + + let mut line_index: u64 = 0; + let mut group_start: u64 = 0; + let mut current_timestamp: Option = None; + + loop { + buf.clear(); + let bytes = reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + + let (ts, _) = Self::parse_line_with_format(&buf, &self.timestamp_format)?; + + match current_timestamp { + None => { + current_timestamp = Some(ts); + group_start = line_index; + } + Some(prev_ts) if prev_ts != ts => { + current_timestamp = Some(ts); + group_start = line_index; + } + _ => {} + } + + if line_index == target_offset { + break; + } + + line_index += 1; + } + + Ok(group_start) + } + + fn fill_next_block(&mut self) -> Result<(), Error> { + if self.next_block.is_some() { + return Ok(()); + } + + // Try to read a block, wrapping around if we hit EOF + loop { + self.next_block = self.read_next_block()?; + if self.next_block.is_some() { + break; + } + // Hit EOF - wrap around. + self.reset_reader(0)?; + } + Ok(()) + } + + fn read_next_block(&mut self) -> Result, Error> { + // While reading with pending gap, emit an empty block. This occurs with files don't populate every second with logs e.g.: + // 2024-01-01T00:00:00 file line 0 + // + // 2024-01-01T00:00:10 file line 1 + if self.pending_gap > 0 { + self.pending_gap -= 1; + return Ok(Some(BlockLines { lines: Vec::new() })); + } + + let mut lines: Vec> = Vec::with_capacity(256); + let mut current_second: Option = None; + + // If we have a carry-over line (found when reading the previous block), use it to start the block. + if let Some((carry_sec, payload)) = self.carry_first_line.take() { + current_second = Some(carry_sec); + lines.push(payload); + } + + let mut buf = String::new(); + loop { + buf.clear(); + let bytes = self.reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + + let (line_sec, payload) = Self::parse_line_with_format(&buf, &self.timestamp_format)?; + + match current_second { + None => { + // No carry over - start a new block + current_second = Some(line_sec); + lines.push(payload); + } + Some(s) if s == line_sec => { + // This line is in the same second as the current block add to it. + lines.push(payload); + } + Some(s) if s < line_sec => { + // This line is in the future - set the pending gap and carry over the line. + if self.emit_placeholder && line_sec > s + 1 { + // The condition above guarantees line_sec - s - 1 > 0 + self.pending_gap = u64::try_from(line_sec - s - 1).unwrap_or(0); + } + self.carry_first_line = Some((line_sec, payload)); + break; + } + Some(s) => { + // This should be unreachable due to validation in `new()`, but return + // an error defensively + return Err(Error::OutOfOrder { + line_number: 0, // Line number not tracked at runtime + previous_timestamp: s, + current_timestamp: line_sec, + }); + } + } + } + if current_second.is_none() && lines.is_empty() { + // EOF reached return None and let fill_next_block handle the wrap around. + return Ok(None); + } + Ok(Some(BlockLines { lines })) + } +} + +impl crate::Serialize for StaticTimestamped { + fn to_bytes( + &mut self, + mut rng: R, + max_bytes: usize, + writer: &mut W, + ) -> Result<(), crate::Error> + where + R: Rng + Sized, + W: Write, + { + if self.initial_offset.is_none() { + let random_offset = rng.random_range(0..self.total_lines); + // Backtrack to the start of the timestamp group containing this line + let adjusted_offset = self.find_group_start(random_offset)?; + self.initial_offset = Some(adjusted_offset); + // Reset reader to apply the adjusted offset + self.reset_reader(adjusted_offset)?; + } + + self.fill_next_block()?; + let block = self + .next_block + .take() + .expect("fill_next_block guarantees a block"); + + let mut bytes_written = 0usize; + if block.lines.is_empty() { + // When requested, emit a minimal placeholder (one newline) for + // empty seconds to preserve timing gaps without breaking the + // non-zero block invariant. + if self.emit_placeholder && max_bytes > 0 { + writer.write_all(b"\n")?; + } + } else { + for line in &block.lines { + let needed = line.len() + 1; // newline + if bytes_written + needed > max_bytes { + break; + } + writer.write_all(line)?; + writer.write_all(b"\n")?; + bytes_written += needed; + } + } + Ok(()) + } +} + +impl From for crate::Error { + fn from(err: Error) -> Self { + match err { + Error::Io(e) => crate::Error::Io(e), + Error::NoLines | Error::Timestamp(_) | Error::OutOfOrder { .. } => { + crate::Error::Serialize + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Serialize; + use rand::{SeedableRng, rngs::StdRng}; + use std::{fs::File, io::Write}; + use tempfile::tempdir; + + #[test] + fn removes_timestamp_from_output() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("static_second_test.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + writeln!(f, "2024-01-01T00:00:00 second").unwrap(); + writeln!(f, "2024-01-01T00:00:01 third").unwrap(); + } + + let mut serializer = StaticTimestamped::new( + &path, + "%Y-%m-%dT%H:%M:%S", + /* emit_placeholder */ false, + None, + ) + .unwrap(); + let mut rng = StdRng::seed_from_u64(7); + let mut buf = Vec::new(); + + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"first\nsecond\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + } + + #[test] + fn emits_placeholders_for_missing_seconds() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("placeholder_test.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + // Intentionally skip 00:00:01 + writeln!(f, "2024-01-01T00:00:02 third").unwrap(); + } + + let mut serializer = + StaticTimestamped::new(&path, "%Y-%m-%dT%H:%M:%S", true, None).unwrap(); + let mut rng = StdRng::seed_from_u64(7); + + let mut buf = Vec::new(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"first\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + // Placeholder newline for the missing second + assert_eq!(buf, b"\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + } + + #[test] + fn honors_start_line_index_with_wraparound() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("start_index_test.log"); + { + let mut f = File::create(&path).unwrap(); + // Two lines in the first second, one in the second second. + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + writeln!(f, "2024-01-01T00:00:00 second").unwrap(); + writeln!(f, "2024-01-01T00:00:01 third").unwrap(); + } + + // Skip the first two lines; the stream should begin with "third". + let mut serializer = + StaticTimestamped::new(&path, "%Y-%m-%dT%H:%M:%S", false, Some(2)).unwrap(); + let mut rng = StdRng::seed_from_u64(7); + + let mut buf = Vec::new(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + // After wrapping, we return to the beginning of the stream. + assert_eq!(buf, b"first\nsecond\n"); + } + + #[test] + fn random_offset_backtracks_to_group_start() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("backtrack_test.log"); + { + let mut f = File::create(&path).unwrap(); + // Three lines in the first second (indices 0, 1, 2) + writeln!(f, "2024-01-01T00:00:00 ts 1 line 0").unwrap(); + writeln!(f, "2024-01-01T00:00:00 ts 1 line 1").unwrap(); + writeln!(f, "2024-01-01T00:00:00 ts 1 line 2").unwrap(); + // Two lines in the second second (indices 3, 4) + writeln!(f, "2024-01-01T00:00:01 ts 2 line 3").unwrap(); + writeln!(f, "2024-01-01T00:00:01 ts 2 line 4").unwrap(); + } + + // Test find_group_start directly + let serializer = + StaticTimestamped::new(&path, "%Y-%m-%dT%H:%M:%S", false, Some(0)).unwrap(); + + // Offset 0 is at start of first group -> should return 0 + assert_eq!(serializer.find_group_start(0).unwrap(), 0); + // Offset 1 is in first group -> should backtrack to 0 + assert_eq!(serializer.find_group_start(1).unwrap(), 0); + // Offset 2 is in first group -> should backtrack to 0 + assert_eq!(serializer.find_group_start(2).unwrap(), 0); + // Offset 3 is at start of second group -> should return 3 + assert_eq!(serializer.find_group_start(3).unwrap(), 3); + // Offset 4 is in second group -> should backtrack to 3 + assert_eq!(serializer.find_group_start(4).unwrap(), 3); + } +}