Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions snuba/web/rpc/storage_routing/routing_strategies/load_based.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import sentry_sdk

from snuba.configs.configuration import Configuration
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
BaseRoutingStrategy,
RoutingDecision,
)


class LoadBasedRoutingStrategy(BaseRoutingStrategy):
    """
    Lets a query through when the cluster is lightly loaded.

    When the reported cluster load is below the configured percentage, the
    routing decision is marked runnable and unthrottled and given a
    configured thread cap, regardless of what was recommended before.
    This strategy does not decide tiering.
    """

    def _additional_config_definitions(self) -> list[Configuration]:
        """Declare the load threshold and the thread cap read by this strategy."""
        load_threshold_config = Configuration(
            name="pass_through_load_percentage",
            description="If cluster load is below this percentage, allow the query to run regardless of allocation policies",
            value_type=int,
            default=10,
        )
        thread_cap_config = Configuration(
            name="pass_through_max_threads",
            description="Max threads to use when allowing the query to pass through under low load",
            value_type=int,
            default=10,
        )
        return [load_threshold_config, thread_cap_config]

    def _update_routing_decision(self, routing_decision: RoutingDecision) -> None:
        """
        Force the decision to pass through if the cluster load is low.

        Mutates ``routing_decision`` in place; leaves it untouched when no
        usable load reading is available or the load is not below the
        configured threshold.
        """
        context = routing_decision.routing_context
        cluster_load_info = context.cluster_load_info

        # A missing or negative load means something went wrong in the load
        # retriever (per the author's review note). Leave the decision alone
        # so routing behaves as if this strategy had not run, i.e. the same
        # as before load-based routing existed.
        if cluster_load_info is None or cluster_load_info.cluster_load < 0:
            return

        threshold = int(self.get_config_value("pass_through_load_percentage"))
        thread_cap = int(self.get_config_value("pass_through_max_threads"))

        # Written as a negated "<" so the comparison is the same one the
        # pass-through condition is defined by.
        if not (cluster_load_info.cluster_load < threshold):
            return

        routing_decision.can_run = True
        routing_decision.is_throttled = False
        routing_decision.clickhouse_settings["max_threads"] = thread_cap

        # Record why the query was let through, for later inspection.
        pass_through_details = {
            "threshold": threshold,
            "max_threads": thread_cap,
        }
        context.extra_info["load_based_pass_through"] = pass_through_details

        span_attributes = {
            "load_based_pass_through": True,
            "cluster_load": cluster_load_info.cluster_load,
            "pass_through_threshold": threshold,
            "pass_through_max_threads": thread_cap,
        }
        sentry_sdk.update_current_span(  # pyright: ignore[reportUndefinedVariable]
            attributes=span_attributes
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from snuba.configs.configuration import Configuration
from snuba.web.rpc.storage_routing.routing_strategies.load_based import (
LoadBasedRoutingStrategy,
)
from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import (
OutcomesBasedRoutingStrategy,
)
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
BaseRoutingStrategy,
RoutingDecision,
)


class LoadBasedOutcomesRoutingStrategy(BaseRoutingStrategy):
    """
    Chains outcomes-based routing followed by load-based adjustments.

    The outcomes-based strategy updates the routing decision first; the
    load-based strategy then runs on the same decision so that its
    low-load pass-through is applied last and is not overwritten.
    """

    def __init__(self) -> None:
        """Create the two wrapped strategies this class delegates to."""
        super().__init__()
        self._outcomes_based_routing_strategy = OutcomesBasedRoutingStrategy()
        self._load_based_routing_strategy = LoadBasedRoutingStrategy()

    def _additional_config_definitions(self) -> list[Configuration]:
        """Return the config definitions of both wrapped strategies, outcomes-based first."""
        return (
            self._outcomes_based_routing_strategy.additional_config_definitions()
            + self._load_based_routing_strategy.additional_config_definitions()
        )

    def _update_routing_decision(self, routing_decision: RoutingDecision) -> None:
        """
        Apply outcomes-based routing, then load-based adjustments.

        Both wrapped strategies mutate ``routing_decision`` in place. The
        order matters: the load-based step must come second, as the class
        contract states, so its adjustments are the final word.
        """
        self._outcomes_based_routing_strategy._update_routing_decision(routing_decision)
        self._load_based_routing_strategy._update_routing_decision(routing_decision)
