From 8bfd82eeca03d768f88ed5a87e7d5388ab9aad9d Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Wed, 14 Jan 2026 21:46:45 +0000 Subject: [PATCH] feat(capture): add file rotation support for Parquet format Add rotation API to CaptureManager that allows rotating to new output files without stopping the capture. This enables long-running capture sessions to produce multiple readable Parquet files with valid footers. Changes: - Add RotationRequest/RotationSender types for async rotation requests - Add start_with_rotation() that spawns event loop and returns (sender, JoinHandle) - Add replace_format() to StateMachine for IO-agnostic format swapping - Add rotate() trait method stub to OutputFormat (returns error by default) - Add rotate_to() inherent method on parquet Format> - Return JoinHandle for graceful shutdown and data flush guarantees The rotation flow: 1. Caller sends RotationRequest with new file path via RotationSender 2. CaptureManager creates new file and format 3. StateMachine.replace_format() flushes and swaps formats 4. Old format is closed (writing Parquet footer) 5. Response sent back to caller The returned JoinHandle allows callers to await the capture task to ensure all buffered metrics are flushed before process exit. Combines commits d0aebbe and 0542dd6 from sopell/expose-observer-public. Co-Authored-By: Claude Sonnet 4.5 --- lading_capture/src/formats.rs | 26 +++ lading_capture/src/formats/parquet.rs | 33 ++++ lading_capture/src/manager.rs | 183 +++++++++++++++++++- lading_capture/src/manager/state_machine.rs | 33 ++++ 4 files changed, 274 insertions(+), 1 deletion(-) diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 571f09b88..416954d3d 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -24,6 +24,9 @@ pub enum Error { /// IO errors during write operations #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Rotation not supported for this format + #[error("File rotation not supported for this format")] + RotationNotSupported, } /// Trait for output format implementations @@ -63,6 +66,29 @@ pub trait OutputFormat { /// /// Returns an error if closing fails. fn close(self) -> Result<(), Error>; + + /// Rotate to a new output file + /// + /// Flushes and closes the current file (writing footer for Parquet), then + /// opens a new file at the specified path. This allows continuous metrics + /// collection while producing multiple readable output files. + /// + /// The default implementation returns `RotationNotSupported`. Formats that + /// support rotation (like Parquet with file-based writers) should override. + /// + /// # Arguments + /// + /// * `path` - Path for the new output file + /// + /// # Errors + /// + /// Returns an error if rotation is not supported or if file operations fail. + fn rotate(self, _path: std::path::PathBuf) -> Result + where + Self: Sized, + { + Err(Error::RotationNotSupported) + } } #[cfg(test)] diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index fb0e40483..1febed746 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -153,6 +153,8 @@ pub struct Format { writer: ArrowWriter, /// Pre-computed Arrow schema schema: Arc, + /// Compression level for Zstd (stored for rotation) + compression_level: i32, } impl Format { @@ -192,6 +194,7 @@ impl Format { buffers: ColumnBuffers::new(), writer: arrow_writer, schema, + compression_level, }) } @@ -338,6 +341,36 @@ impl crate::formats::OutputFormat for Format { } } +impl Format> { + /// Rotate to a new output file + /// + /// Closes the current Parquet file (writing footer) and opens a new file + /// at the specified path with the same compression settings. + /// + /// # Errors + /// + /// Returns an error if closing the current file or creating the new file fails. + pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + // Store compression level before closing + let compression_level = self.compression_level; + + // Close current file (writes footer) + self.close()?; + + // Create new file and writer + let file = std::fs::File::create(&path)?; + let writer = std::io::BufWriter::new(file); + let format = Self::new(writer, compression_level)?; + + Ok(format) + } + + /// Get the compression level for this format + pub fn compression_level(&self) -> i32 { + self.compression_level + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 2ca2a6c9f..66de09796 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -17,7 +17,7 @@ use std::{ }; use arc_swap::ArcSwap; -use tokio::{fs, sync::mpsc, time}; +use tokio::{fs, sync::mpsc, sync::oneshot, time}; use crate::{ accumulator, @@ -445,6 +445,27 @@ impl CaptureManager>, RealClock> } } +/// Request to rotate to a new output file +/// +/// Contains the path for the new file and a channel to send the result. +pub struct RotationRequest { + /// Path for the new output file + pub path: PathBuf, + /// Channel to send rotation result (Ok on success, Err on failure) + pub response: oneshot::Sender>, +} + +impl std::fmt::Debug for RotationRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RotationRequest") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +/// Handle for sending rotation requests to a running CaptureManager +pub type RotationSender = mpsc::Sender; + impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based Parquet writer /// @@ -478,6 +499,166 @@ impl CaptureManager>, RealCloc RealClock::default(), )) } + + /// Run [`CaptureManager`] with file rotation support + /// + /// Similar to [`start`](CaptureManager::start), but also provides a channel + /// for rotation requests. When a rotation request is received, the current + /// Parquet file is finalized (footer written) and a new file is created at + /// the specified path. + /// + /// Returns a tuple of ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle)) + /// immediately. The `RotationSender` can be used to trigger rotations while + /// the event loop runs. The `JoinHandle` can be awaited to ensure the + /// CaptureManager has fully drained and closed before shutdown. + /// + /// # Errors + /// + /// Returns an error if there is already a global recorder set. + #[allow(clippy::cast_possible_truncation)] + pub async fn start_with_rotation( + mut self, + ) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> { + // Create rotation channel - return the sender immediately + let (rotation_tx, rotation_rx) = mpsc::channel::(4); + + // Initialize historical sender + HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender { + snd: self.snd.clone(), + })))); + + self.install()?; + info!("Capture manager installed with rotation support, recording to capture file."); + + // Wait until the target is running then mark time-zero + self.target_running.recv().await; + self.clock.mark_start(); + + let compression_level = self.format.compression_level(); + + // Run the event loop in a spawned task so we can return the sender immediately + let expiration = self.expiration; + let format = self.format; + let flush_seconds = self.flush_seconds; + let registry = self.registry; + let accumulator = self.accumulator; + let global_labels = self.global_labels; + let clock = self.clock; + let recv = self.recv; + let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + + let handle = tokio::spawn(async move { + if let Err(e) = Self::rotation_event_loop( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + recv, + shutdown, + rotation_rx, + compression_level, + ) + .await + { + error!(error = %e, "CaptureManager rotation event loop error"); + } + }); + + Ok((rotation_tx, handle)) + } + + /// Internal event loop with rotation support + #[allow(clippy::too_many_arguments)] + async fn rotation_event_loop( + expiration: Duration, + format: formats::parquet::Format>, + flush_seconds: u64, + registry: Arc>, + accumulator: Accumulator, + global_labels: FxHashMap, + clock: RealClock, + mut recv: mpsc::Receiver, + shutdown: lading_signal::Watcher, + mut rotation_rx: mpsc::Receiver, + compression_level: i32, + ) -> Result<(), Error> { + let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); + let shutdown_wait = shutdown.recv(); + tokio::pin!(shutdown_wait); + + // Create state machine with owned state + let mut state_machine = StateMachine::new( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + ); + + // Event loop with rotation support + loop { + let event = tokio::select! { + val = recv.recv() => { + match val { + Some(metric) => Event::MetricReceived(metric), + None => Event::ChannelClosed, + } + } + () = flush_interval.tick() => Event::FlushTick, + Some(rotation_req) = rotation_rx.recv() => { + // Handle rotation inline since it's not a state machine event + let result = Self::handle_rotation( + &mut state_machine, + rotation_req.path, + compression_level, + ).await; + // Send result back to caller (ignore send error if receiver dropped) + let _ = rotation_req.response.send(result); + continue; + } + () = &mut shutdown_wait => Event::ShutdownSignaled, + }; + + match state_machine.next(event)? { + Operation::Continue => {} + Operation::Exit => return Ok(()), + } + } + } + + /// Handle a rotation request + async fn handle_rotation( + state_machine: &mut StateMachine< + formats::parquet::Format>, + RealClock, + >, + new_path: PathBuf, + compression_level: i32, + ) -> Result<(), formats::Error> { + // Create new file and format + let fp = fs::File::create(&new_path) + .await + .map_err(formats::Error::Io)?; + let fp = fp.into_std().await; + let writer = BufWriter::new(fp); + let new_format = parquet::Format::new(writer, compression_level)?; + + // Swap formats - this flushes any buffered data + let old_format = state_machine + .replace_format(new_format) + .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + + // Close old format to write Parquet footer + old_format.close()?; + + info!(path = %new_path.display(), "Rotated to new capture file"); + Ok(()) + } } impl diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs index 9d9a2c0fb..5836e2bc3 100644 --- a/lading_capture/src/manager/state_machine.rs +++ b/lading_capture/src/manager/state_machine.rs @@ -258,6 +258,39 @@ impl StateMachine { Ok(Operation::Exit) } + /// Replace the current format with a new one, returning the old format. + /// + /// This method flushes any buffered data before returning the old format. + /// The caller is responsible for closing the old format (to write any + /// footer/metadata) and providing a properly initialized new format. + /// + /// This enables file rotation: the caller can close the old format (writing + /// the Parquet footer), create a new file, and provide the new format. + /// + /// # Errors + /// + /// Returns an error if flushing the current format fails. + /// + /// # Panics + /// + /// Panics if called when no format is present (after shutdown). + pub(crate) fn replace_format(&mut self, new_format: F) -> Result { + // Flush any buffered data in the current format + self.format + .as_mut() + .expect("format must be present during operation") + .flush()?; + + // Swap in the new format and return the old one + let old_format = self + .format + .replace(new_format) + .expect("format must be present during operation"); + + info!("Format replaced for file rotation"); + Ok(old_format) + } + /// Convert an Instant timestamp to `Accumulator` logical tick time. #[inline] fn instant_to_tick(&self, timestamp: Instant) -> u64 {