Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions libdd-ddsketch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,47 @@ impl DDSketch {
pub fn encode_to_vec(self) -> Vec<u8> {
    // Convert to the protobuf message first, then let the message serialize itself.
    let message = self.into_pb();
    message.encode_to_vec()
}

/// Rebuild a `DDSketch` from its protobuf message.
///
/// # Errors
///
/// Checks run in this order, and the first failure is returned:
/// - `negative_values` is present and holds any sparse or contiguous bin
/// - `mapping` is absent
/// - the mapping's interpolation is anything other than `Interpolation::None`
/// - `LogMapping::new` rejects the mapping's `gamma` / `index_offset`
/// - the positive store cannot be decoded (for instance because it uses both the sparse and the
///   contiguous encoding)
pub fn try_from_pb(pb: pb::DdSketch) -> Result<Self, Box<dyn std::error::Error>> {
    // This sketch only tracks positive values and a zero count, so any negative bin is refused.
    let carries_negative_bins = matches!(
        &pb.negative_values,
        Some(neg) if !(neg.bin_counts.is_empty() && neg.contiguous_bin_counts.is_empty())
    );
    if carries_negative_bins {
        return Err("negative values not supported".into());
    }

    let index_mapping = pb.mapping.ok_or("missing mapping")?;

    // Only the non-interpolated mapping is accepted.
    let expected_interpolation = pb::index_mapping::Interpolation::None as i32;
    if index_mapping.interpolation != expected_interpolation {
        return Err("unsupported interpolation mode".into());
    }

    let mapping = LogMapping::new(index_mapping.gamma, index_mapping.index_offset)
        .ok_or("invalid mapping parameters")?;

    // A message without positive values decodes to an empty store.
    let store = if let Some(positive) = pb.positive_values {
        LowCollapsingDenseStore::try_from_pb_store(positive)?
    } else {
        LowCollapsingDenseStore::default()
    };

    Ok(DDSketch {
        store,
        zero_count: pb.zero_count,
        mapping,
    })
}
}

/// A store mapping the bin indexes to their respective weights
Expand Down Expand Up @@ -142,6 +183,59 @@ impl LowCollapsingDenseStore {
})
}

/// Decode a protobuf `Store` into a dense store.
///
/// A store may use either the sparse (`bin_counts`) or the contiguous
/// (`contiguous_bin_counts` + `contiguous_bin_index_offset`) encoding. Using both at once is an
/// error; using neither yields `Self::default()`.
fn try_from_pb_store(store: pb::Store) -> Result<Self, Box<dyn std::error::Error>> {
    let sparse_is_empty = store.bin_counts.is_empty();
    let contiguous_is_empty = store.contiguous_bin_counts.is_empty();

    match (sparse_is_empty, contiguous_is_empty) {
        (false, false) => Err("both sparse and contiguous encodings present".into()),
        (false, true) => Self::from_sparse_bins(&store.bin_counts),
        (true, false) => Ok(Self::from_contiguous_bins(
            store.contiguous_bin_counts,
            store.contiguous_bin_index_offset,
        )),
        (true, true) => Ok(Self::default()),
    }
}

/// Build a store from the sparse (`index -> count`) protobuf encoding.
///
/// The result is dense: every index between the lowest retained index and the highest index
/// present in `bin_counts` gets a slot, and indexes absent from the map hold `0.0`. An empty map
/// yields `Self::default()`.
///
/// At most 2048 bins (the `max_size` assigned below) are materialised. When the input spans a
/// wider index range, the counts of every bin below the retained window are added to the lowest
/// retained bin. The total count is therefore preserved, and the allocation stays bounded no
/// matter how far apart the indexes of a decoded message are.
///
/// NOTE(review): folding into the lowest retained bin is assumed to match the collapsing policy
/// this store applies elsewhere — confirm against the insertion path.
///
/// # Errors
///
/// Currently never fails; the `Result` return type is kept for the existing callers.
fn from_sparse_bins(
    bin_counts: &HashMap<i32, f64>,
) -> Result<Self, Box<dyn std::error::Error>> {
    // Upper bound on materialised bins; keep in sync with the `max_size` literal below.
    const MAX_BINS: i32 = 2048;

    let (min_index, max_index) = match (bin_counts.keys().min(), bin_counts.keys().max()) {
        (Some(&lowest), Some(&highest)) => (lowest, highest),
        _ => return Ok(Self::default()),
    };

    // Widen before subtracting: `max - min + 1` overflows `i32` when the indexes are far apart
    // (e.g. `i32::MIN` and `i32::MAX` in the same message).
    let span = i64::from(max_index) - i64::from(min_index) + 1;
    let lowest_kept = if span > i64::from(MAX_BINS) {
        // Cannot overflow: `span > MAX_BINS` implies `max_index - (MAX_BINS - 1) > min_index`.
        max_index - (MAX_BINS - 1)
    } else {
        min_index
    };

    // `lowest_kept <= max_index` and they differ by less than `MAX_BINS`, so the cast is lossless.
    let len = (max_index - lowest_kept + 1) as usize;
    let mut bins = VecDeque::from(vec![0.0; len]);

    // One pass over the entries instead of one hash lookup per index in the range.
    for (&index, &count) in bin_counts {
        // Indexes below the window are clamped onto its first slot (low collapsing).
        let slot = (index.max(lowest_kept) - lowest_kept) as usize;
        bins[slot] += count;
    }

    Ok(Self {
        bins,
        offset: lowest_kept,
        max_size: 2048,
    })
}

