Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
**Features**:

- Double write to legacy attributes for backwards compatibility. ([#5490](https://github.com/getsentry/relay/pull/5490))
- Forbid invalid profile chunks without a platform header. ([#5507](https://github.com/getsentry/relay/pull/5507))

**Internal**:

- Move profile chunk processing to the new internal processing pipeline. ([#5505](https://github.com/getsentry/relay/pull/5505))

## 25.12.0

Expand Down
2 changes: 1 addition & 1 deletion relay-profiling/src/outcomes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::ProfileError;

pub fn discard_reason(err: ProfileError) -> &'static str {
pub fn discard_reason(err: &ProfileError) -> &'static str {
match err {
ProfileError::CannotSerializePayload => "profiling_failed_serialization",
ProfileError::ExceedSizeLimit => "profiling_exceed_size_limit",
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/envelope/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ impl Item {
(DataCategory::Span, item_count),
(DataCategory::SpanIndexed, item_count),
],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::ProfileChunk => match self.profile_type() {
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, item_count)],
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, item_count)],
None => smallvec![],
// Profile chunks without platform/profile type are considered invalid,
// fallback to `profile chunk` to still get outcomes.
None => smallvec![(DataCategory::ProfileChunk, item_count)],
},
ItemType::Integration => match self.integration() {
Some(Integration::Logs(LogsIntegration::OtelV1 { .. })) => smallvec![
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/processing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::processing::ForwardContext;
use crate::processing::StoreHandle;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::profile_chunks::ProfileChunksProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
use crate::processing::trace_attachments::TraceAttachmentsProcessor;
Expand Down Expand Up @@ -57,8 +58,9 @@ macro_rules! outputs {
outputs!(
CheckIns => CheckInsProcessor,
Logs => LogsProcessor,
TraceMetrics => TraceMetricsProcessor,
Spans => SpansProcessor,
ProfileChunks => ProfileChunksProcessor,
Sessions => SessionsProcessor,
Spans => SpansProcessor,
TraceAttachments => TraceAttachmentsProcessor,
TraceMetrics => TraceMetricsProcessor,
);
8 changes: 8 additions & 0 deletions relay-server/src/processing/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ impl ForwardContext<'_> {
return Retention::from(*retention);
}

self.event_retention()
}

/// Returns the event [`Retention`].
///
/// This retention is also often used for older products and can be considered a default
/// retention for products which do not define their own retention.
pub fn event_retention(&self) -> Retention {
Retention::from(RetentionConfig {
standard: self
.project_info
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use self::limits::*;

pub mod check_ins;
pub mod logs;
pub mod profile_chunks;
pub mod sessions;
pub mod spans;
pub mod trace_attachments;
Expand Down
22 changes: 22 additions & 0 deletions relay-server/src/processing/profile_chunks/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use relay_dynamic_config::Feature;

use crate::processing::Context;
use crate::processing::profile_chunks::{Error, Result};

/// Checks whether the profile ingestion feature flag is enabled for the current project.
pub fn feature_flag(ctx: Context<'_>) -> Result<()> {
let feature = match ctx
.project_info
.has_feature(Feature::ContinuousProfilingBetaIngest)
{
// Legacy feature.
true => Feature::ContinuousProfilingBeta,
// The post release ingestion feature.
false => Feature::ContinuousProfiling,
};

match ctx.should_filter(feature) {
true => Err(Error::FilterFeatureFlag),
false => Ok(()),
}
}
191 changes: 191 additions & 0 deletions relay-server/src/processing/profile_chunks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use std::sync::Arc;

use relay_profiling::ProfileType;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, ManagedResult as _, Quantities, Rejected};
use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter};
use crate::services::outcome::{DiscardReason, Outcome};
use smallvec::smallvec;

mod filter;
mod process;

/// A [`Result`] alias defaulting to the profile chunk processing [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Errors raised while processing profile chunks.
///
/// Converted into outcomes via the [`crate::managed::OutcomeError`] implementation.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error raised in [`relay_profiling`].
    #[error("Profiling Error: {0}")]
    Profiling(#[from] relay_profiling::ProfileError),
    /// The profile chunks are rate limited.
    #[error("rate limited")]
    RateLimited(RateLimits),
    /// Profile chunks filtered because of a missing feature flag.
    #[error("profile chunks feature flag missing")]
    FilterFeatureFlag,
}

// Manual impl because `RateLimited` carries `RateLimits` without `#[from]`.
impl From<RateLimits> for Error {
    fn from(value: RateLimits) -> Self {
        Self::RateLimited(value)
    }
}

impl crate::managed::OutcomeError for Error {
    type Error = Self;

    /// Maps the error to the outcome emitted for the dropped profile chunks.
    fn consume(self) -> (Option<Outcome>, Self::Error) {
        let outcome = match &self {
            // Inbound-filtered chunks carry their filter key through the error.
            Self::Profiling(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
                Some(Outcome::Filtered(filter_stat_key.clone()))
            }
            // All other profiling errors map to an invalid outcome with a
            // discard reason derived from the error.
            Self::Profiling(profiling_error) => Some(Outcome::Invalid(DiscardReason::Profiling(
                relay_profiling::discard_reason(profiling_error),
            ))),
            Self::RateLimited(rate_limits) => {
                let reason_code = rate_limits
                    .longest()
                    .and_then(|limit| limit.reason_code.clone());
                Some(Outcome::RateLimited(reason_code))
            }
            // Missing feature flags drop silently without an outcome.
            Self::FilterFeatureFlag => None,
        };
        (outcome, self)
    }
}

/// A processor for profile chunks.
///
/// It processes items of type: [`ItemType::ProfileChunk`].
#[derive(Debug)]
pub struct ProfileChunksProcessor {
    // Shared rate limiter used to enforce profile chunk quotas.
    limiter: Arc<QuotaRateLimiter>,
}

impl ProfileChunksProcessor {
    /// Creates a new [`Self`].
    pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
        Self { limiter }
    }
}

impl processing::Processor for ProfileChunksProcessor {
    type UnitOfWork = SerializedProfileChunks;
    type Output = ProfileChunkOutput;
    type Error = Error;

    /// Extracts all profile chunk items from the envelope into a unit of work.
    ///
    /// Returns `None` when the envelope carries no profile chunks.
    fn prepare_envelope(
        &self,
        envelope: &mut ManagedEnvelope,
    ) -> Option<Managed<Self::UnitOfWork>> {
        let items = envelope
            .envelope_mut()
            .take_items_by(|item| matches!(*item.ty(), ItemType::ProfileChunk))
            .into_vec();

        if items.is_empty() {
            None
        } else {
            let work = SerializedProfileChunks {
                headers: envelope.envelope().headers().clone(),
                profile_chunks: items,
            };
            Some(Managed::from_envelope(envelope, work))
        }
    }

    /// Runs feature-flag filtering, chunk processing and quota enforcement.
    async fn process(
        &self,
        mut profile_chunks: Managed<Self::UnitOfWork>,
        ctx: Context<'_>,
    ) -> Result<Output<Self::Output>, Rejected<Error>> {
        // Reject everything early when the project lacks the ingestion feature.
        filter::feature_flag(ctx).reject(&profile_chunks)?;

        process::process(&mut profile_chunks, ctx);

        let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?;

        Ok(Output::just(ProfileChunkOutput(profile_chunks)))
    }
}

/// Output produced by [`ProfileChunksProcessor`].
#[derive(Debug)]
pub struct ProfileChunkOutput(Managed<SerializedProfileChunks>);

impl Forward for ProfileChunkOutput {
    /// Re-assembles the profile chunk items into an envelope for upstream forwarding.
    fn serialize_envelope(
        self,
        _: processing::ForwardContext<'_>,
    ) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
        let Self(profile_chunks) = self;
        Ok(profile_chunks
            .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks))))
    }

    /// Forwards each profile chunk individually to the store service.
    #[cfg(feature = "processing")]
    fn forward_store(
        self,
        s: processing::forward::StoreHandle<'_>,
        ctx: processing::ForwardContext<'_>,
    ) -> Result<(), Rejected<()>> {
        use crate::services::store::StoreProfileChunk;

        let Self(profile_chunks) = self;
        // Profile chunks use the event retention as a default; NOTE(review):
        // presumably there is no dedicated profile retention config — confirm.
        let retention_days = ctx.event_retention().standard;

        // Split the unit of work so each chunk is stored (and accounted) on its own.
        for item in profile_chunks.split(|pc| pc.profile_chunks) {
            s.store(item.map(|item, _| StoreProfileChunk {
                retention_days,
                payload: item.payload(),
                quantities: item.quantities(),
            }));
        }

        Ok(())
    }
}

