Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog-sidecar-ffi/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use libdd_common_ffi::slice::{AsBytes, CharSlice};
use libdd_tinybytes::{Bytes, BytesString};
use libdd_trace_utils::span::{
use libdd_trace_utils::span::v04::{
AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes,
};
use std::borrow::Cow;
Expand Down
2 changes: 1 addition & 1 deletion datadog-sidecar-ffi/tests/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use datadog_sidecar_ffi::span::*;
use libdd_common_ffi::slice::*;
use libdd_tinybytes::*;
use libdd_trace_utils::span::*;
use libdd_trace_utils::span::v04::*;
use std::collections::HashMap;

#[test]
Expand Down
5 changes: 2 additions & 3 deletions datadog-sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,8 @@ mod tests {
};

let send_data_1 = create_send_data(size, &target_endpoint);

let send_data_2 = send_data_1.clone();
let send_data_3 = send_data_1.clone();
let send_data_2 = create_send_data(size, &target_endpoint);
let send_data_3 = create_send_data(size, &target_endpoint);

trace_flusher.enqueue(send_data_1);
trace_flusher.enqueue(send_data_2);
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ mod tests {
use crate::error::ddog_trace_exporter_error_free;
use httpmock::prelude::*;
use httpmock::MockServer;
use libdd_trace_utils::span::SpanSlice;
use libdd_trace_utils::span::v04::SpanSlice;
use std::{borrow::Borrow, mem::MaybeUninit};

#[test]
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ mod tests {
use httpmock::prelude::*;
use httpmock::MockServer;
use libdd_common::hyper_migration::new_default_client;
use libdd_trace_utils::span::{trace_utils, SpanSlice};
use libdd_trace_utils::span::{trace_utils, v04::SpanSlice};
use libdd_trace_utils::test_utils::poll_for_mock_hit;
use time::Duration;
use time::SystemTime;
Expand Down
12 changes: 6 additions & 6 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use libdd_trace_utils::msgpack_decoder;
use libdd_trace_utils::send_with_retry::{
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
};
use libdd_trace_utils::span::{Span, SpanText};
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::TracerHeaderTags;
use std::io;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -587,7 +587,7 @@ impl TraceExporter {
/// # Returns
/// * Ok(String): The response from the agent
/// * Err(TraceExporterError): An error detailing what went wrong in the process
pub fn send_trace_chunks<T: SpanText>(
pub fn send_trace_chunks<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
Expand All @@ -604,7 +604,7 @@ impl TraceExporter {
/// # Returns
/// * Ok(String): The response from the agent
/// * Err(TraceExporterError): An error detailing what went wrong in the process
pub async fn send_trace_chunks_async<T: SpanText>(
pub async fn send_trace_chunks_async<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
Expand Down Expand Up @@ -687,7 +687,7 @@ impl TraceExporter {
self.handle_send_result(result, chunks, payload_len).await
}

async fn send_trace_chunks_inner<T: SpanText>(
async fn send_trace_chunks_inner<T: TraceData>(
&self,
mut traces: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
Expand Down Expand Up @@ -1012,8 +1012,8 @@ mod tests {
use httpmock::MockServer;
use libdd_tinybytes::BytesString;
use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::v04::SpanBytes;
use libdd_trace_utils::span::v05;
use libdd_trace_utils::span::SpanBytes;
use std::collections::HashMap;
use std::net;
use std::time::Duration;
Expand Down Expand Up @@ -2024,7 +2024,7 @@ mod single_threaded_tests {
use crate::agent_info;
use httpmock::prelude::*;
use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::SpanBytes;
use libdd_trace_utils::span::v04::SpanBytes;
use std::time::Duration;
use tokio::time::sleep;

Expand Down
8 changes: 4 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ pub(crate) fn handle_stats_enabled(
/// Add all spans from the given iterator into the stats concentrator
/// # Panic
/// Will panic if another thread panicked will holding the lock on `stats_concentrator`
fn add_spans_to_stats<T: libdd_trace_utils::span::SpanText>(
fn add_spans_to_stats<T: libdd_trace_utils::span::TraceData>(
stats_concentrator: &Mutex<SpanConcentrator>,
traces: &[Vec<libdd_trace_utils::span::Span<T>>],
traces: &[Vec<libdd_trace_utils::span::v04::Span<T>>],
) {
let mut stats_concentrator = stats_concentrator.lock_or_panic();

Expand All @@ -219,8 +219,8 @@ fn add_spans_to_stats<T: libdd_trace_utils::span::SpanText>(
}

/// Process traces for stats computation and update header tags accordingly
pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::SpanText>(
traces: &mut Vec<Vec<libdd_trace_utils::span::Span<T>>>,
pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::TraceData>(
traces: &mut Vec<Vec<libdd_trace_utils::span::v04::Span<T>>>,
header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags,
client_side_stats: &ArcSwap<StatsComputationStatus>,
client_computed_top_level: bool,
Expand Down
10 changes: 5 additions & 5 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libdd_common::header::{
};
use libdd_trace_utils::msgpack_decoder::decode::error::DecodeError;
use libdd_trace_utils::msgpack_encoder;
use libdd_trace_utils::span::{Span, SpanText};
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils::{self, TracerHeaderTags};
use libdd_trace_utils::tracer_payload;
use std::collections::HashMap;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Prepare traces payload and HTTP headers for sending to agent
pub(super) fn prepare_traces_payload<T: SpanText>(
pub(super) fn prepare_traces_payload<T: TraceData>(
&self,
traces: Vec<Vec<Span<T>>>,
header_tags: TracerHeaderTags,
Expand All @@ -64,7 +64,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Collect trace chunks based on output format
fn collect_and_process_traces<T: SpanText>(
fn collect_and_process_traces<T: TraceData>(
&self,
traces: Vec<Vec<Span<T>>>,
) -> Result<tracer_payload::TraceChunks<T>, TraceExporterError> {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Serialize payload to msgpack format
fn serialize_payload<T: SpanText>(
fn serialize_payload<T: TraceData>(
&self,
payload: &tracer_payload::TraceChunks<T>,
) -> Result<Vec<u8>, TraceExporterError> {
Expand All @@ -119,7 +119,7 @@ mod tests {
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR,
};
use libdd_tinybytes::BytesString;
use libdd_trace_utils::span::SpanBytes;
use libdd_trace_utils::span::v04::SpanBytes;
use libdd_trace_utils::trace_utils::TracerHeaderTags;

fn create_test_span() -> SpanBytes {
Expand Down
34 changes: 26 additions & 8 deletions libdd-tinybytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]

#[cfg(feature = "serde")]
use serde::Serialize;
use std::{
borrow, cmp, fmt, hash,
ops::{self, RangeBounds},
ptr::NonNull,
sync::atomic::AtomicUsize,
};

#[cfg(feature = "serde")]
use serde::Serialize;

/// Immutable bytes type with zero copy cloning and slicing.
#[derive(Clone)]
#[repr(C)] // fixed layout for ad-hoc conversion to slice
pub struct Bytes {
ptr: NonNull<u8>,
len: usize,
Expand Down Expand Up @@ -49,11 +49,24 @@ impl Bytes {
len: usize,
refcount: RefCountedCell,
) -> Self {
Self {
ptr,
len,
bytes: Some(refcount),
}
Self::from_raw(ptr, len, Some(refcount))
}

#[inline]
/// Creates a new `Bytes` from the given slice data and the refcount. Can be used after calling
/// into_raw().
///
/// # Safety
///
/// * the pointer should be valid for the given length
/// * the pointer should be valid for reads as long as the refcount or any of it's clone is not
/// dropped
pub const unsafe fn from_raw(
ptr: NonNull<u8>,
len: usize,
bytes: Option<RefCountedCell>,
) -> Self {
Self { ptr, len, bytes }
}

/// Creates empty `Bytes`.
Expand Down Expand Up @@ -235,6 +248,11 @@ impl Bytes {
// SAFETY: ptr is valid for the associated length
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast_const(), self.len()) }
}

#[inline]
pub fn into_raw(self) -> (NonNull<u8>, usize, Option<RefCountedCell>) {
(self.ptr, self.len, self.bytes)
}
}

// Implementations of `UnderlyingBytes` for common types.
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-stats/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use criterion::{criterion_group, Criterion};
use libdd_trace_stats::span_concentrator::SpanConcentrator;
use libdd_trace_utils::span::SpanBytes;
use libdd_trace_utils::span::v04::SpanBytes;

fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl

#[cfg(test)]
mod tests {
use libdd_trace_utils::span::{SpanBytes, SpanSlice};
use libdd_trace_utils::span::v04::{SpanBytes, SpanSlice};

use super::*;
use std::{collections::HashMap, hash::Hash};
Expand Down
5 changes: 3 additions & 2 deletions libdd-trace-stats/src/span_concentrator/stat_span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
//! support both trace-utils' Span and pb::Span.

use libdd_trace_protobuf::pb;
use libdd_trace_utils::span::{trace_utils, Span, SpanText};
use libdd_trace_utils::span::{trace_utils, v04::Span, TraceData};
use libdd_trace_utils::trace_utils as pb_utils;
use std::borrow::Borrow;

/// Common interface for spans used in stats computation
pub trait StatSpan<'a> {
Expand Down Expand Up @@ -38,7 +39,7 @@ pub trait StatSpan<'a> {
fn get_metrics(&'a self, key: &str) -> Option<f64>;
}

impl<'a, T: SpanText> StatSpan<'a> for Span<T> {
impl<'a, T: TraceData> StatSpan<'a> for Span<T> {
fn service(&'a self) -> &'a str {
self.service.borrow()
}
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-stats/src/span_concentrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::span_concentrator::aggregation::OwnedAggregationKey;

use super::*;
use libdd_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice};
use libdd_trace_utils::span::{trace_utils::compute_top_level_span, v04::SpanSlice};
use rand::{thread_rng, Rng};

const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64;
Expand Down
63 changes: 63 additions & 0 deletions libdd-trace-utils/src/msgpack_decoder/decode/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::span::TraceData;
use rmp::decode;
use rmp::decode::DecodeStringError;

use std::borrow::Borrow;
use std::ops::Deref;

/// Read a string from `buf`.
///
/// # Errors
/// Fails if the buffer doesn't contain a valid utf8 msgpack string.
#[inline]
pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> {
decode::read_str_from_slice(buf).map_err(|e| match e {
DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()),
DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()),
DecodeStringError::TypeMismatch(marker) => {
DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}"))
}
DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()),
_ => DecodeError::IOError,
})
}

/// Internal Buffer used to wrap msgpack data for decoding.
/// Provides a couple accessors to extract data from the buffer.
pub struct Buffer<T: TraceData>(T::Bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we use &[u8] for the buffer ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to at least store the Bytes reference to construct the Bytes/BytesString object.


impl<T: TraceData> Buffer<T> {
pub fn new(data: T::Bytes) -> Self {
Buffer(data)
}

/// Returns a mutable reference to the underlying slice.
pub fn as_mut_slice(&mut self) -> &mut &'static [u8] {
T::get_mut_slice(&mut self.0)
}

/// Tries to extract a slice of `bytes` from the buffer and advances the buffer.
pub fn try_slice_and_advance(&mut self, bytes: usize) -> Option<T::Bytes> {
T::try_slice_and_advance(&mut self.0, bytes)
}

/// Read a string from the slices `buf`.
///
/// # Errors
/// Fails if the buffer doesn't contain a valid utf8 msgpack string.
pub fn read_string(&mut self) -> Result<T::Text, DecodeError> {
T::read_string(&mut self.0)
}
}

impl<T: TraceData> Deref for Buffer<T> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.0.borrow()
}
}
12 changes: 7 additions & 5 deletions libdd-trace-utils/src/msgpack_decoder/decode/map.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::{buffer::Buffer, error::DecodeError};
use crate::span::TraceData;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;

Expand Down Expand Up @@ -33,14 +34,14 @@ use std::collections::HashMap;
/// * `V` - The type of the values in the map.
/// * `F` - The type of the function used to read key-value pairs from the buffer.
#[inline]
pub fn read_map<'a, K, V, F>(
pub fn read_map<K, V, F, B>(
len: usize,
buf: &mut &'a [u8],
buf: &mut B,
read_pair: F,
) -> Result<HashMap<K, V>, DecodeError>
where
K: std::hash::Hash + Eq,
F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>,
F: Fn(&mut B) -> Result<(K, V), DecodeError>,
{
let mut map = HashMap::with_capacity(len);
for _ in 0..len {
Expand All @@ -67,7 +68,8 @@ where
/// - The buffer does not contain a map.
/// - There is an error reading from the buffer.
#[inline]
pub fn read_map_len(buf: &mut &[u8]) -> Result<usize, DecodeError> {
pub fn read_map_len<T: TraceData>(buf: &mut Buffer<T>) -> Result<usize, DecodeError> {
let buf = buf.as_mut_slice();
match decode::read_marker(buf)
.map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))?
{
Expand Down
Loading
Loading