From df14798270e87eeb25b903da0593471b923c8894 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 25 Sep 2025 14:56:58 +0200 Subject: [PATCH] Introduce TraceData to unify text and binary data Also move Span structures to v04, to make space for v1 spans (which will eventually become the new default). TraceData is also going to be used in the V1 implementation, to carry around byte arrays and strings alike, separate from the indexed offsets into the big vector. Signed-off-by: Bob Weinand --- datadog-sidecar-ffi/src/span.rs | 2 +- datadog-sidecar-ffi/tests/span.rs | 2 +- .../src/service/tracing/trace_flusher.rs | 5 +- libdd-data-pipeline-ffi/src/trace_exporter.rs | 2 +- libdd-data-pipeline/src/stats_exporter.rs | 2 +- libdd-data-pipeline/src/trace_exporter/mod.rs | 12 +- .../src/trace_exporter/stats.rs | 8 +- .../src/trace_exporter/trace_serializer.rs | 10 +- libdd-tinybytes/src/lib.rs | 34 +- .../benches/span_concentrator_bench.rs | 2 +- .../src/span_concentrator/aggregation.rs | 2 +- .../src/span_concentrator/stat_span.rs | 5 +- .../src/span_concentrator/tests.rs | 2 +- .../src/msgpack_decoder/decode/buffer.rs | 63 ++ .../src/msgpack_decoder/decode/map.rs | 12 +- .../src/msgpack_decoder/decode/meta_struct.rs | 30 +- .../src/msgpack_decoder/decode/metrics.rs | 16 +- .../src/msgpack_decoder/decode/mod.rs | 1 + .../src/msgpack_decoder/decode/number.rs | 51 +- .../src/msgpack_decoder/decode/span_event.rs | 111 ++-- .../src/msgpack_decoder/decode/span_link.rs | 39 +- .../src/msgpack_decoder/decode/string.rs | 63 +- .../src/msgpack_decoder/v04/mod.rs | 42 +- .../src/msgpack_decoder/v04/span.rs | 32 +- .../src/msgpack_decoder/v05/mod.rs | 122 ++-- .../src/msgpack_encoder/v04/mod.rs | 29 +- .../src/msgpack_encoder/v04/span.rs | 16 +- libdd-trace-utils/src/send_data/mod.rs | 109 ++-- libdd-trace-utils/src/span/mod.rs | 566 +++--------------- libdd-trace-utils/src/span/trace_utils.rs | 21 +- libdd-trace-utils/src/span/v04/mod.rs | 430 +++++++++++++ 
libdd-trace-utils/src/span/v05/mod.rs | 12 +- .../src/test_utils/datadog_test_agent.rs | 2 +- libdd-trace-utils/src/test_utils/mod.rs | 2 +- libdd-trace-utils/src/trace_utils.rs | 62 +- libdd-trace-utils/src/tracer_payload.rs | 60 +- 36 files changed, 1039 insertions(+), 940 deletions(-) create mode 100644 libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs create mode 100644 libdd-trace-utils/src/span/v04/mod.rs diff --git a/datadog-sidecar-ffi/src/span.rs b/datadog-sidecar-ffi/src/span.rs index e8aacd88f3..d2e423f9c7 100644 --- a/datadog-sidecar-ffi/src/span.rs +++ b/datadog-sidecar-ffi/src/span.rs @@ -3,7 +3,7 @@ use libdd_common_ffi::slice::{AsBytes, CharSlice}; use libdd_tinybytes::{Bytes, BytesString}; -use libdd_trace_utils::span::{ +use libdd_trace_utils::span::v04::{ AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes, }; use std::borrow::Cow; diff --git a/datadog-sidecar-ffi/tests/span.rs b/datadog-sidecar-ffi/tests/span.rs index 6e7068779d..8e79ba8000 100644 --- a/datadog-sidecar-ffi/tests/span.rs +++ b/datadog-sidecar-ffi/tests/span.rs @@ -4,7 +4,7 @@ use datadog_sidecar_ffi::span::*; use libdd_common_ffi::slice::*; use libdd_tinybytes::*; -use libdd_trace_utils::span::*; +use libdd_trace_utils::span::v04::*; use std::collections::HashMap; #[test] diff --git a/datadog-sidecar/src/service/tracing/trace_flusher.rs b/datadog-sidecar/src/service/tracing/trace_flusher.rs index 12d41fa029..78bc5f99c2 100644 --- a/datadog-sidecar/src/service/tracing/trace_flusher.rs +++ b/datadog-sidecar/src/service/tracing/trace_flusher.rs @@ -355,9 +355,8 @@ mod tests { }; let send_data_1 = create_send_data(size, &target_endpoint); - - let send_data_2 = send_data_1.clone(); - let send_data_3 = send_data_1.clone(); + let send_data_2 = create_send_data(size, &target_endpoint); + let send_data_3 = create_send_data(size, &target_endpoint); trace_flusher.enqueue(send_data_1); trace_flusher.enqueue(send_data_2); diff --git 
a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index c62351138c..743d312661 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -546,7 +546,7 @@ mod tests { use crate::error::ddog_trace_exporter_error_free; use httpmock::prelude::*; use httpmock::MockServer; - use libdd_trace_utils::span::SpanSlice; + use libdd_trace_utils::span::v04::SpanSlice; use std::{borrow::Borrow, mem::MaybeUninit}; #[test] diff --git a/libdd-data-pipeline/src/stats_exporter.rs b/libdd-data-pipeline/src/stats_exporter.rs index 150b9dfcc7..a154be6b18 100644 --- a/libdd-data-pipeline/src/stats_exporter.rs +++ b/libdd-data-pipeline/src/stats_exporter.rs @@ -195,7 +195,7 @@ mod tests { use httpmock::prelude::*; use httpmock::MockServer; use libdd_common::hyper_migration::new_default_client; - use libdd_trace_utils::span::{trace_utils, SpanSlice}; + use libdd_trace_utils::span::{trace_utils, v04::SpanSlice}; use libdd_trace_utils::test_utils::poll_for_mock_hit; use time::Duration; use time::SystemTime; diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index a8c0d2a8ac..1c4bb22292 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -42,7 +42,7 @@ use libdd_trace_utils::msgpack_decoder; use libdd_trace_utils::send_with_retry::{ send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult, }; -use libdd_trace_utils::span::{Span, SpanText}; +use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::io; use std::sync::{Arc, Mutex}; @@ -587,7 +587,7 @@ impl TraceExporter { /// # Returns /// * Ok(String): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub fn send_trace_chunks( + pub fn send_trace_chunks( &self, trace_chunks: Vec>>, ) -> Result { 
@@ -604,7 +604,7 @@ impl TraceExporter { /// # Returns /// * Ok(String): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process - pub async fn send_trace_chunks_async( + pub async fn send_trace_chunks_async( &self, trace_chunks: Vec>>, ) -> Result { @@ -687,7 +687,7 @@ impl TraceExporter { self.handle_send_result(result, chunks, payload_len).await } - async fn send_trace_chunks_inner( + async fn send_trace_chunks_inner( &self, mut traces: Vec>>, ) -> Result { @@ -1012,8 +1012,8 @@ mod tests { use httpmock::MockServer; use libdd_tinybytes::BytesString; use libdd_trace_utils::msgpack_encoder; + use libdd_trace_utils::span::v04::SpanBytes; use libdd_trace_utils::span::v05; - use libdd_trace_utils::span::SpanBytes; use std::collections::HashMap; use std::net; use std::time::Duration; @@ -2024,7 +2024,7 @@ mod single_threaded_tests { use crate::agent_info; use httpmock::prelude::*; use libdd_trace_utils::msgpack_encoder; - use libdd_trace_utils::span::SpanBytes; + use libdd_trace_utils::span::v04::SpanBytes; use std::time::Duration; use tokio::time::sleep; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 1b2090776b..3f2654e2f5 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -206,9 +206,9 @@ pub(crate) fn handle_stats_enabled( /// Add all spans from the given iterator into the stats concentrator /// # Panic /// Will panic if another thread panicked will holding the lock on `stats_concentrator` -fn add_spans_to_stats( +fn add_spans_to_stats( stats_concentrator: &Mutex, - traces: &[Vec>], + traces: &[Vec>], ) { let mut stats_concentrator = stats_concentrator.lock_or_panic(); @@ -219,8 +219,8 @@ fn add_spans_to_stats( } /// Process traces for stats computation and update header tags accordingly -pub(crate) fn process_traces_for_stats( - traces: &mut Vec>>, +pub(crate) fn 
process_traces_for_stats( + traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 515495d7a7..9f17b2e16a 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -12,7 +12,7 @@ use libdd_common::header::{ }; use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError; use libdd_trace_utils::msgpack_encoder; -use libdd_trace_utils::span::{Span, SpanText}; +use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::{self, TracerHeaderTags}; use libdd_trace_utils::tracer_payload; use std::collections::HashMap; @@ -46,7 +46,7 @@ impl<'a> TraceSerializer<'a> { } /// Prepare traces payload and HTTP headers for sending to agent - pub(super) fn prepare_traces_payload( + pub(super) fn prepare_traces_payload( &self, traces: Vec>>, header_tags: TracerHeaderTags, @@ -64,7 +64,7 @@ impl<'a> TraceSerializer<'a> { } /// Collect trace chunks based on output format - fn collect_and_process_traces( + fn collect_and_process_traces( &self, traces: Vec>>, ) -> Result, TraceExporterError> { @@ -97,7 +97,7 @@ impl<'a> TraceSerializer<'a> { } /// Serialize payload to msgpack format - fn serialize_payload( + fn serialize_payload( &self, payload: &tracer_payload::TraceChunks, ) -> Result, TraceExporterError> { @@ -119,7 +119,7 @@ mod tests { APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR, }; use libdd_tinybytes::BytesString; - use libdd_trace_utils::span::SpanBytes; + use libdd_trace_utils::span::v04::SpanBytes; use libdd_trace_utils::trace_utils::TracerHeaderTags; fn create_test_span() -> SpanBytes { diff --git a/libdd-tinybytes/src/lib.rs b/libdd-tinybytes/src/lib.rs index 3d0a124d8c..ea5dfbf913 100644 
--- a/libdd-tinybytes/src/lib.rs +++ b/libdd-tinybytes/src/lib.rs @@ -7,6 +7,8 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] +#[cfg(feature = "serde")] +use serde::Serialize; use std::{ borrow, cmp, fmt, hash, ops::{self, RangeBounds}, @@ -14,11 +16,9 @@ use std::{ sync::atomic::AtomicUsize, }; -#[cfg(feature = "serde")] -use serde::Serialize; - /// Immutable bytes type with zero copy cloning and slicing. #[derive(Clone)] +#[repr(C)] // fixed layout for ad-hoc conversion to slice pub struct Bytes { ptr: NonNull, len: usize, @@ -49,11 +49,24 @@ impl Bytes { len: usize, refcount: RefCountedCell, ) -> Self { - Self { - ptr, - len, - bytes: Some(refcount), - } + Self::from_raw(ptr, len, Some(refcount)) + } + + #[inline] + /// Creates a new `Bytes` from the given slice data and the refcount. Can be used after calling + /// into_raw(). + /// + /// # Safety + /// + /// * the pointer should be valid for the given length + /// * the pointer should be valid for reads as long as the refcount or any of its clones is not + /// dropped + pub const unsafe fn from_raw( + ptr: NonNull, + len: usize, + bytes: Option, + ) -> Self { + Self { ptr, len, bytes } } /// Creates empty `Bytes`. @@ -235,6 +248,11 @@ impl Bytes { // SAFETY: ptr is valid for the associated length unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast_const(), self.len()) } } + + #[inline] + pub fn into_raw(self) -> (NonNull, usize, Option) { + (self.ptr, self.len, self.bytes) + } } // Implementations of `UnderlyingBytes` for common types. 
diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index cad2e55404..21c105c952 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -7,7 +7,7 @@ use std::{ use criterion::{criterion_group, Criterion}; use libdd_trace_stats::span_concentrator::SpanConcentrator; -use libdd_trace_utils::span::SpanBytes; +use libdd_trace_utils::span::v04::SpanBytes; fn get_bucket_start(now: SystemTime, n: u64) -> i64 { let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n); diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index 3ef068cd5f..07863b3a98 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -301,7 +301,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl #[cfg(test)] mod tests { - use libdd_trace_utils::span::{SpanBytes, SpanSlice}; + use libdd_trace_utils::span::v04::{SpanBytes, SpanSlice}; use super::*; use std::{collections::HashMap, hash::Hash}; diff --git a/libdd-trace-stats/src/span_concentrator/stat_span.rs b/libdd-trace-stats/src/span_concentrator/stat_span.rs index 31ac1676e2..4696d50581 100644 --- a/libdd-trace-stats/src/span_concentrator/stat_span.rs +++ b/libdd-trace-stats/src/span_concentrator/stat_span.rs @@ -5,8 +5,9 @@ //! support both trace-utils' Span and pb::Span. 
use libdd_trace_protobuf::pb; -use libdd_trace_utils::span::{trace_utils, Span, SpanText}; +use libdd_trace_utils::span::{trace_utils, v04::Span, TraceData}; use libdd_trace_utils::trace_utils as pb_utils; +use std::borrow::Borrow; /// Common interface for spans used in stats computation pub trait StatSpan<'a> { @@ -38,7 +39,7 @@ pub trait StatSpan<'a> { fn get_metrics(&'a self, key: &str) -> Option; } -impl<'a, T: SpanText> StatSpan<'a> for Span { +impl<'a, T: TraceData> StatSpan<'a> for Span { fn service(&'a self) -> &'a str { self.service.borrow() } diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 3f4f8c05df..647b267dce 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -4,7 +4,7 @@ use crate::span_concentrator::aggregation::OwnedAggregationKey; use super::*; -use libdd_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice}; +use libdd_trace_utils::span::{trace_utils::compute_top_level_span, v04::SpanSlice}; use rand::{thread_rng, Rng}; const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64; diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs b/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs new file mode 100644 index 0000000000..76df2f1c1f --- /dev/null +++ b/libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs @@ -0,0 +1,63 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; +use rmp::decode; +use rmp::decode::DecodeStringError; + +use std::borrow::Borrow; +use std::ops::Deref; + +/// Read a string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. 
+#[inline] +pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { + decode::read_str_from_slice(buf).map_err(|e| match e { + DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), + DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), + DecodeStringError::TypeMismatch(marker) => { + DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}")) + } + DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), + _ => DecodeError::IOError, + }) +} + +/// Internal Buffer used to wrap msgpack data for decoding. +/// Provides a couple accessors to extract data from the buffer. +pub struct Buffer(T::Bytes); + +impl Buffer { + pub fn new(data: T::Bytes) -> Self { + Buffer(data) + } + + /// Returns a mutable reference to the underlying slice. + pub fn as_mut_slice(&mut self) -> &mut &'static [u8] { + T::get_mut_slice(&mut self.0) + } + + /// Tries to extract a slice of `bytes` from the buffer and advances the buffer. + pub fn try_slice_and_advance(&mut self, bytes: usize) -> Option { + T::try_slice_and_advance(&mut self.0, bytes) + } + + /// Read a string from the slices `buf`. + /// + /// # Errors + /// Fails if the buffer doesn't contain a valid utf8 msgpack string. + pub fn read_string(&mut self) -> Result { + T::read_string(&mut self.0) + } +} + +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.0.borrow() + } +} diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/map.rs b/libdd-trace-utils/src/msgpack_decoder/decode/map.rs index c6dd22fb1d..9641d80864 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/map.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/map.rs @@ -1,7 +1,8 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::msgpack_decoder::decode::error::DecodeError; +use crate::msgpack_decoder::decode::{buffer::Buffer, error::DecodeError}; +use crate::span::TraceData; use rmp::{decode, decode::RmpRead, Marker}; use std::collections::HashMap; @@ -33,14 +34,14 @@ use std::collections::HashMap; /// * `V` - The type of the values in the map. /// * `F` - The type of the function used to read key-value pairs from the buffer. #[inline] -pub fn read_map<'a, K, V, F>( +pub fn read_map( len: usize, - buf: &mut &'a [u8], + buf: &mut B, read_pair: F, ) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, + F: Fn(&mut B) -> Result<(K, V), DecodeError>, { let mut map = HashMap::with_capacity(len); for _ in 0..len { @@ -67,7 +68,8 @@ where /// - The buffer does not contain a map. /// - There is an error reading from the buffer. #[inline] -pub fn read_map_len(buf: &mut &[u8]) -> Result { +pub fn read_map_len(buf: &mut Buffer) -> Result { + let buf = buf.as_mut_slice(); match decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))? { diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs b/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs index 1e28a7057a..30c7fe4620 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs @@ -1,33 +1,35 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; -use libdd_tinybytes::Bytes; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::TraceData; use rmp::decode; use std::collections::HashMap; -fn read_byte_array_len(buf: &mut &[u8]) -> Result { - decode::read_bin_len(buf).map_err(|_| { +fn read_byte_array_len(buf: &mut Buffer) -> Result { + decode::read_bin_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to read binary len for meta_struct".to_owned()) }) } #[inline] -pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { +pub fn read_meta_struct( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } - fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Bytes), DecodeError> { - let key = read_string_ref(buf)?; + fn read_meta_struct_pair( + buf: &mut Buffer, + ) -> Result<(T::Text, T::Bytes), DecodeError> { + let key = buf.read_string()?; let byte_array_len = read_byte_array_len(buf)? 
as usize; - let slice = buf.get(0..byte_array_len); - if let Some(slice) = slice { - let data = Bytes::copy_from_slice(slice); - *buf = &buf[byte_array_len..]; + if let Some(data) = buf.try_slice_and_advance(byte_array_len) { Ok((key, data)) } else { Err(DecodeError::InvalidFormat( @@ -43,13 +45,15 @@ pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result::new(serialized.as_ref()); let res = read_meta_struct(&mut slice).unwrap(); assert_eq!(res.get("key").unwrap().to_vec(), vec![1, 2, 3, 4]); @@ -60,7 +64,7 @@ mod tests { let meta = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]); let serialized = rmp_serde::to_vec_named(&meta).unwrap(); - let mut slice = serialized.as_ref(); + let mut slice = Buffer::::new(serialized.as_ref()); let res = read_meta_struct(&mut slice); assert!(res.is_err()); diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs b/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs index 5b84ea16be..8dfc8cefed 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs @@ -1,21 +1,25 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::TraceData; use std::collections::HashMap; #[inline] -pub fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { - let key = read_string_ref(buf)?; - let v = read_number_slice(buf)?; +pub fn read_metric_pair(buf: &mut Buffer) -> Result<(T::Text, f64), DecodeError> { + let key = buf.read_string()?; + let v = read_number(buf)?; Ok((key, v)) } #[inline] -pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { +pub fn read_metrics( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs b/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs index 156f8614d1..48f0d75506 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/mod.rs @@ -1,6 +1,7 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod buffer; pub mod error; pub mod map; pub mod meta_struct; diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/number.rs b/libdd-trace-utils/src/msgpack_decoder/decode/number.rs index 10ef05b02d..d057ba9545 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/number.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/number.rs @@ -1,7 +1,9 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; use rmp::{decode::RmpRead, Marker}; use std::fmt; @@ -146,7 +148,7 @@ impl TryFrom for f64 { } } -fn read_number(buf: &mut &[u8], allow_null: bool) -> Result { +fn read_num(buf: &mut &[u8], allow_null: bool) -> Result { match rmp::decode::read_marker(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))? { @@ -194,22 +196,23 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result } /// Read a msgpack encoded number from `buf`. -pub fn read_number_slice>( - buf: &mut &[u8], -) -> Result { - read_number(buf, false)?.try_into() +pub fn read_number>( + buf: &mut Buffer, +) -> Result { + read_num(buf.as_mut_slice(), false)?.try_into() } /// Read a msgpack encoded number from `buf` and return 0 if null. -pub fn read_nullable_number_slice>( - buf: &mut &[u8], -) -> Result { - read_number(buf, true)?.try_into() +pub fn read_nullable_number>( + buf: &mut Buffer, +) -> Result { + read_num(buf.as_mut_slice(), true)?.try_into() } #[cfg(test)] mod tests { use super::*; + use crate::span::SliceData; use serde_json::json; use std::f64; @@ -219,8 +222,8 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: u8 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: u8 = read_number(&mut slice).unwrap(); assert_eq!(result, expected_value); } @@ -230,8 +233,8 @@ mod tests { let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: i8 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: i8 = read_number(&mut 
slice).unwrap(); assert_eq!(result, expected_value); } @@ -241,18 +244,18 @@ mod tests { let expected_value = 42.98; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: f64 = read_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: f64 = read_number(&mut slice).unwrap(); assert_eq!(result, expected_value); } #[test] - fn test_decoding_null_through_read_number_slice_raises_exception() { + fn test_decoding_null_through_read_number_raises_exception() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: Result = read_number_slice(&mut slice); + let mut slice = Buffer::::new(buf.as_slice()); + let result: Result = read_number(&mut slice); assert!(matches!(result, Err(DecodeError::InvalidType(_)))); assert_eq!( @@ -266,8 +269,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: u8 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: u8 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0); } @@ -276,8 +279,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: i8 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let result: i8 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0); } @@ -286,8 +289,8 @@ mod tests { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut slice = buf.as_slice(); - let result: f64 = read_nullable_number_slice(&mut slice).unwrap(); + let mut slice = Buffer::::new(buf.as_slice()); + let 
result: f64 = read_nullable_number(&mut slice).unwrap(); assert_eq!(result, 0.0); } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs b/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs index 4b3cb0cc43..6befce9f40 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/span_event.rs @@ -1,10 +1,13 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; -use crate::span::{AttributeAnyValueSlice, AttributeArrayValueSlice, SpanEventSlice}; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::handle_null_marker; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent}; +use crate::span::TraceData; +use std::borrow::Borrow; use std::collections::HashMap; use std::str::FromStr; @@ -24,18 +27,18 @@ use std::str::FromStr; /// This function will return an error if: /// - The marker for the array length cannot be read. /// - Any `SpanEvent` cannot be decoded. 
-pub(crate) fn read_span_events<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +pub(crate) fn read_span_events( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span events".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_event(buf)?); } @@ -63,15 +66,15 @@ impl FromStr for SpanEventKey { } } -fn decode_span_event<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut event = SpanEventSlice::default(); - let event_size = rmp::decode::read_map_len(buf) +fn decode_span_event(buf: &mut Buffer) -> Result, DecodeError> { + let mut event = SpanEvent::default(); + let event_size = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for event size".to_owned()))?; for _ in 0..event_size { - match read_string_ref(buf)?.parse::()? { - SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number_slice(buf)?, - SpanEventKey::Name => event.name = read_string_ref(buf)?, + match buf.read_string()?.borrow().parse::()? 
{ + SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number(buf)?, + SpanEventKey::Name => event.name = buf.read_string()?, SpanEventKey::Attributes => event.attributes = read_attributes_map(buf)?, } } @@ -79,16 +82,16 @@ fn decode_span_event<'a>(buf: &mut &'a [u8]) -> Result, Decod Ok(event) } -fn read_attributes_map<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { - let len = rmp::decode::read_map_len(buf) +fn read_attributes_map( + buf: &mut Buffer, +) -> Result>, DecodeError> { + let len = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for attributes".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_ref(buf)?; + let key = buf.read_string()?; let value = decode_attribute_any(buf)?; map.insert(key, value); } @@ -121,9 +124,11 @@ impl FromStr for AttributeAnyKey { } } -fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut attribute: Option = None; - let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { +fn decode_attribute_any( + buf: &mut Buffer, +) -> Result, DecodeError> { + let mut attribute: Option> = None; + let attribute_size = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) })?; @@ -135,15 +140,15 @@ fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result = None; for _ in 0..attribute_size { - match read_string_ref(buf)?.parse::()? { - AttributeAnyKey::Type => attribute_type = Some(read_number_slice(buf)?), + match buf.read_string()?.borrow().parse::()? 
{ + AttributeAnyKey::Type => attribute_type = Some(read_number(buf)?), AttributeAnyKey::SingleValue(key) => { - attribute = Some(AttributeAnyValueSlice::SingleValue(get_attribute_from_key( + attribute = Some(AttributeAnyValue::SingleValue(get_attribute_from_key( buf, key, )?)) } AttributeAnyKey::ArrayValue => { - attribute = Some(AttributeAnyValueSlice::Array(read_attributes_array(buf)?)) + attribute = Some(AttributeAnyValue::Array(read_attributes_array(buf)?)) } } } @@ -168,14 +173,14 @@ fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +fn read_attributes_array( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let map_len = rmp::decode::read_map_len(buf).map_err(|_| { + let map_len = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType( "Unable to get map len for event attributes array_value object".to_owned(), ) @@ -187,20 +192,20 @@ fn read_attributes_array<'a>( )); } - let key = read_string_ref(buf)?; - if key != "values" { + let key = buf.read_string()?; + if key.borrow() != "values" { return Err(DecodeError::InvalidFormat( "Expected 'values' field in event attributes array_value object".to_owned(), )); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType( "Unable to get array len for event attributes values field".to_owned(), ) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); if len > 0 { let first = decode_attribute_array(buf, None)?; let array_type = (&first).into(); @@ -238,41 +243,35 @@ impl FromStr for AttributeArrayKey { } } -fn get_attribute_from_key<'a>( - buf: &mut &'a [u8], +fn get_attribute_from_key( + buf: &mut Buffer, key: AttributeArrayKey, -) -> Result, DecodeError> { +) -> Result, DecodeError> { match key { - 
AttributeArrayKey::StringValue => { - Ok(AttributeArrayValueSlice::String(read_string_ref(buf)?)) - } + AttributeArrayKey::StringValue => Ok(AttributeArrayValue::String(buf.read_string()?)), AttributeArrayKey::BoolValue => { - let boolean = rmp::decode::read_bool(buf); + let boolean = rmp::decode::read_bool(buf.as_mut_slice()); if let Ok(value) = boolean { match value { - true => Ok(AttributeArrayValueSlice::Boolean(true)), - false => Ok(AttributeArrayValueSlice::Boolean(false)), + true => Ok(AttributeArrayValue::Boolean(true)), + false => Ok(AttributeArrayValue::Boolean(false)), } } else { Err(DecodeError::InvalidType("Invalid boolean field".to_owned())) } } - AttributeArrayKey::IntValue => { - Ok(AttributeArrayValueSlice::Integer(read_number_slice(buf)?)) - } - AttributeArrayKey::DoubleValue => { - Ok(AttributeArrayValueSlice::Double(read_number_slice(buf)?)) - } + AttributeArrayKey::IntValue => Ok(AttributeArrayValue::Integer(read_number(buf)?)), + AttributeArrayKey::DoubleValue => Ok(AttributeArrayValue::Double(read_number(buf)?)), _ => Err(DecodeError::InvalidFormat("Invalid attribute".to_owned())), } } -fn decode_attribute_array<'a>( - buf: &mut &'a [u8], +fn decode_attribute_array( + buf: &mut Buffer, array_type: Option, -) -> Result, DecodeError> { - let mut attribute: Option = None; - let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { +) -> Result, DecodeError> { + let mut attribute: Option> = None; + let attribute_size = rmp::decode::read_map_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) })?; @@ -284,8 +283,8 @@ fn decode_attribute_array<'a>( let mut attribute_type: Option = None; for _ in 0..attribute_size { - match read_string_ref(buf)?.parse::()? { - AttributeArrayKey::Type => attribute_type = Some(read_number_slice(buf)?), + match buf.read_string()?.borrow().parse::()? 
{ + AttributeArrayKey::Type => attribute_type = Some(read_number(buf)?), key => attribute = Some(get_attribute_from_key(buf, key)?), } } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs b/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs index acb3efb15d..fffacbe3ac 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/span_link.rs @@ -1,12 +1,13 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_slice; -use crate::msgpack_decoder::decode::string::{ - handle_null_marker, read_str_map_to_strings, read_string_ref, -}; -use crate::span::SpanLinkSlice; +use crate::msgpack_decoder::decode::number::read_number; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_str_map_to_strings}; +use crate::span::v04::SpanLink; +use crate::span::TraceData; +use std::borrow::Borrow; use std::str::FromStr; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. @@ -26,18 +27,18 @@ use std::str::FromStr; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. 
/// ``` -pub(crate) fn read_span_links<'a>( - buf: &mut &'a [u8], -) -> Result>, DecodeError> { +pub(crate) fn read_span_links( + buf: &mut Buffer, +) -> Result>, DecodeError> { if handle_null_marker(buf) { return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(buf).map_err(|_| { + let len = rmp::decode::read_array_len(buf.as_mut_slice()).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span links".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec> = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_link(buf)?); } @@ -71,19 +72,19 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { - let mut span = SpanLinkSlice::default(); - let span_size = rmp::decode::read_map_len(buf) +fn decode_span_link(buf: &mut Buffer) -> Result, DecodeError> { + let mut span = SpanLink::default(); + let span_size = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - match read_string_ref(buf)?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_slice(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_slice(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_slice(buf)?, + match buf.read_string()?.borrow().parse::()? 
{ + SpanLinkKey::TraceId => span.trace_id = read_number(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number(buf)?, SpanLinkKey::Attributes => span.attributes = read_str_map_to_strings(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?, - SpanLinkKey::Flags => span.flags = read_number_slice(buf)?, + SpanLinkKey::Tracestate => span.tracestate = buf.read_string()?, + SpanLinkKey::Flags => span.flags = read_number(buf)?, } } diff --git a/libdd-trace-utils/src/msgpack_decoder/decode/string.rs b/libdd-trace-utils/src/msgpack_decoder/decode/string.rs index 9fff188c82..8a2f8a8041 100644 --- a/libdd-trace-utils/src/msgpack_decoder/decode/string.rs +++ b/libdd-trace-utils/src/msgpack_decoder/decode/string.rs @@ -1,53 +1,25 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::TraceData; use rmp::decode; -use rmp::decode::DecodeStringError; use std::collections::HashMap; // https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) const NULL_MARKER: &u8 = &0xc0; -/// Read a string from `buf`. -/// -/// # Errors -/// Fails if the buffer doesn't contain a valid utf8 msgpack string. 
-#[inline] -pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { - decode::read_str_from_slice(buf).map_err(|e| match e { - DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()), - DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()), - DecodeStringError::TypeMismatch(marker) => { - DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}")) - } - DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()), - _ => DecodeError::IOError, - }) -} - -/// Read a string from the slices `buf`. -/// -/// # Errors -/// Fails if the buffer doesn't contain a valid utf8 msgpack string. -#[inline] -pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { - read_string_ref_nomut(buf).map(|(str, newbuf)| { - *buf = newbuf; - str - }) -} - /// Read a nullable string from the slices `buf`. /// /// # Errors /// Fails if the buffer doesn't contain a valid utf8 msgpack string or a null marker. #[inline] -pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { +pub fn read_nullable_string(buf: &mut Buffer) -> Result { if handle_null_marker(buf) { - Ok("") + Ok(T::Text::default()) } else { - read_string_ref(buf) + buf.read_string() } } @@ -58,19 +30,19 @@ pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeErr /// or if any key or value is not a valid utf8 msgpack string. /// Null values are skipped (key not inserted into map). 
#[inline] -pub fn read_str_map_to_strings<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { - let len = decode::read_map_len(buf) +pub fn read_str_map_to_strings( + buf: &mut Buffer, +) -> Result, DecodeError> { + let len = decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_ref(buf)?; + let key = buf.read_string()?; // Only insert if value is not null if !handle_null_marker(buf) { - let value = read_string_ref(buf)?; + let value = buf.read_string()?; map.insert(key, value); } } @@ -84,9 +56,9 @@ pub fn read_str_map_to_strings<'a>( /// or if any key or value is not a valid utf8 msgpack string. /// Null values are skipped (key not inserted into map). #[inline] -pub fn read_nullable_str_map_to_strings<'a>( - buf: &mut &'a [u8], -) -> Result, DecodeError> { +pub fn read_nullable_str_map_to_strings( + buf: &mut Buffer, +) -> Result, DecodeError> { if handle_null_marker(buf) { return Ok(HashMap::default()); } @@ -100,9 +72,10 @@ pub fn read_nullable_str_map_to_strings<'a>( /// # Returns /// A boolean indicating whether the next value is null or not. 
#[inline] -pub fn handle_null_marker(buf: &mut &[u8]) -> bool { - if buf.first() == Some(NULL_MARKER) { - *buf = &buf[1..]; +pub fn handle_null_marker(buf: &mut Buffer) -> bool { + let slice = buf.as_mut_slice(); + if slice.first() == Some(NULL_MARKER) { + *slice = &slice[1..]; true } else { false diff --git a/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs index c9dd29892b..4ee8d4cb06 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v04/mod.rs @@ -4,8 +4,10 @@ pub(crate) mod span; use self::span::decode_span; +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::span::{SpanBytes, SpanSlice}; +use crate::span::v04::{Span, SpanBytes, SpanSlice}; +use crate::span::TraceData; /// Decodes a Bytes buffer into a `Vec>` object, also represented as a vector of /// `TracerPayloadV04` objects. @@ -52,20 +54,7 @@ use crate::span::{SpanBytes, SpanSlice}; pub fn from_bytes( data: libdd_tinybytes::Bytes, ) -> Result<(Vec>, usize), DecodeError> { - let (traces_ref, size) = from_slice(data.as_ref())?; - - #[allow(clippy::unwrap_used)] - let traces_owned = traces_ref - .iter() - .map(|trace| { - trace - .iter() - // Safe to unwrap since the spans use subslices of the `data` slice - .map(|span| span.try_to_bytes(&data).unwrap()) - .collect() - }) - .collect(); - Ok((traces_owned, size)) + from_buffer(&mut Buffer::new(data)) } /// Decodes a slice of bytes into a `Vec>` object. 
@@ -110,11 +99,19 @@ pub fn from_bytes( /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { - let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { +pub fn from_slice(data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +#[allow(clippy::type_complexity)] +pub fn from_buffer( + data: &mut Buffer, +) -> Result<(Vec>>, usize), DecodeError> { + let trace_count = rmp::decode::read_array_len(data.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; + // Intentionally skip the size of the array (as it will be recomputed after coalescing). let start_len = data.len(); #[allow(clippy::expect_used)] @@ -126,9 +123,12 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D .expect("Unable to cast trace_count to usize"), ), |mut traces, _| { - let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) - })?; + let span_count = + rmp::decode::read_array_len(data.as_mut_slice()).map_err(|_| { + DecodeError::InvalidFormat( + "Unable to read array len for span count".to_owned(), + ) + })?; let trace = (0..span_count).try_fold( Vec::with_capacity( @@ -137,7 +137,7 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D .expect("Unable to cast span_count to usize"), ), |mut trace, _| { - let span = decode_span(&mut data)?; + let span = decode_span(data)?; trace.push(span); Ok(trace) }, diff --git a/libdd-trace-utils/src/msgpack_decoder/v04/span.rs b/libdd-trace-utils/src/msgpack_decoder/v04/span.rs index 7b37a5aea3..33a48f5dd8 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v04/span.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v04/span.rs @@ -1,15 +1,17 @@ // Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::msgpack_decoder::decode::buffer::Buffer; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_nullable_number_slice; +use crate::msgpack_decoder::decode::number::read_nullable_number; use crate::msgpack_decoder::decode::span_event::read_span_events; use crate::msgpack_decoder::decode::span_link::read_span_links; use crate::msgpack_decoder::decode::string::{ - read_nullable_str_map_to_strings, read_nullable_string, read_string_ref, + read_nullable_str_map_to_strings, read_nullable_string, }; use crate::msgpack_decoder::decode::{meta_struct::read_meta_struct, metrics::read_metrics}; -use crate::span::{SpanKey, SpanSlice}; +use crate::span::{v04::Span, v04::SpanKey, TraceData}; +use std::borrow::Borrow; /// Decodes a slice of bytes into a `Span` object. /// @@ -27,10 +29,10 @@ use crate::span::{SpanKey, SpanSlice}; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. -pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { - let mut span = SpanSlice::default(); +pub fn decode_span(buffer: &mut Buffer) -> Result, DecodeError> { + let mut span = Span::::default(); - let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { + let span_size = rmp::decode::read_map_len(buffer.as_mut_slice()).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; @@ -43,8 +45,10 @@ pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeErr // Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the // BytesStrings -fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { - let key = read_string_ref(buf)? +fn fill_span(span: &mut Span, buf: &mut Buffer) -> Result<(), DecodeError> { + let key = buf + .read_string()? 
+ .borrow() .parse::() .map_err(|e| DecodeError::InvalidFormat(e.message))?; @@ -52,12 +56,12 @@ fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), Dec SpanKey::Service => span.service = read_nullable_string(buf)?, SpanKey::Name => span.name = read_nullable_string(buf)?, SpanKey::Resource => span.resource = read_nullable_string(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_slice::(buf)? as u128, - SpanKey::SpanId => span.span_id = read_nullable_number_slice(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_slice(buf)?, - SpanKey::Start => span.start = read_nullable_number_slice(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_slice(buf)?, - SpanKey::Error => span.error = read_nullable_number_slice(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number::<_, u64>(buf)? as u128, + SpanKey::SpanId => span.span_id = read_nullable_number(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number(buf)?, + SpanKey::Start => span.start = read_nullable_number(buf)?, + SpanKey::Duration => span.duration = read_nullable_number(buf)?, + SpanKey::Error => span.error = read_nullable_number(buf)?, SpanKey::Type => span.r#type = read_nullable_string(buf)?, SpanKey::Meta => span.meta = read_nullable_str_map_to_strings(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, diff --git a/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs b/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs index ea544acb34..5f54949be2 100644 --- a/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs +++ b/libdd-trace-utils/src/msgpack_decoder/v05/mod.rs @@ -3,11 +3,10 @@ use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::{ - map::read_map_len, - number::read_number_slice, - string::{handle_null_marker, read_string_ref}, + buffer::Buffer, map::read_map_len, number::read_number, string::handle_null_marker, }; -use crate::span::{SpanBytes, SpanSlice}; +use 
crate::span::v04::{Span, SpanBytes, SpanSlice}; +use crate::span::TraceData; use std::collections::HashMap; const PAYLOAD_LEN: u32 = 2; @@ -70,20 +69,7 @@ const SPAN_ELEM_COUNT: u32 = 12; pub fn from_bytes( data: libdd_tinybytes::Bytes, ) -> Result<(Vec>, usize), DecodeError> { - let (traces_ref, size) = from_slice(data.as_ref())?; - - #[allow(clippy::unwrap_used)] - let traces_owned = traces_ref - .iter() - .map(|trace| { - trace - .iter() - // Safe to unwrap since the spans use subslices of the `data` slice - .map(|span| span.try_to_bytes(&data).unwrap()) - .collect() - }) - .collect(); - Ok((traces_owned, size)) + from_buffer(&mut Buffer::new(data)) } /// Decodes a slice of bytes into a `Vec>` object. @@ -140,8 +126,18 @@ pub fn from_bytes( /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { - let data_elem = rmp::decode::read_array_len(&mut data) +pub fn from_slice(data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { + from_buffer(&mut Buffer::new(data)) +} + +#[allow(clippy::type_complexity)] +fn from_buffer( + data: &mut Buffer, +) -> Result<(Vec>>, usize), DecodeError> +where + T::Text: Clone, +{ + let data_elem = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?; if data_elem != PAYLOAD_LEN { @@ -150,21 +146,21 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D )); } - let dict = deserialize_dict(&mut data)?; + let dict = deserialize_dict(data)?; - let trace_count = rmp::decode::read_array_len(&mut data) + let trace_count = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?; - let mut traces: Vec> = Vec::with_capacity(trace_count as usize); + let mut traces: Vec>> = Vec::with_capacity(trace_count as usize); let start_len = data.len(); for _ in 0..trace_count { - let 
span_count = rmp::decode::read_array_len(&mut data) + let span_count = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?; - let mut trace: Vec = Vec::with_capacity(span_count as usize); + let mut trace: Vec> = Vec::with_capacity(span_count as usize); for _ in 0..span_count { - let span = deserialize_span(&mut data, &dict)?; + let span = deserialize_span(data, &dict)?; trace.push(span); } traces.push(trace); @@ -172,21 +168,27 @@ pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), D Ok((traces, start_len - data.len())) } -fn deserialize_dict<'a>(data: &mut &'a [u8]) -> Result, DecodeError> { - let dict_len = rmp::decode::read_array_len(data) +fn deserialize_dict(data: &mut Buffer) -> Result, DecodeError> { + let dict_len = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; - let mut dict: Vec<&'a str> = Vec::with_capacity(dict_len as usize); + let mut dict: Vec = Vec::with_capacity(dict_len as usize); for _ in 0..dict_len { - let str = read_string_ref(data)?; + let str = data.read_string()?; dict.push(str); } Ok(dict) } -fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result, DecodeError> { - let mut span = SpanSlice::default(); - let span_len = rmp::decode::read_array_len(data) +fn deserialize_span( + data: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let mut span = Span::default(); + let span_len = rmp::decode::read_array_len(data.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; if span_len != SPAN_ELEM_COUNT { @@ -198,12 +200,12 @@ fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result(data)? 
as u128; - span.span_id = read_number_slice(data)?; - span.parent_id = read_number_slice(data)?; - span.start = read_number_slice(data)?; - span.duration = read_number_slice(data)?; - span.error = read_number_slice(data)?; + span.trace_id = read_number::<_, u64>(data)? as u128; + span.span_id = read_number(data)?; + span.parent_id = read_number(data)?; + span.start = read_number(data)?; + span.duration = read_number(data)?; + span.error = read_number(data)?; span.meta = read_indexed_map_to_bytes_strings(data, dict)?; span.metrics = read_metrics(data, dict)?; span.r#type = get_from_dict(data, dict)?; @@ -211,21 +213,30 @@ fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result(data: &mut &[u8], dict: &[&'a str]) -> Result<&'a str, DecodeError> { - let index: u32 = read_number_slice(data)?; +fn get_from_dict( + data: &mut Buffer, + dict: &[T::Text], +) -> Result +where + T::Text: Clone, +{ + let index: u32 = read_number(data)?; match dict.get(index as usize) { - Some(value) => Ok(value), + Some(value) => Ok(value.clone()), None => Err(DecodeError::InvalidFormat( "Unable to locate string in the dictionary".to_string(), )), } } -fn read_indexed_map_to_bytes_strings<'a>( - buf: &mut &[u8], - dict: &[&'a str], -) -> Result, DecodeError> { - let len = rmp::decode::read_map_len(buf) +fn read_indexed_map_to_bytes_strings( + buf: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ + let len = rmp::decode::read_map_len(buf.as_mut_slice()) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] @@ -238,10 +249,13 @@ fn read_indexed_map_to_bytes_strings<'a>( Ok(map) } -fn read_metrics<'a>( - buf: &mut &[u8], - dict: &[&'a str], -) -> Result, DecodeError> { +fn read_metrics( + buf: &mut Buffer, + dict: &[T::Text], +) -> Result, DecodeError> +where + T::Text: Clone, +{ if handle_null_marker(buf) { return Ok(HashMap::default()); } @@ -251,7 +265,7 @@ fn 
read_metrics<'a>( let mut map = HashMap::with_capacity(len); for _ in 0..len { let k = get_from_dict(buf, dict)?; - let v = read_number_slice(buf)?; + let v = read_number(buf)?; map.insert(k, v); } Ok(map) @@ -260,7 +274,9 @@ fn read_metrics<'a>( #[cfg(test)] mod tests { use super::*; + use crate::span::SliceData; use std::collections::HashMap; + type V05Span = ( u8, u8, @@ -297,7 +313,7 @@ mod tests { fn deserialize_dict_test() { let dict = vec!["foo", "bar", "baz"]; let mpack = rmp_serde::to_vec(&dict).unwrap(); - let mut payload = mpack.as_ref(); + let mut payload = Buffer::::new(mpack.as_ref()); let result = deserialize_dict(&mut payload).unwrap(); assert_eq!(dict, result); diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs index 9b8b8f402a..97cf222268 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/mod.rs @@ -1,13 +1,14 @@ // Copyright 2021-Present Datadog, Inc. 
https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{Span, SpanText}; +use crate::span::v04::Span; +use crate::span::TraceData; use rmp::encode::{write_array_len, ByteBuf, RmpWrite, ValueWriteError}; mod span; #[inline(always)] -fn to_writer]>>( +fn to_writer]>>( writer: &mut W, traces: &[S], ) -> Result<(), ValueWriteError> { @@ -44,10 +45,10 @@ fn to_writer]>>( /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::write_to_slice; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// /// let mut buffer = vec![0u8; 1024]; -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -55,7 +56,7 @@ fn to_writer]>>( /// /// write_to_slice(&mut &mut buffer[..], &traces).expect("Encoding failed"); /// ``` -pub fn write_to_slice]>>( +pub fn write_to_slice]>>( slice: &mut &mut [u8], traces: &[S], ) -> Result<(), ValueWriteError> { @@ -76,9 +77,9 @@ pub fn write_to_slice]>>( /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_vec; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -87,7 +88,7 @@ pub fn write_to_slice]>>( /// /// assert!(!encoded.is_empty()); /// ``` -pub fn to_vec]>>(traces: &[S]) -> Vec { +pub fn to_vec]>>(traces: &[S]) -> Vec { to_vec_with_capacity(traces, 0) } @@ -106,9 +107,9 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_vec_with_capacity; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -117,7 +118,7 @@ pub fn to_vec]>>(traces: &[S]) -> Vec { /// /// assert!(encoded.capacity() >= 1024); /// ``` -pub fn to_vec_with_capacity]>>( +pub fn 
to_vec_with_capacity]>>( traces: &[S], capacity: u32, ) -> Vec { @@ -165,9 +166,9 @@ impl std::io::Write for CountLength { /// /// ``` /// use libdd_trace_utils::msgpack_encoder::v04::to_len; -/// use libdd_trace_utils::span::Span; +/// use libdd_trace_utils::span::v04::SpanSlice; /// -/// let span = Span { +/// let span = SpanSlice { /// name: "test-span", /// ..Default::default() /// }; @@ -176,7 +177,7 @@ impl std::io::Write for CountLength { /// /// assert!(encoded_len > 0); /// ``` -pub fn to_len]>>(traces: &[S]) -> u32 { +pub fn to_len]>>(traces: &[S]) -> u32 { let mut counter = CountLength(0); #[allow(clippy::expect_used)] to_writer(&mut counter, traces).expect("infallible: CountLength never fails"); diff --git a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs b/libdd-trace-utils/src/msgpack_encoder/v04/span.rs index 2d9207857c..4fa22dc385 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v04/span.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v04/span.rs @@ -1,11 +1,13 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use crate::span::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink, SpanText}; +use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; +use crate::span::TraceData; use rmp::encode::{ write_bin, write_bool, write_f64, write_i64, write_sint, write_str, write_u32, write_u64, write_u8, RmpWrite, ValueWriteError, }; +use std::borrow::Borrow; /// Encodes a `SpanLink` object into a slice of bytes. /// @@ -15,13 +17,13 @@ use rmp::encode::{ /// /// # Returns /// -/// * `Ok(()))` - Nothing if successful. +/// * `Ok(())` - Nothing if successful. /// * `Err(ValueWriteError)` - An error if the writing fails. /// /// # Errors /// /// This function will return any error emitted by the writer. 
-pub fn encode_span_links( +pub fn encode_span_links( writer: &mut W, span_links: &[SpanLink], ) -> Result<(), ValueWriteError> { @@ -82,7 +84,7 @@ pub fn encode_span_links( /// # Errors /// /// This function will return any error emitted by the writer. -pub fn encode_span_events( +pub fn encode_span_events( writer: &mut W, span_events: &[SpanEvent], ) -> Result<(), ValueWriteError> { @@ -106,7 +108,7 @@ pub fn encode_span_events( for (k, attribute) in event.attributes.iter() { write_str(writer, k.borrow())?; - fn write_array_value( + fn write_array_value( writer: &mut W, value: &AttributeArrayValue, ) -> Result<(), ValueWriteError> { @@ -181,7 +183,7 @@ pub fn encode_span_events( /// /// This function will return any error emitted by the writer. #[inline(always)] -pub fn encode_span( +pub fn encode_span( writer: &mut W, span: &Span, ) -> Result<(), ValueWriteError> { @@ -256,7 +258,7 @@ pub fn encode_span( rmp::encode::write_map_len(writer, span.meta_struct.len() as u32)?; for (k, v) in span.meta_struct.iter() { write_str(writer, k.borrow())?; - write_bin(writer, v.as_ref())?; + write_bin(writer, v.borrow())?; } } diff --git a/libdd-trace-utils/src/send_data/mod.rs b/libdd-trace-utils/src/send_data/mod.rs index f704204e0d..1e8b32ec0e 100644 --- a/libdd-trace-utils/src/send_data/mod.rs +++ b/libdd-trace-utils/src/send_data/mod.rs @@ -26,7 +26,7 @@ use std::io::Write; #[cfg(feature = "compression")] use zstd::stream::write::Encoder; -#[derive(Debug, Clone)] +#[derive(Debug)] /// `SendData` is a structure that holds the data to be sent to a target endpoint. /// It includes the payloads to be sent, the size of the data, the target endpoint, /// headers for the request, and a retry strategy for sending the data. 
@@ -81,7 +81,6 @@ pub enum Compression { None, } -#[derive(Clone)] pub struct SendDataBuilder { pub(crate) tracer_payloads: TracerPayloadCollection, pub(crate) size: usize, @@ -219,35 +218,24 @@ impl SendData { self.retry_strategy = retry_strategy; } - /// Returns a clone of the SendData with the user-defined endpoint. - /// - /// # Arguments - /// - /// * `endpoint`: The new endpoint to be used. - pub fn with_endpoint(&self, endpoint: Endpoint) -> SendData { - SendData { - target: endpoint, - ..self.clone() - } - } - /// Sends the data to the target endpoint. /// /// # Returns /// /// A `SendDataResult` instance containing the result of the operation. pub async fn send(&self, http_client: &GenericHttpClient) -> SendDataResult { - self.send_internal(http_client).await + self.send_internal(http_client, None).await } async fn send_internal( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { if self.use_protobuf() { - self.send_with_protobuf(http_client).await + self.send_with_protobuf(http_client, endpoint).await } else { - self.send_with_msgpack(http_client).await + self.send_with_msgpack(http_client, endpoint).await } } @@ -257,13 +245,14 @@ impl SendData { payload: Vec, headers: HashMap<&'static str, String>, http_client: &GenericHttpClient, + endpoint: Option<&Endpoint>, ) -> (SendWithRetryResult, u64, u64) { #[allow(clippy::unwrap_used)] let payload_len = u64::try_from(payload.len()).unwrap(); ( send_with_retry( http_client, - &self.target, + endpoint.unwrap_or(&self.target), payload, &headers, &self.retry_strategy, @@ -303,6 +292,7 @@ impl SendData { async fn send_with_protobuf( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); @@ -330,7 +320,13 @@ impl SendData { request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string()); let (response, bytes_sent, chunks) = self - .send_payload(chunks, final_payload, request_headers, http_client) 
+ .send_payload( + chunks, + final_payload, + request_headers, + http_client, + endpoint.as_ref(), + ) .await; result.update(response, bytes_sent, chunks); @@ -344,6 +340,7 @@ impl SendData { async fn send_with_msgpack( &self, http_client: &GenericHttpClient, + endpoint: Option, ) -> SendDataResult { let mut result = SendDataResult::default(); let mut futures = FuturesUnordered::new(); @@ -362,7 +359,13 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } } TracerPayloadCollection::V04(payload) => { @@ -374,7 +377,13 @@ impl SendData { let payload = msgpack_encoder::v04::to_vec(payload); - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } TracerPayloadCollection::V05(payload) => { #[allow(clippy::unwrap_used)] @@ -388,7 +397,13 @@ impl SendData { Err(e) => return result.error(anyhow!(e)), }; - futures.push(self.send_payload(chunks, payload, headers, http_client)); + futures.push(self.send_payload( + chunks, + payload, + headers, + http_client, + endpoint.as_ref(), + )); } } @@ -1004,56 +1019,6 @@ mod tests { assert_eq!(res.responses_count_per_code.len(), 0); } - #[test] - fn test_with_endpoint() { - let header_tags = HEADER_TAGS; - let payload = setup_payload(&header_tags); - let original_endpoint = Endpoint { - api_key: Some(std::borrow::Cow::Borrowed("original-key")), - url: "http://originalexample.com/".parse::().unwrap(), - timeout_ms: 1000, - ..Endpoint::default() - }; - - let original_data = SendData::new( - 100, - TracerPayloadCollection::V07(vec![payload]), - header_tags, - &original_endpoint, - ); - - let new_endpoint = Endpoint { - api_key: Some(std::borrow::Cow::Borrowed("new-key")), - url: 
"http://newexample.com/".parse::().unwrap(), - timeout_ms: 2000, - ..Endpoint::default() - }; - - let new_data = original_data.with_endpoint(new_endpoint.clone()); - - assert_eq!(new_data.target.api_key, new_endpoint.api_key); - assert_eq!(new_data.target.url, new_endpoint.url); - assert_eq!(new_data.target.timeout_ms, new_endpoint.timeout_ms); - - assert_eq!(new_data.size, original_data.size); - assert_eq!(new_data.headers, original_data.headers); - assert_eq!(new_data.retry_strategy, original_data.retry_strategy); - assert_eq!( - new_data.tracer_payloads.size(), - original_data.tracer_payloads.size() - ); - - assert_eq!(original_data.target.api_key, original_endpoint.api_key); - assert_eq!(original_data.target.url, original_endpoint.url); - assert_eq!( - original_data.target.timeout_ms, - original_endpoint.timeout_ms - ); - - #[cfg(feature = "compression")] - assert!(matches!(new_data.compression, Compression::None)); - } - #[test] fn test_builder() { let header_tags = HEADER_TAGS; diff --git a/libdd-trace-utils/src/span/mod.rs b/libdd-trace-utils/src/span/mod.rs index 268e4ea957..7a3332f11f 100644 --- a/libdd-trace-utils/src/span/mod.rs +++ b/libdd-trace-utils/src/span/mod.rs @@ -2,68 +2,25 @@ // SPDX-License-Identifier: Apache-2.0 pub mod trace_utils; +pub mod v04; pub mod v05; +use crate::msgpack_decoder::decode::buffer::read_string_ref_nomut; +use crate::msgpack_decoder::decode::error::DecodeError; +use crate::span::v05::dict::SharedDict; use libdd_tinybytes::{Bytes, BytesString}; -use serde::ser::SerializeStruct; use serde::Serialize; use std::borrow::Borrow; -use std::collections::HashMap; -use std::fmt; +use std::fmt::Debug; use std::hash::Hash; -use std::str::FromStr; -use v05::dict::SharedDict; - -use crate::tracer_payload::TraceChunks; - -#[derive(Debug, PartialEq)] -pub enum SpanKey { - Service, - Name, - Resource, - TraceId, - SpanId, - ParentId, - Start, - Duration, - Error, - Meta, - Metrics, - Type, - MetaStruct, - SpanLinks, - SpanEvents, -} - 
-impl FromStr for SpanKey { - type Err = SpanKeyParseError; - - fn from_str(s: &str) -> Result { - match s { - "service" => Ok(SpanKey::Service), - "name" => Ok(SpanKey::Name), - "resource" => Ok(SpanKey::Resource), - "trace_id" => Ok(SpanKey::TraceId), - "span_id" => Ok(SpanKey::SpanId), - "parent_id" => Ok(SpanKey::ParentId), - "start" => Ok(SpanKey::Start), - "duration" => Ok(SpanKey::Duration), - "error" => Ok(SpanKey::Error), - "meta" => Ok(SpanKey::Meta), - "metrics" => Ok(SpanKey::Metrics), - "type" => Ok(SpanKey::Type), - "meta_struct" => Ok(SpanKey::MetaStruct), - "span_links" => Ok(SpanKey::SpanLinks), - "span_events" => Ok(SpanKey::SpanEvents), - _ => Err(SpanKeyParseError::new(format!("Invalid span key: {s}"))), - } - } -} +use std::marker::PhantomData; +use std::ptr::NonNull; +use std::{fmt, ptr}; /// Trait representing the requirements for a type to be used as a Span "string" type. /// Note: Borrow is not required by the derived traits, but allows to access HashMap elements /// from a static str and check if the string is empty. -pub trait SpanText: Eq + Hash + Borrow + Serialize + Default { +pub trait SpanText: Debug + Eq + Hash + Borrow + Serialize + Default { fn from_static_str(value: &'static str) -> Self; } @@ -79,351 +36,111 @@ impl SpanText for BytesString { } } -/// Checks if the `value` represents an empty string. Used to skip serializing empty strings -/// with serde. -fn is_empty_str>(value: &T) -> bool { - value.borrow().is_empty() +pub trait SpanBytes: Debug + Eq + Hash + Borrow<[u8]> + Serialize + Default { + fn from_static_bytes(value: &'static [u8]) -> Self; } -/// The generic representation of a V04 span. -/// -/// `T` is the type used to represent strings in the span, it can be either owned (e.g. BytesString) -/// or borrowed (e.g. &str). 
To define a generic function taking any `Span` you can use the -/// [`SpanValue`] trait: -/// ``` -/// use libdd_trace_utils::span::{Span, SpanText}; -/// fn foo(span: Span) { -/// let _ = span.meta.get("foo"); -/// } -/// ``` -#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct Span -where - T: SpanText, -{ - pub service: T, - pub name: T, - pub resource: T, - #[serde(skip_serializing_if = "is_empty_str")] - pub r#type: T, - #[serde(serialize_with = "serialize_lower_64_bits")] - pub trace_id: u128, - pub span_id: u64, - #[serde(skip_serializing_if = "is_default")] - pub parent_id: u64, - pub start: i64, - pub duration: i64, - #[serde(skip_serializing_if = "is_default")] - pub error: i32, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub meta: HashMap, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub metrics: HashMap, - // TODO: APMSP-1941 - Replace `Bytes` with a wrapper that borrows the underlying - // slice and serializes to bytes in MessagePack. - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub meta_struct: HashMap, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub span_links: Vec>, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub span_events: Vec>, +impl SpanBytes for &[u8] { + fn from_static_bytes(value: &'static [u8]) -> Self { + value + } } -fn serialize_lower_64_bits(v: &u128, serializer: S) -> Result -where - S: serde::Serializer, -{ - serializer.serialize_u64(*v as u64) +impl SpanBytes for Bytes { + fn from_static_bytes(value: &'static [u8]) -> Self { + Bytes::from_static(value) + } } -/// The generic representation of a V04 span link. -/// `T` is the type used to represent strings in the span link. 
-#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct SpanLink -where - T: SpanText, -{ - pub trace_id: u64, - pub trace_id_high: u64, - pub span_id: u64, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub attributes: HashMap, - #[serde(skip_serializing_if = "is_empty_str")] - pub tracestate: T, - #[serde(skip_serializing_if = "is_default")] - pub flags: u32, -} +/// Trait representing a tuple of (Text, Bytes) types used for different underlying data structures. +/// Note: The functions are internal to the msgpack decoder and should not be used directly: they're +/// only exposed here due to the inavailability of min_specialization in stable Rust. +/// Also note that the Clone and PartialEq bounds are only present for tests. +pub trait TraceData: Default + Clone + Debug + PartialEq + Serialize { + type Text: SpanText; + type Bytes: SpanBytes; -/// The generic representation of a V04 span event. -/// `T` is the type used to represent strings in the span event. -#[derive(Clone, Debug, Default, PartialEq, Serialize)] -pub struct SpanEvent -where - T: SpanText, -{ - pub time_unix_nano: u64, - pub name: T, - #[serde(skip_serializing_if = "HashMap::is_empty")] - pub attributes: HashMap>, -} + fn get_mut_slice(buf: &mut Self::Bytes) -> &mut &'static [u8]; -#[derive(Clone, Debug, PartialEq)] -pub enum AttributeAnyValue -where - T: SpanText, -{ - SingleValue(AttributeArrayValue), - Array(Vec>), -} + fn try_slice_and_advance(buf: &mut Self::Bytes, bytes: usize) -> Option; -#[derive(Serialize)] -struct ArrayValueWrapper<'a, T: SpanText> { - values: &'a Vec>, + fn read_string(buf: &mut Self::Bytes) -> Result; } -impl Serialize for AttributeAnyValue -where - T: SpanText, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("AttributeAnyValue", 2)?; - - match self { - AttributeAnyValue::SingleValue(attribute) => { - serialize_attribute_array::(&mut state, attribute)?; - } - 
AttributeAnyValue::Array(value) => { - let value_type: u8 = self.into(); - state.serialize_field("type", &value_type)?; - let wrapped_value = ArrayValueWrapper { values: value }; - state.serialize_field("array_value", &wrapped_value)?; - } - } +/// TraceData implementation using `Bytes` and `BytesString`. +#[derive(Clone, Default, Debug, PartialEq, Serialize)] +pub struct BytesData; +impl TraceData for BytesData { + type Text = BytesString; + type Bytes = Bytes; - state.end() + #[inline] + fn get_mut_slice(buf: &mut Bytes) -> &mut &'static [u8] { + // SAFETY: Bytes has the same layout + unsafe { std::mem::transmute::<&mut Bytes, &mut &[u8]>(buf) } } -} -impl From<&AttributeAnyValue> for u8 -where - T: SpanText, -{ - fn from(attribute: &AttributeAnyValue) -> u8 { - match attribute { - AttributeAnyValue::SingleValue(value) => value.into(), - AttributeAnyValue::Array(_) => 4, + #[inline] + fn try_slice_and_advance(buf: &mut Bytes, bytes: usize) -> Option { + let data = buf.slice_ref(&buf[0..bytes])?; + unsafe { + // SAFETY: forwarding the buffer requires that buf is borrowed from static. 
+ let (ptr, len, underlying) = ptr::read(buf).into_raw(); + ptr::write( + buf, + Bytes::from_raw(ptr.add(bytes), len - bytes, underlying), + ); } - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum AttributeArrayValue -where - T: SpanText, -{ - String(T), - Boolean(bool), - Integer(i64), - Double(f64), -} - -impl Serialize for AttributeArrayValue -where - T: SpanText, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("AttributeArrayValue", 2)?; - serialize_attribute_array::(&mut state, self)?; - state.end() - } -} - -fn serialize_attribute_array( - state: &mut S::SerializeStruct, - attribute: &AttributeArrayValue, -) -> Result<(), ::Error> -where - T: SpanText, - S: serde::Serializer, -{ - let attribute_type: u8 = attribute.into(); - state.serialize_field("type", &attribute_type)?; - match attribute { - AttributeArrayValue::String(value) => state.serialize_field("string_value", value), - AttributeArrayValue::Boolean(value) => state.serialize_field("bool_value", value), - AttributeArrayValue::Integer(value) => state.serialize_field("int_value", value), - AttributeArrayValue::Double(value) => state.serialize_field("double_value", value), - } -} - -impl From<&AttributeArrayValue> for u8 -where - T: SpanText, -{ - fn from(attribute: &AttributeArrayValue) -> u8 { - match attribute { - AttributeArrayValue::String(_) => 0, - AttributeArrayValue::Boolean(_) => 1, - AttributeArrayValue::Integer(_) => 2, - AttributeArrayValue::Double(_) => 3, + Some(data) + } + + #[inline] + fn read_string(buf: &mut Bytes) -> Result { + // Note: we need to pass a &'static lifetime here, otherwise it'll complain + let (str, newbuf) = read_string_ref_nomut(buf.as_ref())?; + let string = BytesString::from_bytes_slice(buf, str); + unsafe { + // SAFETY: forwarding the buffer requires that buf is borrowed from static. 
+ let (_, _, underlying) = ptr::read(buf).into_raw(); + let new = Bytes::from_raw( + NonNull::new_unchecked(newbuf.as_ptr() as *mut _), + newbuf.len(), + underlying, + ); + ptr::write(buf, new); } + Ok(string) } } -pub type SpanBytes = Span; -pub type SpanLinkBytes = SpanLink; -pub type SpanEventBytes = SpanEvent; -pub type AttributeAnyValueBytes = AttributeAnyValue; -pub type AttributeArrayValueBytes = AttributeArrayValue; - -pub type SpanSlice<'a> = Span<&'a str>; -pub type SpanLinkSlice<'a> = SpanLink<&'a str>; -pub type SpanEventSlice<'a> = SpanEvent<&'a str>; -pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>; -pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>; - -pub type TraceChunksBytes = TraceChunks; +/// TraceData implementation using `&str` and `&[u8]`. +#[derive(Clone, Default, Debug, PartialEq, Serialize)] +pub struct SliceData<'a>(PhantomData<&'a u8>); +impl<'a> TraceData for SliceData<'a> { + type Text = &'a str; + type Bytes = &'a [u8]; -pub type SharedDictBytes = SharedDict; - -impl SpanSlice<'_> { - /// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal - /// references into slices of the provided `Bytes` buffer. Returns `None` if any slice is - /// out of bounds or invalid. 
- pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanBytes { - service: BytesString::try_from_bytes_slice(bytes, self.service)?, - name: BytesString::try_from_bytes_slice(bytes, self.name)?, - resource: BytesString::try_from_bytes_slice(bytes, self.resource)?, - r#type: BytesString::try_from_bytes_slice(bytes, self.r#type)?, - trace_id: self.trace_id, - span_id: self.span_id, - parent_id: self.parent_id, - start: self.start, - duration: self.duration, - error: self.error, - meta: self - .meta - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - BytesString::try_from_bytes_slice(bytes, v)?, - )) - }) - .collect::>>()?, - metrics: self - .metrics - .iter() - .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, *v))) - .collect::>>()?, - meta_struct: self - .meta_struct - .iter() - .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, v.clone()))) - .collect::>>()?, - span_links: self - .span_links - .iter() - .map(|link| link.try_to_bytes(bytes)) - .collect::>>()?, - span_events: self - .span_events - .iter() - .map(|event| event.try_to_bytes(bytes)) - .collect::>>()?, - }) + #[inline] + fn get_mut_slice<'b>(buf: &'b mut &'a [u8]) -> &'b mut &'static [u8] { + unsafe { std::mem::transmute::<&'b mut &[u8], &'b mut &'static [u8]>(buf) } } -} -impl SpanLinkSlice<'_> { - /// Converts a borrowed `SpanLinkSlice` into an owned `SpanLinkBytes`, using the provided - /// `Bytes` buffer to resolve all referenced strings. Returns `None` if conversion fails due - /// to invalid slice ranges. 
- pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanLinkBytes { - trace_id: self.trace_id, - trace_id_high: self.trace_id_high, - span_id: self.span_id, - attributes: self - .attributes - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - BytesString::try_from_bytes_slice(bytes, v)?, - )) - }) - .collect::>>()?, - tracestate: BytesString::try_from_bytes_slice(bytes, self.tracestate)?, - flags: self.flags, - }) + #[inline] + fn try_slice_and_advance(buf: &mut &'a [u8], bytes: usize) -> Option<&'a [u8]> { + let slice = buf.get(0..bytes)?; + *buf = &buf[bytes..]; + Some(slice) } -} -impl SpanEventSlice<'_> { - /// Converts a borrowed `SpanEventSlice` into an owned `SpanEventBytes`, resolving references - /// into the provided `Bytes` buffer. Fails with `None` if any slice is invalid or cannot be - /// converted. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - Some(SpanEventBytes { - time_unix_nano: self.time_unix_nano, - name: BytesString::try_from_bytes_slice(bytes, self.name)?, - attributes: self - .attributes - .iter() - .map(|(k, v)| { - Some(( - BytesString::try_from_bytes_slice(bytes, k)?, - v.try_to_bytes(bytes)?, - )) - }) - .collect::>>()?, + #[inline] + fn read_string(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + read_string_ref_nomut(buf).map(|(str, newbuf)| { + *buf = newbuf; + str }) } } -impl AttributeAnyValueSlice<'_> { - /// Converts a borrowed `AttributeAnyValueSlice` into its owned `AttributeAnyValueBytes` - /// representation, using the provided `Bytes` buffer. Recursively processes inner values if - /// it's an array. 
- pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - match self { - AttributeAnyValue::SingleValue(value) => { - Some(AttributeAnyValue::SingleValue(value.try_to_bytes(bytes)?)) - } - AttributeAnyValue::Array(value) => Some(AttributeAnyValue::Array( - value - .iter() - .map(|attribute| attribute.try_to_bytes(bytes)) - .collect::>>()?, - )), - } - } -} - -impl AttributeArrayValueSlice<'_> { - /// Converts a single `AttributeArrayValueSlice` item into its owned form - /// (`AttributeArrayValueBytes`), borrowing data from the provided `Bytes` buffer when - /// necessary. - pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { - match self { - AttributeArrayValue::String(value) => Some(AttributeArrayValue::String( - BytesString::try_from_bytes_slice(bytes, value)?, - )), - AttributeArrayValue::Boolean(value) => Some(AttributeArrayValue::Boolean(*value)), - AttributeArrayValue::Integer(value) => Some(AttributeArrayValue::Integer(*value)), - AttributeArrayValue::Double(value) => Some(AttributeArrayValue::Double(*value)), - } - } -} - #[derive(Debug)] pub struct SpanKeyParseError { pub message: String, @@ -443,113 +160,4 @@ impl fmt::Display for SpanKeyParseError { } impl std::error::Error for SpanKeyParseError {} -fn is_default(t: &T) -> bool { - t == &T::default() -} - -#[cfg(test)] -mod tests { - use super::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; - use crate::msgpack_decoder::v04::span::decode_span; - use std::collections::HashMap; - - #[test] - fn skip_serializing_empty_fields_test() { - let expected = b"\x87\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00"; - let val: Span<&str> = Span::default(); - let serialized = rmp_serde::encode::to_vec_named(&val).unwrap(); - assert_eq!(expected, serialized.as_slice()); - } - - #[test] - fn serialize_deserialize_test() { - let span: Span<&str> = Span { - name: "tracing.operation", - resource: "MyEndpoint", - span_links: vec![SpanLink { - 
trace_id: 42, - attributes: HashMap::from([("span", "link")]), - tracestate: "running", - ..Default::default() - }], - span_events: vec![SpanEvent { - time_unix_nano: 1727211691770716000, - name: "exception", - attributes: HashMap::from([ - ( - "exception.message", - AttributeAnyValue::SingleValue(AttributeArrayValue::String( - "Cannot divide by zero", - )), - ), - ( - "exception.type", - AttributeAnyValue::SingleValue(AttributeArrayValue::String("RuntimeError")), - ), - ( - "exception.escaped", - AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(false)), - ), - ( - "exception.count", - AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(1)), - ), - ( - "exception.lines", - AttributeAnyValue::Array(vec![ - AttributeArrayValue::String(" File \"\", line 1, in "), - AttributeArrayValue::String(" File \"\", line 1, in divide"), - AttributeArrayValue::String("RuntimeError: Cannot divide by zero"), - ]), - ), - ]), - }], - ..Default::default() - }; - - let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); - let mut serialized_slice = serialized.as_ref(); - let deserialized = decode_span(&mut serialized_slice).unwrap(); - - assert_eq!(span.name, deserialized.name); - assert_eq!(span.resource, deserialized.resource); - assert_eq!( - span.span_links[0].trace_id, - deserialized.span_links[0].trace_id - ); - assert_eq!( - span.span_links[0].tracestate, - deserialized.span_links[0].tracestate - ); - assert_eq!(span.span_events[0].name, deserialized.span_events[0].name); - assert_eq!( - span.span_events[0].time_unix_nano, - deserialized.span_events[0].time_unix_nano - ); - for attribut in &deserialized.span_events[0].attributes { - assert!(span.span_events[0].attributes.contains_key(attribut.0)) - } - } - - #[test] - fn serialize_event_test() { - // `expected` is created by transforming the span into bytes - // and passing each bytes through `escaped_default` - let expected = 
b"\x88\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00\xabspan_events\x91\x83\xaetime_unix_nano\xcf\x17\xf8I\xe1\xeb\xe5\x1f`\xa4name\xa4test\xaaattributes\x81\xaatest.event\x82\xa4type\x03\xacdouble_value\xcb@\x10\xcc\xcc\xcc\xcc\xcc\xcd"; - - let span: Span<&str> = Span { - span_events: vec![SpanEvent { - time_unix_nano: 1727211691770716000, - name: "test", - attributes: HashMap::from([( - "test.event", - AttributeAnyValue::SingleValue(AttributeArrayValue::Double(4.2)), - )]), - }], - ..Default::default() - }; - - let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); - assert_eq!(expected, serialized.as_slice()); - } -} +pub type SharedDictBytes = SharedDict; diff --git a/libdd-trace-utils/src/span/trace_utils.rs b/libdd-trace-utils/src/span/trace_utils.rs index 5583182aaa..8dd8a03b05 100644 --- a/libdd-trace-utils/src/span/trace_utils.rs +++ b/libdd-trace-utils/src/span/trace_utils.rs @@ -3,7 +3,7 @@ //! Trace-utils functionalities implementation for tinybytes based spans -use super::{Span, SpanText}; +use super::{v04::Span, SpanText, TraceData}; use std::collections::HashMap; /// Span metric the mini agent must set for the backend to recognize top level span @@ -15,10 +15,11 @@ const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; fn set_top_level_span(span: &mut Span, is_top_level: bool) where - T: SpanText, + T: TraceData, { if is_top_level { - span.metrics.insert(T::from_static_str(TOP_LEVEL_KEY), 1.0); + span.metrics + .insert(T::Text::from_static_str(TOP_LEVEL_KEY), 1.0); } else { span.metrics.remove(TOP_LEVEL_KEY); } @@ -32,7 +33,7 @@ where /// ancestor of other spans belonging to this service and attached to it). 
pub fn compute_top_level_span(trace: &mut [Span]) where - T: SpanText, + T: TraceData, { let mut span_id_idx: HashMap = HashMap::new(); for (i, span) in trace.iter().enumerate() { @@ -60,7 +61,7 @@ where } /// Return true if the span has a top level key set -pub fn has_top_level(span: &Span) -> bool { +pub fn has_top_level(span: &Span) -> bool { span.metrics .get(TRACER_TOP_LEVEL_KEY) .is_some_and(|v| *v == 1.0) @@ -68,7 +69,7 @@ pub fn has_top_level(span: &Span) -> bool { } /// Returns true if a span should be measured (i.e., it should get trace metrics calculated). -pub fn is_measured(span: &Span) -> bool { +pub fn is_measured(span: &Span) -> bool { span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0) } @@ -77,7 +78,7 @@ pub fn is_measured(span: &Span) -> bool { /// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive /// integer. The metric usually increases each time a new version of the same span is sent by /// the tracer -pub fn is_partial_snapshot(span: &Span) -> bool { +pub fn is_partial_snapshot(span: &Span) -> bool { span.metrics .get(PARTIAL_VERSION_KEY) .is_some_and(|v| *v >= 0.0) @@ -102,10 +103,10 @@ const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr"; /// dropped and the latter to the spans dropped. /// /// # Trace-level attributes -/// Some attributes related to the whole trace are stored in the root span of the chunk. +/// Some attributes related to the whole trace are stored in the root span of the chunk. 
pub fn drop_chunks(traces: &mut Vec>>) -> DroppedP0Stats where - T: SpanText, + T: TraceData, { let mut dropped_p0_traces = 0; let mut dropped_p0_spans = 0; @@ -163,7 +164,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; fn create_test_span( trace_id: u64, diff --git a/libdd-trace-utils/src/span/v04/mod.rs b/libdd-trace-utils/src/span/v04/mod.rs new file mode 100644 index 0000000000..4f92a49f3b --- /dev/null +++ b/libdd-trace-utils/src/span/v04/mod.rs @@ -0,0 +1,430 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::{BytesData, SliceData, SpanKeyParseError, TraceData}; +use crate::tracer_payload::TraceChunks; +use serde::ser::SerializeStruct; +use serde::Serialize; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::str::FromStr; + +#[derive(Debug, PartialEq)] +pub enum SpanKey { + Service, + Name, + Resource, + TraceId, + SpanId, + ParentId, + Start, + Duration, + Error, + Meta, + Metrics, + Type, + MetaStruct, + SpanLinks, + SpanEvents, +} + +impl FromStr for SpanKey { + type Err = SpanKeyParseError; + + fn from_str(s: &str) -> Result { + match s { + "service" => Ok(SpanKey::Service), + "name" => Ok(SpanKey::Name), + "resource" => Ok(SpanKey::Resource), + "trace_id" => Ok(SpanKey::TraceId), + "span_id" => Ok(SpanKey::SpanId), + "parent_id" => Ok(SpanKey::ParentId), + "start" => Ok(SpanKey::Start), + "duration" => Ok(SpanKey::Duration), + "error" => Ok(SpanKey::Error), + "meta" => Ok(SpanKey::Meta), + "metrics" => Ok(SpanKey::Metrics), + "type" => Ok(SpanKey::Type), + "meta_struct" => Ok(SpanKey::MetaStruct), + "span_links" => Ok(SpanKey::SpanLinks), + "span_events" => Ok(SpanKey::SpanEvents), + _ => Err(SpanKeyParseError::new(format!("Invalid span key: {s}"))), + } + } +} + +/// Checks if the `value` represents an empty string. Used to skip serializing empty strings +/// with serde. 
+fn is_empty_str>(value: &T) -> bool { + value.borrow().is_empty() +} + +/// The generic representation of a V04 span. +/// +/// `T` is the type used to represent strings in the span, it can be either owned (e.g. BytesString) +/// or borrowed (e.g. &str). To define a generic function taking any `Span` you can use the +/// [`SpanValue`] trait: +/// ``` +/// use libdd_trace_utils::span::{v04::Span, TraceData}; +/// fn foo(span: Span) { +/// let _ = span.meta.get("foo"); +/// } +/// ``` +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct Span { + pub service: T::Text, + pub name: T::Text, + pub resource: T::Text, + #[serde(skip_serializing_if = "is_empty_str")] + pub r#type: T::Text, + #[serde(serialize_with = "serialize_lower_64_bits")] + pub trace_id: u128, + pub span_id: u64, + #[serde(skip_serializing_if = "is_default")] + pub parent_id: u64, + pub start: i64, + pub duration: i64, + #[serde(skip_serializing_if = "is_default")] + pub error: i32, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub metrics: HashMap, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub meta_struct: HashMap, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_links: Vec>, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub span_events: Vec>, +} + +impl Clone for Span +where + T::Text: Clone, + T::Bytes: Clone, +{ + fn clone(&self) -> Self { + Span { + service: self.service.clone(), + name: self.name.clone(), + resource: self.resource.clone(), + r#type: self.r#type.clone(), + trace_id: self.trace_id, + span_id: self.span_id, + parent_id: self.parent_id, + start: self.start, + duration: self.duration, + error: self.error, + meta: self.meta.clone(), + metrics: self.metrics.clone(), + meta_struct: self.meta_struct.clone(), + span_links: self.span_links.clone(), + span_events: self.span_events.clone(), + } + } +} + +fn serialize_lower_64_bits(v: &u128, serializer: S) -> Result 
+where + S: serde::Serializer, +{ + serializer.serialize_u64(*v as u64) +} + +/// The generic representation of a V04 span link. +/// `T` is the type used to represent strings in the span link. +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct SpanLink { + pub trace_id: u64, + pub trace_id_high: u64, + pub span_id: u64, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub attributes: HashMap, + #[serde(skip_serializing_if = "is_empty_str")] + pub tracestate: T::Text, + #[serde(skip_serializing_if = "is_default")] + pub flags: u32, +} + +impl Clone for SpanLink +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + SpanLink { + trace_id: self.trace_id, + trace_id_high: self.trace_id_high, + span_id: self.span_id, + attributes: self.attributes.clone(), + tracestate: self.tracestate.clone(), + flags: self.flags, + } + } +} + +/// The generic representation of a V04 span event. +/// `T` is the type used to represent strings in the span event. +#[derive(Debug, Default, PartialEq, Serialize)] +pub struct SpanEvent { + pub time_unix_nano: u64, + pub name: T::Text, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub attributes: HashMap>, +} + +impl Clone for SpanEvent +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + SpanEvent { + time_unix_nano: self.time_unix_nano, + name: self.name.clone(), + attributes: self.attributes.clone(), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum AttributeAnyValue { + SingleValue(AttributeArrayValue), + Array(Vec>), +} + +#[derive(Serialize)] +struct ArrayValueWrapper<'a, T: TraceData> { + values: &'a Vec>, +} + +impl Serialize for AttributeAnyValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("AttributeAnyValue", 2)?; + + match self { + AttributeAnyValue::SingleValue(attribute) => { + serialize_attribute_array::(&mut state, attribute)?; + } + AttributeAnyValue::Array(value) => { + let value_type: u8 = self.into(); 
+ state.serialize_field("type", &value_type)?; + let wrapped_value = ArrayValueWrapper { values: value }; + state.serialize_field("array_value", &wrapped_value)?; + } + } + + state.end() + } +} + +impl From<&AttributeAnyValue> for u8 { + fn from(attribute: &AttributeAnyValue) -> u8 { + match attribute { + AttributeAnyValue::SingleValue(value) => value.into(), + AttributeAnyValue::Array(_) => 4, + } + } +} + +impl Clone for AttributeAnyValue +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + match self { + AttributeAnyValue::SingleValue(v) => AttributeAnyValue::SingleValue(v.clone()), + AttributeAnyValue::Array(vec) => AttributeAnyValue::Array(vec.clone()), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum AttributeArrayValue { + String(T::Text), + Boolean(bool), + Integer(i64), + Double(f64), +} + +impl Clone for AttributeArrayValue +where + T::Text: Clone, +{ + fn clone(&self) -> Self { + match self { + AttributeArrayValue::String(v) => AttributeArrayValue::String(v.clone()), + AttributeArrayValue::Boolean(v) => AttributeArrayValue::Boolean(*v), + AttributeArrayValue::Integer(v) => AttributeArrayValue::Integer(*v), + AttributeArrayValue::Double(v) => AttributeArrayValue::Double(*v), + } + } +} + +impl Serialize for AttributeArrayValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("AttributeArrayValue", 2)?; + serialize_attribute_array::(&mut state, self)?; + state.end() + } +} + +fn serialize_attribute_array( + state: &mut S::SerializeStruct, + attribute: &AttributeArrayValue, +) -> Result<(), ::Error> +where + T: TraceData, + S: serde::Serializer, +{ + let attribute_type: u8 = attribute.into(); + state.serialize_field("type", &attribute_type)?; + match attribute { + AttributeArrayValue::String(value) => state.serialize_field("string_value", value), + AttributeArrayValue::Boolean(value) => state.serialize_field("bool_value", value), + AttributeArrayValue::Integer(value) 
=> state.serialize_field("int_value", value), + AttributeArrayValue::Double(value) => state.serialize_field("double_value", value), + } +} + +impl From<&AttributeArrayValue> for u8 { + fn from(attribute: &AttributeArrayValue) -> u8 { + match attribute { + AttributeArrayValue::String(_) => 0, + AttributeArrayValue::Boolean(_) => 1, + AttributeArrayValue::Integer(_) => 2, + AttributeArrayValue::Double(_) => 3, + } + } +} + +fn is_default(t: &T) -> bool { + t == &T::default() +} + +pub type SpanBytes = Span; +pub type SpanLinkBytes = SpanLink; +pub type SpanEventBytes = SpanEvent; +pub type AttributeAnyValueBytes = AttributeAnyValue; +pub type AttributeArrayValueBytes = AttributeArrayValue; + +pub type SpanSlice<'a> = Span>; +pub type SpanLinkSlice<'a> = SpanLink>; +pub type SpanEventSlice<'a> = SpanEvent>; +pub type AttributeAnyValueSlice<'a> = AttributeAnyValue>; +pub type AttributeArrayValueSlice<'a> = AttributeArrayValue>; + +pub type TraceChunksBytes = TraceChunks; + +#[cfg(test)] +mod tests { + use super::{AttributeAnyValue, AttributeArrayValue, Span, SpanEvent, SpanLink}; + use crate::msgpack_decoder::decode::buffer::Buffer; + use crate::msgpack_decoder::v04::span::decode_span; + use crate::span::SliceData; + use std::collections::HashMap; + + #[test] + fn skip_serializing_empty_fields_test() { + let expected = b"\x87\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00"; + let val: Span> = Span::default(); + let serialized = rmp_serde::encode::to_vec_named(&val).unwrap(); + assert_eq!(expected, serialized.as_slice()); + } + + #[test] + fn serialize_deserialize_test() { + let span: Span> = Span { + name: "tracing.operation", + resource: "MyEndpoint", + span_links: vec![SpanLink { + trace_id: 42, + attributes: HashMap::from([("span", "link")]), + tracestate: "running", + ..Default::default() + }], + span_events: vec![SpanEvent { + time_unix_nano: 1727211691770716000, + name: "exception", + attributes: 
HashMap::from([ + ( + "exception.message", + AttributeAnyValue::SingleValue(AttributeArrayValue::String( + "Cannot divide by zero", + )), + ), + ( + "exception.type", + AttributeAnyValue::SingleValue(AttributeArrayValue::String("RuntimeError")), + ), + ( + "exception.escaped", + AttributeAnyValue::SingleValue(AttributeArrayValue::Boolean(false)), + ), + ( + "exception.count", + AttributeAnyValue::SingleValue(AttributeArrayValue::Integer(1)), + ), + ( + "exception.lines", + AttributeAnyValue::Array(vec![ + AttributeArrayValue::String(" File \"\", line 1, in "), + AttributeArrayValue::String(" File \"\", line 1, in divide"), + AttributeArrayValue::String("RuntimeError: Cannot divide by zero"), + ]), + ), + ]), + }], + ..Default::default() + }; + + let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); + let mut serialized_slice = Buffer::>::new(serialized.as_ref()); + let deserialized = decode_span(&mut serialized_slice).unwrap(); + + assert_eq!(span.name, deserialized.name); + assert_eq!(span.resource, deserialized.resource); + assert_eq!( + span.span_links[0].trace_id, + deserialized.span_links[0].trace_id + ); + assert_eq!( + span.span_links[0].tracestate, + deserialized.span_links[0].tracestate + ); + assert_eq!(span.span_events[0].name, deserialized.span_events[0].name); + assert_eq!( + span.span_events[0].time_unix_nano, + deserialized.span_events[0].time_unix_nano + ); + for attribut in &deserialized.span_events[0].attributes { + assert!(span.span_events[0].attributes.contains_key(attribut.0)) + } + } + + #[test] + fn serialize_event_test() { + // `expected` is created by transforming the span into bytes + // and passing each bytes through `escaped_default` + let expected = 
b"\x88\xa7service\xa0\xa4name\xa0\xa8resource\xa0\xa8trace_id\x00\xa7span_id\x00\xa5start\x00\xa8duration\x00\xabspan_events\x91\x83\xaetime_unix_nano\xcf\x17\xf8I\xe1\xeb\xe5\x1f`\xa4name\xa4test\xaaattributes\x81\xaatest.event\x82\xa4type\x03\xacdouble_value\xcb@\x10\xcc\xcc\xcc\xcc\xcc\xcd"; + + let span: Span> = Span { + span_events: vec![SpanEvent { + time_unix_nano: 1727211691770716000, + name: "test", + attributes: HashMap::from([( + "test.event", + AttributeAnyValue::SingleValue(AttributeArrayValue::Double(4.2)), + )]), + }], + ..Default::default() + }; + + let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); + assert_eq!(expected, serialized.as_slice()); + } +} diff --git a/libdd-trace-utils/src/span/v05/mod.rs b/libdd-trace-utils/src/span/v05/mod.rs index e21746e1c0..a4f4e8ad9d 100644 --- a/libdd-trace-utils/src/span/v05/mod.rs +++ b/libdd-trace-utils/src/span/v05/mod.rs @@ -3,7 +3,7 @@ pub mod dict; -use crate::span::{v05::dict::SharedDict, SpanText}; +use crate::span::{v05::dict::SharedDict, TraceData}; use anyhow::Result; use serde::Serialize; use std::collections::HashMap; @@ -28,9 +28,9 @@ pub struct Span { pub r#type: u32, } -pub fn from_span( - span: crate::span::Span, - dict: &mut SharedDict, +pub fn from_v04_span( + span: crate::span::v04::Span, + dict: &mut SharedDict, ) -> Result { let meta_len = span.meta.len(); let metrics_len = span.metrics.len(); @@ -65,7 +65,7 @@ pub fn from_span( #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; use libdd_tinybytes::BytesString; #[test] @@ -92,7 +92,7 @@ mod tests { }; let mut dict = SharedDict::default(); - let v05_span = from_span(span, &mut dict).unwrap(); + let v05_span = from_v04_span(span, &mut dict).unwrap(); let get_index_from_str = |str: &str| -> u32 { dict.iter() diff --git a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs index 5425442b1e..c1d3cfa5f5 100644 --- 
a/libdd-trace-utils/src/test_utils/datadog_test_agent.rs +++ b/libdd-trace-utils/src/test_utils/datadog_test_agent.rs @@ -235,7 +235,7 @@ impl DatadogAgentContainerBuilder { /// /// let data = SendData::new( /// 100, -/// TracerPayloadCollection::V04(vec![trace.clone()]), +/// TracerPayloadCollection::V04(vec![trace]), /// TracerHeaderTags::default(), /// &endpoint, /// ); diff --git a/libdd-trace-utils/src/test_utils/mod.rs b/libdd-trace-utils/src/test_utils/mod.rs index ba8858ee72..da4d2b8cff 100644 --- a/libdd-trace-utils/src/test_utils/mod.rs +++ b/libdd-trace-utils/src/test_utils/mod.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::Duration; use crate::send_data::SendData; -use crate::span::SpanBytes; +use crate::span::v04::SpanBytes; use crate::span::{v05, SharedDictBytes}; use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; diff --git a/libdd-trace-utils/src/trace_utils.rs b/libdd-trace-utils/src/trace_utils.rs index e85cdab1da..b9311061ca 100644 --- a/libdd-trace-utils/src/trace_utils.rs +++ b/libdd-trace-utils/src/trace_utils.rs @@ -4,7 +4,7 @@ pub use crate::send_data::send_data_result::SendDataResult; pub use crate::send_data::SendData; use crate::span::v05::dict::SharedDict; -use crate::span::{v05, SpanText}; +use crate::span::{v05, TraceData}; pub use crate::tracer_header_tags::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use crate::tracer_payload::{self, TraceChunks}; @@ -601,8 +601,8 @@ macro_rules! 
parse_root_span_tags { } } -pub fn collect_trace_chunks( - traces: Vec>>, +pub fn collect_trace_chunks( + traces: Vec>>, use_v05_format: bool, ) -> anyhow::Result> { if use_v05_format { @@ -613,7 +613,7 @@ pub fn collect_trace_chunks( let v05_trace = trace.into_iter().try_fold( Vec::with_capacity(trace_len), |mut acc, span| -> anyhow::Result> { - acc.push(v05::from_span(span, &mut shared_dict)?); + acc.push(v05::from_v04_span(span, &mut shared_dict)?); Ok(acc) }, )?; @@ -728,36 +728,32 @@ mod tests { #[test] fn test_coalescing_does_not_exceed_max_size() { - let dummy = SendData::new( - MAX_PAYLOAD_SIZE / 5 + 1, - TracerPayloadCollection::V07(vec![pb::TracerPayload { - container_id: "".to_string(), - language_name: "".to_string(), - language_version: "".to_string(), - tracer_version: "".to_string(), - runtime_id: "".to_string(), - chunks: vec![pb::TraceChunk { - priority: 0, - origin: "".to_string(), - spans: vec![], + fn dummy() -> SendData { + SendData::new( + MAX_PAYLOAD_SIZE / 5 + 1, + TracerPayloadCollection::V07(vec![pb::TracerPayload { + container_id: "".to_string(), + language_name: "".to_string(), + language_version: "".to_string(), + tracer_version: "".to_string(), + runtime_id: "".to_string(), + chunks: vec![pb::TraceChunk { + priority: 0, + origin: "".to_string(), + spans: vec![], + tags: Default::default(), + dropped_trace: false, + }], tags: Default::default(), - dropped_trace: false, - }], - tags: Default::default(), - env: "".to_string(), - hostname: "".to_string(), - app_version: "".to_string(), - }]), - TracerHeaderTags::default(), - &Endpoint::default(), - ); - let coalesced = coalesce_send_data(vec![ - dummy.clone(), - dummy.clone(), - dummy.clone(), - dummy.clone(), - dummy.clone(), - ]); + env: "".to_string(), + hostname: "".to_string(), + app_version: "".to_string(), + }]), + TracerHeaderTags::default(), + &Endpoint::default(), + ) + } + let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]); assert_eq!( 5, 
coalesced diff --git a/libdd-trace-utils/src/tracer_payload.rs b/libdd-trace-utils/src/tracer_payload.rs index 05143fbfc6..5649ed7c74 100644 --- a/libdd-trace-utils/src/tracer_payload.rs +++ b/libdd-trace-utils/src/tracer_payload.rs @@ -2,15 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::span::v05::dict::SharedDict; -use crate::span::{v05, SharedDictBytes, Span, SpanBytes, SpanText}; +use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData}; use crate::trace_utils::collect_trace_chunks; use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads}; -use libdd_tinybytes::BytesString; use libdd_trace_protobuf::pb; use std::cmp::Ordering; use std::iter::Iterator; -pub type TracerPayloadV04 = Vec; +pub type TracerPayloadV04 = Vec; pub type TracerPayloadV05 = Vec; #[derive(Debug, Clone)] @@ -18,19 +17,19 @@ pub type TracerPayloadV05 = Vec; pub enum TraceEncoding { /// v0.4 encoding (TracerPayloadV04). V04, - /// v054 encoding (TracerPayloadV04). + /// v0.5 encoding (TracerPayloadV05). V05, } -#[derive(Debug, Clone)] -pub enum TraceChunks { +#[derive(Debug)] +pub enum TraceChunks { /// Collection of TraceChunkSpan. - V04(Vec>>), + V04(Vec>>), /// Collection of TraceChunkSpan with de-duplicated strings. - V05((SharedDict, Vec>)), + V05((SharedDict, Vec>)), } -impl TraceChunks { +impl TraceChunks { pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection { match self { TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces), @@ -39,7 +38,7 @@ impl TraceChunks { } } -impl TraceChunks { +impl TraceChunks { /// Returns the number of traces in the chunk pub fn size(&self) -> usize { match self { @@ -49,13 +48,13 @@ impl TraceChunks { } } -#[derive(Debug, Clone)] +#[derive(Debug)] /// Enum representing a general abstraction for a collection of tracer payloads. pub enum TracerPayloadCollection { /// Collection of TracerPayloads. V07(Vec), /// Collection of TraceChunkSpan. 
- V04(Vec>), + V04(Vec>), /// Collection of TraceChunkSpan with de-duplicated strings. V05((SharedDictBytes, Vec>)), } @@ -223,7 +222,7 @@ impl TraceChunkProcessor for DefaultTraceChunkProcessor { pub fn decode_to_trace_chunks( data: libdd_tinybytes::Bytes, encoding_type: TraceEncoding, -) -> Result<(TraceChunks, usize), anyhow::Error> { +) -> Result<(TraceChunks, usize), anyhow::Error> { let (data, size) = match encoding_type { TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data), TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data), @@ -239,7 +238,7 @@ pub fn decode_to_trace_chunks( #[cfg(test)] mod tests { use super::*; - use crate::span::SpanBytes; + use crate::span::v04::SpanBytes; use crate::test_utils::create_test_no_alloc_span; use libdd_tinybytes::BytesString; use libdd_trace_protobuf::pb; @@ -278,34 +277,43 @@ mod tests { #[test] fn test_append_traces_v07() { + let mut two_traces = create_dummy_collection_v07(); + two_traces.append(&mut create_dummy_collection_v07()); + let mut trace = create_dummy_collection_v07(); - let empty = TracerPayloadCollection::V07(vec![]); + let mut empty = TracerPayloadCollection::V07(vec![]); - trace.append(&mut trace.clone()); + trace.append(&mut create_dummy_collection_v07()); assert_eq!(2, trace.size()); - trace.append(&mut trace.clone()); + trace.append(&mut two_traces); assert_eq!(4, trace.size()); - trace.append(&mut empty.clone()); + trace.append(&mut empty); assert_eq!(4, trace.size()); } #[test] fn test_append_traces_v04() { - let mut trace = - TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]); + fn create_trace() -> TracerPayloadCollection { + TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]) + } + + let mut two_traces = create_trace(); + two_traces.append(&mut create_trace()); + + let mut trace = create_trace(); - let empty = TracerPayloadCollection::V04(vec![]); + let mut empty = TracerPayloadCollection::V04(vec![]); - 
trace.append(&mut trace.clone()); + trace.append(&mut create_trace()); assert_eq!(2, trace.size()); - trace.append(&mut trace.clone()); + trace.append(&mut two_traces); assert_eq!(4, trace.size()); - trace.append(&mut empty.clone()); + trace.append(&mut empty); assert_eq!(4, trace.size()); } @@ -313,7 +321,7 @@ mod tests { fn test_merge_traces() { let mut trace = create_dummy_collection_v07(); - trace.append(&mut trace.clone()); + trace.append(&mut create_dummy_collection_v07()); assert_eq!(2, trace.size()); trace.merge(); @@ -429,7 +437,7 @@ mod tests { #[test] fn test_try_into_meta_metrics_success() { let dummy_trace = create_trace(); - let expected = vec![dummy_trace.clone()]; + let expected = vec![create_trace()]; let payload = rmp_serde::to_vec_named(&expected).unwrap(); let payload = libdd_tinybytes::Bytes::from(payload);