/// Serialized profile chunks extracted from an envelope.
#[derive(Debug)]
pub struct SerializedProfileChunks {
    /// Original envelope headers.
    pub headers: EnvelopeHeaders,
    /// List of serialized profile chunk items.
    ///
    /// Items are expected to be of type [`ItemType::ProfileChunk`].
    pub profile_chunks: Vec<Item>,
}

impl Counted for SerializedProfileChunks {
    /// Tallies quantities per profile chunk data category for outcome
    /// accounting and rate limiting.
    fn quantities(&self) -> Quantities {
        let mut ui = 0;
        let mut backend = 0;

        for chunk in &self.profile_chunks {
            if matches!(chunk.profile_type(), Some(ProfileType::Ui)) {
                ui += 1;
            } else {
                // Backend chunks, as well as chunks without a profile type:
                // the latter are invalid and will be dropped, but outcomes
                // default to the backend profile chunk category.
                backend += 1;
            }
        }

        let mut quantities = smallvec![];
        if ui > 0 {
            quantities.push((DataCategory::ProfileChunkUi, ui));
        }
        if backend > 0 {
            quantities.push((DataCategory::ProfileChunk, backend));
        }

        quantities
    }
}

// Enables quota enforcement on the unit of work; quantities come from the
// `Counted` implementation, rejections surface as `Error::RateLimited`.
impl CountRateLimited for Managed<SerializedProfileChunks> {
    type Error = Error;
}
46 changes: 46 additions & 0 deletions relay-server/src/processing/profile_chunks/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::envelope::{ContentType, Item, ItemType};
use crate::processing::Context;
use crate::processing::Managed;
use crate::processing::profile_chunks::{Result, SerializedProfileChunks};

