3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Changed
- **Breaking Change**: Rename `lost_bytes` to `missed_bytes` in `logrotate_fs`.
This reflects preexisting terminology.
- **Breaking Change**: Counter metrics in capture files are now emitted as
deltas instead of cumulative values. Parquet files include new metadata fields
to identify the format.
## Added
- Lading now supports histogram approximations in its capture files.
- HTTP blackhole now tracks distribution of bytes received, both decoded and
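Since counters in capture files are now per-tick deltas, a consumer that expects cumulative series must re-accumulate them. A minimal sketch of that conversion, assuming samples are ordered by tick with no gaps (`to_cumulative` is a hypothetical helper, not part of lading):

```rust
/// Rebuild a cumulative series from delta samples. Illustrative only:
/// assumes the slice is ordered by tick and holds one sample per tick.
fn to_cumulative(deltas: &[u64]) -> Vec<u64> {
    deltas
        .iter()
        .scan(0u64, |running, delta| {
            *running += delta;
            Some(*running)
        })
        .collect()
}

fn main() {
    // Deltas 3, 0, 5 correspond to cumulative readings 3, 3, 8.
    assert_eq!(to_cumulative(&[3, 0, 5]), vec![3, 3, 8]);
}
```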
40 changes: 40 additions & 0 deletions lading_capture/src/accumulator.rs
@@ -215,10 +215,22 @@ impl Iterator for DrainIter {
let interval = interval_idx(tick_to_flush);

for (key, values) in &self.accumulator.counters {
// Skip ticks before this key was first written
if let Some(&first_tick) = self.accumulator.counter_first_tick.get(key)
&& tick_to_flush < first_tick
{
continue;
}
let value = values[interval];
metrics.push((key.clone(), MetricValue::Counter(value), tick_to_flush));
}
for (key, values) in &self.accumulator.gauges {
// Skip ticks before this key was first written
if let Some(&first_tick) = self.accumulator.gauge_first_tick.get(key)
&& tick_to_flush < first_tick
{
continue;
}
let value = values[interval];
metrics.push((key.clone(), MetricValue::Gauge(value), tick_to_flush));
}
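(The guard matters because each key's buffer is zero-initialized on first insert, via `or_insert([0; BUFFER_SIZE])` below; without it, the drain would emit spurious zero-valued samples for every tick before a key's first write.)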
@@ -295,6 +307,10 @@ pub(crate) struct Accumulator {
counters: FxHashMap<Key, [u64; BUFFER_SIZE]>,
gauges: FxHashMap<Key, [f64; BUFFER_SIZE]>,
histograms: FxHashMap<Key, [DDSketch; BUFFER_SIZE]>,
/// Tick when each counter key was first written (for delta counter support)
counter_first_tick: FxHashMap<Key, u64>,
/// Tick when each gauge key was first written
gauge_first_tick: FxHashMap<Key, u64>,
Comment on lines +310 to +313 (Collaborator):
Rather than have two whole separate maps, I would almost rather we alter this to

counters: FxHashMap<Key, { first_tick: u64, buf: [u64; BUFFER_SIZE] }>,

That way the information is co-located in memory and we avoid two additional maps and lookups.
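The anonymous struct above is shorthand; in valid Rust the suggestion might look like the following (a sketch only, with hypothetical names and sizes, not code from this PR):

```rust
use rustc_hash::FxHashMap;

// Stand-ins for lading's own definitions.
type Key = String;
const BUFFER_SIZE: usize = 16;

/// Hypothetical per-key state co-locating the first-write tick with the
/// value ring buffer, so a single map lookup serves both reads.
struct CounterSlot {
    first_tick: u64,
    buf: [u64; BUFFER_SIZE],
}

struct Accumulator {
    counters: FxHashMap<Key, CounterSlot>,
}

impl Accumulator {
    /// Record `value` for `key` at `tick`, capturing the first-write tick
    /// on insertion rather than in a separate map. Indexing by
    /// `tick % BUFFER_SIZE` is an assumption here; lading computes the
    /// slot via `interval_idx`.
    fn record(&mut self, key: Key, tick: u64, value: u64) {
        let slot = self.counters.entry(key).or_insert_with(|| CounterSlot {
            first_tick: tick,
            buf: [0; BUFFER_SIZE],
        });
        slot.buf[(tick as usize) % BUFFER_SIZE] += value;
    }
}
```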

pub(crate) current_tick: u64,
last_flushed_tick: Option<u64>,
}
@@ -308,6 +324,8 @@ impl std::fmt::Debug for Accumulator {
"histograms",
&format!("<{len} histogram keys>", len = self.histograms.len()),
)
.field("counter_first_tick", &self.counter_first_tick)
.field("gauge_first_tick", &self.gauge_first_tick)
.field("current_tick", &self.current_tick)
.field("last_flushed_tick", &self.last_flushed_tick)
.finish()
@@ -320,6 +338,8 @@ impl Accumulator {
counters: FxHashMap::default(),
gauges: FxHashMap::default(),
histograms: FxHashMap::default(),
counter_first_tick: FxHashMap::default(),
gauge_first_tick: FxHashMap::default(),
current_tick: 0,
last_flushed_tick: None,
}
@@ -351,6 +371,10 @@ impl Accumulator {
return Err(Error::TickTooOld { tick });
}

// Track when this key was first written
let key_clone = c.key.clone();
self.counter_first_tick.entry(key_clone).or_insert(tick);

let values = self.counters.entry(c.key).or_insert([0; BUFFER_SIZE]);

for offset in 0..=tick_offset {
@@ -392,6 +416,10 @@ impl Accumulator {
return Err(Error::TickTooOld { tick });
}

// Track when this key was first written
let key_clone = g.key.clone();
self.gauge_first_tick.entry(key_clone).or_insert(tick);

let values = self.gauges.entry(g.key).or_insert([0.0; BUFFER_SIZE]);

for offset in 0..=tick_offset {
@@ -505,11 +533,23 @@ impl Accumulator {
let flush_interval = interval_idx(flush_tick);

for (key, values) in &self.counters {
// Skip ticks before this key was first written
if let Some(&first_tick) = self.counter_first_tick.get(key)
&& flush_tick < first_tick
{
continue;
}
let value = values[flush_interval];
metrics.push((key.clone(), MetricValue::Counter(value), flush_tick));
}

for (key, values) in &self.gauges {
// Skip ticks before this key was first written
if let Some(&first_tick) = self.gauge_first_tick.get(key)
&& flush_tick < first_tick
{
continue;
}
let value = values[flush_interval];
metrics.push((key.clone(), MetricValue::Gauge(value), flush_tick));
}
9 changes: 8 additions & 1 deletion lading_capture/src/formats/parquet.rs
@@ -19,7 +19,10 @@ use lading_capture_schema::{capture_schema, columns};
use parquet::{
    arrow::ArrowWriter,
    basic::{Compression, ZstdLevel},
-   file::properties::{WriterProperties, WriterVersion},
+   file::{
+       metadata::KeyValue,
+       properties::{WriterProperties, WriterVersion},
+   },
    schema::types::ColumnPath,
};

@@ -182,6 +185,10 @@ impl<W: Write + Seek + Send> Format<W> {
let props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?))
.set_key_value_metadata(Some(vec![
KeyValue::new("lading.schema_version".to_string(), "2".to_string()),
KeyValue::new("lading.counter_semantics".to_string(), "delta".to_string()),
]))
.set_column_dictionary_enabled(ColumnPath::from(columns::METRIC_KIND), true)
.set_column_dictionary_enabled(ColumnPath::from(columns::RUN_ID), true)
.build();
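Downstream readers can use these key-value entries to tell delta captures from older cumulative ones. A minimal sketch of such a check using the `parquet` crate's serialized reader (the `counter_semantics` helper is hypothetical, not part of this PR):

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

/// Return the value of `lading.counter_semantics` from a capture file's
/// Parquet footer metadata, if present ("delta" for files in the new format).
fn counter_semantics(path: &str) -> Option<String> {
    let reader = SerializedFileReader::new(File::open(path).ok()?).ok()?;
    reader
        .metadata()
        .file_metadata()
        .key_value_metadata()?
        .iter()
        .find(|kv| kv.key == "lading.counter_semantics")
        .and_then(|kv| kv.value.clone())
}
```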