diff --git a/datadog-sidecar-ffi/src/span.rs b/datadog-sidecar-ffi/src/span.rs index e8aacd88f3..d2e423f9c7 100644 --- a/datadog-sidecar-ffi/src/span.rs +++ b/datadog-sidecar-ffi/src/span.rs @@ -3,7 +3,7 @@ use libdd_common_ffi::slice::{AsBytes, CharSlice}; use libdd_tinybytes::{Bytes, BytesString}; -use libdd_trace_utils::span::{ +use libdd_trace_utils::span::v04::{ AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes, }; use std::borrow::Cow; diff --git a/datadog-sidecar-ffi/tests/span.rs b/datadog-sidecar-ffi/tests/span.rs index 6e7068779d..8e79ba8000 100644 --- a/datadog-sidecar-ffi/tests/span.rs +++ b/datadog-sidecar-ffi/tests/span.rs @@ -4,7 +4,7 @@ use datadog_sidecar_ffi::span::*; use libdd_common_ffi::slice::*; use libdd_tinybytes::*; -use libdd_trace_utils::span::*; +use libdd_trace_utils::span::v04::*; use std::collections::HashMap; #[test] diff --git a/datadog-sidecar/src/service/tracing/trace_flusher.rs b/datadog-sidecar/src/service/tracing/trace_flusher.rs index 12d41fa029..78bc5f99c2 100644 --- a/datadog-sidecar/src/service/tracing/trace_flusher.rs +++ b/datadog-sidecar/src/service/tracing/trace_flusher.rs @@ -355,9 +355,8 @@ mod tests { }; let send_data_1 = create_send_data(size, &target_endpoint); - - let send_data_2 = send_data_1.clone(); - let send_data_3 = send_data_1.clone(); + let send_data_2 = create_send_data(size, &target_endpoint); + let send_data_3 = create_send_data(size, &target_endpoint); trace_flusher.enqueue(send_data_1); trace_flusher.enqueue(send_data_2); diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index c62351138c..743d312661 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -546,7 +546,7 @@ mod tests { use crate::error::ddog_trace_exporter_error_free; use httpmock::prelude::*; use httpmock::MockServer; - use libdd_trace_utils::span::SpanSlice; + use libdd_trace_utils::span::v04::SpanSlice; use std::{borrow::Borrow, mem::MaybeUninit}; #[test] diff --git a/libdd-data-pipeline/src/stats_exporter.rs b/libdd-data-pipeline/src/stats_exporter.rs index 150b9dfcc7..a154be6b18 100644 --- a/libdd-data-pipeline/src/stats_exporter.rs +++ b/libdd-data-pipeline/src/stats_exporter.rs @@ -195,7 +195,7 @@ mod tests { use httpmock::prelude::*; use httpmock::MockServer; use libdd_common::hyper_migration::new_default_client; - use libdd_trace_utils::span::{trace_utils, SpanSlice}; + use libdd_trace_utils::span::{trace_utils, v04::SpanSlice}; use libdd_trace_utils::test_utils::poll_for_mock_hit; use time::Duration; use time::SystemTime; diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index a8c0d2a8ac..1c4bb22292 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -42,7 +42,7 @@ use libdd_trace_utils::msgpack_decoder; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult, }; -use libdd_trace_utils::span::{Span, SpanText}; +use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::io; use std::sync::{Arc, Mutex}; @@ -587,7 +587,7 @@ impl TraceExporter { /// # Returns /// * Ok(String): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub fn send_trace_chunks( + pub fn send_trace_chunks( &self, trace_chunks: Vec>>, ) -> Result { @@ -604,7 +604,7 @@ impl TraceExporter { /// # Returns /// * Ok(String): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub async fn send_trace_chunks_async( + pub async fn send_trace_chunks_async( &self, trace_chunks: Vec>>, ) -> Result { @@ -687,7 +687,7 @@ impl TraceExporter { self.handle_send_result(result, chunks, payload_len).await } - async fn send_trace_chunks_inner( + async fn send_trace_chunks_inner( &self, mut traces: Vec>>, ) -> Result { @@ -1012,8 +1012,8 @@ mod tests { use httpmock::MockServer; use libdd_tinybytes::BytesString; use libdd_trace_utils::msgpack_encoder; + use libdd_trace_utils::span::v04::SpanBytes; use libdd_trace_utils::span::v05; - use libdd_trace_utils::span::SpanBytes; use std::collections::HashMap; use std::net; use std::time::Duration; @@ -2024,7 +2024,7 @@ mod single_threaded_tests { use crate::agent_info; use httpmock::prelude::*; use libdd_trace_utils::msgpack_encoder; - use libdd_trace_utils::span::SpanBytes; + use libdd_trace_utils::span::v04::SpanBytes; use std::time::Duration; use tokio::time::sleep; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 1b2090776b..3f2654e2f5 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -206,9 +206,9 @@ pub(crate) fn handle_stats_enabled( /// Add all spans from the given iterator into the stats concentrator /// # Panic /// Will panic if another thread panicked will holding the lock on `stats_concentrator` -fn add_spans_to_stats( +fn add_spans_to_stats( stats_concentrator: &Mutex, - traces: &[Vec>], + traces: &[Vec>], ) { let mut stats_concentrator = stats_concentrator.lock_or_panic(); @@ -219,8 +219,8 @@ fn add_spans_to_stats( } /// Process traces for stats computation and update header tags accordingly -pub(crate) fn process_traces_for_stats( - traces: &mut Vec>>, +pub(crate) fn process_traces_for_stats( + traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 515495d7a7..9f17b2e16a 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -12,7 +12,7 @@ use libdd_common::header::{ }; use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError; use libdd_trace_utils::msgpack_encoder; -use libdd_trace_utils::span::{Span, SpanText}; +use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_payload; use std::collections::HashMap; @@ -46,7 +46,7 @@ impl<'a> TraceSerializer<'a> { } /// Prepare traces payload and HTTP headers for sending to agent - pub(super) fn prepare_traces_payload( + pub(super) fn prepare_traces_payload( &self, traces: Vec>>, header_tags: TracerHeaderTags, @@ -64,7 +64,7 @@ impl<'a> TraceSerializer<'a> { } /// Collect trace chunks based on output format - fn collect_and_process_traces( + fn collect_and_process_traces( &self, traces: Vec>>, ) -> Result, TraceExporterError> { @@ -97,7 +97,7 @@ impl<'a> TraceSerializer<'a> { } /// Serialize payload to msgpack format - fn serialize_payload( + fn serialize_payload( &self, payload: &tracer_payload::TraceChunks, ) -> Result, TraceExporterError> { @@ -119,7 +119,7 @@ mod tests { APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR, }; use libdd_tinybytes::BytesString; - use libdd_trace_utils::span::SpanBytes; + use libdd_trace_utils::span::v04::SpanBytes; use libdd_trace_utils::trace_utils::TracerHeaderTags; fn create_test_span() -> SpanBytes { diff --git a/libdd-tinybytes/src/lib.rs b/libdd-tinybytes/src/lib.rs index 3d0a124d8c..ea5dfbf913 100644 --- a/libdd-tinybytes/src/lib.rs +++ b/libdd-tinybytes/src/lib.rs @@ -7,6 +7,8 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] +#[cfg(feature = "serde")] +use serde::Serialize; use std::{ borrow, cmp, fmt, hash, ops::{self, RangeBounds}, @@ -14,11 +16,9 @@ use std::{ sync::atomic::AtomicUsize, }; -#[cfg(feature = "serde")] -use serde::Serialize; - /// Immutable bytes type with zero copy cloning and slicing. #[derive(Clone)] +#[repr(C)] // fixed layout for ad-hoc conversion to slice pub struct Bytes { ptr: NonNull, len: usize, @@ -49,11 +49,24 @@ impl Bytes { len: usize, refcount: RefCountedCell, ) -> Self { - Self { - ptr, - len, - bytes: Some(refcount), - } + Self::from_raw(ptr, len, Some(refcount)) + } + + #[inline] + /// Creates a new `Bytes` from the given slice data and the refcount. Can be used after calling + /// into_raw(). + /// + /// # Safety + /// + /// * the pointer should be valid for the given length + /// * the pointer should be valid for reads as long as the refcount or any of it's clone is not + /// dropped + pub const unsafe fn from_raw( + ptr: NonNull, + len: usize, + bytes: Option, + ) -> Self { + Self { ptr, len, bytes } } /// Creates empty `Bytes`. @@ -235,6 +248,11 @@ impl Bytes { // SAFETY: ptr is valid for the associated length unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast_const(), self.len()) } } + + #[inline] + pub fn into_raw(self) -> (NonNull, usize, Option) { + (self.ptr, self.len, self.bytes) + } } // Implementations of `UnderlyingBytes` for common types. diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index cad2e55404..21c105c952 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -7,7 +7,7 @@ use std::{ use criterion::{criterion_group, Criterion}; use libdd_trace_stats::span_concentrator::SpanConcentrator; -use libdd_trace_utils::span::SpanBytes; +use libdd_trace_utils::span::v04::SpanBytes; fn get_bucket_start(now: SystemTime, n: u64) -> i64 { let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n); diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 3ef068cd5f..07863b3a98 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -301,7 +301,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl #[cfg(test)] mod tests { - use libdd_trace_utils::span::{SpanBytes, SpanSlice}; + use libdd_trace_utils::span::v04::{SpanBytes, SpanSlice}; use super::*; use std::{collections::HashMap, hash::Hash}; diff --git a/libdd-trace-stats/src/span_concentrator/stat_span.rs b/libdd-trace-stats/src/span_concentrator/stat_span.rs index 31ac1676e2..4696d50581 100644 --- a/libdd-trace-stats/src/span_concentrator/stat_span.rs +++ b/libdd-trace-stats/src/span_concentrator/stat_span.rs @@ -5,8 +5,9 @@ //! support both trace-utils' Span and pb::Span. use libdd_trace_protobuf::pb; -use libdd_trace_utils::span::{trace_utils, Span, SpanText}; +use libdd_trace_utils::span::{trace_utils, v04::Span, TraceData}; use libdd_trace_utils::trace_utils as pb_utils; +use std::borrow::Borrow; /// Common interface for spans used in stats computation pub trait StatSpan<'a> { @@ -38,7 +39,7 @@ pub trait StatSpan<'a> { fn get_metrics(&'a self, key: &str) -> Option; } -impl<'a, T: SpanText> StatSpan<'a> for Span { +impl<'a, T: TraceData> StatSpan<'a> for Span { fn service(&'a self) -> &'a str { self.service.borrow() } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 3f4f8c05df..647b267dce 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -4,7 +4,7 @@ use crate::span_concentrator::aggregation::OwnedAggregationKey; use super::*; -use libdd_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice}; +use libdd_trace_utils::span::{trace_utils::compute_top_level_span, v04::SpanSlice}; use rand::{thread_rng, Rng}; const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64; diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs b/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs new file mode 100644 index 0000000000..76df2f1c1f --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs @@ -0,0 +1,63 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; +use rmp::decode; +use rmp::decode::DecodeStringError; + +use std::borrow::Borrow; +use std::ops::Deref; + +/// Read a string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. +#[inline] +pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}")) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) +} + +/// Internal Buffer used to wrap msgpack data for decoding. +/// Provides a couple accessors to extract data from the buffer. +pub struct Buffer(T::Bytes); + +impl Buffer { + pub fn new(data: T::Bytes) -> Self { + Buffer(data) + } + + /// Returns a mutable reference to the underlying slice. + pub fn as_mut_slice(&mut self) -> &mut &'static [u8] { + T::get_mut_slice(&mut self.0) + } + + /// Tries to extract a slice of `bytes` from the buffer and advances the buffer. + pub fn try_slice_and_advance(&mut self, bytes: usize) -> Option { + T::try_slice_and_advance(&mut self.0, bytes) + } + + /// Read a string from the slices `buf`. + /// + /// # Errors + /// Fails if the buffer doesn't contain a valid utf8 msgpack string. + pub fn read_string(&mut self) -> Result { + T::read_string(&mut self.0) + } +} + +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0.borrow() + } +} diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/map.rs b/libdd-trace-utils/src/msgpack_decoder/decode/map.rs index c6dd22fb1d..9641d80864 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/map.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/map.rs @@ -1,7 +1,8 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::{buffer::Buffer, error::DecodeError}; +use crate::span::TraceData; use rmp::{decode, decode::RmpRead, Marker}; use std::collections::HashMap; @@ -33,14 +34,14 @@ use std::collections::HashMap; /// * `V` - The type of the values in the map. /// * `F` - The type of the function used to read key-value pairs from the buffer. #[inline] -pub fn read_map<'a, K, V, F>( +pub fn read_map( len: usize, - buf: &mut &'a [u8], + buf: &mut B, read_pair: F, ) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, + F: Fn(&mut B) -> Result<(K, V), DecodeError>, { let mut map = HashMap::with_capacity(len); for _ in 0..len { @@ -67,7 +68,8 @@ where /// - The buffer does not contain a map. /// - There is an error reading from the buffer. #[inline] -pub fn read_map_len(buf: &mut &[u8]) -> Result { +pub fn read_map_len(buf: &mut Buffer) -> Result { + let buf = buf.as_mut_slice(); match decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? { diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs b/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs index 1e28a7057a..30c7fe4620 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs @@ -1,33 +1,35 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; -use libdd_tinybytes::Bytes; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::TraceData; use rmp::decode; use std::collections::HashMap; -fn read_byte_array_len(buf: &mut &[u8]) -> Result { - decode::read_bin_len(buf).map_err(|_| { +fn read_byte_array_len(buf: &mut Buffer) -> Result { + decode::read_bin_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to read binary len for meta_struct".to_owned()) }) } #[inline] -pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { +pub fn read_meta_struct( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } - fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Bytes), DecodeError> { - let key = read_string_ref(buf)?; + fn read_meta_struct_pair( + buf: &mut Buffer, + ) -> Result<(T::Text, T::Bytes), DecodeError> { + let key = buf.read_string()?; let byte_array_len = read_byte_array_len(buf)? as usize; - let slice = buf.get(0..byte_array_len); - if let Some(slice) = slice { - let data = Bytes::copy_from_slice(slice); - *buf = &buf[byte_array_len..]; + if let Some(data) = buf.try_slice_and_advance(byte_array_len) { Ok((key, data)) } else { Err(DecodeError::InvalidFormat( @@ -43,13 +45,15 @@ pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result::new(serialized.as_ref()); let res = read_meta_struct(&mut slice).unwrap(); assert_eq!(res.get("key").unwrap().to_vec(), vec![1, 2, 3, 4]); @@ -60,7 +64,7 @@ mod tests { let meta = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]); let serialized = rmp_serde::to_vec_named(&meta).unwrap(); - let mut slice = serialized.as_ref(); + let mut slice = Buffer::::new(serialized.as_ref()); let res = read_meta_struct(&mut slice); assert!(res.is_err()); diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs b/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs index 5b84ea16be..8dfc8cefed 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs @@ -1,21 +1,25 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::TraceData; use std::collections::HashMap; #[inline] -pub fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { - let key = read_string_ref(buf)?; - let v = read_number_slice(buf)?; +pub fn read_metric_pair(buf: &mut Buffer) -> Result<(T::Text, f64), DecodeError> { + let key = buf.read_string()?; + let v = read_number(buf)?; Ok((key, v)) } #[inline] -pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { +pub fn read_metrics( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs b/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs index 156f8614d1..48f0d75506 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs @@ -1,6 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod buffer; pub mod error; pub mod map; pub mod meta_struct; diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/number.rs b/libdd-trace-utils/src/msgpack_decoder/decode/number.rs index 10ef05b02d..d057ba9545 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/number.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/number.rs @@ -1,7 +1,9 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; use rmp::{decode::RmpRead, Marker}; use std::fmt; @@ -146,7 +148,7 @@ impl TryFrom for f64 { } } -fn read_number(buf: &mut &[u8], allow_null: bool) -> Result { +fn read_num(buf: &mut &[u8], allow_null: bool) -> Result { match rmp::decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? { @@ -194,22 +196,23 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result } /// Read a msgpack encoded number from `buf`. -pub fn read_number_slice>( - buf: &mut &[u8], -) -> Result { - read_number(buf, false)?.try_into() +pub fn read_number>( + buf: &mut Buffer, +) -> Result { + read_num(buf.as_mut_slice(), false)?.try_into() } /// Read a msgpack encoded number from `buf` and return 0 if null. -pub fn read_nullable_number_slice>( - buf: &mut &[u8], -) -> Result { - read_number(buf, true)?.try_into() +pub fn read_nullable_number>( + buf: &mut Buffer, +) -> Result { + read_num(buf.as_mut_slice(), true)?.try_into() } #[cfg(test)] mod tests { use super::*; + use crate::span::SliceData; use serde_json::json; use std::f64; @@ -219,8 +222,8 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: u8 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: u8 = read_number(&mut slice).unwrap(); assert_eq!(result, expected_value); } @@ -230,8 +233,8 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: i8 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: i8 = read_number(&mut slice).unwrap(); assert_eq!(result, expected_value); } @@ -241,18 +244,18 @@ mod tests { let expected_value = 42.98; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: f64 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: f64 = read_number(&mut slice).unwrap(); assert_eq!(result, expected_value); } #[test] - fn test_decoding_null_through_read_number_slice_raises_exception() { + fn test_decoding_null_through_read_number_raises_exception() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: Result = read_number_slice(&mut slice); + let mut slice = Buffer::::new(buf.as_slice()); + let result: Result = read_number(&mut slice); assert!(matches!(result, Err(DecodeError::InvalidType(_)))); assert_eq!( @@ -266,8 +269,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: u8 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: u8 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0); } @@ -276,8 +279,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: i8 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: i8 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0); } @@ -286,8 +289,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: f64 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: f64 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0.0); } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs b/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs index 4b3cb0cc43..6befce9f40 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs @@ -1,10 +1,13 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; -use crate::span::{AttributeAnyValueSlice, AttributeArrayValueSlice, SpanEventSlice}; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent}; +use crate::span::TraceData; +use std::borrow::Borrow; use std::collections::HashMap; use std::str::FromStr; @@ -24,18 +27,18 @@ use std::str::FromStr; /// This function will return an error if: /// - The marker for the array length cannot be read. /// - Any `SpanEvent` cannot be decoded. -pub(crate) fn read_span_events<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +pub(crate) fn read_span_events( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span events".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_event(buf)?); } @@ -63,15 +66,15 @@ impl FromStr for SpanEventKey { } } -fn decode_span_event<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut event = SpanEventSlice::default(); - let event_size = rmp::decode::read_map_len(buf) +fn decode_span_event(buf: &mut Buffer) -> Result, DecodeError> { + let mut event = SpanEvent::default(); + let event_size = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for event size".to_owned()))?; for _ in 0..event_size { - match read_string_ref(buf)?.parse::()? { - SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number_slice(buf)?, - SpanEventKey::Name => event.name = read_string_ref(buf)?, + match buf.read_string()?.borrow().parse::()? { + SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number(buf)?, + SpanEventKey::Name => event.name = buf.read_string()?, SpanEventKey::Attributes => event.attributes = read_attributes_map(buf)?, } } @@ -79,16 +82,16 @@ fn decode_span_event<'a>(buf: &mut &'a [u8]) -> Result, Decod Ok(event) } -fn read_attributes_map<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { - let len = rmp::decode::read_map_len(buf) +fn read_attributes_map( + buf: &mut Buffer, +) -> Result>, DecodeError> { + let len = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for attributes".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_ref(buf)?; + let key = buf.read_string()?; let value = decode_attribute_any(buf)?; map.insert(key, value); } @@ -121,9 +124,11 @@ impl FromStr for AttributeAnyKey { } } -fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut attribute: Option = None; - let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { +fn decode_attribute_any( + buf: &mut Buffer, +) -> Result, DecodeError> { + let mut attribute: Option> = None; + let attribute_size = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) })?; @@ -135,15 +140,15 @@ fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result = None; for _ in 0..attribute_size { - match read_string_ref(buf)?.parse::()? { - AttributeAnyKey::Type => attribute_type = Some(read_number_slice(buf)?), + match buf.read_string()?.borrow().parse::()? { + AttributeAnyKey::Type => attribute_type = Some(read_number(buf)?), AttributeAnyKey::SingleValue(key) => { - attribute = Some(AttributeAnyValueSlice::SingleValue(get_attribute_from_key( + attribute = Some(AttributeAnyValue::SingleValue(get_attribute_from_key( buf, key, )?)) } AttributeAnyKey::ArrayValue => { - attribute = Some(AttributeAnyValueSlice::Array(read_attributes_array(buf)?)) + attribute = Some(AttributeAnyValue::Array(read_attributes_array(buf)?)) } } } @@ -168,14 +173,14 @@ fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +fn read_attributes_array( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let map_len = rmp::decode::read_map_len(buf).map_err(|_| { + let map_len = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType( "Unable to get map len for event attributes array_value object".to_owned(), ) @@ -187,20 +192,20 @@ fn read_attributes_array<'a>( )); } - let key = read_string_ref(buf)?; - if key != "values" { + let key = buf.read_string()?; + if key.borrow() != "values" { return Err(DecodeError::InvalidFormat( "Expected 'values' field in event attributes array_value object".to_owned(), )); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType( "Unable to get array len for event attributes values field".to_owned(), ) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); if len > 0 { let first = decode_attribute_array(buf, None)?; let array_type = (&first).into(); @@ -238,41 +243,35 @@ impl FromStr for AttributeArrayKey { } } -fn get_attribute_from_key<'a>( - buf: &mut &'a [u8], +fn get_attribute_from_key( + buf: &mut Buffer, key: AttributeArrayKey, -) -> Result, DecodeError> { +) -> Result, DecodeError> { match key { - AttributeArrayKey::StringValue => { - Ok(AttributeArrayValueSlice::String(read_string_ref(buf)?)) - } + AttributeArrayKey::StringValue => Ok(AttributeArrayValue::String(buf.read_string()?)), AttributeArrayKey::BoolValue => { - let boolean = rmp::decode::read_bool(buf); + let boolean = rmp::decode::read_bool(buf.as_mut_slice()); if let Ok(value) = boolean { match value { - true => Ok(AttributeArrayValueSlice::Boolean(true)), - false => Ok(AttributeArrayValueSlice::Boolean(false)), + true => Ok(AttributeArrayValue::Boolean(true)), + false => Ok(AttributeArrayValue::Boolean(false)), } } else { Err(DecodeError::InvalidType("Invalid boolean field".to_owned())) } } - AttributeArrayKey::IntValue => { - Ok(AttributeArrayValueSlice::Integer(read_number_slice(buf)?)) - } - AttributeArrayKey::DoubleValue => { - Ok(AttributeArrayValueSlice::Double(read_number_slice(buf)?)) - } + AttributeArrayKey::IntValue => Ok(AttributeArrayValue::Integer(read_number(buf)?)), + AttributeArrayKey::DoubleValue => Ok(AttributeArrayValue::Double(read_number(buf)?)), _ => Err(DecodeError::InvalidFormat("Invalid attribute".to_owned())), } } -fn decode_attribute_array<'a>( - buf: &mut &'a [u8], +fn decode_attribute_array( + buf: &mut Buffer, array_type: Option, -) -> Result, DecodeError> { - let mut attribute: Option = None; - let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { +) -> Result, DecodeError> { + let mut attribute: Option> = None; + let attribute_size = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) })?; @@ -284,8 +283,8 @@ fn decode_attribute_array<'a>( let mut attribute_type: Option = None; for _ in 0..attribute_size { - match read_string_ref(buf)?.parse::()? { - AttributeArrayKey::Type => attribute_type = Some(read_number_slice(buf)?), + match buf.read_string()?.borrow().parse::()? { + AttributeArrayKey::Type => attribute_type = Some(read_number(buf)?), key => attribute = Some(get_attribute_from_key(buf, key)?), } } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs b/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs index acb3efb15d..fffacbe3ac 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs @@ -1,12 +1,13 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{ - handle_null_marker, read_str_map_to_strings, read_string_ref, -}; -use crate::span::SpanLinkSlice; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_str_map_to_strings}; +use crate::span::v04::SpanLink; +use crate::span::TraceData; +use std::borrow::Borrow; use std::str::FromStr; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. @@ -26,18 +27,18 @@ use std::str::FromStr; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. /// ``` -pub(crate) fn read_span_links<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +pub(crate) fn read_span_links( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span links".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_link(buf)?); } @@ -71,19 +72,19 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut span = SpanLinkSlice::default(); - let span_size = rmp::decode::read_map_len(buf) +fn decode_span_link(buf: &mut Buffer) -> Result, DecodeError> { + let mut span = SpanLink::default(); + let span_size = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - match read_string_ref(buf)?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_slice(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_slice(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_slice(buf)?, + match buf.read_string()?.borrow().parse::()? { + SpanLinkKey::TraceId => span.trace_id = read_number(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number(buf)?, SpanLinkKey::Attributes => span.attributes = read_str_map_to_strings(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?, - SpanLinkKey::Flags => span.flags = read_number_slice(buf)?, + SpanLinkKey::Tracestate => span.tracestate = buf.read_string()?, + SpanLinkKey::Flags => span.flags = read_number(buf)?, } } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/string.rs b/libdd-trace-utils/src/msgpack_decoder/decode/string.rs index 9fff188c82..8a2f8a8041 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/string.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/string.rs @@ -1,53 +1,25 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; use rmp::decode; -use rmp::decode::DecodeStringError; use std::collections::HashMap; // https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) const NULL_MARKER: &u8 = &0xc0; -/// Read a string from `buf`. -/// -/// # Errors -/// Fails if the buffer doesn't contain a valid utf8 msgpack string. -#[inline] -pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - decode::read_str_from_slice(buf).map_err(|e| match e { - DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), - DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), - DecodeStringError::TypeMismatch(marker) => { - DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}")) - } - DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), - _ => DecodeError::IOError, - }) -} - -/// Read a string from the slices `buf`. -/// -/// # Errors -/// Fails if the buffer doesn't contain a valid utf8 msgpack string. -#[inline] -pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_ref_nomut(buf).map(|(str, newbuf)| { - *buf = newbuf; - str - }) -} - /// Read a nullable string from the slices `buf`. /// /// # Errors /// Fails if the buffer doesn't contain a valid utf8 msgpack string or a null marker. #[inline] -pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { +pub fn read_nullable_string(buf: &mut Buffer) -> Result { if handle_null_marker(buf) { - Ok("") + Ok(T::Text::default()) } else { - read_string_ref(buf) + buf.read_string() } } @@ -58,19 +30,19 @@ pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeErr /// or if any key or value is not a valid utf8 msgpack string. /// Null values are skipped (key not inserted into map). #[inline] -pub fn read_str_map_to_strings<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { - let len = decode::read_map_len(buf) +pub fn read_str_map_to_strings( + buf: &mut Buffer, +) -> Result, DecodeError> { + let len = decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_ref(buf)?; + let key = buf.read_string()?; // Only insert if value is not null if !handle_null_marker(buf) { - let value = read_string_ref(buf)?; + let value = buf.read_string()?; map.insert(key, value); } } @@ -84,9 +56,9 @@ pub fn read_str_map_to_strings<'a>( /// or if any key or value is not a valid utf8 msgpack string. /// Null values are skipped (key not inserted into map). #[inline] -pub fn read_nullable_str_map_to_strings<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { +pub fn read_nullable_str_map_to_strings( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } @@ -100,9 +72,10 @@ pub fn read_nullable_str_map_to_strings<'a>( /// # Returns /// A boolean indicating whether the next value is null or not. #[inline] -pub fn handle_null_marker(buf: &mut &[u8]) -> bool { - if buf.first() == Some(NULL_MARKER) { - *buf = &buf[1..]; +pub fn handle_null_marker(buf: &mut Buffer) -> bool { + let slice = buf.as_mut_slice(); + if slice.first() == Some(NULL_MARKER) { + *slice = &slice[1..]; true } else { false diff --git a/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs index c9dd29892b..4ee8d4cb06 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs @@ -4,8 +4,10 @@ pub(crate) mod span; use self::span::decode_span; +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::span::{SpanBytes, SpanSlice}; +use crate::span::v04::{Span, SpanBytes, SpanSlice}; +use crate::span::TraceData; /// Decodes a Bytes buffer into a `Vec>` object, also represented as a vector of /// `TracerPayloadV04` objects. @@ -52,20 +54,7 @@ use crate::span::{SpanBytes, SpanSlice}; pub fn from_bytes( data: libdd_tinybytes::Bytes, ) -> Result<(Vec>, usize), DecodeError> { - let (traces_ref, size) = from_slice(data.as_ref())?; - - #[allow(clippy::unwrap_used)] - let traces_owned = traces_ref - .iter() - .map(|trace| { - trace - .iter() - // Safe to unwrap since the spans use subslices of the `data` slice - .map(|span| span.try_to_bytes(&data).unwrap()) - .collect() - }) - .collect(); - Ok((traces_owned, size)) + from_buffer(&mut Buffer::new(data)) } /// Decodes a slice of bytes into a `Vec>` object. @@ -110,11 +99,19 @@ pub fn from_bytes( /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { - let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { +pub fn from_slice(data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +#[allow(clippy::type_complexity)] +pub fn from_buffer( + data: &mut Buffer, +) -> Result<(Vec>>, usize), DecodeError> { + let trace_count = rmp::decode::read_array_len(data.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; + // Intentionally skip the size of the array (as it will be recomputed after coalescing). let start_len = data.len(); #[allow(clippy::expect_used)] @@ -126,9 +123,12 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D .expect("Unable to cast trace_count to usize"), ), |mut traces, _| { - let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) - })?; + let span_count = + rmp::decode::read_array_len(data.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + ) + })?; let trace = (0..span_count).try_fold( Vec::with_capacity( @@ -137,7 +137,7 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D .expect("Unable to cast span_count to usize"), ), |mut trace, _| { - let span = decode_span(&mut data)?; + let span = decode_span(data)?; trace.push(span); Ok(trace) }, diff --git a/libdd-trace-utils/src/msgpack_decoder/v04/span.rs b/libdd-trace-utils/src/msgpack_decoder/v04/span.rs index 7b37a5aea3..33a48f5dd8 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v04/span.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v04/span.rs @@ -1,15 +1,17 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_nullable_number_slice; +use crate::msgpack_decoder::decode::number::read_nullable_number; use crate::msgpack_decoder::decode::span_event::read_span_events; use crate::msgpack_decoder::decode::span_link::read_span_links; use crate::msgpack_decoder::decode::string::{ - read_nullable_str_map_to_strings, read_nullable_string, read_string_ref, + read_nullable_str_map_to_strings, read_nullable_string, }; use crate::msgpack_decoder::decode::{meta_struct::read_meta_struct, metrics::read_metrics}; -use crate::span::{SpanKey, SpanSlice}; +use crate::span::{v04::Span, v04::SpanKey, TraceData}; +use std::borrow::Borrow; /// Decodes a slice of bytes into a `Span` object. /// @@ -27,10 +29,10 @@ use crate::span::{SpanKey, SpanSlice}; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. -pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { - let mut span = SpanSlice::default(); +pub fn decode_span(buffer: &mut Buffer) -> Result, DecodeError> { + let mut span = Span::::default(); - let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { + let span_size = rmp::decode::read_map_len(buffer.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; @@ -43,8 +45,10 @@ pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeErr // Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the // BytesStrings -fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { - let key = read_string_ref(buf)? +fn fill_span(span: &mut Span, buf: &mut Buffer) -> Result<(), DecodeError> { + let key = buf + .read_string()? + .borrow() .parse::() .map_err(|e| DecodeError::InvalidFormat(e.message))?; @@ -52,12 +56,12 @@ fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), Dec SpanKey::Service => span.service = read_nullable_string(buf)?, SpanKey::Name => span.name = read_nullable_string(buf)?, SpanKey::Resource => span.resource = read_nullable_string(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_slice::(buf)? as u128, - SpanKey::SpanId => span.span_id = read_nullable_number_slice(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_slice(buf)?, - SpanKey::Start => span.start = read_nullable_number_slice(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_slice(buf)?, - SpanKey::Error => span.error = read_nullable_number_slice(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number::<_, u64>(buf)? as u128, + SpanKey::SpanId => span.span_id = read_nullable_number(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number(buf)?, + SpanKey::Start => span.start = read_nullable_number(buf)?, + SpanKey::Duration => span.duration = read_nullable_number(buf)?, + SpanKey::Error => span.error = read_nullable_number(buf)?, SpanKey::Type => span.r#type = read_nullable_string(buf)?, SpanKey::Meta => span.meta = read_nullable_str_map_to_strings(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, diff --git a/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs index ea544acb34..5f54949be2 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs @@ -3,11 +3,10 @@ use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::{ - map::read_map_len, - number::read_number_slice, - string::{handle_null_marker, read_string_ref}, + buffer::Buffer, map::read_map_len, number::read_number, string::handle_null_marker, }; -use crate::span::{SpanBytes, SpanSlice}; +use crate::span::v04::{Span, SpanBytes, SpanSlice}; +use crate::span::TraceData; use std::collections::HashMap; const PAYLOAD_LEN: u32 = 2; @@ -70,20 +69,7 @@ const SPAN_ELEM_COUNT: u32 = 12; pub fn from_bytes( data: libdd_tinybytes::Bytes, ) -> Result<(Vec>, usize), DecodeError> { - let (traces_ref, size) = from_slice(data.as_ref())?; - - #[allow(clippy::unwrap_used)] - let traces_owned = traces_ref - .iter() - .map(|trace| { - trace - .iter() - // Safe to unwrap since the spans use subslices of the `data` slice - .map(|span| span.try_to_bytes(&data).unwrap()) - .collect() - }) - .collect(); - Ok((traces_owned, size)) + from_buffer(&mut Buffer::new(data)) } /// Decodes a slice of bytes into a `Vec>` object. @@ -140,8 +126,18 @@ pub fn from_bytes( /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { - let data_elem = rmp::decode::read_array_len(&mut data) +pub fn from_slice(data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +#[allow(clippy::type_complexity)] +fn from_buffer( + data: &mut Buffer, +) -> Result<(Vec>>, usize), DecodeError> +where + T::Text: Clone, +{ + let data_elem = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?; if data_elem != PAYLOAD_LEN { @@ -150,21 +146,21 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D )); } - let dict = deserialize_dict(&mut data)?; + let dict = deserialize_dict(data)?; - let trace_count = rmp::decode::read_array_len(&mut data) + let trace_count = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?; - let mut traces: Vec> = Vec::with_capacity(trace_count as usize); + let mut traces: Vec>> = Vec::with_capacity(trace_count as usize); let start_len = data.len(); for _ in 0..trace_count { - let span_count = rmp::decode::read_array_len(&mut data) + let span_count = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?; - let mut trace: Vec = Vec::with_capacity(span_count as usize); + let mut trace: Vec> = Vec::with_capacity(span_count as usize); for _ in 0..span_count { - let span = deserialize_span(&mut data, &dict)?; + let span = deserialize_span(data, &dict)?; trace.push(span); } traces.push(trace); @@ -172,21 +168,27 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D Ok((traces, start_len - data.len())) } -fn deserialize_dict<'a>(data: &mut &'a [u8]) -> Result, DecodeError> { - let dict_len = rmp::decode::read_array_len(data) +fn deserialize_dict(data: &mut Buffer) -> Result, DecodeError> { + let dict_len = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; - let mut dict: Vec<&'a str> = Vec::with_capacity(dict_len as usize); + let mut dict: Vec = Vec::with_capacity(dict_len as usize); for _ in 0..dict_len { - let str = read_string_ref(data)?; + let str = data.read_string()?; dict.push(str); } Ok(dict) } -fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result, DecodeError> { - let mut span = SpanSlice::default(); - let span_len = rmp::decode::read_array_len(data) +fn deserialize_span( + data: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let mut span = Span::default(); + let span_len = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; if span_len != SPAN_ELEM_COUNT { @@ -198,12 +200,12 @@ fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result(data)? as u128; - span.span_id = read_number_slice(data)?; - span.parent_id = read_number_slice(data)?; - span.start = read_number_slice(data)?; - span.duration = read_number_slice(data)?; - span.error = read_number_slice(data)?; + span.trace_id = read_number::<_, u64>(data)? as u128; + span.span_id = read_number(data)?; + span.parent_id = read_number(data)?; + span.start = read_number(data)?; + span.duration = read_number(data)?; + span.error = read_number(data)?; span.meta = read_indexed_map_to_bytes_strings(data, dict)?; span.metrics = read_metrics(data, dict)?; span.r#type = get_from_dict(data, dict)?; @@ -211,21 +213,30 @@ fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result(data: &mut &[u8], dict: &[&'a str]) -> Result<&'a str, DecodeError> { - let index: u32 = read_number_slice(data)?; +fn get_from_dict( + data: &mut Buffer, + dict: &[T::Text], +) -> Result +where + T::Text: Clone, +{ + let index: u32 = read_number(data)?; match dict.get(index as usize) { - Some(value) => Ok(value), + Some(value) => Ok(value.clone()), None => Err(DecodeError::InvalidFormat( "Unable to locate string in the dictionary".to_string(), )), } } -fn read_indexed_map_to_bytes_strings<'a>( - buf: &mut &[u8], - dict: &[&'a str], -) -> Result, DecodeError> { - let len = rmp::decode::read_map_len(buf) +fn read_indexed_map_to_bytes_strings( + buf: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let len = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] @@ -238,10 +249,13 @@ fn read_indexed_map_to_bytes_strings<'a>( Ok(map) } -fn read_metrics<'a>( - buf: &mut &[u8], - dict: &[&'a str], -) -> Result, DecodeError> { +fn read_metrics( + buf: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ if handle_null_marker(buf) { return Ok(HashMap::default()); } @@ -251,7 +265,7 @@ fn read_metrics<'a>( let mut map = HashMap::with_capacity(len); for _ in 0..len { let k = get_from_dict(buf, dict)?; - let v = read_number_slice(buf)?; + let v = read_number(buf)?; map.insert(k, v); } Ok(map) @@ -260,7 +274,9 @@ fn read_metrics<'a>( #[cfg(test)] mod tests { use super::*; + use crate::span::SliceData; use std::collections::HashMap; + type V05Span = ( u8, u8, @@ -297,7 +313,7 @@ mod tests { fn deserialize_dict_test() { let dict = vec!["foo", "bar", "baz"]; let mpack = rmp_serde::to_vec(&dict).unwrap(); - let mut payload = mpack.as_ref(); + let mut payload = Buffer::::new(mpack.as_ref()); let result = deserialize_dict(&mut payload).unwrap(); assert_eq!(dict, result); diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs index 9b8b8f402a..97cf222268 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs @@ -1,13 +1,14 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{Span, SpanText}; +use crate::span::v04::Span; +use crate::span::TraceData; use rmp::encode::{write_array_len, ByteBuf, RmpWrite, ValueWriteError}; mod span; #[inline(always)] -fn to_writer]>>( +fn to_writer]>>( writer: &mut W, traces: &[S], ) -> Result<(), ValueWriteError> { @@ -44,10 +45,10 @@ fn to_writer]>>( /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::write_to_slice; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// /// let mut buffer = vec![0u8; 1024]; -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -55,7 +56,7 @@ fn to_writer]>>( /// /// write_to_slice(&mut &mut buffer[..], &traces).expect("Encoding failed"); /// ``` -pub fn write_to_slice]>>( +pub fn write_to_slice]>>( slice: &mut &mut [u8], traces: &[S], ) -> Result<(), ValueWriteError> { @@ -76,9 +77,9 @@ pub fn write_to_slice]>>( /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_vec; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -87,7 +88,7 @@ pub fn write_to_slice]>>( /// /// assert!(!encoded.is_empty()); /// ``` -pub fn to_vec]>>(traces: &[S]) -> Vec { +pub fn to_vec]>>(traces: &[S]) -> Vec { to_vec_with_capacity(traces, 0) } @@ -106,9 +107,9 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_vec_with_capacity; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -117,7 +118,7 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// /// assert!(encoded.capacity() >= 1024); /// ``` -pub fn to_vec_with_capacity]>>( +pub fn to_vec_with_capacity]>>( traces: &[S], capacity: u32, ) -> Vec { @@ -165,9 +166,9 @@ impl std::io::Write for CountLength { /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_len; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -176,7 +177,7 @@ impl std::io::Write for CountLength { /// /// assert!(encoded_len > 0); /// ``` -pub fn to_len]>>(traces: &[S]) -> u32 { +pub fn to_len]>>(traces: &[S]) -> u32 { let mut counter = CountLength(0); #[allow(clippy::expect_used)] to_writer(&mut counter, traces).expect("infallible: CountLength never fails"); diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs b/libdd-trace-utils/src/msgpack_encoder/v04/span.rs index 2d9207857c..4fa22dc385 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/span.rs @@ -1,11 +1,13 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink, SpanText}; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::TraceData; use rmp::encode::{ write_bin, write_bool, write_f64, write_i64, write_sint, write_str, write_u32, write_u64, write_u8, RmpWrite, ValueWriteError, }; +use std::borrow::Borrow; /// Encodes a `SpanLink` object into a slice of bytes. /// @@ -15,13 +17,13 @@ use rmp::encode::{ /// /// # Returns /// -/// * `Ok(()))` - Nothing if successful. +/// * `Ok(())` - Nothing if successful. /// * `Err(ValueWriteError)` - An error if the writing fails. /// /// # Errors /// /// This function will return any error emitted by the writer. -pub fn encode_span_links( +pub fn encode_span_links( writer: &mut W, span_links: &[SpanLink], ) -> Result<(), ValueWriteError> { @@ -82,7 +84,7 @@ pub fn encode_span_links( /// # Errors /// /// This function will return any error emitted by the writer. -pub fn encode_span_events( +pub fn encode_span_events( writer: &mut W, span_events: &[SpanEvent], ) -> Result<(), ValueWriteError> { @@ -106,7 +108,7 @@ pub fn encode_span_events( for (k, attribute) in event.attributes.iter() { write_str(writer, k.borrow())?; - fn write_array_value( + fn write_array_value( writer: &mut W, value: &AttributeArrayValue, ) -> Result<(), ValueWriteError> { @@ -181,7 +183,7 @@ pub fn encode_span_events( /// /// This function will return any error emitted by the writer. #[inline(always)] -pub fn encode_span( +pub fn encode_span( writer: &mut W, span: &Span, ) -> Result<(), ValueWriteError> { @@ -256,7 +258,7 @@ pub fn encode_span( rmp::encode::write_map_len(writer, span.meta_struct.len() as u32)?; for (k, v) in span.meta_struct.iter() { write_str(writer, k.borrow())?; - write_bin(writer, v.as_ref())?; + write_bin(writer, v.borrow())?; } } diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index f704204e0d..1e8b32ec0e 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -26,7 +26,7 @@ use std::io::Write; #[cfg(feature = "compression")] use zstd::stream::write::Encoder; -#[derive(Debug, Clone)] +#[derive(Debug)] /// `SendData` is a structure that holds the data to be sent to a target endpoint. /// It includes the payloads to be sent, the size of the data, the target endpoint, /// headers for the request, and a retry strategy for sending the data. @@ -81,7 +81,6 @@ pub enum Compression { None, } -#[derive(Clone)] pub struct SendDataBuilder { pub(crate) tracer_payloads: TracerPayloadCollection, pub(crate) size: usize, @@ -219,35 +218,24 @@ impl SendData { self.retry_strategy = retry_strategy; } - /// Returns a clone of the SendData with the user-defined endpoint. - /// - /// # Arguments - /// - /// * `endpoint`: The new endpoint to be used. - pub fn with_endpoint(&self, endpoint: Endpoint) -> SendData { - SendData { - target: endpoint, - ..self.clone() - } - } - /// Sends the data to the target endpoint. /// /// # Returns /// /// A `SendDataResult` instance containing the result of the operation. pub async fn send(&self, http_client: &GenericHttpClient) -> SendDataResult { - self.send_internal(http_client).await + self.send_internal(http_client, None).await } async fn send_internal( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { if self.use_protobuf() { - self.send_with_protobuf(http_client).await + self.send_with_protobuf(http_client, endpoint).await } else { - self.send_with_msgpack(http_client).await + self.send_with_msgpack(http_client, endpoint).await } } @@ -257,13 +245,14 @@ impl SendData { payload: Vec, headers: HashMap<&'static str, String>, http_client: &GenericHttpClient, + endpoint: Option<&Endpoint>, ) -> (SendWithRetryResult, u64, u64) { #[allow(clippy::unwrap_used)] let payload_len = u64::try_from(payload.len()).unwrap(); ( send_with_retry( http_client, - &self.target, + endpoint.unwrap_or(&self.target), payload, &headers, &self.retry_strategy, @@ -303,6 +292,7 @@ impl SendData { async fn send_with_protobuf( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); @@ -330,7 +320,13 @@ impl SendData { request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string()); let (response, bytes_sent, chunks) = self - .send_payload(chunks, final_payload, request_headers, http_client) + .send_payload( + chunks, + final_payload, + request_headers, + http_client, + endpoint.as_ref(), + ) .await; result.update(response, bytes_sent, chunks); @@ -344,6 +340,7 @@ impl SendData { async fn send_with_msgpack( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); let mut futures = FuturesUnordered::new(); @@ -362,7 +359,13 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } } TracerPayloadCollection::V04(payload) => { @@ -374,7 +377,13 @@ impl SendData { let payload = msgpack_encoder::v04::to_vec(payload); - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } TracerPayloadCollection::V05(payload) => { #[allow(clippy::unwrap_used)] @@ -388,7 +397,13 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } } @@ -1004,56 +1019,6 @@ mod tests { assert_eq!(res.responses_count_per_code.len(), 0); } - #[test] - fn test_with_endpoint() { - let header_tags = HEADER_TAGS; - let payload = setup_payload(&header_tags); - let original_endpoint = Endpoint { - api_key: Some(std::borrow::Cow::Borrowed("original-key")), - url: "http://originalexample.com/".parse::().unwrap(), - timeout_ms: 1000, - ..Endpoint::default() - }; - - let original_data = SendData::new( - 100, - TracerPayloadCollection::V07(vec![payload]), - header_tags, - &original_endpoint, - ); - - let new_endpoint = Endpoint { - api_key: Some(std::borrow::Cow::Borrowed("new-key")), - url: "http://newexample.com/".parse::().unwrap(), - timeout_ms: 2000, - ..Endpoint::default() - }; - - let new_data = original_data.with_endpoint(new_endpoint.clone()); - - assert_eq!(new_data.target.api_key, new_endpoint.api_key); - assert_eq!(new_data.target.url, new_endpoint.url); - assert_eq!(new_data.target.timeout_ms, new_endpoint.timeout_ms); - - assert_eq!(new_data.size, original_data.size); - assert_eq!(new_data.headers, original_data.headers); - assert_eq!(new_data.retry_strategy, original_data.retry_strategy); - assert_eq!( - new_data.tracer_payloads.size(), - original_data.tracer_payloads.size() - ); - - assert_eq!(original_data.target.api_key, original_endpoint.api_key); - assert_eq!(original_data.target.url, original_endpoint.url); - assert_eq!( - original_data.target.timeout_ms, - original_endpoint.timeout_ms - ); - - #[cfg(feature = "compression")] - assert!(matches!(new_data.compression, Compression::None)); - } - #[test] fn test_builder() { let header_tags = HEADER_TAGS; diff --git a/libdd-trace-utils/src/span/mod.rs b/libdd-trace-utils/src/span/mod.rs index 268e4ea957..7a3332f11f 100644 --- a/libdd-trace-utils/src/span/mod.rs +++ b/libdd-trace-utils/src/span/mod.rs @@ -2,68 +2,25 @@ // SPDX-License-Identifier: Apache-2.0 pub mod trace_utils; +pub mod v04; pub mod v05; +use crate::msgpack_decoder::decode::buffer::read_string_ref_nomut; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v05::dict::SharedDict; use libdd_tinybytes::{Bytes, BytesString}; -use serde::ser::SerializeStruct; use serde::Serialize; use std::borrow::Borrow; -use std::collections::HashMap; -use std::fmt; +use std::fmt::Debug; use std::hash::Hash; -use std::str::FromStr; -use v05::dict::SharedDict; - -use crate::tracer_payload::TraceChunks; - -#[derive(Debug, PartialEq)] -pub enum SpanKey { - Service, - Name, - Resource, - TraceId, - SpanId, - ParentId, - Start, - Duration, - Error, - Meta, - Metrics, - Type, - MetaStruct, - SpanLinks, - SpanEvents, -} - -impl FromStr for SpanKey { - type Err = SpanKeyParseError; - - fn from_str(s: &str) -> Result { - match s { - "service" => Ok(SpanKey::Service), - "name" => Ok(SpanKey::Name), - "resource" => Ok(SpanKey::Resource), - "trace_id" => Ok(SpanKey::TraceId), - "span_id" => Ok(SpanKey::SpanId), - "parent_id" => Ok(SpanKey::ParentId), - "start" => Ok(SpanKey::Start), - "duration" => Ok(SpanKey::Duration), - "error" => Ok(SpanKey::Error), - "meta" => Ok(SpanKey::Meta), - "metrics" => Ok(SpanKey::Metrics), - "type" => Ok(SpanKey::Type), - "meta_struct" => Ok(SpanKey::MetaStruct), - "span_links" => Ok(SpanKey::SpanLinks), - "span_events" => Ok(SpanKey::SpanEvents), - _ => Err(SpanKeyParseError::new(format!("Invalid span key: {s}"))), - } - } -} +use std::marker::PhantomData; +use std::ptr::NonNull; +use std::{fmt, ptr}; /// Trait representing the requirements for a type to be used as a Span "string" type. /// Note: Borrow is not required by the derived traits, but allows to access HashMap elements /// from a static str and check if the string is empty. -pub trait SpanText: Eq + Hash + Borrow + Serialize + Default { +pub trait SpanText: Debug + Eq + Hash + Borrow + Serialize + Default { fn from_static_str(value: &'static str) -> Self; } @@ -79,351 +36,111 @@ impl SpanText for BytesString { } } -/// Checks if the `value` represents an empty string. Used to skip serializing empty strings -/// with serde. -fn is_empty_str>(value: &T) -> bool { - value.borrow().is_empty() +pub trait SpanBytes: Debug + Eq + Hash + Borrow<[u8]> + Serialize + Default { + fn from_static_bytes(value: &'static [u8]) -> Self; } -/// The generic representation of a V04 span. -/// -/// `T` is the type used to represent strings in the span, it can be either owned (e.g. BytesString) -/// or borrowed (e.g. &str). To define a generic function taking any `Span` you can use the -/// [`SpanValue`] trait: -/// ``` -/// use libdd_trace_utils::span::{Span, SpanText}; -/// fn foo(span: Span) { -/// let _ = span.meta.get("foo"); -/// } -/// ``` -#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct Span -where - T: SpanText, -{ - pub service: T, - pub name: T, - pub resource: T, - #[serde(skip_serializing_if = "is_empty_str")] - pub r#type: T, - #[serde(serialize_with = "serialize_lower_64_bits")] - pub trace_id: u128, - pub span_id: u64, - #[serde(skip_serializing_if = "is_default")] - pub parent_id: u64, - pub start: i64, - pub duration: i64, - #[serde(skip_serializing_if = "is_default")] - pub error: i32, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub meta: HashMap, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub metrics: HashMap, - // TODO: APMSP-1941 - Replace `Bytes` with a wrapper that borrows the underlying - // slice and serializes to bytes in MessagePack. - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub meta_struct: HashMap, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub span_links: Vec>, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub span_events: Vec>, +impl SpanBytes for &[u8] { + fn from_static_bytes(value: &'static [u8]) -> Self { + value + } } -fn serialize_lower_64_bits(v: &u128, serializer: S) -> Result -where - S: serde::Serializer, -{ - serializer.serialize_u64(*v as u64) +impl SpanBytes for Bytes { + fn from_static_bytes(value: &'static [u8]) -> Self { + Bytes::from_static(value) + } } -/// The generic representation of a V04 span link. -/// `T` is the type used to represent strings in the span link. -#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct SpanLink -where - T: SpanText, -{ - pub trace_id: u64, - pub trace_id_high: u64, - pub span_id: u64, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub attributes: HashMap, - #[serde(skip_serializing_if = "is_empty_str")] - pub tracestate: T, - #[serde(skip_serializing_if = "is_default")] - pub flags: u32, -} +/// Trait representing a tuple of (Text, Bytes) types used for different underlying data structures. +/// Note: The functions are internal to the msgpack decoder and should not be used directly: they're +/// only exposed here due to the inavailability of min_specialization in stable Rust. +/// Also note that the Clone and PartialEq bounds are only present for tests. +pub trait TraceData: Default + Clone + Debug + PartialEq + Serialize { + type Text: SpanText; + type Bytes: SpanBytes; -/// The generic representation of a V04 span event. -/// `T` is the type used to represent strings in the span event. -#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct SpanEvent -where - T: SpanText, -{ - pub time_unix_nano: u64, - pub name: T, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub attributes: HashMap>, -} + fn get_mut_slice(buf: &mut Self::Bytes) -> &mut &'static [u8]; -#[derive(Clone, Debug, PartialEq)] -pub enum AttributeAnyValue -where - T: SpanText, -{ - SingleValue(AttributeArrayValue), - Array(Vec>), -} + fn try_slice_and_advance(buf: &mut Self::Bytes, bytes: usize) -> Option; -#[derive(Serialize)] -struct ArrayValueWrapper<'a, T: SpanText> { - values: &'a Vec>, + fn read_string(buf: &mut Self::Bytes) -> Result; } -impl Serialize for AttributeAnyValue -where - T: SpanText, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("AttributeAnyValue", 2)?; - - match self { - AttributeAnyValue::SingleValue(attribute) => { - serialize_attribute_array::(&mut state, attribute)?; - } - AttributeAnyValue::Array(value) => { - let value_type: u8 = self.into(); - state.serialize_field("type", &value_type)?; - let wrapped_value = ArrayValueWrapper { values: value }; - state.serialize_field("array_value", &wrapped_value)?; - } - } +/// TraceData implementation using `Bytes` and `BytesString`. +#[derive(Clone, Default, Debug, PartialEq, Serialize)] +pub struct BytesData; +impl TraceData for BytesData { + type Text = BytesString; + type Bytes = Bytes; - state.end() + #[inline] + fn get_mut_slice(buf: &mut Bytes) -> &mut &'static [u8] { + // SAFETY: Bytes has the same layout + unsafe { std::mem::transmute::<&mut Bytes, &mut &[u8]>(buf) } } -} -impl From<&AttributeAnyValue> for u8 -where - T: SpanText, -{ - fn from(attribute: &AttributeAnyValue) -> u8 { - match attribute { - AttributeAnyValue::SingleValue(value) => value.into(), - AttributeAnyValue::Array(_) => 4, + #[inline] + fn try_slice_and_advance(buf: &mut Bytes, bytes: usize) -> Option { + let data = buf.slice_ref(&buf[0..bytes])?; + unsafe { + // SAFETY: forwarding the buffer requires that buf is borrowed from static. + let (ptr, len, underlying) = ptr::read(buf).into_raw(); + ptr::write( + buf, + Bytes::from_raw(ptr.add(bytes), len - bytes, underlying), + ); } - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum AttributeArrayValue -where - T: SpanText, -{ - String(T), - Boolean(bool), - Integer(i64), - Double(f64), -} - -impl Serialize for AttributeArrayValue -where - T: SpanText, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("AttributeArrayValue", 2)?; - serialize_attribute_array::(&mut state, self)?; - state.end() - } -} - -fn serialize_attribute_array( - state: &mut S::SerializeStruct, - attribute: &AttributeArrayValue, -) -> Result<(), ::Error> -where - T: SpanText, - S: serde::Serializer, -{ - let attribute_type: u8 = attribute.into(); - state.serialize_field("type", &attribute_type)?; - match attribute { - AttributeArrayValue::String(value) => state.serialize_field("string_value", value), - AttributeArrayValue::Boolean(value) => state.serialize_field("bool_value", value), - AttributeArrayValue::Integer(value) => state.serialize_field("int_value", value), - AttributeArrayValue::Double(value) => state.serialize_field("double_value", value), - } -} - -impl From<&AttributeArrayValue> for u8 -where - T: SpanText, -{ - fn from(attribute: &AttributeArrayValue) -> u8 { - match attribute { - AttributeArrayValue::String(_) => 0, - AttributeArrayValue::Boolean(_) => 1, - AttributeArrayValue::Integer(_) => 2, - AttributeArrayValue::Double(_) => 3, + Some(data) + } + + #[inline] + fn read_string(buf: &mut Bytes) -> Result { + // Note: we need to pass a &'static lifetime here, otherwise it'll complain + let (str, newbuf) = read_string_ref_nomut(buf.as_ref())?; + let string = BytesString::from_bytes_slice(buf, str); + unsafe { + // SAFETY: forwarding the buffer requires that buf is borrowed from static. + let (_, _, underlying) = ptr::read(buf).into_raw(); + let new = Bytes::from_raw( + NonNull::new_unchecked(newbuf.as_ptr() as *mut _), + newbuf.len(), + underlying, + ); + ptr::write(buf, new); } + Ok(string) } } -pub type SpanBytes = Span; -pub type SpanLinkBytes = SpanLink; -pub type SpanEventBytes = SpanEvent; -pub type AttributeAnyValueBytes = AttributeAnyValue; -pub type AttributeArrayValueBytes = AttributeArrayValue; - -pub type SpanSlice<'a> = Span<&'a str>; -pub type SpanLinkSlice<'a> = SpanLink<&'a str>; -pub type SpanEventSlice<'a> = SpanEvent<&'a str>; -pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>; -pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>; - -pub type TraceChunksBytes = TraceChunks; +/// TraceData implementation using `&str` and `&[u8]`. +#[derive(Clone, Default, Debug, PartialEq, Serialize)] +pub struct SliceData<'a>(PhantomData<&'a u8>); +impl<'a> TraceData for SliceData<'a> { + type Text = &'a str; + type Bytes = &'a [u8]; -pub type SharedDictBytes = SharedDict; - -impl SpanSlice<'_> { - /// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal - /// references into slices of the provided `Bytes` buffer. Returns `None` if any slice is - /// out of bounds or invalid. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanBytes { - service: BytesString::try_from_bytes_slice(bytes, self.service)?, - name: BytesString::try_from_bytes_slice(bytes, self.name)?, - resource: BytesString::try_from_bytes_slice(bytes, self.resource)?, - r#type: BytesString::try_from_bytes_slice(bytes, self.r#type)?, - trace_id: self.trace_id, - span_id: self.span_id, - parent_id: self.parent_id, - start: self.start, - duration: self.duration, - error: self.error, - meta: self - .meta - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - BytesString::try_from_bytes_slice(bytes, v)?, - )) - }) - .collect::>>()?, - metrics: self - .metrics - .iter() - .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, *v))) - .collect::>>()?, - meta_struct: self - .meta_struct - .iter() - .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, v.clone()))) - .collect::>>()?, - span_links: self - .span_links - .iter() - .map(|link| link.try_to_bytes(bytes)) - .collect::>>()?, - span_events: self - .span_events - .iter() - .map(|event| event.try_to_bytes(bytes)) - .collect::>>()?, - }) + #[inline] + fn get_mut_slice<'b>(buf: &'b mut &'a [u8]) -> &'b mut &'static [u8] { + unsafe { std::mem::transmute::<&'b mut &[u8], &'b mut &'static [u8]>(buf) } } -} -impl SpanLinkSlice<'_> { - /// Converts a borrowed `SpanLinkSlice` into an owned `SpanLinkBytes`, using the provided - /// `Bytes` buffer to resolve all referenced strings. Returns `None` if conversion fails due - /// to invalid slice ranges. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanLinkBytes { - trace_id: self.trace_id, - trace_id_high: self.trace_id_high, - span_id: self.span_id, - attributes: self - .attributes - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - BytesString::try_from_bytes_slice(bytes, v)?, - )) - }) - .collect::>>()?, - tracestate: BytesString::try_from_bytes_slice(bytes, self.tracestate)?, - flags: self.flags, - }) + #[inline] + fn try_slice_and_advance(buf: &mut &'a [u8], bytes: usize) -> Option<&'a [u8]> { + let slice = buf.get(0..bytes)?; + *buf = &buf[bytes..]; + Some(slice) } -} -impl SpanEventSlice<'_> { - /// Converts a borrowed `SpanEventSlice` into an owned `SpanEventBytes`, resolving references - /// into the provided `Bytes` buffer. Fails with `None` if any slice is invalid or cannot be - /// converted. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanEventBytes { - time_unix_nano: self.time_unix_nano, - name: BytesString::try_from_bytes_slice(bytes, self.name)?, - attributes: self - .attributes - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - v.try_to_bytes(bytes)?, - )) - }) - .collect::>>()?, + #[inline] + fn read_string(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_ref_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str }) } } -impl AttributeAnyValueSlice<'_> { - /// Converts a borrowed `AttributeAnyValueSlice` into its owned `AttributeAnyValueBytes` - /// representation, using the provided `Bytes` buffer. Recursively processes inner values if - /// it's an array. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - match self { - AttributeAnyValue::SingleValue(value) => { - Some(AttributeAnyValue::SingleValue(value.try_to_bytes(bytes)?)) - } - AttributeAnyValue::Array(value) => Some(AttributeAnyValue::Array( - value - .iter() - .map(|attribute| attribute.try_to_bytes(bytes)) - .collect::>>()?, - )), - } - } -} - -impl AttributeArrayValueSlice<'_> { - /// Converts a single `AttributeArrayValueSlice` item into its owned form - /// (`AttributeArrayValueBytes`), borrowing data from the provided `Bytes` buffer when - /// necessary. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - match self { - AttributeArrayValue::String(value) => Some(AttributeArrayValue::String( - BytesString::try_from_bytes_slice(bytes, value)?, - )), - AttributeArrayValue::Boolean(value) => Some(AttributeArrayValue::Boolean(*value)), - AttributeArrayValue::Integer(value) => Some(AttributeArrayValue::Integer(*value)), - AttributeArrayValue::Double(value) => Some(AttributeArrayValue::Double(*value)), - } - } -} - #[derive(Debug)] pub struct SpanKeyParseError { pub message: String, @@ -443,113 +160,4 @@ impl fmt::Display for SpanKeyParseError { } impl std::error::Error for SpanKeyParseError {} -fn is_default(t: &T) -> bool { - t == &T::default() -} - -#[cfg(test)] -mod tests { - use super::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; - use crate::msgpack_decoder::v04::span::decode_span; - use std::collections::HashMap; - - #[test] - fn skip_serializing_empty_fields_test() { - let expected = b"\x87\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00"; - let val: Span<&str> = Span::default(); - let serialized = rmp_serde::encode::to_vec_named(&val).unwrap(); - assert_eq!(expected, serialized.as_slice()); - } - - #[test] - fn serialize_deserialize_test() { - let span: Span<&str> = Span { - name: "tracing.operation", - resource: "MyEndpoint", - span_links: vec![SpanLink { - trace_id: 42, - attributes: HashMap::from([("span", "link")]), - tracestate: "running", - ..Default::default() - }], - span_events: vec![SpanEvent { - time_unix_nano: 1727211691770716000, - name: "exception", - attributes: HashMap::from([ - ( - "exception.message", - AttributeAnyValue::SingleValue(AttributeArrayValue::String( - "Cannot divide by zero", - )), - ), - ( - "exception.type", - AttributeAnyValue::SingleValue(AttributeArrayValue::String("RuntimeError")), - ), - ( - "exception.escaped", - AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(false)), - ), - ( - "exception.count", - AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(1)), - ), - ( - "exception.lines", - AttributeAnyValue::Array(vec![ - AttributeArrayValue::String(" File \"\", line 1, in "), - AttributeArrayValue::String(" File \"\", line 1, in divide"), - AttributeArrayValue::String("RuntimeError: Cannot divide by zero"), - ]), - ), - ]), - }], - ..Default::default() - }; - - let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); - let mut serialized_slice = serialized.as_ref(); - let deserialized = decode_span(&mut serialized_slice).unwrap(); - - assert_eq!(span.name, deserialized.name); - assert_eq!(span.resource, deserialized.resource); - assert_eq!( - span.span_links[0].trace_id, - deserialized.span_links[0].trace_id - ); - assert_eq!( - span.span_links[0].tracestate, - deserialized.span_links[0].tracestate - ); - assert_eq!(span.span_events[0].name, deserialized.span_events[0].name); - assert_eq!( - span.span_events[0].time_unix_nano, - deserialized.span_events[0].time_unix_nano - ); - for attribut in &deserialized.span_events[0].attributes { - assert!(span.span_events[0].attributes.contains_key(attribut.0)) - } - } - - #[test] - fn serialize_event_test() { - // `expected` is created by transforming the span into bytes - // and passing each bytes through `escaped_default` - let expected = b"\x88\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00\xabspan_events\x91\x83\xaetime_unix_nano\xcf\x17\xf8I\xe1\xeb\xe5\x1f`\xa4name\xa4test\xaaattributes\x81\xaatest.event\x82\xa4type\x03\xacdouble_value\xcb@\x10\xcc\xcc\xcc\xcc\xcc\xcd"; - - let span: Span<&str> = Span { - span_events: vec![SpanEvent { - time_unix_nano: 1727211691770716000, - name: "test", - attributes: HashMap::from([( - "test.event", - AttributeAnyValue::SingleValue(AttributeArrayValue::Double(4.2)), - )]), - }], - ..Default::default() - }; - - let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); - assert_eq!(expected, serialized.as_slice()); - } -} +pub type SharedDictBytes = SharedDict; diff --git a/libdd-trace-utils/src/span/trace_utils.rs b/libdd-trace-utils/src/span/trace_utils.rs index 5583182aaa..8dd8a03b05 100644 --- a/libdd-trace-utils/src/span/trace_utils.rs +++ b/libdd-trace-utils/src/span/trace_utils.rs @@ -3,7 +3,7 @@ //! Trace-utils functionalities implementation for tinybytes based spans -use super::{Span, SpanText}; +use super::{v04::Span, SpanText, TraceData}; use std::collections::HashMap; /// Span metric the mini agent must set for the backend to recognize top level span @@ -15,10 +15,11 @@ const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; fn set_top_level_span(span: &mut Span, is_top_level: bool) where - T: SpanText, + T: TraceData, { if is_top_level { - span.metrics.insert(T::from_static_str(TOP_LEVEL_KEY), 1.0); + span.metrics + .insert(T::Text::from_static_str(TOP_LEVEL_KEY), 1.0); } else { span.metrics.remove(TOP_LEVEL_KEY); } @@ -32,7 +33,7 @@ where /// ancestor of other spans belonging to this service and attached to it). pub fn compute_top_level_span(trace: &mut [Span]) where - T: SpanText, + T: TraceData, { let mut span_id_idx: HashMap = HashMap::new(); for (i, span) in trace.iter().enumerate() { @@ -60,7 +61,7 @@ where } /// Return true if the span has a top level key set -pub fn has_top_level(span: &Span) -> bool { +pub fn has_top_level(span: &Span) -> bool { span.metrics .get(TRACER_TOP_LEVEL_KEY) .is_some_and(|v| *v == 1.0) @@ -68,7 +69,7 @@ pub fn has_top_level(span: &Span) -> bool { } /// Returns true if a span should be measured (i.e., it should get trace metrics calculated). -pub fn is_measured(span: &Span) -> bool { +pub fn is_measured(span: &Span) -> bool { span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0) } @@ -77,7 +78,7 @@ pub fn is_measured(span: &Span) -> bool { /// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive /// integer. The metric usually increases each time a new version of the same span is sent by /// the tracer -pub fn is_partial_snapshot(span: &Span) -> bool { +pub fn is_partial_snapshot(span: &Span) -> bool { span.metrics .get(PARTIAL_VERSION_KEY) .is_some_and(|v| *v >= 0.0) @@ -102,10 +103,10 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; /// dropped and the latter to the spans dropped. /// /// # Trace-level attributes -/// Some attributes related to the whole trace are stored in the root span of the chunk. +/// Some attributes related to the whole trace are stored in the root span of the chunk. pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats where - T: SpanText, + T: TraceData, { let mut dropped_p0_traces = 0; let mut dropped_p0_spans = 0; @@ -163,7 +164,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; fn create_test_span( trace_id: u64, diff --git a/libdd-trace-utils/src/span/v04/mod.rs b/libdd-trace-utils/src/span/v04/mod.rs new file mode 100644 index 0000000000..4f92a49f3b --- /dev/null +++ b/libdd-trace-utils/src/span/v04/mod.rs @@ -0,0 +1,430 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::{BytesData, SliceData, SpanKeyParseError, TraceData}; +use crate::tracer_payload::TraceChunks; +use serde::ser::SerializeStruct; +use serde::Serialize; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, PartialEq)] +pub enum SpanKey { + Service, + Name, + Resource, + TraceId, + SpanId, + ParentId, + Start, + Duration, + Error, + Meta, + Metrics, + Type, + MetaStruct, + SpanLinks, + SpanEvents, +} + +impl FromStr for SpanKey { + type Err = SpanKeyParseError; + + fn from_str(s: &str) -> Result { + match s { + "service" => Ok(SpanKey::Service), + "name" => Ok(SpanKey::Name), + "resource" => Ok(SpanKey::Resource), + "trace_id" => Ok(SpanKey::TraceId), + "span_id" => Ok(SpanKey::SpanId), + "parent_id" => Ok(SpanKey::ParentId), + "start" => Ok(SpanKey::Start), + "duration" => Ok(SpanKey::Duration), + "error" => Ok(SpanKey::Error), + "meta" => Ok(SpanKey::Meta), + "metrics" => Ok(SpanKey::Metrics), + "type" => Ok(SpanKey::Type), + "meta_struct" => Ok(SpanKey::MetaStruct), + "span_links" => Ok(SpanKey::SpanLinks), + "span_events" => Ok(SpanKey::SpanEvents), + _ => Err(SpanKeyParseError::new(format!("Invalid span key: {s}"))), + } + } +} + +/// Checks if the `value` represents an empty string. Used to skip serializing empty strings +/// with serde. +fn is_empty_str>(value: &T) -> bool { + value.borrow().is_empty() +} + +/// The generic representation of a V04 span. +/// +/// `T` is the type used to represent strings in the span, it can be either owned (e.g. BytesString) +/// or borrowed (e.g. &str). To define a generic function taking any `Span` you can use the +/// [`SpanValue`] trait: +/// ``` +/// use libdd_trace_utils::span::{v04::Span, TraceData}; +/// fn foo(span: Span) { +/// let _ = span.meta.get("foo"); +/// } +/// ``` +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct Span { + pub service: T::Text, + pub name: T::Text, + pub resource: T::Text, + #[serde(skip_serializing_if = "is_empty_str")] + pub r#type: T::Text, + #[serde(serialize_with = "serialize_lower_64_bits")] + pub trace_id: u128, + pub span_id: u64, + #[serde(skip_serializing_if = "is_default")] + pub parent_id: u64, + pub start: i64, + pub duration: i64, + #[serde(skip_serializing_if = "is_default")] + pub error: i32, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub metrics: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta_struct: HashMap, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_links: Vec>, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_events: Vec>, +} + +impl Clone for Span +where + T::Text: Clone, + T::Bytes: Clone, +{ + fn clone(&self) -> Self { + Span { + service: self.service.clone(), + name: self.name.clone(), + resource: self.resource.clone(), + r#type: self.r#type.clone(), + trace_id: self.trace_id, + span_id: self.span_id, + parent_id: self.parent_id, + start: self.start, + duration: self.duration, + error: self.error, + meta: self.meta.clone(), + metrics: self.metrics.clone(), + meta_struct: self.meta_struct.clone(), + span_links: self.span_links.clone(), + span_events: self.span_events.clone(), + } + } +} + +fn serialize_lower_64_bits(v: &u128, serializer: S) -> Result +where + S: serde::Serializer, +{ + serializer.serialize_u64(*v as u64) +} + +/// The generic representation of a V04 span link. +/// `T` is the type used to represent strings in the span link. +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct SpanLink { + pub trace_id: u64, + pub trace_id_high: u64, + pub span_id: u64, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub attributes: HashMap, + #[serde(skip_serializing_if = "is_empty_str")] + pub tracestate: T::Text, + #[serde(skip_serializing_if = "is_default")] + pub flags: u32, +} + +impl Clone for SpanLink +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + SpanLink { + trace_id: self.trace_id, + trace_id_high: self.trace_id_high, + span_id: self.span_id, + attributes: self.attributes.clone(), + tracestate: self.tracestate.clone(), + flags: self.flags, + } + } +} + +/// The generic representation of a V04 span event. +/// `T` is the type used to represent strings in the span event. +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct SpanEvent { + pub time_unix_nano: u64, + pub name: T::Text, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub attributes: HashMap>, +} + +impl Clone for SpanEvent +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + SpanEvent { + time_unix_nano: self.time_unix_nano, + name: self.name.clone(), + attributes: self.attributes.clone(), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum AttributeAnyValue { + SingleValue(AttributeArrayValue), + Array(Vec>), +} + +#[derive(Serialize)] +struct ArrayValueWrapper<'a, T: TraceData> { + values: &'a Vec>, +} + +impl Serialize for AttributeAnyValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("AttributeAnyValue", 2)?; + + match self { + AttributeAnyValue::SingleValue(attribute) => { + serialize_attribute_array::(&mut state, attribute)?; + } + AttributeAnyValue::Array(value) => { + let value_type: u8 = self.into(); + state.serialize_field("type", &value_type)?; + let wrapped_value = ArrayValueWrapper { values: value }; + state.serialize_field("array_value", &wrapped_value)?; + } + } + + state.end() + } +} + +impl From<&AttributeAnyValue> for u8 { + fn from(attribute: &AttributeAnyValue) -> u8 { + match attribute { + AttributeAnyValue::SingleValue(value) => value.into(), + AttributeAnyValue::Array(_) => 4, + } + } +} + +impl Clone for AttributeAnyValue +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + match self { + AttributeAnyValue::SingleValue(v) => AttributeAnyValue::SingleValue(v.clone()), + AttributeAnyValue::Array(vec) => AttributeAnyValue::Array(vec.clone()), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum AttributeArrayValue { + String(T::Text), + Boolean(bool), + Integer(i64), + Double(f64), +} + +impl Clone for AttributeArrayValue +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + match self { + AttributeArrayValue::String(v) => AttributeArrayValue::String(v.clone()), + AttributeArrayValue::Boolean(v) => AttributeArrayValue::Boolean(*v), + AttributeArrayValue::Integer(v) => AttributeArrayValue::Integer(*v), + AttributeArrayValue::Double(v) => AttributeArrayValue::Double(*v), + } + } +} + +impl Serialize for AttributeArrayValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("AttributeArrayValue", 2)?; + serialize_attribute_array::(&mut state, self)?; + state.end() + } +} + +fn serialize_attribute_array( + state: &mut S::SerializeStruct, + attribute: &AttributeArrayValue, +) -> Result<(), ::Error> +where + T: TraceData, + S: serde::Serializer, +{ + let attribute_type: u8 = attribute.into(); + state.serialize_field("type", &attribute_type)?; + match attribute { + AttributeArrayValue::String(value) => state.serialize_field("string_value", value), + AttributeArrayValue::Boolean(value) => state.serialize_field("bool_value", value), + AttributeArrayValue::Integer(value) => state.serialize_field("int_value", value), + AttributeArrayValue::Double(value) => state.serialize_field("double_value", value), + } +} + +impl From<&AttributeArrayValue> for u8 { + fn from(attribute: &AttributeArrayValue) -> u8 { + match attribute { + AttributeArrayValue::String(_) => 0, + AttributeArrayValue::Boolean(_) => 1, + AttributeArrayValue::Integer(_) => 2, + AttributeArrayValue::Double(_) => 3, + } + } +} + +fn is_default(t: &T) -> bool { + t == &T::default() +} + +pub type SpanBytes = Span; +pub type SpanLinkBytes = SpanLink; +pub type SpanEventBytes = SpanEvent; +pub type AttributeAnyValueBytes = AttributeAnyValue; +pub type AttributeArrayValueBytes = AttributeArrayValue; + +pub type SpanSlice<'a> = Span>; +pub type SpanLinkSlice<'a> = SpanLink>; +pub type SpanEventSlice<'a> = SpanEvent>; +pub type AttributeAnyValueSlice<'a> = AttributeAnyValue>; +pub type AttributeArrayValueSlice<'a> = AttributeArrayValue>; + +pub type TraceChunksBytes = TraceChunks; + +#[cfg(test)] +mod tests { + use super::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; + use crate::msgpack_decoder::decode::buffer::Buffer; + use crate::msgpack_decoder::v04::span::decode_span; + use crate::span::SliceData; + use std::collections::HashMap; + + #[test] + fn skip_serializing_empty_fields_test() { + let expected = b"\x87\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00"; + let val: Span> = Span::default(); + let serialized = rmp_serde::encode::to_vec_named(&val).unwrap(); + assert_eq!(expected, serialized.as_slice()); + } + + #[test] + fn serialize_deserialize_test() { + let span: Span> = Span { + name: "tracing.operation", + resource: "MyEndpoint", + span_links: vec![SpanLink { + trace_id: 42, + attributes: HashMap::from([("span", "link")]), + tracestate: "running", + ..Default::default() + }], + span_events: vec![SpanEvent { + time_unix_nano: 1727211691770716000, + name: "exception", + attributes: HashMap::from([ + ( + "exception.message", + AttributeAnyValue::SingleValue(AttributeArrayValue::String( + "Cannot divide by zero", + )), + ), + ( + "exception.type", + AttributeAnyValue::SingleValue(AttributeArrayValue::String("RuntimeError")), + ), + ( + "exception.escaped", + AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(false)), + ), + ( + "exception.count", + AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(1)), + ), + ( + "exception.lines", + AttributeAnyValue::Array(vec![ + AttributeArrayValue::String(" File \"\", line 1, in "), + AttributeArrayValue::String(" File \"\", line 1, in divide"), + AttributeArrayValue::String("RuntimeError: Cannot divide by zero"), + ]), + ), + ]), + }], + ..Default::default() + }; + + let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); + let mut serialized_slice = Buffer::>::new(serialized.as_ref()); + let deserialized = decode_span(&mut serialized_slice).unwrap(); + + assert_eq!(span.name, deserialized.name); + assert_eq!(span.resource, deserialized.resource); + assert_eq!( + span.span_links[0].trace_id, + deserialized.span_links[0].trace_id + ); + assert_eq!( + span.span_links[0].tracestate, + deserialized.span_links[0].tracestate + ); + assert_eq!(span.span_events[0].name, deserialized.span_events[0].name); + assert_eq!( + span.span_events[0].time_unix_nano, + deserialized.span_events[0].time_unix_nano + ); + for attribut in &deserialized.span_events[0].attributes { + assert!(span.span_events[0].attributes.contains_key(attribut.0)) + } + } + + #[test] + fn serialize_event_test() { + // `expected` is created by transforming the span into bytes + // and passing each bytes through `escaped_default` + let expected = b"\x88\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00\xabspan_events\x91\x83\xaetime_unix_nano\xcf\x17\xf8I\xe1\xeb\xe5\x1f`\xa4name\xa4test\xaaattributes\x81\xaatest.event\x82\xa4type\x03\xacdouble_value\xcb@\x10\xcc\xcc\xcc\xcc\xcc\xcd"; + + let span: Span> = Span { + span_events: vec![SpanEvent { + time_unix_nano: 1727211691770716000, + name: "test", + attributes: HashMap::from([( + "test.event", + AttributeAnyValue::SingleValue(AttributeArrayValue::Double(4.2)), + )]), + }], + ..Default::default() + }; + + let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); + assert_eq!(expected, serialized.as_slice()); + } +} diff --git a/libdd-trace-utils/src/span/v05/mod.rs b/libdd-trace-utils/src/span/v05/mod.rs index e21746e1c0..a4f4e8ad9d 100644 --- a/libdd-trace-utils/src/span/v05/mod.rs +++ b/libdd-trace-utils/src/span/v05/mod.rs @@ -3,7 +3,7 @@ pub mod dict; -use crate::span::{v05::dict::SharedDict, SpanText}; +use crate::span::{v05::dict::SharedDict, TraceData}; use anyhow::Result; use serde::Serialize; use std::collections::HashMap; @@ -28,9 +28,9 @@ pub struct Span { pub r#type: u32, } -pub fn from_span( - span: crate::span::Span, - dict: &mut SharedDict, +pub fn from_v04_span( + span: crate::span::v04::Span, + dict: &mut SharedDict, ) -> Result { let meta_len = span.meta.len(); let metrics_len = span.metrics.len(); @@ -65,7 +65,7 @@ pub fn from_span( #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; use libdd_tinybytes::BytesString; #[test] @@ -92,7 +92,7 @@ mod tests { }; let mut dict = SharedDict::default(); - let v05_span = from_span(span, &mut dict).unwrap(); + let v05_span = from_v04_span(span, &mut dict).unwrap(); let get_index_from_str = |str: &str| -> u32 { dict.iter() diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 5425442b1e..c1d3cfa5f5 100644 --- a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -235,7 +235,7 @@ impl DatadogAgentContainerBuilder { /// /// let data = SendData::new( /// 100, -/// TracerPayloadCollection::V04(vec![trace.clone()]), +/// TracerPayloadCollection::V04(vec![trace]), /// TracerHeaderTags::default(), /// &endpoint, /// ); diff --git a/libdd-trace-utils/src/test_utils/mod.rs b/libdd-trace-utils/src/test_utils/mod.rs index ba8858ee72..da4d2b8cff 100644 --- a/libdd-trace-utils/src/test_utils/mod.rs +++ b/libdd-trace-utils/src/test_utils/mod.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; -use crate::span::SpanBytes; +use crate::span::v04::SpanBytes; use crate::span::{v05, SharedDictBytes}; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index e85cdab1da..b9311061ca 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -4,7 +4,7 @@ pub use crate::send_data::send_data_result::SendDataResult; pub use crate::send_data::SendData; use crate::span::v05::dict::SharedDict; -use crate::span::{v05, SpanText}; +use crate::span::{v05, TraceData}; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use crate::tracer_payload::{self, TraceChunks}; @@ -601,8 +601,8 @@ macro_rules! parse_root_span_tags { } } -pub fn collect_trace_chunks( - traces: Vec>>, +pub fn collect_trace_chunks( + traces: Vec>>, use_v05_format: bool, ) -> anyhow::Result> { if use_v05_format { @@ -613,7 +613,7 @@ pub fn collect_trace_chunks( let v05_trace = trace.into_iter().try_fold( Vec::with_capacity(trace_len), |mut acc, span| -> anyhow::Result> { - acc.push(v05::from_span(span, &mut shared_dict)?); + acc.push(v05::from_v04_span(span, &mut shared_dict)?); Ok(acc) }, )?; @@ -728,36 +728,32 @@ mod tests { #[test] fn test_coalescing_does_not_exceed_max_size() { - let dummy = SendData::new( - MAX_PAYLOAD_SIZE / 5 + 1, - TracerPayloadCollection::V07(vec![pb::TracerPayload { - container_id: "".to_string(), - language_name: "".to_string(), - language_version: "".to_string(), - tracer_version: "".to_string(), - runtime_id: "".to_string(), - chunks: vec![pb::TraceChunk { - priority: 0, - origin: "".to_string(), - spans: vec![], + fn dummy() -> SendData { + SendData::new( + MAX_PAYLOAD_SIZE / 5 + 1, + TracerPayloadCollection::V07(vec![pb::TracerPayload { + container_id: "".to_string(), + language_name: "".to_string(), + language_version: "".to_string(), + tracer_version: "".to_string(), + runtime_id: "".to_string(), + chunks: vec![pb::TraceChunk { + priority: 0, + origin: "".to_string(), + spans: vec![], + tags: Default::default(), + dropped_trace: false, + }], tags: Default::default(), - dropped_trace: false, - }], - tags: Default::default(), - env: "".to_string(), - hostname: "".to_string(), - app_version: "".to_string(), - }]), - TracerHeaderTags::default(), - &Endpoint::default(), - ); - let coalesced = coalesce_send_data(vec![ - dummy.clone(), - dummy.clone(), - dummy.clone(), - dummy.clone(), - dummy.clone(), - ]); + env: "".to_string(), + hostname: "".to_string(), + app_version: "".to_string(), + }]), + TracerHeaderTags::default(), + &Endpoint::default(), + ) + } + let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]); assert_eq!( 5, coalesced diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 05143fbfc6..5649ed7c74 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -2,15 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::span::v05::dict::SharedDict; -use crate::span::{v05, SharedDictBytes, Span, SpanBytes, SpanText}; +use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData}; use crate::trace_utils::collect_trace_chunks; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; -use libdd_tinybytes::BytesString; use libdd_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; -pub type TracerPayloadV04 = Vec; +pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; #[derive(Debug, Clone)] @@ -18,19 +17,19 @@ pub type TracerPayloadV05 = Vec; pub enum TraceEncoding { /// v0.4 encoding (TracerPayloadV04). V04, - /// v054 encoding (TracerPayloadV04). + /// v0.5 encoding (TracerPayloadV05). V05, } -#[derive(Debug, Clone)] -pub enum TraceChunks { +#[derive(Debug)] +pub enum TraceChunks { /// Collection of TraceChunkSpan. - V04(Vec>>), + V04(Vec>>), /// Collection of TraceChunkSpan with de-duplicated strings. - V05((SharedDict, Vec>)), + V05((SharedDict, Vec>)), } -impl TraceChunks { +impl TraceChunks { pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), @@ -39,7 +38,7 @@ impl TraceChunks { } } -impl TraceChunks { +impl TraceChunks { /// Returns the number of traces in the chunk pub fn size(&self) -> usize { match self { @@ -49,13 +48,13 @@ impl TraceChunks { } } -#[derive(Debug, Clone)] +#[derive(Debug)] /// Enum representing a general abstraction for a collection of tracer payloads. pub enum TracerPayloadCollection { /// Collection of TracerPayloads. V07(Vec), /// Collection of TraceChunkSpan. - V04(Vec>), + V04(Vec>), /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDictBytes, Vec>)), } @@ -223,7 +222,7 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { pub fn decode_to_trace_chunks( data: libdd_tinybytes::Bytes, encoding_type: TraceEncoding, -) -> Result<(TraceChunks, usize), anyhow::Error> { +) -> Result<(TraceChunks, usize), anyhow::Error> { let (data, size) = match encoding_type { TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), @@ -239,7 +238,7 @@ pub fn decode_to_trace_chunks( #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; use crate::test_utils::create_test_no_alloc_span; use libdd_tinybytes::BytesString; use libdd_trace_protobuf::pb; @@ -278,34 +277,43 @@ mod tests { #[test] fn test_append_traces_v07() { + let mut two_traces = create_dummy_collection_v07(); + two_traces.append(&mut create_dummy_collection_v07()); + let mut trace = create_dummy_collection_v07(); - let empty = TracerPayloadCollection::V07(vec![]); + let mut empty = TracerPayloadCollection::V07(vec![]); - trace.append(&mut trace.clone()); + trace.append(&mut create_dummy_collection_v07()); assert_eq!(2, trace.size()); - trace.append(&mut trace.clone()); + trace.append(&mut two_traces); assert_eq!(4, trace.size()); - trace.append(&mut empty.clone()); + trace.append(&mut empty); assert_eq!(4, trace.size()); } #[test] fn test_append_traces_v04() { - let mut trace = - TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]); + fn create_trace() -> TracerPayloadCollection { + TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]) + } + + let mut two_traces = create_trace(); + two_traces.append(&mut create_trace()); + + let mut trace = create_trace(); - let empty = TracerPayloadCollection::V04(vec![]); + let mut empty = TracerPayloadCollection::V04(vec![]); - trace.append(&mut trace.clone()); + trace.append(&mut create_trace()); assert_eq!(2, trace.size()); - trace.append(&mut trace.clone()); + trace.append(&mut two_traces); assert_eq!(4, trace.size()); - trace.append(&mut empty.clone()); + trace.append(&mut empty); assert_eq!(4, trace.size()); } @@ -313,7 +321,7 @@ mod tests { fn test_merge_traces() { let mut trace = create_dummy_collection_v07(); - trace.append(&mut trace.clone()); + trace.append(&mut create_dummy_collection_v07()); assert_eq!(2, trace.size()); trace.merge(); @@ -429,7 +437,7 @@ mod tests { #[test] fn test_try_into_meta_metrics_success() { let dummy_trace = create_trace(); - let expected = vec![dummy_trace.clone()]; + let expected = vec![create_trace()]; let payload = rmp_serde::to_vec_named(&expected).unwrap(); let payload = libdd_tinybytes::Bytes::from(payload);