/// Processes profile chunks.
///
/// Parses, validates, filters and re-serializes every chunk in place; chunks
/// failing any step are dropped with a matching outcome.
pub fn process(profile_chunks: &mut Managed<SerializedProfileChunks>, ctx: Context<'_>) {
    // Only run this 'expensive' processing step in processing Relays.
    if !ctx.is_processing() {
        return;
    }

    let client_ip = profile_chunks.headers.meta().client_addr();
    let filter_settings = &ctx.project_info.config.filter_settings;

    profile_chunks.retain(
        |pc| &mut pc.profile_chunks,
        |item, _| -> Result<()> {
            let chunk = relay_profiling::ProfileChunk::new(item.payload())?;

            // The item header must declare the same profile type as the
            // payload; a missing or mismatching header is invalid.
            if item.profile_type() != Some(chunk.profile_type()) {
                relay_log::debug!("dropping profile chunk due to profile type/platform mismatch");
                return Err(relay_profiling::ProfileError::InvalidProfileType.into());
            }

            chunk.filter(client_ip, filter_settings, ctx.global_config)?;

            let expanded = chunk.expand()?;
            if expanded.len() > ctx.config.max_profile_size() {
                relay_log::debug!("dropping profile chunk exceeding the size limit");
                return Err(relay_profiling::ProfileError::ExceedSizeLimit.into());
            }

            // Replace the original item with the expanded, normalized payload.
            let mut expanded_item = Item::new(ItemType::ProfileChunk);
            expanded_item.set_profile_type(chunk.profile_type());
            expanded_item.set_payload(ContentType::Json, expanded);
            *item = expanded_item;

            Ok(())
        },
    );
}
8 changes: 5 additions & 3 deletions relay-server/src/processing/transactions/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub fn filter(
Err(err) => {
record_keeper.reject_err(
Outcome::Invalid(DiscardReason::Profiling(relay_profiling::discard_reason(
err,
&err,
))),
work.profile.take(),
);
Expand Down Expand Up @@ -246,15 +246,17 @@ fn expand_profile(
Ok(id)
} else {
Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(relay_profiling::ProfileError::ExceedSizeLimit),
relay_profiling::discard_reason(
&relay_profiling::ProfileError::ExceedSizeLimit,
),
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
Err(Outcome::Filtered(filter_stat_key))
}
Err(err) => Err(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
relay_profiling::discard_reason(&err),
))),
}
}
Expand Down
Loading
Loading