Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lading_capture/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub enum Error {
/// IO errors during write operations
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Rotation not supported for this format
#[error("File rotation not supported for this format")]
RotationNotSupported,
}

/// Trait for output format implementations
Expand Down Expand Up @@ -63,6 +66,29 @@ pub trait OutputFormat {
///
/// Returns an error if closing fails.
fn close(self) -> Result<(), Error>;

/// Rotate to a new output file
///
/// Flushes and closes the current file (writing footer for Parquet), then
/// opens a new file at the specified path. This allows continuous metrics
/// collection while producing multiple readable output files.
///
/// The default implementation returns `RotationNotSupported`. Formats that
/// support rotation (like Parquet with file-based writers) should override.
///
/// # Arguments
///
/// * `path` - Path for the new output file
///
/// # Errors
///
/// Returns an error if rotation is not supported or if file operations fail.
fn rotate(self, _path: std::path::PathBuf) -> Result<Self, Error>
where
Self: Sized,
{
Err(Error::RotationNotSupported)
}
}

#[cfg(test)]
Expand Down
33 changes: 33 additions & 0 deletions lading_capture/src/formats/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@
writer: ArrowWriter<W>,
/// Pre-computed Arrow schema
schema: Arc<Schema>,
/// Compression level for Zstd (stored for rotation)
compression_level: i32,
}

impl<W: Write + Seek + Send> Format<W> {
Expand Down Expand Up @@ -192,6 +194,7 @@
buffers: ColumnBuffers::new(),
writer: arrow_writer,
schema,
compression_level,
})
}

Expand Down Expand Up @@ -338,6 +341,36 @@
}
}

impl Format<std::io::BufWriter<std::fs::File>> {
/// Rotate to a new output file
///
/// Closes the current Parquet file (writing footer) and opens a new file
/// at the specified path with the same compression settings.
///
/// # Errors
///
/// Returns an error if closing the current file or creating the new file fails.
pub fn rotate_to(self, path: std::path::PathBuf) -> Result<Self, Error> {

Check failure on line 353 in lading_capture/src/formats/parquet.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this argument is passed by value, but not consumed in the function body

Check failure on line 353 in lading_capture/src/formats/parquet.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this argument is passed by value, but not consumed in the function body
// Store compression level before closing
let compression_level = self.compression_level;

// Close current file (writes footer)
self.close()?;

// Create new file and writer
let file = std::fs::File::create(&path)?;
let writer = std::io::BufWriter::new(file);
let format = Self::new(writer, compression_level)?;

Ok(format)
}

/// Get the compression level for this format
pub fn compression_level(&self) -> i32 {

Check failure on line 369 in lading_capture/src/formats/parquet.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this method could have a `#[must_use]` attribute

Check failure on line 369 in lading_capture/src/formats/parquet.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this method could have a `#[must_use]` attribute
self.compression_level
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
183 changes: 182 additions & 1 deletion lading_capture/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
};

use arc_swap::ArcSwap;
use tokio::{fs, sync::mpsc, time};
use tokio::{fs, sync::mpsc, sync::oneshot, time};

use crate::{
accumulator,
Expand Down Expand Up @@ -445,6 +445,27 @@
}
}

/// Request to rotate to a new output file
///
/// Contains the path for the new file and a channel to send the result.
pub struct RotationRequest {
/// Path for the new output file
pub path: PathBuf,
/// Channel to send rotation result (Ok on success, Err on failure)
pub response: oneshot::Sender<Result<(), formats::Error>>,
}

impl std::fmt::Debug for RotationRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RotationRequest")
.field("path", &self.path)
.finish_non_exhaustive()
}
}

/// Handle for sending rotation requests to a running CaptureManager

Check failure on line 466 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks

Check failure on line 466 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks
pub type RotationSender = mpsc::Sender<RotationRequest>;

impl CaptureManager<formats::parquet::Format<BufWriter<std::fs::File>>, RealClock> {
/// Create a new [`CaptureManager`] with file-based Parquet writer
///
Expand Down Expand Up @@ -478,6 +499,166 @@
RealClock::default(),
))
}

/// Run [`CaptureManager`] with file rotation support
///
/// Similar to [`start`](CaptureManager::start), but also provides a channel
/// for rotation requests. When a rotation request is received, the current
/// Parquet file is finalized (footer written) and a new file is created at
/// the specified path.
///
/// Returns a tuple of ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle))
/// immediately. The `RotationSender` can be used to trigger rotations while
/// the event loop runs. The `JoinHandle` can be awaited to ensure the
/// CaptureManager has fully drained and closed before shutdown.

Check failure on line 513 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks

Check failure on line 513 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

item in documentation is missing backticks
///
/// # Errors
///
/// Returns an error if there is already a global recorder set.
#[allow(clippy::cast_possible_truncation)]
pub async fn start_with_rotation(

Check failure on line 519 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

docs for function which may panic missing `# Panics` section

Check failure on line 519 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

docs for function which may panic missing `# Panics` section
mut self,
) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> {
// Create rotation channel - return the sender immediately
let (rotation_tx, rotation_rx) = mpsc::channel::<RotationRequest>(4);

// Initialize historical sender
HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender {
snd: self.snd.clone(),
}))));

self.install()?;
info!("Capture manager installed with rotation support, recording to capture file.");

// Wait until the target is running then mark time-zero
self.target_running.recv().await;
self.clock.mark_start();

