From 116b40631fba5552a9bd32dec495557caca9cabe Mon Sep 17 00:00:00 2001 From: Bryan English Date: Wed, 14 Jan 2026 14:24:24 -0500 Subject: [PATCH 1/3] initial change buffer impl --- libdd-trace-utils/src/change_buffer/mod.rs | 305 +++++++++++++++++++ libdd-trace-utils/src/change_buffer/trace.rs | 17 ++ libdd-trace-utils/src/change_buffer/utils.rs | 36 +++ libdd-trace-utils/src/lib.rs | 2 + 4 files changed, 360 insertions(+) create mode 100644 libdd-trace-utils/src/change_buffer/mod.rs create mode 100644 libdd-trace-utils/src/change_buffer/trace.rs create mode 100644 libdd-trace-utils/src/change_buffer/utils.rs diff --git a/libdd-trace-utils/src/change_buffer/mod.rs b/libdd-trace-utils/src/change_buffer/mod.rs new file mode 100644 index 0000000000..4a6999049f --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/mod.rs @@ -0,0 +1,305 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; + +mod utils; +use utils::*; + +mod trace; +use trace::*; + +use crate::span::{Span, SpanText}; + +#[derive(Clone, Copy)] +pub struct ChangeBuffer(*const u8); +unsafe impl Send for ChangeBuffer {} +unsafe impl Sync for ChangeBuffer {} + +impl std::ops::Deref for ChangeBuffer { + type Target = *const u8; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub struct ChangeBufferState { + change_buffer: ChangeBuffer, + spans: HashMap>, + traces: HashMap>, + string_table: HashMap, + tracer_service: T, + tracer_language: T, + pid: f64, +} + +fn new_span(span_id: u64, parent_id: u64, trace_id: u128) -> Span { + Span { + span_id, + trace_id, + parent_id, + ..Default::default() + } +} + +impl ChangeBufferState { + pub fn new(change_buffer: ChangeBuffer, tracer_service: T, tracer_language: T, pid: f64) -> Self { + ChangeBufferState { + change_buffer, + spans: HashMap::new(), + traces: HashMap::new(), + string_table: HashMap::new(), + tracer_service, + tracer_language, + pid, + } + } + + pub fn flush_chunk(&mut self, span_ids: Vec, first_is_local_root:bool) -> Result>> { + let mut chunk_trace_id: Option = None; + let mut is_local_root = first_is_local_root; + let mut is_chunk_root = true; + + let spans_vec = span_ids.iter().map(|span_id| -> Result> { + let maybe_span = self.spans.remove(span_id); + + let mut span = maybe_span.ok_or_else(|| anyhow!("span not found: {}", span_id))?; + chunk_trace_id = Some(span.trace_id); + + if is_local_root { + self.copy_in_sampling_tags(&mut span); + span.metrics.insert(T::from_static_str("_dd.top_level"), 1.0); + is_local_root = false; + } + if is_chunk_root { + self.copy_in_chunk_tags(&mut span); + is_chunk_root = false; + } + + self.process_one_span(&mut span); + + Ok(span) + }).collect::>>()?; + + Ok(spans_vec) + } + + fn copy_in_sampling_tags(&self, span: &mut Span) { + if let Some(trace) = self.traces.get(&span.trace_id) { + if let Some(rule) = trace.sampling_rule_decision { + span.metrics.insert(T::from_static_str("_dd.rule_psr"), rule); + } + if let Some(rule) = trace.sampling_limit_decision { + span.metrics.insert(T::from_static_str("_dd.;_psr"), rule); + } + if let Some(rule) = trace.sampling_agent_decision { + span.metrics.insert(T::from_static_str("_dd.agent_psr"), rule); + } + + } + } + + fn copy_in_chunk_tags(&self, span: &mut Span) { + if let Some(trace) = self.traces.get(&span.trace_id) { + span.meta.reserve(trace.meta.len()); + span.meta.extend(trace.meta.clone()); + span.metrics.reserve(trace.metrics.len()); + span.metrics.extend(trace.metrics.clone()); + } + } + + fn process_one_span(&self, span: &mut Span) { + // TODO span.sample(); + + if let Some(kind) = span.meta.get("kind") { + if kind != &T::from_static_str("internal") { + span.metrics.insert(T::from_static_str("_dd.measured"), 1.0); + } + } + + if span.service != self.tracer_service { + span.meta.insert(T::from_static_str("_dd.base_service"), self.tracer_service.clone()); + // TODO span.service should be added to the "extra services" used by RC, which is not + // yet implemented here + } + + // SKIP setting single-span ingestion. They should be set when sampling is finalized for + // the span. + + span.meta.insert(T::from_static_str("language"), self.tracer_language.clone()); + span.metrics.insert(T::from_static_str("process_id"), self.pid); + + if let Some(trace) = self.traces.get(&span.trace_id) { + if let Some(origin) = trace.origin.clone() { + span.meta.insert(T::from_static_str("_dd.origin"), origin); + } + } + + // SKIP hostname. This can be an option to the span constructor, so we'll set the tag at + // that point. + + // TODO Sampling priority, if we're not doing that ahead of time. + } + + pub fn flush_change_buffer (&mut self) -> Result<()>{ + let mut index = 0; + let buf = *self.change_buffer; + let mut count: u64 = get_num_raw(buf, &mut index); + + while count > 0 { + let op = BufferedOperation::from_buf(&self.change_buffer, &mut index); + self.interpret_operation(&mut index, &op)?; + count -= 1; + } + + // Write 0 back to the count position so the writing side of the buffer knows the queue was + // flushed + let buf_mut = buf as *mut u8; + unsafe { + std::ptr::copy_nonoverlapping([0u8; 8].as_ptr(), buf_mut, 8); + } + + Ok(()) + } + + fn get_string_arg(&self, index: &mut usize) -> Result { + let num: u32 = self.get_num_arg(index); + self.string_table.get(&num).cloned().ok_or_else(|| { + anyhow!("string not found internally: {}", num) + }) + } + + fn get_num_arg(&self, index: &mut usize) -> U { + get_num_raw(*self.change_buffer, index) + } + + fn get_mut_span(&mut self, id: &u64) -> Result<&mut Span> { + self.spans.get_mut(id).ok_or_else(|| { + anyhow!("span not found internally: {}", id) + }) + } + + fn get_span(&self, id: &u64) -> Result<&Span> { + self.spans.get(id).ok_or_else(|| { + anyhow!("span not found internally: {}", id) + }) + } + + fn interpret_operation(&mut self, index: &mut usize, op: &BufferedOperation) -> Result<()> { + match op.opcode { + OpCode::Create => { + let trace_id: u128 = self.get_num_arg(index); + let parent_id = self.get_num_arg(index); + let span = new_span(op.span_id, parent_id, trace_id); + self.spans.insert(op.span_id, span); + // Ensure trace exists (creates new one if this is the first span for this trace) + self.traces.entry(trace_id).or_default(); + + // *self.trace_span_counts.entry(trace_id).or_insert(0) += 1; + } + OpCode::SetMetaAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_string_arg(index)?; + let span = self.get_mut_span(&op.span_id)?; + span.meta.insert(name, val); + } + OpCode::SetMetricAttr => { + let name = self.get_string_arg(index)?; + let val: f64 = self.get_num_arg(index); + let span = self.get_mut_span(&op.span_id)?; + span.metrics.insert(name, val); + } + OpCode::SetServiceName => { + self.get_mut_span(&op.span_id)?.service = self.get_string_arg(index)?; + } + OpCode::SetResourceName => { + self.get_mut_span(&op.span_id)?.resource = self.get_string_arg(index)?; + } + OpCode::SetError => { + self.get_mut_span(&op.span_id)?.error = self.get_num_arg(index); + } + OpCode::SetStart => { + self.get_mut_span(&op.span_id)?.start = self.get_num_arg(index); + } + OpCode::SetDuration => { + self.get_mut_span(&op.span_id)?.duration = self.get_num_arg(index); + } + OpCode::SetType => { + self.get_mut_span(&op.span_id)?.r#type = self.get_string_arg(index)?; + } + OpCode::SetName => { + self.get_mut_span(&op.span_id)?.name = self.get_string_arg(index)?; + } + OpCode::SetTraceMetaAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_string_arg(index)?; + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.meta.insert(name, val); + } + } + OpCode::SetTraceMetricsAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_num_arg(index); + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.metrics.insert(name, val); + } + } + OpCode::SetTraceOrigin => { + let origin = self.get_string_arg(index)?; + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.origin = Some(origin); + } + } + }; + + Ok(()) + } + + pub fn string_table_insert_one(&mut self, key: u32, val: T) { + self.string_table.insert(key, val); + } + + pub fn string_table_evict_one(&mut self, key: u32) { + self.string_table.remove(&key); + } +} + +#[repr(u64)] +pub enum OpCode { + Create = 0, + SetMetaAttr = 1, + SetMetricAttr = 2, + SetServiceName = 3, + SetResourceName = 4, + SetError = 5, + SetStart = 6, + SetDuration = 7, + SetType = 8, + SetName = 9, + SetTraceMetaAttr = 10, + SetTraceMetricsAttr = 11, + SetTraceOrigin = 12, + // TODO: SpanLinks, SpanEvents, StructAttr +} + +impl From for OpCode { + fn from(val: u64) -> Self { + unsafe { std::mem::transmute(val) } + } +} + +pub struct BufferedOperation { + pub opcode: OpCode, + pub span_id: u64, +} + +impl BufferedOperation { + pub fn from_buf(buf: &ChangeBuffer, index: &mut usize) -> Self { + BufferedOperation { + opcode: get_num_raw::(**buf, index).into(), + span_id: get_num_raw(**buf, index), + } + } +} diff --git a/libdd-trace-utils/src/change_buffer/trace.rs b/libdd-trace-utils/src/change_buffer/trace.rs new file mode 100644 index 0000000000..bede3c85cb --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/trace.rs @@ -0,0 +1,17 @@ +use std::collections::HashMap; + +#[derive(Default)] +pub struct Trace { + pub meta: HashMap, + pub metrics: HashMap, + pub origin: Option, + pub sampling_rule_decision: Option, + pub sampling_limit_decision: Option, + pub sampling_agent_decision: Option, +} + +impl Trace { + pub fn new() -> Self { + Default::default() + } +} diff --git a/libdd-trace-utils/src/change_buffer/utils.rs b/libdd-trace-utils/src/change_buffer/utils.rs new file mode 100644 index 0000000000..ba028cf5ce --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/utils.rs @@ -0,0 +1,36 @@ +pub trait FromBytes: Sized { + type Bytes: ?Sized; + fn from_bytes(bytes: &[u8]) -> Self; +} + +macro_rules! impl_from_bytes { + ($ty:ty, $len:expr) => { + impl FromBytes for $ty { + type Bytes = $ty; + + // Note that this always does a copy into a new variable. This is + // because the values in the buffer are not aligned. We could save + // ourselves a copy by ensuring alignment from the managed side. + fn from_bytes(bytes: &[u8]) -> Self { + let mut code_buf = [0u8; $len]; + code_buf.copy_from_slice(bytes); + <$ty>::from_le_bytes(code_buf) + } + } + }; +} + +impl_from_bytes!(u128, 16); +impl_from_bytes!(u64, 8); +impl_from_bytes!(f64, 8); +impl_from_bytes!(i64, 8); +impl_from_bytes!(i32, 4); +impl_from_bytes!(u32, 4); + +pub fn get_num_raw(buf: *const u8, index: &mut usize) -> T { + let size = std::mem::size_of::(); + let result = unsafe { std::slice::from_raw_parts(buf.add(*index), size) }; + let result = T::from_bytes(result); + *index += size; + result +} diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 62b0b8a2aa..376cf91f37 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -20,3 +20,5 @@ pub mod tracer_header_tags; pub mod tracer_payload; pub mod span; + +pub mod change_buffer; From 4c7660768813aa297157a18b55a79989cae4ca0f Mon Sep 17 00:00:00 2001 From: Bryan English Date: Thu, 15 Jan 2026 16:29:25 -0500 Subject: [PATCH 2/3] fixes --- libdd-trace-utils/src/change_buffer/mod.rs | 120 ++++++++++++------- libdd-trace-utils/src/change_buffer/trace.rs | 6 - 2 files changed, 80 insertions(+), 46 deletions(-) diff --git a/libdd-trace-utils/src/change_buffer/mod.rs b/libdd-trace-utils/src/change_buffer/mod.rs index 4a6999049f..2bcad4a7ff 100644 --- a/libdd-trace-utils/src/change_buffer/mod.rs +++ b/libdd-trace-utils/src/change_buffer/mod.rs @@ -22,11 +22,19 @@ impl std::ops::Deref for ChangeBuffer { } } +impl Default for ChangeBuffer { + fn default() -> Self { + ChangeBuffer(std::ptr::null_mut()) + } +} + +#[derive(Default)] pub struct ChangeBufferState { change_buffer: ChangeBuffer, spans: HashMap>, traces: HashMap>, string_table: HashMap, + trace_span_counts: HashMap, tracer_service: T, tracer_language: T, pid: f64, @@ -42,43 +50,68 @@ fn new_span(span_id: u64, parent_id: u64, trace_id: u128) -> Span ChangeBufferState { - pub fn new(change_buffer: ChangeBuffer, tracer_service: T, tracer_language: T, pid: f64) -> Self { + pub fn new( + change_buffer: ChangeBuffer, + tracer_service: T, + tracer_language: T, + pid: f64, + ) -> Self { ChangeBufferState { change_buffer, - spans: HashMap::new(), - traces: HashMap::new(), - string_table: HashMap::new(), tracer_service, tracer_language, pid, + ..Default::default() } } - pub fn flush_chunk(&mut self, span_ids: Vec, first_is_local_root:bool) -> Result>> { + pub fn flush_chunk( + &mut self, + span_ids: Vec, + first_is_local_root: bool, + ) -> Result>> { let mut chunk_trace_id: Option = None; let mut is_local_root = first_is_local_root; let mut is_chunk_root = true; - let spans_vec = span_ids.iter().map(|span_id| -> Result> { - let maybe_span = self.spans.remove(span_id); + let spans_vec = span_ids + .iter() + .map(|span_id| -> Result> { + let maybe_span = self.spans.remove(span_id); - let mut span = maybe_span.ok_or_else(|| anyhow!("span not found: {}", span_id))?; - chunk_trace_id = Some(span.trace_id); + let mut span = maybe_span.ok_or_else(|| anyhow!("span not found: {}", span_id))?; + chunk_trace_id = Some(span.trace_id); - if is_local_root { - self.copy_in_sampling_tags(&mut span); - span.metrics.insert(T::from_static_str("_dd.top_level"), 1.0); - is_local_root = false; - } - if is_chunk_root { - self.copy_in_chunk_tags(&mut span); - is_chunk_root = false; - } - - self.process_one_span(&mut span); + if is_local_root { + self.copy_in_sampling_tags(&mut span); + span.metrics + .insert(T::from_static_str("_dd.top_level"), 1.0); + is_local_root = false; + } + if is_chunk_root { + self.copy_in_chunk_tags(&mut span); + is_chunk_root = false; + } - Ok(span) - }).collect::>>()?; + self.process_one_span(&mut span); + + Ok(span) + }) + .collect::>>()?; + + // Clean up trace if no spans remain for it + if let Some(trace_id) = chunk_trace_id { + if let Some(count) = self.trace_span_counts.get_mut(&trace_id) { + let len = span_ids.len(); + if *count <= len { + // All spans for this trace have been flushed + self.traces.remove(&trace_id); + self.trace_span_counts.remove(&trace_id); + } else { + *count -= len; + } + } + } Ok(spans_vec) } @@ -86,15 +119,16 @@ impl ChangeBufferState { fn copy_in_sampling_tags(&self, span: &mut Span) { if let Some(trace) = self.traces.get(&span.trace_id) { if let Some(rule) = trace.sampling_rule_decision { - span.metrics.insert(T::from_static_str("_dd.rule_psr"), rule); + span.metrics + .insert(T::from_static_str("_dd.rule_psr"), rule); } if let Some(rule) = trace.sampling_limit_decision { span.metrics.insert(T::from_static_str("_dd.;_psr"), rule); } if let Some(rule) = trace.sampling_agent_decision { - span.metrics.insert(T::from_static_str("_dd.agent_psr"), rule); + span.metrics + .insert(T::from_static_str("_dd.agent_psr"), rule); } - } } @@ -109,7 +143,7 @@ impl ChangeBufferState { fn process_one_span(&self, span: &mut Span) { // TODO span.sample(); - + if let Some(kind) = span.meta.get("kind") { if kind != &T::from_static_str("internal") { span.metrics.insert(T::from_static_str("_dd.measured"), 1.0); @@ -117,7 +151,10 @@ impl ChangeBufferState { } if span.service != self.tracer_service { - span.meta.insert(T::from_static_str("_dd.base_service"), self.tracer_service.clone()); + span.meta.insert( + T::from_static_str("_dd.base_service"), + self.tracer_service.clone(), + ); // TODO span.service should be added to the "extra services" used by RC, which is not // yet implemented here } @@ -125,8 +162,10 @@ impl ChangeBufferState { // SKIP setting single-span ingestion. They should be set when sampling is finalized for // the span. - span.meta.insert(T::from_static_str("language"), self.tracer_language.clone()); - span.metrics.insert(T::from_static_str("process_id"), self.pid); + span.meta + .insert(T::from_static_str("language"), self.tracer_language.clone()); + span.metrics + .insert(T::from_static_str("process_id"), self.pid); if let Some(trace) = self.traces.get(&span.trace_id) { if let Some(origin) = trace.origin.clone() { @@ -140,7 +179,7 @@ impl ChangeBufferState { // TODO Sampling priority, if we're not doing that ahead of time. } - pub fn flush_change_buffer (&mut self) -> Result<()>{ + pub fn flush_change_buffer(&mut self) -> Result<()> { let mut index = 0; let buf = *self.change_buffer; let mut count: u64 = get_num_raw(buf, &mut index); @@ -163,9 +202,10 @@ impl ChangeBufferState { fn get_string_arg(&self, index: &mut usize) -> Result { let num: u32 = self.get_num_arg(index); - self.string_table.get(&num).cloned().ok_or_else(|| { - anyhow!("string not found internally: {}", num) - }) + self.string_table + .get(&num) + .cloned() + .ok_or_else(|| anyhow!("string not found internally: {}", num)) } fn get_num_arg(&self, index: &mut usize) -> U { @@ -173,15 +213,15 @@ impl ChangeBufferState { } fn get_mut_span(&mut self, id: &u64) -> Result<&mut Span> { - self.spans.get_mut(id).ok_or_else(|| { - anyhow!("span not found internally: {}", id) - }) + self.spans + .get_mut(id) + .ok_or_else(|| anyhow!("span not found internally: {}", id)) } fn get_span(&self, id: &u64) -> Result<&Span> { - self.spans.get(id).ok_or_else(|| { - anyhow!("span not found internally: {}", id) - }) + self.spans + .get(id) + .ok_or_else(|| anyhow!("span not found internally: {}", id)) } fn interpret_operation(&mut self, index: &mut usize, op: &BufferedOperation) -> Result<()> { @@ -194,7 +234,7 @@ impl ChangeBufferState { // Ensure trace exists (creates new one if this is the first span for this trace) self.traces.entry(trace_id).or_default(); - // *self.trace_span_counts.entry(trace_id).or_insert(0) += 1; + *self.trace_span_counts.entry(trace_id).or_insert(0) += 1; } OpCode::SetMetaAttr => { let name = self.get_string_arg(index)?; diff --git a/libdd-trace-utils/src/change_buffer/trace.rs b/libdd-trace-utils/src/change_buffer/trace.rs index bede3c85cb..e0cdbf95e8 100644 --- a/libdd-trace-utils/src/change_buffer/trace.rs +++ b/libdd-trace-utils/src/change_buffer/trace.rs @@ -9,9 +9,3 @@ pub struct Trace { pub sampling_limit_decision: Option, pub sampling_agent_decision: Option, } - -impl Trace { - pub fn new() -> Self { - Default::default() - } -} From e39f1463e75c8e42fe7d6b3955d8595dab1f2151 Mon Sep 17 00:00:00 2001 From: Bryan English Date: Fri, 16 Jan 2026 15:57:45 -0500 Subject: [PATCH 3/3] add from --- libdd-trace-utils/src/change_buffer/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/libdd-trace-utils/src/change_buffer/mod.rs b/libdd-trace-utils/src/change_buffer/mod.rs index 2bcad4a7ff..02dbf3619c 100644 --- a/libdd-trace-utils/src/change_buffer/mod.rs +++ b/libdd-trace-utils/src/change_buffer/mod.rs @@ -28,6 +28,12 @@ impl Default for ChangeBuffer { } } +impl From<*const u8> for ChangeBuffer { + fn from(val: *const u8) -> Self { + Self(val) + } +} + #[derive(Default)] pub struct ChangeBufferState { change_buffer: ChangeBuffer,