fn from_contiguous_bins(bins: Vec<f64>, offset: i32) -> Self {
Self {
bins: bins.into(),
offset,
max_size: 2048,
}
}

/// Return an iterator over the bins
/// The iterator yields pairs `(bin_index, count)`
fn bins(&self) -> impl Iterator<Item = (i32, f64)> + '_ {
Expand Down Expand Up @@ -507,4 +601,126 @@ mod test {
&[(1, 1.0), (2, 0.0), (3, 1.0)]
)
}

/// A sketch converted to protobuf and back keeps its total count and its ordered bins.
#[test]
fn test_sketch_try_from_pb_roundtrip() {
    let samples: [f64; 7] = [0.0, 1e-5, 0.1, 2.0, 10.0, 25.0, 10000.0];

    // The i-th sample is inserted with weight i + 1.
    let mut sketch = DDSketch::default();
    for (position, sample) in samples.iter().copied().enumerate() {
        assert!(sketch.add_with_count(sample, position as f64 + 1.0).is_ok());
    }

    // Snapshot what must survive, since `into_pb` consumes the sketch.
    let original_count = sketch.count();
    let original_bins = sketch.ordered_bins();

    let restored = DDSketch::try_from_pb(sketch.into_pb()).unwrap();
    let restored_bins = restored.ordered_bins();

    assert_within!(restored.count(), original_count, f64::EPSILON);
    assert_eq!(original_bins.len(), restored_bins.len());
    for (orig, rest) in original_bins.iter().zip(restored_bins.iter()) {
        assert_within!(orig.0, rest.0, f64::EPSILON);
        assert_within!(orig.1, rest.1, f64::EPSILON);
    }
}

/// A message without an index mapping is rejected.
#[test]
fn test_sketch_try_from_pb_missing_mapping() {
    // Everything is absent or zero; only the missing mapping can trigger the error.
    let pb = pb::DdSketch {
        zero_count: 0.0,
        negative_values: None,
        positive_values: None,
        mapping: None,
    };
    assert!(DDSketch::try_from_pb(pb).is_err());
}

/// A mapping that uses linear interpolation is rejected.
#[test]
fn test_sketch_try_from_pb_unsupported_interpolation() {
    let linear_mapping = pb::IndexMapping {
        gamma: 1.02,
        index_offset: 0.0,
        interpolation: pb::index_mapping::Interpolation::Linear as i32,
    };

    let pb = pb::DdSketch {
        mapping: Some(linear_mapping),
        positive_values: None,
        negative_values: None,
        zero_count: 0.0,
    };
    assert!(DDSketch::try_from_pb(pb).is_err());
}

/// A message whose negative store holds bins is rejected.
#[test]
fn test_sketch_try_from_pb_negative_values_error() {
    let valid_mapping = pb::IndexMapping {
        gamma: 1.02,
        index_offset: 0.0,
        interpolation: pb::index_mapping::Interpolation::None as i32,
    };

    // Non-empty negative store, expressed with the contiguous encoding.
    let negative_store = pb::Store {
        bin_counts: HashMap::new(),
        contiguous_bin_counts: vec![1.0, 2.0],
        contiguous_bin_index_offset: 0,
    };

    let pb = pb::DdSketch {
        mapping: Some(valid_mapping),
        positive_values: None,
        negative_values: Some(negative_store),
        zero_count: 0.0,
    };
    assert!(DDSketch::try_from_pb(pb).is_err());
}

/// A positive store that uses both the sparse and the contiguous encoding is rejected.
#[test]
fn test_sketch_try_from_pb_mixed_encoding_error() {
    let valid_mapping = pb::IndexMapping {
        gamma: 1.02,
        index_offset: 0.0,
        interpolation: pb::index_mapping::Interpolation::None as i32,
    };

    // Both encodings populated at once.
    let mixed_store = pb::Store {
        bin_counts: HashMap::from([(5, 1.0)]),
        contiguous_bin_counts: vec![1.0, 2.0],
        contiguous_bin_index_offset: 0,
    };

    let pb = pb::DdSketch {
        mapping: Some(valid_mapping),
        positive_values: Some(mixed_store),
        negative_values: None,
        zero_count: 0.0,
    };
    assert!(DDSketch::try_from_pb(pb).is_err());
}

/// A sparse positive store is decoded into a dense range, with the gap filled by a zero bin.
#[test]
fn test_sketch_try_from_pb_sparse_encoding() {
    let valid_mapping = pb::IndexMapping {
        gamma: 1.02,
        index_offset: 0.0,
        interpolation: pb::index_mapping::Interpolation::None as i32,
    };

    // Indexes 10 and 12 are set; index 11 is deliberately left out.
    let sparse_store = pb::Store {
        bin_counts: HashMap::from([(10, 1.0), (12, 3.0)]),
        contiguous_bin_counts: Vec::new(),
        contiguous_bin_index_offset: 0,
    };

    let pb = pb::DdSketch {
        mapping: Some(valid_mapping),
        positive_values: Some(sparse_store),
        negative_values: None,
        zero_count: 5.0,
    };

    let sketch = DDSketch::try_from_pb(pb).unwrap();

    // Total count = zero_count (5) + bin 10 (1) + bin 12 (3).
    assert_within!(sketch.zero_count, 5.0, f64::EPSILON);
    assert_within!(sketch.count(), 9.0, f64::EPSILON);

    let bins: Vec<_> = sketch.store.bins().collect();
    assert_eq!(bins.len(), 3);
    assert_eq!(bins[0], (10, 1.0));
    assert_eq!(bins[1], (11, 0.0));
    assert_eq!(bins[2], (12, 3.0));
}
}
Loading