108 changes: 108 additions & 0 deletions tests/web/rpc/v1/routing_strategies/test_load_based.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import uuid
from datetime import UTC, datetime, timedelta
from unittest.mock import patch

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType

from snuba.configs.configuration import Configuration, ResourceIdentifier
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.allocation_policies import (
MAX_THRESHOLD,
NO_SUGGESTION,
NO_UNITS,
AllocationPolicy,
QueryResultOrError,
QuotaAllowance,
)
from snuba.utils.metrics.timer import Timer
from snuba.web.rpc.storage_routing.load_retriever import LoadInfo
from snuba.web.rpc.storage_routing.routing_strategies.load_based_outcomes import (
LoadBasedOutcomesRoutingStrategy,
)
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
BaseRoutingStrategy,
RoutingContext,
)

# Midnight (UTC) of the day the test module is imported; used as the end of
# the request time window built by _get_request_meta.
BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
# Project id placed in the request meta's project_ids.
_PROJECT_ID = 1
# Organization id placed in the request meta's organization_id.
_ORG_ID = 1


def _get_request_meta(hour_interval: int = 1) -> RequestMeta:
    """
    Build a span RequestMeta whose time window ends at BASE_TIME.

    The window starts ``hour_interval`` hours before BASE_TIME. The request
    targets the test project and organization and uses the normal
    downsampled-storage mode.
    """
    window_end = BASE_TIME
    window_start = BASE_TIME - timedelta(hours=hour_interval)

    start_ts = Timestamp(seconds=int(window_start.timestamp()))
    end_ts = Timestamp(seconds=int(window_end.timestamp()))
    storage_config = DownsampledStorageConfig(mode=DownsampledStorageConfig.MODE_NORMAL)

    return RequestMeta(
        project_ids=[_PROJECT_ID],
        organization_id=_ORG_ID,
        cogs_category="something",
        referrer="something",
        start_timestamp=start_ts,
        end_timestamp=end_ts,
        trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        downsampled_storage_config=storage_config,
    )


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_load_based_routing_pass_through_even_if_policies_reject() -> None:
    """
    Under low cluster load the query must run even if every policy rejects it.

    An allocation policy that rejects everything is installed and the
    cluster load is patched to 5.0, which is below the default pass-through
    threshold of 10, so the routing decision should be a pass-through.
    """

    class RejectAllPolicy(AllocationPolicy):
        """Allocation policy that rejects and throttles every query."""

        def _additional_config_definitions(self) -> list[Configuration]:
            return []

        def _get_quota_allowance(
            self, tenant_ids: dict[str, str | int], query_id: str
        ) -> QuotaAllowance:
            return QuotaAllowance(
                can_run=False,
                max_threads=0,
                explanation={"reason": "reject all"},
                is_throttled=True,
                throttle_threshold=MAX_THRESHOLD,
                rejection_threshold=MAX_THRESHOLD,
                quota_used=0,
                quota_unit=NO_UNITS,
                suggestion=NO_SUGGESTION,
            )

        def _update_quota_balance(
            self,
            tenant_ids: dict[str, str | int],
            query_id: str,
            result_or_error: QueryResultOrError,
        ) -> None:
            return

    routing_strategy = LoadBasedOutcomesRoutingStrategy()
    table_request = TraceItemTableRequest(meta=_get_request_meta(hour_interval=1))
    routing_context = RoutingContext(
        in_msg=table_request,
        timer=Timer("test"),
        query_id=uuid.uuid4().hex,
    )

    # Patch in the rejecting policy and a low cluster load for the duration
    # of the routing call only.
    with (
        patch.object(
            BaseRoutingStrategy,
            "get_allocation_policies",
            return_value=[
                RejectAllPolicy(ResourceIdentifier(StorageKey("doesntmatter")), ["org_id"], {})
            ],
        ),
        patch(
            "snuba.web.rpc.storage_routing.routing_strategies.storage_routing.get_cluster_loadinfo",
            return_value=LoadInfo(cluster_load=5.0, concurrent_queries=1),
        ),
    ):
        decision = routing_strategy.get_routing_decision(routing_context)

    # The pass-through overrides the rejecting policy and applies the
    # default thread cap.
    assert decision.can_run is True
    assert decision.clickhouse_settings.get("max_threads") == 10
    assert "load_based_pass_through" in decision.routing_context.extra_info
    # The patched load reading must have reached the routing context.
    assert decision.routing_context.cluster_load_info is not None
    assert decision.routing_context.cluster_load_info.cluster_load == 5.0
Loading