diff --git a/CHANGELOG.md b/CHANGELOG.md
index c8ac39496..bd4c34af3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## Changed
 - **Breaking Change**: Rename `lost_bytes` to `missed_bytes` in `logrotate_fs`. This reflects preexisting terminology.
+- **Breaking Change**: Counter metrics in capture files are now emitted as
+  deltas instead of cumulative values. Parquet files include new metadata fields
+  to identify the format.

 ## Added
 - Lading now supports histogram approximations in its capture files.
 - HTTP blackhole now tracks distribution of bytes received, both decoded and
diff --git a/lading_capture/src/accumulator.rs b/lading_capture/src/accumulator.rs
index 1e3671be2..f903cae9b 100644
--- a/lading_capture/src/accumulator.rs
+++ b/lading_capture/src/accumulator.rs
@@ -215,10 +215,22 @@ impl Iterator for DrainIter {
         let interval = interval_idx(tick_to_flush);

         for (key, values) in &self.accumulator.counters {
+            // Skip ticks before this key was first written
+            if let Some(&first_tick) = self.accumulator.counter_first_tick.get(key)
+                && tick_to_flush < first_tick
+            {
+                continue;
+            }
             let value = values[interval];
             metrics.push((key.clone(), MetricValue::Counter(value), tick_to_flush));
         }

         for (key, values) in &self.accumulator.gauges {
+            // Skip ticks before this key was first written
+            if let Some(&first_tick) = self.accumulator.gauge_first_tick.get(key)
+                && tick_to_flush < first_tick
+            {
+                continue;
+            }
             let value = values[interval];
             metrics.push((key.clone(), MetricValue::Gauge(value), tick_to_flush));
         }
@@ -295,6 +307,10 @@ pub(crate) struct Accumulator {
     counters: FxHashMap<Key, [u64; BUFFER_SIZE]>,
     gauges: FxHashMap<Key, [f64; BUFFER_SIZE]>,
     histograms: FxHashMap,
+    /// Tick when each counter key was first written (for delta counter support)
+    counter_first_tick: FxHashMap<Key, u64>,
+    /// Tick when each gauge key was first written
+    gauge_first_tick: FxHashMap<Key, u64>,
     pub(crate) current_tick: u64,
     last_flushed_tick: Option<u64>,
 }
@@ -308,6 +324,8 @@ impl std::fmt::Debug for Accumulator {
             .field(
                 "histograms",
                 &format!("<{len} histogram keys>", len = self.histograms.len()),
             )
+            .field("counter_first_tick", &self.counter_first_tick)
+            .field("gauge_first_tick", &self.gauge_first_tick)
             .field("current_tick", &self.current_tick)
             .field("last_flushed_tick", &self.last_flushed_tick)
             .finish()
@@ -320,6 +338,8 @@ impl Accumulator {
             counters: FxHashMap::default(),
             gauges: FxHashMap::default(),
             histograms: FxHashMap::default(),
+            counter_first_tick: FxHashMap::default(),
+            gauge_first_tick: FxHashMap::default(),
             current_tick: 0,
             last_flushed_tick: None,
         }
@@ -351,6 +371,10 @@ impl Accumulator {
             return Err(Error::TickTooOld { tick });
         }

+        // Track when this key was first written
+        let key_clone = c.key.clone();
+        self.counter_first_tick.entry(key_clone).or_insert(tick);
+
         let values = self.counters.entry(c.key).or_insert([0; BUFFER_SIZE]);

         for offset in 0..=tick_offset {
@@ -392,6 +416,10 @@ impl Accumulator {
             return Err(Error::TickTooOld { tick });
         }

+        // Track when this key was first written
+        let key_clone = g.key.clone();
+        self.gauge_first_tick.entry(key_clone).or_insert(tick);
+
         let values = self.gauges.entry(g.key).or_insert([0.0; BUFFER_SIZE]);

         for offset in 0..=tick_offset {
@@ -505,11 +533,23 @@ impl Accumulator {
         let flush_interval = interval_idx(flush_tick);

         for (key, values) in &self.counters {
+            // Skip ticks before this key was first written
+            if let Some(&first_tick) = self.counter_first_tick.get(key)
+                && flush_tick < first_tick
+            {
+                continue;
+            }
             let value = values[flush_interval];
             metrics.push((key.clone(), MetricValue::Counter(value), flush_tick));
         }

         for (key, values) in &self.gauges {
+            // Skip ticks before this key was first written
+            if let Some(&first_tick) = self.gauge_first_tick.get(key)
+                && flush_tick < first_tick
+            {
+                continue;
+            }
             let value = values[flush_interval];
             metrics.push((key.clone(), MetricValue::Gauge(value), flush_tick));
         }
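A minimal sketch of the guard these hunks add, with a hypothetical helper and String keys for brevity: a key first written at tick 5 contributes nothing to flushes or drains of ticks 0 through 4, rather than reporting buffer slots it never actually observed.

// Sketch of the skip rule in isolation; `first_tick` stands in for
// counter_first_tick / gauge_first_tick.
use std::collections::HashMap;

fn should_emit(first_tick: &HashMap<String, u64>, key: &str, tick: u64) -> bool {
    match first_tick.get(key) {
        // Emit only once the flushed tick has reached the key's first write
        Some(&first) => tick >= first,
        // No record for this key: the guard does not apply, matching the
        // `if let ... && ...` shape above
        None => true,
    }
}

fn main() {
    let mut first_tick = HashMap::new();
    first_tick.insert("requests".to_string(), 5);
    assert!(!should_emit(&first_tick, "requests", 4)); // skipped
    assert!(should_emit(&first_tick, "requests", 5)); // emitted from here on
}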
diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs
index fb0e40483..95cb03cee 100644
--- a/lading_capture/src/formats/parquet.rs
+++ b/lading_capture/src/formats/parquet.rs
@@ -19,7 +19,10 @@ use lading_capture_schema::{capture_schema, columns};
 use parquet::{
     arrow::ArrowWriter,
     basic::{Compression, ZstdLevel},
-    file::properties::{WriterProperties, WriterVersion},
+    file::{
+        metadata::KeyValue,
+        properties::{WriterProperties, WriterVersion},
+    },
     schema::types::ColumnPath,
 };

@@ -182,6 +185,10 @@ impl Format {
         let props = WriterProperties::builder()
             .set_writer_version(WriterVersion::PARQUET_2_0)
             .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?))
+            .set_key_value_metadata(Some(vec![
+                KeyValue::new("lading.schema_version".to_string(), "2".to_string()),
+                KeyValue::new("lading.counter_semantics".to_string(), "delta".to_string()),
+            ]))
             .set_column_dictionary_enabled(ColumnPath::from(columns::METRIC_KIND), true)
             .set_column_dictionary_enabled(ColumnPath::from(columns::RUN_ID), true)
             .build();
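For consumers, a sketch of reading these key/value pairs back with the parquet crate before interpreting counter columns; the file path and the fallback for files that predate the key are illustrative, not part of lading.

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

// Returns the value of lading.counter_semantics, if the file carries it.
fn counter_semantics(path: &str) -> Option<String> {
    let file = File::open(path).ok()?;
    let reader = SerializedFileReader::new(file).ok()?;
    let kv = reader.metadata().file_metadata().key_value_metadata()?;
    kv.iter()
        .find(|entry| entry.key == "lading.counter_semantics")
        .and_then(|entry| entry.value.clone())
}

fn main() {
    match counter_semantics("capture.parquet").as_deref() {
        Some("delta") => println!("counters are per-tick deltas"),
        // Files written before this change carry no marker and hold
        // cumulative counters.
        _ => println!("no marker: assume legacy cumulative counters"),
    }
}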
diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs
index 9d9a2c0fb..6e06552f9 100644
--- a/lading_capture/src/manager/state_machine.rs
+++ b/lading_capture/src/manager/state_machine.rs
@@ -109,6 +109,10 @@ pub(crate) struct StateMachine {
     filtered_global_labels: FxHashMap<String, String>,
     /// Clock for time operations
     clock: C,
+    /// Previous counter values for delta calculation.
+    /// First observation of each counter is skipped to avoid emitting
+    /// the entire cumulative value as a "delta".
+    counter_prev: FxHashMap<Key, u64>,
 }

 impl StateMachine {
@@ -167,6 +171,7 @@ impl StateMachine {
             accumulator,
             filtered_global_labels: global_labels,
             clock,
+            counter_prev: FxHashMap::default(),
         }
     }
@@ -268,15 +273,34 @@ impl StateMachine {
     fn record_captures(&mut self, now: Instant) -> Result<(), Error> {
         let tick = self.accumulator.current_tick;

-        // Capture all counter values from the registry
+        // Capture all counter values from the registry as DELTAS.
+        // First observation of each counter is skipped to avoid emitting
+        // the entire cumulative value as a "delta".
         for (k, c) in self.registry.get_counter_handles() {
-            let val = c.load(Ordering::Relaxed);
-            let counter = Counter {
-                key: k,
-                timestamp: now,
-                value: CounterValue::Absolute(val),
-            };
-            self.accumulator.counter(counter, tick)?;
+            let current = c.load(Ordering::Relaxed);
+
+            match self.counter_prev.get(&k) {
+                Some(&prev) => {
+                    // Have previous value: calculate and emit delta
+                    let delta = current.saturating_sub(prev);
+                    self.counter_prev.insert(k.clone(), current);
+
+                    let counter = Counter {
+                        key: k,
+                        timestamp: now,
+                        // NOTE: Must use Absolute(delta), NOT Increment(delta),
+                        // because advance_tick() copies values forward.
+                        // Absolute overwrites the copied value; Increment would sum.
+                        value: CounterValue::Absolute(delta),
+                    };
+                    self.accumulator.counter(counter, tick)?;
+                }
+                None => {
+                    // First observation: just record, don't emit
+                    self.counter_prev.insert(k.clone(), current);
+                    // Skip accumulator - no output for first tick
+                }
+            }
         }

         // Capture all gauge values from the registry
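Downstream of this change, a reader that wants the old cumulative series can fold the deltas back up per key. A minimal consumer-side sketch with illustrative names, not part of the lading API; note that because the first observation is never emitted, totals are relative to that baseline, not absolute.

use std::collections::HashMap;

// Sum per-key deltas back into running totals.
fn accumulate(deltas: &[(String, u64)]) -> HashMap<String, u64> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (key, delta) in deltas {
        *totals.entry(key.clone()).or_insert(0) += delta;
    }
    totals
}

fn main() {
    let deltas = vec![
        ("requests".to_string(), 1),
        ("requests".to_string(), 1),
        ("requests".to_string(), 3),
    ];
    // The counter grew by 5 since its (unemitted) baseline observation.
    assert_eq!(accumulate(&deltas)["requests"], 5);
}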
@@ -1248,13 +1272,13 @@ mod tests {
         assert_eq!(machine.accumulator.current_tick, initial_tick + 6);
     }

-    /// Test that strictly increasing counter and gauge values per interval
-    /// produce strictly monotonic output in both flush and drain operations.
+    /// Test that counter deltas are non-negative and gauges remain strictly increasing.
     ///
-    /// Runs for 600 intervals, incrementing metrics each tick, and verifies
-    /// the JSON output has strictly increasing values throughout.
+    /// Counters are emitted as delta values (change since last observation),
+    /// so they should be non-negative but not necessarily increasing.
+    /// Gauges are still cumulative, so they should be strictly increasing.
     #[test]
-    fn strictly_increasing_values_remain_monotonic() {
+    fn counter_deltas_non_negative_and_gauges_monotonic() {
         let writer = InMemoryWriter::new();
         let format = jsonl::Format::new(writer.clone());
         let clock = TestClock::new(0);
@@ -1295,7 +1319,7 @@ mod tests {

         machine.next(Event::ShutdownSignaled).unwrap();

-        // Parse output and verify strictly increasing values
+        // Parse output and verify counter deltas are non-negative and gauges strictly increasing
         let lines = writer.parse_lines().unwrap();
         println!("Total lines written (including drain): {}", lines.len());
         println!(
@@ -1303,66 +1327,38 @@ mod tests {
             lines.len() - lines_before_shutdown
         );

-        let mut last_counter_value: Option<f64> = None;
         let mut last_gauge_value: Option<f64> = None;
         let mut counter_line_number = 0;
         let mut gauge_line_number = 0;

-        // Check flush phase
-        for line in &lines[..lines_before_shutdown] {
-            if line.metric_name == "monotonic_counter" {
-                let value = line.value.as_f64();
-                if let Some(last) = last_counter_value {
-                    if value <= last {
-                        println!("\n=== MONOTONICITY VIOLATION IN FLUSH ===");
-                        println!("Counter line #{counter_line_number}");
-                        println!("Current value: {value}, Previous: {last}");
-                        println!("Time: {} ms, Fetch index: {}", line.time, line.fetch_index);
-                        panic!("Counter not strictly increasing during FLUSH: {value} <= {last}");
-                    }
-                }
-                last_counter_value = Some(value);
-                counter_line_number += 1;
-            } else if line.metric_name == "monotonic_gauge" {
-                let value = line.value.as_f64();
-                if let Some(last) = last_gauge_value {
-                    if value <= last {
-                        println!("\n=== MONOTONICITY VIOLATION IN FLUSH ===");
-                        println!("Gauge line #{gauge_line_number}");
-                        println!("Current value: {value}, Previous: {last}");
-                        println!("Time: {} ms, Fetch index: {}", line.time, line.fetch_index);
-                        panic!("Gauge not strictly increasing during FLUSH: {value} <= {last}");
-                    }
-                }
-                last_gauge_value = Some(value);
-                gauge_line_number += 1;
-            }
-        }
-
-        // Check drain phase
-        for line in &lines[lines_before_shutdown..] {
+        // Check all lines
+        for line in &lines {
             if line.metric_name == "monotonic_counter" {
                 let value = line.value.as_f64();
-                if let Some(last) = last_counter_value {
-                    if value <= last {
-                        println!("\n=== MONOTONICITY VIOLATION IN DRAIN ===");
-                        println!("Counter line #{counter_line_number}");
-                        println!("Current value: {value}, Previous: {last}");
-                        println!("Time: {} ms, Fetch index: {}", line.time, line.fetch_index);
-                        panic!("Counter not strictly increasing during DRAIN: {value} <= {last}");
-                    }
+                // Counter deltas should be non-negative (we increment by 1 each tick)
+                if value < 0.0 {
+                    println!("\n=== NEGATIVE DELTA VIOLATION ===");
+                    println!("Counter line #{counter_line_number}");
+                    println!("Value: {value}");
+                    println!("Time: {} ms, Fetch index: {}", line.time, line.fetch_index);
+                    panic!("Counter delta is negative: {value}");
                 }
-                last_counter_value = Some(value);
+                // Each delta should be exactly 1 (we increment by 1 each tick)
+                assert!(
+                    (value - 1.0).abs() < f64::EPSILON,
+                    "Counter delta should be 1.0, got {value} at line {counter_line_number}"
+                );
                 counter_line_number += 1;
             } else if line.metric_name == "monotonic_gauge" {
                 let value = line.value.as_f64();
+                // Gauges are still cumulative and should be strictly increasing
                 if let Some(last) = last_gauge_value {
                     if value <= last {
-                        println!("\n=== MONOTONICITY VIOLATION IN DRAIN ===");
+                        println!("\n=== GAUGE MONOTONICITY VIOLATION ===");
                         println!("Gauge line #{gauge_line_number}");
                         println!("Current value: {value}, Previous: {last}");
                         println!("Time: {} ms, Fetch index: {}", line.time, line.fetch_index);
-                        panic!("Gauge not strictly increasing during DRAIN: {value} <= {last}");
+                        panic!("Gauge not strictly increasing: {value} <= {last}");
                     }
                 }
                 last_gauge_value = Some(value);
@@ -1371,7 +1367,11 @@ mod tests {
         }

         // Verify we actually produced output
-        assert!(last_counter_value.is_some(), "No counter values in output");
+        // Note: First counter observation is skipped, so we have 599 counter values
+        assert!(
+            counter_line_number > 0,
+            "No counter values in output (expected 599)"
+        );
         assert!(last_gauge_value.is_some(), "No gauge values in output");
     }
@@ -1418,11 +1418,14 @@ mod tests {
         let unique_ticks: std::collections::HashSet<u64> =
             lines.iter().map(|l| l.fetch_index).collect();

-        // We start at tick 0 and advance 30 times, so we have ticks 0-30 (31 total)
+        // We start at tick 0 but due to:
+        // 1. Drift correction (clock advanced before first FlushTick), tick 0 is skipped
+        // 2. First observation at tick 1 is skipped for delta baseline
+        // So we have ticks 2-30 (29 total)
         assert_eq!(
             unique_ticks.len(),
-            31,
-            "30-second run should drain 31 unique ticks (0-30)"
+            29,
+            "30-second run should drain 29 unique ticks (2-30, ticks 0-1 skipped)"
         );
     }
@@ -1465,17 +1468,13 @@ mod tests {
         let unique_ticks: std::collections::HashSet<u64> =
             lines.iter().map(|l| l.fetch_index).collect();

         // After 60 FlushTick events, we have:
-        //
-        // - Advanced to tick 60
-        // - Flushed tick 0 (when we reached tick 60)
-        // - Written metrics for ticks 0-60
-        //
-        // Total output includes both the flushed tick 0 and drained ticks
-        // 1-60 (61 ticks total).
+        // - Drift correction skips tick 0
+        // - First observation at tick 1 is skipped for delta baseline
+        // - Ticks 2-60 have delta data (59 ticks)
         assert_eq!(
             unique_ticks.len(),
-            61,
-            "60-second run should output 61 unique ticks total (0-60)"
+            59,
+            "60-second run should output 59 unique ticks total (2-60, ticks 0-1 skipped)"
         );
     }
@@ -1582,6 +1581,9 @@ mod tests {
     }

     // Test case 5: Run with minimal flushes
+    //
+    // With delta counters, the first observation is skipped (used as baseline
+    // for delta calculation). Need 2 FlushTicks to produce any output.
     {
         let writer = InMemoryWriter::new();
         let format = jsonl::Format::new(writer.clone());
@@ -1604,26 +1606,29 @@ mod tests {
             clock.clone(),
         );

-        // Write a metric and do one flush to get it into the accumulator
-        metrics::with_local_recorder(&recorder, || {
-            metrics::counter!("test_counter").increment(1);
-        });
-
-        // One FlushTick to record the metric at tick 0 and advance to tick 1
-        let _ = machine.next(Event::FlushTick);
+        // Write a metric and do two flushes to get delta output.
+        // First FlushTick: drift correction to tick 1, then baseline recorded (skipped), advance to tick 2.
+        // Second FlushTick: delta=1 emitted at tick 2, advance to tick 3.
+        for _ in 0..2 {
+            metrics::with_local_recorder(&recorder, || {
+                metrics::counter!("test_counter").increment(1);
+            });
+            clock.advance(1000);
+            let _ = machine.next(Event::FlushTick);
+        }

         // Shutdown immediately
         let _ = machine.next(Event::ShutdownSignaled);

         let lines = writer.parse_lines().expect("should parse");

-        // Should have tick 0 from the single flush/advance
+        // Should have tick 2 (tick 0 skipped by drift, tick 1 skipped as delta baseline)
        let unique_ticks: std::collections::HashSet<u64> =
            lines.iter().map(|l| l.fetch_index).collect();
        assert_eq!(
            unique_ticks.len(),
            1,
-            "Run with single FlushTick should drain tick 0"
+            "Run with two FlushTicks should drain tick 2 (ticks 0-1 skipped)"
        );
    }
 }
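Taken together, the revised test expectations follow one rule: with the clock advanced before each FlushTick, tick 0 is lost to drift correction and tick 1 to the delta baseline, so N FlushTick events leave ticks 2..=N carrying data. A hypothetical helper (not in the codebase) capturing that arithmetic:

// Ticks 2..=n remain after drift correction (tick 0) and the delta
// baseline (tick 1) are skipped, i.e. n - 1 unique ticks.
fn expected_unique_ticks(flush_ticks: u64) -> u64 {
    flush_ticks.saturating_sub(1)
}

fn main() {
    assert_eq!(expected_unique_ticks(30), 29); // 30-second run above
    assert_eq!(expected_unique_ticks(60), 59); // 60-second run above
    assert_eq!(expected_unique_ticks(2), 1); // minimal-flush test case 5
}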