diff --git a/libdd-trace-utils/src/change_buffer/mod.rs b/libdd-trace-utils/src/change_buffer/mod.rs new file mode 100644 index 0000000000..02dbf3619c --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/mod.rs @@ -0,0 +1,351 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Result}; + +mod utils; +use utils::*; + +mod trace; +use trace::*; + +use crate::span::{Span, SpanText}; + +#[derive(Clone, Copy)] +pub struct ChangeBuffer(*const u8); +unsafe impl Send for ChangeBuffer {} +unsafe impl Sync for ChangeBuffer {} + +impl std::ops::Deref for ChangeBuffer { + type Target = *const u8; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Default for ChangeBuffer { + fn default() -> Self { + ChangeBuffer(std::ptr::null_mut()) + } +} + +impl From<*const u8> for ChangeBuffer { + fn from(val: *const u8) -> Self { + Self(val) + } +} + +#[derive(Default)] +pub struct ChangeBufferState { + change_buffer: ChangeBuffer, + spans: HashMap>, + traces: HashMap>, + string_table: HashMap, + trace_span_counts: HashMap, + tracer_service: T, + tracer_language: T, + pid: f64, +} + +fn new_span(span_id: u64, parent_id: u64, trace_id: u128) -> Span { + Span { + span_id, + trace_id, + parent_id, + ..Default::default() + } +} + +impl ChangeBufferState { + pub fn new( + change_buffer: ChangeBuffer, + tracer_service: T, + tracer_language: T, + pid: f64, + ) -> Self { + ChangeBufferState { + change_buffer, + tracer_service, + tracer_language, + pid, + ..Default::default() + } + } + + pub fn flush_chunk( + &mut self, + span_ids: Vec, + first_is_local_root: bool, + ) -> Result>> { + let mut chunk_trace_id: Option = None; + let mut is_local_root = first_is_local_root; + let mut is_chunk_root = true; + + let spans_vec = span_ids + .iter() + .map(|span_id| -> Result> { + let maybe_span = self.spans.remove(span_id); + + let mut span = maybe_span.ok_or_else(|| anyhow!("span not found: {}", span_id))?; + chunk_trace_id = Some(span.trace_id); + + if is_local_root { + self.copy_in_sampling_tags(&mut span); + span.metrics + .insert(T::from_static_str("_dd.top_level"), 1.0); + is_local_root = false; + } + if is_chunk_root { + self.copy_in_chunk_tags(&mut span); + is_chunk_root = false; + } + + self.process_one_span(&mut span); + + Ok(span) + }) + .collect::>>()?; + + // Clean up trace if no spans remain for it + if let Some(trace_id) = chunk_trace_id { + if let Some(count) = self.trace_span_counts.get_mut(&trace_id) { + let len = span_ids.len(); + if *count <= len { + // All spans for this trace have been flushed + self.traces.remove(&trace_id); + self.trace_span_counts.remove(&trace_id); + } else { + *count -= len; + } + } + } + + Ok(spans_vec) + } + + fn copy_in_sampling_tags(&self, span: &mut Span) { + if let Some(trace) = self.traces.get(&span.trace_id) { + if let Some(rule) = trace.sampling_rule_decision { + span.metrics + .insert(T::from_static_str("_dd.rule_psr"), rule); + } + if let Some(rule) = trace.sampling_limit_decision { + span.metrics.insert(T::from_static_str("_dd.;_psr"), rule); + } + if let Some(rule) = trace.sampling_agent_decision { + span.metrics + .insert(T::from_static_str("_dd.agent_psr"), rule); + } + } + } + + fn copy_in_chunk_tags(&self, span: &mut Span) { + if let Some(trace) = self.traces.get(&span.trace_id) { + span.meta.reserve(trace.meta.len()); + span.meta.extend(trace.meta.clone()); + span.metrics.reserve(trace.metrics.len()); + span.metrics.extend(trace.metrics.clone()); + } + } + + fn process_one_span(&self, span: &mut Span) { + // TODO span.sample(); + + if let Some(kind) = span.meta.get("kind") { + if kind != &T::from_static_str("internal") { + span.metrics.insert(T::from_static_str("_dd.measured"), 1.0); + } + } + + if span.service != self.tracer_service { + span.meta.insert( + T::from_static_str("_dd.base_service"), + self.tracer_service.clone(), + ); + // TODO span.service should be added to the "extra services" used by RC, which is not + // yet implemented here + } + + // SKIP setting single-span ingestion. They should be set when sampling is finalized for + // the span. + + span.meta + .insert(T::from_static_str("language"), self.tracer_language.clone()); + span.metrics + .insert(T::from_static_str("process_id"), self.pid); + + if let Some(trace) = self.traces.get(&span.trace_id) { + if let Some(origin) = trace.origin.clone() { + span.meta.insert(T::from_static_str("_dd.origin"), origin); + } + } + + // SKIP hostname. This can be an option to the span constructor, so we'll set the tag at + // that point. + + // TODO Sampling priority, if we're not doing that ahead of time. + } + + pub fn flush_change_buffer(&mut self) -> Result<()> { + let mut index = 0; + let buf = *self.change_buffer; + let mut count: u64 = get_num_raw(buf, &mut index); + + while count > 0 { + let op = BufferedOperation::from_buf(&self.change_buffer, &mut index); + self.interpret_operation(&mut index, &op)?; + count -= 1; + } + + // Write 0 back to the count position so the writing side of the buffer knows the queue was + // flushed + let buf_mut = buf as *mut u8; + unsafe { + std::ptr::copy_nonoverlapping([0u8; 8].as_ptr(), buf_mut, 8); + } + + Ok(()) + } + + fn get_string_arg(&self, index: &mut usize) -> Result { + let num: u32 = self.get_num_arg(index); + self.string_table + .get(&num) + .cloned() + .ok_or_else(|| anyhow!("string not found internally: {}", num)) + } + + fn get_num_arg(&self, index: &mut usize) -> U { + get_num_raw(*self.change_buffer, index) + } + + fn get_mut_span(&mut self, id: &u64) -> Result<&mut Span> { + self.spans + .get_mut(id) + .ok_or_else(|| anyhow!("span not found internally: {}", id)) + } + + fn get_span(&self, id: &u64) -> Result<&Span> { + self.spans + .get(id) + .ok_or_else(|| anyhow!("span not found internally: {}", id)) + } + + fn interpret_operation(&mut self, index: &mut usize, op: &BufferedOperation) -> Result<()> { + match op.opcode { + OpCode::Create => { + let trace_id: u128 = self.get_num_arg(index); + let parent_id = self.get_num_arg(index); + let span = new_span(op.span_id, parent_id, trace_id); + self.spans.insert(op.span_id, span); + // Ensure trace exists (creates new one if this is the first span for this trace) + self.traces.entry(trace_id).or_default(); + + *self.trace_span_counts.entry(trace_id).or_insert(0) += 1; + } + OpCode::SetMetaAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_string_arg(index)?; + let span = self.get_mut_span(&op.span_id)?; + span.meta.insert(name, val); + } + OpCode::SetMetricAttr => { + let name = self.get_string_arg(index)?; + let val: f64 = self.get_num_arg(index); + let span = self.get_mut_span(&op.span_id)?; + span.metrics.insert(name, val); + } + OpCode::SetServiceName => { + self.get_mut_span(&op.span_id)?.service = self.get_string_arg(index)?; + } + OpCode::SetResourceName => { + self.get_mut_span(&op.span_id)?.resource = self.get_string_arg(index)?; + } + OpCode::SetError => { + self.get_mut_span(&op.span_id)?.error = self.get_num_arg(index); + } + OpCode::SetStart => { + self.get_mut_span(&op.span_id)?.start = self.get_num_arg(index); + } + OpCode::SetDuration => { + self.get_mut_span(&op.span_id)?.duration = self.get_num_arg(index); + } + OpCode::SetType => { + self.get_mut_span(&op.span_id)?.r#type = self.get_string_arg(index)?; + } + OpCode::SetName => { + self.get_mut_span(&op.span_id)?.name = self.get_string_arg(index)?; + } + OpCode::SetTraceMetaAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_string_arg(index)?; + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.meta.insert(name, val); + } + } + OpCode::SetTraceMetricsAttr => { + let name = self.get_string_arg(index)?; + let val = self.get_num_arg(index); + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.metrics.insert(name, val); + } + } + OpCode::SetTraceOrigin => { + let origin = self.get_string_arg(index)?; + let trace_id = self.get_span(&op.span_id)?.trace_id; + if let Some(trace) = self.traces.get_mut(&trace_id) { + trace.origin = Some(origin); + } + } + }; + + Ok(()) + } + + pub fn string_table_insert_one(&mut self, key: u32, val: T) { + self.string_table.insert(key, val); + } + + pub fn string_table_evict_one(&mut self, key: u32) { + self.string_table.remove(&key); + } +} + +#[repr(u64)] +pub enum OpCode { + Create = 0, + SetMetaAttr = 1, + SetMetricAttr = 2, + SetServiceName = 3, + SetResourceName = 4, + SetError = 5, + SetStart = 6, + SetDuration = 7, + SetType = 8, + SetName = 9, + SetTraceMetaAttr = 10, + SetTraceMetricsAttr = 11, + SetTraceOrigin = 12, + // TODO: SpanLinks, SpanEvents, StructAttr +} + +impl From for OpCode { + fn from(val: u64) -> Self { + unsafe { std::mem::transmute(val) } + } +} + +pub struct BufferedOperation { + pub opcode: OpCode, + pub span_id: u64, +} + +impl BufferedOperation { + pub fn from_buf(buf: &ChangeBuffer, index: &mut usize) -> Self { + BufferedOperation { + opcode: get_num_raw::(**buf, index).into(), + span_id: get_num_raw(**buf, index), + } + } +} diff --git a/libdd-trace-utils/src/change_buffer/trace.rs b/libdd-trace-utils/src/change_buffer/trace.rs new file mode 100644 index 0000000000..e0cdbf95e8 --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/trace.rs @@ -0,0 +1,11 @@ +use std::collections::HashMap; + +#[derive(Default)] +pub struct Trace { + pub meta: HashMap, + pub metrics: HashMap, + pub origin: Option, + pub sampling_rule_decision: Option, + pub sampling_limit_decision: Option, + pub sampling_agent_decision: Option, +} diff --git a/libdd-trace-utils/src/change_buffer/utils.rs b/libdd-trace-utils/src/change_buffer/utils.rs new file mode 100644 index 0000000000..ba028cf5ce --- /dev/null +++ b/libdd-trace-utils/src/change_buffer/utils.rs @@ -0,0 +1,36 @@ +pub trait FromBytes: Sized { + type Bytes: ?Sized; + fn from_bytes(bytes: &[u8]) -> Self; +} + +macro_rules! impl_from_bytes { + ($ty:ty, $len:expr) => { + impl FromBytes for $ty { + type Bytes = $ty; + + // Note that this always does a copy into a new variable. This is + // because the values in the buffer are not aligned. We could save + // ourselves a copy by ensuring alignment from the managed side. + fn from_bytes(bytes: &[u8]) -> Self { + let mut code_buf = [0u8; $len]; + code_buf.copy_from_slice(bytes); + <$ty>::from_le_bytes(code_buf) + } + } + }; +} + +impl_from_bytes!(u128, 16); +impl_from_bytes!(u64, 8); +impl_from_bytes!(f64, 8); +impl_from_bytes!(i64, 8); +impl_from_bytes!(i32, 4); +impl_from_bytes!(u32, 4); + +pub fn get_num_raw(buf: *const u8, index: &mut usize) -> T { + let size = std::mem::size_of::(); + let result = unsafe { std::slice::from_raw_parts(buf.add(*index), size) }; + let result = T::from_bytes(result); + *index += size; + result +} diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index 62b0b8a2aa..376cf91f37 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -20,3 +20,5 @@ pub mod tracer_header_tags; pub mod tracer_payload; pub mod span; + +pub mod change_buffer;