From b7384dd475f70c66ad8c1a3bf9536f0b103b628a Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 9 Jan 2026 14:22:14 -0500 Subject: [PATCH 1/6] feat: Add timeout for requests to span_dedup_service --- bottlecap/src/traces/span_dedup_service.rs | 17 ++++++++++++----- bottlecap/src/traces/trace_agent.rs | 4 ++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index 2cfb26d4d..bbb4072fb 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use tokio::sync::{mpsc, oneshot}; -use tracing::error; +use tokio::time::Duration; +use tracing::warn; use crate::traces::span_dedup::{DedupKey, Deduper}; @@ -12,6 +13,8 @@ pub enum DedupError { SendError(mpsc::error::SendError), #[error("Failed to receive response from deduper: {0}")] RecvError(oneshot::error::RecvError), + #[error("Timeout waiting for response from deduper")] + Timeout, } pub enum DedupCommand { @@ -34,14 +37,18 @@ impl DedupHandle { /// /// # Errors /// - /// Returns an error if the command cannot be sent to the deduper service - /// or if the response cannot be received. + /// Returns an error if the command cannot be sent to the deduper service, + /// if the response cannot be received, or if the operation times out after 5 seconds. pub async fn check_and_add(&self, key: DedupKey) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(DedupCommand::CheckAndAdd(key, response_tx)) .map_err(DedupError::SendError)?; - response_rx.await.map_err(DedupError::RecvError) + + tokio::time::timeout(Duration::from_secs(5), response_rx) + .await + .map_err(|_| DedupError::Timeout)? + .map_err(DedupError::RecvError) } } @@ -77,7 +84,7 @@ impl DedupService { DedupCommand::CheckAndAdd(key, response_tx) => { let was_added = self.deduper.check_and_add(key); if let Err(e) = response_tx.send(was_added) { - error!("Failed to send check_and_add response: {e:?}"); + warn!("Failed to send check_and_add response: {e:?}"); } } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 3ef29c783..28a622daf 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -19,7 +19,7 @@ use tokio::sync::{ }; use tokio_util::sync::CancellationToken; use tower_http::limit::RequestBodyLimitLayer; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::traces::trace_processor::SendingTraceProcessor; use crate::{ @@ -556,7 +556,7 @@ impl TraceAgent { should_keep } Err(e) => { - error!("Failed to check span in deduper, keeping span: {e}"); + warn!("Failed to check span in deduper, keeping span: {e}"); true } }; From 6b30126bea37e30e5ef986fd984c49d725c025e2 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 12 Jan 2026 10:21:31 -0500 Subject: [PATCH 2/6] Add comment --- bottlecap/src/traces/span_dedup_service.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index bbb4072fb..bd59c7425 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -45,6 +45,9 @@ impl DedupHandle { .send(DedupCommand::CheckAndAdd(key, response_tx)) .map_err(DedupError::SendError)?; + // Sometimes the dedup service fails to send a response for unknown reasons, so we + // timeout after 5 seconds to avoid blocking the caller forever. We may remove the + // timeout if we can figure out and fix the root cause. tokio::time::timeout(Duration::from_secs(5), response_rx) .await .map_err(|_| DedupError::Timeout)? From 1c03ea5e3f39c266d3add6d87e50ff275fffd402 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 12 Jan 2026 14:53:32 -0500 Subject: [PATCH 3/6] fmt --- bottlecap/src/traces/span_dedup_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index bd59c7425..8c68f23ea 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -45,7 +45,7 @@ impl DedupHandle { .send(DedupCommand::CheckAndAdd(key, response_tx)) .map_err(DedupError::SendError)?; - // Sometimes the dedup service fails to send a response for unknown reasons, so we + // Sometimes the dedup service fails to send a response for unknown reasons, so we // timeout after 5 seconds to avoid blocking the caller forever. We may remove the // timeout if we can figure out and fix the root cause. tokio::time::timeout(Duration::from_secs(5), response_rx) From 89cf322472f646ff3e3088a36cec548b6333ed0b Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 12 Jan 2026 17:21:02 -0500 Subject: [PATCH 4/6] Gate behind DD_SPAN_DEDUP_TIMEOUT --- bottlecap/src/config/env.rs | 10 ++++++++++ bottlecap/src/config/mod.rs | 2 ++ bottlecap/src/config/yaml.rs | 1 + bottlecap/src/traces/span_dedup_service.rs | 20 ++++++++++++++------ bottlecap/src/traces/trace_agent.rs | 2 +- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index af68a7a39..0bbc199ef 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -417,6 +417,13 @@ pub struct EnvConfig { /// Default is `false`. #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub compute_trace_stats_on_extension: Option, + /// @env `DD_SPAN_DEDUP_TIMEOUT` + /// + /// The timeout for the span deduplication service to check if a span key exists, in seconds. + /// For now, this is a temporary field added to debug the failure of `check_and_add()` in span dedup service. + /// Do not use this field extensively in production. + #[serde(deserialize_with = "deserialize_optional_duration_from_seconds_ignore_zero")] + pub span_dedup_timeout: Option, /// @env `DD_API_KEY_SECRET_RELOAD_INTERVAL` /// /// The interval at which the Datadog API key is reloaded, in seconds. @@ -640,6 +647,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_option_to_value!(config, env_config, capture_lambda_payload); merge_option_to_value!(config, env_config, capture_lambda_payload_max_depth); merge_option_to_value!(config, env_config, compute_trace_stats_on_extension); + merge_option!(config, env_config, span_dedup_timeout); merge_option!(config, env_config, api_key_secret_reload_interval); merge_option_to_value!(config, env_config, serverless_appsec_enabled); merge_option!(config, env_config, appsec_rules); @@ -835,6 +843,7 @@ mod tests { jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD", "true"); jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD_MAX_DEPTH", "5"); jail.set_env("DD_COMPUTE_TRACE_STATS_ON_EXTENSION", "true"); + jail.set_env("DD_SPAN_DEDUP_TIMEOUT", "5"); jail.set_env("DD_API_KEY_SECRET_RELOAD_INTERVAL", "10"); jail.set_env("DD_SERVERLESS_APPSEC_ENABLED", "true"); jail.set_env("DD_APPSEC_RULES", "/path/to/rules.json"); @@ -988,6 +997,7 @@ mod tests { capture_lambda_payload: true, capture_lambda_payload_max_depth: 5, compute_trace_stats_on_extension: true, + span_dedup_timeout: Some(Duration::from_secs(5)), api_key_secret_reload_interval: Some(Duration::from_secs(10)), serverless_appsec_enabled: true, appsec_rules: Some("/path/to/rules.json".to_string()), diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 997e25807..48bc3c305 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -346,6 +346,7 @@ pub struct Config { pub capture_lambda_payload: bool, pub capture_lambda_payload_max_depth: u32, pub compute_trace_stats_on_extension: bool, + pub span_dedup_timeout: Option, pub api_key_secret_reload_interval: Option, pub serverless_appsec_enabled: bool, @@ -451,6 +452,7 @@ impl Default for Config { capture_lambda_payload: false, capture_lambda_payload_max_depth: 10, compute_trace_stats_on_extension: false, + span_dedup_timeout: None, api_key_secret_reload_interval: None, serverless_appsec_enabled: false, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index 8075ef361..dbac692b2 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -995,6 +995,7 @@ api_security_sample_delay: 60 # Seconds capture_lambda_payload: true, capture_lambda_payload_max_depth: 5, compute_trace_stats_on_extension: true, + span_dedup_timeout: None, api_key_secret_reload_interval: None, serverless_appsec_enabled: true, diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index 8c68f23ea..44f0c692f 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -39,19 +39,27 @@ impl DedupHandle { /// /// Returns an error if the command cannot be sent to the deduper service, /// if the response cannot be received, or if the operation times out after 5 seconds. - pub async fn check_and_add(&self, key: DedupKey) -> Result { + pub async fn check_and_add( + &self, + key: DedupKey, + timeout: Option, + ) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(DedupCommand::CheckAndAdd(key, response_tx)) .map_err(DedupError::SendError)?; // Sometimes the dedup service fails to send a response for unknown reasons, so we - // timeout after 5 seconds to avoid blocking the caller forever. We may remove the + // add a timeout to avoid blocking the caller forever. We may remove the // timeout if we can figure out and fix the root cause. - tokio::time::timeout(Duration::from_secs(5), response_rx) - .await - .map_err(|_| DedupError::Timeout)? - .map_err(DedupError::RecvError) + if let Some(timeout) = timeout { + tokio::time::timeout(timeout, response_rx) + .await + .map_err(|_| DedupError::Timeout)? + .map_err(DedupError::RecvError) + } else { + response_rx.await.map_err(DedupError::RecvError) + } } } diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 28a622daf..305fc9186 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -545,7 +545,7 @@ impl TraceAgent { for mut span in original_chunk { // Check for duplicates let key = DedupKey::new(span.trace_id, span.span_id); - let should_keep = match deduper.check_and_add(key).await { + let should_keep = match deduper.check_and_add(key, config.span_dedup_timeout).await { Ok(should_keep) => { if !should_keep { debug!( From 5c97b41089f9feeef65a557e86f893c927030246 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 12 Jan 2026 20:21:59 -0500 Subject: [PATCH 5/6] fmt --- bottlecap/src/traces/trace_agent.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 305fc9186..fd72ec251 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -545,7 +545,8 @@ impl TraceAgent { for mut span in original_chunk { // Check for duplicates let key = DedupKey::new(span.trace_id, span.span_id); - let should_keep = match deduper.check_and_add(key, config.span_dedup_timeout).await { + let should_keep = match deduper.check_and_add(key, config.span_dedup_timeout).await + { Ok(should_keep) => { if !should_keep { debug!( From 3a57dfb5b16493bdc54f960e787fde25ee09b748 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 12 Jan 2026 22:24:58 -0500 Subject: [PATCH 6/6] Fix tests --- bottlecap/src/traces/span_dedup_service.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/bottlecap/src/traces/span_dedup_service.rs b/bottlecap/src/traces/span_dedup_service.rs index 44f0c692f..75ca941ff 100644 --- a/bottlecap/src/traces/span_dedup_service.rs +++ b/bottlecap/src/traces/span_dedup_service.rs @@ -130,17 +130,17 @@ mod tests { let key2 = DedupKey::new(100, 456); // First call should return true (key was added) - assert!(handle.check_and_add(key1).await.unwrap()); + assert!(handle.check_and_add(key1, None).await.unwrap()); // Second call should return false (key already exists) - assert!(!handle.check_and_add(key1).await.unwrap()); + assert!(!handle.check_and_add(key1, None).await.unwrap()); // Different key should return true again - assert!(handle.check_and_add(key2).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); // Calling again on already-added keys should return false - assert!(!handle.check_and_add(key1).await.unwrap()); - assert!(!handle.check_and_add(key2).await.unwrap()); + assert!(!handle.check_and_add(key1, None).await.unwrap()); + assert!(!handle.check_and_add(key2, None).await.unwrap()); } #[tokio::test] @@ -157,17 +157,17 @@ mod tests { let key4 = DedupKey::new(4, 40); // Add 3 keys - assert!(handle.check_and_add(key1).await.unwrap()); - assert!(handle.check_and_add(key2).await.unwrap()); - assert!(handle.check_and_add(key3).await.unwrap()); + assert!(handle.check_and_add(key1, None).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); + assert!(handle.check_and_add(key3, None).await.unwrap()); // Add a 4th key, should evict the oldest (key1) - assert!(handle.check_and_add(key4).await.unwrap()); + assert!(handle.check_and_add(key4, None).await.unwrap()); // Now key1 should be addable again (was evicted) - assert!(handle.check_and_add(key1).await.unwrap()); + assert!(handle.check_and_add(key1, None).await.unwrap()); // But key2 should now be evicted - assert!(handle.check_and_add(key2).await.unwrap()); + assert!(handle.check_and_add(key2, None).await.unwrap()); } }