From 37706a1fddc5b849dbeae1c2ac4e7bbd18527e4e Mon Sep 17 00:00:00 2001 From: Oliver Newland Date: Fri, 9 Jan 2026 14:42:22 -0800 Subject: [PATCH 1/4] [wip] move delete logic out of snuba/web --- snuba/admin/views.py | 10 +- snuba/cli/lw_deletions_consumer.py | 6 +- snuba/lw_deletions/bulk_delete_query.py | 333 ++++++++++++++ snuba/lw_deletions/delete_query.py | 402 ++++++++++++++++ snuba/lw_deletions/formatters.py | 5 +- snuba/lw_deletions/strategy.py | 15 +- snuba/web/bulk_delete_query.py | 368 ++------------- snuba/web/delete_query.py | 429 ++---------------- .../web/rpc/v1/endpoint_delete_trace_items.py | 2 +- snuba/web/views.py | 9 +- tests/lw_deletions/test_formatters.py | 5 +- tests/lw_deletions/test_lw_deletions.py | 2 +- tests/test_search_issues_api.py | 2 +- .../v1/test_endpoint_delete_trace_items.py | 10 +- tests/web/test_bulk_delete_query.py | 22 +- tests/web/test_delete_query.py | 4 +- tests/web/test_max_rows_enforcer.py | 2 +- 17 files changed, 854 insertions(+), 772 deletions(-) create mode 100644 snuba/lw_deletions/bulk_delete_query.py create mode 100644 snuba/lw_deletions/delete_query.py diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 49ba5f29b96..09bdbe46c81 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -70,6 +70,11 @@ from snuba.datasets.factory import InvalidDatasetError, get_enabled_dataset_names from snuba.datasets.storages.factory import get_storage, get_writable_storage from snuba.datasets.storages.storage_key import StorageKey +from snuba.lw_deletions.delete_query import ( + DeletesNotEnabledError, + delete_from_storage, + deletes_are_enabled, +) from snuba.manual_jobs.runner import ( list_job_specs, list_job_specs_with_status, @@ -90,11 +95,6 @@ from snuba.state.explain_meta import explain_cleanup, get_explain_meta from snuba.utils.metrics.timer import Timer from snuba.utils.registered_class import InvalidConfigKeyError -from snuba.web.delete_query import ( - DeletesNotEnabledError, - delete_from_storage, - deletes_are_enabled, -) from snuba.web.rpc import RPCEndpoint, list_all_endpoint_names, run_rpc_handler from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( BaseRoutingStrategy, diff --git a/snuba/cli/lw_deletions_consumer.py b/snuba/cli/lw_deletions_consumer.py index 228c5198187..080b1b9abcb 100644 --- a/snuba/cli/lw_deletions_consumer.py +++ b/snuba/cli/lw_deletions_consumer.py @@ -19,11 +19,11 @@ from snuba.datasets.storages.factory import get_writable_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.environment import setup_logging, setup_sentry +from snuba.lw_deletions.bulk_delete_query import STORAGE_TOPIC from snuba.lw_deletions.formatters import STORAGE_FORMATTER from snuba.lw_deletions.strategy import LWDeletionsConsumerStrategyFactory from snuba.utils.metrics.wrapper import MetricsWrapper from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter -from snuba.web.bulk_delete_query import STORAGE_TOPIC # A longer batch time for deletes is reasonable # since we want fewer mutations @@ -126,9 +126,7 @@ def handler(signum: int, frame: Any) -> None: "consumer_group": consumer_group, "storage": storage, } - metrics = MetricsWrapper( - environment.metrics, "lw_deletions_consumer", tags=metrics_tags - ) + metrics = MetricsWrapper(environment.metrics, "lw_deletions_consumer", tags=metrics_tags) configure_metrics(StreamMetricsAdapter(metrics), force=True) consumer_config = resolve_consumer_config( storage_names=[storage], diff --git 
a/snuba/lw_deletions/bulk_delete_query.py b/snuba/lw_deletions/bulk_delete_query.py new file mode 100644 index 00000000000..b1847a491ca --- /dev/null +++ b/snuba/lw_deletions/bulk_delete_query.py @@ -0,0 +1,333 @@ +import logging +import time +from threading import Thread +from typing import Any, Dict, Mapping, MutableMapping, Optional, Sequence, TypedDict + +import rapidjson +from confluent_kafka import KafkaError +from confluent_kafka import Message as KafkaMessage +from confluent_kafka import Producer + +from snuba import environment, settings +from snuba.attribution.attribution_info import AttributionInfo +from snuba.clickhouse.columns import ColumnSet +from snuba.clickhouse.query import Query +from snuba.datasets.deletion_settings import DeletionSettings, get_trace_item_type_name +from snuba.datasets.storage import WritableTableStorage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.lw_deletions.delete_query import ( + DeletesNotEnabledError, + _construct_condition, + _enforce_max_rows, + _get_attribution_info, + deletes_are_enabled, +) +from snuba.lw_deletions.types import AttributeConditions, ConditionsBag, ConditionsType +from snuba.query.conditions import combine_or_conditions +from snuba.query.data_source.simple import Table +from snuba.query.dsl import literal +from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException +from snuba.query.expressions import Expression +from snuba.reader import Result +from snuba.state import get_int_config, get_str_config +from snuba.utils.metrics.util import with_span +from snuba.utils.metrics.wrapper import MetricsWrapper +from snuba.utils.schemas import ColumnValidator, InvalidColumnType +from snuba.utils.streams.configuration_builder import build_kafka_producer_configuration +from snuba.utils.streams.topics import Topic + +metrics = MetricsWrapper(environment.metrics, "snuba.delete") +logger = logging.getLogger(__name__) + + +class WireAttributeCondition(TypedDict): + attr_key_type: int + attr_key_name: str + attr_values: Sequence[bool | str | int | float] + + +class DeleteQueryMessage(TypedDict, total=False): + rows_to_delete: int + storage_name: str + conditions: ConditionsType + tenant_ids: Mapping[str, str | int] + attribute_conditions: Optional[Dict[str, WireAttributeCondition]] + attribute_conditions_item_type: Optional[int] + + +PRODUCER_MAP: MutableMapping[str, Producer] = {} +STORAGE_TOPIC: Mapping[str, Topic] = { + StorageKey.SEARCH_ISSUES.value: Topic.LW_DELETIONS_GENERIC_EVENTS, + StorageKey.EAP_ITEMS.value: Topic.LW_DELETIONS_EAP_ITEMS, +} + + +class InvalidStorageTopic(Exception): + pass + + +def _get_kafka_producer(topic: Topic) -> Producer: + producer = PRODUCER_MAP.get(topic.value) + if not producer: + producer = Producer( + build_kafka_producer_configuration( + topic=topic, + ) + ) + PRODUCER_MAP[topic.value] = producer + return producer + + +def flush_producers() -> None: + """ + It's not guaranteed that there will be a steady stream of + DELETE requests so we can call producer.flush() for any active + producers to make sure the delivery callbacks are called. 
+ """ + + def _flush_producers() -> None: + while True: + for storage, producer in PRODUCER_MAP.items(): + messages_remaining = producer.flush(5.0) + if messages_remaining: + logger.debug(f"{messages_remaining} {storage} messages pending delivery") + time.sleep(1) + + Thread(target=_flush_producers, name="flush_producers", daemon=True).start() + + +def _delete_query_delivery_callback(error: Optional[KafkaError], message: KafkaMessage) -> None: + metrics.increment( + "delete_query.delivery_callback", + tags={"status": "failure" if error else "success"}, + ) + + if error is not None: + logger.warning("Could not produce delete query due to error: %r", error) + + +FLUSH_PRODUCERS_THREAD_STARTED = False + + +def produce_delete_query(delete_query: DeleteQueryMessage) -> None: + global FLUSH_PRODUCERS_THREAD_STARTED + if not FLUSH_PRODUCERS_THREAD_STARTED: + FLUSH_PRODUCERS_THREAD_STARTED = True + flush_producers() + + storage_name = delete_query["storage_name"] + topic = STORAGE_TOPIC.get(storage_name) + if not topic: + raise InvalidStorageTopic(f"No topic found for {storage_name}") + try: + producer = _get_kafka_producer(topic) + data = rapidjson.dumps(delete_query).encode("utf-8") + producer.poll(0) # trigger queued delivery callbacks + producer.produce( + settings.KAFKA_TOPIC_MAP.get(topic.value, topic.value), + data, + on_delivery=_delete_query_delivery_callback, + ) + except Exception as ex: + logger.exception("Could not produce delete query due to error: %r", ex) + + +def _validate_attribute_conditions( + attribute_conditions: AttributeConditions, + delete_settings: DeletionSettings, +) -> None: + """ + Validates that the attribute_conditions are allowed for the configured item_type. + + Args: + attribute_conditions: AttributeConditions containing item_type and attribute mappings + delete_settings: The deletion settings for the storage + + Raises: + InvalidQueryException: If no attributes are configured for the item_type, + or if any requested attributes are not allowed + """ + allowed_attrs_config = delete_settings.allowed_attributes_by_item_type + + if not allowed_attrs_config: + raise InvalidQueryException("No attribute-based deletions configured for this storage") + + # Map the integer item_type to its string name used in configuration + try: + item_type_name = get_trace_item_type_name(attribute_conditions.item_type) + except ValueError as e: + raise InvalidQueryException(str(e)) + + # Check if this specific item_type has any allowed attributes configured + if item_type_name not in allowed_attrs_config: + raise InvalidQueryException( + f"No attribute-based deletions configured for item_type {item_type_name} " + f"(value: {attribute_conditions.item_type}). Configured item types: " + f"{sorted(allowed_attrs_config.keys())}" + ) + + # Get the allowed attributes for this specific item_type + allowed_attrs = allowed_attrs_config[item_type_name] + + # Validate that all requested attributes are allowed + requested_attrs = set(attribute_conditions.attributes.keys()) + allowed_attrs_set = set(allowed_attrs) + invalid_attrs = requested_attrs - allowed_attrs_set + + if invalid_attrs: + raise InvalidQueryException( + f"Invalid attributes for deletion on item_type '{item_type_name}': {invalid_attrs}. 
" + f"Allowed attributes: {allowed_attrs_set}" + ) + + +@with_span() +def delete_from_storage( + storage: WritableTableStorage, + column_conditions: Dict[str, list[Any]], + attribution_info: Mapping[str, Any], + attribute_conditions: Optional[AttributeConditions] = None, +) -> dict[str, Result]: + """ + This method does a series of validation checks (outline below), + before `delete_from_tables` produces the delete query messages + to the appropriate topic. + + * runtime flag validation `storage_deletes_enabled` (done by region) + * storage validation that deletes are enabled + * column names are valid (allowed_columns storage setting) + * column types are valid + * attribute names are valid (allowed_attributes_by_item_type storage setting) + """ + if not deletes_are_enabled(): + raise DeletesNotEnabledError("Deletes not enabled in this region") + + delete_settings = storage.get_deletion_settings() + if not delete_settings.is_enabled: + raise DeletesNotEnabledError(f"Deletes not enabled for {storage.get_storage_key().value}") + + columns_diff = set(column_conditions.keys()) - set(delete_settings.allowed_columns) + if columns_diff != set(): + raise InvalidQueryException( + f"Invalid Columns to filter by, must be in {delete_settings.allowed_columns}" + ) + + # validate column types + columns = storage.get_schema().get_columns() + column_validator = ColumnValidator(columns) + try: + for col, values in column_conditions.items(): + column_validator.validate(col, values) + except InvalidColumnType as e: + raise InvalidQueryException(e.message) + + # validate attribute conditions if provided + if attribute_conditions: + _validate_attribute_conditions(attribute_conditions, delete_settings) + + if not get_int_config("permit_delete_by_attribute", default=0): + logger.error( + "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " + "as functionality is not yet launched (permit_delete_by_attribute=0)" + ) + return {} + + attr_info = _get_attribution_info(attribution_info) + return delete_from_tables( + storage, + delete_settings.tables, + ConditionsBag( + column_conditions=column_conditions, + attribute_conditions=attribute_conditions, + ), + attr_info, + ) + + +def construct_query(storage: WritableTableStorage, table: str, condition: Expression) -> Query: + cluster_name = storage.get_cluster().get_clickhouse_cluster_name() + on_cluster = literal(cluster_name) if cluster_name else None + return Query( + from_clause=Table( + table, + ColumnSet([]), + storage_key=storage.get_storage_key(), + allocation_policies=storage.get_delete_allocation_policies(), + ), + condition=condition, + on_cluster=on_cluster, + is_delete=True, + ) + + +def _serialize_attribute_conditions( + attribute_conditions: AttributeConditions, +) -> Dict[str, WireAttributeCondition]: + result: Dict[str, WireAttributeCondition] = {} + for key, (attr_key_enum, values) in attribute_conditions.attributes.items(): + result[key] = { + "attr_key_type": attr_key_enum.type, + "attr_key_name": attr_key_enum.name, + "attr_values": values, + } + return result + + +def delete_from_tables( + storage: WritableTableStorage, + tables: Sequence[str], + conditions: ConditionsBag, + attribution_info: AttributionInfo, +) -> dict[str, Result]: + highest_rows_to_delete = 0 + result: dict[str, Result] = {} + for table in tables: + where_clause = _construct_condition(storage, conditions) + query = construct_query(storage, table, where_clause) + try: + num_rows_to_delete = _enforce_max_rows(query) + highest_rows_to_delete = 
max(highest_rows_to_delete, num_rows_to_delete) + result[table] = {"data": [{"rows_to_delete": num_rows_to_delete}]} + except NoRowsToDeleteException: + result[table] = {} + + if highest_rows_to_delete == 0: + return result + + storage_name = storage.get_storage_key().value + project_id = attribution_info.tenant_ids.get("project_id") + if project_id and should_use_killswitch(storage_name, str(project_id)): + return result + + delete_query: DeleteQueryMessage = { + "rows_to_delete": highest_rows_to_delete, + "storage_name": storage_name, + "conditions": conditions.column_conditions, + "tenant_ids": attribution_info.tenant_ids, + } + + # Add attribute_conditions to the message if present + if conditions.attribute_conditions: + delete_query["attribute_conditions"] = _serialize_attribute_conditions( + conditions.attribute_conditions + ) + delete_query["attribute_conditions_item_type"] = conditions.attribute_conditions.item_type + + produce_delete_query(delete_query) + return result + + +def construct_or_conditions( + storage: WritableTableStorage, + conditions: Sequence[ConditionsBag], +) -> Expression: + """ + Combines multiple AND conditions: (equals(project_id, 1) AND in(group_id, (2, 3, 4, 5)) + into OR conditions for a bulk delete + """ + return combine_or_conditions([_construct_condition(storage, cond) for cond in conditions]) + + +def should_use_killswitch(storage_name: str, project_id: str) -> bool: + killswitch_config = get_str_config(f"lw_deletes_killswitch_{storage_name}", default="") + return project_id in killswitch_config if killswitch_config else False diff --git a/snuba/lw_deletions/delete_query.py b/snuba/lw_deletions/delete_query.py new file mode 100644 index 00000000000..ab3cd584db8 --- /dev/null +++ b/snuba/lw_deletions/delete_query.py @@ -0,0 +1,402 @@ +import typing +import uuid +from typing import Any, Mapping, MutableMapping, Optional, Sequence + +from snuba import settings +from snuba.attribution import get_app_id +from snuba.attribution.attribution_info import AttributionInfo +from snuba.clickhouse.columns import ColumnSet +from snuba.clickhouse.errors import ClickhouseError +from snuba.clickhouse.formatter.query import format_query +from snuba.clickhouse.query import Query +from snuba.clickhouse.translators.snuba.mapping import SnubaClickhouseMappingTranslator +from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.storage import WritableTableStorage +from snuba.datasets.storages.factory import get_storage, get_writable_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.lw_deletions.types import ConditionsBag, ConditionsType +from snuba.protos.common import attribute_key_to_expression +from snuba.query import SelectedExpression +from snuba.query.allocation_policies import ( + AllocationPolicy, + AllocationPolicyViolations, + QueryResultOrError, +) +from snuba.query.conditions import combine_and_conditions +from snuba.query.data_source.simple import Table +from snuba.query.dsl import column, equals, in_cond, literal, literals_tuple +from snuba.query.exceptions import ( + InvalidQueryException, + NoRowsToDeleteException, + TooManyDeleteRowsException, +) +from snuba.query.expressions import Expression, FunctionCall +from snuba.query.query_settings import HTTPQuerySettings +from snuba.reader import Result +from snuba.state import get_config, get_int_config +from snuba.utils.metrics.util 
import with_span +from snuba.utils.schemas import ColumnValidator, InvalidColumnType +from snuba.web import QueryException, QueryExtraData, QueryResult +from snuba.web.db_query import _apply_allocation_policies_quota + + +class DeletesNotEnabledError(Exception): + pass + + +class TooManyOngoingMutationsError(Exception): + pass + + +@with_span() +def delete_from_storage( + storage: WritableTableStorage, + columns: ConditionsType, + attribution_info: Mapping[str, Any], +) -> dict[str, Result]: + """ + Inputs: + storage - storage to delete from + columns - a mapping from column-name to a list of column values + that defines the delete conditions. ex: + { + "id": [1, 2, 3] + "status": ["failed"] + } + represents + DELETE FROM ... WHERE id in (1,2,3) AND status='failed' + attribution_info - see other parts of repo + max_ongoing_mutations - the max number of mutations that can be in-progress + on any replica in the clickhouse cluster for the delete to be scheduled. + If any replica in the cluster has more than max_ongoing_mutations in progress, + then the delete will fail and wont be scheduled. + + Deletes all rows in the given storage, that satisfy the conditions + defined in 'columns' input. + + Returns a mapping from clickhouse table name to deletion results, there + will be an entry for every local clickhouse table that makes up the storage. + """ + if not deletes_are_enabled(): + raise DeletesNotEnabledError("Deletes not enabled in this region") + + delete_settings = storage.get_deletion_settings() + if not delete_settings.is_enabled: + raise DeletesNotEnabledError(f"Deletes not enabled for {storage.get_storage_key().value}") + + if delete_settings.bulk_delete_only: + raise DeletesNotEnabledError( + f"Synchronous deletes not enabled for {storage.get_storage_key().value}" + ) + + results: dict[str, Result] = {} + attr_info = _get_attribution_info(attribution_info) + + # fail if too many mutations ongoing + ongoing_mutations = _num_ongoing_mutations(storage.get_cluster(), delete_settings.tables) + max_ongoing_mutations = get_int_config( + "MAX_ONGOING_MUTATIONS_FOR_DELETE", + default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE, + ) + assert max_ongoing_mutations + if ongoing_mutations > max_ongoing_mutations: + raise TooManyOngoingMutationsError( + f"max ongoing mutations to do a delete is {max_ongoing_mutations}, but at least one replica has {ongoing_mutations} ongoing" + ) + + for table in delete_settings.tables: + result = _delete_from_table(storage, table, columns, attr_info) + results[table] = result + return results + + +def _preprocess_for_items(storage: WritableTableStorage, where_clause: Expression) -> Expression: + if storage.get_storage_key() != StorageKey.EAP_ITEMS: + return where_clause + + entity = get_entity(EntityKey.EAP_ITEMS) + for storage_connection in entity.get_all_storage_connections(): + if storage_connection.storage.get_storage_key() == StorageKey.EAP_ITEMS: + translation_mappers = storage_connection.translation_mappers + translator = SnubaClickhouseMappingTranslator(translation_mappers) + return where_clause.accept(translator) + + return where_clause + + +def _delete_from_table( + storage: WritableTableStorage, + table: str, + conditions: ConditionsType, + attribution_info: AttributionInfo, +) -> Result: + cluster_name = storage.get_cluster().get_clickhouse_cluster_name() + on_cluster = literal(cluster_name) if cluster_name else None + where_clause = _construct_condition(storage, ConditionsBag(column_conditions=conditions)) + query = Query( + from_clause=Table( + table, 
+ ColumnSet([]), + storage_key=storage.get_storage_key(), + allocation_policies=storage.get_delete_allocation_policies(), + ), + condition=where_clause, + on_cluster=on_cluster, + is_delete=True, + ) + + columns = storage.get_schema().get_columns() + column_validator = ColumnValidator(columns) + try: + for col, values in conditions.items(): + column_validator.validate(col, values) + except InvalidColumnType as e: + raise InvalidQueryException(e.message) + + try: + _enforce_max_rows(query) + except NoRowsToDeleteException: + result: Result = {} + return result + + deletion_processors = storage.get_deletion_processors() + # These settings aren't needed at the moment + dummy_query_settings = HTTPQuerySettings() + for deletion_procesor in deletion_processors: + deletion_procesor.process_query(query, dummy_query_settings) + + return _execute_query( + query, storage, table, cluster_name, attribution_info, dummy_query_settings + ) + + +def _num_ongoing_mutations(cluster: ClickhouseCluster, tables: Sequence[str]) -> int: + """ + Given a clickhouse cluster and a list of tables, + returns the maximum of ongoing mutations for the tables, + across all replicas + """ + if cluster.is_single_node(): + query = f""" +SELECT max(cnt) +FROM ( + SELECT table, count() as cnt + FROM system.mutations + WHERE table IN ({", ".join(map(repr, tables))}) AND is_done=0 + GROUP BY table +) +""" + else: + query = f""" +SELECT max(cnt) +FROM ( + SELECT hostname() as host, table, count() as cnt + FROM clusterAllReplicas('{cluster.get_clickhouse_cluster_name()}', 'system', mutations) + WHERE table IN ({", ".join(map(repr, tables))}) AND is_done=0 + GROUP BY host, table + ) +""" + return int( + cluster.get_query_connection(ClickhouseClientSettings.QUERY).execute(query).results[0][0] + ) + + +def deletes_are_enabled() -> bool: + return bool(get_config("storage_deletes_enabled", 1)) + + +def _get_rows_to_delete(storage_key: StorageKey, select_query_to_count_rows: Query) -> int: + formatted_select_query_to_count_rows = format_query(select_query_to_count_rows) + select_query_results = ( + get_storage(storage_key) + .get_cluster() + .get_reader() + .execute(formatted_select_query_to_count_rows) + ) + return typing.cast(int, select_query_results["data"][0]["count"]) + + +def _enforce_max_rows(delete_query: Query) -> int: + """ + The cost of a lightweight delete operation depends on the number of matching rows in the WHERE clause and the current number of data parts. + This operation will be most efficient when matching a small number of rows, **and on wide parts** (where the `_row_exists` column is stored + in its own file) + + Because of the above, we want to limit the number of rows one deletes at a time. The `MaxRowsEnforcer` will query clickhouse to see how many + rows we plan on deleting and if it crosses the `max_rows_to_delete` set for that storage we will reject the query. + """ + storage_key = delete_query.get_from_clause().storage_key + + def get_new_from_clause() -> Table: + """ + The delete query targets the local table, but when we are checking the + row count we are querying the dist tables (if applicable). This function + updates the from_clause to have the correct table. 
+ """ + dist_table_name = ( + get_writable_storage((storage_key)).get_table_writer().get_schema().get_table_name() + ) + from_clause = delete_query.get_from_clause() + return Table( + table_name=dist_table_name, + schema=from_clause.schema, + storage_key=from_clause.storage_key, + allocation_policies=from_clause.allocation_policies, + ) + + select_query_to_count_rows = Query( + selected_columns=[ + SelectedExpression("count", FunctionCall("count", "count", ())), + ], + from_clause=get_new_from_clause(), + condition=delete_query.get_condition(), + ) + rows_to_delete = _get_rows_to_delete( + storage_key=storage_key, select_query_to_count_rows=select_query_to_count_rows + ) + if rows_to_delete == 0: + raise NoRowsToDeleteException + max_rows_allowed = get_storage(storage_key).get_deletion_settings().max_rows_to_delete + if rows_to_delete > max_rows_allowed: + raise TooManyDeleteRowsException( + f"Too many rows to delete ({rows_to_delete}), maximum allowed is {max_rows_allowed}" + ) + + return rows_to_delete + + +def _get_attribution_info(attribution_info: Mapping[str, Any]) -> AttributionInfo: + info = dict(attribution_info) + info["app_id"] = get_app_id(attribution_info["app_id"]) + info["referrer"] = attribution_info["referrer"] + info["tenant_ids"] = attribution_info["tenant_ids"] + return AttributionInfo(**info) + + +def _get_delete_allocation_policies( + storage: WritableTableStorage, +) -> list[AllocationPolicy]: + """mostly here to be able to stub easily in tests""" + return storage.get_delete_allocation_policies() + + +def _execute_query( + query: Query, + storage: WritableTableStorage, + table: str, + cluster_name: Optional[str], + attribution_info: AttributionInfo, + query_settings: HTTPQuerySettings, +) -> Result: + """ + Formats and executes the delete query, taking into account + the delete allocation policies as well. 
+ """ + + formatted_query = format_query(query) + allocation_policies = _get_delete_allocation_policies(storage) + query_id = uuid.uuid4().hex + query_settings.push_clickhouse_setting("query_id", query_id) + result = None + error = None + + stats: MutableMapping[str, Any] = { + "clickhouse_table": table, + "referrer": attribution_info.referrer, + "cluster_name": cluster_name or "", + } + + try: + _apply_allocation_policies_quota( + query_settings, + attribution_info, + formatted_query, + stats, + allocation_policies, + query_id, + ) + result = ( + storage.get_cluster() + .get_deleter() + .execute(formatted_query, query_settings.get_clickhouse_settings()) + ) + except AllocationPolicyViolations as e: + error = QueryException.from_args( + AllocationPolicyViolations.__name__, + "Query cannot be run due to allocation policies", + extra={ + "stats": stats, + "sql": "no sql run", + "experiments": {}, + }, + ) + error.__cause__ = e + except QueryException as e: + error = e + except Exception as e: + error = QueryException.from_args( + # This exception needs to have the message of the cause in it for sentry + # to pick it up properly + e.__class__.__name__, + e.message if isinstance(e, ClickhouseError) else str(e), + { + "stats": stats, + "sql": "", + "experiments": {}, + }, + ) + error.__cause__ = e + finally: + query_result = ( + QueryResult( + result=result, + extra=QueryExtraData(stats=stats, sql=formatted_query.get_sql(), experiments={}), + ) + if result + else None + ) + result_or_error = QueryResultOrError(query_result=query_result, error=error) + for allocation_policy in allocation_policies: + allocation_policy.update_quota_balance( + tenant_ids=attribution_info.tenant_ids, + query_id=query_id, + result_or_error=result_or_error, + ) + if result: + return result + raise error or Exception("No error or result when running query, this should never happen") + + +def _construct_condition( + storage: WritableTableStorage, conditions_bag: ConditionsBag +) -> Expression: + columns = conditions_bag.column_conditions + attr_conditions = conditions_bag.attribute_conditions + and_conditions = [] + for col, values in columns.items(): + if len(values) == 1: + exp = equals(column(col), literal(values[0])) + else: + literal_values = [literal(v) for v in values] + exp = in_cond(column(col), literals_tuple(alias=None, literals=literal_values)) + + and_conditions.append(exp) + + if attr_conditions: + for attr_key, attr_values in attr_conditions.attributes.values(): + virtual_column = attribute_key_to_expression(attr_key) + + if len(attr_values) == 1: + exp = equals(virtual_column, literal(attr_values[0])) + else: + literal_values = [literal(v) for v in attr_values] + exp = in_cond( + virtual_column, + literals_tuple(alias=None, literals=literal_values), + ) + and_conditions.append(exp) + + where_clause = combine_and_conditions(and_conditions) + return _preprocess_for_items(storage, where_clause) diff --git a/snuba/lw_deletions/formatters.py b/snuba/lw_deletions/formatters.py index ab14f4ecdea..4946580f661 100644 --- a/snuba/lw_deletions/formatters.py +++ b/snuba/lw_deletions/formatters.py @@ -15,8 +15,11 @@ from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey from snuba.datasets.storages.storage_key import StorageKey +from snuba.lw_deletions.bulk_delete_query import ( + DeleteQueryMessage, + WireAttributeCondition, +) from snuba.lw_deletions.types import AttributeConditions, ConditionsBag -from snuba.web.bulk_delete_query import DeleteQueryMessage, WireAttributeCondition class 
Formatter(ABC): diff --git a/snuba/lw_deletions/strategy.py b/snuba/lw_deletions/strategy.py index 9426072f274..9b4a46b8398 100644 --- a/snuba/lw_deletions/strategy.py +++ b/snuba/lw_deletions/strategy.py @@ -19,6 +19,15 @@ from snuba.datasets.storage import WritableTableStorage from snuba.datasets.storages.storage_key import StorageKey from snuba.lw_deletions.batching import BatchStepCustom, ValuesBatch +from snuba.lw_deletions.bulk_delete_query import ( + construct_or_conditions, + construct_query, +) +from snuba.lw_deletions.delete_query import ( + TooManyOngoingMutationsError, + _execute_query, + _num_ongoing_mutations, +) from snuba.lw_deletions.formatters import Formatter from snuba.lw_deletions.types import ConditionsBag from snuba.query.allocation_policies import AllocationPolicyViolations @@ -26,13 +35,7 @@ from snuba.state import get_int_config, get_str_config from snuba.utils.metrics import MetricsBackend from snuba.web import QueryException -from snuba.web.bulk_delete_query import construct_or_conditions, construct_query from snuba.web.constants import LW_DELETE_NON_RETRYABLE_CLICKHOUSE_ERROR_CODES -from snuba.web.delete_query import ( - TooManyOngoingMutationsError, - _execute_query, - _num_ongoing_mutations, -) TPayload = TypeVar("TPayload") diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py index 0ab415dea59..7beeef342f7 100644 --- a/snuba/web/bulk_delete_query.py +++ b/snuba/web/bulk_delete_query.py @@ -1,333 +1,39 @@ -import logging -import time -from threading import Thread -from typing import Any, Dict, Mapping, MutableMapping, Optional, Sequence, TypedDict - -import rapidjson -from confluent_kafka import KafkaError -from confluent_kafka import Message as KafkaMessage -from confluent_kafka import Producer - -from snuba import environment, settings -from snuba.attribution.attribution_info import AttributionInfo -from snuba.clickhouse.columns import ColumnSet -from snuba.clickhouse.query import Query -from snuba.datasets.deletion_settings import DeletionSettings, get_trace_item_type_name -from snuba.datasets.storage import WritableTableStorage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.lw_deletions.types import AttributeConditions, ConditionsBag, ConditionsType -from snuba.query.conditions import combine_or_conditions -from snuba.query.data_source.simple import Table -from snuba.query.dsl import literal -from snuba.query.exceptions import InvalidQueryException, NoRowsToDeleteException -from snuba.query.expressions import Expression -from snuba.reader import Result -from snuba.state import get_int_config, get_str_config -from snuba.utils.metrics.util import with_span -from snuba.utils.metrics.wrapper import MetricsWrapper -from snuba.utils.schemas import ColumnValidator, InvalidColumnType -from snuba.utils.streams.configuration_builder import build_kafka_producer_configuration -from snuba.utils.streams.topics import Topic -from snuba.web.delete_query import ( - DeletesNotEnabledError, - _construct_condition, - _enforce_max_rows, - _get_attribution_info, - deletes_are_enabled, +from snuba.lw_deletions.bulk_delete_query import ( + FLUSH_PRODUCERS_THREAD_STARTED, + PRODUCER_MAP, + STORAGE_TOPIC, + DeleteQueryMessage, + InvalidStorageTopic, + WireAttributeCondition, + _delete_query_delivery_callback, + _get_kafka_producer, + _serialize_attribute_conditions, + _validate_attribute_conditions, + construct_or_conditions, + construct_query, + delete_from_storage, + delete_from_tables, + flush_producers, + produce_delete_query, + 
should_use_killswitch, ) -metrics = MetricsWrapper(environment.metrics, "snuba.delete") -logger = logging.getLogger(__name__) - - -class WireAttributeCondition(TypedDict): - attr_key_type: int - attr_key_name: str - attr_values: Sequence[bool | str | int | float] - - -class DeleteQueryMessage(TypedDict, total=False): - rows_to_delete: int - storage_name: str - conditions: ConditionsType - tenant_ids: Mapping[str, str | int] - attribute_conditions: Optional[Dict[str, WireAttributeCondition]] - attribute_conditions_item_type: Optional[int] - - -PRODUCER_MAP: MutableMapping[str, Producer] = {} -STORAGE_TOPIC: Mapping[str, Topic] = { - StorageKey.SEARCH_ISSUES.value: Topic.LW_DELETIONS_GENERIC_EVENTS, - StorageKey.EAP_ITEMS.value: Topic.LW_DELETIONS_EAP_ITEMS, -} - - -class InvalidStorageTopic(Exception): - pass - - -def _get_kafka_producer(topic: Topic) -> Producer: - producer = PRODUCER_MAP.get(topic.value) - if not producer: - producer = Producer( - build_kafka_producer_configuration( - topic=topic, - ) - ) - PRODUCER_MAP[topic.value] = producer - return producer - - -def flush_producers() -> None: - """ - It's not guaranteed that there will be a steady stream of - DELETE requests so we can call producer.flush() for any active - producers to make sure the delivery callbacks are called. - """ - - def _flush_producers() -> None: - while True: - for storage, producer in PRODUCER_MAP.items(): - messages_remaining = producer.flush(5.0) - if messages_remaining: - logger.debug(f"{messages_remaining} {storage} messages pending delivery") - time.sleep(1) - - Thread(target=_flush_producers, name="flush_producers", daemon=True).start() - - -def _delete_query_delivery_callback(error: Optional[KafkaError], message: KafkaMessage) -> None: - metrics.increment( - "delete_query.delivery_callback", - tags={"status": "failure" if error else "success"}, - ) - - if error is not None: - logger.warning("Could not produce delete query due to error: %r", error) - - -FLUSH_PRODUCERS_THREAD_STARTED = False - - -def produce_delete_query(delete_query: DeleteQueryMessage) -> None: - global FLUSH_PRODUCERS_THREAD_STARTED - if not FLUSH_PRODUCERS_THREAD_STARTED: - FLUSH_PRODUCERS_THREAD_STARTED = True - flush_producers() - - storage_name = delete_query["storage_name"] - topic = STORAGE_TOPIC.get(storage_name) - if not topic: - raise InvalidStorageTopic(f"No topic found for {storage_name}") - try: - producer = _get_kafka_producer(topic) - data = rapidjson.dumps(delete_query).encode("utf-8") - producer.poll(0) # trigger queued delivery callbacks - producer.produce( - settings.KAFKA_TOPIC_MAP.get(topic.value, topic.value), - data, - on_delivery=_delete_query_delivery_callback, - ) - except Exception as ex: - logger.exception("Could not produce delete query due to error: %r", ex) - - -def _validate_attribute_conditions( - attribute_conditions: AttributeConditions, - delete_settings: DeletionSettings, -) -> None: - """ - Validates that the attribute_conditions are allowed for the configured item_type. 
- - Args: - attribute_conditions: AttributeConditions containing item_type and attribute mappings - delete_settings: The deletion settings for the storage - - Raises: - InvalidQueryException: If no attributes are configured for the item_type, - or if any requested attributes are not allowed - """ - allowed_attrs_config = delete_settings.allowed_attributes_by_item_type - - if not allowed_attrs_config: - raise InvalidQueryException("No attribute-based deletions configured for this storage") - - # Map the integer item_type to its string name used in configuration - try: - item_type_name = get_trace_item_type_name(attribute_conditions.item_type) - except ValueError as e: - raise InvalidQueryException(str(e)) - - # Check if this specific item_type has any allowed attributes configured - if item_type_name not in allowed_attrs_config: - raise InvalidQueryException( - f"No attribute-based deletions configured for item_type {item_type_name} " - f"(value: {attribute_conditions.item_type}). Configured item types: " - f"{sorted(allowed_attrs_config.keys())}" - ) - - # Get the allowed attributes for this specific item_type - allowed_attrs = allowed_attrs_config[item_type_name] - - # Validate that all requested attributes are allowed - requested_attrs = set(attribute_conditions.attributes.keys()) - allowed_attrs_set = set(allowed_attrs) - invalid_attrs = requested_attrs - allowed_attrs_set - - if invalid_attrs: - raise InvalidQueryException( - f"Invalid attributes for deletion on item_type '{item_type_name}': {invalid_attrs}. " - f"Allowed attributes: {allowed_attrs_set}" - ) - - -@with_span() -def delete_from_storage( - storage: WritableTableStorage, - column_conditions: Dict[str, list[Any]], - attribution_info: Mapping[str, Any], - attribute_conditions: Optional[AttributeConditions] = None, -) -> dict[str, Result]: - """ - This method does a series of validation checks (outline below), - before `delete_from_tables` produces the delete query messages - to the appropriate topic. 
- - * runtime flag validation `storage_deletes_enabled` (done by region) - * storage validation that deletes are enabled - * column names are valid (allowed_columns storage setting) - * column types are valid - * attribute names are valid (allowed_attributes_by_item_type storage setting) - """ - if not deletes_are_enabled(): - raise DeletesNotEnabledError("Deletes not enabled in this region") - - delete_settings = storage.get_deletion_settings() - if not delete_settings.is_enabled: - raise DeletesNotEnabledError(f"Deletes not enabled for {storage.get_storage_key().value}") - - columns_diff = set(column_conditions.keys()) - set(delete_settings.allowed_columns) - if columns_diff != set(): - raise InvalidQueryException( - f"Invalid Columns to filter by, must be in {delete_settings.allowed_columns}" - ) - - # validate column types - columns = storage.get_schema().get_columns() - column_validator = ColumnValidator(columns) - try: - for col, values in column_conditions.items(): - column_validator.validate(col, values) - except InvalidColumnType as e: - raise InvalidQueryException(e.message) - - # validate attribute conditions if provided - if attribute_conditions: - _validate_attribute_conditions(attribute_conditions, delete_settings) - - if not get_int_config("permit_delete_by_attribute", default=0): - logger.error( - "valid attribute_conditions passed to delete_from_storage, but delete will be ignored " - "as functionality is not yet launched (permit_delete_by_attribute=0)" - ) - return {} - - attr_info = _get_attribution_info(attribution_info) - return delete_from_tables( - storage, - delete_settings.tables, - ConditionsBag( - column_conditions=column_conditions, - attribute_conditions=attribute_conditions, - ), - attr_info, - ) - - -def construct_query(storage: WritableTableStorage, table: str, condition: Expression) -> Query: - cluster_name = storage.get_cluster().get_clickhouse_cluster_name() - on_cluster = literal(cluster_name) if cluster_name else None - return Query( - from_clause=Table( - table, - ColumnSet([]), - storage_key=storage.get_storage_key(), - allocation_policies=storage.get_delete_allocation_policies(), - ), - condition=condition, - on_cluster=on_cluster, - is_delete=True, - ) - - -def _serialize_attribute_conditions( - attribute_conditions: AttributeConditions, -) -> Dict[str, WireAttributeCondition]: - result: Dict[str, WireAttributeCondition] = {} - for key, (attr_key_enum, values) in attribute_conditions.attributes.items(): - result[key] = { - "attr_key_type": attr_key_enum.type, - "attr_key_name": attr_key_enum.name, - "attr_values": values, - } - return result - - -def delete_from_tables( - storage: WritableTableStorage, - tables: Sequence[str], - conditions: ConditionsBag, - attribution_info: AttributionInfo, -) -> dict[str, Result]: - highest_rows_to_delete = 0 - result: dict[str, Result] = {} - for table in tables: - where_clause = _construct_condition(storage, conditions) - query = construct_query(storage, table, where_clause) - try: - num_rows_to_delete = _enforce_max_rows(query) - highest_rows_to_delete = max(highest_rows_to_delete, num_rows_to_delete) - result[table] = {"data": [{"rows_to_delete": num_rows_to_delete}]} - except NoRowsToDeleteException: - result[table] = {} - - if highest_rows_to_delete == 0: - return result - - storage_name = storage.get_storage_key().value - project_id = attribution_info.tenant_ids.get("project_id") - if project_id and should_use_killswitch(storage_name, str(project_id)): - return result - - delete_query: DeleteQueryMessage = 
{ - "rows_to_delete": highest_rows_to_delete, - "storage_name": storage_name, - "conditions": conditions.column_conditions, - "tenant_ids": attribution_info.tenant_ids, - } - - # Add attribute_conditions to the message if present - if conditions.attribute_conditions: - delete_query["attribute_conditions"] = _serialize_attribute_conditions( - conditions.attribute_conditions - ) - delete_query["attribute_conditions_item_type"] = conditions.attribute_conditions.item_type - - produce_delete_query(delete_query) - return result - - -def construct_or_conditions( - storage: WritableTableStorage, - conditions: Sequence[ConditionsBag], -) -> Expression: - """ - Combines multiple AND conditions: (equals(project_id, 1) AND in(group_id, (2, 3, 4, 5)) - into OR conditions for a bulk delete - """ - return combine_or_conditions([_construct_condition(storage, cond) for cond in conditions]) - - -def should_use_killswitch(storage_name: str, project_id: str) -> bool: - killswitch_config = get_str_config(f"lw_deletes_killswitch_{storage_name}", default="") - return project_id in killswitch_config if killswitch_config else False +__all__ = [ + "DeleteQueryMessage", + "FLUSH_PRODUCERS_THREAD_STARTED", + "InvalidStorageTopic", + "PRODUCER_MAP", + "STORAGE_TOPIC", + "WireAttributeCondition", + "_delete_query_delivery_callback", + "_get_kafka_producer", + "_serialize_attribute_conditions", + "_validate_attribute_conditions", + "construct_or_conditions", + "construct_query", + "delete_from_storage", + "delete_from_tables", + "flush_producers", + "produce_delete_query", + "should_use_killswitch", +] diff --git a/snuba/web/delete_query.py b/snuba/web/delete_query.py index ab3cd584db8..2b9b8a3e354 100644 --- a/snuba/web/delete_query.py +++ b/snuba/web/delete_query.py @@ -1,402 +1,31 @@ -import typing -import uuid -from typing import Any, Mapping, MutableMapping, Optional, Sequence - -from snuba import settings -from snuba.attribution import get_app_id -from snuba.attribution.attribution_info import AttributionInfo -from snuba.clickhouse.columns import ColumnSet -from snuba.clickhouse.errors import ClickhouseError -from snuba.clickhouse.formatter.query import format_query -from snuba.clickhouse.query import Query -from snuba.clickhouse.translators.snuba.mapping import SnubaClickhouseMappingTranslator -from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.datasets.storage import WritableTableStorage -from snuba.datasets.storages.factory import get_storage, get_writable_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.lw_deletions.types import ConditionsBag, ConditionsType -from snuba.protos.common import attribute_key_to_expression -from snuba.query import SelectedExpression -from snuba.query.allocation_policies import ( - AllocationPolicy, - AllocationPolicyViolations, - QueryResultOrError, -) -from snuba.query.conditions import combine_and_conditions -from snuba.query.data_source.simple import Table -from snuba.query.dsl import column, equals, in_cond, literal, literals_tuple -from snuba.query.exceptions import ( - InvalidQueryException, - NoRowsToDeleteException, - TooManyDeleteRowsException, -) -from snuba.query.expressions import Expression, FunctionCall -from snuba.query.query_settings import HTTPQuerySettings -from snuba.reader import Result -from snuba.state import get_config, get_int_config -from snuba.utils.metrics.util import 
with_span -from snuba.utils.schemas import ColumnValidator, InvalidColumnType -from snuba.web import QueryException, QueryExtraData, QueryResult -from snuba.web.db_query import _apply_allocation_policies_quota - - -class DeletesNotEnabledError(Exception): - pass - - -class TooManyOngoingMutationsError(Exception): - pass - - -@with_span() -def delete_from_storage( - storage: WritableTableStorage, - columns: ConditionsType, - attribution_info: Mapping[str, Any], -) -> dict[str, Result]: - """ - Inputs: - storage - storage to delete from - columns - a mapping from column-name to a list of column values - that defines the delete conditions. ex: - { - "id": [1, 2, 3] - "status": ["failed"] - } - represents - DELETE FROM ... WHERE id in (1,2,3) AND status='failed' - attribution_info - see other parts of repo - max_ongoing_mutations - the max number of mutations that can be in-progress - on any replica in the clickhouse cluster for the delete to be scheduled. - If any replica in the cluster has more than max_ongoing_mutations in progress, - then the delete will fail and wont be scheduled. - - Deletes all rows in the given storage, that satisfy the conditions - defined in 'columns' input. - - Returns a mapping from clickhouse table name to deletion results, there - will be an entry for every local clickhouse table that makes up the storage. - """ - if not deletes_are_enabled(): - raise DeletesNotEnabledError("Deletes not enabled in this region") - - delete_settings = storage.get_deletion_settings() - if not delete_settings.is_enabled: - raise DeletesNotEnabledError(f"Deletes not enabled for {storage.get_storage_key().value}") - - if delete_settings.bulk_delete_only: - raise DeletesNotEnabledError( - f"Synchronous deletes not enabled for {storage.get_storage_key().value}" - ) - - results: dict[str, Result] = {} - attr_info = _get_attribution_info(attribution_info) - - # fail if too many mutations ongoing - ongoing_mutations = _num_ongoing_mutations(storage.get_cluster(), delete_settings.tables) - max_ongoing_mutations = get_int_config( - "MAX_ONGOING_MUTATIONS_FOR_DELETE", - default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE, - ) - assert max_ongoing_mutations - if ongoing_mutations > max_ongoing_mutations: - raise TooManyOngoingMutationsError( - f"max ongoing mutations to do a delete is {max_ongoing_mutations}, but at least one replica has {ongoing_mutations} ongoing" - ) - - for table in delete_settings.tables: - result = _delete_from_table(storage, table, columns, attr_info) - results[table] = result - return results - - -def _preprocess_for_items(storage: WritableTableStorage, where_clause: Expression) -> Expression: - if storage.get_storage_key() != StorageKey.EAP_ITEMS: - return where_clause - - entity = get_entity(EntityKey.EAP_ITEMS) - for storage_connection in entity.get_all_storage_connections(): - if storage_connection.storage.get_storage_key() == StorageKey.EAP_ITEMS: - translation_mappers = storage_connection.translation_mappers - translator = SnubaClickhouseMappingTranslator(translation_mappers) - return where_clause.accept(translator) - - return where_clause - - -def _delete_from_table( - storage: WritableTableStorage, - table: str, - conditions: ConditionsType, - attribution_info: AttributionInfo, -) -> Result: - cluster_name = storage.get_cluster().get_clickhouse_cluster_name() - on_cluster = literal(cluster_name) if cluster_name else None - where_clause = _construct_condition(storage, ConditionsBag(column_conditions=conditions)) - query = Query( - from_clause=Table( - table, - 
ColumnSet([]), - storage_key=storage.get_storage_key(), - allocation_policies=storage.get_delete_allocation_policies(), - ), - condition=where_clause, - on_cluster=on_cluster, - is_delete=True, - ) - - columns = storage.get_schema().get_columns() - column_validator = ColumnValidator(columns) - try: - for col, values in conditions.items(): - column_validator.validate(col, values) - except InvalidColumnType as e: - raise InvalidQueryException(e.message) - - try: - _enforce_max_rows(query) - except NoRowsToDeleteException: - result: Result = {} - return result - - deletion_processors = storage.get_deletion_processors() - # These settings aren't needed at the moment - dummy_query_settings = HTTPQuerySettings() - for deletion_procesor in deletion_processors: - deletion_procesor.process_query(query, dummy_query_settings) - - return _execute_query( - query, storage, table, cluster_name, attribution_info, dummy_query_settings - ) - - -def _num_ongoing_mutations(cluster: ClickhouseCluster, tables: Sequence[str]) -> int: - """ - Given a clickhouse cluster and a list of tables, - returns the maximum of ongoing mutations for the tables, - across all replicas - """ - if cluster.is_single_node(): - query = f""" -SELECT max(cnt) -FROM ( - SELECT table, count() as cnt - FROM system.mutations - WHERE table IN ({", ".join(map(repr, tables))}) AND is_done=0 - GROUP BY table +from snuba.lw_deletions.delete_query import ( + DeletesNotEnabledError, + TooManyOngoingMutationsError, + _construct_condition, + _delete_from_table, + _enforce_max_rows, + _execute_query, + _get_attribution_info, + _get_delete_allocation_policies, + _get_rows_to_delete, + _num_ongoing_mutations, + _preprocess_for_items, + delete_from_storage, + deletes_are_enabled, ) -""" - else: - query = f""" -SELECT max(cnt) -FROM ( - SELECT hostname() as host, table, count() as cnt - FROM clusterAllReplicas('{cluster.get_clickhouse_cluster_name()}', 'system', mutations) - WHERE table IN ({", ".join(map(repr, tables))}) AND is_done=0 - GROUP BY host, table - ) -""" - return int( - cluster.get_query_connection(ClickhouseClientSettings.QUERY).execute(query).results[0][0] - ) - - -def deletes_are_enabled() -> bool: - return bool(get_config("storage_deletes_enabled", 1)) - - -def _get_rows_to_delete(storage_key: StorageKey, select_query_to_count_rows: Query) -> int: - formatted_select_query_to_count_rows = format_query(select_query_to_count_rows) - select_query_results = ( - get_storage(storage_key) - .get_cluster() - .get_reader() - .execute(formatted_select_query_to_count_rows) - ) - return typing.cast(int, select_query_results["data"][0]["count"]) - - -def _enforce_max_rows(delete_query: Query) -> int: - """ - The cost of a lightweight delete operation depends on the number of matching rows in the WHERE clause and the current number of data parts. - This operation will be most efficient when matching a small number of rows, **and on wide parts** (where the `_row_exists` column is stored - in its own file) - - Because of the above, we want to limit the number of rows one deletes at a time. The `MaxRowsEnforcer` will query clickhouse to see how many - rows we plan on deleting and if it crosses the `max_rows_to_delete` set for that storage we will reject the query. - """ - storage_key = delete_query.get_from_clause().storage_key - - def get_new_from_clause() -> Table: - """ - The delete query targets the local table, but when we are checking the - row count we are querying the dist tables (if applicable). 
This function - updates the from_clause to have the correct table. - """ - dist_table_name = ( - get_writable_storage((storage_key)).get_table_writer().get_schema().get_table_name() - ) - from_clause = delete_query.get_from_clause() - return Table( - table_name=dist_table_name, - schema=from_clause.schema, - storage_key=from_clause.storage_key, - allocation_policies=from_clause.allocation_policies, - ) - - select_query_to_count_rows = Query( - selected_columns=[ - SelectedExpression("count", FunctionCall("count", "count", ())), - ], - from_clause=get_new_from_clause(), - condition=delete_query.get_condition(), - ) - rows_to_delete = _get_rows_to_delete( - storage_key=storage_key, select_query_to_count_rows=select_query_to_count_rows - ) - if rows_to_delete == 0: - raise NoRowsToDeleteException - max_rows_allowed = get_storage(storage_key).get_deletion_settings().max_rows_to_delete - if rows_to_delete > max_rows_allowed: - raise TooManyDeleteRowsException( - f"Too many rows to delete ({rows_to_delete}), maximum allowed is {max_rows_allowed}" - ) - - return rows_to_delete - - -def _get_attribution_info(attribution_info: Mapping[str, Any]) -> AttributionInfo: - info = dict(attribution_info) - info["app_id"] = get_app_id(attribution_info["app_id"]) - info["referrer"] = attribution_info["referrer"] - info["tenant_ids"] = attribution_info["tenant_ids"] - return AttributionInfo(**info) - - -def _get_delete_allocation_policies( - storage: WritableTableStorage, -) -> list[AllocationPolicy]: - """mostly here to be able to stub easily in tests""" - return storage.get_delete_allocation_policies() - - -def _execute_query( - query: Query, - storage: WritableTableStorage, - table: str, - cluster_name: Optional[str], - attribution_info: AttributionInfo, - query_settings: HTTPQuerySettings, -) -> Result: - """ - Formats and executes the delete query, taking into account - the delete allocation policies as well. 
-    """
-
-    formatted_query = format_query(query)
-    allocation_policies = _get_delete_allocation_policies(storage)
-    query_id = uuid.uuid4().hex
-    query_settings.push_clickhouse_setting("query_id", query_id)
-    result = None
-    error = None
-
-    stats: MutableMapping[str, Any] = {
-        "clickhouse_table": table,
-        "referrer": attribution_info.referrer,
-        "cluster_name": cluster_name or "",
-    }
-
-    try:
-        _apply_allocation_policies_quota(
-            query_settings,
-            attribution_info,
-            formatted_query,
-            stats,
-            allocation_policies,
-            query_id,
-        )
-        result = (
-            storage.get_cluster()
-            .get_deleter()
-            .execute(formatted_query, query_settings.get_clickhouse_settings())
-        )
-    except AllocationPolicyViolations as e:
-        error = QueryException.from_args(
-            AllocationPolicyViolations.__name__,
-            "Query cannot be run due to allocation policies",
-            extra={
-                "stats": stats,
-                "sql": "no sql run",
-                "experiments": {},
-            },
-        )
-        error.__cause__ = e
-    except QueryException as e:
-        error = e
-    except Exception as e:
-        error = QueryException.from_args(
-            # This exception needs to have the message of the cause in it for sentry
-            # to pick it up properly
-            e.__class__.__name__,
-            e.message if isinstance(e, ClickhouseError) else str(e),
-            {
-                "stats": stats,
-                "sql": "",
-                "experiments": {},
-            },
-        )
-        error.__cause__ = e
-    finally:
-        query_result = (
-            QueryResult(
-                result=result,
-                extra=QueryExtraData(stats=stats, sql=formatted_query.get_sql(), experiments={}),
-            )
-            if result
-            else None
-        )
-        result_or_error = QueryResultOrError(query_result=query_result, error=error)
-        for allocation_policy in allocation_policies:
-            allocation_policy.update_quota_balance(
-                tenant_ids=attribution_info.tenant_ids,
-                query_id=query_id,
-                result_or_error=result_or_error,
-            )
-    if result:
-        return result
-    raise error or Exception("No error or result when running query, this should never happen")
-
-
-def _construct_condition(
-    storage: WritableTableStorage, conditions_bag: ConditionsBag
-) -> Expression:
-    columns = conditions_bag.column_conditions
-    attr_conditions = conditions_bag.attribute_conditions
-    and_conditions = []
-    for col, values in columns.items():
-        if len(values) == 1:
-            exp = equals(column(col), literal(values[0]))
-        else:
-            literal_values = [literal(v) for v in values]
-            exp = in_cond(column(col), literals_tuple(alias=None, literals=literal_values))
-
-        and_conditions.append(exp)
-
-    if attr_conditions:
-        for attr_key, attr_values in attr_conditions.attributes.values():
-            virtual_column = attribute_key_to_expression(attr_key)
-
-            if len(attr_values) == 1:
-                exp = equals(virtual_column, literal(attr_values[0]))
-            else:
-                literal_values = [literal(v) for v in attr_values]
-                exp = in_cond(
-                    virtual_column,
-                    literals_tuple(alias=None, literals=literal_values),
-                )
-            and_conditions.append(exp)
-    where_clause = combine_and_conditions(and_conditions)
-    return _preprocess_for_items(storage, where_clause)
+__all__ = [
+    "DeletesNotEnabledError",
+    "TooManyOngoingMutationsError",
+    "_construct_condition",
+    "_delete_from_table",
+    "_enforce_max_rows",
+    "_execute_query",
+    "_get_attribution_info",
+    "_get_delete_allocation_policies",
+    "_get_rows_to_delete",
+    "_num_ongoing_mutations",
+    "_preprocess_for_items",
+    "delete_from_storage",
+    "deletes_are_enabled",
+]
diff --git a/snuba/web/rpc/v1/endpoint_delete_trace_items.py b/snuba/web/rpc/v1/endpoint_delete_trace_items.py
index 5f1958fca5f..7cd96afb950 100644
--- a/snuba/web/rpc/v1/endpoint_delete_trace_items.py
+++ b/snuba/web/rpc/v1/endpoint_delete_trace_items.py
@@ -11,8 +11,8 @@ from snuba.attribution.appid import AppID
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
+from snuba.lw_deletions.bulk_delete_query import delete_from_storage
 from snuba.lw_deletions.types import AttributeConditions
-from snuba.web.bulk_delete_query import delete_from_storage
 from snuba.web.rpc import RPCEndpoint
 from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
diff --git a/snuba/web/views.py b/snuba/web/views.py
index b5b28f6e6af..85ca6cdbc19 100644
--- a/snuba/web/views.py
+++ b/snuba/web/views.py
@@ -52,6 +52,13 @@ from snuba.datasets.factory import InvalidDatasetError, get_dataset_name
 from snuba.datasets.schemas.tables import TableSchema
 from snuba.datasets.storage import StorageNotAvailable, WritableTableStorage
+from snuba.lw_deletions.bulk_delete_query import (
+    delete_from_storage as bulk_delete_from_storage,
+)
+from snuba.lw_deletions.delete_query import (
+    DeletesNotEnabledError,
+    TooManyOngoingMutationsError,
+)
 from snuba.query.allocation_policies import AllocationPolicyViolations
 from snuba.query.exceptions import InvalidQueryException, QueryPlanException
 from snuba.query.query_settings import HTTPQuerySettings
@@ -72,10 +79,8 @@ from snuba.utils.metrics.timer import Timer
 from snuba.utils.metrics.util import with_span
 from snuba.web import QueryException, QueryTooLongException
-from snuba.web.bulk_delete_query import delete_from_storage as bulk_delete_from_storage
 from snuba.web.constants import get_http_status_for_clickhouse_error
 from snuba.web.converters import DatasetConverter, EntityConverter, StorageConverter
-from snuba.web.delete_query import DeletesNotEnabledError, TooManyOngoingMutationsError
 from snuba.web.query import parse_and_run_query
 from snuba.web.rpc import run_rpc_handler
 from snuba.writer import BatchWriterEncoderWrapper, WriterTableRow
diff --git a/tests/lw_deletions/test_formatters.py b/tests/lw_deletions/test_formatters.py
index aae5ae5089a..20e5774b259 100644
--- a/tests/lw_deletions/test_formatters.py
+++ b/tests/lw_deletions/test_formatters.py
@@ -3,13 +3,16 @@ import pytest
 from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey
+from snuba.lw_deletions.bulk_delete_query import (
+    DeleteQueryMessage,
+    WireAttributeCondition,
+)
 from snuba.lw_deletions.formatters import (
     EAPItemsFormatter,
     Formatter,
     SearchIssuesFormatter,
 )
 from snuba.lw_deletions.types import ConditionsBag, ConditionsType
-from snuba.web.bulk_delete_query import DeleteQueryMessage, WireAttributeCondition
 def create_delete_query_message(
diff --git a/tests/lw_deletions/test_lw_deletions.py b/tests/lw_deletions/test_lw_deletions.py
index f87661e4a6d..e107446f8b6 100644
--- a/tests/lw_deletions/test_lw_deletions.py
+++ b/tests/lw_deletions/test_lw_deletions.py
@@ -13,11 +13,11 @@ from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.lw_deletions.batching import BatchStepCustom
+from snuba.lw_deletions.bulk_delete_query import DeleteQueryMessage
 from snuba.lw_deletions.formatters import SearchIssuesFormatter
 from snuba.lw_deletions.strategy import FormatQuery, increment_by
 from snuba.lw_deletions.types import ConditionsType
 from snuba.utils.streams.topics import Topic as SnubaTopic
-from snuba.web.bulk_delete_query import DeleteQueryMessage
 ROWS_CONDITIONS = {
     5: {"project_id": [1], "group_id": [1, 2, 3, 4]},
diff --git a/tests/test_search_issues_api.py b/tests/test_search_issues_api.py
index 804afa5de24..eab13f51426 100644
--- a/tests/test_search_issues_api.py
+++ b/tests/test_search_issues_api.py
@@ -104,7 +104,7 @@ def delete_query(
             headers={"referer": "test"},
         )
-    @patch("snuba.web.bulk_delete_query.produce_delete_query")
+    @patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query")
     def test_simple_delete(self, mock_produce_delete: Mock) -> None:
         set_config("read_through_cache.short_circuit", 1)
         now = datetime.now().replace(minute=0, second=0, microsecond=0)
diff --git a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py
index 437775a9870..6ff7070e5ac 100644
--- a/tests/web/rpc/v1/test_endpoint_delete_trace_items.py
+++ b/tests/web/rpc/v1/test_endpoint_delete_trace_items.py
@@ -119,7 +119,7 @@ def test_valid_trace_id_returns_success_response(self, setup_teardown: Any) -> N
         assert MessageToDict(response) == MessageToDict(expected_response)
-    @patch("snuba.web.bulk_delete_query.produce_delete_query")
+    @patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query")
     def test_valid_trace_id_produces_bulk_delete_message(
         self, produce_delete_query_mock: Mock, setup_teardown: Any
     ) -> None:
@@ -183,8 +183,8 @@ def test_filters_with_equals_operation_accepted(self) -> None:
             ],
         )
-        with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10):
-            with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce:
+        with patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10):
+            with patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query") as mock_produce:
                 EndpointDeleteTraceItems().execute(message)
                 # Verify produce_delete_query was called with attribute_conditions
@@ -221,8 +221,8 @@ def test_filters_with_in_operation_accepted(self) -> None:
             ],
         )
-        with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10):
-            with patch("snuba.web.bulk_delete_query.produce_delete_query"):
+        with patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10):
+            with patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query"):
                 assert isinstance(
                     EndpointDeleteTraceItems().execute(message), DeleteTraceItemsResponse
                 )
diff --git a/tests/web/test_bulk_delete_query.py b/tests/web/test_bulk_delete_query.py
index 0a07b0f75ab..f4d5132c37a 100644
--- a/tests/web/test_bulk_delete_query.py
+++ b/tests/web/test_bulk_delete_query.py
@@ -13,14 +13,14 @@ from snuba import settings
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
+from snuba.lw_deletions.bulk_delete_query import delete_from_storage
+from snuba.lw_deletions.delete_query import DeletesNotEnabledError
 from snuba.lw_deletions.types import AttributeConditions
 from snuba.query.exceptions import InvalidQueryException
 from snuba.state import set_config
 from snuba.utils.manage_topics import create_topics
 from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic
-from snuba.web.bulk_delete_query import delete_from_storage
-from snuba.web.delete_query import DeletesNotEnabledError
 # TraceItemType values from sentry_protos
 TRACE_ITEM_TYPE_SPAN = 1
@@ -48,7 +48,7 @@ def get_attribution_info(tenant_ids: Optional[Mapping[str, int | str]] = None) -
     }
-@patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10)
+@patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10)
 def test_delete_success(mock_enforce_max_row: Mock) -> None:
     admin_client = AdminClient(get_default_kafka_configuration())
     create_topics(admin_client, [Topic.LW_DELETIONS_GENERIC_EVENTS])
@@ -107,8 +107,8 @@ def test_deletes_not_enabled_runtime_config() -> None:
 @pytest.mark.redis_db
-@patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10)
-@patch("snuba.web.bulk_delete_query.produce_delete_query")
+@patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10)
+@patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query")
 def test_deletes_killswitch(mock_produce_query: Mock, mock_enforce_rows: Mock) -> None:
     storage = get_writable_storage(StorageKey("search_issues"))
     conditions = {"project_id": [1], "group_id": [1, 2, 3, 4]}
@@ -177,8 +177,8 @@ def test_attribute_conditions_valid_occurrence() -> None:
     attr_info = get_attribution_info()
     # Mock out _enforce_max_rows to avoid needing actual data
-    with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10):
-        with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce:
+    with patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10):
+        with patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query") as mock_produce:
             # Should not raise an exception, but should return empty dict since
             # functionality is not yet launched (permit_delete_by_attribute=0 by default)
             result = delete_from_storage(storage, conditions, attr_info, attribute_conditions)
@@ -225,8 +225,8 @@ def test_attribute_conditions_missing_item_type() -> None:
     # Since item_type is now in AttributeConditions, we need to test a different scenario
     # The validation now should pass, but we need to ensure item_type is also in conditions
-    with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10):
-        with patch("snuba.web.bulk_delete_query.produce_delete_query"):
+    with patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10):
+        with patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query"):
            # This should now succeed since we're no longer checking conditions dict
            delete_from_storage(storage, conditions, attr_info, attribute_conditions)
@@ -268,8 +268,8 @@ def test_attribute_conditions_feature_flag_enabled() -> None:
     try:
         # Mock out _enforce_max_rows to avoid needing actual data
-        with patch("snuba.web.bulk_delete_query._enforce_max_rows", return_value=10):
-            with patch("snuba.web.bulk_delete_query.produce_delete_query") as mock_produce:
+        with patch("snuba.lw_deletions.bulk_delete_query._enforce_max_rows", return_value=10):
+            with patch("snuba.lw_deletions.bulk_delete_query.produce_delete_query") as mock_produce:
                 # Should process normally and produce a message
                 result = delete_from_storage(storage, conditions, attr_info, attribute_conditions)
diff --git a/tests/web/test_delete_query.py b/tests/web/test_delete_query.py
index ba6d10a6a9c..6d164896abd 100644
--- a/tests/web/test_delete_query.py
+++ b/tests/web/test_delete_query.py
@@ -11,6 +11,7 @@ from snuba.configs.configuration import Configuration, ResourceIdentifier
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
+from snuba.lw_deletions.delete_query import _execute_query
 from snuba.query.allocation_policies import (
     MAX_THRESHOLD,
     NO_SUGGESTION,
@@ -24,7 +25,7 @@ from snuba.query.dsl import and_cond, column, equals, literal
 from snuba.query.query_settings import HTTPQuerySettings
 from snuba.web import QueryException
-from snuba.web.delete_query import _execute_query
 @pytest.mark.clickhouse_db
@@ -113,7 +113,7 @@ def _update_quota_balance(
                 return
     with mock.patch(
-        "snuba.web.delete_query._get_delete_allocation_policies",
+        "snuba.lw_deletions.delete_query._get_delete_allocation_policies",
         return_value=[
             RejectPolicy(ResourceIdentifier(StorageKey("doesntmatter")), ["a", "b", "c"], {})
         ],
diff --git a/tests/web/test_max_rows_enforcer.py b/tests/web/test_max_rows_enforcer.py
index afa590644b5..d33cc505522 100644
--- a/tests/web/test_max_rows_enforcer.py
+++ b/tests/web/test_max_rows_enforcer.py
@@ -12,11 +12,11 @@ from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.storages.storage_key import StorageKey
+from snuba.lw_deletions.delete_query import _enforce_max_rows
 from snuba.query.data_source.simple import Table
 from snuba.query.dsl import and_cond, column, equals, literal
 from snuba.query.exceptions import TooManyDeleteRowsException
 from snuba.state import set_config
-from snuba.web.delete_query import _enforce_max_rows
 from tests.base import BaseApiTest
 from tests.datasets.configuration.utils import ConfigurationTest
 from tests.helpers import write_unprocessed_events

From 365bbdd14b61c5bf773cba6b2bc700fc9abc8cb6 Mon Sep 17 00:00:00 2001
From: Oliver Newland
Date: Fri, 9 Jan 2026 14:59:05 -0800
Subject: [PATCH 2/4] [wip] remove snuba/web delete re-exports

---
 snuba/web/bulk_delete_query.py | 39 ----------------------------------
 snuba/web/delete_query.py      | 31 ---------------------------
 2 files changed, 70 deletions(-)
 delete mode 100644 snuba/web/bulk_delete_query.py
 delete mode 100644 snuba/web/delete_query.py

diff --git a/snuba/web/bulk_delete_query.py b/snuba/web/bulk_delete_query.py
deleted file mode 100644
index 7beeef342f7..00000000000
--- a/snuba/web/bulk_delete_query.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from snuba.lw_deletions.bulk_delete_query import (
-    FLUSH_PRODUCERS_THREAD_STARTED,
-    PRODUCER_MAP,
-    STORAGE_TOPIC,
-    DeleteQueryMessage,
-    InvalidStorageTopic,
-    WireAttributeCondition,
-    _delete_query_delivery_callback,
-    _get_kafka_producer,
-    _serialize_attribute_conditions,
-    _validate_attribute_conditions,
-    construct_or_conditions,
-    construct_query,
-    delete_from_storage,
-    delete_from_tables,
-    flush_producers,
-    produce_delete_query,
-    should_use_killswitch,
-)
-
-__all__ = [
-    "DeleteQueryMessage",
-    "FLUSH_PRODUCERS_THREAD_STARTED",
-    "InvalidStorageTopic",
-    "PRODUCER_MAP",
-    "STORAGE_TOPIC",
-    "WireAttributeCondition",
-    "_delete_query_delivery_callback",
-    "_get_kafka_producer",
-    "_serialize_attribute_conditions",
-    "_validate_attribute_conditions",
-    "construct_or_conditions",
-    "construct_query",
-    "delete_from_storage",
-    "delete_from_tables",
-    "flush_producers",
-    "produce_delete_query",
-    "should_use_killswitch",
-]
diff --git a/snuba/web/delete_query.py b/snuba/web/delete_query.py
deleted file mode 100644
index 2b9b8a3e354..00000000000
--- a/snuba/web/delete_query.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from snuba.lw_deletions.delete_query import (
-    DeletesNotEnabledError,
-    TooManyOngoingMutationsError,
-    _construct_condition,
-    _delete_from_table,
-    _enforce_max_rows,
-    _execute_query,
-    _get_attribution_info,
-    _get_delete_allocation_policies,
-    _get_rows_to_delete,
-    _num_ongoing_mutations,
-    _preprocess_for_items,
-    delete_from_storage,
-    deletes_are_enabled,
-)
-
-__all__ = [
-    "DeletesNotEnabledError",
-    "TooManyOngoingMutationsError",
-    "_construct_condition",
-    "_delete_from_table",
-    "_enforce_max_rows",
-    "_execute_query",
-    "_get_attribution_info",
-    "_get_delete_allocation_policies",
-    "_get_rows_to_delete",
-    "_num_ongoing_mutations",
-    "_preprocess_for_items",
-    "delete_from_storage",
-    "deletes_are_enabled",
-]

From d5c8c134dffc8d3539bd77c4973d1caffb6b5959 Mon Sep 17 00:00:00 2001
From: Oliver Newland
Date: Fri, 9 Jan 2026 15:35:02 -0800
Subject: [PATCH 3/4] [wip] move delete tests under lw_deletions

---
 tests/{web => lw_deletions}/test_bulk_delete_query.py | 0
 tests/{web => lw_deletions}/test_delete_query.py      | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename tests/{web => lw_deletions}/test_bulk_delete_query.py (100%)
 rename tests/{web => lw_deletions}/test_delete_query.py (100%)

diff --git a/tests/web/test_bulk_delete_query.py b/tests/lw_deletions/test_bulk_delete_query.py
similarity index 100%
rename from tests/web/test_bulk_delete_query.py
rename to tests/lw_deletions/test_bulk_delete_query.py
diff --git a/tests/web/test_delete_query.py b/tests/lw_deletions/test_delete_query.py
similarity index 100%
rename from tests/web/test_delete_query.py
rename to tests/lw_deletions/test_delete_query.py

From 351c6a111afb3899942518633d57db82d1a286e3 Mon Sep 17 00:00:00 2001
From: Oliver Newland
Date: Fri, 9 Jan 2026 16:24:58 -0800
Subject: [PATCH 4/4] also move test_max_rows_enforcer

---
 tests/{web => lw_deletions}/test_max_rows_enforcer.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename tests/{web => lw_deletions}/test_max_rows_enforcer.py (100%)

diff --git a/tests/web/test_max_rows_enforcer.py b/tests/lw_deletions/test_max_rows_enforcer.py
similarity index 100%
rename from tests/web/test_max_rows_enforcer.py
rename to tests/lw_deletions/test_max_rows_enforcer.py