From d38b4ea753a9d3d9bd00073252dd03150ddba790 Mon Sep 17 00:00:00 2001 From: Caleb Metz Date: Tue, 13 Jan 2026 13:58:37 -0500 Subject: [PATCH] adds `total_bytes_received` metric Signed-off-by: Caleb Metz --- lading/src/blackhole/common.rs | 5 +++++ lading/src/blackhole/datadog.rs | 11 ++++++++++- lading/src/blackhole/datadog_stateful_logs.rs | 7 +++++-- lading/src/blackhole/http.rs | 7 +++++-- lading/src/blackhole/otlp.rs | 1 + lading/src/blackhole/otlp/grpc.rs | 16 ++++++++++------ lading/src/blackhole/otlp/http.rs | 7 +++++-- lading/src/blackhole/splunk_hec.rs | 7 +++++-- lading/src/blackhole/sqs.rs | 7 +++++-- lading/src/blackhole/tcp.rs | 6 +++++- lading/src/blackhole/udp.rs | 3 +++ lading/src/blackhole/unix_datagram.rs | 3 +++ lading/src/blackhole/unix_stream.rs | 6 +++++- 13 files changed, 67 insertions(+), 19 deletions(-) diff --git a/lading/src/blackhole/common.rs b/lading/src/blackhole/common.rs index 7ea3b54e6..8d03947eb 100644 --- a/lading/src/blackhole/common.rs +++ b/lading/src/blackhole/common.rs @@ -16,6 +16,11 @@ use tokio::{ }; use tracing::{debug, error, info, warn}; +/// Common labels for metrics shared by all blackholes. +/// This ensures a single aggregated series across all blackhole types. +/// Only `component` label is needed to ensure a single aggregated series across all blackhole types. +pub(super) static COMMON_BLACKHOLE_LABELS: &[(&str, &str)] = &[("component", "blackhole")]; + #[derive(thiserror::Error, Debug)] pub enum Error { /// Wrapper for [`std::io::Error`]. diff --git a/lading/src/blackhole/datadog.rs b/lading/src/blackhole/datadog.rs index 1340dd0ae..8fbec612c 100644 --- a/lading/src/blackhole/datadog.rs +++ b/lading/src/blackhole/datadog.rs @@ -5,6 +5,12 @@ //! //! All other endpoints return `202 Accepted` without processing. //! +//! ## Metrics +//! +//! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types +//! `requests_received`: Total requests received +//! //! # Payload //! //! The V2 protobuf format is defined in `proto/agent_payload.proto`. @@ -39,6 +45,7 @@ use tokio::net::TcpListener; use tracing::{debug, error, info, trace, warn}; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use crate::proto::datadog::intake::metrics::MetricPayload; #[derive(thiserror::Error, Debug)] @@ -212,7 +219,9 @@ async fn handle_request( } }; - counter!("bytes_received", labels).increment(whole_body.len() as u64); + let body_len = whole_body.len() as u64; + counter!("bytes_received", labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let content_type = headers .get(header::CONTENT_TYPE) diff --git a/lading/src/blackhole/datadog_stateful_logs.rs b/lading/src/blackhole/datadog_stateful_logs.rs index 2e5a91b5e..2a641b991 100644 --- a/lading/src/blackhole/datadog_stateful_logs.rs +++ b/lading/src/blackhole/datadog_stateful_logs.rs @@ -6,6 +6,7 @@ //! ## Metrics //! //! - `bytes_received`: Total bytes received +//! - `total_bytes_received`: Aggregated bytes received across all blackhole types //! - `streams_received`: Total streams received //! - `batches_received`: Total batches received //! - `data_items_received`: Total data items in batches @@ -29,6 +30,7 @@ use tonic::{Request, Response, Status, transport}; use tracing::{error, info}; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`DatadogStatefulLogs`]. @@ -183,9 +185,10 @@ impl StatefulLogsService for StatefulLogsServiceImpl { match result { Ok(batch) => { let batch_id = batch.batch_id; - let size = batch.encoded_len(); + let size = batch.encoded_len() as u64; - counter!("bytes_received", &labels).increment(size as u64); + counter!("bytes_received", &labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("batches_received", &labels).increment(1); // Count data items in the batch diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index a627ff925..c2c258dc7 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `bytes_received_distr`: Distribution of compressed bytes per request (with `path` label) //! `decoded_bytes_received`: Total decoded bytes received //! `decoded_bytes_received_distr`: Distribution of decompressed bytes per request (with `path` label) @@ -19,7 +20,7 @@ use std::{net::SocketAddr, time::Duration}; use tracing::error; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; fn default_concurrent_requests_max() -> usize { 100 @@ -160,7 +161,9 @@ async fn srv( let body: Bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(body.len() as u64); + let body_len = body.len() as u64; + counter!("bytes_received", &metric_labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let mut labels_with_path = metric_labels.clone(); labels_with_path.push(("path".to_string(), path)); diff --git a/lading/src/blackhole/otlp.rs b/lading/src/blackhole/otlp.rs index 20106700e..d8cd3d52b 100644 --- a/lading/src/blackhole/otlp.rs +++ b/lading/src/blackhole/otlp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! `metrics_received`: Number of metric data points received //! `spans_received`: Number of spans received diff --git a/lading/src/blackhole/otlp/grpc.rs b/lading/src/blackhole/otlp/grpc.rs index e86c215c6..e1cd6772d 100644 --- a/lading/src/blackhole/otlp/grpc.rs +++ b/lading/src/blackhole/otlp/grpc.rs @@ -1,5 +1,6 @@ //! gRPC implementation of the OTLP blackhole. +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; use metrics::counter; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -83,9 +84,10 @@ impl MetricsService for OtlpMetricsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_points: u64 = 0; @@ -132,9 +134,10 @@ impl TraceService for OtlpTracesService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_spans: u64 = 0; @@ -171,9 +174,10 @@ impl LogsService for OtlpLogsService { request: tonic::Request, ) -> Result, Status> { let request = request.into_inner(); - let size = request.encoded_len(); + let size = request.encoded_len() as u64; - counter!("bytes_received", &self.labels).increment(size as u64); + counter!("bytes_received", &self.labels).increment(size); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(size); counter!("requests_received", &self.labels).increment(1); let mut total_logs: u64 = 0; diff --git a/lading/src/blackhole/otlp/http.rs b/lading/src/blackhole/otlp/http.rs index 92a334918..bd5c30573 100644 --- a/lading/src/blackhole/otlp/http.rs +++ b/lading/src/blackhole/otlp/http.rs @@ -21,7 +21,7 @@ use std::time::Duration; use tokio::task::JoinHandle; use tracing::{error, info}; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; /// Run the HTTP server for OTLP pub(crate) fn run_server( @@ -197,6 +197,7 @@ impl OtlpHttpHandler { && length == 0 { counter!("bytes_received", &self.labels).increment(0); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(0); let (response_bytes, content_type) = match (path_ref, response_format) { ("/v1/metrics", ResponseFormat::Json) => ( @@ -236,7 +237,9 @@ impl OtlpHttpHandler { let body_bytes = body.collect().await?.to_bytes(); - counter!("bytes_received", &self.labels).increment(body_bytes.len() as u64); + let body_len = body_bytes.len() as u64; + counter!("bytes_received", &self.labels).increment(body_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(body_len); let response_bytes = match crate::codec::decode(content_encoding.as_ref(), body_bytes.clone()) { diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 1599fbc70..4f4ab9151 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -19,7 +20,7 @@ use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; static ACK_ID: AtomicU64 = AtomicU64::new(0); @@ -102,7 +103,9 @@ async fn srv( let (parts, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &*labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &*labels).increment(bytes_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(*response), diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index 2cae450f2..ee33d2826 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total messages received //! @@ -16,7 +17,7 @@ use std::{fmt::Write, net::SocketAddr}; use tracing::{debug, error}; use super::General; -use crate::blackhole::common; +use crate::blackhole::common::{self, COMMON_BLACKHOLE_LABELS}; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Sqs`] @@ -235,7 +236,9 @@ async fn srv( let (_, body) = req.into_parts(); let bytes = body.boxed().collect().await?.to_bytes(); - counter!("bytes_received", &metric_labels).increment(bytes.len() as u64); + let bytes_len = bytes.len() as u64; + counter!("bytes_received", &metric_labels).increment(bytes_len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes_len); let action = match serde_qs::from_bytes::(&bytes) { Ok(a) => a, diff --git a/lading/src/blackhole/tcp.rs b/lading/src/blackhole/tcp.rs index b8a0603c1..a211678bd 100644 --- a/lading/src/blackhole/tcp.rs +++ b/lading/src/blackhole/tcp.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `message_received`: Total messages received //! @@ -17,6 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors emitted by [`Tcp`] @@ -85,7 +87,9 @@ impl Tcp { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); } } } diff --git a/lading/src/blackhole/udp.rs b/lading/src/blackhole/udp.rs index 896a33f50..92fa2310d 100644 --- a/lading/src/blackhole/udp.rs +++ b/lading/src/blackhole/udp.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `packet_received`: Total packets received //! @@ -14,6 +15,7 @@ use tokio::net::UdpSocket; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Udp`]. @@ -109,6 +111,7 @@ impl Udp { })?; counter!("packet_received", &self.metric_labels).increment(1); counter!("bytes_received", &self.metric_labels).increment(bytes as u64); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(bytes as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_datagram.rs b/lading/src/blackhole/unix_datagram.rs index 88c494b85..cba28f107 100644 --- a/lading/src/blackhole/unix_datagram.rs +++ b/lading/src/blackhole/unix_datagram.rs @@ -3,6 +3,7 @@ //! ## Metrics //! //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! use std::{io, path::PathBuf}; @@ -14,6 +15,7 @@ use tokio::net; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixDatagram`]. @@ -108,6 +110,7 @@ impl UnixDatagram { source: Box::new(source), })?; counter!("bytes_received", &self.metric_labels).increment(n as u64); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(n as u64); } () = &mut shutdown_wait => { info!("shutdown signal received"); diff --git a/lading/src/blackhole/unix_stream.rs b/lading/src/blackhole/unix_stream.rs index a1e90a2b9..73fcfea8d 100644 --- a/lading/src/blackhole/unix_stream.rs +++ b/lading/src/blackhole/unix_stream.rs @@ -4,6 +4,7 @@ //! //! `connection_accepted`: Incoming connections received //! `bytes_received`: Total bytes received +//! `total_bytes_received`: Aggregated bytes received across all blackhole types //! `requests_received`: Total requests received //! @@ -17,6 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::info; use super::General; +use crate::blackhole::common::COMMON_BLACKHOLE_LABELS; #[derive(thiserror::Error, Debug)] /// Errors produced by [`UnixStream`]. @@ -127,7 +129,9 @@ impl UnixStream { while let Some(msg) = stream.next().await { counter!("message_received", labels).increment(1); if let Ok(msg) = msg { - counter!("bytes_received", labels).increment(msg.len() as u64); + let len = msg.len() as u64; + counter!("bytes_received", labels).increment(len); + counter!("total_bytes_received", COMMON_BLACKHOLE_LABELS).increment(len); } } }