From 88bc9de6f635b32c91d5d0c52b82e615d4d8884c Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 30 Jan 2026 10:07:58 -0800 Subject: [PATCH 1/4] feat(eap): Add tests for OCCURRENCE item type hourly event rate queries Add test coverage for querying EAP with OCCURRENCE item type to calculate hourly event rates. The tests validate: - Counting occurrences grouped by group_id (issue) - Calculating hourly rates using division formulas (count / WEEK_IN_HOURS) - Taking P95 of hourly_event_rate across issues - Conditional aggregations based on issue age (old vs new issues) The hourly event rate formula tested: - Old issues (>1 week): rate = event_count / WEEK_IN_HOURS - New issues (<1 week): rate = event_count / hours_since_first_seen Co-Authored-By: Claude Opus 4.5 --- .../test_occurrence_hourly_event_rate.py | 614 ++++++++++++++++++ 1 file changed, 614 insertions(+) create mode 100644 tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py new file mode 100644 index 0000000000..30ab79074e --- /dev/null +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py @@ -0,0 +1,614 @@ +""" +Tests for querying EAP with OCCURRENCE item type to calculate hourly event rates. + +This test validates the ability to: +1. Query OCCURRENCE items grouped by group_id (issue) +2. Calculate hourly event rates based on event counts and time windows +3. Take the 95th percentile of hourly_event_rates across all issues in a project + +The hourly event rate formula: +- For issues older than a week: hourly_event_rate = past_week_event_count / WEEK_IN_HOURS +- For issues newer than a week: hourly_event_rate = past_week_event_count / hours_since_first_seen +""" + +from datetime import datetime, timedelta, timezone +from typing import Any + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import ( + AttributeConditionalAggregation, +) +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + Column, + TraceItemTableRequest, +) +from sentry_protos.snuba.v1.formula_pb2 import Literal +from sentry_protos.snuba.v1.request_common_pb2 import ( + RequestMeta, + TraceItemType, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeAggregation, + AttributeKey, + AttributeValue, + ExtrapolationMode, + Function, +) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + ComparisonFilter, + TraceItemFilter, +) +from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable +from tests.base import BaseApiTest +from tests.helpers import write_raw_unprocessed_events +from tests.web.rpc.v1.test_utils import gen_item_message + +# Constants for time calculations +WEEK_IN_HOURS = 168 # 7 days * 24 hours + + +# Base time for test data - 3 hours ago to ensure data is within query window +BASE_TIME = datetime.now(tz=timezone.utc).replace(minute=0, second=0, microsecond=0) - timedelta( + hours=3 +) + +START_TIMESTAMP = Timestamp(seconds=int((BASE_TIME - timedelta(days=14)).timestamp())) +END_TIMESTAMP = Timestamp(seconds=int((BASE_TIME + timedelta(hours=1)).timestamp())) + + +def 
_create_occurrence_items( + group_id: int, + first_seen: datetime, + event_count: int, + base_timestamp: datetime, + project_id: int = 1, +) -> list[bytes]: + """ + Create OCCURRENCE items for a specific group (issue). + + Args: + group_id: The issue/group identifier + first_seen: When the issue was first seen + event_count: Number of events to create for this issue + base_timestamp: Base timestamp for event creation + project_id: Project ID for the occurrences + + Returns: + List of serialized occurrence messages + """ + messages = [] + for i in range(event_count): + # Spread events across the time window + event_time = base_timestamp - timedelta(minutes=i * 5) + messages.append( + gen_item_message( + start_timestamp=event_time, + type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + attributes={ + "group_id": AnyValue(int_value=group_id), + "first_seen": AnyValue(double_value=first_seen.timestamp()), + "first_seen_hours_ago": AnyValue( + double_value=(base_timestamp - first_seen).total_seconds() / 3600 + ), + "sentry.group_id": AnyValue(int_value=group_id), + }, + project_id=project_id, + remove_default_attributes=True, + ) + ) + return messages + + +@pytest.fixture(autouse=False) +def setup_occurrence_data(clickhouse_db: None, redis_db: None) -> dict[str, Any]: + """ + Set up test data with OCCURRENCE items representing different issues. + + Creates issues with varying: + - Ages (some older than a week, some newer) + - Event counts (to get different hourly rates) + """ + items_storage = get_storage(StorageKey("eap_items")) + now = BASE_TIME + + # Define test issues with different characteristics + # Format: (group_id, first_seen_days_ago, event_count) + test_issues = [ + # Old issues (> 1 week old) - rate = event_count / WEEK_IN_HOURS + (1001, 14, 168), # 14 days old, 168 events -> 1.0 events/hour + (1002, 10, 336), # 10 days old, 336 events -> 2.0 events/hour + (1003, 8, 84), # 8 days old, 84 events -> 0.5 events/hour + (1004, 21, 504), # 21 days old, 504 events -> 3.0 events/hour + (1005, 30, 840), # 30 days old, 840 events -> 5.0 events/hour + # New issues (< 1 week old) - rate = event_count / hours_since_first_seen + (2001, 3, 72), # 3 days (72 hours) old, 72 events -> 1.0 events/hour + (2002, 1, 48), # 1 day (24 hours) old, 48 events -> 2.0 events/hour + (2003, 5, 60), # 5 days (120 hours) old, 60 events -> 0.5 events/hour + (2004, 2, 144), # 2 days (48 hours) old, 144 events -> 3.0 events/hour + (2005, 4, 480), # 4 days (96 hours) old, 480 events -> 5.0 events/hour + ] + + all_messages: list[bytes] = [] + expected_rates: dict[int, float] = {} + one_week_ago = now - timedelta(days=7) + + for group_id, days_ago, event_count in test_issues: + first_seen = now - timedelta(days=days_ago) + + # Calculate expected hourly rate based on the formula + if first_seen < one_week_ago: + # Old issue: rate = event_count / WEEK_IN_HOURS + hourly_rate = event_count / WEEK_IN_HOURS + else: + # New issue: rate = event_count / hours_since_first_seen + hours_since_first_seen = days_ago * 24 + hourly_rate = event_count / hours_since_first_seen + + expected_rates[group_id] = hourly_rate + + messages = _create_occurrence_items( + group_id=group_id, + first_seen=first_seen, + event_count=event_count, + base_timestamp=now, + ) + all_messages.extend(messages) + + write_raw_unprocessed_events(items_storage, all_messages) # type: ignore + + return { + "expected_rates": expected_rates, + "test_issues": test_issues, + "now": now, + "one_week_ago": one_week_ago, + } + + +@pytest.mark.clickhouse_db 
+@pytest.mark.redis_db +class TestOccurrenceHourlyEventRate(BaseApiTest): + """Tests for calculating hourly event rates from OCCURRENCE items.""" + + def test_count_occurrences_by_group(self, setup_occurrence_data: dict[str, Any]) -> None: + """ + Test that we can count occurrences grouped by group_id. + + This is the foundation for calculating hourly event rates. + """ + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.occurrence_hourly_rate", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="event_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) + ), + ], + limit=100, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we got results for our test groups + assert len(response.column_values) == 2 + group_id_col = response.column_values[0] + count_col = response.column_values[1] + + assert group_id_col.attribute_name == "group_id" + assert count_col.attribute_name == "event_count" + + # We should have results for multiple groups + assert len(group_id_col.results) > 0 + + def test_hourly_rate_for_old_issues_with_division( + self, setup_occurrence_data: dict[str, Any] + ) -> None: + """ + Test calculating hourly event rate for old issues using formula division. 
+ + For issues older than a week: + hourly_event_rate = event_count / WEEK_IN_HOURS + """ + # Filter for old issues (group_id starting with 100x) + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.occurrence_hourly_rate", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_int=2000), + ) + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), + # Count events per group + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="event_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + # Calculate hourly rate = count / WEEK_IN_HOURS using formula + Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="count_for_rate", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + right=Column( + literal=Literal(val_double=float(WEEK_IN_HOURS)), + ), + ), + label="hourly_rate", + ), + ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) + ), + ], + limit=100, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we got the hourly_rate column + assert len(response.column_values) == 3 + hourly_rate_col = None + for col in response.column_values: + if col.attribute_name == "hourly_rate": + hourly_rate_col = col + break + + assert hourly_rate_col is not None + assert len(hourly_rate_col.results) > 0 + + # Verify the calculated rates match expected values + expected_rates = setup_occurrence_data["expected_rates"] + group_id_col = response.column_values[0] + + for i, group_id_val in enumerate(group_id_col.results): + group_id = group_id_val.val_int + if group_id in expected_rates and group_id < 2000: + actual_rate = hourly_rate_col.results[i].val_double + expected_rate = expected_rates[group_id] + assert abs(actual_rate - expected_rate) < 0.01, ( + f"Group {group_id}: expected {expected_rate}, got {actual_rate}" + ) + + def test_p95_hourly_rate_using_precomputed_attribute( + self, setup_occurrence_data: dict[str, Any] + ) -> None: + """ + Test taking P95 of hourly event rates when the rate is stored as an attribute. + + This simulates a scenario where the hourly_event_rate is pre-computed + and stored with each occurrence, allowing us to calculate aggregate + percentiles directly. 
+ """ + # First, create occurrences with pre-computed hourly_event_rate attribute + items_storage = get_storage(StorageKey("eap_items")) + now = BASE_TIME + + # Create occurrences with explicit hourly_event_rate values + # These represent different issues with known rates + rates_to_test = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] + + messages = [] + for i, rate in enumerate(rates_to_test): + group_id = 9000 + i + # Create multiple occurrences for each group with the same rate + for j in range(10): + messages.append( + gen_item_message( + start_timestamp=now - timedelta(minutes=j), + type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + attributes={ + "group_id": AnyValue(int_value=group_id), + "hourly_event_rate": AnyValue(double_value=rate), + "sentry.group_id": AnyValue(int_value=group_id), + }, + project_id=1, + remove_default_attributes=True, + ) + ) + + write_raw_unprocessed_events(items_storage, messages) # type: ignore + + # Query P95 of hourly_event_rate grouped by project + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.occurrence_hourly_rate_p95", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue(val_int=9000), + ) + ), + columns=[ + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_P95, + key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="hourly_event_rate"), + label="p95_hourly_rate", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ], + limit=10, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we got the P95 result + assert len(response.column_values) == 1 + p95_col = response.column_values[0] + assert p95_col.attribute_name == "p95_hourly_rate" + assert len(p95_col.results) == 1 + + # P95 of [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] should be around 4.75 + # (since we have 10 values, P95 is approximately the 9.5th value) + p95_value = p95_col.results[0].val_double + assert 4.0 <= p95_value <= 5.0, f"Expected P95 around 4.5-5.0, got {p95_value}" + + def test_conditional_hourly_rate_using_two_aggregations( + self, setup_occurrence_data: dict[str, Any] + ) -> None: + """ + Test calculating conditional hourly rates using conditional aggregations. + + This approach uses two separate aggregations: + 1. One for old issues (first_seen_hours_ago >= 168) + 2. One for new issues (first_seen_hours_ago < 168) + + The conditional aggregation allows filtering within the aggregate function. 
+ """ + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.occurrence_conditional_rate", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), + # Count for old issues (first_seen >= 168 hours ago) + Column( + conditional_aggregation=AttributeConditionalAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="old_issue_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="first_seen_hours_ago", + ), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue(val_double=float(WEEK_IN_HOURS)), + ) + ), + ), + ), + # Count for new issues (first_seen < 168 hours ago) + Column( + conditional_aggregation=AttributeConditionalAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="new_issue_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="first_seen_hours_ago", + ), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_double=float(WEEK_IN_HOURS)), + ) + ), + ), + ), + # Get hours since first seen for rate calculation + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_MAX, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="first_seen_hours_ago" + ), + label="hours_age", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) + ), + ], + limit=100, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we got results with conditional counts + assert len(response.column_values) >= 3 + + # Find the columns by name + old_count_col = None + new_count_col = None + for col in response.column_values: + if col.attribute_name == "old_issue_count": + old_count_col = col + elif col.attribute_name == "new_issue_count": + new_count_col = col + + assert old_count_col is not None or new_count_col is not None + + def test_p95_of_grouped_hourly_rates(self, setup_occurrence_data: dict[str, Any]) -> None: + """ + Test calculating P95 of hourly rates computed per group. + + This is a two-step approach: + 1. First calculate hourly rates per group using division formula + 2. Then take P95 across all groups + + Note: In practice, this would require a subquery or post-processing, + but we can test the P95 aggregation on pre-computed rates. 
+ """ + # Create new test data with pre-computed rates for P95 calculation + items_storage = get_storage(StorageKey("eap_items")) + now = BASE_TIME + + # Create 20 issues with varying hourly rates for P95 calculation + hourly_rates = [ + 0.1, + 0.2, + 0.5, + 0.8, + 1.0, + 1.2, + 1.5, + 1.8, + 2.0, + 2.5, + 3.0, + 3.5, + 4.0, + 4.5, + 5.0, + 6.0, + 7.0, + 8.0, + 9.0, + 10.0, + ] + + messages = [] + for i, rate in enumerate(hourly_rates): + group_id = 8000 + i + messages.append( + gen_item_message( + start_timestamp=now - timedelta(minutes=i), + type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + attributes={ + "group_id": AnyValue(int_value=group_id), + "computed_hourly_rate": AnyValue(double_value=rate), + }, + project_id=1, + remove_default_attributes=True, + ) + ) + + write_raw_unprocessed_events(items_storage, messages) # type: ignore + + # Query P95 of the computed hourly rates + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.occurrence_p95_grouped_rates", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue(val_int=8000), + ) + ), + columns=[ + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_P95, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="computed_hourly_rate" + ), + label="p95_rate", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + label="total_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ], + limit=10, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify P95 calculation + assert len(response.column_values) == 2 + + p95_col = None + for col in response.column_values: + if col.attribute_name == "p95_rate": + p95_col = col + break + + assert p95_col is not None + assert len(p95_col.results) == 1 + + # P95 of 20 values should be around the 19th value (9.0) + p95_value = p95_col.results[0].val_double + assert 8.0 <= p95_value <= 10.0, f"Expected P95 around 9.0, got {p95_value}" From 23cbabecf3ccd1ca7f1f8e26287cb3d04eeb0be7 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 30 Jan 2026 10:21:48 -0800 Subject: [PATCH 2/4] fix(eap): Refactor tests to accurately reflect supported vs unsupported query patterns Updated tests to properly categorize: SUPPORTED: - Count and min(timestamp) per group - Division by static literal (count / WEEK_IN_HOURS) - P95 of pre-computed attribute values UNSUPPORTED (xfail): - Division by nested formula containing aggregate (returns null) - Conditional expression with aggregate in condition (no 'if' operator) - P95 of calculated aggregate formula across groups (nested aggregation) Co-Authored-By: Claude Opus 4.5 --- .../test_occurrence_hourly_event_rate.py | 778 +++++++++++------- 1 file changed, 463 insertions(+), 315 deletions(-) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py index 30ab79074e..ae66dc5b43 100644 --- 
a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py @@ -1,14 +1,23 @@ """ Tests for querying EAP with OCCURRENCE item type to calculate hourly event rates. -This test validates the ability to: -1. Query OCCURRENCE items grouped by group_id (issue) -2. Calculate hourly event rates based on event counts and time windows -3. Take the 95th percentile of hourly_event_rates across all issues in a project - -The hourly event rate formula: -- For issues older than a week: hourly_event_rate = past_week_event_count / WEEK_IN_HOURS -- For issues newer than a week: hourly_event_rate = past_week_event_count / hours_since_first_seen +This test module validates both currently supported and unsupported query patterns +for calculating hourly event rates from OCCURRENCE items. + +The desired hourly event rate calculation: +1. Group by group_id +2. countIf(timestamp > one_week_ago) to get past_week_event_count per group +3. min(timestamp) to find first_seen per group +4. Conditional calculation: + - if(first_seen < one_week_ago): hourly_rate = past_week_event_count / WEEK_IN_HOURS + - if(first_seen > one_week_ago): hourly_rate = past_week_event_count / dateDiff(hours, first_seen, now) +5. p95(hourly_rate) across all issues + +Two levels of nesting that are currently NOT supported: +1. Conditional expression where the condition includes an aggregate + (e.g., if(min(timestamp) < X, value1, value2)) +2. Quantile on a calculated/computed aggregate + (e.g., p95(count / hours) where count is an aggregate) """ from datetime import datetime, timedelta, timezone @@ -16,9 +25,6 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp -from sentry_protos.snuba.v1.attribute_conditional_aggregation_pb2 import ( - AttributeConditionalAggregation, -) from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( Column, TraceItemTableRequest, @@ -50,6 +56,7 @@ # Constants for time calculations WEEK_IN_HOURS = 168 # 7 days * 24 hours +WEEK_IN_SECONDS = WEEK_IN_HOURS * 3600 # Base time for test data - 3 hours ago to ensure data is within query window @@ -61,40 +68,24 @@ END_TIMESTAMP = Timestamp(seconds=int((BASE_TIME + timedelta(hours=1)).timestamp())) -def _create_occurrence_items( +def _create_occurrence_items_for_group( group_id: int, - first_seen: datetime, - event_count: int, - base_timestamp: datetime, + timestamps: list[datetime], project_id: int = 1, ) -> list[bytes]: """ - Create OCCURRENCE items for a specific group (issue). - - Args: - group_id: The issue/group identifier - first_seen: When the issue was first seen - event_count: Number of events to create for this issue - base_timestamp: Base timestamp for event creation - project_id: Project ID for the occurrences + Create OCCURRENCE items for a specific group (issue) at given timestamps. - Returns: - List of serialized occurrence messages + This simulates real occurrence data where each occurrence has a timestamp, + and the first_seen is derived from min(timestamp) during query time. 
""" messages = [] - for i in range(event_count): - # Spread events across the time window - event_time = base_timestamp - timedelta(minutes=i * 5) + for ts in timestamps: messages.append( gen_item_message( - start_timestamp=event_time, + start_timestamp=ts, type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, attributes={ - "group_id": AnyValue(int_value=group_id), - "first_seen": AnyValue(double_value=first_seen.timestamp()), - "first_seen_hours_ago": AnyValue( - double_value=(base_timestamp - first_seen).total_seconds() / 3600 - ), "sentry.group_id": AnyValue(int_value=group_id), }, project_id=project_id, @@ -109,54 +100,86 @@ def setup_occurrence_data(clickhouse_db: None, redis_db: None) -> dict[str, Any] """ Set up test data with OCCURRENCE items representing different issues. - Creates issues with varying: - - Ages (some older than a week, some newer) - - Event counts (to get different hourly rates) + Creates issues with: + - Varying first_seen times (some older than a week, some newer) + - Events spread over the past week for counting """ items_storage = get_storage(StorageKey("eap_items")) now = BASE_TIME + one_week_ago = now - timedelta(days=7) - # Define test issues with different characteristics - # Format: (group_id, first_seen_days_ago, event_count) - test_issues = [ - # Old issues (> 1 week old) - rate = event_count / WEEK_IN_HOURS - (1001, 14, 168), # 14 days old, 168 events -> 1.0 events/hour - (1002, 10, 336), # 10 days old, 336 events -> 2.0 events/hour - (1003, 8, 84), # 8 days old, 84 events -> 0.5 events/hour - (1004, 21, 504), # 21 days old, 504 events -> 3.0 events/hour - (1005, 30, 840), # 30 days old, 840 events -> 5.0 events/hour - # New issues (< 1 week old) - rate = event_count / hours_since_first_seen - (2001, 3, 72), # 3 days (72 hours) old, 72 events -> 1.0 events/hour - (2002, 1, 48), # 1 day (24 hours) old, 48 events -> 2.0 events/hour - (2003, 5, 60), # 5 days (120 hours) old, 60 events -> 0.5 events/hour - (2004, 2, 144), # 2 days (48 hours) old, 144 events -> 3.0 events/hour - (2005, 4, 480), # 4 days (96 hours) old, 480 events -> 5.0 events/hour + # Test issues configuration: + # (group_id, first_seen, events_in_past_week, expected_hourly_rate) + # + # For old issues (first_seen < one_week_ago): + # hourly_rate = events_in_past_week / WEEK_IN_HOURS + # + # For new issues (first_seen >= one_week_ago): + # hourly_rate = events_in_past_week / hours_since_first_seen + + test_issues: list[dict[str, Any]] = [ + # Old issues (first_seen > 1 week ago) + { + "group_id": 1001, + "first_seen": now - timedelta(days=14), + "events_in_past_week": 168, # -> 1.0 events/hour + }, + { + "group_id": 1002, + "first_seen": now - timedelta(days=10), + "events_in_past_week": 336, # -> 2.0 events/hour + }, + { + "group_id": 1003, + "first_seen": now - timedelta(days=21), + "events_in_past_week": 84, # -> 0.5 events/hour + }, + # New issues (first_seen < 1 week ago) + { + "group_id": 2001, + "first_seen": now - timedelta(days=3), # 72 hours ago + "events_in_past_week": 72, # -> 1.0 events/hour + }, + { + "group_id": 2002, + "first_seen": now - timedelta(days=1), # 24 hours ago + "events_in_past_week": 48, # -> 2.0 events/hour + }, + { + "group_id": 2003, + "first_seen": now - timedelta(days=5), # 120 hours ago + "events_in_past_week": 60, # -> 0.5 events/hour + }, ] all_messages: list[bytes] = [] expected_rates: dict[int, float] = {} - one_week_ago = now - timedelta(days=7) - for group_id, days_ago, event_count in test_issues: - first_seen = now - timedelta(days=days_ago) + for issue 
in test_issues: + group_id = issue["group_id"] + first_seen = issue["first_seen"] + event_count = issue["events_in_past_week"] - # Calculate expected hourly rate based on the formula + # Calculate expected hourly rate if first_seen < one_week_ago: - # Old issue: rate = event_count / WEEK_IN_HOURS hourly_rate = event_count / WEEK_IN_HOURS else: - # New issue: rate = event_count / hours_since_first_seen - hours_since_first_seen = days_ago * 24 + hours_since_first_seen = (now - first_seen).total_seconds() / 3600 hourly_rate = event_count / hours_since_first_seen expected_rates[group_id] = hourly_rate - messages = _create_occurrence_items( - group_id=group_id, - first_seen=first_seen, - event_count=event_count, - base_timestamp=now, - ) + # Create timestamps for events: + # - One at first_seen (to establish min timestamp) + # - Rest spread over the past week + timestamps = [first_seen] + for i in range(event_count - 1): + # Spread events over the past week + event_time = now - timedelta(hours=i % WEEK_IN_HOURS) + if event_time > first_seen: + timestamps.append(event_time) + + messages = _create_occurrence_items_for_group(group_id, timestamps[:event_count]) all_messages.extend(messages) write_raw_unprocessed_events(items_storage, all_messages) # type: ignore @@ -171,14 +194,21 @@ def setup_occurrence_data(clickhouse_db: None, redis_db: None) -> dict[str, Any] @pytest.mark.clickhouse_db @pytest.mark.redis_db -class TestOccurrenceHourlyEventRate(BaseApiTest): - """Tests for calculating hourly event rates from OCCURRENCE items.""" +class TestOccurrenceHourlyEventRateSupported(BaseApiTest): + """ + Tests for CURRENTLY SUPPORTED query patterns. + + These tests demonstrate what CAN be done today with the EAP query system. + """ - def test_count_occurrences_by_group(self, setup_occurrence_data: dict[str, Any]) -> None: + def test_count_and_min_timestamp_per_group(self, setup_occurrence_data: dict[str, Any]) -> None: """ - Test that we can count occurrences grouped by group_id. + Test that we can: + 1. Group by sentry.group_id + 2. Count events per group + 3. Find first_seen via min(sentry.timestamp_precise) per group - This is the foundation for calculating hourly event rates. + This is the foundation - getting aggregates per group works. 
""" message = TraceItemTableRequest( meta=RequestMeta( @@ -191,20 +221,34 @@ def test_count_occurrences_by_group(self, setup_occurrence_data: dict[str, Any]) trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, ), columns=[ - Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), + # Total count per group Column( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="event_count", + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id"), + label="total_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + # First seen = min(timestamp_precise) per group + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_MIN, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" + ), + label="first_seen", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, ), ), ], - group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], order_by=[ TraceItemTableRequest.OrderBy( - column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) + column=Column( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id") + ) ), ], limit=100, @@ -212,27 +256,24 @@ def test_count_occurrences_by_group(self, setup_occurrence_data: dict[str, Any]) response = EndpointTraceItemTable().execute(message) - # Verify we got results for our test groups - assert len(response.column_values) == 2 - group_id_col = response.column_values[0] - count_col = response.column_values[1] - - assert group_id_col.attribute_name == "group_id" - assert count_col.attribute_name == "event_count" + # Verify we got the expected columns + assert len(response.column_values) == 3 + column_names = [col.attribute_name for col in response.column_values] + assert "sentry.group_id" in column_names + assert "total_count" in column_names + assert "first_seen" in column_names - # We should have results for multiple groups - assert len(group_id_col.results) > 0 + # Verify we have results for our test groups + group_col = next(c for c in response.column_values if c.attribute_name == "sentry.group_id") + assert len(group_col.results) > 0 - def test_hourly_rate_for_old_issues_with_division( - self, setup_occurrence_data: dict[str, Any] - ) -> None: + def test_hourly_rate_with_static_divisor(self, setup_occurrence_data: dict[str, Any]) -> None: """ - Test calculating hourly event rate for old issues using formula division. + Test calculating hourly rate with a STATIC divisor (WEEK_IN_HOURS). - For issues older than a week: - hourly_event_rate = event_count / WEEK_IN_HOURS + This works because the divisor is a literal, not an aggregate. 
+ Formula: count / WEEK_IN_HOURS """ - # Filter for old issues (group_id starting with 100x) message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1], @@ -243,33 +284,19 @@ def test_hourly_rate_for_old_issues_with_division( end_timestamp=END_TIMESTAMP, trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, ), - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - op=ComparisonFilter.OP_LESS_THAN, - value=AttributeValue(val_int=2000), - ) - ), columns=[ - Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), - # Count events per group - Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="event_count", - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), - ), - # Calculate hourly rate = count / WEEK_IN_HOURS using formula + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), + # hourly_rate = count / WEEK_IN_HOURS (static divisor) Column( formula=Column.BinaryFormula( op=Column.BinaryFormula.OP_DIVIDE, left=Column( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="count_for_rate", + key=AttributeKey( + type=AttributeKey.TYPE_INT, name="sentry.group_id" + ), + label="count", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, ), ), @@ -277,13 +304,15 @@ def test_hourly_rate_for_old_issues_with_division( literal=Literal(val_double=float(WEEK_IN_HOURS)), ), ), - label="hourly_rate", + label="hourly_rate_static", ), ], - group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], order_by=[ TraceItemTableRequest.OrderBy( - column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) + column=Column( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id") + ) ), ], limit=100, @@ -291,83 +320,63 @@ def test_hourly_rate_for_old_issues_with_division( response = EndpointTraceItemTable().execute(message) - # Verify we got the hourly_rate column - assert len(response.column_values) == 3 - hourly_rate_col = None - for col in response.column_values: - if col.attribute_name == "hourly_rate": - hourly_rate_col = col - break + # Verify we got results + assert len(response.column_values) == 2 + rate_col = next( + (c for c in response.column_values if c.attribute_name == "hourly_rate_static"), None + ) + assert rate_col is not None + assert len(rate_col.results) > 0 - assert hourly_rate_col is not None - assert len(hourly_rate_col.results) > 0 + # Verify rates are positive + for result in rate_col.results: + assert result.val_double > 0 - # Verify the calculated rates match expected values - expected_rates = setup_occurrence_data["expected_rates"] - group_id_col = response.column_values[0] - - for i, group_id_val in enumerate(group_id_col.results): - group_id = group_id_val.val_int - if group_id in expected_rates and group_id < 2000: - actual_rate = hourly_rate_col.results[i].val_double - expected_rate = expected_rates[group_id] - assert abs(actual_rate - expected_rate) < 0.01, ( - f"Group {group_id}: expected {expected_rate}, got {actual_rate}" - ) - - def test_p95_hourly_rate_using_precomputed_attribute( - self, setup_occurrence_data: dict[str, Any] - ) -> None: + def test_p95_of_precomputed_attribute(self, setup_occurrence_data: dict[str, 
Any]) -> None: """ - Test taking P95 of hourly event rates when the rate is stored as an attribute. + Test P95 of a pre-computed attribute (not a calculated aggregate). - This simulates a scenario where the hourly_event_rate is pre-computed - and stored with each occurrence, allowing us to calculate aggregate - percentiles directly. + This works because the P95 is computed over raw attribute values, + not over a computed formula. """ - # First, create occurrences with pre-computed hourly_event_rate attribute + # Create test data with pre-computed hourly rates as attributes items_storage = get_storage(StorageKey("eap_items")) now = BASE_TIME - # Create occurrences with explicit hourly_event_rate values - # These represent different issues with known rates - rates_to_test = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] - + # Create occurrences with explicit hourly_rate values + rates = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] messages = [] - for i, rate in enumerate(rates_to_test): + for i, rate in enumerate(rates): group_id = 9000 + i - # Create multiple occurrences for each group with the same rate - for j in range(10): - messages.append( - gen_item_message( - start_timestamp=now - timedelta(minutes=j), - type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, - attributes={ - "group_id": AnyValue(int_value=group_id), - "hourly_event_rate": AnyValue(double_value=rate), - "sentry.group_id": AnyValue(int_value=group_id), - }, - project_id=1, - remove_default_attributes=True, - ) + messages.append( + gen_item_message( + start_timestamp=now - timedelta(minutes=i), + type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + attributes={ + "sentry.group_id": AnyValue(int_value=group_id), + "precomputed_hourly_rate": AnyValue(double_value=rate), + }, + project_id=1, + remove_default_attributes=True, ) + ) write_raw_unprocessed_events(items_storage, messages) # type: ignore - # Query P95 of hourly_event_rate grouped by project + # Query P95 of the pre-computed rate message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1], organization_id=1, cogs_category="test", - referrer="test.occurrence_hourly_rate_p95", + referrer="test.p95_precomputed", start_timestamp=START_TIMESTAMP, end_timestamp=END_TIMESTAMP, trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, ), filter=TraceItemFilter( comparison_filter=ComparisonFilter( - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id"), op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, value=AttributeValue(val_int=9000), ) @@ -376,8 +385,10 @@ def test_p95_hourly_rate_using_precomputed_attribute( Column( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_P95, - key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="hourly_event_rate"), - label="p95_hourly_rate", + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="precomputed_hourly_rate" + ), + label="p95_rate", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, ), ), @@ -387,228 +398,365 @@ def test_p95_hourly_rate_using_precomputed_attribute( response = EndpointTraceItemTable().execute(message) - # Verify we got the P95 result assert len(response.column_values) == 1 p95_col = response.column_values[0] - assert p95_col.attribute_name == "p95_hourly_rate" - assert len(p95_col.results) == 1 - - # P95 of [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0] should be around 4.75 - # (since we have 10 values, P95 is approximately the 9.5th value) + assert p95_col.attribute_name == "p95_rate" + # P95 of 
[0.5..5.0] should be around 4.5-5.0 p95_value = p95_col.results[0].val_double - assert 4.0 <= p95_value <= 5.0, f"Expected P95 around 4.5-5.0, got {p95_value}" + assert 4.0 <= p95_value <= 5.5 + - def test_conditional_hourly_rate_using_two_aggregations( +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +class TestOccurrenceHourlyEventRateUnsupported(BaseApiTest): + """ + Tests for CURRENTLY UNSUPPORTED query patterns. + + These tests document the desired query structure for features that + are not yet implemented. They are marked xfail to indicate the + expected failure until the features are built. + + Key unsupported features: + 1. Division by a dynamically computed value that uses an aggregate + (e.g., count / ((now - min(timestamp)) / 3600)) + 2. Conditional expression where the condition includes an aggregate + (e.g., if(min(timestamp) < X, value1, value2)) + 3. Quantile (p95) on a calculated/computed aggregate formula + (e.g., p95 across group-level computed rates) + """ + + @pytest.mark.xfail(reason="Division by nested formula containing aggregate returns null") + def test_hourly_rate_with_dynamic_divisor_using_aggregate( self, setup_occurrence_data: dict[str, Any] ) -> None: """ - Test calculating conditional hourly rates using conditional aggregations. + Test calculating hourly rate where the divisor uses an aggregate. + + DESIRED BEHAVIOR: + hourly_rate = count / ((now - min(timestamp_precise)) / 3600) - This approach uses two separate aggregations: - 1. One for old issues (first_seen_hours_ago >= 168) - 2. One for new issues (first_seen_hours_ago < 168) + This requires dividing by a dynamically computed value that + involves an aggregate (min). - The conditional aggregation allows filtering within the aggregate function. + Currently NOT fully supported because: + - The nested formula with aggregate in the divisor returns null + - Query executes but produces null results """ + now_ts = BASE_TIME.timestamp() + message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1], organization_id=1, cogs_category="test", - referrer="test.occurrence_conditional_rate", + referrer="test.dynamic_divisor", start_timestamp=START_TIMESTAMP, end_timestamp=END_TIMESTAMP, trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, ), columns=[ - Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")), - # Count for old issues (first_seen >= 168 hours ago) + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), + # hourly_rate = count / hours_since_first_seen + # where hours_since_first_seen = (now - min(timestamp)) / 3600 Column( - conditional_aggregation=AttributeConditionalAggregation( - aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="old_issue_count", - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, - name="first_seen_hours_ago", + type=AttributeKey.TYPE_INT, name="sentry.group_id" ), - op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, - value=AttributeValue(val_double=float(WEEK_IN_HOURS)), - ) + label="event_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + right=Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + 
formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_SUBTRACT, + left=Column(literal=Literal(val_double=now_ts)), + right=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_MIN, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="sentry.timestamp_precise", + ), + label="first_seen_ts", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ), + ), + right=Column(literal=Literal(val_double=3600.0)), + ), ), ), + label="hourly_rate_dynamic", ), - # Count for new issues (first_seen < 168 hours ago) + ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id") + ) + ), + ], + limit=100, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we got results + assert len(response.column_values) == 2 + rate_col = next( + (c for c in response.column_values if c.attribute_name == "hourly_rate_dynamic"), None + ) + assert rate_col is not None + assert len(rate_col.results) > 0 + + # Verify rates are computed (not null) and positive + # This assertion fails because nested formula with aggregate returns null + for result in rate_col.results: + assert not result.is_null, "Expected non-null hourly rate" + assert result.val_double > 0, f"Expected positive rate, got {result.val_double}" + + @pytest.mark.xfail( + reason="Conditional expression with aggregate in condition not yet supported - " + "no 'if' function in Column.BinaryFormula to express: " + "if(min(ts) < week_ago, count/168, count/hours_since_first_seen)" + ) + def test_conditional_rate_based_on_aggregate_first_seen( + self, setup_occurrence_data: dict[str, Any] + ) -> None: + """ + Test conditional hourly rate where the condition uses an aggregate. + + DESIRED BEHAVIOR: + - if(min(timestamp) < one_week_ago): + hourly_rate = count / WEEK_IN_HOURS + - else: + hourly_rate = count / dateDiff('hour', min(timestamp), now()) + + This requires the conditional expression to accept an aggregate + (min(timestamp)) as part of its condition evaluation. 
+ + Currently NOT supported because: + - There is no 'if' or conditional operator in Column.BinaryFormula + - Only OP_ADD, OP_SUBTRACT, OP_MULTIPLY, OP_DIVIDE are available + - Cannot express: if(aggregate < value, result1, result2) + """ + expected_rates = setup_occurrence_data["expected_rates"] + one_week_ago_ts = setup_occurrence_data["one_week_ago"].timestamp() + now_ts = setup_occurrence_data["now"].timestamp() + + # This would be the desired query if 'if' was supported: + # Column( + # formula=Column.ConditionalFormula( # hypothetical + # condition=Column.BinaryFormula( + # op=Column.BinaryFormula.OP_LESS_THAN, # hypothetical + # left=Column(aggregation=min(timestamp)), + # right=Column(literal=one_week_ago_ts), + # ), + # if_true=Column(formula=count / WEEK_IN_HOURS), + # if_false=Column(formula=count / hours_since_first_seen), + # ), + # ) + + # Since we can't express this, the test fails + # For now, we just demonstrate we CAN compute both rates separately + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.conditional_aggregate", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), + # Rate assuming old issue (count / WEEK_IN_HOURS) Column( - conditional_aggregation=AttributeConditionalAggregation( - aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="new_issue_count", - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, - name="first_seen_hours_ago", + type=AttributeKey.TYPE_INT, name="sentry.group_id" ), - op=ComparisonFilter.OP_LESS_THAN, - value=AttributeValue(val_double=float(WEEK_IN_HOURS)), - ) + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + right=Column(literal=Literal(val_double=float(WEEK_IN_HOURS))), + ), + label="rate_if_old", + ), + # Rate assuming new issue (count / hours_since_first_seen) + Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey( + type=AttributeKey.TYPE_INT, name="sentry.group_id" + ), + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + right=Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_SUBTRACT, + left=Column(literal=Literal(val_double=now_ts)), + right=Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_MIN, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, + name="sentry.timestamp_precise", + ), + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + ), + ), + right=Column(literal=Literal(val_double=3600.0)), + ), ), ), + label="rate_if_new", ), - # Get hours since first seen for rate calculation + # First seen for reference Column( aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_MAX, + aggregate=Function.FUNCTION_MIN, key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, name="first_seen_hours_ago" + 
type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" ), - label="hours_age", + label="first_seen", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, ), ), ], - group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")], - order_by=[ - TraceItemTableRequest.OrderBy( - column=Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id")) - ), - ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], limit=100, ) response = EndpointTraceItemTable().execute(message) - # Verify we got results with conditional counts - assert len(response.column_values) >= 3 - - # Find the columns by name - old_count_col = None - new_count_col = None - for col in response.column_values: - if col.attribute_name == "old_issue_count": - old_count_col = col - elif col.attribute_name == "new_issue_count": - new_count_col = col - - assert old_count_col is not None or new_count_col is not None + # The test "fails" because we cannot select the correct rate based on + # whether first_seen < one_week_ago in a single query expression + # We would need post-processing or an 'if' function + group_col = next(c for c in response.column_values if c.attribute_name == "sentry.group_id") + first_seen_col = next(c for c in response.column_values if c.attribute_name == "first_seen") + rate_old_col = next(c for c in response.column_values if c.attribute_name == "rate_if_old") + rate_new_col = next(c for c in response.column_values if c.attribute_name == "rate_if_new") + + # Verify we can determine which rate to use for each group + # (but we can't do this selection IN the query itself) + for i, group_val in enumerate(group_col.results): + group_id = group_val.val_int + if group_id in expected_rates: + first_seen = first_seen_col.results[i].val_double + expected = expected_rates[group_id] + + # Determine which rate should be used + if first_seen < one_week_ago_ts: + actual = rate_old_col.results[i].val_double + else: + actual = rate_new_col.results[i].val_double + + # This assertion will fail because we're asserting that we + # CAN do this selection - but we can only do it client-side + assert abs(actual - expected) < 0.1, ( + f"Group {group_id}: expected {expected}, got {actual}" + ) - def test_p95_of_grouped_hourly_rates(self, setup_occurrence_data: dict[str, Any]) -> None: + @pytest.mark.xfail( + reason="P95 on calculated aggregate formula not yet supported - " + "cannot compute p95(count/divisor) where the p95 is across groups" + ) + def test_p95_of_calculated_hourly_rate_across_groups( + self, setup_occurrence_data: dict[str, Any] + ) -> None: """ - Test calculating P95 of hourly rates computed per group. + Test P95 of a calculated hourly rate across all groups. - This is a two-step approach: - 1. First calculate hourly rates per group using division formula - 2. Then take P95 across all groups + DESIRED BEHAVIOR: + 1. For each group: compute hourly_rate = count / hours + 2. Take p95 of all those hourly_rate values across groups - Note: In practice, this would require a subquery or post-processing, - but we can test the P95 aggregation on pre-computed rates. 
- """ - # Create new test data with pre-computed rates for P95 calculation - items_storage = get_storage(StorageKey("eap_items")) - now = BASE_TIME + This requires a nested aggregation: + - Inner: GROUP BY group_id, compute rate per group + - Outer: p95 across all those rates - # Create 20 issues with varying hourly rates for P95 calculation - hourly_rates = [ - 0.1, - 0.2, - 0.5, - 0.8, - 1.0, - 1.2, - 1.5, - 1.8, - 2.0, - 2.5, - 3.0, - 3.5, - 4.0, - 4.5, - 5.0, - 6.0, - 7.0, - 8.0, - 9.0, - 10.0, - ] - - messages = [] - for i, rate in enumerate(hourly_rates): - group_id = 8000 + i - messages.append( - gen_item_message( - start_timestamp=now - timedelta(minutes=i), - type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, - attributes={ - "group_id": AnyValue(int_value=group_id), - "computed_hourly_rate": AnyValue(double_value=rate), - }, - project_id=1, - remove_default_attributes=True, - ) - ) + Currently NOT supported because: + - Cannot express p95 over a formula result + - Would need subquery or window function capability + """ + # The desired query would be something like: + # SELECT p95(hourly_rate) FROM ( + # SELECT group_id, count(*)/168 as hourly_rate + # FROM occurrences + # GROUP BY group_id + # ) - write_raw_unprocessed_events(items_storage, messages) # type: ignore + # We cannot express this in a single TraceItemTableRequest + # The test demonstrates the limitation - # Query P95 of the computed hourly rates message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1], organization_id=1, cogs_category="test", - referrer="test.occurrence_p95_grouped_rates", + referrer="test.p95_calculated", start_timestamp=START_TIMESTAMP, end_timestamp=END_TIMESTAMP, trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, ), - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, - value=AttributeValue(val_int=8000), - ) - ), columns=[ + # We want p95 of (count / WEEK_IN_HOURS) across groups + # But p95 here would be computed per-row, not per-group-then-across Column( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_P95, + # This would need to reference a computed formula, not an attribute key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, name="computed_hourly_rate" + type=AttributeKey.TYPE_DOUBLE, + name="sentry.group_id", # Placeholder - no way to reference formula ), - label="p95_rate", - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), - ), - Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_COUNT, - key=AttributeKey(type=AttributeKey.TYPE_INT, name="group_id"), - label="total_count", + label="p95_hourly_rate", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, ), ), ], + # No group_by - want project-wide result limit=10, ) response = EndpointTraceItemTable().execute(message) - # Verify P95 calculation - assert len(response.column_values) == 2 - - p95_col = None - for col in response.column_values: - if col.attribute_name == "p95_rate": - p95_col = col - break + # This will not give us what we want - p95 of group-level rates + # It will give us p95 of the group_id values themselves + assert len(response.column_values) == 1 - assert p95_col is not None - assert len(p95_col.results) == 1 + # The real test: verify we got a meaningful p95 of hourly rates + # This will fail because we can't express this query + expected_rates = list(setup_occurrence_data["expected_rates"].values()) + expected_p95 = 
sorted(expected_rates)[int(len(expected_rates) * 0.95)] - # P95 of 20 values should be around the 19th value (9.0) - p95_value = p95_col.results[0].val_double - assert 8.0 <= p95_value <= 10.0, f"Expected P95 around 9.0, got {p95_value}" + actual_p95 = response.column_values[0].results[0].val_double + # This assertion documents what we WANT - it will fail + assert abs(actual_p95 - expected_p95) < 0.5, ( + f"Expected p95 around {expected_p95}, got {actual_p95}" + ) From 2ac2d8e8ba78842e819e81318cb789f70f1da909 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 30 Jan 2026 13:37:10 -0800 Subject: [PATCH 3/4] test(eap): Add test for countIf with aggregate in condition Adds test_countif_with_aggregate_in_condition to demonstrate the core limitation: we cannot use an aggregate result (e.g., min(timestamp) as first_seen) in a conditional aggregation filter. What WORKS: - countIf(timestamp > X) - filtering individual rows What DOESN'T WORK: - countIf(min(timestamp) > X) - condition uses an aggregate This is marked xfail as AttributeConditionalAggregation.filter can only compare row-level attributes to literals, not aggregate results. Co-Authored-By: Claude Opus 4.5 --- .../test_occurrence_hourly_event_rate.py | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py index ae66dc5b43..433f8d3508 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py @@ -525,6 +525,126 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( assert not result.is_null, "Expected non-null hourly rate" assert result.val_double > 0, f"Expected positive rate, got {result.val_double}" + @pytest.mark.xfail( + reason="Cannot use aggregate result (first_seen) in conditional aggregation filter" + ) + def test_countif_with_aggregate_in_condition( + self, setup_occurrence_data: dict[str, Any] + ) -> None: + """ + Test using an aggregated first_seen value in a countIf condition. + + DESIRED BEHAVIOR: + 1. Compute first_seen = min(timestamp) per group + 2. Use that first_seen in a condition: countIf(first_seen > one_week_ago) + + This is the core limitation - we want to filter based on an aggregate result. + + What WORKS: + - countIf(timestamp > X) - filtering individual rows by their timestamp + + What DOESN'T WORK: + - countIf(min(timestamp) > X) - condition uses an aggregate + + Currently NOT supported because: + - AttributeConditionalAggregation.filter only accepts TraceItemFilter + - TraceItemFilter compares attributes against literal values + - Cannot reference another aggregate's result in the filter condition + """ + one_week_ago_ts = setup_occurrence_data["one_week_ago"].timestamp() + + # This is what we WANT to express but cannot: + # + # SELECT + # group_id, + # min(timestamp) as first_seen, + # CASE + # WHEN min(timestamp) < one_week_ago THEN count(*) / 168 + # ELSE count(*) / hours_since_first_seen + # END as hourly_rate + # FROM occurrences + # GROUP BY group_id + # + # Or alternatively with countIf: + # countIf(first_seen > one_week_ago) where first_seen is min(timestamp) + + # The limitation: we cannot use an aggregate (min) in the filter of + # another conditional aggregation. The filter can only compare + # row-level attributes against literals, not aggregate results. 
+ + message = TraceItemTableRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="test", + referrer="test.countif_with_aggregate", + start_timestamp=START_TIMESTAMP, + end_timestamp=END_TIMESTAMP, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + ), + columns=[ + Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), + # First seen = min(timestamp) + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_MIN, + key=AttributeKey( + type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" + ), + label="first_seen", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + # Total count + Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id"), + label="total_count", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ), + # We WANT: countIf(first_seen > one_week_ago) but we cannot express this + # because first_seen is an aggregate, not a row attribute + # + # The filter below filters ROWS, not the aggregate result + # This doesn't give us what we want + ], + group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], + limit=100, + ) + + response = EndpointTraceItemTable().execute(message) + + # Verify we have results + assert len(response.column_values) >= 2 + + # The test documents that we CANNOT conditionally aggregate based on + # an aggregate result. We would need to: + # 1. Run a query to get first_seen per group + # 2. Use those results to construct a second query with the appropriate filters + # OR have native support for CASE WHEN with aggregates in conditions + + # This assertion will fail because we're testing an unsupported feature + group_col = next(c for c in response.column_values if c.attribute_name == "sentry.group_id") + first_seen_col = next(c for c in response.column_values if c.attribute_name == "first_seen") + + # We want to verify that groups with first_seen > one_week_ago are counted differently + # But we cannot express this in a single query + for i, group_val in enumerate(group_col.results): + first_seen = first_seen_col.results[i].val_double + # This check demonstrates what we'd WANT to do in the query itself + is_new_issue = first_seen > one_week_ago_ts + # We need conditional behavior based on is_new_issue, but cannot express it + assert is_new_issue is not None # Placeholder - real test would verify conditional rate + + # Force failure to document the limitation + raise AssertionError( + "Cannot use aggregate result (first_seen = min(timestamp)) in countIf condition. " + "AttributeConditionalAggregation.filter can only compare row attributes to literals, " + "not aggregate results." 
+ ) + @pytest.mark.xfail( reason="Conditional expression with aggregate in condition not yet supported - " "no 'if' function in Column.BinaryFormula to express: " From 7fba41fbd07468e2befd3f627522154a2ae11511 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 30 Jan 2026 15:12:05 -0800 Subject: [PATCH 4/4] feat(eap): Add NULL-safe division and ConditionalFormula support for formulas Phase 1 - NULL-safe division: - Modified _formula_to_expression() to use if(isNull(right) OR right = 0, default, left / right) when default_value is specified, protecting against NULL/zero divisors - This fixes hourly rate calculations where divisor uses aggregates like min(timestamp) Phase 2 - ConditionalFormula support (code ready, waiting for sentry-protos PR #173): - Added _has_proto_field() helper to safely check for proto fields that may not exist yet - Added COMPARISON_OP_TO_EXPR mapping for conditional formula comparison operators - Added _conditional_formula_to_expression() using existing if_cond() DSL - Updated _column_to_expression() to handle conditional_formula field - Updated _get_reliability_context_columns() for conditional formulas - Updated ColumnWrapper.accept() in proto_visitor.py to traverse conditional formula children Tests: - test_hourly_rate_with_dynamic_divisor_using_aggregate now passes - test_conditional_rate_based_on_aggregate_first_seen updated to use ConditionalFormula proto structure (skips until proto is available) Co-Authored-By: Claude Opus 4.5 --- pyproject.toml | 3 + snuba/web/rpc/proto_visitor.py | 32 +++ .../R_eap_items/resolver_trace_item_table.py | 181 ++++++++++++++- .../test_endpoint_trace_item_table.py | 5 +- .../test_occurrence_hourly_event_rate.py | 211 ++++++++---------- 5 files changed, 303 insertions(+), 129 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e5ce5be013..86ce427f70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,9 @@ dependencies = [ [tool.uv.sources] rust_snuba = { workspace = true } +# TODO: Once https://github.com/getsentry/sentry-protos/pull/173 is merged and released, +# the ConditionalFormula feature will be automatically enabled. +# sentry-protos = { git = "https://github.com/getsentry/sentry-protos", branch = "feat/add-conditional-formula-to-column", subdirectory = "py" } [tool.uv.workspace] members = ["rust_snuba"] diff --git a/snuba/web/rpc/proto_visitor.py b/snuba/web/rpc/proto_visitor.py index 09014f01e6..02df90c633 100644 --- a/snuba/web/rpc/proto_visitor.py +++ b/snuba/web/rpc/proto_visitor.py @@ -24,6 +24,21 @@ Tin = TypeVar("Tin", bound=ProtobufMessage) +def _has_proto_field(proto: ProtobufMessage, field_name: str) -> bool: + """ + Safely check if a proto message has a field set. + + This handles the case where the field doesn't exist in the proto definition yet + (e.g., when preparing for a new proto field that will be added later). + HasField() throws ValueError if the field doesn't exist, so we catch that. 
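+
+    Illustrative example: on an older sentry-protos release that does not define
+    conditional_formula, column.HasField("conditional_formula") raises ValueError,
+    while _has_proto_field(column, "conditional_formula") simply returns False.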
+ """ + try: + return proto.HasField(field_name) + except ValueError: + # Field doesn't exist in the proto definition + return False + + class ProtoWrapper(Generic[Tin], ABC): def __init__(self, underlying_proto: Tin): self.underlying_proto = underlying_proto @@ -40,6 +55,23 @@ def accept(self, visitor: ProtoVisitor) -> None: if column.HasField("formula"): ColumnWrapper(column.formula.left).accept(visitor) ColumnWrapper(column.formula.right).accept(visitor) + # Handle ConditionalFormula when the proto is available + # ConditionalFormula has: condition (with left/right), match, default + # We use _has_proto_field to safely check if the field exists in the proto + if _has_proto_field(column, "conditional_formula"): + conditional = column.conditional_formula # type: ignore[attr-defined] + # Visit condition's left and right columns + if _has_proto_field(conditional, "condition"): + if _has_proto_field(conditional.condition, "left"): + ColumnWrapper(conditional.condition.left).accept(visitor) + if _has_proto_field(conditional.condition, "right"): + ColumnWrapper(conditional.condition.right).accept(visitor) + # Visit match and default columns + # Note: 'match' is a Python keyword, so use getattr + if _has_proto_field(conditional, "match"): + ColumnWrapper(getattr(conditional, "match")).accept(visitor) + if _has_proto_field(conditional, "default"): + ColumnWrapper(conditional.default).accept(visitor) class AggregationComparisonFilterWrapper(ProtoWrapper[AggregationComparisonFilter]): diff --git a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py index a38938d69b..ff9a2ec2cb 100644 --- a/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py +++ b/snuba/web/rpc/v1/resolvers/R_eap_items/resolver_trace_item_table.py @@ -1,7 +1,7 @@ import uuid from dataclasses import replace from itertools import islice -from typing import List, Optional, Sequence +from typing import Any, Callable, List, Optional, Sequence import sentry_sdk from google.protobuf.json_format import MessageToDict @@ -35,9 +35,14 @@ from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import Functions as f -from snuba.query.dsl import and_cond, in_cond, literal, literals_array, or_cond +from snuba.query.dsl import and_cond, if_cond, in_cond, literal, literals_array, or_cond from snuba.query.dsl import column as snuba_column -from snuba.query.expressions import DangerousRawSQL, Expression, SubscriptableReference +from snuba.query.expressions import ( + DangerousRawSQL, + Expression, + FunctionCall, + SubscriptableReference, +) from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings from snuba.request import Request as SnubaRequest @@ -76,6 +81,22 @@ _DEFAULT_ROW_LIMIT = 10_000 + +def _has_proto_field(proto: Any, field_name: str) -> bool: + """ + Safely check if a proto message has a field set. + + This handles the case where the field doesn't exist in the proto definition yet + (e.g., when preparing for a new proto field that will be added later). + HasField() throws ValueError if the field doesn't exist, so we catch that. 
+ """ + try: + return bool(proto.HasField(field_name)) + except ValueError: + # Field doesn't exist in the proto definition + return False + + OP_TO_EXPR = { Column.BinaryFormula.OP_ADD: f.plus, Column.BinaryFormula.OP_SUBTRACT: f.minus, @@ -83,6 +104,28 @@ Column.BinaryFormula.OP_DIVIDE: f.divide, } +# Comparison operators for ConditionalFormula conditions +# These will be populated when the proto is available +# Maps FormulaCondition.Op enum values to expression functions +COMPARISON_OP_TO_EXPR: dict[int, Callable[..., FunctionCall]] = {} + +# Try to populate comparison operators if the proto is available +# This allows the code to work both before and after the proto is added +if hasattr(Column, "FormulaCondition"): + FormulaCondition = Column.FormulaCondition + if hasattr(FormulaCondition, "OP_LESS_THAN"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_LESS_THAN] = f.less + if hasattr(FormulaCondition, "OP_GREATER_THAN"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_GREATER_THAN] = f.greater + if hasattr(FormulaCondition, "OP_LESS_THAN_OR_EQUALS"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_LESS_THAN_OR_EQUALS] = f.lessOrEquals + if hasattr(FormulaCondition, "OP_GREATER_THAN_OR_EQUALS"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_GREATER_THAN_OR_EQUALS] = f.greaterOrEquals + if hasattr(FormulaCondition, "OP_EQUALS"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_EQUALS] = f.equals + if hasattr(FormulaCondition, "OP_NOT_EQUALS"): + COMPARISON_OP_TO_EXPR[FormulaCondition.OP_NOT_EQUALS] = f.notEquals + def _apply_virtual_columns( query: Query, virtual_column_contexts: Sequence[VirtualColumnContext] @@ -310,6 +353,35 @@ def _get_reliability_context_columns( return context_cols + if _has_proto_field(column, "conditional_formula"): + # For conditional formulas, extract context columns from all parts: + # condition (left/right), match, and default + context_cols = [] + conditional = column.conditional_formula # type: ignore[attr-defined] + + # Extract from condition's left and right + if _has_proto_field(conditional, "condition"): + for col in [conditional.condition.left, conditional.condition.right]: + if col.HasField("conditional_aggregation") or col.HasField("formula"): + context_cols.extend(_get_reliability_context_columns(col, request_meta)) + + # Extract from match and default branches + # Note: 'match' is a Python keyword, so use getattr + match_col = getattr(conditional, "match") + default_col = conditional.default + for col in [match_col, default_col]: + if not col.HasField("formula") and not _has_proto_field(col, "conditional_formula"): + if col.label: + context_cols.append( + SelectedExpression( + name=col.label, + expression=_column_to_expression(col, request_meta), + ) + ) + context_cols.extend(_get_reliability_context_columns(col, request_meta)) + + return context_cols + if not (column.HasField("conditional_aggregation")): return [] @@ -352,22 +424,97 @@ def _get_reliability_context_columns( def _formula_to_expression(formula: Column.BinaryFormula, request_meta: RequestMeta) -> Expression: - formula_expr = OP_TO_EXPR[formula.op]( - _column_to_expression(formula.left, request_meta), - _column_to_expression(formula.right, request_meta), - ) + left_expr = _column_to_expression(formula.left, request_meta) + right_expr = _column_to_expression(formula.right, request_meta) + + # Get the default value if specified + default_value: float | int | None = None match formula.WhichOneof("default_value"): - case None: - return formula_expr case "default_value_double": - return 
f.coalesce(formula_expr, formula.default_value_double) + default_value = formula.default_value_double case "default_value_int64": - return f.coalesce(formula_expr, formula.default_value_int64) + default_value = formula.default_value_int64 + case None: + pass case default: raise BadSnubaRPCRequestException( f"Unknown default_value in formula. Expected default_value_double or default_value_int64 but got {default}" ) + # For division operations with a default value, protect against NULL/zero divisors + # This handles cases like count / ((now - min(timestamp)) / 3600) where the divisor + # may be NULL (if min(timestamp) is NULL) or zero + if formula.op == Column.BinaryFormula.OP_DIVIDE and default_value is not None: + # if(right IS NULL OR right = 0, default, left / right) + return if_cond( + or_cond(f.isNull(right_expr), f.equals(right_expr, literal(0))), + literal(default_value), + f.divide(left_expr, right_expr), + ) + + formula_expr = OP_TO_EXPR[formula.op](left_expr, right_expr) + + if default_value is not None: + return f.coalesce(formula_expr, default_value) + + return formula_expr + + +def _conditional_formula_to_expression( + conditional_formula: Any, + request_meta: RequestMeta, # Column.ConditionalFormula when proto is available +) -> Expression: + """ + Converts a ConditionalFormula proto to a ClickHouse if() expression. + + ConditionalFormula allows expressing: if(condition, match_value, default_value) + where the condition can compare aggregates (e.g., if(min(ts) < X, rate1, rate2)). + + This enables use cases like: + - if(min(timestamp) < one_week_ago, count/WEEK_IN_HOURS, count/hours_since_first_seen) + + NOTE: This function requires the ConditionalFormula proto message to be defined in + sentry-protos. Until then, this code path won't be executed (HasField will return False). 
+ + Proto structure: + message ConditionalFormula { + FormulaCondition condition = 1; + Column match = 2; // value when condition is true + Column default = 3; // value when condition is false + } + + message FormulaCondition { + Column left = 1; + Op op = 2; + Column right = 3; + enum Op { + OP_UNSPECIFIED = 0; + OP_LESS_THAN = 1; + OP_GREATER_THAN = 2; + OP_LESS_THAN_OR_EQUALS = 3; + OP_GREATER_THAN_OR_EQUALS = 4; + OP_EQUALS = 5; + OP_NOT_EQUALS = 6; + } + } + """ + condition = conditional_formula.condition + left_expr = _column_to_expression(condition.left, request_meta) + right_expr = _column_to_expression(condition.right, request_meta) + + if condition.op not in COMPARISON_OP_TO_EXPR: + raise BadSnubaRPCRequestException( + f"Unsupported comparison operator in ConditionalFormula: {condition.op}" + ) + + comparison_expr = COMPARISON_OP_TO_EXPR[condition.op](left_expr, right_expr) + # 'match' is the value when condition is true, 'default' is when false + # Note: 'match' is a Python keyword in 3.10+, but protobuf accesses it as an attribute + match_expr = _column_to_expression(getattr(conditional_formula, "match"), request_meta) + default_expr = _column_to_expression(conditional_formula.default, request_meta) + + return if_cond(comparison_expr, match_expr, default_expr) + def _column_to_expression(column: Column, request_meta: RequestMeta) -> Expression: """ @@ -388,11 +535,21 @@ def _column_to_expression(column: Column, request_meta: RequestMeta) -> Expressi formula_expr = _formula_to_expression(column.formula, request_meta) formula_expr = replace(formula_expr, alias=column.label) return formula_expr + elif _has_proto_field(column, "conditional_formula"): + # ConditionalFormula allows if(condition, if_true, if_false) expressions + # where the condition can compare aggregates + # NOTE: This requires the ConditionalFormula proto message in sentry-protos + conditional_expr = _conditional_formula_to_expression( + column.conditional_formula, # type: ignore[attr-defined] + request_meta, + ) + conditional_expr = replace(conditional_expr, alias=column.label) + return conditional_expr elif column.HasField("literal"): return literal(column.literal.val_double) else: raise BadSnubaRPCRequestException( - "Column is not one of: aggregate, attribute key, or formula" + "Column is not one of: aggregate, attribute key, formula, or conditional_formula" ) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index eb44569c2e..a6c6e9d076 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -1,7 +1,6 @@ import random import re from datetime import datetime, timedelta -from math import inf from typing import Any from unittest.mock import MagicMock, call, patch @@ -3302,13 +3301,15 @@ def test_formula_default(self) -> None: ], ) response = EndpointTraceItemTable().execute(message) + # With NULL-safe division, both NULL denominator and division by zero return the default (0.0) + # Results ordered ascending: 0.0 (null denom), 0.0 (div by zero), 5.0 (10/2) assert response.column_values == [ TraceItemColumnValues( attribute_name="myformula", results=[ + AttributeValue(val_double=0.0), AttributeValue(val_double=0.0), AttributeValue(val_double=5), - AttributeValue(val_double=inf), ], ), ] diff --git 
a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py index 433f8d3508..8cef71e586 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_occurrence_hourly_event_rate.py @@ -206,7 +206,7 @@ def test_count_and_min_timestamp_per_group(self, setup_occurrence_data: dict[str Test that we can: 1. Group by sentry.group_id 2. Count events per group - 3. Find first_seen via min(sentry.timestamp_precise) per group + 3. Find first_seen via min(sentry.start_timestamp_precise) per group This is the foundation - getting aggregates per group works. """ @@ -236,7 +236,7 @@ def test_count_and_min_timestamp_per_group(self, setup_occurrence_data: dict[str aggregation=AttributeAggregation( aggregate=Function.FUNCTION_MIN, key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" + type=AttributeKey.TYPE_DOUBLE, name="sentry.start_timestamp_precise" ), label="first_seen", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -425,7 +425,6 @@ class TestOccurrenceHourlyEventRateUnsupported(BaseApiTest): (e.g., p95 across group-level computed rates) """ - @pytest.mark.xfail(reason="Division by nested formula containing aggregate returns null") def test_hourly_rate_with_dynamic_divisor_using_aggregate( self, setup_occurrence_data: dict[str, Any] ) -> None: @@ -438,9 +437,8 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( This requires dividing by a dynamically computed value that involves an aggregate (min). - Currently NOT fully supported because: - - The nested formula with aggregate in the divisor returns null - - Query executes but produces null results + The formula uses default_value_double=0.0 to handle NULL/zero divisor cases + safely. When the divisor is NULL or zero, the result will be the default value. 
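+
+        Roughly, when default_value_double is set the resolver now emits
+        (illustrative ClickHouse sketch):
+            if(isNull(divisor) OR divisor = 0, 0.0, count / divisor)
+        so a NULL or zero divisor produces the default instead of NULL or inf.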
""" now_ts = BASE_TIME.timestamp() @@ -458,6 +456,7 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), # hourly_rate = count / hours_since_first_seen # where hours_since_first_seen = (now - min(timestamp)) / 3600 + # default_value_double=0.0 protects against NULL/zero divisors Column( formula=Column.BinaryFormula( op=Column.BinaryFormula.OP_DIVIDE, @@ -483,7 +482,7 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( aggregate=Function.FUNCTION_MIN, key=AttributeKey( type=AttributeKey.TYPE_DOUBLE, - name="sentry.timestamp_precise", + name="sentry.start_timestamp_precise", ), label="first_seen_ts", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -494,6 +493,7 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( right=Column(literal=Literal(val_double=3600.0)), ), ), + default_value_double=0.0, ), label="hourly_rate_dynamic", ), @@ -520,7 +520,6 @@ def test_hourly_rate_with_dynamic_divisor_using_aggregate( assert len(rate_col.results) > 0 # Verify rates are computed (not null) and positive - # This assertion fails because nested formula with aggregate returns null for result in rate_col.results: assert not result.is_null, "Expected non-null hourly rate" assert result.val_double > 0, f"Expected positive rate, got {result.val_double}" @@ -589,7 +588,7 @@ def test_countif_with_aggregate_in_condition( aggregation=AttributeAggregation( aggregate=Function.FUNCTION_MIN, key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" + type=AttributeKey.TYPE_DOUBLE, name="sentry.start_timestamp_precise" ), label="first_seen", extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, @@ -645,11 +644,6 @@ def test_countif_with_aggregate_in_condition( "not aggregate results." ) - @pytest.mark.xfail( - reason="Conditional expression with aggregate in condition not yet supported - " - "no 'if' function in Column.BinaryFormula to express: " - "if(min(ts) < week_ago, count/168, count/hours_since_first_seen)" - ) def test_conditional_rate_based_on_aggregate_first_seen( self, setup_occurrence_data: dict[str, Any] ) -> None: @@ -662,33 +656,83 @@ def test_conditional_rate_based_on_aggregate_first_seen( - else: hourly_rate = count / dateDiff('hour', min(timestamp), now()) - This requires the conditional expression to accept an aggregate - (min(timestamp)) as part of its condition evaluation. + This uses the ConditionalFormula proto message to express: + if(condition, match, default) where condition can compare aggregates. - Currently NOT supported because: - - There is no 'if' or conditional operator in Column.BinaryFormula - - Only OP_ADD, OP_SUBTRACT, OP_MULTIPLY, OP_DIVIDE are available - - Cannot express: if(aggregate < value, result1, result2) + NOTE: This test requires the ConditionalFormula proto from sentry-protos PR #173. + It will skip if the proto field is not available yet. 
""" + # Check if ConditionalFormula is available in the proto + if not hasattr(Column, "ConditionalFormula"): + pytest.skip( + "ConditionalFormula proto not yet available (waiting for sentry-protos PR #173)" + ) + expected_rates = setup_occurrence_data["expected_rates"] one_week_ago_ts = setup_occurrence_data["one_week_ago"].timestamp() now_ts = setup_occurrence_data["now"].timestamp() - # This would be the desired query if 'if' was supported: - # Column( - # formula=Column.ConditionalFormula( # hypothetical - # condition=Column.BinaryFormula( - # op=Column.BinaryFormula.OP_LESS_THAN, # hypothetical - # left=Column(aggregation=min(timestamp)), - # right=Column(literal=one_week_ago_ts), - # ), - # if_true=Column(formula=count / WEEK_IN_HOURS), - # if_false=Column(formula=count / hours_since_first_seen), - # ), - # ) + # Build the count aggregation (reused in both match and default branches) + count_agg = AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id"), + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ) + + # Build min(timestamp) aggregation (reused in condition and default branch) + min_ts_agg = AttributeAggregation( + aggregate=Function.FUNCTION_MIN, + key=AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.start_timestamp_precise"), + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ) + + # hours_since_first_seen = (now - min(timestamp)) / 3600 + hours_since_first_seen = Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_SUBTRACT, + left=Column(literal=Literal(val_double=now_ts)), + right=Column(aggregation=min_ts_agg), + ), + ), + right=Column(literal=Literal(val_double=3600.0)), + ), + ) + + # Build the ConditionalFormula: + # if(min(timestamp) < one_week_ago, count/168, count/hours_since_first_seen) + FormulaCondition = Column.FormulaCondition # type: ignore[attr-defined] + conditional_formula = Column.ConditionalFormula( + condition=FormulaCondition( + left=Column(aggregation=min_ts_agg), + op=FormulaCondition.OP_LESS_THAN, + right=Column(literal=Literal(val_double=one_week_ago_ts)), + ), + # Note: 'match' is what to return when condition is TRUE + # When first_seen < one_week_ago (old issue), use count / WEEK_IN_HOURS + **{ + "match": Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column(aggregation=count_agg), + right=Column(literal=Literal(val_double=float(WEEK_IN_HOURS))), + ), + ), + }, + # 'default' is what to return when condition is FALSE + # When first_seen >= one_week_ago (new issue), use count / hours_since_first_seen + default=Column( + formula=Column.BinaryFormula( + op=Column.BinaryFormula.OP_DIVIDE, + left=Column(aggregation=count_agg), + right=hours_since_first_seen, + default_value_double=0.0, # Handle division by zero + ), + ), + ) - # Since we can't express this, the test fails - # For now, we just demonstrate we CAN compute both rates separately message = TraceItemTableRequest( meta=RequestMeta( project_ids=[1], @@ -701,71 +745,15 @@ def test_conditional_rate_based_on_aggregate_first_seen( ), columns=[ Column(key=AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")), - # Rate assuming old issue (count / WEEK_IN_HOURS) - Column( - formula=Column.BinaryFormula( - op=Column.BinaryFormula.OP_DIVIDE, - left=Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_COUNT, - 
key=AttributeKey( - type=AttributeKey.TYPE_INT, name="sentry.group_id" - ), - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), - ), - right=Column(literal=Literal(val_double=float(WEEK_IN_HOURS))), - ), - label="rate_if_old", - ), - # Rate assuming new issue (count / hours_since_first_seen) + # Conditional hourly rate using ConditionalFormula Column( - formula=Column.BinaryFormula( - op=Column.BinaryFormula.OP_DIVIDE, - left=Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_COUNT, - key=AttributeKey( - type=AttributeKey.TYPE_INT, name="sentry.group_id" - ), - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), - ), - right=Column( - formula=Column.BinaryFormula( - op=Column.BinaryFormula.OP_DIVIDE, - left=Column( - formula=Column.BinaryFormula( - op=Column.BinaryFormula.OP_SUBTRACT, - left=Column(literal=Literal(val_double=now_ts)), - right=Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_MIN, - key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, - name="sentry.timestamp_precise", - ), - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), - ), - ), - ), - right=Column(literal=Literal(val_double=3600.0)), - ), - ), - ), - label="rate_if_new", + conditional_formula=conditional_formula, # type: ignore[call-arg] + label="hourly_rate", ), - # First seen for reference + # First seen for verification Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_MIN, - key=AttributeKey( - type=AttributeKey.TYPE_DOUBLE, name="sentry.timestamp_precise" - ), - label="first_seen", - extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, - ), + aggregation=min_ts_agg, + label="first_seen", ), ], group_by=[AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.group_id")], @@ -774,32 +762,25 @@ def test_conditional_rate_based_on_aggregate_first_seen( response = EndpointTraceItemTable().execute(message) - # The test "fails" because we cannot select the correct rate based on - # whether first_seen < one_week_ago in a single query expression - # We would need post-processing or an 'if' function + # Verify results group_col = next(c for c in response.column_values if c.attribute_name == "sentry.group_id") + rate_col = next(c for c in response.column_values if c.attribute_name == "hourly_rate") first_seen_col = next(c for c in response.column_values if c.attribute_name == "first_seen") - rate_old_col = next(c for c in response.column_values if c.attribute_name == "rate_if_old") - rate_new_col = next(c for c in response.column_values if c.attribute_name == "rate_if_new") - # Verify we can determine which rate to use for each group - # (but we can't do this selection IN the query itself) + assert len(group_col.results) > 0, "Expected results for test groups" + + # Verify the conditional rate calculation for i, group_val in enumerate(group_col.results): group_id = group_val.val_int if group_id in expected_rates: first_seen = first_seen_col.results[i].val_double - expected = expected_rates[group_id] - - # Determine which rate should be used - if first_seen < one_week_ago_ts: - actual = rate_old_col.results[i].val_double - else: - actual = rate_new_col.results[i].val_double - - # This assertion will fail because we're asserting that we - # CAN do this selection - but we can only do it client-side - assert abs(actual - expected) < 0.1, ( - f"Group {group_id}: expected {expected}, got {actual}" + actual_rate = rate_col.results[i].val_double + expected_rate = expected_rates[group_id] + + # Verify the 
rate matches expected
+            assert abs(actual_rate - expected_rate) < 0.1, (
+                f"Group {group_id}: expected rate {expected_rate}, got {actual_rate}. "
+                f"first_seen={first_seen}, one_week_ago={one_week_ago_ts}"
             )

     @pytest.mark.xfail(