let compression_level = self.format.compression_level();

// Run the event loop in a spawned task so we can return the sender immediately
let expiration = self.expiration;
let format = self.format;
let flush_seconds = self.flush_seconds;
let registry = self.registry;
let accumulator = self.accumulator;
let global_labels = self.global_labels;

Check warning on line 545 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, fmt)

Diff in /home/runner/work/lading/lading/lading_capture/src/manager.rs

Check warning on line 545 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (macos-latest, fmt)

Diff in /Users/runner/work/lading/lading/lading_capture/src/manager.rs

Check warning on line 545 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, fmt)

Diff in /home/runner/work/lading/lading/lading_capture/src/manager.rs

Check warning on line 545 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (macos-latest, fmt)

Diff in /Users/runner/work/lading/lading/lading_capture/src/manager.rs
let clock = self.clock;
let recv = self.recv;
let shutdown = self.shutdown.take().expect("shutdown watcher must be present");

let handle = tokio::spawn(async move {
if let Err(e) = Self::rotation_event_loop(
expiration,
format,
flush_seconds,
registry,
accumulator,
global_labels,
clock,
recv,
shutdown,
rotation_rx,
compression_level,
)
.await
{
error!(error = %e, "CaptureManager rotation event loop error");
}
});

Ok((rotation_tx, handle))
}

/// Internal event loop with rotation support
#[allow(clippy::too_many_arguments)]
async fn rotation_event_loop(
expiration: Duration,
format: formats::parquet::Format<BufWriter<std::fs::File>>,
flush_seconds: u64,
registry: Arc<Registry<Key, AtomicStorage>>,
accumulator: Accumulator,
global_labels: FxHashMap<String, String>,
clock: RealClock,
mut recv: mpsc::Receiver<Metric>,
shutdown: lading_signal::Watcher,
mut rotation_rx: mpsc::Receiver<RotationRequest>,
compression_level: i32,
) -> Result<(), Error> {
let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64));

Check failure on line 588 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

casting `u128` to `u64` may truncate the value

Check failure on line 588 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

casting `u128` to `u64` may truncate the value
let shutdown_wait = shutdown.recv();
tokio::pin!(shutdown_wait);

// Create state machine with owned state
let mut state_machine = StateMachine::new(
expiration,
format,
flush_seconds,
registry,
accumulator,
global_labels,
clock,
);

// Event loop with rotation support
loop {
let event = tokio::select! {
val = recv.recv() => {
match val {
Some(metric) => Event::MetricReceived(metric),
None => Event::ChannelClosed,
}
}
() = flush_interval.tick() => Event::FlushTick,
Some(rotation_req) = rotation_rx.recv() => {
// Handle rotation inline since it's not a state machine event
let result = Self::handle_rotation(
&mut state_machine,
rotation_req.path,
compression_level,
).await;
// Send result back to caller (ignore send error if receiver dropped)
let _ = rotation_req.response.send(result);
continue;
}
() = &mut shutdown_wait => Event::ShutdownSignaled,
};

match state_machine.next(event)? {
Operation::Continue => {}
Operation::Exit => return Ok(()),
}
}
}

/// Handle a rotation request
async fn handle_rotation(
state_machine: &mut StateMachine<
formats::parquet::Format<BufWriter<std::fs::File>>,
RealClock,
>,
new_path: PathBuf,
compression_level: i32,
) -> Result<(), formats::Error> {
// Create new file and format
let fp = fs::File::create(&new_path)
.await
.map_err(formats::Error::Io)?;
let fp = fp.into_std().await;
let writer = BufWriter::new(fp);
let new_format = parquet::Format::new(writer, compression_level)?;

// Swap formats - this flushes any buffered data
let old_format = state_machine
.replace_format(new_format)
.map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;

Check failure on line 654 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this can be `std::io::Error::other(_)`

Check failure on line 654 in lading_capture/src/manager.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, clippy)

this can be `std::io::Error::other(_)`

// Close old format to write Parquet footer
old_format.close()?;

info!(path = %new_path.display(), "Rotated to new capture file");
Ok(())
}
}

impl
Expand Down
33 changes: 33 additions & 0 deletions lading_capture/src/manager/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,39 @@ impl<F: OutputFormat, C: Clock> StateMachine<F, C> {
Ok(Operation::Exit)
}

/// Replace the current format with a new one, returning the old format.
///
/// This method flushes any buffered data before returning the old format.
/// The caller is responsible for closing the old format (to write any
/// footer/metadata) and providing a properly initialized new format.
///
/// This enables file rotation: the caller can close the old format (writing
/// the Parquet footer), create a new file, and provide the new format.
///
/// # Errors
///
/// Returns an error if flushing the current format fails.
///
/// # Panics
///
/// Panics if called when no format is present (after shutdown).
pub(crate) fn replace_format(&mut self, new_format: F) -> Result<F, Error> {
// Flush any buffered data in the current format
self.format
.as_mut()
.expect("format must be present during operation")
.flush()?;

// Swap in the new format and return the old one
let old_format = self
.format
.replace(new_format)
.expect("format must be present during operation");

info!("Format replaced for file rotation");
Ok(old_format)
}

/// Convert an Instant timestamp to `Accumulator` logical tick time.
#[inline]
fn instant_to_tick(&self, timestamp: Instant) -> u64 {
Expand Down
Loading