diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 571f09b88..416954d3d 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -24,6 +24,9 @@ pub enum Error { /// IO errors during write operations #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Rotation was requested on a format that does not implement it + #[error("File rotation not supported for this format")] + RotationNotSupported, } /// Trait for output format implementations @@ -63,6 +66,29 @@ pub trait OutputFormat { /// /// Returns an error if closing fails. fn close(self) -> Result<(), Error>; + + /// Rotate to a new output file + /// + /// Overrides are expected to flush and close the current file (writing the + /// Parquet footer, where applicable) and open a new file at `path`, so that + /// collection continues while each finished file is readable on its own. + /// + /// The default implementation ignores `path` and returns + /// [`Error::RotationNotSupported`]; formats that can rotate should override it. + /// + /// # Arguments + /// + /// * `path` - Path for the new output file (unused by the default implementation) + /// + /// # Errors + /// + /// Returns an error if rotation is not supported or if file operations fail. 
+ fn rotate(self, _path: std::path::PathBuf) -> Result + where + Self: Sized, + { + Err(Error::RotationNotSupported) + } } #[cfg(test)] diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index fb0e40483..1febed746 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -153,6 +153,8 @@ pub struct Format { writer: ArrowWriter, /// Pre-computed Arrow schema schema: Arc, + /// Zstd compression level, kept so a rotated file reuses the same setting + compression_level: i32, } impl Format { @@ -192,6 +194,7 @@ impl Format { buffers: ColumnBuffers::new(), writer: arrow_writer, schema, + compression_level, }) } @@ -338,6 +341,36 @@ impl crate::formats::OutputFormat for Format { } } +impl Format> { + /// Rotate to a new output file + /// + /// Consumes `self`: closes the current Parquet file (writing footer), then + /// creates a file at `path` and a new format with the same compression level. + /// + /// # Errors + /// + /// Returns an error if closing the old file or creating the new one fails; `self` is consumed either way. 
+ pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + // Copy the level out first: `close` takes `self` by value + let compression_level = self.compression_level; + + // Finalize the current file (writes the Parquet footer) + self.close()?; + + // Create the new file (truncating any existing one) behind a buffered writer + let file = std::fs::File::create(&path)?; + let writer = std::io::BufWriter::new(file); + let format = Self::new(writer, compression_level)?; + + Ok(format) + } + + /// Returns the compression level this format was constructed with + pub fn compression_level(&self) -> i32 { + self.compression_level + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 2ca2a6c9f..66de09796 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -17,7 +17,7 @@ use std::{ }; use arc_swap::ArcSwap; -use tokio::{fs, sync::mpsc, time}; +use tokio::{fs, sync::mpsc, sync::oneshot, time}; use crate::{ accumulator, @@ -445,6 +445,27 @@ impl CaptureManager>, RealClock> } } +/// Request to rotate to a new output file +/// +/// Carries the destination path and a oneshot channel on which the outcome is reported. +pub struct RotationRequest { + /// Path for the new output file + pub path: PathBuf, + /// Oneshot channel that receives the rotation result (Ok on success, Err on failure) + pub response: oneshot::Sender>, +} + +impl std::fmt::Debug for RotationRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RotationRequest") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +/// Sending half of the rotation request channel returned by `start_with_rotation` +pub type RotationSender = mpsc::Sender; + impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based Parquet writer /// @@ -478,6 +499,166 @@ impl CaptureManager>, RealCloc RealClock::default(), )) } + + /// Run [`CaptureManager`] with file rotation support + /// + /// Similar to [`start`](CaptureManager::start), but also provides a channel + /// for rotation requests. 
When a rotation request is received, the current + /// Parquet file is finalized (footer written) and a new file is created at + /// the specified path. + /// + /// Returns ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle)) once the + /// target-running signal has been received; the event loop then runs in a + /// spawned task. Use the sender to trigger rotations and await the handle to + /// know the loop has exited. Panics if the shutdown watcher is absent. + /// + /// # Errors + /// + /// Returns an error if there is already a global recorder set. + #[allow(clippy::cast_possible_truncation)] + pub async fn start_with_rotation( + mut self, + ) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> { + // Bounded rotation channel (capacity 4); the sender is handed back to the caller + let (rotation_tx, rotation_rx) = mpsc::channel::(4); + + // Initialize historical sender + HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender { + snd: self.snd.clone(), + })))); + + self.install()?; + info!("Capture manager installed with rotation support, recording to capture file."); + + // Wait until the target is running then mark time-zero + self.target_running.recv().await; + self.clock.mark_start(); + + let compression_level = self.format.compression_level(); + + // Move the fields out of `self` so the spawned task owns them and this function can return the sender + let expiration = self.expiration; + let format = self.format; + let flush_seconds = self.flush_seconds; + let registry = self.registry; + let accumulator = self.accumulator; + let global_labels = self.global_labels; + let clock = self.clock; + let recv = self.recv; + let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + + let handle = tokio::spawn(async move { + if let Err(e) = Self::rotation_event_loop( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + recv, + shutdown, + rotation_rx, + compression_level, + ) + .await + { + error!(error = %e, "CaptureManager rotation event 
loop error"); + } + }); + + Ok((rotation_tx, handle)) + } + + /// Event loop that feeds metrics, flush ticks and shutdown to the state machine and serves rotation requests + #[allow(clippy::too_many_arguments)] + async fn rotation_event_loop( + expiration: Duration, + format: formats::parquet::Format>, + flush_seconds: u64, + registry: Arc>, + accumulator: Accumulator, + global_labels: FxHashMap, + clock: RealClock, + mut recv: mpsc::Receiver, + shutdown: lading_signal::Watcher, + mut rotation_rx: mpsc::Receiver, + compression_level: i32, + ) -> Result<(), Error> { + let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); + let shutdown_wait = shutdown.recv(); + tokio::pin!(shutdown_wait); + + // Create state machine with owned state + let mut state_machine = StateMachine::new( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + ); + + // Each iteration waits for exactly one event; rotation requests are served without producing one + loop { + let event = tokio::select! { + val = recv.recv() => { + match val { + Some(metric) => Event::MetricReceived(metric), + None => Event::ChannelClosed, + } + } + () = flush_interval.tick() => Event::FlushTick, + Some(rotation_req) = rotation_rx.recv() => { + // Handled inline (not a state machine event); a `None` from a closed channel fails the pattern and skips this branch + let result = Self::handle_rotation( + &mut state_machine, + rotation_req.path, + compression_level, + ).await; + // Send result back to caller (ignore send error if receiver dropped) + let _ = rotation_req.response.send(result); + continue; + } + () = &mut shutdown_wait => Event::ShutdownSignaled, + }; + + match state_machine.next(event)? 
{ + Operation::Continue => {} + Operation::Exit => return Ok(()), + } + } + } + + /// Handle a rotation request: create the new file, swap it in, then close the old format + async fn handle_rotation( + state_machine: &mut StateMachine< + formats::parquet::Format>, + RealClock, + >, + new_path: PathBuf, + compression_level: i32, + ) -> Result<(), formats::Error> { + // Create the new file first; NOTE(review): it stays on disk if a later step fails + let fp = fs::File::create(&new_path) + .await + .map_err(formats::Error::Io)?; + let fp = fp.into_std().await; + let writer = BufWriter::new(fp); + let new_format = parquet::Format::new(writer, compression_level)?; + + // Swap formats - `replace_format` flushes the old format; its error is wrapped as an I/O error + let old_format = state_machine + .replace_format(new_format) + .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + + // Close old format to write Parquet footer + old_format.close()?; + + info!(path = %new_path.display(), "Rotated to new capture file"); + Ok(()) + } } impl diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs index 9d9a2c0fb..5836e2bc3 100644 --- a/lading_capture/src/manager/state_machine.rs +++ b/lading_capture/src/manager/state_machine.rs @@ -258,6 +258,39 @@ impl StateMachine { Ok(Operation::Exit) } + /// Replace the current format with a new one, returning the old format. + /// + /// The current format is flushed before the swap; if that flush fails the swap does not happen. + /// The caller is responsible for closing the old format (to write any + /// footer/metadata) and providing a properly initialized new format. + /// + /// This enables file rotation: the caller builds the new format up front, + /// swaps it in here, then closes the returned format (writing the Parquet footer). + /// + /// # Errors + /// + /// Returns an error if flushing the current format fails. + /// + /// # Panics + /// + /// Panics if called when no format is present (after shutdown). 
+ pub(crate) fn replace_format(&mut self, new_format: F) -> Result { + // Flush first; on failure `?` returns before the swap, leaving the current format in place + self.format + .as_mut() + .expect("format must be present during operation") + .flush()?; + + // Install the new format and take the old one; presence was already checked by the flush above + let old_format = self + .format + .replace(new_format) + .expect("format must be present during operation"); + + info!("Format replaced for file rotation"); + Ok(old_format) + } + /// Convert an Instant timestamp to `Accumulator` logical tick time. #[inline] fn instant_to_tick(&self, timestamp: Instant) -> u64 {