From ff9d7fda702b58a5c67dfc73ca18a10e651a59e1 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Wed, 31 Dec 2025 15:06:17 -0500 Subject: [PATCH 1/2] feat(capture): flatten labels to top-level l_* columns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace MapArray-based label storage with flat l_ columns in Parquet output. This enables predicate pushdown for filtering by container_id and other labels, avoiding full file scans. Key changes: - Dynamic schema generation based on discovered label keys - Dictionary encoding for low-cardinality label columns - Lazy ArrowWriter initialization (schema determined at first flush) - Updated validation and round-trip tests for new schema 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats.rs | 45 +-- lading_capture/src/formats/parquet.rs | 404 +++++++++++++++++++------ lading_capture/src/manager.rs | 14 +- lading_capture/src/validate/parquet.rs | 83 +++-- 4 files changed, 382 insertions(+), 164 deletions(-) diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 416954d3d..3585ef5e7 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -97,8 +97,7 @@ mod tests { use crate::line::{Line, LineValue, MetricKind}; use approx::relative_eq; use arrow_array::{ - Array, BinaryArray, Float64Array, MapArray, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + Array, BinaryArray, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, }; use bytes::Bytes; use datadog_protos::metrics::Dogsketch; @@ -487,12 +486,23 @@ mod tests { .downcast_ref::() .expect("value_float is Float64Array"); - let labels_array = batch - .column_by_name("labels") - .expect("labels column") - .as_any() - .downcast_ref::() - .expect("labels is MapArray"); + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let value_histogram_array = batch .column_by_name("value_histogram") @@ -524,21 +534,12 @@ mod tests { LineValue::Int(value_int_array.value(row_idx)) }; - let labels_slice: StructArray = labels_array.value(row_idx); - let keys = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .expect("label keys are strings"); - let values = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .expect("label values are strings"); - + // Extract labels from l_* columns let mut labels = FxHashMap::default(); - for i in 0..keys.len() { - labels.insert(keys.value(i).to_string(), values.value(i).to_string()); + for (key, arr) in &l_columns { + if !arr.is_null(row_idx) { + labels.insert((*key).to_string(), arr.value(row_idx).to_string()); + } } let value_histogram = if value_histogram_array.is_null(row_idx) { diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index 1febed746..6db3bd38b 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -3,19 +3,27 @@ //! This format buffers metrics in memory and periodically writes them as //! Parquet row groups, providing better compression and query performance than //! JSONL. +//! +//! 
Labels are stored as top-level columns with `l_` prefix (e.g., `l_container_id`, +//! `l_namespace`). This enables Parquet predicate pushdown for efficient filtering +//! by any label key, unlike the previous `MapArray` approach which required post-read +//! filtering. +//! +//! Schema is determined dynamically at first flush based on label keys discovered +//! in the buffered data. All label columns are nullable Utf8 strings. use std::{ - io::{Seek, Write}, + collections::{BTreeMap, BTreeSet}, + fs::File, + io::{BufWriter, Seek, Write}, sync::Arc, }; use arrow_array::{ - ArrayRef, BinaryArray, Float64Array, MapArray, RecordBatch, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + ArrayRef, BinaryArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray, + UInt64Array, }; -use arrow_buffer::OffsetBuffer; -use arrow_schema::{ArrowError, DataType, Field, Fields, Schema}; -use lading_capture_schema::{capture_schema, columns}; +use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, @@ -44,6 +52,9 @@ pub enum Error { /// Holds reusable allocations for all columns to avoid repeated allocation /// during flush operations. Buffers are cleared after each write, preserving /// capacity for the next batch. +/// +/// Labels are stored per-row as a map, with all unique keys tracked separately +/// to enable dynamic schema generation with `l_` columns. #[derive(Debug)] struct ColumnBuffers { run_ids: Vec, @@ -53,9 +64,12 @@ struct ColumnBuffers { metric_kinds: Vec<&'static str>, values_int: Vec>, values_float: Vec>, - label_keys: Vec, - label_values: Vec, - label_offsets: Vec, + /// Per-row label maps. Each entry contains the labels for one metric row. + /// Using `BTreeMap` for consistent ordering when iterating. + row_labels: Vec>, + /// All unique label keys seen in this batch. Used to generate schema. + /// `BTreeSet` ensures consistent column ordering across runs. 
+ unique_label_keys: BTreeSet, values_histogram: Vec>, } @@ -72,9 +86,8 @@ impl ColumnBuffers { metric_kinds: Vec::with_capacity(INITIAL_CAPACITY), values_int: Vec::with_capacity(INITIAL_CAPACITY), values_float: Vec::with_capacity(INITIAL_CAPACITY), - label_keys: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_values: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_offsets: Vec::with_capacity(INITIAL_CAPACITY + 1), + row_labels: Vec::with_capacity(INITIAL_CAPACITY), + unique_label_keys: BTreeSet::new(), values_histogram: Vec::with_capacity(INITIAL_CAPACITY), } } @@ -88,9 +101,9 @@ impl ColumnBuffers { self.metric_kinds.clear(); self.values_int.clear(); self.values_float.clear(); - self.label_keys.clear(); - self.label_values.clear(); - self.label_offsets.clear(); + self.row_labels.clear(); + // Note: unique_label_keys is NOT cleared - it accumulates across flushes + // to maintain consistent schema within a file self.values_histogram.clear(); } @@ -118,13 +131,13 @@ impl ColumnBuffers { } } - // Add labels for this row + // Store labels for this row and track unique keys + let mut row_map = BTreeMap::new(); for (k, v) in &line.labels { - self.label_keys.push(k.clone()); - self.label_values.push(v.clone()); + self.unique_label_keys.insert(k.clone()); + row_map.insert(k.clone(), v.clone()); } - #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] - self.label_offsets.push(self.label_keys.len() as i32); + self.row_labels.push(row_map); self.values_histogram.push(line.value_histogram.clone()); } @@ -145,18 +158,29 @@ impl ColumnBuffers { /// /// Buffers metrics in memory. Calling `flush()` writes accumulated metrics as a /// Parquet row group. +/// +/// The schema is determined dynamically at first flush based on label keys +/// discovered in the buffered data. Label columns use the `l_` prefix +/// (e.g., `l_container_id`). #[derive(Debug)] pub struct Format { /// Reusable column buffers for building Arrow arrays buffers: ColumnBuffers, - /// Parquet writer - writer: ArrowWriter, - /// Pre-computed Arrow schema - schema: Arc, + /// Parquet writer - created lazily on first flush when schema is known + writer: Option>, + /// The underlying writer, stored until `ArrowWriter` is created + raw_writer: Option, + /// Arrow schema - created on first flush based on discovered label keys + schema: Option>, + /// Ordered list of label keys in schema (for consistent column ordering) + schema_label_keys: Vec, /// Compression level for Zstd (stored for rotation) compression_level: i32, } +/// Label column prefix for flattened labels +const LABEL_COLUMN_PREFIX: &str = "l_"; + impl Format { /// Create a new Parquet format writer /// @@ -167,10 +191,61 @@ impl Format { /// /// # Errors /// - /// Returns error if Arrow writer creation fails + /// Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { - let schema = Arc::new(capture_schema()); + // Validate compression level early + let _ = ZstdLevel::try_new(compression_level)?; + + Ok(Self { + buffers: ColumnBuffers::new(), + writer: None, + raw_writer: Some(writer), + schema: None, + schema_label_keys: Vec::new(), + compression_level, + }) + } + + /// Generate schema based on discovered label keys + /// + /// Creates base columns plus `l_` columns for each unique label key. + /// Label columns are nullable Utf8 strings, sorted alphabetically for + /// consistent ordering. 
+ fn generate_schema(label_keys: &BTreeSet) -> (Arc, Vec) { + let mut fields = vec![ + Field::new("run_id", DataType::Utf8, false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("fetch_index", DataType::UInt64, false), + Field::new("metric_name", DataType::Utf8, false), + Field::new("metric_kind", DataType::Utf8, false), + Field::new("value_int", DataType::UInt64, true), + Field::new("value_float", DataType::Float64, true), + ]; + + // Add l_ columns for each label key (sorted by BTreeSet) + let ordered_keys: Vec = label_keys.iter().cloned().collect(); + for key in &ordered_keys { + fields.push(Field::new( + format!("{LABEL_COLUMN_PREFIX}{key}"), + DataType::Utf8, + true, // nullable - not all rows have all labels + )); + } + + fields.push(Field::new("value_histogram", DataType::Binary, true)); + (Arc::new(Schema::new(fields)), ordered_keys) + } + + /// Create writer properties with dictionary encoding for appropriate columns + fn create_writer_properties( + compression_level: i32, + label_keys: &[String], + ) -> Result { // Use Parquet v2 format for better encodings and compression: // // - DELTA_BINARY_PACKED encoding for integers (timestamps, fetch_index) @@ -179,23 +254,45 @@ impl Format { // // Dictionary encoding for low-cardinality columns: // - // - metric_kind: only 2 values ("counter", "gauge") + // - metric_kind: only 3 values ("counter", "gauge", "histogram") // - run_id: one UUID per run - let props = WriterProperties::builder() + // - label columns: often low cardinality (container_id, namespace, etc.) + let mut builder = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?)) - .set_column_dictionary_enabled(ColumnPath::from(columns::METRIC_KIND), true) - .set_column_dictionary_enabled(ColumnPath::from(columns::RUN_ID), true) - .build(); + .set_column_dictionary_enabled(ColumnPath::from("metric_kind"), true) + .set_column_dictionary_enabled(ColumnPath::from("run_id"), true); + + // Enable dictionary encoding for all label columns + for key in label_keys { + builder = builder.set_column_dictionary_enabled( + ColumnPath::from(format!("{LABEL_COLUMN_PREFIX}{key}")), + true, + ); + } - let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?; + Ok(builder.build()) + } - Ok(Self { - buffers: ColumnBuffers::new(), - writer: arrow_writer, - schema, - compression_level, - }) + /// Initialize the writer with schema based on discovered label keys + /// + /// Called on first flush when we know what label keys exist. 
+ fn initialize_writer(&mut self) -> Result<(), Error> { + let raw_writer = self + .raw_writer + .take() + .expect("raw_writer should be present before initialization"); + + let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); + let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + + let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; + + self.schema = Some(schema); + self.schema_label_keys = ordered_keys; + self.writer = Some(arrow_writer); + + Ok(()) } /// Convert buffered data to Arrow `RecordBatch` @@ -204,43 +301,19 @@ impl Format { /// /// Returns error if `RecordBatch` construction fails fn buffers_to_record_batch(&self) -> Result { + let schema = self + .schema + .as_ref() + .expect("schema should be initialized before creating record batch"); + if self.buffers.is_empty() { - return Ok(RecordBatch::new_empty(self.schema.clone())); + return Ok(RecordBatch::new_empty(schema.clone())); } - // Prepare label offsets with initial 0 - let mut label_offsets = Vec::with_capacity(self.buffers.label_offsets.len() + 1); - label_offsets.push(0i32); - label_offsets.extend_from_slice(&self.buffers.label_offsets); - - // Build the labels map array using pre-allocated buffers - let keys_array = Arc::new(StringArray::from(self.buffers.label_keys.clone())); - let values_array = Arc::new(StringArray::from(self.buffers.label_values.clone())); - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new(columns::LABEL_KEY, DataType::Utf8, false)), - keys_array as ArrayRef, - ), - ( - Arc::new(Field::new(columns::LABEL_VALUE, DataType::Utf8, false)), - values_array as ArrayRef, - ), - ]); - - let field = Arc::new(Field::new( - columns::LABEL_ENTRIES, - DataType::Struct(Fields::from(vec![ - Field::new(columns::LABEL_KEY, DataType::Utf8, false), - Field::new(columns::LABEL_VALUE, DataType::Utf8, false), - ])), - false, - )); - - let offsets = OffsetBuffer::new(label_offsets.into()); - let labels_map = MapArray::new(field, offsets, struct_array, None, false); + let num_rows = self.buffers.run_ids.len(); - // Build arrays directly from pre-allocated buffers - let arrays: Vec = vec![ + // Build base column arrays + let mut arrays: Vec = vec![ Arc::new(StringArray::from(self.buffers.run_ids.clone())), Arc::new(TimestampMillisecondArray::from(self.buffers.times.clone())), Arc::new(UInt64Array::from(self.buffers.fetch_indices.clone())), @@ -248,23 +321,47 @@ impl Format { Arc::new(StringArray::from(self.buffers.metric_kinds.clone())), Arc::new(UInt64Array::from(self.buffers.values_int.clone())), Arc::new(Float64Array::from(self.buffers.values_float.clone())), - Arc::new(labels_map), - Arc::new(BinaryArray::from_opt_vec( - self.buffers - .values_histogram - .iter() - .map(|v| { - if v.is_empty() { - None - } else { - Some(v.as_slice()) - } - }) - .collect(), - )), ]; - Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) 
+ // Build l_ columns for each label key in schema order + for key in &self.schema_label_keys { + let values: Vec> = self + .buffers + .row_labels + .iter() + .map(|row_map| row_map.get(key).map(String::as_str)) + .collect(); + arrays.push(Arc::new(StringArray::from(values))); + } + + // Add histogram column last + arrays.push(Arc::new( + self.buffers + .values_histogram + .iter() + .map(|v| { + if v.is_empty() { + None + } else { + Some(v.as_slice()) + } + }) + .collect::(), + )); + + debug_assert_eq!( + arrays.len(), + schema.fields().len(), + "array count ({}) must match schema field count ({})", + arrays.len(), + schema.fields().len() + ); + debug_assert!( + arrays.iter().all(|a| a.len() == num_rows), + "all arrays must have {num_rows} rows", + ); + + Ok(RecordBatch::try_new(schema.clone(), arrays)?) } /// Write buffered metrics as a Parquet row group @@ -277,8 +374,16 @@ impl Format { return Ok(()); } + // Initialize writer on first flush when we know the label keys + if self.writer.is_none() { + self.initialize_writer()?; + } + let batch = self.buffers_to_record_batch()?; - self.writer.write(&batch)?; + self.writer + .as_mut() + .expect("writer should be initialized") + .write(&batch)?; self.buffers.clear(); Ok(()) } @@ -321,8 +426,12 @@ impl Format { pub fn close(mut self) -> Result<(), Error> { // Write any remaining buffered data as a final row group self.write_parquet()?; - // Close the ArrowWriter which consumes it - self.writer.close()?; + + // Close the ArrowWriter if it was created + if let Some(writer) = self.writer { + writer.close()?; + } + // If writer was never created (no data written), nothing to close Ok(()) } } @@ -350,7 +459,7 @@ impl Format> { /// # Errors /// /// Returns an error if closing the current file or creating the new file fails. 
- pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + pub fn rotate_to(self, path: &std::path::Path) -> Result { // Store compression level before closing let compression_level = self.compression_level; @@ -358,14 +467,15 @@ impl Format> { self.close()?; // Create new file and writer - let file = std::fs::File::create(&path)?; - let writer = std::io::BufWriter::new(file); + let file = File::create(path)?; + let writer = BufWriter::new(file); let format = Self::new(writer, compression_level)?; Ok(format) } /// Get the compression level for this format + #[must_use] pub fn compression_level(&self) -> i32 { self.compression_level } @@ -462,4 +572,114 @@ mod tests { assert!(!buffer.get_ref().is_empty(), "should have written data"); } + + #[test] + fn writes_label_columns() { + use arrow_array::{Array, RecordBatchReader}; + use bytes::Bytes; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let mut buffer = Cursor::new(Vec::new()); + + { + let mut format = Format::new(&mut buffer, 3).expect("create format"); + + // Write metric with labels + let mut labels = FxHashMap::default(); + labels.insert("container_id".to_string(), "abc123".to_string()); + labels.insert("namespace".to_string(), "default".to_string()); + labels.insert("qos_class".to_string(), "Guaranteed".to_string()); + + let line = Line { + run_id: Uuid::new_v4(), + time: 1000, + fetch_index: 0, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(42.0), + labels, + value_histogram: Vec::new(), + }; + + format.write_metric(&line).expect("write should succeed"); + + // Write another metric with different labels + let mut labels2 = FxHashMap::default(); + labels2.insert("container_id".to_string(), "def456".to_string()); + labels2.insert("namespace".to_string(), "kube-system".to_string()); + // Note: no qos_class label + + let line2 = Line { + run_id: Uuid::new_v4(), + time: 2000, + fetch_index: 1, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(100.0), + labels: labels2, + value_histogram: Vec::new(), + }; + + format.write_metric(&line2).expect("write should succeed"); + format.close().expect("close should succeed"); + } + + // Read back and verify schema has l_* columns + let data = Bytes::from(buffer.into_inner()); + let reader = ParquetRecordBatchReaderBuilder::try_new(data) + .expect("create reader") + .build() + .expect("build reader"); + + let schema = reader.schema(); + + // Check that l_* columns exist (sorted alphabetically) + assert!( + schema.field_with_name("l_container_id").is_ok(), + "should have l_container_id column" + ); + assert!( + schema.field_with_name("l_namespace").is_ok(), + "should have l_namespace column" + ); + assert!( + schema.field_with_name("l_qos_class").is_ok(), + "should have l_qos_class column" + ); + + // Check no labels MapArray column + assert!( + schema.field_with_name("labels").is_err(), + "should NOT have labels column (replaced by l_* columns)" + ); + + // Read data and verify values + let batches: Vec<_> = reader.into_iter().collect(); + assert_eq!(batches.len(), 1, "should have one batch"); + let batch = batches[0].as_ref().expect("batch should be ok"); + + assert_eq!(batch.num_rows(), 2, "should have 2 rows"); + + // Check l_container_id values + let container_col = batch + .column_by_name("l_container_id") + .expect("l_container_id column"); + let container_arr = container_col + .as_any() + .downcast_ref::() + .expect("string array"); + 
assert_eq!(container_arr.value(0), "abc123"); + assert_eq!(container_arr.value(1), "def456"); + + // Check l_qos_class values (second row should be null) + let qos_col = batch + .column_by_name("l_qos_class") + .expect("l_qos_class column"); + let qos_arr = qos_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(qos_arr.value(0), "Guaranteed"); + assert!(qos_arr.is_null(1), "second row should have null qos_class"); + } } diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 66de09796..4f22274aa 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -463,7 +463,7 @@ impl std::fmt::Debug for RotationRequest { } } -/// Handle for sending rotation requests to a running CaptureManager +/// Handle for sending rotation requests to a running [`CaptureManager`] pub type RotationSender = mpsc::Sender; impl CaptureManager>, RealClock> { @@ -512,6 +512,10 @@ impl CaptureManager>, RealCloc /// the event loop runs. The `JoinHandle` can be awaited to ensure the /// CaptureManager has fully drained and closed before shutdown. /// + /// # Panics + /// + /// Panics if the shutdown watcher is missing (should never happen in normal use). + /// /// # Errors /// /// Returns an error if there is already a global recorder set. @@ -545,7 +549,10 @@ impl CaptureManager>, RealCloc let global_labels = self.global_labels; let clock = self.clock; let recv = self.recv; - let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + let shutdown = self + .shutdown + .take() + .expect("shutdown watcher must be present"); let handle = tokio::spawn(async move { if let Err(e) = Self::rotation_event_loop( @@ -572,6 +579,7 @@ impl CaptureManager>, RealCloc /// Internal event loop with rotation support #[allow(clippy::too_many_arguments)] + #[allow(clippy::cast_possible_truncation)] async fn rotation_event_loop( expiration: Duration, format: formats::parquet::Format>, @@ -651,7 +659,7 @@ impl CaptureManager>, RealCloc // Swap formats - this flushes any buffered data let old_format = state_machine .replace_format(new_format) - .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + .map_err(|e| formats::Error::Io(io::Error::other(e.to_string())))?; // Close old format to write Parquet footer old_format.close()?; diff --git a/lading_capture/src/validate/parquet.rs b/lading_capture/src/validate/parquet.rs index a2eb49aae..d46fbf980 100644 --- a/lading_capture/src/validate/parquet.rs +++ b/lading_capture/src/validate/parquet.rs @@ -10,10 +10,7 @@ use std::fs::File; use std::hash::{BuildHasher, Hasher}; use std::path::Path; -use arrow_array::{ - Array, MapArray, StringArray, StructArray, TimestampMillisecondArray, UInt64Array, -}; -use lading_capture_schema::columns; +use arrow_array::{Array, StringArray, TimestampMillisecondArray, UInt64Array}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use crate::validate::ValidationResult; @@ -95,51 +92,57 @@ pub fn validate_parquet>( } let time_array = batch - .column_by_name(columns::TIME) - .ok_or_else(|| Error::MissingColumn(columns::TIME.to_string()))? + .column_by_name("time") + .ok_or_else(|| Error::MissingColumn("time".to_string()))? 
.as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!( - "'{}' column is not TimestampMillisecond", - columns::TIME - )) + Error::InvalidColumnType("'time' column is not TimestampMillisecond".to_string()) })?; let fetch_index_array = batch - .column_by_name(columns::FETCH_INDEX) - .ok_or_else(|| Error::MissingColumn(columns::FETCH_INDEX.to_string()))? + .column_by_name("fetch_index") + .ok_or_else(|| Error::MissingColumn("fetch_index".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not UInt64", columns::FETCH_INDEX)) + Error::InvalidColumnType("'fetch_index' column is not UInt64".to_string()) })?; let metric_name_array = batch - .column_by_name(columns::METRIC_NAME) - .ok_or_else(|| Error::MissingColumn(columns::METRIC_NAME.to_string()))? + .column_by_name("metric_name") + .ok_or_else(|| Error::MissingColumn("metric_name".to_string()))? .as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not String", columns::METRIC_NAME)) + Error::InvalidColumnType("'metric_name' column is not String".to_string()) })?; - let labels_array = batch - .column_by_name(columns::LABELS) - .ok_or_else(|| Error::MissingColumn(columns::LABELS.to_string()))? - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not Map", columns::LABELS)) - })?; + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let metric_kind_array = batch - .column_by_name(columns::METRIC_KIND) - .ok_or_else(|| Error::MissingColumn(columns::METRIC_KIND.to_string()))? + .column_by_name("metric_kind") + .ok_or_else(|| Error::MissingColumn("metric_kind".to_string()))? 
.as_any() .downcast_ref::() .ok_or_else(|| { - Error::InvalidColumnType(format!("'{}' column is not String", columns::METRIC_KIND)) + Error::InvalidColumnType("'metric_kind' column is not String".to_string()) })?; // Validate invariants: fetch_index uniquely maps to time, @@ -178,27 +181,13 @@ pub fn validate_parquet>( fetch_index_to_time.insert(fetch_index, time); } - let labels_slice: StructArray = labels_array.value(row); - let key_array = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels keys are not StringArray".to_string()) - })?; - let value_array = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels values are not StringArray".to_string()) - })?; - + // Extract labels from l_* columns let mut sorted_labels: BTreeSet = BTreeSet::new(); - for i in 0..key_array.len() { - let key = key_array.value(i); - let value = value_array.value(i); - sorted_labels.insert(format!("{key}:{value}")); + for (key, arr) in &l_columns { + if !arr.is_null(row) { + let value = arr.value(row); + sorted_labels.insert(format!("{key}:{value}")); + } } let mut hasher = hash_builder.build_hasher(); From 410a2dc527854033244187fdda8e65d28863faa6 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 2 Jan 2026 15:39:08 -0500 Subject: [PATCH 2/2] Add bloom filter configuration API for parquet format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add BloomFilterConfig and BloomFilterColumn types to configure bloom filters on label columns. Bloom filters enable efficient query-time filtering by allowing readers to skip row groups that definitely don't contain a target value. New APIs: - Format::with_bloom_filter() - create writer with bloom filter config - format.bloom_filter_config() - getter for rotation - CaptureManager::new_parquet_with_bloom_filter() - CaptureManager::new_multi_with_bloom_filter() Backwards compatible - existing Format::new() and new_parquet() still work unchanged using BloomFilterConfig::default(). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats/parquet.rs | 101 ++++++++++++++++++++++++-- lading_capture/src/manager.rs | 97 ++++++++++++++++++++++++- 2 files changed, 189 insertions(+), 9 deletions(-) diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index 6db3bd38b..5cc652771 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -33,6 +33,52 @@ use parquet::{ use crate::line; +/// Configuration for a single bloom filter column +#[derive(Debug, Clone)] +pub struct BloomFilterColumn { + /// Label name (e.g., `container_id` will be applied to `l_container_id` column) + pub label_name: String, + /// Expected number of distinct values (NDV) for sizing the bloom filter. + /// Higher values create larger filters with lower false positive rates. + pub ndv: u64, +} + +impl BloomFilterColumn { + /// Create a new bloom filter column configuration + #[must_use] + pub fn new(label_name: impl Into, ndv: u64) -> Self { + Self { + label_name: label_name.into(), + ndv, + } + } +} + +/// Configuration for bloom filters on label columns +/// +/// Bloom filters enable efficient query-time filtering by allowing readers +/// to skip row groups that definitely don't contain a target value. 
+/// This is especially useful for high-cardinality label columns like +/// `container_id` where queries often filter for a specific value. +/// +/// # Example +/// +/// ``` +/// use lading_capture::formats::parquet::{BloomFilterConfig, BloomFilterColumn}; +/// +/// let config = BloomFilterConfig { +/// columns: vec![ +/// BloomFilterColumn::new("container_id", 100), +/// ], +/// }; +/// ``` +#[derive(Debug, Clone, Default)] +pub struct BloomFilterConfig { + /// Label columns to enable bloom filters on. + /// Each column specifies the label name (without `l_` prefix) and expected NDV. + pub columns: Vec, +} + /// Parquet format errors #[derive(thiserror::Error, Debug)] pub enum Error { @@ -176,6 +222,8 @@ pub struct Format { schema_label_keys: Vec, /// Compression level for Zstd (stored for rotation) compression_level: i32, + /// Bloom filter configuration (stored for rotation) + bloom_filter_config: BloomFilterConfig, } /// Label column prefix for flattened labels @@ -193,6 +241,25 @@ impl Format { /// /// Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { + Self::with_bloom_filter(writer, compression_level, BloomFilterConfig::default()) + } + + /// Create a new Parquet format writer with bloom filter configuration + /// + /// # Arguments + /// + /// * `writer` - Writer implementing Write + Seek for Parquet output + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// + /// # Errors + /// + /// Returns error if compression level is invalid + pub fn with_bloom_filter( + writer: W, + compression_level: i32, + bloom_filter_config: BloomFilterConfig, + ) -> Result { // Validate compression level early let _ = ZstdLevel::try_new(compression_level)?; @@ -203,6 +270,7 @@ impl Format { schema: None, schema_label_keys: Vec::new(), compression_level, + bloom_filter_config, }) } @@ -245,6 +313,7 @@ impl Format { fn create_writer_properties( compression_level: i32, label_keys: &[String], + bloom_filter_config: &BloomFilterConfig, ) -> Result { // Use Parquet v2 format for better encodings and compression: // @@ -271,6 +340,17 @@ impl Format { ); } + // Enable bloom filters for configured label columns + // Bloom filters allow readers to skip row groups that definitely don't + // contain the target value, improving query performance significantly. + for bloom_col in &bloom_filter_config.columns { + let column_name = format!("{LABEL_COLUMN_PREFIX}{}", bloom_col.label_name); + let column_path = ColumnPath::from(column_name); + + builder = builder.set_column_bloom_filter_enabled(column_path.clone(), true); + builder = builder.set_column_bloom_filter_ndv(column_path, bloom_col.ndv); + } + Ok(builder.build()) } @@ -284,7 +364,11 @@ impl Format { .expect("raw_writer should be present before initialization"); let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); - let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + let props = Self::create_writer_properties( + self.compression_level, + &ordered_keys, + &self.bloom_filter_config, + )?; let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; @@ -454,22 +538,23 @@ impl Format> { /// Rotate to a new output file /// /// Closes the current Parquet file (writing footer) and opens a new file - /// at the specified path with the same compression settings. + /// at the specified path with the same compression and bloom filter settings. 
/// /// # Errors /// /// Returns an error if closing the current file or creating the new file fails. pub fn rotate_to(self, path: &std::path::Path) -> Result { - // Store compression level before closing + // Store settings before closing let compression_level = self.compression_level; + let bloom_filter_config = self.bloom_filter_config.clone(); // Close current file (writes footer) self.close()?; - // Create new file and writer + // Create new file and writer with same settings let file = File::create(path)?; let writer = BufWriter::new(file); - let format = Self::new(writer, compression_level)?; + let format = Self::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(format) } @@ -479,6 +564,12 @@ impl Format> { pub fn compression_level(&self) -> i32 { self.compression_level } + + /// Get the bloom filter configuration for this format + #[must_use] + pub fn bloom_filter_config(&self) -> &BloomFilterConfig { + &self.bloom_filter_config + } } #[cfg(test)] diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 4f22274aa..b3e095783 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -481,13 +481,55 @@ impl CaptureManager>, RealCloc experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_parquet_with_bloom_filter( + capture_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based Parquet writer and bloom filter config + /// + /// # Arguments + /// + /// * `capture_path` - Path to the output Parquet file + /// * `flush_seconds` - How often to flush buffered data + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// * `shutdown` - Signal to gracefully shut down the capture manager + /// * `experiment_started` - Signal that the experiment has started + /// * `target_running` - Signal that the target is running + /// * `expiration` - Duration after which metrics expire + /// + /// # Errors + /// + /// Function will error if the underlying capture file cannot be opened or + /// if Parquet writer creation fails. 
+ #[allow(clippy::too_many_arguments)] + pub async fn new_parquet_with_bloom_filter( + capture_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let fp = fs::File::create(&capture_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let format = parquet::Format::new(writer, compression_level)?; + let format = + parquet::Format::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(Self::new_with_format( format, @@ -539,6 +581,7 @@ impl CaptureManager>, RealCloc self.clock.mark_start(); let compression_level = self.format.compression_level(); + let bloom_filter_config = self.format.bloom_filter_config().clone(); // Run the event loop in a spawned task so we can return the sender immediately let expiration = self.expiration; @@ -567,6 +610,7 @@ impl CaptureManager>, RealCloc shutdown, rotation_rx, compression_level, + bloom_filter_config, ) .await { @@ -592,6 +636,7 @@ impl CaptureManager>, RealCloc shutdown: lading_signal::Watcher, mut rotation_rx: mpsc::Receiver, compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, ) -> Result<(), Error> { let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); let shutdown_wait = shutdown.recv(); @@ -624,6 +669,7 @@ impl CaptureManager>, RealCloc &mut state_machine, rotation_req.path, compression_level, + &bloom_filter_config, ).await; // Send result back to caller (ignore send error if receiver dropped) let _ = rotation_req.response.send(result); @@ -647,14 +693,19 @@ impl CaptureManager>, RealCloc >, new_path: PathBuf, compression_level: i32, + bloom_filter_config: &parquet::BloomFilterConfig, ) -> Result<(), formats::Error> { - // Create new file and format + // Create new file and format with same settings let fp = fs::File::create(&new_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let new_format = parquet::Format::new(writer, compression_level)?; + let new_format = parquet::Format::with_bloom_filter( + writer, + compression_level, + bloom_filter_config.clone(), + )?; // Swap formats - this flushes any buffered data let old_format = state_machine @@ -693,6 +744,40 @@ impl experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_multi_with_bloom_filter( + base_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based multi-format writer and bloom filter config + /// + /// Writes to both JSONL and Parquet formats simultaneously. The base path + /// is used to generate two output files: `{base_path}.jsonl` and + /// `{base_path}.parquet`. + /// + /// # Errors + /// + /// Function will error if either capture file cannot be opened or if + /// format creation fails. 
+ #[allow(clippy::too_many_arguments)] + pub async fn new_multi_with_bloom_filter( + base_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let jsonl_path = base_path.with_extension("jsonl"); let parquet_path = base_path.with_extension("parquet"); @@ -709,7 +794,11 @@ impl .map_err(formats::Error::Io)?; let parquet_file = parquet_file.into_std().await; let parquet_writer = BufWriter::new(parquet_file); - let parquet_format = parquet::Format::new(parquet_writer, compression_level)?; + let parquet_format = parquet::Format::with_bloom_filter( + parquet_writer, + compression_level, + bloom_filter_config, + )?; let format = multi::Format::new(jsonl_format, parquet_format);
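
Usage sketch (reviewer note): the example below exercises the writer-side API added by these two commits — `BloomFilterConfig`, `BloomFilterColumn::new`, and `Format::with_bloom_filter` — and is modeled on the `writes_label_columns` test in the first commit. The import paths for `Line`, `LineValue`, `MetricKind`, `FxHashMap`, and `Uuid` mirror the in-crate test imports and are assumptions about the crate's public surface, not something this patch establishes.

```rust
// Minimal sketch, assuming `lading_capture::line` and the test's helper types
// (`FxHashMap`, `Uuid`) are reachable from outside the crate as they are from
// the in-crate tests.
use std::io::Cursor;

use lading_capture::formats::parquet::{BloomFilterColumn, BloomFilterConfig, Format};
use lading_capture::line::{Line, LineValue, MetricKind};
use rustc_hash::FxHashMap;
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bloom filters are keyed by label name; the writer maps "container_id"
    // onto the flattened `l_container_id` column.
    let bloom = BloomFilterConfig {
        columns: vec![BloomFilterColumn::new("container_id", 100)],
    };

    // Any `Write + Seek` target works; the tests in this patch use an
    // in-memory cursor and Zstd level 3.
    let buffer = Cursor::new(Vec::new());
    let mut format = Format::with_bloom_filter(buffer, 3, bloom)?;

    let mut labels = FxHashMap::default();
    labels.insert("container_id".to_string(), "abc123".to_string());
    labels.insert("namespace".to_string(), "default".to_string());

    format.write_metric(&Line {
        run_id: Uuid::new_v4(),
        time: 1_000,
        fetch_index: 0,
        metric_name: "test_metric".into(),
        metric_kind: MetricKind::Gauge,
        value: LineValue::Float(42.0),
        labels,
        value_histogram: Vec::new(),
    })?;

    // The schema (including the `l_*` columns) is fixed at first flush;
    // `close` flushes remaining rows and writes the Parquet footer along
    // with the configured bloom filters.
    format.close()?;
    Ok(())
}
```

Because labels land in flat `l_*` columns, a reader such as the `ParquetRecordBatchReaderBuilder` used in the round-trip test can filter on `l_container_id` directly, and the per-column bloom filter lets it skip row groups that definitely lack the target value — the query pattern both commit messages describe.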