-
-
Notifications
You must be signed in to change notification settings - Fork 61
feat(cbrs): load based routing strategy #7519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
xurui-c
wants to merge
11
commits into
master
Choose a base branch
from
rachel/loadbasedrouting
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
1d2cd49
idk
f85a644
feat(cbrs): load based routing strategy
300ca77
revert file
0c56783
fix
f9c5724
chain
08c12bf
10
5d73f81
better
b5eb5e4
load info
b10ab53
switch
4880d33
fix name and comment
f48f85b
c
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
57 changes: 57 additions & 0 deletions
57
snuba/web/rpc/storage_routing/routing_strategies/load_based.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| import sentry_sdk | ||
|
|
||
| from snuba.configs.configuration import Configuration | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( | ||
| BaseRoutingStrategy, | ||
| RoutingDecision, | ||
| ) | ||
|
|
||
|
|
||
class LoadBasedRoutingStrategy(BaseRoutingStrategy):
    """
    Let queries run while the cluster is lightly loaded.

    When the reported cluster load is below ``pass_through_load_percentage``, the
    routing decision is marked runnable and unthrottled, and ``max_threads`` is set
    from ``pass_through_max_threads``. This strategy does not decide tiering.
    """

    def _additional_config_definitions(self) -> list[Configuration]:
        """Return the two configuration knobs that drive the low-load pass-through."""
        load_threshold_config = Configuration(
            name="pass_through_load_percentage",
            description="If cluster load is below this percentage, allow the query to run regardless of allocation policies",
            value_type=int,
            default=10,
        )
        max_threads_config = Configuration(
            name="pass_through_max_threads",
            description="Max threads to use when allowing the query to pass through under low load",
            value_type=int,
            default=10,
        )
        return [load_threshold_config, max_threads_config]

    def _update_routing_decision(
        self,
        routing_decision: RoutingDecision,
    ) -> None:
        """
        Force the decision to "can run" when the cluster load is under the threshold.

        The decision is mutated in place. It is left untouched when no load
        information is attached to the routing context, when the reported load is
        negative, or when the load is not below the configured threshold.
        """
        context = routing_decision.routing_context
        cluster_load_info = context.cluster_load_info

        # Without a usable load reading there is nothing to base a pass-through on.
        if cluster_load_info is None:
            return
        if cluster_load_info.cluster_load < 0:
            return

        load_threshold = int(self.get_config_value("pass_through_load_percentage"))
        thread_cap = int(self.get_config_value("pass_through_max_threads"))

        # Written as a negated "<" so the comparison is exactly the pass-through test.
        if not cluster_load_info.cluster_load < load_threshold:
            return

        routing_decision.can_run = True
        routing_decision.is_throttled = False
        routing_decision.clickhouse_settings["max_threads"] = thread_cap

        # Record what was applied on the routing context and on the current span.
        context.extra_info["load_based_pass_through"] = {
            "threshold": load_threshold,
            "max_threads": thread_cap,
        }
        span_attributes = {
            "load_based_pass_through": True,
            "cluster_load": cluster_load_info.cluster_load,
            "pass_through_threshold": load_threshold,
            "pass_through_max_threads": thread_cap,
        }
        sentry_sdk.update_current_span(  # pyright: ignore[reportUndefinedVariable]
            attributes=span_attributes
        )
32 changes: 32 additions & 0 deletions
32
snuba/web/rpc/storage_routing/routing_strategies/load_based_outcomes.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| from snuba.configs.configuration import Configuration | ||
| from snuba.web.rpc.storage_routing.routing_strategies.load_based import ( | ||
| LoadBasedRoutingStrategy, | ||
| ) | ||
| from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import ( | ||
| OutcomesBasedRoutingStrategy, | ||
| ) | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( | ||
| BaseRoutingStrategy, | ||
| RoutingDecision, | ||
| ) | ||
|
|
||
|
|
||
class LoadBasedOutcomesRoutingStrategy(BaseRoutingStrategy):
    """
    Chains outcomes-based routing followed by load-based adjustments.

    The outcomes-based strategy updates the routing decision first. The load-based
    strategy then runs on the same decision, so its low-load pass-through is applied
    last and is not overwritten by the outcomes-based step.
    """

    def __init__(self) -> None:
        super().__init__()
        # Each chained strategy is instantiated once and reused for every decision.
        self._outcomes_based_routing_strategy = OutcomesBasedRoutingStrategy()
        self._load_based_routing_strategy = LoadBasedRoutingStrategy()

    def _additional_config_definitions(self) -> list[Configuration]:
        """Return the config definitions of both chained strategies, outcomes-based first."""
        return (
            self._outcomes_based_routing_strategy.additional_config_definitions()
            + self._load_based_routing_strategy.additional_config_definitions()
        )

    def _update_routing_decision(self, routing_decision: RoutingDecision) -> None:
        """
        Apply the outcomes-based update, then the load-based adjustment, in place.

        The order matches the class contract: outcomes-based routing first, followed
        by load-based adjustments.
        """
        self._outcomes_based_routing_strategy._update_routing_decision(routing_decision)
        self._load_based_routing_strategy._update_routing_decision(routing_decision)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| import uuid | ||
| from datetime import UTC, datetime, timedelta | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
| from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig | ||
| from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest | ||
| from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType | ||
|
|
||
| from snuba.configs.configuration import Configuration, ResourceIdentifier | ||
| from snuba.datasets.storages.storage_key import StorageKey | ||
| from snuba.query.allocation_policies import ( | ||
| MAX_THRESHOLD, | ||
| NO_SUGGESTION, | ||
| NO_UNITS, | ||
| AllocationPolicy, | ||
| QueryResultOrError, | ||
| QuotaAllowance, | ||
| ) | ||
| from snuba.utils.metrics.timer import Timer | ||
| from snuba.web.rpc.storage_routing.load_retriever import LoadInfo | ||
| from snuba.web.rpc.storage_routing.routing_strategies.load_based_outcomes import ( | ||
| LoadBasedOutcomesRoutingStrategy, | ||
| ) | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( | ||
| BaseRoutingStrategy, | ||
| RoutingContext, | ||
| ) | ||
|
|
||
# Midnight (UTC) of the day this module is imported; request windows end here.
BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
_PROJECT_ID = 1
_ORG_ID = 1


def _get_request_meta(hour_interval: int = 1) -> RequestMeta:
    """
    Build a span ``RequestMeta`` whose time window ends at ``BASE_TIME``.

    Args:
        hour_interval: Length of the queried window, in hours, counted back from
            ``BASE_TIME``.

    Returns:
        A ``RequestMeta`` for the test project and organization, in normal
        downsampled-storage mode.
    """
    window_end = BASE_TIME
    window_start = BASE_TIME - timedelta(hours=hour_interval)

    start_ts = Timestamp(seconds=int(window_start.timestamp()))
    end_ts = Timestamp(seconds=int(window_end.timestamp()))
    storage_config = DownsampledStorageConfig(mode=DownsampledStorageConfig.MODE_NORMAL)

    return RequestMeta(
        project_ids=[_PROJECT_ID],
        organization_id=_ORG_ID,
        cogs_category="something",
        referrer="something",
        start_timestamp=start_ts,
        end_timestamp=end_ts,
        trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        downsampled_storage_config=storage_config,
    )
|
|
||
|
|
||
@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_load_based_routing_pass_through_even_if_policies_reject() -> None:
    """
    A low cluster load lets the query run even though the allocation policy rejects it.

    The only allocation policy rejects everything, and the patched cluster load (5.0)
    is below the default pass-through threshold of 10, so the decision must come back
    runnable with the default pass-through ``max_threads`` of 10.
    """

    class RejectAllPolicy(AllocationPolicy):
        """Allocation policy that rejects and throttles every query."""

        def _additional_config_definitions(self) -> list[Configuration]:
            return []

        def _get_quota_allowance(
            self, tenant_ids: dict[str, str | int], query_id: str
        ) -> QuotaAllowance:
            return QuotaAllowance(
                can_run=False,
                max_threads=0,
                explanation={"reason": "reject all"},
                is_throttled=True,
                throttle_threshold=MAX_THRESHOLD,
                rejection_threshold=MAX_THRESHOLD,
                quota_used=0,
                quota_unit=NO_UNITS,
                suggestion=NO_SUGGESTION,
            )

        def _update_quota_balance(
            self,
            tenant_ids: dict[str, str | int],
            query_id: str,
            result_or_error: QueryResultOrError,
        ) -> None:
            return

    chained_strategy = LoadBasedOutcomesRoutingStrategy()
    table_request = TraceItemTableRequest(meta=_get_request_meta(hour_interval=1))
    routing_context = RoutingContext(
        in_msg=table_request,
        timer=Timer("test"),
        query_id=uuid.uuid4().hex,
    )

    rejecting_policy = RejectAllPolicy(
        ResourceIdentifier(StorageKey("doesntmatter")), ["org_id"], {}
    )

    # Both patches are active only while the routing decision is computed.
    with (
        patch.object(
            BaseRoutingStrategy,
            "get_allocation_policies",
            return_value=[rejecting_policy],
        ),
        patch(
            "snuba.web.rpc.storage_routing.routing_strategies.storage_routing.get_cluster_loadinfo",
            return_value=LoadInfo(cluster_load=5.0, concurrent_queries=1),
        ),
    ):
        routing_decision = chained_strategy.get_routing_decision(routing_context)

    assert routing_decision.can_run is True
    assert routing_decision.clickhouse_settings.get("max_threads") == 10

    decided_context = routing_decision.routing_context
    assert "load_based_pass_through" in decided_context.extra_info
    assert decided_context.cluster_load_info is not None
    assert decided_context.cluster_load_info.cluster_load == 5.0
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means something went wrong in the load retriever. In that case the result is the same as if only the outcomes-based routing strategy had run, so everything effectively works the same as before.